Skip to content

Commit 3c2a0bf

Browse files
committed
feat(redis sink): support input based on encoding type
1 parent 96bc594 commit 3c2a0bf

File tree

4 files changed

+201
-6
lines changed

4 files changed

+201
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The redis sink now supports any input event type that the configured encoding supports. It previously only supported log events.
2+
3+
authors: ynachi

src/sinks/redis/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl SinkConfig for RedisSinkConfig {
146146
}
147147

148148
fn input(&self) -> Input {
149-
Input::new(self.encoding.config().input_type() & DataType::Log)
149+
Input::new(self.encoding.config().input_type())
150150
}
151151

152152
fn acknowledgements(&self) -> &AcknowledgementsConfig {

src/sinks/redis/integration_tests.rs

+167-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use vector_lib::{
77
event::LogEvent,
88
};
99

10+
use crate::event::{
11+
BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue, TraceEvent,
12+
};
13+
1014
use super::config::{DataTypeConfig, ListOption, Method, RedisSinkConfig};
1115
use crate::{
1216
sinks::prelude::*,
@@ -15,7 +19,7 @@ use crate::{
1519
assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
1620
SINK_TAGS,
1721
},
18-
random_lines_with_stream, random_string, trace_init,
22+
map_event_batch_stream, random_lines_with_stream, random_string, trace_init,
1923
},
2024
};
2125

@@ -290,3 +294,165 @@ async fn redis_sink_channel_data_volume_tags() {
290294
}
291295
}
292296
}
297+
298+
#[tokio::test]
299+
async fn redis_sink_metrics() {
300+
trace_init();
301+
302+
let key = Template::try_from(format!("test-metrics-{}", random_string(10)))
303+
.expect("should not fail to create key template");
304+
debug!("Test key name: {}.", key);
305+
let num_events = 1000;
306+
debug!("Test events num: {}.", num_events);
307+
308+
let cnf = RedisSinkConfig {
309+
endpoint: redis_server(),
310+
key: key.clone(),
311+
encoding: JsonSerializerConfig::default().into(),
312+
data_type: DataTypeConfig::List,
313+
list_option: Some(ListOption {
314+
method: Method::RPush,
315+
}),
316+
batch: BatchConfig::default(),
317+
request: TowerRequestConfig {
318+
rate_limit_num: u64::MAX,
319+
..Default::default()
320+
},
321+
acknowledgements: Default::default(),
322+
};
323+
324+
// Create a mix of counter and gauge metrics
325+
let mut events: Vec<Event> = Vec::new();
326+
for i in 0..num_events {
327+
let metric = if i % 2 == 0 {
328+
// Counter metrics
329+
Metric::new(
330+
format!("counter_{}", i),
331+
MetricKind::Absolute,
332+
MetricValue::Counter { value: i as f64 },
333+
)
334+
} else {
335+
// Gauge metrics
336+
Metric::new(
337+
format!("gauge_{}", i),
338+
MetricKind::Absolute,
339+
MetricValue::Gauge { value: i as f64 },
340+
)
341+
};
342+
events.push(metric.into());
343+
}
344+
let input = stream::iter(events.clone().into_iter().map(Into::into));
345+
346+
// Publish events
347+
let cnf2 = cnf.clone();
348+
assert_sink_compliance(&SINK_TAGS, async move {
349+
let cx = SinkContext::default();
350+
let (sink, _healthcheck) = cnf2.build(cx).await.unwrap();
351+
sink.run(input).await
352+
})
353+
.await
354+
.expect("Running sink failed");
355+
356+
// Verify metrics were stored correctly
357+
let mut conn = cnf.build_client().await.unwrap();
358+
359+
let key_exists: bool = conn.exists(key.to_string()).await.unwrap();
360+
debug!("Test key: {} exists: {}.", key, key_exists);
361+
assert!(key_exists);
362+
363+
let llen: usize = conn.llen(key.clone().to_string()).await.unwrap();
364+
debug!("Test key: {} len: {}.", key, llen);
365+
assert_eq!(llen, num_events);
366+
367+
// Verify the content of each metric
368+
for i in 0..num_events {
369+
let original_event = events.get(i).unwrap().as_metric();
370+
let payload: (String, String) = conn.blpop(key.clone().to_string(), 2000.0).await.unwrap();
371+
let val = payload.1;
372+
373+
// Parse the JSON and verify key metric properties
374+
let json: serde_json::Value = serde_json::from_str(&val).unwrap();
375+
376+
if i % 2 == 0 {
377+
// Counter metrics
378+
assert_eq!(json["name"], format!("counter_{}", i));
379+
assert_eq!(json["kind"], "absolute");
380+
assert_eq!(json["counter"]["value"], i as f64);
381+
} else {
382+
// Gauge metrics
383+
assert_eq!(json["name"], format!("gauge_{}", i));
384+
assert_eq!(json["kind"], "absolute");
385+
assert_eq!(json["gauge"]["value"], i as f64);
386+
}
387+
388+
// Verify that the name matches what we expect
389+
assert_eq!(json["name"].as_str().unwrap(), original_event.name());
390+
}
391+
}
392+
393+
#[tokio::test]
394+
async fn redis_sink_traces() {
395+
use crate::test_util::components::{assert_sink_compliance, SINK_TAGS};
396+
397+
trace_init();
398+
399+
assert_sink_compliance(&SINK_TAGS, async {
400+
// Setup Redis sink config
401+
let key = Template::try_from(format!("test-traces-{}", random_string(10))).unwrap();
402+
let config = RedisSinkConfig {
403+
endpoint: redis_server(),
404+
key: key.clone(),
405+
encoding: JsonSerializerConfig::default().into(),
406+
data_type: DataTypeConfig::List,
407+
list_option: Some(ListOption {
408+
method: Method::RPush,
409+
}),
410+
batch: BatchConfig::default(),
411+
request: TowerRequestConfig::default(),
412+
acknowledgements: Default::default(),
413+
};
414+
415+
// Build the sink
416+
let cx = SinkContext::default();
417+
let (sink, _) = config.build(cx).await.unwrap();
418+
419+
// Create a trace event
420+
let mut trace = TraceEvent::default();
421+
trace.insert("name", "test_trace");
422+
trace.insert("service", "redis_test");
423+
424+
// Set up batch notification for checking delivery status
425+
let (batch, receiver) = BatchNotifier::new_with_receiver();
426+
let trace_with_batch = trace.with_batch_notifier(&batch);
427+
428+
// Create event stream with proper conversion to EventArray
429+
let events = vec![Event::Trace(trace_with_batch)];
430+
let stream = map_event_batch_stream(stream::iter(events), Some(batch));
431+
432+
// Run the sink with the stream
433+
sink.run(stream).await.unwrap();
434+
435+
// Check that events were delivered
436+
assert_eq!(receiver.await, BatchStatus::Delivered);
437+
438+
// Verify data in Redis
439+
let mut conn = redis::Client::open(redis_server())
440+
.unwrap()
441+
.get_async_connection()
442+
.await
443+
.unwrap();
444+
445+
let len: usize = conn.llen(key.to_string()).await.unwrap();
446+
assert_eq!(len, 1);
447+
448+
// Check content
449+
let payload: (String, String) = conn.blpop(key.to_string(), 2000.0).await.unwrap();
450+
let json_str = payload.1;
451+
452+
// Verify the trace content
453+
let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
454+
assert_eq!(json["name"], "test_trace");
455+
assert_eq!(json["service"], "redis_test");
456+
})
457+
.await;
458+
}

