[Issue #29] [pulsar-spark] Adding SparkPulsarReliableReceiver #31
Great work.
Unfortunately I don't know Scala so I cannot give much feedback.
I left a comment.
Hopefully other folks will be able to give more feedback.
We can ask on [email protected] for more reviews.
restart("Restart a consumer")
}
latestStorePushTime = System.currentTimeMillis()
new Thread() {
Can we give a name to this thread?
Also, can we keep a reference to the Thread as a field?
Usually this helps in shutting down the system as you can wait for the Thread to exit in order to not leave leaks
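A minimal sketch of this suggestion, in plain Java rather than the PR's Scala: the thread gets a name, the reference is kept as a field, and shutdown waits for the thread to exit. The class `NamedWorker`, the thread name, and the `stopped` flag (standing in for the receiver's `isStopped` check) are illustrative assumptions, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class NamedWorker {
    // Hypothetical flag playing the role of the receiver's isStopped check.
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    // Reference kept as a field so that shutdown code can wait for the thread.
    private Thread consumerThread;

    public void start() {
        consumerThread = new Thread(() -> {
            while (!stopped.get()) {
                try {
                    Thread.sleep(10); // stand-in for receiving and storing a batch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "pulsar-consumer-thread"); // illustrative name, visible in thread dumps
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    /** Signals the loop to exit, waits up to timeoutMillis, and reports whether the thread has exited. */
    public boolean stop(long timeoutMillis) {
        stopped.set(true);
        Thread t = consumerThread;
        if (t == null) {
            return true; // never started
        }
        try {
            t.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public String threadName() {
        return consumerThread == null ? null : consumerThread.getName();
    }
}
```

Note that `Thread.join(0)` waits without a time limit, so a positive timeout is assumed here.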
Makes sense. Ideally the thread should be stopped before stopping the receiver (controlled by isStopped). However, I'll add the check just in case.
Did anyone get a chance to review this?
}

override def onStop(): Unit = try {
if (consumerThread != null && consumerThread.isAlive) consumerThread.stop() // Ideally consumerThread should be closed before calling onStop.
Please don't use Thread.stop() as it is deprecated and also it can lead to unpredictable behaviour (like unreleased locks)
It is enough to wait for it to finish here
It should be enough, because onStop is called after setting isStopped = true, so the thread should already have exited by then.
But, generally speaking, what is the ideal way to close a thread that is stuck on something like IO? I didn't find anything other than stop that can kill the thread.
Can we do something like:
thread.join(30secs)
if(thread.isAlive()) thread.stop()?
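One common alternative to `Thread.stop()` is to interrupt the thread and then wait with a bounded `join`. The sketch below is plain Java and uses a `BlockingQueue.take()` call as a stand-in for a blocking receive; the class and method names are illustrative. Interruption wakes threads blocked in `sleep`, `wait`, `join`, and `java.util.concurrent` blocking calls, but not classic `java.io` stream or socket reads, where the usual approach is to close the underlying resource from another thread. Whether the Pulsar consumer call used in this PR responds to interruption is not verified here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptibleShutdown {
    /** Starts a thread that blocks on an empty queue, standing in for a blocking receive. */
    public static Thread startBlockedWorker() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    queue.take(); // blocks; throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                // Interruption is treated as the shutdown signal: let the thread end.
            }
        }, "blocked-worker");
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Interrupts the thread, waits up to timeoutMillis, and reports whether it exited. */
    public static boolean shutdown(Thread t, long timeoutMillis) {
        t.interrupt();
        try {
            t.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // If this returns false, the caller can log it rather than fall back to stop().
        return !t.isAlive();
    }
}
```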
@nlu90 can you take a look at this?
Did you run some checkstyle tools to help format the code?
if(autoAcknowledge)
consumer.acknowledge(groupedMessages.map(_.messageId))
If autoACK is false, then the message can be kept in Pulsar indefinitely.
Since the messages have been stored into Spark on line 154 (which means the data ownership is transferred to me), why not ack the message?
The default is true. This is a configurable property in case the user wants to take ownership of acknowledgement (or negative acknowledgement, for that matter).
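The ordering discussed here (store first, then acknowledge only when the flag is set) can be sketched as follows. This is plain Java with hypothetical `store` and `acknowledge` callbacks in place of the Spark and Pulsar calls; it is not the PR's implementation.

```java
import java.util.List;
import java.util.function.Consumer;

public class StoreThenAck {
    /**
     * Stores the batch first, then acknowledges it only when autoAcknowledge is true.
     * Returns true if the batch was acknowledged here.
     */
    public static boolean handleBatch(List<String> messageIds,
                                      boolean autoAcknowledge,
                                      Consumer<List<String>> store,
                                      Consumer<List<String>> acknowledge) {
        // If store throws, acknowledge is never reached and the batch stays unacknowledged.
        store.accept(messageIds);
        if (autoAcknowledge) {
            acknowledge.accept(messageIds);
            return true;
        }
        // With the flag off, acknowledging (or negatively acknowledging) is left to the caller.
        return false;
    }
}
```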
@nlu90 Did you get a chance to go through this?
@@ -184,7 +184,7 @@
<caffeine.version>2.6.2</caffeine.version>
<java-semver.version>0.9.0</java-semver.version>
<hppc.version>0.7.3</hppc.version>
<spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
<spark-streaming_2.11.version>2.4.4</spark-streaming_2.11.version>
@aditiwari01 Is there a specific reason to use version 2.4.4?
It is quite old; 2.4.8 is out, and that is rather old too.
If we can upgrade to 3.2.1 (requires Scala 2.12), we'll benefit from an even newer release that includes updated dependencies with a variety of CVEs patched, getting us from
Dependencies Scanned: 212 (145 unique)
Vulnerable Dependencies: 22
Vulnerabilities Found: 92
to
Dependencies Scanned: 265 (190 unique)
Vulnerable Dependencies: 12
Vulnerabilities Found: 45
with the rest being suppressible.
You'll need to use
<scala-library.version>2.12.15</scala-library.version>
<spark-streaming_2.12.version>3.2.1</spark-streaming_2.12.version>
and exclude log4j:
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark-streaming_2.10.version}</version>
+ <artifactId>spark-streaming_2.12</artifactId>
+ <version>${spark-streaming_2.12.version}</version>
<exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
<exclusion>
Actually, we had Spark 2.4.4 (Scala 2.11) set up at our end, hence I chose these versions initially.
Anyway, we're also upgrading internally to 3.2.1_2.12, so I'll update spark-pulsar to match as well.
(I will only be able to do that after our internal upgrade.)
Meanwhile, do we want to merge this version and cut a separate branch, so that we have support for Scala 2.11 and Spark 2 in case anyone wants to use it?
In this case, I think 2.4.4 is ok for now and we can move to 3.2.1 with pulsar 2.10
Fixes #29
Motivation
The current pulsar-spark adapter uses spark_streaming_2.10 while the Scala dependency is 2.11. Apart from this, the current receiver does not take care of reliability, rate limiting, or backpressure. This PR adds a new receiver that addresses all of these considerations.
Modifications
Includes: