Skip to content

Commit 926fee7

Browse files
adelapenadjatnieks
authored andcommitted
CNDB-11762: Remove StorageAttachedIndexQueryPlan#postProcessor (#1422)
`StorageAttachedIndexQueryPlan#postProcessor` seems redundant because coordinator-side sorting+triming of ANN query results is already done in `SelectStatement`. If that's the case, we should remove it. - [ ] Make sure there is a PR in the CNDB project updating the Converged Cassandra version - [ ] Use `NoSpamLogger` for log lines that may appear frequently in the logs - [ ] Verify test results on Butler - [ ] Test coverage for new/modified code is > 80% - [ ] Proper code formatting - [ ] Proper title for each commit staring with the project-issue number, like CNDB-1234 - [ ] Each commit has a meaningful description - [ ] Each commit is not very long and contains related changes - [ ] Renames, moves and reformatting are in distinct commits
1 parent faec14b commit 926fee7

File tree

11 files changed

+204
-224
lines changed

11 files changed

+204
-224
lines changed

src/java/org/apache/cassandra/cql3/statements/SelectStatement.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -518,9 +518,7 @@ public ReadQuery getQuery(QueryOptions options,
518518
String.format(TOPK_CONSISTENCY_LEVEL_ERROR, options.getConsistency()));
519519

520520
// Consistency levels with more than one replica are downgraded to ONE/LOCAL_ONE.
521-
if (options.getConsistency() != ConsistencyLevel.ONE &&
522-
options.getConsistency() != ConsistencyLevel.LOCAL_ONE &&
523-
options.getConsistency() != ConsistencyLevel.NODE_LOCAL)
521+
if (options.getConsistency().needsReconciliation())
524522
{
525523
ConsistencyLevel supplied = options.getConsistency();
526524
ConsistencyLevel downgrade = supplied.isDatacenterLocal() ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE;

src/java/org/apache/cassandra/db/ConsistencyLevel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void validateCounterForWrite(TableMetadata metadata, ClientState clientSt
286286
}
287287

288288
/**
289-
* With a replication factor greater than one, reads that contact more than one replica will require
289+
* With a replication factor greater than one, reads that contact more than one replica will require
290290
* reconciliation of the individual replica results at the coordinator.
291291
*
292292
* @return true if reads at this consistency level require merging at the coordinator

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -573,21 +573,10 @@ public ReadExecutionController executionController(boolean trackRepairedStatus)
573573
return ReadExecutionController.forCommand(this, trackRepairedStatus);
574574
}
575575

576-
/**
577-
* Allow to post-process the result of the query after it has been reconciled on the coordinator
578-
* but before it is passed to the CQL layer to return the ResultSet.
579-
*
580-
* See CASSANDRA-8717 for why this exists.
581-
*/
582-
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
583-
{
584-
return indexQueryPlan == null ? result : indexQueryPlan.postProcessor(this).apply(result);
585-
}
586-
587576
@Override
588577
public PartitionIterator executeInternal(ReadExecutionController controller)
589578
{
590-
return postReconciliationProcessing(UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()));
579+
return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
591580
}
592581

593582
public ReadExecutionController executionController()

src/java/org/apache/cassandra/index/Index.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Optional;
3131
import java.util.Set;
3232
import java.util.concurrent.Callable;
33-
import java.util.function.Function;
3433
import java.util.function.Predicate;
3534
import java.util.function.Supplier;
3635
import javax.annotation.Nonnull;
@@ -51,7 +50,6 @@
5150
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
5251
import org.apache.cassandra.db.marshal.AbstractType;
5352
import org.apache.cassandra.db.memtable.Memtable;
54-
import org.apache.cassandra.db.partitions.PartitionIterator;
5553
import org.apache.cassandra.db.partitions.PartitionUpdate;
5654
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5755
import org.apache.cassandra.db.rows.Row;
@@ -134,13 +132,6 @@
134132
* cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
135133
* of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
136134
* chosen. A Searcher instance is then obtained from the searcherFor method and used to perform the actual Index lookup.
137-
* Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
138-
* the primary table) have been received from replicas and reconciled. This post processing is defined as a
139-
* {@code java.util.functions.BiFunction<PartitionIterator, RowFilter, PartitionIterator>}, that is a function which takes as
140-
* arguments a PartitionIterator (containing the reconciled result rows) and a RowFilter (from the ReadCommand being
141-
* executed) and returns another iterator of partitions, possibly having transformed the initial results in some way.
142-
* The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
143-
* with Cassandra return a no-op function here.
144135
*
145136
* An optional static method may be provided to validate custom index options (two variants are supported):
146137
*
@@ -1098,27 +1089,6 @@ default void validate(ReadCommand command) throws InvalidRequestException
10981089
*/
10991090
Searcher searcherFor(ReadCommand command);
11001091

