Skip to content

Commit 29ef34e

Browse files
k-rusdjatnieks
authored andcommitted
CNDB-13739 count numRows correctly in SSTable SAI (#1695)
The number of rows stored in SSTable's SAI index was incorrect for analyzed indexes and for indexes on collections as it was counting posting lists for each term. In non-analyzed or non-collection indexes there is a term per row, while in analyzed index there can be many terms in a row. This led to incorrect count of rows. Fixes counting number of rows stored in SSTable SAI index. Minor fixes of code warnings in the affected files.
1 parent 68a92da commit 29ef34e

File tree

8 files changed

+117
-28
lines changed

8 files changed

+117
-28
lines changed

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void complete(Stopwatch stopwatch) throws IOException
146146

147147
private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType<?> termComparator, MemtableTermsIterator terms, int maxSegmentRowId) throws IOException
148148
{
149+
long numPostings;
149150
long numRows;
150151
SegmentMetadataBuilder metadataBuilder = new SegmentMetadataBuilder(0, perIndexComponents);
151152
SegmentMetadata.ComponentMetadataMap indexMetas;
@@ -166,7 +167,8 @@ private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType<?> ter
166167
);
167168

168169
indexMetas = writer.writeAll(metadataBuilder.intercept(terms), docLengths);
169-
numRows = writer.getPostingsCount();
170+
numPostings = writer.getPostingsCount();
171+
numRows = docLengths.size();
170172
}
171173
}
172174
else
@@ -180,18 +182,20 @@ private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType<?> ter
180182
{
181183
ImmutableOneDimPointValues values = ImmutableOneDimPointValues.fromTermEnum(terms, termComparator);
182184
indexMetas = writer.writeAll(metadataBuilder.intercept(values));
183-
numRows = writer.getPointCount();
185+
numPostings = writer.getPointCount();
186+
numRows = numPostings;
184187
}
185188
}
186189

187190
// If no rows were written we need to delete any created column index components
188191
// so that the index is correctly identified as being empty (only having a completion marker)
189-
if (numRows == 0)
192+
if (numPostings == 0)
190193
{
191194
perIndexComponents.forceDeleteAllComponents();
192195
return 0;
193196
}
194197

198+
metadataBuilder.setNumRows(numRows);
195199
metadataBuilder.setKeyRange(pkFactory.createPartitionKeyOnly(minKey), pkFactory.createPartitionKeyOnly(maxKey));
196200
metadataBuilder.setRowIdRange(terms.getMinSSTableRowId(), terms.getMaxSSTableRowId());
197201
metadataBuilder.setTermRange(terms.getMinTerm(), terms.getMaxTerm());
@@ -203,7 +207,7 @@ private long flush(DecoratedKey minKey, DecoratedKey maxKey, AbstractType<?> ter
203207
SegmentMetadata.write(writer, Collections.singletonList(metadata));
204208
}
205209

206-
return numRows;
210+
return numPostings;
207211
}
208212

209213
private boolean writeFrequencies()

src/java/org/apache/cassandra/index/sai/disk/v1/PartitionAwarePrimaryKeyMap.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.cassandra.index.sai.disk.v1;
2020

2121
import java.io.IOException;
22-
import java.nio.ByteBuffer;
2322
import javax.annotation.concurrent.NotThreadSafe;
2423
import javax.annotation.concurrent.ThreadSafe;
2524

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
113113
if (indexContext.getDefinition().isStatic() != row.isStatic())
114114
return;
115115

116+
boolean addedRow = false;
116117
if (indexContext.isNonFrozenCollection())
117118
{
118119
Iterator<ByteBuffer> valueIterator = indexContext.getValuesOf(row, nowInSec);
@@ -121,16 +122,21 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
121122
while (valueIterator.hasNext())
122123
{
123124
ByteBuffer value = valueIterator.next();
124-
addTerm(TypeUtil.asIndexBytes(value.duplicate(), indexContext.getValidator()), key, sstableRowId, indexContext.getValidator());
125+
addedRow = addTerm(TypeUtil.asIndexBytes(value.duplicate(), indexContext.getValidator()), key, sstableRowId, indexContext.getValidator());
125126
}
126127
}
127128
}
128129
else
129130
{
130131
ByteBuffer value = indexContext.getValueOf(key.partitionKey(), row, nowInSec);
131132
if (value != null)
132-
addTerm(TypeUtil.asIndexBytes(value.duplicate(), indexContext.getValidator()), key, sstableRowId, indexContext.getValidator());
133+
{
134+
addedRow = addTerm(TypeUtil.asIndexBytes(value.duplicate(), indexContext.getValidator()), key, sstableRowId, indexContext.getValidator());
135+
}
133136
}
137+
if (addedRow)
138+
currentBuilder.incRowCount();
139+
134140
}
135141