src/sinks/redis/tests.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::HashMap;
22

33
use vector_lib::codecs::{JsonSerializerConfig, TextSerializerConfig};
4-
use vector_lib::event::LogEvent;
4+
use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue};
55
use vector_lib::request_metadata::GroupedCountByteSize;
66

77
use super::{config::RedisSinkConfig, request_builder::encode_event};
@@ -16,7 +16,7 @@ fn generate_config() {
1616
}
1717

1818
#[test]
19-
fn redis_event_json() {
19+
fn redis_log_event_json() {
2020
let msg = "hello_world".to_owned();
2121
let mut byte_size = GroupedCountByteSize::new_untagged();
2222
let mut evt = LogEvent::from(msg.clone());
@@ -35,7 +35,7 @@ fn redis_event_json() {
3535
}
3636

3737
#[test]
38-
fn redis_event_text() {
38+
fn redis_log_event_text() {
3939
let msg = "hello_world".to_owned();
4040
let evt = LogEvent::from(msg.clone());
4141
let mut byte_size = GroupedCountByteSize::new_untagged();
@@ -52,7 +52,7 @@ fn redis_event_text() {
5252
}
5353

5454
#[test]
55-
fn redis_encode_event() {
55+
fn redis_log_encode_event() {
5656
let msg = "hello_world";
5757
let mut evt = LogEvent::from(msg);
5858
let mut byte_size = GroupedCountByteSize::new_untagged();
@@ -71,3 +71,29 @@ fn redis_encode_event() {
7171
let map: HashMap<String, String> = serde_json::from_slice(&result[..]).unwrap();
7272
assert!(!map.contains_key("key"));
7373
}
74+
75+
#[test]
76+
fn redis_metric_encode_event() {
77+
let mut byte_size = GroupedCountByteSize::new_untagged();
78+
let metric = Metric::new(
79+
"test_counter",
80+
MetricKind::Absolute,
81+
MetricValue::Counter { value: 42.0 },
82+
);
83+
84+
let result = encode_event(
85+
metric.into(),
86+
"metrics.counter".to_string(),
87+
&Default::default(),
88+
&mut Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
89+
&mut byte_size,
90+
)
91+
.unwrap()
92+
.value;
93+
94+
let json: serde_json::Value = serde_json::from_slice(&result).unwrap();
95+
96+
assert_eq!(json["name"], "test_counter");
97+
assert_eq!(json["kind"], "absolute");
98+
assert_eq!(json["counter"]["value"], 42.0);
99+
}

0 commit comments

Comments
 (0)