1101-
/**
1102-
* Return a function which performs post processing on the results of a partition range read command.
1103-
* In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
1104-
* to returning them to the caller.
1105-
*
1106-
* This is used on the coordinator during execution of a range command to perform post
1107-
* processing of merged results obtained from the necessary replicas. This is the only way in which results are
1108-
* transformed in this way but this may change over time as usage is generalized.
1109-
* See CASSANDRA-8717 for further discussion.
1110-
*
1111-
* The function takes a PartitionIterator of the results from the replicas which has already been collated
1112-
* and reconciled, along with the command being executed. It returns another PartitionIterator containing the results
1113-
* of the transformation (which may be the same as the input if the transformation is a no-op).
1114-
*
1115-
* @param command the read command being executed
1116-
*/
1117-
default Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
1118-
{
1119-
return partitions -> partitions;
1120-
}
1121-
11221092
/**
11231093
* Transform an initial {@link RowFilter} into the filter that will still need to applied to a set of Rows after
11241094
* the index has performed it's initial scan.

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.HashSet;
2121
import java.util.Set;
2222
import java.util.concurrent.TimeUnit;
23-
import java.util.function.Function;
2423
import javax.annotation.Nullable;
2524

2625
import com.google.common.collect.ImmutableSet;
@@ -30,7 +29,6 @@
3029
import org.apache.cassandra.db.DecoratedKey;
3130
import org.apache.cassandra.db.ReadCommand;
3231
import org.apache.cassandra.db.filter.RowFilter;
33-
import org.apache.cassandra.db.partitions.PartitionIterator;
3432
import org.apache.cassandra.db.rows.Row;
3533
import org.apache.cassandra.db.rows.Unfiltered;
3634
import org.apache.cassandra.index.Index;
@@ -209,19 +207,6 @@ public Index.Searcher searcherFor(ReadCommand command)
209207
DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS));
210208
}
211209

212-
/**
213-
* Called on coordinator after merging replica responses before returning to client
214-
*/
215-
@Override
216-
public Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
217-
{
218-
if (!isTopK())
219-
return partitions -> partitions;
220-
221-
// in case of top-k query, filter out rows that are not actually global top-K
222-
return partitions -> (PartitionIterator) new TopKProcessor(command).filter(partitions);
223-
}
224-
225210
/**
226211
* @return a filter with all the expressions that are user-defined
227212
*/

src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@
4848
import org.apache.cassandra.db.Keyspace;
4949
import org.apache.cassandra.db.ReadCommand;
5050
import org.apache.cassandra.db.filter.RowFilter;
51-
import org.apache.cassandra.db.partitions.BasePartitionIterator;
5251
import org.apache.cassandra.db.partitions.ParallelCommandProcessor;
53-
import org.apache.cassandra.db.partitions.PartitionIterator;
5452
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5553
import org.apache.cassandra.db.rows.BaseRowIterator;
5654
import org.apache.cassandra.db.rows.Row;
@@ -60,7 +58,6 @@
6058
import org.apache.cassandra.index.sai.IndexContext;
6159
import org.apache.cassandra.index.sai.StorageAttachedIndex;
6260
import org.apache.cassandra.index.sai.utils.AbortedOperationException;
63-
import org.apache.cassandra.index.sai.utils.InMemoryPartitionIterator;
6461
import org.apache.cassandra.index.sai.utils.InMemoryUnfilteredPartitionIterator;
6562
import org.apache.cassandra.index.sai.utils.PartitionInfo;
6663
import org.apache.cassandra.index.sai.utils.TypeUtil;
@@ -74,18 +71,7 @@
7471
/**
7572
* Processor applied to SAI based ORDER BY queries. This class could likely be refactored into either two filter
7673
* methods depending on where the processing is happening or into two classes.
77-
*
78-
* This processor performs the following steps on a replica:
79-
* - collect LIMIT rows from partition iterator, making sure that all are valid.
80-
* - return rows in Primary Key order
81-
*
82-
* This processor performs the following steps on a coordinator:
83-
* - consume all rows from the provided partition iterator and sort them according to the specified order.
84-
* For vectors, that is similarit score and for all others, that is the ordering defined by their
85-
* {@link org.apache.cassandra.db.marshal.AbstractType}. If there are multiple vector indexes,
86-
* the final score is the sum of all vector index scores.
87-
* - remove rows with the lowest scores from PQ if PQ size exceeds limit
88-
* - return rows from PQ in primary key order to caller
74+
* Ordering on the coordinator is delegated to CQL.
8975
*/
9076
public class TopKProcessor
9177
{
@@ -123,8 +109,8 @@ public TopKProcessor(ReadCommand command)
123109
/**
124110
* Executor to use for parallel index reads.
125111
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.
126-
*
127-
* INDEX_READ uses 2 * cpus threads by default but can be overridden with -Dcassandra.index_read.parallel_thread_num=#value
112+
* </p>
113+
* INDEX_READ uses 2 * cpus threads by default but can be overridden with {@literal -Dcassandra.index_read.parallel_thread_num=<value>}
128114
*
129115
* @return stage to use, default INDEX_READ
130116
*/
@@ -147,7 +133,7 @@ private static LocalAwareExecutorPlus getExecutor()
147133
* Filter given partitions and keep the rows with highest scores. In case of {@link UnfilteredPartitionIterator},
148134
* all tombstones will be kept. Caller must close the supplied iterator.
149135
*/
150-
public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filter(P partitions)
136+
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator partitions)
151137
{
152138
// filterInternal consumes the partitions iterator and creates a new one. Use a try-with-resources block
153139
// to ensure the original iterator is closed. We do not expect exceptions from filterInternal, but if they
@@ -159,12 +145,14 @@ public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartit
159145
}
160146
}
161147