136142
@Override
@@ -225,10 +231,10 @@ private boolean maybeAbort()
225231
return true;
226232
}
227233

228-
private void addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, AbstractType<?> type) throws IOException
234+
private boolean addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, AbstractType<?> type) throws IOException
229235
{
230236
if (!indexContext.validateMaxTermSize(key.partitionKey(), term))
231-
return;
237+
return false;
232238

233239
if (currentBuilder == null)
234240
{
@@ -241,10 +247,11 @@ else if (shouldFlush(sstableRowId))
241247
}
242248

243249
if (term.remaining() == 0 && TypeUtil.skipsEmptyValue(indexContext.getValidator()))
244-
return;
250+
return false;
245251

246252
long allocated = currentBuilder.analyzeAndAdd(term, type, key, sstableRowId);
247253
limiter.increment(allocated);
254+
return true;
248255
}
249256

250257
private boolean shouldFlush(long sstableRowId)

src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ public SegmentMetadata flush() throws IOException
464464
metadataBuilder.setKeyRange(minKey, maxKey);
465465
metadataBuilder.setRowIdRange(minSSTableRowId, maxSSTableRowId);
466466
metadataBuilder.setTermRange(minTerm, maxTerm);
467+
metadataBuilder.setNumRows(getRowCount());
467468

468469
flushInternal(metadataBuilder);
469470
return metadataBuilder.build();
@@ -502,8 +503,6 @@ private long add(List<ByteBuffer> terms, PrimaryKey key, long sstableRowId)
502503
maxTerm = TypeUtil.max(term, maxTerm, termComparator, Version.latest());
503504
}
504505

505-
rowCount++;
506-
507506
// segmentRowIdOffset should encode sstableRowId into Integer
508507
int segmentRowId = Math.toIntExact(sstableRowId - segmentRowIdOffset);
509508

@@ -600,6 +599,11 @@ int getRowCount()
600599
return rowCount;
601600
}
602601

602+
void incRowCount()
603+
{
604+
rowCount++;
605+
}
606+
603607
/**
604608
* @return true if next SSTable row ID exceeds max segment row ID
605609
*/

src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadataBuilder.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public SegmentMetadataBuilder(long segmentRowIdOffset, IndexComponents.ForWrite
8686
this.termsDistributionBuilder = new TermsDistribution.Builder(context.getValidator(), byteComparableVersion, histogramSize, mostFrequentTermsCount);
8787
}
8888

89+
public void setNumRows(long numRows)
90+
{
91+
this.numRows = numRows;
92+
}
93+
8994
public void setKeyRange(@Nonnull PrimaryKey minKey, @Nonnull PrimaryKey maxKey)
9095
{
9196
assert minKey.compareTo(maxKey) <= 0: "minKey (" + minKey + ") must not be greater than (" + maxKey + ')';
@@ -129,7 +134,6 @@ void add(ByteComparable term, int rowCount)
129134
if (built)
130135
throw new IllegalStateException("Segment metadata already built, no more additions allowed");
131136

132-
numRows += rowCount;
133137
termsDistributionBuilder.add(term, rowCount);
134138
}
135139

@@ -360,7 +364,7 @@ public void intersect(IntersectVisitor visitor) throws IOException
360364
}
361365

362366
@Override
363-
public void close() throws IOException
367+
public void close()
364368
{
365369
if (lastTerm != null)
366370
{
@@ -371,5 +375,3 @@ public void close() throws IOException
371375
}
372376

373377
}
374-
375-

test/unit/org/apache/cassandra/index/sai/cql/BM25Test.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@
2121
import java.util.HashMap;
2222
import java.util.HashSet;
2323
import java.util.List;
24+
import java.util.Objects;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.Future;
2728
import java.util.regex.Pattern;
2829
import java.util.stream.Collectors;
2930

31+
import org.apache.cassandra.index.sai.SSTableIndex;
32+
import org.apache.cassandra.index.sai.memory.MemtableIndex;
33+
import org.apache.cassandra.index.sai.memory.TrieMemoryIndex;
34+
import org.apache.cassandra.index.sai.memory.TrieMemtableIndex;
3035
import org.assertj.core.api.Assertions;
3136
import org.junit.Before;
3237
import org.junit.Test;
@@ -816,6 +821,71 @@ public void testOrderingSeveralSegments() throws Throwable
816821
"climate");
817822
}
818823

