Skip to content

Commit 94def0f

Browse files
authored
enhancement(reduce transform): add internal events (#3812)
Closes #3401 Signed-off-by: Luke Steensen <[email protected]>
1 parent 74b43b7 commit 94def0f

File tree

3 files changed

+34
-0
lines changed

3 files changed

+34
-0
lines changed

src/internal_events/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ mod lua;
3333
mod process;
3434
#[cfg(feature = "sources-prometheus")]
3535
mod prometheus;
36+
#[cfg(feature = "transforms-reduce")]
37+
mod reduce;
3638
#[cfg(feature = "transforms-regex_parser")]
3739
mod regex_parser;
3840
mod remap;
@@ -102,6 +104,8 @@ pub use self::lua::*;
102104
pub use self::process::*;
103105
#[cfg(feature = "sources-prometheus")]
104106
pub use self::prometheus::*;
107+
#[cfg(feature = "transforms-reduce")]
108+
pub(crate) use self::reduce::*;
105109
#[cfg(feature = "transforms-regex_parser")]
106110
pub(crate) use self::regex_parser::*;
107111
pub use self::remap::*;

src/internal_events/reduce.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use super::InternalEvent;
2+
use metrics::counter;
3+
4+
#[derive(Debug)]
5+
pub(crate) struct ReduceEventProcessed;
6+
7+
impl InternalEvent for ReduceEventProcessed {
8+
fn emit_metrics(&self) {
9+
counter!("events_processed", 1,
10+
"component_kind" => "transform",
11+
"component_type" => "reduce",
12+
);
13+
}
14+
}
15+
16+
#[derive(Debug)]
17+
pub(crate) struct ReduceStaleEventFlushed;
18+
19+
impl InternalEvent for ReduceStaleEventFlushed {
20+
fn emit_metrics(&self) {
21+
counter!("stale_events_flushed", 1,
22+
"component_kind" => "transform",
23+
"component_type" => "reduce",
24+
);
25+
}
26+
}

src/transforms/reduce/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
config::{DataType, TransformConfig, TransformContext, TransformDescription},
55
event::discriminant::Discriminant,
66
event::{Event, LogEvent},
7+
internal_events::{ReduceEventProcessed, ReduceStaleEventFlushed},
78
};
89
use async_stream::stream;
910
use futures::{
@@ -180,6 +181,7 @@ impl Reduce {
180181
}
181182
for k in &flush_discriminants {
182183
if let Some(t) = self.reduce_merge_states.remove(k) {
184+
emit!(ReduceStaleEventFlushed);
183185
output.push(Event::from(t.flush()));
184186
}
185187
}
@@ -230,6 +232,8 @@ impl Transform for Reduce {
230232
}
231233
}
232234

235+
emit!(ReduceEventProcessed);
236+
233237
self.flush_into(output);
234238
}
235239

0 commit comments

Comments
 (0)