KAFKA-18959: increase the num_workers from 9 to 12 #19274
I ran several tests to verify that this change can successfully invoke all 12 nodes, and the tests are still functioning properly. Using kafkatest.benchmarks.core.benchmark_test as an example, here is its test report:
{
"client_status": {
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=PLAINTEXT.compression_type=none.metadata_quorum=ISOLATED_KRAFT', test_index=1)": {
"exitcode": 0,
"name": "Process-1",
"pid": 348141,
"runner_end_time": 1742913956.5917284,
"runner_start_time": 1742913872.3062623,
"status": "FINISHED"
},
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=PLAINTEXT.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT', test_index=2)": {
"exitcode": 0,
"name": "Process-2",
"pid": 351641,
"runner_end_time": 1742914061.9537954,
"runner_start_time": 1742913956.594597,
"status": "FINISHED"
},
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=none.metadata_quorum=ISOLATED_KRAFT', test_index=3)": {
"exitcode": 0,
"name": "Process-3",
"pid": 356538,
"runner_end_time": 1742914184.9281225,
"runner_start_time": 1742914061.9568226,
"status": "FINISHED"
},
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT', test_index=4)": {
"exitcode": 0,
"name": "Process-4",
"pid": 362353,
"runner_end_time": 1742914298.3415258,
"runner_start_time": 1742914184.9310431,
"status": "FINISHED"
},
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=none.metadata_quorum=ISOLATED_KRAFT', test_index=5)": {
"exitcode": 0,
"name": "Process-5",
"pid": 368155,
"runner_end_time": 1742914485.2978635,
"runner_start_time": 1742914298.3445628,
"status": "FINISHED"
},
"TestKey(test_id='kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT', test_index=6)": {
"exitcode": 0,
"name": "Process-6",
"pid": 377512,
"runner_end_time": 1742914588.5678623,
"runner_start_time": 1742914485.3008275,
"status": "FINISHED"
}
},
"cluster_nodes_allocated": {
"max": 7,
"mean": 7.0,
"min": 7
},
"cluster_nodes_used": {
"max": 7,
"mean": 7.0,
"min": 7
},
"cluster_num_nodes": 12,
"cluster_utilization": 0.5827999101183249,
"ducktape_version": "0.12.0",
"num_failed": 0,
"num_ignored": 0,
"num_passed": 6,
"parallelism": 0.9990855602028426,
"results": [
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 40.35,
"records_per_sec": 423101.3
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "none",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "PLAINTEXT"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=PLAINTEXT.compression_type=none.metadata_quorum=ISOLATED_KRAFT/1/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=PLAINTEXT.compression_type=none.metadata_quorum=ISOLATED_KRAFT/1/",
"run_time_seconds": 84.17871618270874,
"services": [],
"start_time": 1742913872.312041,
"stop_time": 1742913956.4907572,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=PLAINTEXT.compression_type=none.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
},
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 27.86,
"records_per_sec": 292090.2
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "snappy",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "PLAINTEXT"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=PLAINTEXT.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/2/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=PLAINTEXT.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/2/",
"run_time_seconds": 105.25337219238281,
"services": [],
"start_time": 1742913956.5992308,
"stop_time": 1742914061.852603,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=PLAINTEXT.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
},
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 29.66,
"records_per_sec": 311013.0
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "none",
"interbroker_security_protocol": "PLAINTEXT",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "SSL",
"tls_version": "TLSv1.2"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=none.metadata_quorum=ISOLATED_KRAFT/3/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=none.metadata_quorum=ISOLATED_KRAFT/3/",
"run_time_seconds": 122.86548900604248,
"services": [],
"start_time": 1742914061.9615622,
"stop_time": 1742914184.8270512,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=none.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
},
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 24.44,
"records_per_sec": 256259.1
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "snappy",
"interbroker_security_protocol": "PLAINTEXT",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "SSL",
"tls_version": "TLSv1.2"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/4/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/4/",
"run_time_seconds": 113.3045244216919,
"services": [],
"start_time": 1742914184.9360578,
"stop_time": 1742914298.2405822,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
},
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 23.52,
"records_per_sec": 246609.1
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "none",
"interbroker_security_protocol": "PLAINTEXT",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "SSL",
"tls_version": "TLSv1.3"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=none.metadata_quorum=ISOLATED_KRAFT/5/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=none.metadata_quorum=ISOLATED_KRAFT/5/",
"run_time_seconds": 186.84774732589722,
"services": [],
"start_time": 1742914298.3491876,
"stop_time": 1742914485.196935,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=none.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
},
{
"base_results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/",
"cls_name": "Benchmark",
"data": {
"0": {
"mb_per_sec": 31.68,
"records_per_sec": 332237.0
}
},
"description": "\n Setup: 3 node kafka cluster\n Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.\n\n Collect and return aggregate throughput statistics after all messages have been acknowledged.\n\n (This runs ProducerPerformance.java under the hood)\n ",
"function_name": "test_long_term_producer_throughput",
"injected_args": {
"compression_type": "snappy",
"interbroker_security_protocol": "PLAINTEXT",
"metadata_quorum": "ISOLATED_KRAFT",
"security_protocol": "SSL",
"tls_version": "TLSv1.3"
},
"module_name": "kafkatest.benchmarks.core.benchmark_test",
"nodes_allocated": 7,
"nodes_used": 7,
"relative_results_dir": "Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/6/",
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017/Benchmark/test_long_term_producer_throughput/security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT/6/",
"run_time_seconds": 103.16111874580383,
"services": [],
"start_time": 1742914485.3056612,
"stop_time": 1742914588.46678,
"summary": "Test Passed",
"test_id": "kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.compression_type=snappy.metadata_quorum=ISOLATED_KRAFT",
"test_status": "PASS"
}
],
"run_time_seconds": 716.2659499645233,
"run_time_statistics": {
"max": 186.84774732589722,
"mean": 119.2684946457545,
"min": 84.17871618270874
},
"session_context": {
"_globals": {},
"compress": false,
"debug": false,
"default_expected_num_nodes": null,
"exit_first": false,
"fail_bad_cluster_utilization": false,
"fail_greedy_tests": false,
"max_parallel": 1,
"no_teardown": false,
"results_dir": "/home/apalan60/IdeaProjects/kafka/results/2025-03-25--017",
"session_id": "2025-03-25--017",
"test_runner_timeout": 1800000
},
"start_time": 1742913872.3035154,
"stop_time": 1742914588.5694654
}
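The summary fields in the report can be cross-checked against the per-test entries. The sketch below is only an inference from the numbers in this report, not ducktape's documented definition: it assumes `parallelism` is the summed test run time divided by the session run time, and `cluster_utilization` is the summed node-seconds divided by `cluster_num_nodes` times the session run time. The variable names are illustrative.

```python
# Values copied from the report above. The formulas are inferred from the
# numbers, not taken from ducktape's source or documentation.
cluster_num_nodes = 12
session_seconds = 716.2659499645233  # top-level "run_time_seconds"

# (nodes_used, run_time_seconds) for each of the six entries in "results"
results = [
    (7, 84.17871618270874),
    (7, 105.25337219238281),
    (7, 122.86548900604248),
    (7, 113.3045244216919),
    (7, 186.84774732589722),
    (7, 103.16111874580383),
]

busy_seconds = sum(seconds for _, seconds in results)
node_seconds = sum(nodes * seconds for nodes, seconds in results)

# Assumed definitions (see the note above).
parallelism = busy_seconds / session_seconds
cluster_utilization = node_seconds / (cluster_num_nodes * session_seconds)
mean_run_time = busy_seconds / len(results)

print(f"parallelism         = {parallelism:.6f}")
print(f"cluster_utilization = {cluster_utilization:.6f}")
print(f"mean run time (s)   = {mean_run_time:.6f}")
```

Under these assumptions the recomputed values agree closely with the reported `parallelism`, `cluster_utilization`, and `run_time_statistics.mean`, which fits a run where each test occupied 7 of the 12 nodes and tests ran one at a time (`max_parallel` is 1).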
@apalan60 maybe we should align the number with the docker-based e2e tests (kafka/tests/docker/run_tests.sh, line 19 at b9d5597)? WDYT?
I also think that having a common standard is a good thing. I will try to change it and test to see if any issues arise. Thanks for the reminder.
I ran a few tests, and so far I have successfully invoked 14 nodes, with the tests remaining stable.
[JIRA](https://issues.apache.org/jira/browse/KAFKA-18959)

This PR adjusts the default node count for system tests, since some tests use more than 9 nodes. This change ensures such test cases can run successfully without manual configuration.

Reviewers: Chia-Ping Tsai <[email protected]>