824+
/**
825+
* Asserts that memtable SAI index maintains expected row count, which is, then,
826+
* used to store row count in SSTable SAI index and its segments. This is also
827+
* asserted.
828+
*/
829+
@Test
830+
public void testIndexMetaForNumRows()
831+
{
832+
createTable("CREATE TABLE %s (id int PRIMARY KEY, category text, score int, " +
833+
"title text, body text, bodyset set<text>, " +
834+
"map_category map<int, text>, map_body map<text, text>)");
835+
String bodyIndexName = createAnalyzedIndex("body", true);
836+
String scoreIndexName = createIndex("CREATE CUSTOM INDEX ON %s (score) USING 'StorageAttachedIndex'");
837+
String mapIndexName = createIndex("CREATE CUSTOM INDEX ON %s (map_category) USING 'StorageAttachedIndex'");
838+
insertCollectionData();
839+
840+
assertNumRowsMemtable(scoreIndexName, DATASET.length);
841+
assertNumRowsMemtable(bodyIndexName, DATASET.length);
842+
assertNumRowsMemtable(mapIndexName, DATASET.length);
843+
execute("DELETE FROM %s WHERE id = ?", 5);
844+
flush();
845+
assertNumRowsSSTable(scoreIndexName, DATASET.length - 1);
846+
assertNumRowsSSTable(bodyIndexName, DATASET.length - 1);
847+
assertNumRowsSSTable(mapIndexName, DATASET.length - 1);
848+
execute("DELETE FROM %s WHERE id = ?", 10);
849+
flush();
850+
assertNumRowsSSTable(scoreIndexName, DATASET.length - 1);
851+
assertNumRowsSSTable(bodyIndexName, DATASET.length - 1);
852+
assertNumRowsSSTable(mapIndexName, DATASET.length - 1);
853+
compact();
854+
assertNumRowsSSTable(scoreIndexName, DATASET.length - 2);
855+
assertNumRowsSSTable(bodyIndexName, DATASET.length - 2);
856+
assertNumRowsSSTable(mapIndexName, DATASET.length - 2);
857+
}
858+
859+
private void assertNumRowsMemtable(String indexName, int expectedNumRows)
860+
{
861+
int rowCount = 0;
862+
863+
for (var memtable : getCurrentColumnFamilyStore().getAllMemtables())
864+
{
865+
MemtableIndex memIndex = getIndexContext(indexName).getMemtableIndex(memtable);
866+
assert memIndex instanceof TrieMemtableIndex;
867+
rowCount = Arrays.stream(((TrieMemtableIndex) memIndex).getRangeIndexes())
868+
.map(index -> ((TrieMemoryIndex) index).getDocLengths().size())
869+
.mapToInt(Integer::intValue).sum();
870+
}
871+
assertEquals(expectedNumRows, rowCount);
872+
}
873+
874+
private void assertNumRowsSSTable(String indexName, int expectedNumRows)
875+
{
876+
long indexRowCount = 0;
877+
long segmentRowCount = 0;
878+
for (SSTableIndex sstableIndex : getIndexContext(indexName).getView())
879+
{
880+
indexRowCount += sstableIndex.getRowCount();
881+
segmentRowCount += sstableIndex.getSegments().stream()
882+
.map(s -> Objects.requireNonNull(s.metadata).numRows)
883+
.mapToLong(Long::longValue).sum();
884+
}
885+
assertEquals(indexRowCount, segmentRowCount);
886+
assertEquals(expectedNumRows, indexRowCount);
887+
}
888+
819889
private final static Object[][] DATASET =
820890
{
821891
{ 1, "Climate", 5, "Climate change is a pressing issue. Climate patterns are shifting globally. Scientists study climate data daily.", 1 },

test/unit/org/apache/cassandra/index/sai/disk/v1/InvertedIndexSearcherTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,8 @@
4949
import org.apache.cassandra.service.StorageService;
5050
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
5151
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
52+
import org.assertj.core.api.Assertions;
5253

53-
import static org.hamcrest.Matchers.instanceOf;
54-
import static org.hamcrest.Matchers.is;
5554
import static org.mockito.Mockito.mock;
5655
import static org.mockito.Mockito.when;
5756

@@ -194,6 +193,7 @@ private IndexSearcher buildIndexAndOpenSearcher(int terms, List<InvertedIndexBui
194193
MemtableTermsIterator termsIterator = new MemtableTermsIterator(null, null, iter);
195194
SegmentMetadata.ComponentMetadataMap indexMetas = writer.writeAll(metadataBuilder.intercept(termsIterator), docLengths);
196195
metadataBuilder.setComponentsMetadata(indexMetas);
196+
metadataBuilder.setNumRows(docLengths.values().stream().mapToInt(i -> i).sum());
197197
}
198198

199199
final SegmentMetadata segmentMetadata = metadataBuilder.build();
@@ -207,7 +207,7 @@ private IndexSearcher buildIndexAndOpenSearcher(int terms, List<InvertedIndexBui
207207
indexContext,
208208
indexFiles,
209209
segmentMetadata);
210-
assertThat(searcher, is(instanceOf(InvertedIndexSearcher.class)));
210+
Assertions.assertThat(searcher).isInstanceOf(InvertedIndexSearcher.class);
211211
return searcher;
212212
}
213213
}

