Add Kafka receiver #1410
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1410 +/- ##
==========================================
+ Coverage 91.42% 91.50% +0.07%
==========================================
Files 248 252 +4
Lines 17219 17372 +153
==========================================
+ Hits 15743 15896 +153
Misses 1064 1064
Partials 412 412
Continue to review full report at Codecov.
Force-pushed from 7db9a0d to a8118b6
It will be almost impossible for one of the maintainers to review 1500+ lines. Consider splitting this PR into smaller, more reviewable PRs; as a rule of thumb, 500 lines is already considered a large PR.
Thank you for your understanding :)
Alright, I will split it into an exporter PR and a receiver PR.
Force-pushed from dfbbd53 to d311aec
Force-pushed from d311aec to 36fe01d
@bogdandrutu I have removed the exporter from this PR and moved it to #1439. This PR and the other one are ready for review.
@pavolloffay I would mark this PR WIP until we make some decisions in the exporter (regarding the message format we send to Kafka), then remove the label. Does that make sense?
Please update according to the changes in the exporter.
Some preliminary feedback, waiting for the updated PR.
Force-pushed from a263ad5 to a6294c6
// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata kafkaexporter.Metadata `mapstructure:"metadata"`
This is being reused between the exporter and the receiver. Shall I move it to ./config/configkafka?
We can do this in a separate PR.
Alright, once this is merged I will submit a refactoring PR.
Please revert then :)
What should I revert? There is nothing to revert. Or do you mean that instead of importing kafkaexporter.Metadata I should redefine that struct in the receiver?
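To illustrate the reuse under discussion, here is a minimal sketch of the receiver config embedding a shared metadata struct. The field names inside Metadata are hypothetical placeholders, not the actual kafkaexporter.Metadata definition; in the proposed refactor this type would live in a shared config/configkafka package so the receiver no longer imports the exporter:

```go
package main

import "fmt"

// Metadata is an illustrative stand-in for kafkaexporter.Metadata.
// Field names here are assumptions for demonstration only.
type Metadata struct {
	Full     bool // whether to request a full metadata refresh
	RetryMax int  // how many times to retry a metadata request
}

// Config shows the receiver embedding the shared struct, mirroring
// the `mapstructure:"metadata"` tag from the diff above.
type Config struct {
	Brokers  []string `mapstructure:"brokers"`
	Metadata Metadata `mapstructure:"metadata"`
}

func main() {
	cfg := Config{
		Brokers:  []string{"localhost:9092"},
		Metadata: Metadata{Full: true, RetryMax: 3},
	}
	fmt.Println(cfg.Metadata.Full, cfg.Metadata.RetryMax) // → true 3
}
```

Either placement works mechanically; the shared-package option just removes the receiver→exporter dependency edge.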
I have rebased it and fixed a couple of issues that I remembered from the exporter. @bogdandrutu could you please give it a second look?
func (c *kafkaConsumer) Start(context.Context, component.Host) error {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
Confused about this cancel: is it just to cancel the "consumerGroup.Consume"? Is that not cancelled by Close?
It's used to interrupt the infinite loop that calls c.consumerGroup.Consume.
Ping
Signed-off-by: Pavol Loffay <[email protected]>
Force-pushed from ea1f646 to be70c90
@bogdandrutu I have updated the PR
// `Consume` should be called inside an infinite loop; when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims.
if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
Is it not a problem that this will most likely block forever if no messages are coming, so it never gets closed?
When no messages are coming, the call still returns control once the session is set up and the handler is ready to receive messages. When Close is called on the receiver, it shuts down the consumer group correctly and interrupts the infinite loop via the cancelConsumeLoop context.CancelFunc (the "cancel" you asked about before).
@@ -10,6 +10,7 @@
 cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6T
 cloud.google.com/go v0.51.0/go.mod h1:hWtGJ6gnXH+KgDv+V0zFGDvpi07n3z8ZNj3T1RW0Gcw=
 cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4=
 cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M=
+cloud.google.com/go v0.56.0 h1:WRz29PgAsVEyPSDHyk+0fpEkwEFyfhHn+JbksT6gIL4=
Why does go.sum change without go.mod changes?
I am not sure why, but go mod tidy gives an empty output :)
Revert?
If I revert it, go mod tidy brings it back.
Signed-off-by: Pavol Loffay [email protected]
Description:
This patch adds the Kafka receiver and consumer. The implementation uses the Sarama client https://github.com/Shopify/sarama.
After this is merged I will do a couple of improvements:
Link to tracking Issue:
Resolves: #1331
Resolves: open-telemetry/opentelemetry-collector-contrib#5
Related to: #1101
Testing:
Tested locally with Confluent 5.0.0 https://docs.confluent.io/5.0.0/installation/docker/docs/installation/single-node-client.html
Documentation:
Exporter and receiver readmes.
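For orientation, a configuration for the new receiver might look roughly like this. This is a sketch based on the PR discussion, not a copy from the merged README; key names such as brokers, topic, and group_id are assumptions, and the metadata block mirrors the shared kafkaexporter.Metadata settings discussed above:

```yaml
receivers:
  kafka:
    # Assumed keys; consult the receiver README for the authoritative list.
    brokers: ["localhost:9092"]
    topic: otlp_spans
    group_id: otel-collector
    metadata:
      full: true
```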