Skip to content

Commit 404211a

Browse files
adelapenadjatnieks
authored andcommitted
CNDB-12120: 3 test failures in MultiNodeBillingTest (#1462)
This reverts commit c06c94c. It seems the removal of `Index#postProcessor` by CNDB-11762 broke some tests in CNDB's `MultiNodeBillingTest`. Unfortunately that patch [was merged before creating the PR bumping the CC version used by CNDB](#1422 (comment)). [The CNDB PR](riptano/cndb#12076) was created after that merging but it was superseded by other CC version bumps. So I'm adding this reversal so we can investigate how the removal of `Index#postProcessor` affects those tests.
1 parent b47c8b4 commit 404211a

File tree

12 files changed

+242
-205
lines changed

12 files changed

+242
-205
lines changed

src/java/org/apache/cassandra/cql3/statements/SelectStatement.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,9 @@ public ReadQuery getQuery(QueryOptions options,
518518
String.format(TOPK_CONSISTENCY_LEVEL_ERROR, options.getConsistency()));
519519

520520
// Consistency levels with more than one replica are downgraded to ONE/LOCAL_ONE.
521-
if (options.getConsistency().needsReconciliation())
521+
if (options.getConsistency() != ConsistencyLevel.ONE &&
522+
options.getConsistency() != ConsistencyLevel.LOCAL_ONE &&
523+
options.getConsistency() != ConsistencyLevel.NODE_LOCAL)
522524
{
523525
ConsistencyLevel supplied = options.getConsistency();
524526
ConsistencyLevel downgrade = supplied.isDatacenterLocal() ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE;

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,10 +573,21 @@ public ReadExecutionController executionController(boolean trackRepairedStatus)
573573
return ReadExecutionController.forCommand(this, trackRepairedStatus);
574574
}
575575

576+
/**
577+
* Allow to post-process the result of the query after it has been reconciled on the coordinator
578+
* but before it is passed to the CQL layer to return the ResultSet.
579+
*
580+
* See CASSANDRA-8717 for why this exists.
581+
*/
582+
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
583+
{
584+
return indexQueryPlan == null ? result : indexQueryPlan.postProcessor(this).apply(result);
585+
}
586+
576587
@Override
577588
public PartitionIterator executeInternal(ReadExecutionController controller)
578589
{
579-
return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
590+
return postReconciliationProcessing(UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()));
580591
}
581592

582593
public ReadExecutionController executionController()

src/java/org/apache/cassandra/index/Index.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Optional;
3131
import java.util.Set;
3232
import java.util.concurrent.Callable;
33+
import java.util.function.Function;
3334
import java.util.function.Predicate;
3435
import java.util.function.Supplier;
3536
import javax.annotation.Nonnull;
@@ -50,6 +51,7 @@
5051
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
5152
import org.apache.cassandra.db.marshal.AbstractType;
5253
import org.apache.cassandra.db.memtable.Memtable;
54+
import org.apache.cassandra.db.partitions.PartitionIterator;
5355
import org.apache.cassandra.db.partitions.PartitionUpdate;
5456
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5557
import org.apache.cassandra.db.rows.Row;
@@ -132,6 +134,13 @@
132134
* cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
133135
* of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
134136
* chosen. A Searcher instance is then obtained from the searcherFor method and used to perform the actual Index lookup.
137+
* Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
138+
* the primary table) have been received from replicas and reconciled. This post processing is defined as a
139+
* {@code java.util.functions.BiFunction<PartitionIterator, RowFilter, PartitionIterator>}, that is a function which takes as
140+
* arguments a PartitionIterator (containing the reconciled result rows) and a RowFilter (from the ReadCommand being
141+
* executed) and returns another iterator of partitions, possibly having transformed the initial results in some way.
142+
* The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
143+
* with Cassandra return a no-op function here.
135144
*
136145
* An optional static method may be provided to validate custom index options (two variants are supported):
137146
*
@@ -1089,6 +1098,27 @@ default void validate(ReadCommand command) throws InvalidRequestException
10891098
*/
10901099
Searcher searcherFor(ReadCommand command);
10911100

1101+
/**
1102+
* Return a function which performs post processing on the results of a partition range read command.
1103+
* In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
1104+
* to returning them to the caller.
1105+
*
1106+
* This is used on the coordinator during execution of a range command to perform post
1107+
* processing of merged results obtained from the necessary replicas. This is the only way in which results are
1108+
* transformed in this way but this may change over time as usage is generalized.
1109+
* See CASSANDRA-8717 for further discussion.
1110+
*
1111+
* The function takes a PartitionIterator of the results from the replicas which has already been collated
1112+
* and reconciled, along with the command being executed. It returns another PartitionIterator containing the results
1113+
* of the transformation (which may be the same as the input if the transformation is a no-op).
1114+
*
1115+
* @param command the read command being executed
1116+
*/
1117+
default Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
1118+
{
1119+
return partitions -> partitions;
1120+
}
1121+
10921122
/**
10931123
* Transform an initial {@link RowFilter} into the filter that will still need to applied to a set of Rows after
10941124
* the index has performed it's initial scan.

src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexQueryPlan.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashSet;
2121
import java.util.Set;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Function;
2324
import javax.annotation.Nullable;
2425

2526
import com.google.common.collect.ImmutableSet;
@@ -29,6 +30,7 @@
2930
import org.apache.cassandra.db.DecoratedKey;
3031
import org.apache.cassandra.db.ReadCommand;
3132
import org.apache.cassandra.db.filter.RowFilter;
33+
import org.apache.cassandra.db.partitions.PartitionIterator;
3234
import org.apache.cassandra.db.rows.Row;
3335
import org.apache.cassandra.db.rows.Unfiltered;
3436
import org.apache.cassandra.index.Index;
@@ -207,6 +209,19 @@ public Index.Searcher searcherFor(ReadCommand command)
207209
DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS));
208210
}
209211

212+
/**
213+
* Called on coordinator after merging replica responses before returning to client
214+
*/
215+
@Override
216+
public Function<PartitionIterator, PartitionIterator> postProcessor(ReadCommand command)
217+
{
218+
if (!isTopK())
219+
return partitions -> partitions;
220+
221+
// in case of top-k query, filter out rows that are not actually global top-K
222+
return partitions -> (PartitionIterator) new TopKProcessor(command).filter(partitions);
223+
}
224+
210225
/**
211226
* @return a filter with all the expressions that are user-defined
212227
*/

src/java/org/apache/cassandra/index/sai/plan/TopKProcessor.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
import org.apache.cassandra.db.Keyspace;
4949
import org.apache.cassandra.db.ReadCommand;
5050
import org.apache.cassandra.db.filter.RowFilter;
51+
import org.apache.cassandra.db.partitions.BasePartitionIterator;
5152
import org.apache.cassandra.db.partitions.ParallelCommandProcessor;
53+
import org.apache.cassandra.db.partitions.PartitionIterator;
5254
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5355
import org.apache.cassandra.db.rows.BaseRowIterator;
5456
import org.apache.cassandra.db.rows.Row;
@@ -58,6 +60,7 @@
5860
import org.apache.cassandra.index.sai.IndexContext;
5961
import org.apache.cassandra.index.sai.StorageAttachedIndex;
6062
import org.apache.cassandra.index.sai.utils.AbortedOperationException;
63+
import org.apache.cassandra.index.sai.utils.InMemoryPartitionIterator;
6164
import org.apache.cassandra.index.sai.utils.InMemoryUnfilteredPartitionIterator;
6265
import org.apache.cassandra.index.sai.utils.PartitionInfo;
6366
import org.apache.cassandra.index.sai.utils.TypeUtil;
@@ -71,7 +74,18 @@
7174
/**
7275
* Processor applied to SAI based ORDER BY queries. This class could likely be refactored into either two filter
7376
* methods depending on where the processing is happening or into two classes.
74-
* Ordering on the coordinator is delegated to CQL.
77+
*
78+
* This processor performs the following steps on a replica:
79+
* - collect LIMIT rows from partition iterator, making sure that all are valid.
80+
* - return rows in Primary Key order
81+
*
82+
* This processor performs the following steps on a coordinator:
83+
* - consume all rows from the provided partition iterator and sort them according to the specified order.
84+
* For vectors, that is similarit score and for all others, that is the ordering defined by their
85+
* {@link org.apache.cassandra.db.marshal.AbstractType}. If there are multiple vector indexes,
86+
* the final score is the sum of all vector index scores.
87+
* - remove rows with the lowest scores from PQ if PQ size exceeds limit
88+
* - return rows from PQ in primary key order to caller
7589
*/
7690
public class TopKProcessor
7791
{
@@ -109,7 +123,7 @@ public TopKProcessor(ReadCommand command)
109123
/**
110124
* Executor to use for parallel index reads.
111125
* Defined by -Dcassandra.index_read.parallele=true/false, true by default.
112-
* </p>
126+
*
113127
* INDEX_READ uses 2 * cpus threads by default but can be overridden with {@literal -Dcassandra.index_read.parallel_thread_num=<value>}
114128
*
115129
* @return stage to use, default INDEX_READ
@@ -133,7 +147,7 @@ private static LocalAwareExecutorPlus getExecutor()
133147
* Filter given partitions and keep the rows with highest scores. In case of {@link UnfilteredPartitionIterator},
134148
* all tombstones will be kept. Caller must close the supplied iterator.
135149
*/
136-
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator partitions)
150+
public <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filter(P partitions)
137151
{
138152
// filterInternal consumes the partitions iterator and creates a new one. Use a try-with-resources block
139153
// to ensure the original iterator is closed. We do not expect exceptions from filterInternal, but if they
@@ -145,14 +159,12 @@ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator partitions
145159
}
146160
}
147161

148-
private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator partitions)
162+
private <U extends Unfiltered, R extends BaseRowIterator<U>, P extends BasePartitionIterator<R>> BasePartitionIterator<?> filterInternal(P partitions)
149163
{
150164
// priority queue ordered by score in descending order
151165
Comparator<Triple<PartitionInfo, Row, ?>> comparator;
152166
if (queryVector != null)
153-
{
154167
comparator = Comparator.comparing((Triple<PartitionInfo, Row, ?> t) -> (Float) t.getRight()).reversed();
155-
}
156168
else
157169
{
158170
comparator = Comparator.comparing(t -> (ByteBuffer) t.getRight(), indexContext.getValidator());
@@ -163,15 +175,13 @@ private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator p
163175
// to store top-k results in primary key order
164176
TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition = new TreeMap<>(Comparator.comparing(p -> p.key));
165177

166-
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor)
167-
{
178+
if (PARALLEL_EXECUTOR != ImmediateExecutor.INSTANCE && partitions instanceof ParallelCommandProcessor) {
168179
ParallelCommandProcessor pIter = (ParallelCommandProcessor) partitions;
169180
var commands = pIter.getUninitializedCommands();
170181
List<CompletableFuture<PartitionResults>> results = new ArrayList<>(commands.size());
171182

172183
int count = commands.size();
173-
for (var command : commands)
174-
{
184+
for (var command: commands) {
175185
CompletableFuture<PartitionResults> future = new CompletableFuture<>();
176186
results.add(future);
177187

@@ -191,8 +201,7 @@ private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator p
191201
});
192202
}
193203

194-
for (CompletableFuture<PartitionResults> triplesFuture : results)
195-
{
204+
for (CompletableFuture<PartitionResults> triplesFuture: results) {
196205
PartitionResults pr;
197206
try
198207
{
@@ -207,12 +216,10 @@ private UnfilteredPartitionIterator filterInternal(UnfilteredPartitionIterator p
207216
if (pr == null)
208217
continue;
209218
topK.addAll(pr.rows);
210-
for (var uf : pr.tombstones)
219+
for (var uf: pr.tombstones)
211220
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
212221
}
213-
}
214-
else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever)
215-
{
222+
} else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRetriever) {
216223
// FilteredPartitions does not implement ParallelizablePartitionIterator.
217224
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
218225
int rowsMatched = 0;
@@ -225,9 +232,7 @@ else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRe
225232
rowsMatched += processSingleRowPartition(unfilteredByPartition, partitionRowIterator);
226233
}
227234
}
228-
}
229-
else
230-
{
235+
} else {
231236
// FilteredPartitions does not implement ParallelizablePartitionIterator.
232237
// Realistically, this won't benefit from parallelizm as these are coming from in-memory/memtable data.
233238
while (partitions.hasNext())
@@ -239,7 +244,7 @@ else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRe
239244
{
240245
PartitionResults pr = processPartition(partitionRowIterator);
241246
topK.addAll(pr.rows);
242-
for (var uf : pr.tombstones)
247+
for (var uf: pr.tombstones)
243248
addUnfiltered(unfilteredByPartition, pr.partitionInfo, uf);
244249
}
245250
else
@@ -250,6 +255,7 @@ else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRe
250255
topK.add(Triple.of(PartitionInfo.create(partitionRowIterator), row, row.getCell(expression.column()).buffer()));
251256
}
252257
}
258+
253259
}
254260
}
255261
}
@@ -258,17 +264,17 @@ else if (partitions instanceof StorageAttachedIndexSearcher.ScoreOrderedResultRe
258264
for (var triple : topK.getUnsortedShared())
259265
addUnfiltered(unfilteredByPartition, triple.getLeft(), triple.getMiddle());
260266

267+
if (partitions instanceof PartitionIterator)
268+
return new InMemoryPartitionIterator(command, unfilteredByPartition);
261269
return new InMemoryUnfilteredPartitionIterator(command, unfilteredByPartition);
262270
}
263271

264-
private class PartitionResults
265-
{
272+
private class PartitionResults {
266273
final PartitionInfo partitionInfo;
267274
final SortedSet<Unfiltered> tombstones = new TreeSet<>(command.metadata().comparator);
268275
final List<Triple<PartitionInfo, Row, Float>> rows = new ArrayList<>();
269276

270-
PartitionResults(PartitionInfo partitionInfo)
271-
{
277+
PartitionResults(PartitionInfo partitionInfo) {
272278
this.partitionInfo = partitionInfo;
273279
}
274280

@@ -277,17 +283,15 @@ void addTombstone(Unfiltered uf)
277283
tombstones.add(uf);
278284
}
279285

280-
void addRow(Triple<PartitionInfo, Row, Float> triple)
281-
{
286+
void addRow(Triple<PartitionInfo, Row, Float> triple) {
282287
rows.add(triple);
283288
}
284289
}
285290

286291
/**
287292
* Processes a single partition, calculating scores for rows and extracting tombstones.
288293
*/
289-
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator)
290-
{
294+
private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterator) {
291295
// Compute key and static row score once per partition
292296
DecoratedKey key = partitionRowIterator.partitionKey();
293297
Row staticRow = partitionRowIterator.staticRow();
@@ -318,8 +322,7 @@ private PartitionResults processPartition(BaseRowIterator<?> partitionRowIterato
318322
* Processes a single partition, without scoring it.
319323
*/
320324
private int processSingleRowPartition(TreeMap<PartitionInfo, TreeSet<Unfiltered>> unfilteredByPartition,
321-
BaseRowIterator<?> partitionRowIterator)
322-
{
325+
BaseRowIterator<?> partitionRowIterator) {
323326
if (!partitionRowIterator.hasNext())
324327
return 0;
325328

0 commit comments

Comments
 (0)