test/unit/org/apache/cassandra/index/sai/disk/v1/kdtree/KDTreeIndexBuilder.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@
6868
import org.apache.cassandra.utils.bytecomparable.ByteSource;
6969
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
7070

71-
import static org.hamcrest.Matchers.instanceOf;
72-
import static org.hamcrest.Matchers.is;
73-
import static org.junit.Assert.assertThat;
71+
import static org.assertj.core.api.Assertions.assertThat;
7472
import static org.junit.Assert.assertTrue;
7573
import static org.mockito.Mockito.mock;
7674
import static org.mockito.Mockito.when;
@@ -182,6 +180,7 @@ protected Pair<ByteComparable.Preencoded, List<RowMapping.RowIdWithFrequency>> c
182180
UTF8Type.instance.fromString("d"));
183181
metadataBuilder.setKeyRange(SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("a")).getToken()),
184182
SAITester.TEST_FACTORY.createTokenOnly(Murmur3Partitioner.instance.decorateKey(UTF8Type.instance.fromString("b")).getToken()));
183+
metadataBuilder.setNumRows(size);
185184
metadata = metadataBuilder.build();
186185
}
187186

@@ -192,7 +191,7 @@ protected Pair<ByteComparable.Preencoded, List<RowMapping.RowIdWithFrequency>> c
192191
when(sstableContext.usedPerSSTableComponents()).thenReturn(indexDescriptor.perSSTableComponents());
193192

194193
IndexSearcher searcher = Version.latest().onDiskFormat().newIndexSearcher(sstableContext, indexContext, indexFiles, metadata);
195-
assertThat(searcher, is(instanceOf(KDTreeIndexSearcher.class)));
194+
assertThat(searcher).isInstanceOf(KDTreeIndexSearcher.class);
196195
return (KDTreeIndexSearcher) searcher;
197196
}
198197
}
@@ -293,7 +292,7 @@ public static IndexSearcher buildShortSearcher(IndexDescriptor indexDescriptor,
293292
*/
294293
public static AbstractGuavaIterator<Pair<ByteComparable.Preencoded, IntArrayList>> singleOrd(Iterator<ByteBuffer> terms, AbstractType<?> type, int segmentRowIdOffset, int size)
295294
{
296-
return new AbstractGuavaIterator<Pair<ByteComparable.Preencoded, IntArrayList>>()
295+
return new AbstractGuavaIterator<>()
297296
{
298297
private long currentTerm = 0;
299298
private int currentSegmentRowId = segmentRowIdOffset;
@@ -343,11 +342,13 @@ public static Iterator<ByteBuffer> longRange(long startInclusive, long endExclus
343342
public static Iterator<ByteBuffer> decimalRange(final BigDecimal startInclusive, final BigDecimal endExclusive)
344343
{
345344
int n = endExclusive.subtract(startInclusive).intValueExact() * 10;
346-
final Supplier<BigDecimal> generator = new Supplier<BigDecimal>() {
345+
final Supplier<BigDecimal> generator = new Supplier<>()
346+
{
347347
BigDecimal current = startInclusive;
348348

349349
@Override
350-
public BigDecimal get() {
350+
public BigDecimal get()
351+
{
351352
BigDecimal result = current;
352353
current = current.add(ONE_TENTH);
353354
return result;
@@ -363,11 +364,13 @@ public BigDecimal get() {
363364
public static Iterator<ByteBuffer> bigIntegerRange(final BigInteger startInclusive, final BigInteger endExclusive)
364365
{
365366
int n = endExclusive.subtract(startInclusive).intValueExact();
366-
final Supplier<BigInteger> generator = new Supplier<BigInteger>() {
367+
final Supplier<BigInteger> generator = new Supplier<>()
368+
{
367369
BigInteger current = startInclusive;
368370

369371
@Override
370-
public BigInteger get() {
372+
public BigInteger get()
373+
{
371374
BigInteger result = current;
372375
current = current.add(BigInteger.ONE);
373376
return result;

0 commit comments

Comments
 (0)