48
48
import org .apache .cassandra .db .Keyspace ;
49
49
import org .apache .cassandra .db .ReadCommand ;
50
50
import org .apache .cassandra .db .filter .RowFilter ;
51
- import org .apache .cassandra .db .partitions .BasePartitionIterator ;
52
51
import org .apache .cassandra .db .partitions .ParallelCommandProcessor ;
53
- import org .apache .cassandra .db .partitions .PartitionIterator ;
54
52
import org .apache .cassandra .db .partitions .UnfilteredPartitionIterator ;
55
53
import org .apache .cassandra .db .rows .BaseRowIterator ;
56
54
import org .apache .cassandra .db .rows .Row ;
60
58
import org .apache .cassandra .index .sai .IndexContext ;
61
59
import org .apache .cassandra .index .sai .StorageAttachedIndex ;
62
60
import org .apache .cassandra .index .sai .utils .AbortedOperationException ;
63
- import org .apache .cassandra .index .sai .utils .InMemoryPartitionIterator ;
64
61
import org .apache .cassandra .index .sai .utils .InMemoryUnfilteredPartitionIterator ;
65
62
import org .apache .cassandra .index .sai .utils .PartitionInfo ;
66
63
import org .apache .cassandra .index .sai .utils .TypeUtil ;
74
71
/**
75
72
* Processor applied to SAI based ORDER BY queries. This class could likely be refactored into either two filter
76
73
* methods depending on where the processing is happening or into two classes.
77
- *
78
- * This processor performs the following steps on a replica:
79
- * - collect LIMIT rows from partition iterator, making sure that all are valid.
80
- * - return rows in Primary Key order
81
- *
82
- * This processor performs the following steps on a coordinator:
83
- * - consume all rows from the provided partition iterator and sort them according to the specified order.
84
- * For vectors, that is similarit score and for all others, that is the ordering defined by their
85
- * {@link org.apache.cassandra.db.marshal.AbstractType}. If there are multiple vector indexes,
86
- * the final score is the sum of all vector index scores.
87
- * - remove rows with the lowest scores from PQ if PQ size exceeds limit
88
- * - return rows from PQ in primary key order to caller
74
+ * Ordering on the coordinator is delegated to CQL.
89
75
*/
90
76
public class TopKProcessor
91
77
{
@@ -123,8 +109,8 @@ public TopKProcessor(ReadCommand command)
123
109
/**
124
110
* Executor to use for parallel index reads.
125
111
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.
126
- *
127
- * INDEX_READ uses 2 * cpus threads by default but can be overridden with -Dcassandra.index_read.parallel_thread_num=# value
112
+ * </p>
113
+ * INDEX_READ uses 2 * cpus threads by default but can be overridden with {@literal -Dcassandra.index_read.parallel_thread_num=< value>}
128
114
*
129
115
* @return stage to use, default INDEX_READ
130
116
*/
@@ -147,7 +133,7 @@ private static LocalAwareExecutorPlus getExecutor()
147
133
* Filter given partitions and keep the rows with highest scores. In case of {@link UnfilteredPartitionIterator},
148
134
* all tombstones will be kept. Caller must close the supplied iterator.
149
135
*/
150
- public < U extends Unfiltered , R extends BaseRowIterator < U >, P extends BasePartitionIterator < R >> BasePartitionIterator <?> filter (P partitions )
136
+ public UnfilteredPartitionIterator filter (UnfilteredPartitionIterator partitions )
151
137
{
152
138
// filterInternal consumes the partitions iterator and creates a new one. Use a try-with-resources block
153
139
// to ensure the original iterator is closed. We do not expect exceptions from filterInternal, but if they
@@ -159,12 +145,14 @@ public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartit
159
145
}
160
146
}
161
147
162
- private < U extends Unfiltered , R extends BaseRowIterator < U >, P extends BasePartitionIterator < R >> BasePartitionIterator <?> filterInternal (P partitions )
148
+ private UnfilteredPartitionIterator filterInternal (UnfilteredPartitionIterator partitions )
163
149
{
164
150
// priority queue ordered by score in descending order
165
151
Comparator <Triple <PartitionInfo , Row , ?>> comparator ;
166
152
if (queryVector != null )
153
+ {
167
154
comparator = Comparator .comparing ((Triple <PartitionInfo , Row , ?> t ) -> (Float ) t .getRight ()).reversed ();
155
+ }
168
156
else
169
157
{
170
158
comparator = Comparator .comparing (t -> (ByteBuffer ) t .getRight (), indexContext .getValidator ());
@@ -175,13 +163,15 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
175
163
// to store top-k results in primary key order
176
164
TreeMap <PartitionInfo , TreeSet <Unfiltered >> unfilteredByPartition = new TreeMap <>(Comparator .comparing (p -> p .key ));
177
165
178
- if (PARALLEL_EXECUTOR != ImmediateExecutor .INSTANCE && partitions instanceof ParallelCommandProcessor ) {
166
+ if (PARALLEL_EXECUTOR != ImmediateExecutor .INSTANCE && partitions instanceof ParallelCommandProcessor )
167
+ {
179
168
ParallelCommandProcessor pIter = (ParallelCommandProcessor ) partitions ;
180
169
var commands = pIter .getUninitializedCommands ();
181
170
List <CompletableFuture <PartitionResults >> results = new ArrayList <>(commands .size ());
182
171
183
172
int count = commands .size ();
184
- for (var command : commands ) {
173
+ for (var command : commands )
174
+ {
185
175
CompletableFuture <PartitionResults > future = new CompletableFuture <>();
186
176
results .add (future );
187
177
@@ -201,7 +191,8 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
201
191
});
202
192
}
203
193
204
- for (CompletableFuture <PartitionResults > triplesFuture : results ) {
194
+ for (CompletableFuture <PartitionResults > triplesFuture : results )
195
+ {
205
196
PartitionResults pr ;
206
197
try
207
198
{
@@ -216,10 +207,12 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
216
207
if (pr == null )
217
208
continue ;
218
209
topK .addAll (pr .rows );
219
- for (var uf : pr .tombstones )
210
+ for (var uf : pr .tombstones )
220
211
addUnfiltered (unfilteredByPartition , pr .partitionInfo , uf );
221
212
}
222
- } else if (partitions instanceof StorageAttachedIndexSearcher .ScoreOrderedResultRetriever ) {
213
+ }
214
+ else if (partitions instanceof StorageAttachedIndexSearcher .ScoreOrderedResultRetriever )
215
+ {
223
216
// FilteredPartitions does not implement ParallelizablePartitionIterator.
224
217
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
225
218
int rowsMatched = 0 ;
@@ -232,7 +225,9 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
232
225
rowsMatched += processSingleRowPartition (unfilteredByPartition , partitionRowIterator );
233
226
}
234
227
}
235
- } else {
228
+ }
229
+ else
230
+ {
236
231
// FilteredPartitions does not implement ParallelizablePartitionIterator.
237
232
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
238
233
while (partitions .hasNext ())
@@ -244,7 +239,7 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
244
239
{
245
240
PartitionResults pr = processPartition (partitionRowIterator );
246
241
topK .addAll (pr .rows );
247
- for (var uf : pr .tombstones )
242
+ for (var uf : pr .tombstones )
248
243
addUnfiltered (unfilteredByPartition , pr .partitionInfo , uf );
249
244
}
250
245
else
@@ -255,7 +250,6 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
255
250
topK .add (Triple .of (PartitionInfo .create (partitionRowIterator ), row , row .getCell (expression .column ()).buffer ()));
256
251
}
257
252
}
258
-
259
253
}
260
254
}
261
255
}
@@ -264,17 +258,17 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
264
258
for (var triple : topK .getUnsortedShared ())
265
259
addUnfiltered (unfilteredByPartition , triple .getLeft (), triple .getMiddle ());
266
260
267
- if (partitions instanceof PartitionIterator )
268
- return new InMemoryPartitionIterator (command , unfilteredByPartition );
269
261
return new InMemoryUnfilteredPartitionIterator (command , unfilteredByPartition );
270
262
}
271
263
272
- private class PartitionResults {
264
+ private class PartitionResults
265
+ {
273
266
final PartitionInfo partitionInfo ;
274
267
final SortedSet <Unfiltered > tombstones = new TreeSet <>(command .metadata ().comparator );
275
268
final List <Triple <PartitionInfo , Row , Float >> rows = new ArrayList <>();
276
269
277
- PartitionResults (PartitionInfo partitionInfo ) {
270
+ PartitionResults (PartitionInfo partitionInfo )
271
+ {
278
272
this .partitionInfo = partitionInfo ;
279
273
}
280
274
@@ -283,15 +277,17 @@ void addTombstone(Unfiltered uf)
283
277
tombstones .add (uf );
284
278
}
285
279
286
- void addRow (Triple <PartitionInfo , Row , Float > triple ) {
280
+ void addRow (Triple <PartitionInfo , Row , Float > triple )
281
+ {
287
282
rows .add (triple );
288
283
}
289
284
}
290
285
291
286
/**
292
287
* Processes a single partition, calculating scores for rows and extracting tombstones.
293
288
*/
294
- private PartitionResults processPartition (BaseRowIterator <?> partitionRowIterator ) {
289
+ private PartitionResults processPartition (BaseRowIterator <?> partitionRowIterator )
290
+ {
295
291
// Compute key and static row score once per partition
296
292
DecoratedKey key = partitionRowIterator .partitionKey ();
297
293
Row staticRow = partitionRowIterator .staticRow ();
@@ -322,7 +318,8 @@ private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterato
322
318
* Processes a single partition, without scoring it.
323
319
*/
324
320
private int processSingleRowPartition (TreeMap <PartitionInfo , TreeSet <Unfiltered >> unfilteredByPartition ,
325
- BaseRowIterator <?> partitionRowIterator ) {
321
+ BaseRowIterator <?> partitionRowIterator )
322
+ {
326
323
if (!partitionRowIterator .hasNext ())
327
324
return 0 ;
328
325
0 commit comments