Skip to content

Commit 5d41404

Browse files
adelapenadjatnieks
authored and committed
CNDB-11762: Remove StorageAttachedIndexQueryPlan#postProcessor (#1422)
`StorageAttachedIndexQueryPlan#postProcessor` seems redundant because coordinator-side sorting+trimming of ANN query results is already done in `SelectStatement`. If that's the case, we should remove it. - [ ] Make sure there is a PR in the CNDB project updating the Converged Cassandra version - [ ] Use `NoSpamLogger` for log lines that may appear frequently in the logs - [ ] Verify test results on Butler - [ ] Test coverage for new/modified code is > 80% - [ ] Proper code formatting - [ ] Proper title for each commit starting with the project-issue number, like CNDB-1234 - [ ] Each commit has a meaningful description - [ ] Each commit is not very long and contains related changes - [ ] Renames, moves and reformatting are in distinct commits
1 parent 33bb47a commit 5d41404

File tree

11 files changed

+205
-225
lines changed

11 files changed

+205
-225
lines changed

src/java/org/apache/cassandra/cql3/statements/SelectStatement.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,7 @@ public ReadQuery getQuery(QueryOptions options,
519519
String.format(TOPK_CONSISTENCY_LEVEL_ERROR, options.getConsistency()));
520520

521521
// Consistency levels with more than one replica are downgraded to ONE/LOCAL_ONE.
522-
if (options.getConsistency() != ConsistencyLevel.ONE &&
523-
options.getConsistency() != ConsistencyLevel.LOCAL_ONE &&
524-
options.getConsistency() != ConsistencyLevel.NODE_LOCAL)
522+
if (options.getConsistency().needsReconciliation())
525523
{
526524
ConsistencyLevel supplied = options.getConsistency();
527525
ConsistencyLevel downgrade = supplied.isDatacenterLocal() ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE;

src/java/org/apache/cassandra/db/ConsistencyLevel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void validateCounterForWrite(TableMetadata metadata, ClientState clientSt
286286
}
287287

288288
/**
289-
* With a replication factor greater than one, reads that contact more than one replica will require
289+
* With a replication factor greater than one, reads that contact more than one replica will require
290290
* reconciliation of the individual replica results at the coordinator.
291291
*
292292
* @return true if reads at this consistency level require merging at the coordinator

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -596,21 +596,10 @@ public ReadExecutionController executionController(boolean trackRepairedStatus)
596596
return ReadExecutionController.forCommand(this, trackRepairedStatus);
597597
}
598598

599-
/**
600-
* Allow to post-process the result of the query after it has been reconciled on the coordinator
601-
* but before it is passed to the CQL layer to return the ResultSet.
602-
*
603-
* See CASSANDRA-8717 for why this exists.
604-
*/
605-
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
606-
{
607-
return indexQueryPlan == null ? result : indexQueryPlan.postProcessor(this).apply(result);
608-
}
609-
610599
@Override
611600
public PartitionIterator executeInternal(ReadExecutionController controller)
612601
{
613-
return postReconciliationProcessing(UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()));
602+
return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
614603
}
615604

616605
public ReadExecutionController executionController()

src/java/org/apache/cassandra/index/Index.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Optional;
3131
import java.util.Set;
3232
import java.util.concurrent.Callable;
33-
import java.util.function.Function;
3433
import java.util.function.Predicate;
3534
import java.util.function.Supplier;
3635
import javax.annotation.Nonnull;
@@ -51,7 +50,6 @@
5150
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
5251
import org.apache.cassandra.db.marshal.AbstractType;
5352
import org.apache.cassandra.db.memtable.Memtable;
54-
import org.apache.cassandra.db.partitions.PartitionIterator;
5553
import org.apache.cassandra.db.partitions.PartitionUpdate;
5654
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5755
import org.apache.cassandra.db.rows.Row;
@@ -135,13 +133,6 @@
135133
* cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
136134
* of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
137135
* chosen. A Searcher instance is then obtained from the searcherFor method and used to perform the actual Index lookup.
138-
* Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
139-
* the primary table) have been received from replicas and reconciled. This post processing is defined as a
140-
* {@code java.util.functions.BiFunction<PartitionIterator, RowFilter, PartitionIterator>}, that is a function which takes as
141-
* arguments a PartitionIterator (containing the reconciled result rows) and a RowFilter (from the ReadCommand being
142-
* executed) and returns another iterator of partitions, possibly having transformed the initial results in some way.
143-
* The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
144-
* with Cassandra return a no-op function here.
145136
*
146137
* An optional static method may be provided to validate custom index options (two variants are supported):
147138
*
@@ -1101,27 +1092,6 @@ default void validate(ReadCommand command) throws InvalidRequestException
11011092
*/
11021093
Searcher searcherFor(ReadCommand command);
11031094

