You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have been implementing data indexing scripts in PySpark, where, given a large input dataset, we want to chunk it up and index each chunk as a dataframe into OpenSearch. However, just invoking save() on the DataframeWriter is not enough. You have to poll for indexing completion from OpenSearch otherwise you run into errors such as org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException or TOO_MANY_REQUESTS.
Rate limiting is a separate problem but it's related and is largely alleviated if each ingested chunk can be indexed absolutely fully before one moves on to the next one. And that can be done with a wait_for_completion.
The idea here is that ideally there should be an easy way to poll for completion for the whole dataframe full of documents. Once all have been accounted for, the caller can move on to the next chunk.
Caveat: somehow, the ingestion rate needs to be figured in here and I don't see a way to compute it other than by trial and error for each deployment (?). Perhaps, if the rate limiting could be more easily determined, then wait_for_completion would be less necessary, or even unnecessary?
What solution would you like?
A wait_for_completion(df: DataFrame, timeout_seconds: int=-1, wait_additional_seconds=-1) -> Tuple[bool, DataFrame] type of method, or something along those lines, which would:
For each document in the dataframe supplied to the original DataframeWriter.save() call, this would poll OpenSearch for status. If indexed, then the "status" column in the output dataframe would be set to "complete".
If there is an error on the given document, the error code would be set into "error_code" and error message into "message" column.
Once all documents have been accounted for, the output dataframe would get returned, with the 0-th element of the returned tuple set to True (meaning that wait_for_completion completed successfully).
If the method's execution times out before the timeout_seconds timeout interval, then whatever status information has been gathered would be returned in the 1st element of the returned tuple. The 0-th element would be set to False (wait_for_completion timed out).
If timeout_seconds is <= 0 then don't timeout the polling.
If wait_additional_seconds is supplied then wait for that many seconds once the polling has been completed. Why? Because in my testing, even completing the polling is not enough and occasionally I have to wait an extra minute for OpenSearch to "calm down" from the ingested chunk. Weird, but that is what I'm seeing.
There may also be the desire to fail fast during polling: if at least one failure has been detected, terminate the polling and return the results. So perhaps an extra fail_fast: bool=False type of argument.
The solution may be different; I'm just providing a basic outline which can probably be improved.
What alternatives have you considered?
Writing it myself, using PySpark and opensearch-py.
Would rather use something tried and optimized and a part of opensearch-hadoop.
Are there any existing implementations of this?
An alternative may be a Listener interface that the caller registers to get events notifying them of completion (doc ID / status / message).
Another alternative is that the save() method only returns once it's clear that OpenSearch is ready to ingest more. That may be hard. And that's probably the crux of the matter: How can one tell what the rate limit is and when it's safe to index more data after having fed N documents?
The types of errors I have seen often are, for example as below:
Py4JJavaError: An error occurred while calling o803.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 259.0 failed 4 times, most recent failure: Lost task 17.3 in stage 259.0 (TID 13496) (10.104.3.230 executor 9): org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://my-opensearch.us-east-1.es.amazonaws.com:443]]
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:283)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:173)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:166)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:220)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Suppressed: org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://my-opensearch.us-east-1.es.amazonaws.com:443]]
at org.opensearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:170)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:443)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:439)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:399)
at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:403)
at org.opensearch.hadoop.rest.RestClient.refresh(RestClient.java:299)
at org.opensearch.hadoop.rest.bulk.BulkProcessor.close(BulkProcessor.java:579)
at org.opensearch.hadoop.rest.RestRepository.close(RestRepository.java:233)
at org.opensearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:131)
at org.opensearch.spark.rdd.OpenSearchRDDWriter$$anon$1.onTaskCompletion(OpenSearchRDDWriter.scala:80)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:173)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:173)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:228)
... 24 more
The text was updated successfully, but these errors were encountered:
dgoldenberg-ias
changed the title
[FEATURE] A wait_for_completion utility method for Spark based indexing into OpenSearch
[FEATURE] A 'wait for completion' utility method for Spark based indexing into OpenSearch
Jul 29, 2024
I keep wondering if this issue is really about proper rate limiting rather than polling for completion. Ideally, the connector would always ingest at a rate that is safe enough to keep far enough away from the TOO MANY REQUESTS error. I guess it would have to dynamically ascertain the cluster capacity and do sufficient levels of backpressure...
Is your feature request related to a problem?
I have been implementing data indexing scripts in PySpark, where, given a large input dataset, we want to chunk it up and index each chunk as a dataframe into OpenSearch. However, just invoking save() on the DataframeWriter is not enough. You have to poll for indexing completion from OpenSearch otherwise you run into errors such as
org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException
orTOO_MANY_REQUESTS
.Rate limiting is a separate problem but it's related and is largely alleviated if each ingested chunk can be indexed absolutely fully before one moves on to the next one. And that can be done with a wait_for_completion.
The idea here is that ideally there should be an easy way to poll for completion for the whole dataframe full of documents. Once all have been accounted for, the caller can move on to the next chunk.
Caveat: somehow, the ingestion rate needs to be figured in here and I don't see a way to compute it other than by trial and error for each deployment (?). Perhaps, if the rate limiting could be more easily determined, then wait_for_completion would be less necessary, or even unnecessary?
What solution would you like?
A
wait_for_completion(df: DataFrame, timeout_seconds: int=-1, wait_additional_seconds=-1) -> Tuple[bool, DataFrame]
type of method, or something along those lines, which would:timeout_seconds
timeout interval, then whatever status information has been gathered would be returned in the 1st element of the returned tuple. The 0-th element would be set to False (wait_for_completion timed out).fail fast
during polling: if at least one failure has been detected, terminate the polling and return the results. So perhaps an extrafail_fast: bool=False
type of argument.The solution may be different; I'm just providing a basic outline which can probably be improved.
What alternatives have you considered?
Do you have any additional context?
My code is basically this:
The types of errors I have seen often are, for example as below:
The text was updated successfully, but these errors were encountered: