Skip to content

Commit 592bc68

Browse files
authored
Merge pull request #225 from santiment/fixPrimaryKey
Option to disable Kafka producer sticky partition
2 parents d31a8ad + c0bc8c8 commit 592bc68

File tree

4 files changed

+25
-37
lines changed

4 files changed

+25
-37
lines changed

src/e2e/producer-transaction.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ describe('Producer transactions', function () {
117117
// consumer but there is something I am missing.
118118
setTimeout(async function () {
119119
for (let i = 0; i < num_messages_test; i++) {
120-
exporter.sendDataWithKey({
120+
exporter.sendData({
121121
timestamp: 10000000,
122122
iso_date: new Date().toISOString(),
123123
key: 1

src/lib/constants.ts

+1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ export const START_PRIMARY_KEY = getIntEnvVariable('START_PRIMARY_KEY', -1);
1313
export const WRITE_SIGNAL_RECORDS_KAFKA = getBoolEnvVariable('WRITE_SIGNAL_RECORDS_KAFKA', false);
1414
export const KAFKA_TOPIC = process.env.KAFKA_TOPIC;
1515
export const TEST_ENV = getBoolEnvVariable('TEST_ENV', false);
16+
export const KAFKA_PRODUCER_DISABLE_STICKY_PARTITION = getBoolEnvVariable('KAFKA_PRODUCER_DISABLE_STICKY_PARTITION', false);
1617

src/lib/kafka_storage.ts

+20-33
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ class Partitioner {
4343

4444
async init(hashFunction: ((value: object) => number), topicName: string, producer: Kafka.Producer) {
4545
this.hashFunction = hashFunction;
46-
this.partitionCount = await this.findPartitionCount(topicName, producer);
46+
this.partitionCount = await this.#findPartitionCount(topicName, producer);
4747
if (this.partitionCount <= 0) {
4848
throw new Error('Partition count should be > 0');
4949
}
5050
}
5151

52-
async findPartitionCount(topicName: string, producer: Kafka.Producer) {
52+
async #findPartitionCount(topicName: string, producer: Kafka.Producer) {
5353
// We depend on the producer already being connected
5454
logger.info(`Finding partition count for topic ${topicName}`);
5555
const metadata: Kafka.Metadata = await new Promise((resolve, reject) => {
@@ -106,7 +106,7 @@ export class Exporter {
106106
private readonly zookeeperClient: ZookeeperClientAsync;
107107
private partitioner: Partitioner | null;
108108

109-
constructor(exporter_name: string, transactional: boolean, topicName: string) {
109+
constructor(exporter_name: string, transactional: boolean, topicName: string, disableStickyPartition: boolean = false) {
110110
this.exporter_name = exporter_name;
111111

112112
const producer_settings: ProducerGlobalConfig = {
@@ -133,6 +133,10 @@ export class Exporter {
133133
producer_settings['enable.idempotence'] = true;
134134
}
135135

136+
if (disableStickyPartition) {
137+
producer_settings['sticky.partitioning.linger.ms'] = 0;
138+
}
139+
136140
this.producer = new Kafka.Producer(producer_settings);
137141

138142
this.producer.on('event.log', function (log) {
@@ -296,42 +300,24 @@ export class Exporter {
296300
}
297301
}
298302

299-
async sendData(events: Array<any>) {
300-
if (events.constructor !== Array) {
301-
events = [events];
302-
}
303-
304-
events = events.map(
305-
event => (typeof event === 'object' ? JSON.stringify(event) : event)
306-
);
307-
events.forEach(event => {
308-
this.producer.produce(this.topicName, null, Buffer.from(event));
309-
});
310-
311-
return new Promise<void>((resolve, reject) =>
312-
this.producer.flush(KAFKA_FLUSH_TIMEOUT, (err: LibrdKafkaError) => {
313-
if (err) return reject(err);
314-
resolve();
315-
})
316-
);
317-
}
318-
319-
async sendDataWithKey(events: object | Array<object>, keyField: string, signalRecordData: object | null) {
303+
async sendData(events: object | Array<object>, keyField: string | null, signalRecordData: object | null) {
320304
const arrayEvents: Array<object> = (events.constructor !== Array) ? [events] : events
321305

322306
if (signalRecordData !== null && this.partitioner === null) {
323307
throw new Error('Signal record logic needs partitioner');
324308
}
325309

326310
arrayEvents.forEach((event: any) => {
327-
const partitionNumberPayload = this.partitioner ? this.partitioner.getPartitionNumber(event) : null;
311+
const key = keyField !== null ? event[keyField] : null
312+
313+
const partitionNumberOfPayload = this.partitioner ? this.partitioner.getPartitionNumber(event) : null;
328314
const eventString = typeof event === 'object' ? JSON.stringify(event) : event;
329-
this.producer.produce(this.topicName, partitionNumberPayload, Buffer.from(eventString), event[keyField]);
315+
this.producer.produce(this.topicName, partitionNumberOfPayload, Buffer.from(eventString), key);
330316
if (signalRecordData !== null && this.partitioner !== null) {
331317
const signalRecordString = typeof signalRecordData === 'object' ? JSON.stringify(signalRecordData) : signalRecordData;
332318
for (let partitionNumber = 0; partitionNumber < this.partitioner.getPartitionCount(); ++partitionNumber) {
333-
if (partitionNumber !== partitionNumberPayload) {
334-
this.producer.produce(this.topicName, partitionNumber, Buffer.from(signalRecordString), event[keyField]);
319+
if (partitionNumber !== partitionNumberOfPayload) {
320+
this.producer.produce(this.topicName, partitionNumber, Buffer.from(signalRecordString), key);
335321
}
336322
}
337323
}
@@ -382,11 +368,12 @@ export class Exporter {
382368
const signalRecord: object | null = writeSignalRecordsKafka ? { 'santiment_signal_record': true } : null
383369
try {
384370
if (BLOCKCHAIN === 'utxo') {
385-
await this.sendDataWithKey(events, 'height', signalRecord);
386-
} else if (BLOCKCHAIN === 'receipts') {
387-
await this.sendDataWithKey(events, 'transactionHash', signalRecord);
388-
} else {
389-
await this.sendDataWithKey(events, 'primaryKey', signalRecord);
371+
await this.sendData(events, 'height', signalRecord);
372+
} else if (BLOCKCHAIN === 'receipts' || BLOCKCHAIN === 'eth') {
373+
await this.sendData(events, 'transactionHash', signalRecord);
374+
}
375+
else {
376+
await this.sendData(events, 'primaryKey', signalRecord);
390377
}
391378
await this.commitTransaction();
392379
} catch (exception) {

src/main.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ export class Main {
2727
))
2828
}
2929

30-
async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) {
30+
async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string, disableStickyPartition: boolean) {
3131
const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: ';
32-
this.exporter = new Exporter(exporterName, isTransactions, kafkaTopic);
32+
this.exporter = new Exporter(exporterName, isTransactions, kafkaTopic, disableStickyPartition);
3333
await this.exporter
3434
.connect()
3535
.then(() => this.exporter.initTransactions())
@@ -70,7 +70,7 @@ export class Main {
7070
async init(blockchain: string) {
7171
const blockchainSpecificConstants = require(`./blockchains/${blockchain}/lib/constants`);
7272
const mergedConstants = { ...constantsBase, ...blockchainSpecificConstants };
73-
await this.initExporter(EXPORTER_NAME, true, mergedConstants.KAFKA_TOPIC);
73+
await this.initExporter(EXPORTER_NAME, true, mergedConstants.KAFKA_TOPIC, mergedConstants.KAFKA_PRODUCER_DISABLE_STICKY_PARTITION);
7474
await this.initWorker(blockchain, mergedConstants);
7575
metrics.startCollection();
7676

0 commit comments

Comments
 (0)