Skip to content

Commit a05d1e2

Browse files
mike-tr-adamsondjatnieks
authored andcommitted
STAR-813: New MessagingService version for Stargazer (#218)
(cherry picked from commit ac0be81) (cherry picked from commit 39fb781) (cherry picked from commit 1937874) (cherry picked from commit af134b4) (cherry picked from commit a37767a) (cherry picked from commit c8a0639)
1 parent 3fcc76c commit a05d1e2

File tree

9 files changed

+63
-9
lines changed

9 files changed

+63
-9
lines changed

src/java/org/apache/cassandra/db/CounterMutation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.cassandra.utils.btree.BTreeSet;
4848

4949
import static java.util.concurrent.TimeUnit.*;
50+
import static org.apache.cassandra.net.MessagingService.VERSION_SG_10;
5051
import static org.apache.cassandra.net.MessagingService.VERSION_30;
5152
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
5253
import static org.apache.cassandra.net.MessagingService.VERSION_40;
@@ -326,6 +327,7 @@ public long getTimeout(TimeUnit unit)
326327
private int serializedSize30;
327328
private int serializedSize3014;
328329
private int serializedSize40;
330+
private int serializedSizeSG10;
329331

330332
public int serializedSize(int version)
331333
{
@@ -343,6 +345,10 @@ public int serializedSize(int version)
343345
if (serializedSize40 == 0)
344346
serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
345347
return serializedSize40;
348+
case VERSION_SG_10:
349+
if (serializedSizeSG10 == 0)
350+
serializedSizeSG10 = (int) serializer.serializedSize(this, VERSION_SG_10);
351+
return serializedSizeSG10;
346352
default:
347353
throw new IllegalStateException("Unknown serialization version: " + version);
348354
}

src/java/org/apache/cassandra/db/Mutation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.cassandra.schema.TableMetadata;
4040
import org.apache.cassandra.utils.ByteBufferUtil;
4141

42+
import static org.apache.cassandra.net.MessagingService.VERSION_SG_10;
4243
import static org.apache.cassandra.net.MessagingService.VERSION_30;
4344
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
4445
import static org.apache.cassandra.net.MessagingService.VERSION_40;
@@ -282,6 +283,7 @@ public String toString(boolean shallow)
282283
private int serializedSize30;
283284
private int serializedSize3014;
284285
private int serializedSize40;
286+
private int serializedSizeSG10;
285287

286288
public int serializedSize(int version)
287289
{
@@ -299,6 +301,10 @@ public int serializedSize(int version)
299301
if (serializedSize40 == 0)
300302
serializedSize40 = (int) serializer.serializedSize(this, VERSION_40);
301303
return serializedSize40;
304+
case VERSION_SG_10:
305+
if (serializedSizeSG10 == 0)
306+
serializedSizeSG10 = (int) serializer.serializedSize(this, VERSION_SG_10);
307+
return serializedSizeSG10;
302308
default:
303309
throw new IllegalStateException("Unknown serialization version: " + version);
304310
}

src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ public class CommitLogDescriptor
6262
public static final int VERSION_40 = 7;
6363
// For compatibility with CNDB
6464
public static final int VERSION_DSE_68 = 680;
65+
// Stargazer 1.0 messaging
66+
public static final int VERSION_SG_10 = 100;
6567

6668
/**
6769
* Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
6870
* Note: make sure to handle {@link #getMessagingVersion()}
6971
*/
7072
@VisibleForTesting
71-
public static final int current_version = VERSION_40;
73+
public static final int current_version = VERSION_SG_10;
7274

7375
final int version;
7476
public final long id;
@@ -212,6 +214,8 @@ public int getMessagingVersion()
212214
case VERSION_40:
213215
case VERSION_DSE_68:
214216
return MessagingService.VERSION_40;
217+
case VERSION_SG_10:
218+
return MessagingService.VERSION_SG_10;
215219
default:
216220
throw new IllegalStateException("Unknown commitlog version " + version);
217221
}

src/java/org/apache/cassandra/db/filter/RowFilter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.cassandra.index.IndexRegistry;
4747
import org.apache.cassandra.io.util.DataInputPlus;
4848
import org.apache.cassandra.io.util.DataOutputPlus;
49+
import org.apache.cassandra.net.MessagingService;
4950
import org.apache.cassandra.schema.ColumnMetadata;
5051
import org.apache.cassandra.schema.IndexMetadata;
5152
import org.apache.cassandra.schema.TableMetadata;
@@ -451,22 +452,33 @@ public static class Serializer
451452
{
452453
public void serialize(FilterElement operation, DataOutputPlus out, int version) throws IOException
453454
{
454-
out.writeBoolean(operation.isDisjunction);
455+
assert (!operation.isDisjunction && operation.children().isEmpty()) || version == MessagingService.VERSION_SG_10 :
456+
"Attempting to serialize a disjunct row filter to a node that doesn't support disjunction";
457+
455458
out.writeUnsignedVInt(operation.expressions.size());
456459
for (Expression expr : operation.expressions)
457460
Expression.serializer.serialize(expr, out, version);
461+
462+
if (version < MessagingService.VERSION_SG_10)
463+
return;
464+
465+
out.writeBoolean(operation.isDisjunction);
458466
out.writeUnsignedVInt(operation.children.size());
459467
for (FilterElement child : operation.children)
460468
serialize(child, out, version);
461469
}
462470

463471
public FilterElement deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException
464472
{
465-
boolean isDisjunction = in.readBoolean();
466473
int size = (int)in.readUnsignedVInt();
467474
List<Expression> expressions = new ArrayList<>(size);
468475
for (int i = 0; i < size; i++)
469476
expressions.add(Expression.serializer.deserialize(in, version, metadata));
477+
478+
if (version < MessagingService.VERSION_SG_10)
479+
return new FilterElement(false, expressions, Collections.emptyList());
480+
481+
boolean isDisjunction = in.readBoolean();
470482
size = (int)in.readUnsignedVInt();
471483
List<FilterElement> children = new ArrayList<>(size);
472484
for (int i = 0; i < size; i++)
@@ -476,9 +488,14 @@ public FilterElement deserialize(DataInputPlus in, int version, TableMetadata me
476488

477489
public long serializedSize(FilterElement operation, int version)
478490
{
479-
long size = 1 + TypeSizes.sizeofUnsignedVInt(operation.expressions.size());
491+
long size = TypeSizes.sizeofUnsignedVInt(operation.expressions.size());
480492
for (Expression expr : operation.expressions)
481493
size += Expression.serializer.serializedSize(expr, version);
494+
495+
if (version < MessagingService.VERSION_SG_10)
496+
return size;
497+
498+
size++; // isDisjunction boolean
482499
size += TypeSizes.sizeofUnsignedVInt(operation.children.size());
483500
for (FilterElement child : operation.children)
484501
size += serializedSize(child, version);

src/java/org/apache/cassandra/hints/HintsDescriptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ final class HintsDescriptor
6464

6565
static final int VERSION_30 = 1;
6666
static final int VERSION_40 = 2;
67-
static final int CURRENT_VERSION = VERSION_40;
67+
static final int VERSION_SG_10 = 100;
68+
static final int CURRENT_VERSION = VERSION_SG_10;
6869

6970
static final String COMPRESSION = "compression";
7071
static final String ENCRYPTION = "encryption";
@@ -220,6 +221,8 @@ static int messagingVersion(int hintsVersion)
220221
return MessagingService.VERSION_30;
221222
case VERSION_40:
222223
return MessagingService.VERSION_40;
224+
case VERSION_SG_10:
225+
return MessagingService.VERSION_SG_10;
223226
default:
224227
throw new AssertionError();
225228
}

src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ static class TrieIndexVersion extends Version
323323
hasAccurateLegacyMinMax = version.compareTo("ac") >= 0;
324324
hasOriginatingHostId = version.matches("(a[d-z])|(b[b-z])") || version.compareTo("ca") >= 0;
325325
hasMaxColumnValueLengths = version.matches("b[a-z]"); // DSE only field
326-
correspondingMessagingVersion = version.compareTo("ca") >= 0 ? MessagingService.VERSION_40 : MessagingService.VERSION_3014;
326+
correspondingMessagingVersion = version.compareTo("ca") >= 0 ? MessagingService.VERSION_SG_10 : MessagingService.VERSION_3014;
327327
}
328328

329329
// this is for the ab version which was used in the LABS, and then has been renamed to ba

src/java/org/apache/cassandra/net/Message.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import static org.apache.cassandra.net.MessagingService.VERSION_30;
5757
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
5858
import static org.apache.cassandra.net.MessagingService.VERSION_40;
59+
import static org.apache.cassandra.net.MessagingService.VERSION_SG_10;
5960
import static org.apache.cassandra.net.MessagingService.instance;
6061
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
6162
import static org.apache.cassandra.utils.vint.VIntCoding.computeUnsignedVIntSize;
@@ -1367,6 +1368,7 @@ private static <In,Out> IVersionedAsymmetricSerializer<In, Out> getPayloadSerial
13671368
private int serializedSize30;
13681369
private int serializedSize3014;
13691370
private int serializedSize40;
1371+
private int serializedSizeSG10;
13701372

13711373
/**
13721374
* Serialized size of the entire message, for the provided messaging version. Caches the calculated value.
@@ -1387,6 +1389,10 @@ public int serializedSize(int version)
13871389
if (serializedSize40 == 0)
13881390
serializedSize40 = serializer.serializedSize(this, VERSION_40);
13891391
return serializedSize40;
1392+
case VERSION_SG_10:
1393+
if (serializedSizeSG10 == 0)
1394+
serializedSizeSG10 = (int) serializer.serializedSize(this, VERSION_SG_10);
1395+
return serializedSizeSG10;
13901396
default:
13911397
throw new IllegalStateException();
13921398
}
@@ -1395,6 +1401,7 @@ public int serializedSize(int version)
13951401
private int payloadSize30 = -1;
13961402
private int payloadSize3014 = -1;
13971403
private int payloadSize40 = -1;
1404+
private int payloadSizeSG10 = -1;
13981405

13991406
private int payloadSize(int version)
14001407
{
@@ -1412,6 +1419,10 @@ private int payloadSize(int version)
14121419
if (payloadSize40 < 0)
14131420
payloadSize40 = serializer.payloadSize(this, VERSION_40);
14141421
return payloadSize40;
1422+
case VERSION_SG_10:
1423+
if (payloadSizeSG10 < 0)
1424+
payloadSizeSG10 = serializer.payloadSize(this, VERSION_SG_10);
1425+
return payloadSizeSG10;
14151426
default:
14161427
throw new IllegalStateException();
14171428
}

src/java/org/apache/cassandra/net/MessagingService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,16 +203,20 @@ public class MessagingService extends MessagingServiceMBeanImpl
203203
public static final int VERSION_30 = 10;
204204
public static final int VERSION_3014 = 11;
205205
public static final int VERSION_40 = 12;
206+
// Current Stargazer version while we have serialization differences
207+
// If differences get merged upstream then we can revert to OS versioning
208+
public static final int VERSION_SG_10 = 100;
206209
public static final int minimum_version = VERSION_30;
207-
public static final int current_version = VERSION_40;
210+
public static final int current_version = VERSION_SG_10;
208211
static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
209212
static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
210213

211214
public enum Version
212215
{
213216
VERSION_30(10),
214217
VERSION_3014(11),
215-
VERSION_40(12);
218+
VERSION_40(12),
219+
STARGAZER_10(100);
216220

217221
public final int value;
218222

test/unit/org/apache/cassandra/net/FramingTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import static java.lang.Math.*;
5353
import static org.apache.cassandra.net.MessagingService.VERSION_30;
5454
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
55+
import static org.apache.cassandra.net.MessagingService.VERSION_40;
56+
import static org.apache.cassandra.net.MessagingService.VERSION_SG_10;
5557
import static org.apache.cassandra.net.MessagingService.current_version;
5658
import static org.apache.cassandra.net.MessagingService.minimum_version;
5759
import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
@@ -251,13 +253,14 @@ public void testSerializeSizeMatchesEdgeCases() // See CASSANDRA-16103
251253

252254
private void burnRandomLegacy(int count)
253255
{
256+
int[] versions = new int[] { VERSION_30, VERSION_3014, VERSION_40, VERSION_SG_10 };
254257
SecureRandom seed = new SecureRandom();
255258
Random random = new Random();
256259
for (int i = 0 ; i < count ; ++i)
257260
{
258261
long innerSeed = seed.nextLong();
259262
float ratio = seed.nextFloat();
260-
int version = minimum_version + random.nextInt(1 + current_version - minimum_version);
263+
int version = versions[random.nextInt(4)];
261264
logger.debug("seed: {}, ratio: {}, version: {}", innerSeed, ratio, version);
262265
random.setSeed(innerSeed);
263266
testRandomSequenceOfMessages(random, ratio, version, new FrameDecoderLegacy(GlobalBufferPoolAllocator.instance, version));

0 commit comments

Comments
 (0)