@@ -5,22 +5,21 @@ use chroma_system::{
5
5
wrap, ChannelError , ComponentContext , ComponentHandle , Dispatcher , Handler , Orchestrator ,
6
6
PanicError , TaskError , TaskMessage , TaskResult ,
7
7
} ;
8
- use chroma_types:: {
9
- operator:: { Filter , GetResult , Limit , Projection , ProjectionOutput } ,
10
- CollectionAndSegments ,
11
- } ;
8
+ use chroma_types:: operator:: { GetResult , Limit , Projection , ProjectionOutput } ;
12
9
use thiserror:: Error ;
13
10
use tokio:: sync:: oneshot:: { error:: RecvError , Sender } ;
14
11
use tracing:: Span ;
15
12
16
13
use crate :: execution:: operators:: {
17
- fetch_log:: { FetchLogError , FetchLogOperator , FetchLogOutput } ,
18
- filter:: { FilterError , FilterInput , FilterOutput } ,
14
+ fetch_log:: FetchLogError ,
15
+ filter:: FilterError ,
19
16
limit:: { LimitError , LimitInput , LimitOutput } ,
20
17
prefetch_record:: { PrefetchRecordError , PrefetchRecordOperator , PrefetchRecordOutput } ,
21
18
projection:: { ProjectionError , ProjectionInput } ,
22
19
} ;
23
20
21
+ use super :: filter:: FilterOrchestratorOutput ;
22
+
24
23
#[ derive( Error , Debug ) ]
25
24
pub enum GetError {
26
25
#[ error( "Error sending message through channel: {0}" ) ]
@@ -69,51 +68,47 @@ where
69
68
}
70
69
}
71
70
72
- /// The `GetOrchestrator` chains a sequence of operators in sequence to evaluate
73
- /// a `<collection>.get(...)` query from the user
71
+ /// The `GetOrchestrator` chains a sequence of operators in sequence to get data for user.
72
+ /// When used together with `FilterOrchestrator`, they evaluate a `<collection>.get(...)` query
74
73
///
75
74
/// # Pipeline
76
75
/// ```text
77
- /// ┌────────────┐
78
- /// │ │
79
- /// │ on_start │
80
- /// │ │
81
- /// └──────┬─────┘
82
- /// │
83
- /// ▼
84
- /// ┌────────────────────┐
85
- /// │ │
86
- /// │ FetchLogOperator │
87
- /// │ │
88
- /// └─────────┬──────────┘
89
- /// │
90
- /// ▼
91
- /// ┌───────────────────┐
92
- /// │ │
93
- /// │ FilterOperator │
94
- /// │ │
95
- /// └─────────┬─────────┘
96
76
/// │
97
- /// ▼
98
- /// ┌─────────────────┐
99
- /// │ │
100
- /// │ LimitOperator │
101
- /// │ │
102
- /// └────────┬────────┘
103
77
/// │
104
78
/// ▼
105
- /// ┌ ──────────────────────┐
106
- /// │ │
107
- /// │ ProjectionOperator │
108
- /// │ │
109
- /// └ ──────────┬───────────┘
79
+ /// ┌─ ──────────────────────┐
80
+ /// │ │
81
+ /// │ FilterOrchestrator │
82
+ /// │ │
83
+ /// └─ ──────────┬───────────┘
110
84
/// │
85
+ /// ┌────────── │ ────────────┐
86
+ /// │ │ Get │
87
+ /// │ │ Orchestrator │
88
+ /// │ ▼ │
89
+ /// │ ┌─────────────────┐ │
90
+ /// │ │ │ │
91
+ /// │ │ LimitOperator │ │
92
+ /// │ │ │ │
93
+ /// │ └────────┬────────┘ │
94
+ /// │ │ │
95
+ /// │ ▼ │
96
+ /// │ ┌──────────────────────┐ │
97
+ /// │ │ │ │
98
+ /// │ │ ProjectionOperator │ │
99
+ /// │ │ │ │
100
+ /// │ └──────────┬───────────┘ │
101
+ /// │ │ │
102
+ /// │ ▼ │
103
+ /// │ ┌──────────────────┐ │
104
+ /// │ │ │ │
105
+ /// │ │ result_channel │ │
106
+ /// │ │ │ │
107
+ /// │ └──────────────────┘ │
108
+ /// │ │ │
109
+ /// └────────── │ ────────────┘
111
110
/// ▼
112
- /// ┌──────────────────┐
113
- /// │ │
114
- /// │ result_channel │
115
- /// │ │
116
- /// └──────────────────┘
111
+ ///
117
112
/// ```
118
113
#[ derive( Debug ) ]
119
114
pub struct GetOrchestrator {
@@ -122,17 +117,10 @@ pub struct GetOrchestrator {
122
117
dispatcher : ComponentHandle < Dispatcher > ,
123
118
queue : usize ,
124
119
125
- // Collection segments
126
- collection_and_segments : CollectionAndSegments ,
127
-
128
- // Fetch logs
129
- fetch_log : FetchLogOperator ,
130
-
131
- // Fetched logs
132
- fetched_logs : Option < FetchLogOutput > ,
120
+ // Output from FilterOrchestrator
121
+ filter_output : FilterOrchestratorOutput ,
133
122
134
123
// Pipelined operators
135
- filter : Filter ,
136
124
limit : Limit ,
137
125
projection : Projection ,
138
126
@@ -146,20 +134,15 @@ impl GetOrchestrator {
146
134
blockfile_provider : BlockfileProvider ,
147
135
dispatcher : ComponentHandle < Dispatcher > ,
148
136
queue : usize ,
149
- collection_and_segments : CollectionAndSegments ,
150
- fetch_log : FetchLogOperator ,
151
- filter : Filter ,
137
+ filter_output : FilterOrchestratorOutput ,
152
138
limit : Limit ,
153
139
projection : Projection ,
154
140
) -> Self {
155
141
Self {
156
142
blockfile_provider,
157
143
dispatcher,
158
144
queue,
159
- collection_and_segments,
160
- fetch_log,
161
- fetched_logs : None ,
162
- filter,
145
+ filter_output,
163
146
limit,
164
147
projection,
165
148
result_channel : None ,
@@ -181,7 +164,17 @@ impl Orchestrator for GetOrchestrator {
181
164
ctx : & ComponentContext < Self > ,
182
165
) -> Vec < ( TaskMessage , Option < Span > ) > {
183
166
vec ! [ (
184
- wrap( Box :: new( self . fetch_log. clone( ) ) , ( ) , ctx. receiver( ) ) ,
167
+ wrap(
168
+ Box :: new( self . limit. clone( ) ) ,
169
+ LimitInput {
170
+ logs: self . filter_output. logs. clone( ) ,
171
+ blockfile_provider: self . blockfile_provider. clone( ) ,
172
+ record_segment: self . filter_output. record_segment. clone( ) ,
173
+ log_offset_ids: self . filter_output. filter_output. log_offset_ids. clone( ) ,
174
+ compact_offset_ids: self . filter_output. filter_output. compact_offset_ids. clone( ) ,
175
+ } ,
176
+ ctx. receiver( ) ,
177
+ ) ,
185
178
Some ( Span :: current( ) ) ,
186
179
) ]
187
180
}
@@ -201,68 +194,6 @@ impl Orchestrator for GetOrchestrator {
201
194
}
202
195
}
203
196
204
- #[ async_trait]
205
- impl Handler < TaskResult < FetchLogOutput , FetchLogError > > for GetOrchestrator {
206
- type Result = ( ) ;
207
-
208
- async fn handle (
209
- & mut self ,
210
- message : TaskResult < FetchLogOutput , FetchLogError > ,
211
- ctx : & ComponentContext < Self > ,
212
- ) {
213
- let output = match self . ok_or_terminate ( message. into_inner ( ) , ctx) . await {
214
- Some ( output) => output,
215
- None => return ,
216
- } ;
217
-
218
- self . fetched_logs = Some ( output. clone ( ) ) ;
219
-
220
- let task = wrap (
221
- Box :: new ( self . filter . clone ( ) ) ,
222
- FilterInput {
223
- logs : output,
224
- blockfile_provider : self . blockfile_provider . clone ( ) ,
225
- metadata_segment : self . collection_and_segments . metadata_segment . clone ( ) ,
226
- record_segment : self . collection_and_segments . record_segment . clone ( ) ,
227
- } ,
228
- ctx. receiver ( ) ,
229
- ) ;
230
- self . send ( task, ctx, Some ( Span :: current ( ) ) ) . await ;
231
- }
232
- }
233
-
234
- #[ async_trait]
235
- impl Handler < TaskResult < FilterOutput , FilterError > > for GetOrchestrator {
236
- type Result = ( ) ;
237
-
238
- async fn handle (
239
- & mut self ,
240
- message : TaskResult < FilterOutput , FilterError > ,
241
- ctx : & ComponentContext < Self > ,
242
- ) {
243
- let output = match self . ok_or_terminate ( message. into_inner ( ) , ctx) . await {
244
- Some ( output) => output,
245
- None => return ,
246
- } ;
247
- let task = wrap (
248
- Box :: new ( self . limit . clone ( ) ) ,
249
- LimitInput {
250
- logs : self
251
- . fetched_logs
252
- . as_ref ( )
253
- . expect ( "FetchLogOperator should have finished already" )
254
- . clone ( ) ,
255
- blockfile_provider : self . blockfile_provider . clone ( ) ,
256
- record_segment : self . collection_and_segments . record_segment . clone ( ) ,
257
- log_offset_ids : output. log_offset_ids ,
258
- compact_offset_ids : output. compact_offset_ids ,
259
- } ,
260
- ctx. receiver ( ) ,
261
- ) ;
262
- self . send ( task, ctx, Some ( Span :: current ( ) ) ) . await ;
263
- }
264
- }
265
-
266
197
#[ async_trait]
267
198
impl Handler < TaskResult < LimitOutput , LimitError > > for GetOrchestrator {
268
199
type Result = ( ) ;
@@ -278,13 +209,9 @@ impl Handler<TaskResult<LimitOutput, LimitError>> for GetOrchestrator {
278
209
} ;
279
210
280
211
let input = ProjectionInput {
281
- logs : self
282
- . fetched_logs
283
- . as_ref ( )
284
- . expect ( "FetchLogOperator should have finished already" )
285
- . clone ( ) ,
212
+ logs : self . filter_output . logs . clone ( ) ,
286
213
blockfile_provider : self . blockfile_provider . clone ( ) ,
287
- record_segment : self . collection_and_segments . record_segment . clone ( ) ,
214
+ record_segment : self . filter_output . record_segment . clone ( ) ,
288
215
offset_ids : output. offset_ids . iter ( ) . collect ( ) ,
289
216
} ;
290
217
@@ -332,9 +259,8 @@ impl Handler<TaskResult<ProjectionOutput, ProjectionError>> for GetOrchestrator
332
259
} ;
333
260
334
261
let pulled_log_bytes = self
335
- . fetched_logs
336
- . as_ref ( )
337
- . expect ( "FetchLogOperator should have finished already" )
262
+ . filter_output
263
+ . logs
338
264
. iter ( )
339
265
. map ( |( l, _) | l. size_bytes ( ) )
340
266
. sum ( ) ;
0 commit comments