Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] A 'wait for completion' utility method for Spark based indexing into OpenSearch #499

Open
dgoldenberg-ias opened this issue Jul 29, 2024 · 2 comments
Labels
enhancement New feature or request

Comments

@dgoldenberg-ias
Copy link
Contributor

dgoldenberg-ias commented Jul 29, 2024

Is your feature request related to a problem?

I have been implementing data indexing scripts in PySpark, where, given a large input dataset, we want to chunk it up and index each chunk as a dataframe into OpenSearch. However, just invoking save() on the DataframeWriter is not enough. You have to poll for indexing completion from OpenSearch otherwise you run into errors such as org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException or TOO_MANY_REQUESTS.

Rate limiting is a separate problem but it's related and is largely alleviated if each ingested chunk can be indexed absolutely fully before one moves on to the next one. And that can be done with a wait_for_completion.

The idea here is that ideally there should be an easy way to poll for completion for the whole dataframe full of documents. Once all have been accounted for, the caller can move on to the next chunk.

Caveat: somehow, the ingestion rate needs to be figured in here and I don't see a way to compute it other than by trial and error for each deployment (?). Perhaps, if the rate limiting could be more easily determined, then wait_for_completion would be less necessary, or even unnecessary?

What solution would you like?

A wait_for_completion(df: DataFrame, timeout_seconds: int=-1, wait_additional_seconds=-1) -> Tuple[bool, DataFrame] type of method, or something along those lines, which would:

  • For each document in the dataframe supplied to the original DataframeWriter.save() call, this would poll OpenSearch for status. If indexed, then the "status" column in the output dataframe would be set to "complete".
  • If there is an error on the given document, the error code would be set into "error_code" and error message into "message" column.
  • Once all documents have been accounted for, the output dataframe would get returned, with the 0-th element of the returned tuple set to True (meaning that wait_for_completion completed successfully).
  • If the method's execution times out before the timeout_seconds timeout interval, then whatever status information has been gathered would be returned in the 1st element of the returned tuple. The 0-th element would be set to False (wait_for_completion timed out).
  • If timeout_seconds is <= 0 then don't timeout the polling.
  • If wait_additional_seconds is supplied then wait for that many seconds once the polling has been completed. Why? Because in my testing, even completing the polling is not enough and occasionally I have to wait an extra minute for OpenSearch to "calm down" from the ingested chunk. Weird, but that is what I'm seeing.
  • There may also be the desire to fail fast during polling: if at least one failure has been detected, terminate the polling and return the results. So perhaps an extra fail_fast: bool=False type of argument.

The solution may be different; I'm just providing a basic outline which can probably be improved.

What alternatives have you considered?

  • Writing it myself, using PySpark and opensearch-py.
  • Would rather use something tried and optimized and a part of opensearch-hadoop.
  • Are there any existing implementations of this?
  • An alternative may be a Listener interface that the caller registers to get events notifying them of completion (doc ID / status / message).
  • Another alternative is that the save() method only returns once it's clear that OpenSearch is ready to ingest more. That may be hard. And that's probably the crux of the matter: How can one tell what the rate limit is and when it's safe to index more data after having fed N documents?

Do you have any additional context?

My code is basically this:

df.write.format("opensearch")
  .option("opensearch.nodes", f"https://{self.host}")
  .option("opensearch.port", str(self.port))
  .option("opensearch.resource", coll_name)
  .option("opensearch.nodes.wan.only", "true")
  .option("opensearch.net.ssl", "true")
  .option("opensearch.aws.sigv4.enabled", "true")
  .option("opensearch.aws.sigv4.region", AWS_REGION)
  .option("opensearch.mapping.id", "doc_id")
  .option("opensearch.batch.write.retry.count", "3")
  .option("opensearch.batch.write.retry.wait", "10s")
  .option("opensearch.batch.size.entries", str(batch_size))
  .option("opensearch.ingest.pipeline", ingest_pipeline_name)
  .mode("append")
  .save()

The types of errors I have seen often are, for example as below:

Py4JJavaError: An error occurred while calling o803.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 259.0 failed 4 times, most recent failure: Lost task 17.3 in stage 259.0 (TID 13496) (10.104.3.230 executor 9): org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://my-opensearch.us-east-1.es.amazonaws.com:443]] 
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:283)
	at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:173)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:166)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:220)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
	Suppressed: org.opensearch.hadoop.rest.OpenSearchHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[https://my-opensearch.us-east-1.es.amazonaws.com:443]] 
		at org.opensearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:170)
		at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:443)
		at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:439)
		at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:399)
		at org.opensearch.hadoop.rest.RestClient.execute(RestClient.java:403)
		at org.opensearch.hadoop.rest.RestClient.refresh(RestClient.java:299)
		at org.opensearch.hadoop.rest.bulk.BulkProcessor.close(BulkProcessor.java:579)
		at org.opensearch.hadoop.rest.RestRepository.close(RestRepository.java:233)
		at org.opensearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:131)
		at org.opensearch.spark.rdd.OpenSearchRDDWriter$$anon$1.onTaskCompletion(OpenSearchRDDWriter.scala:80)
		at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:173)
		at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:173)
		at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:228)
		... 24 more

@dgoldenberg-ias dgoldenberg-ias added enhancement New feature or request untriaged labels Jul 29, 2024
@dgoldenberg-ias dgoldenberg-ias changed the title [FEATURE] A wait_for_completion utility method for Spark based indexing into OpenSearch [FEATURE] A 'wait for completion' utility method for Spark based indexing into OpenSearch Jul 29, 2024
@dblock dblock removed the untriaged label Aug 19, 2024
@dblock
Copy link
Member

dblock commented Aug 19, 2024

Catch All Triage - 1, 2, 3

@dgoldenberg-ias
Copy link
Contributor Author

I keep wondering if this issue is really about proper rate limiting rather than polling for completion. Ideally, the connector would always ingest at a rate that is safe enough to keep far enough away from the TOO MANY REQUESTS error. I guess it would have to dynamically ascertain the cluster capacity and do sufficient levels of backpressure...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants