Skip to content

Commit fb44580

Browse files
wbewleebenson
andauthored
enhancement(api): Support tapping component inputs with vector tap (#11293)
* Add inputs as part of tap's watched resources * Wire up inputs_of/outputs_of pattern options Requires a change in the API to rename the patterns argument to outputsPatterns and support an inputsPatterns argument. * Implement basic input tapping * Fix notifications for matching input patterns * Fix existing tests given new types * Add new tests for tapping inputs * Minor clean up * Make inputsPatterns optional in subscription * Fix clippy errors * Update cli cue * Add 0.21 version guide entry for api change * component features Signed-off-by: Lee Benson <[email protected]> Co-authored-by: Lee Benson <[email protected]>
1 parent 9f0acde commit fb44580

File tree

11 files changed

+798
-63
lines changed

11 files changed

+798
-63
lines changed

lib/vector-api-client/graphql/schema.json

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5566,7 +5566,7 @@
55665566
"description": "A stream of events emitted from matched component ID patterns",
55675567
"args": [
55685568
{
5569-
"name": "patterns",
5569+
"name": "outputsPatterns",
55705570
"description": null,
55715571
"type": {
55725572
"kind": "NON_NULL",
@@ -5587,6 +5587,24 @@
55875587
},
55885588
"defaultValue": null
55895589
},
5590+
{
5591+
"name": "inputsPatterns",
5592+
"description": null,
5593+
"type": {
5594+
"kind": "LIST",
5595+
"name": null,
5596+
"ofType": {
5597+
"kind": "NON_NULL",
5598+
"name": null,
5599+
"ofType": {
5600+
"kind": "SCALAR",
5601+
"name": "String",
5602+
"ofType": null
5603+
}
5604+
}
5605+
},
5606+
"defaultValue": null
5607+
},
55905608
{
55915609
"name": "interval",
55925610
"description": null,

lib/vector-api-client/graphql/subscriptions/output_events_by_component_id_patterns.graphql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
subscription OutputEventsByComponentIdPatternsSubscription(
2-
$patterns: [String!]!, $limit: Int!, $interval: Int!, $encoding: EventEncodingType!){
3-
outputEventsByComponentIdPatterns(patterns: $patterns, limit: $limit, interval: $interval) {
2+
$outputsPatterns: [String!]!, $inputsPatterns: [String!], $limit: Int!, $interval: Int!, $encoding: EventEncodingType!){
3+
outputEventsByComponentIdPatterns(outputsPatterns: $outputsPatterns, inputsPatterns: $inputsPatterns, limit: $limit, interval: $interval) {
44
__typename
55
... on Log {
66
componentId

lib/vector-api-client/src/gql/tap.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ pub trait TapSubscriptionExt {
6969
/// Executes an output events subscription.
7070
fn output_events_by_component_id_patterns_subscription(
7171
&self,
72-
component_patterns: Vec<String>,
72+
outputs_patterns: Vec<String>,
73+
inputs_patterns: Vec<String>,
7374
encoding: TapEncodingFormat,
7475
limit: i64,
7576
interval: i64,
@@ -80,14 +81,16 @@ impl TapSubscriptionExt for crate::SubscriptionClient {
8081
/// Executes an output events subscription.
8182
fn output_events_by_component_id_patterns_subscription(
8283
&self,
83-
patterns: Vec<String>,
84+
outputs_patterns: Vec<String>,
85+
inputs_patterns: Vec<String>,
8486
encoding: TapEncodingFormat,
8587
limit: i64,
8688
interval: i64,
8789
) -> BoxedSubscription<OutputEventsByComponentIdPatternsSubscription> {
8890
let request_body = OutputEventsByComponentIdPatternsSubscription::build_query(
8991
output_events_by_component_id_patterns_subscription::Variables {
90-
patterns,
92+
outputs_patterns,
93+
inputs_patterns: Some(inputs_patterns),
9194
limit,
9295
interval,
9396
encoding: encoding.into(),

src/api/schema/events/mod.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ pub mod notification;
55
pub mod output;
66
pub mod trace;
77

8+
use std::collections::HashSet;
9+
810
use async_graphql::{Context, Subscription};
911
use encoding::EventEncodingType;
1012
use futures::Stream;
@@ -16,6 +18,32 @@ use tokio_stream::wrappers::ReceiverStream;
1618

1719
use crate::{api::tap::TapController, topology::WatchRx};
1820

21+
/// Patterns (glob) used by tap to match against components and access events
22+
/// flowing into (for_inputs) or out of (for_outputs) specified components
23+
#[derive(Debug)]
24+
pub struct TapPatterns {
25+
pub for_outputs: HashSet<String>,
26+
pub for_inputs: HashSet<String>,
27+
}
28+
29+
impl TapPatterns {
30+
pub const fn new(for_outputs: HashSet<String>, for_inputs: HashSet<String>) -> Self {
31+
Self {
32+
for_outputs,
33+
for_inputs,
34+
}
35+
}
36+
37+
/// Get all user-specified patterns
38+
pub fn all_patterns(&self) -> HashSet<String> {
39+
self.for_outputs
40+
.iter()
41+
.cloned()
42+
.chain(self.for_inputs.iter().cloned())
43+
.collect()
44+
}
45+
}
46+
1947
#[derive(Debug, Default)]
2048
pub struct EventsSubscription;
2149

@@ -25,12 +53,17 @@ impl EventsSubscription {
2553
pub async fn output_events_by_component_id_patterns<'a>(
2654
&'a self,
2755
ctx: &'a Context<'a>,
28-
patterns: Vec<String>,
56+
outputs_patterns: Vec<String>,
57+
inputs_patterns: Option<Vec<String>>,
2958
#[graphql(default = 500)] interval: u32,
3059
#[graphql(default = 100, validator(minimum = 1, maximum = 10_000))] limit: u32,
3160
) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
3261
let watch_rx = ctx.data_unchecked::<WatchRx>().clone();
3362

63+
let patterns = TapPatterns {
64+
for_outputs: outputs_patterns.into_iter().collect(),
65+
for_inputs: inputs_patterns.unwrap_or_default().into_iter().collect(),
66+
};
3467
// Client input is confined to `u32` to provide sensible bounds.
3568
create_events_stream(watch_rx, patterns, interval as u64, limit as usize)
3669
}
@@ -41,7 +74,7 @@ impl EventsSubscription {
4174
/// all matching events; filtering should be done at the caller level.
4275
pub(crate) fn create_events_stream(
4376
watch_rx: WatchRx,
44-
component_id_patterns: Vec<String>,
77+
patterns: TapPatterns,
4578
interval: u64,
4679
limit: usize,
4780
) -> impl Stream<Item = Vec<OutputEventsPayload>> {
@@ -57,7 +90,7 @@ pub(crate) fn create_events_stream(
5790
tokio::spawn(async move {
5891
// Create a tap controller. When this drops out of scope, clean up will be performed on the
5992
// event handlers and topology observation that the tap controller provides.
60-
let _tap_controller = TapController::new(watch_rx, tap_tx, &component_id_patterns);
93+
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
6194

6295
// A tick interval to represent when to 'cut' the results back to the client.
6396
let mut interval = time::interval(time::Duration::from_millis(interval));

0 commit comments

Comments
 (0)