Support for structured stream checkpointing on azure filesystem #587

Open
mathieu-rossignol opened this issue Aug 24, 2021 · 3 comments
Also used to store the current Kafka offset while writing to the structured stream sink.

mathieu-rossignol commented Aug 24, 2021

Usage with the code currently committed in the branch:

In the StructuredStream configuration, put something like this (replace the <...> placeholders with values for your environment):

spark.base.checkpoint.path: wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing
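
For context, a wasbs:// checkpoint location like this is ultimately what Spark's structured streaming writer consumes. A minimal standalone sketch of that standard Spark API (the broker and topic names are illustrative placeholders, and this is not Logisland's actual engine code):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of how Spark consumes a wasbs:// checkpoint location.
// Broker/topic names are hypothetical; this is not Logisland engine code.
val spark = SparkSession.builder().appName("wasbs-checkpoint-sketch").getOrCreate()

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // hypothetical broker
  .option("subscribe", "logisland_raw")             // hypothetical topic
  .load()

input.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "logisland_events")              // hypothetical topic
  // Spark persists stream progress, including consumed Kafka offsets, here:
  .option("checkpointLocation",
    "wasbs://<myContainer>@<myStorageAccount>.blob.core.windows.net/spark-checkpointing")
  .start()
```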

Then, in the KafkaStreamProcessingEngine global configuration, set the matching storage account key using a custom Spark configuration key, for instance:

spark.custom.config.fs.azure.account.key.<myStorageAccount>.blob.core.windows.net: +H5IuOtsebY7fO6QyyntmlRLe3G8Rv0jcye6kzE2Wz4NrU3IdB4Q8ocJY2ScY9cQrJNXxUg2WbYJPndMuQWUCQ==

Generic custom Spark configuration keys are introduced in this branch: they allow calling sparkConfig.set("xxx.yyy.zzz", "someValue") through a dynamic property of KafkaStreamProcessingEngine of the form:

spark.custom.config.xxx.yyy.zzz: someValue
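
In other words, the engine strips the spark.custom.config. prefix and forwards the remainder verbatim to the SparkConf. A minimal sketch of that mapping (the forwardCustomConfig helper is illustrative, not the branch's actual implementation):

```scala
import org.apache.spark.SparkConf

// Any dynamic property starting with "spark.custom.config." has the prefix
// stripped and the remainder set verbatim on the SparkConf.
// Illustrative sketch only, not the branch's actual code.
val CustomPrefix = "spark.custom.config."

def forwardCustomConfig(dynamicProps: Map[String, String], conf: SparkConf): SparkConf =
  dynamicProps.foldLeft(conf) {
    case (c, (key, value)) if key.startsWith(CustomPrefix) =>
      c.set(key.stripPrefix(CustomPrefix), value)
    case (c, _) => c
  }

// Example: the account-key property above ends up as
// conf.set("fs.azure.account.key.<myStorageAccount>.blob.core.windows.net", "<key>")
```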

See this page for examples of the custom fs.azure.* configuration keys (account key, SAS key...).


The currently identified problem: although the needed Azure client libs (azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar) are correctly packaged in the Spark 2 engine fat jar, together with the references to the needed Hadoop FileSystem implementation classes in META-INF/services/org.apache.hadoop.fs.FileSystem (org.apache.hadoop.fs.azure.NativeAzureFileSystem and org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure), the implementation classes are not found at runtime. It seems that the classpath defined by logisland.sh puts the Spark libs before the Logisland ones; once the META-INF/services/org.apache.hadoop.fs.FileSystem delivered in a jar of the Spark installation (SPARK_HOME) is found, the dynamic loading (ServiceLoader) mechanism uses it and never sees our packaged Azure filesystem implementations. The result is:

java.io.IOException: No FileSystem for scheme: wasbs
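
One lead worth trying (an untested assumption based on standard Hadoop behavior, not something verified in this branch): Hadoop's FileSystem.getFileSystemClass checks the fs.<scheme>.impl configuration key before falling back to the ServiceLoader registry, so pinning the implementation classes explicitly might sidestep the shadowed service file. Since the fs.azure.account.key.* property above already reaches the Hadoop configuration through the generic pass-through, these keys should travel the same path:

spark.custom.config.fs.wasbs.impl: org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure
spark.custom.config.fs.wasb.impl: org.apache.hadoop.fs.azure.NativeAzureFileSystem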

The workaround I have found so far is to copy the needed jars (azure-storage-2.0.0.jar and hadoop-azure-2.7.0.jar) into the jars directory of every executor node of the Spark cluster. I will try to tweak logisland.sh to change the classpath order, or find another solution...

mathieu-rossignol commented
Merged in release/1.4.0, but it still requires the workaround described above.