1104-
/**
1105-
* Return a function which performs post processing on the results of a partition range read command.
1106-
* In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
1107-
* to returning them to the caller.
1108-
*
1109-
* This is used on the coordinator during execution of a range command to perform post
1110-
* processing of merged results obtained from the necessary replicas. This is the only way in which results are
1111-
* transformed in this way but this may change over time as usage is generalized.
1112-
* See CASSANDRA-8717 for further discussion.
1113-
*
1114-
* The function takes a PartitionIterator of the results from the replicas which has already been collated
1115-
* and reconciled, along with the command being executed. It returns another PartitionIterator containing the results
1116-
* of the transformation (which may be the same as the input if the transformation is a no-op).
1117-
*
1118-
* @param command the read command being executed
1119-
*/
1120-
default Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
1121-
{
1122-
return partitions -> partitions;
1123-
}
1124-
11251095
/**
11261096
* Transform an initial {@link RowFilter} into the filter that will still need to be applied to a set of Rows after
11271097
* the index has performed its initial scan.

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.HashSet;
2121
import java.util.Set;
2222
import java.util.concurrent.TimeUnit;
23-
import java.util.function.Function;
2423
import javax.annotation.Nullable;
2524

2625
import com.google.common.collect.ImmutableSet;
@@ -30,7 +29,6 @@
3029
import org.apache.cassandra.db.DecoratedKey;
3130
import org.apache.cassandra.db.ReadCommand;
3231
import org.apache.cassandra.db.filter.RowFilter;
33-
import org.apache.cassandra.db.partitions.PartitionIterator;
3432
import org.apache.cassandra.db.rows.Row;
3533
import org.apache.cassandra.db.rows.Unfiltered;
3634
import org.apache.cassandra.index.Index;
@@ -206,19 +204,6 @@ public Index.Searcher searcherFor(ReadCommand command)
206204
DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS));
207205
}
208206

209-
/**
210-
* Called on coordinator after merging replica responses before returning to client
211-
*/
212-
@Override
213-
public Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
214-
{
215-
if (!isTopK())
216-
return partitions -> partitions;
217-
218-
// in case of top-k query, filter out rows that are not actually global top-K
219-
return partitions -> (PartitionIterator) new TopKProcessor(command).filter(partitions);
220-
}
221-
222207
/**
223208
* @return a filter with all the expressions that are user-defined
224209
*/