162-
private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filterInternal(P partitions)
148+
private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator partitions)
163149
{
164150
// priority queue ordered by score in descending order
165151
Comparator<Triple<PartitionInfo, Row, ?>> comparator;
166152
if (queryVector != null)
153+
{
167154
comparator = Comparator.comparing((Triple<PartitionInfo, Row, ?> t) -> (Float) t.getRight()).reversed();
155+
}
168156
else
169157
{
170158
comparator = Comparator.comparing(t -> (ByteBuffer) t.getRight(), indexContext.getValidator());
@@ -175,13 +163,15 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
175163
// to store top-k results in primary key order
176164
TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition = new TreeMap<>(Comparator.comparing(p -> p.key));
177165

178-
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor) {
166+
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor)
167+
{
179168
ParallelCommandProcessor pIter = (ParallelCommandProcessor) partitions;
180169
var commands = pIter.getUninitializedCommands();
181170
List<CompletableFuture<PartitionResults>> results = new ArrayList<>(commands.size());
182171

183172
int count = commands.size();
184-
for (var command: commands) {
173+
for (var command : commands)
174+
{
185175
CompletableFuture<PartitionResults> future = new CompletableFuture<>();
186176
results.add(future);
187177

@@ -201,7 +191,8 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
201191
});
202192
}
203193

204-
for (CompletableFuture<PartitionResults> triplesFuture: results) {
194+
for (CompletableFuture<PartitionResults> triplesFuture : results)
195+
{
205196
PartitionResults pr;
206197
try
207198
{
@@ -216,10 +207,12 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
216207
if (pr == null)
217208
continue;
218209
topK.addAll(pr.rows);
219-
for (var uf: pr.tombstones)
210+
for (var uf : pr.tombstones)
220211
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
221212
}
222-
} else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever) {
213+
}
214+
else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever)
215+
{
223216
// FilteredPartitions does not implement ParallelizablePartitionIterator.
224217
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
225218
int rowsMatched = 0;
@@ -232,7 +225,9 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
232225
rowsMatched += processSingleRowPartition(unfilteredByPartition, partitionRowIterator);
233226
}
234227
}
235-
} else {
228+
}
229+
else
230+
{
236231
// FilteredPartitions does not implement ParallelizablePartitionIterator.
237232
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
238233
while (partitions.hasNext())
@@ -244,7 +239,7 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
244239
{
245240
PartitionResults pr = processPartition(partitionRowIterator);
246241
topK.addAll(pr.rows);
247-
for (var uf: pr.tombstones)
242+
for (var uf : pr.tombstones)
248243
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
249244
}
250245
else
@@ -255,7 +250,6 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
255250
topK.add(Triple.of(PartitionInfo.create(partitionRowIterator), row, row.getCell(expression.column()).buffer()));
256251
}
257252
}
258-
259253
}
260254
}
261255
}
@@ -264,17 +258,17 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
264258
for (var triple : topK.getUnsortedShared())
265259
addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle());
266260

267-
if (partitions instanceof PartitionIterator)
268-
return new InMemoryPartitionIterator(command, unfilteredByPartition);
269261
return new InMemoryUnfilteredPartitionIterator(command, unfilteredByPartition);
270262
}
271263

272-
private class PartitionResults {
264+
private class PartitionResults
265+
{
273266
final PartitionInfo partitionInfo;
274267
final SortedSet<Unfiltered> tombstones = new TreeSet<>(command.metadata().comparator);
275268
final List<Triple<PartitionInfo, Row, Float>> rows = new ArrayList<>();
276269

277-
PartitionResults(PartitionInfo partitionInfo) {
270+
PartitionResults(PartitionInfo partitionInfo)
271+
{
278272
this.partitionInfo = partitionInfo;
279273
}
280274

@@ -283,15 +277,17 @@ void addTombstone(Unfiltered uf)
283277
tombstones.add(uf);
284278
}
285279

286-
void addRow(Triple<PartitionInfo, Row, Float> triple) {
280+
void addRow(Triple<PartitionInfo, Row, Float> triple)
281+
{
287282
rows.add(triple);
288283
}
289284
}
290285

291286
/**
292287
* Processes a single partition, calculating scores for rows and extracting tombstones.
293288
*/
294-
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator) {
289+
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator)
290+
{
295291
// Compute key and static row score once per partition
296292
DecoratedKey key = partitionRowIterator.partitionKey();
297293
Row staticRow = partitionRowIterator.staticRow();
@@ -322,7 +318,8 @@ private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterato
322318
* Processes a single partition, without scoring it.
323319
*/
324320
private int processSingleRowPartition(TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition,
325-
BaseRowIterator<?> partitionRowIterator) {
321+
BaseRowIterator<?> partitionRowIterator)
322+
{
326323
if (!partitionRowIterator.hasNext())
327324
return 0;
328325

0 commit comments

Comments
 (0)