[pulsar-spark] added option for configuring Pulsar client #14
base: master

Conversation
(force-pushed 5e238c8 to 2ffe7c4)
@atezs82 can you please rebase this patch?
(force-pushed 2ffe7c4 to cfec027)
@eolivelli Rebased on top of master, but I might have some additional work to do, since tests are still failing locally.
We are already working on fixing the integration tests. Since you are touching only the Spark adapter, you can ignore the Kafka failures. Thanks!
The change looks good to me.
I left a couple of minor comments.
PTAL
@codelipenghui you are going to cut a release soon.
What about including this patch?
Resolved review comments on:
...java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
...nk/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
(force-pushed cfec027 to 0ecd8e5)
Thanks for the comments, removed all non-Spark-related changes from the PR. Due to some issues, I still do not have tests working; I'm not sure whether this is a local or a general problem, but I get an error when running them locally.
LGTM. Please remove the star import.
Resolved review comment on pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
That is due to the fact that the integration test library hasn't been published to Maven Central. The workaround is to compile and install it locally. The commands used in CI are in pulsar-adapters/.github/workflows/integration-test.yaml, lines 74 to 90 (at 9801c43). The commands also cover the other required builds.
(force-pushed 0ecd8e5 to 0aea629)
@lhotari Thanks for the workaround, I have managed to run all tests successfully locally for the PR. @eolivelli Sorry for the mess, I have corrected the import statement.
@codelipenghui @sijie PTAL
    this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
}

public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
We are introducing a new parameter, storageLevel, but it is not used.
Thanks for the finding, corrected this in the upcoming patchset.
    this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConfig, authentication);
}

public SparkStreamingPulsarReceiver(
Why are we introducing so many new constructors?
The idea behind adding two new constructors alongside the two existing ones is that I wanted users of the interface to be able to configure the Pulsar client regardless of whether they specify a storage level or leave it at the default. I also wanted to keep the previous constructors for backward compatibility.
Please let me know what you think about this.
I would be happy to remove, e.g., the one having a default storage level:
public SparkStreamingPulsarReceiver(
String serviceUrl,
Map<String,Object> clientConfig,
ConsumerConfigurationData<byte[]> consumerConfig,
Authentication authentication) {
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
}
That way we might have broader functionality (we would just require the user to set the storage level whenever a client configuration is provided).
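The overload pattern under discussion, a convenience constructor delegating to a full one with a default storage level, can be sketched with a plain Java class (ReceiverSketch and its fields are illustrative stand-ins, not the actual receiver API; the real class uses Spark's StorageLevel type):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the receiver: a "full" constructor takes every
// option, and a convenience constructor delegates with a default value.
class ReceiverSketch {
    final String storageLevel;          // stands in for Spark's StorageLevel
    final String serviceUrl;
    final Map<String, Object> clientConfig;

    // Full constructor: the caller controls everything, including storage level.
    ReceiverSketch(String storageLevel, String serviceUrl,
                   Map<String, Object> clientConfig) {
        this.storageLevel = storageLevel;
        this.serviceUrl = serviceUrl;
        this.clientConfig = clientConfig;
    }

    // Convenience constructor: defaults the storage level, mirroring
    // this(StorageLevel.MEMORY_AND_DISK_2(), ...) in the patch.
    ReceiverSketch(String serviceUrl, Map<String, Object> clientConfig) {
        this("MEMORY_AND_DISK_2", serviceUrl, clientConfig);
    }

    public static void main(String[] args) {
        ReceiverSketch r = new ReceiverSketch("pulsar://localhost:6650",
                                              new HashMap<>());
        System.out.println(r.storageLevel); // prints MEMORY_AND_DISK_2
    }
}
```

Dropping the convenience overload, as proposed, simply means every caller that passes a client configuration must also pass the storage level explicitly.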
@sijie What do you think about this proposal? Shall I change the code like this?
(force-pushed 0aea629 to d0e959d)
Motivation

It is currently not possible to configure the Pulsar client when creating a SparkStreamingPulsarReceiver (currently only the service URL, the authentication plugin and the consumer configuration can be set). This renders the receiver unusable when, e.g., the Pulsar cluster is configured to use TLS and custom TLS certificates need to be set.

Modifications

The new constructors accept a client configuration map. When the map is empty, loadConf on the client builder does not get called. Existing constructors set this parameter to an empty map to maintain backward compatibility. Note that (since ClientBuilder works like this) the supplied configuration overrides other client configuration (even serviceUrl can be overridden by this feature). Also fixed tests for SparkStreamingPulsarReceiver (since they seemed to use an older interface). This issue was similar to Integration tests fail #12 (but not exactly the same).

Verifying this change

This change added tests and can be verified as follows: added tests checking that the client configuration is applied (even serviceUrl can be overridden by this feature).

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

Documentation
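The precedence described under Modifications — the supplied map wins over explicitly set fields, the way a config map applied via ClientBuilder.loadConf overrides earlier builder settings — can be sketched with plain maps (ClientConfigSketch and the effectiveConfig helper are illustrative, not part of the actual patch):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the override semantics: explicit settings (e.g. the
// serviceUrl constructor argument) are collected first, then the user-supplied
// clientConfig map is applied on top, so the map overrides them -- and an
// empty map changes nothing, preserving backward compatibility.
class ClientConfigSketch {
    static Map<String, Object> effectiveConfig(String serviceUrl,
                                               Map<String, Object> clientConfig) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("serviceUrl", serviceUrl);   // explicit setting
        conf.putAll(clientConfig);            // the map wins on conflicts
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> override = new HashMap<>();
        override.put("serviceUrl", "pulsar+ssl://broker:6651");

        // The map overrides even serviceUrl, as the description notes.
        System.out.println(effectiveConfig("pulsar://localhost:6650", override)
                               .get("serviceUrl")); // prints pulsar+ssl://broker:6651

        // An empty map leaves the explicit settings untouched.
        System.out.println(effectiveConfig("pulsar://localhost:6650", new HashMap<>())
                               .get("serviceUrl")); // prints pulsar://localhost:6650
    }
}
```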