src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@
4949
import org.apache.cassandra.db.ReadCommand;
5050
import org.apache.cassandra.db.SinglePartitionReadCommand;
5151
import org.apache.cassandra.db.filter.RowFilter;
52-
import org.apache.cassandra.db.partitions.BasePartitionIterator;
5352
import org.apache.cassandra.db.partitions.ParallelCommandProcessor;
54-
import org.apache.cassandra.db.partitions.PartitionIterator;
5553
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5654
import org.apache.cassandra.db.rows.BaseRowIterator;
5755
import org.apache.cassandra.db.rows.Row;
@@ -62,7 +60,6 @@
6260
import org.apache.cassandra.index.sai.IndexContext;
6361
import org.apache.cassandra.index.sai.StorageAttachedIndex;
6462
import org.apache.cassandra.index.sai.utils.AbortedOperationException;
65-
import org.apache.cassandra.index.sai.utils.InMemoryPartitionIterator;
6663
import org.apache.cassandra.index.sai.utils.InMemoryUnfilteredPartitionIterator;
6764
import org.apache.cassandra.index.sai.utils.PartitionInfo;
6865
import org.apache.cassandra.index.sai.utils.PrimaryKey;
@@ -77,18 +74,7 @@
7774
/**
7875
* Processor applied to SAI based ORDER BY queries. This class could likely be refactored into either two filter
7976
* methods depending on where the processing is happening or into two classes.
80-
*
81-
* This processor performs the following steps on a replica:
82-
* - collect LIMIT rows from partition iterator, making sure that all are valid.
83-
* - return rows in Primary Key order
84-
*
85-
* This processor performs the following steps on a coordinator:
86-
* - consume all rows from the provided partition iterator and sort them according to the specified order.
87-
* For vectors, that is similarit score and for all others, that is the ordering defined by their
88-
* {@link org.apache.cassandra.db.marshal.AbstractType}. If there are multiple vector indexes,
89-
* the final score is the sum of all vector index scores.
90-
* - remove rows with the lowest scores from PQ if PQ size exceeds limit
91-
* - return rows from PQ in primary key order to caller
77+
* Ordering on the coordinator is delegated to CQL.
9278
*/
9379
public class TopKProcessor
9480
{
@@ -126,8 +112,8 @@ public TopKProcessor(ReadCommand command)
126112
/**
127113
* Executor to use for parallel index reads.
128114
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.
129-
*
130-
* INDEX_READ uses 2 * cpus threads by default but can be overridden with -Dcassandra.index_read.parallel_thread_num=#value
115+
* </p>
116+
* INDEX_READ uses 2 * cpus threads by default but can be overridden with {@literal -Dcassandra.index_read.parallel_thread_num=<value>}
131117
*
132118
* @return stage to use, default INDEX_READ
133119
*/
@@ -150,7 +136,7 @@ private static LocalAwareExecutorPlus getExecutor()
150136
* Filter given partitions and keep the rows with highest scores. In case of {@link UnfilteredPartitionIterator},
151137
* all tombstones will be kept. Caller must close the supplied iterator.
152138
*/
153-
public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filter(P partitions)
139+
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator partitions)
154140
{
155141
// filterInternal consumes the partitions iterator and creates a new one. Use a try-with-resources block
156142
// to ensure the original iterator is closed. We do not expect exceptions from filterInternal, but if they
@@ -162,12 +148,14 @@ public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartit
162148
}
163149
}
164150

165-
private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filterInternal(P partitions)
151+
private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator partitions)
166152
{
167153
// priority queue ordered by score in descending order
168154
Comparator<Triple<PartitionInfo, Row, ?>> comparator;
169155
if (queryVector != null)
156+
{
170157
comparator = Comparator.comparing((Triple<PartitionInfo, Row, ?> t) -> (Float) t.getRight()).reversed();
158+
}
171159
else
172160
{
173161
comparator = Comparator.comparing(t -> (ByteBuffer) t.getRight(), indexContext.getValidator());
@@ -178,13 +166,15 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
178166
// to store top-k results in primary key order
179167
TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition = new TreeMap<>(Comparator.comparing(p -> p.key));
180168

181-
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor) {
169+
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor)
170+
{
182171
ParallelCommandProcessor pIter = (ParallelCommandProcessor) partitions;
183172
List<Pair<PrimaryKey, SinglePartitionReadCommand>> commands = pIter.getUninitializedCommands();
184173
List<CompletableFuture<PartitionResults>> results = new ArrayList<>(commands.size());
185174

186175
int count = commands.size();
187-
for (Pair<PrimaryKey, SinglePartitionReadCommand> command: commands) {
176+
for (var command : commands)
177+
{
188178
CompletableFuture<PartitionResults> future = new CompletableFuture<>();
189179
results.add(future);
190180

@@ -204,7 +194,8 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
204194
});
205195
}
206196

207-
for (CompletableFuture<PartitionResults> triplesFuture: results) {
197+
for (CompletableFuture<PartitionResults> triplesFuture : results)
198+
{
208199
PartitionResults pr;
209200
try
210201
{
@@ -219,10 +210,12 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
219210
if (pr == null)
220211
continue;
221212
topK.addAll(pr.rows);
222-
for (Unfiltered uf: pr.tombstones)
213+
for (var uf : pr.tombstones)
223214
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
224215
}
225-
} else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever) {
216+
}
217+
else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever)
218+
{
226219
// FilteredPartitions does not implement ParallelizablePartitionIterator.
227220
// Realistically, this won't benefit from parallelism as these are coming from in-memory/memtable data.
228221
int rowsMatched = 0;
@@ -235,19 +228,21 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
235228
rowsMatched += processSingleRowPartition(unfilteredByPartition, partitionRowIterator);
236229
}
237230
}
238-
} else {
231+
}
232+
else
233+
{
239234
// FilteredPartitions does not implement ParallelizablePartitionIterator.
240235
// Realistically, this won't benefit from parallelism as these are coming from in-memory/memtable data.
241236
while (partitions.hasNext())
242237
{
243238
// have to close to move to the next partition, otherwise hasNext() fails
244-
try (R partitionRowIterator = partitions.next())
239+
try (var partitionRowIterator = partitions.next())
245240
{
246241
if (queryVector != null)
247242
{
248243
PartitionResults pr = processPartition(partitionRowIterator);
249244
topK.addAll(pr.rows);
250-
for (var uf: pr.tombstones)
245+
for (var uf : pr.tombstones)
251246
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
252247
}
253248
else
@@ -258,7 +253,6 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
258253
topK.add(Triple.of(PartitionInfo.create(partitionRowIterator), row, row.getCell(expression.column()).buffer()));
259254
}
260255
}
261-
262256
}
263257
}
264258
}
@@ -267,17 +261,17 @@ private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BaseParti
267261
for (var triple : topK.getUnsortedShared())
268262
addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle());
269263

270-
if (partitions instanceof PartitionIterator)
271-
return new InMemoryPartitionIterator(command, unfilteredByPartition);
272264
return new InMemoryUnfilteredPartitionIterator(command, unfilteredByPartition);
273265
}
274266

275-
private class PartitionResults {
267+
private class PartitionResults
268+
{
276269
final PartitionInfo partitionInfo;
277270
final SortedSet<Unfiltered> tombstones = new TreeSet<>(command.metadata().comparator);
278271
final List<Triple<PartitionInfo, Row, Float>> rows = new ArrayList<>();
279272

280-
PartitionResults(PartitionInfo partitionInfo) {
273+
PartitionResults(PartitionInfo partitionInfo)
274+
{
281275
this.partitionInfo = partitionInfo;
282276
}
283277

@@ -286,15 +280,17 @@ void addTombstone(Unfiltered uf)
286280
tombstones.add(uf);
287281
}
288282

289-
void addRow(Triple<PartitionInfo, Row, Float> triple) {
283+
void addRow(Triple<PartitionInfo, Row, Float> triple)
284+
{
290285
rows.add(triple);
291286
}
292287
}
293288

294289
/**
295290
* Processes a single partition, calculating scores for rows and extracting tombstones.
296291
*/
297-
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator) {
292+
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator)
293+
{
298294
// Compute key and static row score once per partition
299295
DecoratedKey key = partitionRowIterator.partitionKey();
300296
Row staticRow = partitionRowIterator.staticRow();
@@ -325,7 +321,8 @@ private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterato
325321
* Processes a single partition, without scoring it.
326322
*/
327323
private int processSingleRowPartition(TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition,
328-
BaseRowIterator<?> partitionRowIterator) {
324+
BaseRowIterator<?> partitionRowIterator)
325+
{
329326
if (!partitionRowIterator.hasNext())
330327
return 0;
331328

0 commit comments

Comments
 (0)