Setup (what does our program do)
In short, we use an Azure Event Hub as a source for Structured Streaming, and for each batch we save parts of the data to different folders in a Delta Lake - the target folder(s) depend on a column in the batch.
Pseudo-code:
1. Read stream from event hub
2. foreach batch
2.1 Persist batch
2.2 Collect list of folder names (given by column in batch)
2.3 foreach folder name (using Scala foreach)
2.3.1 Filter the batch data by the folder name (i.e. the value of the column)
2.3.2 Save filtered data to Delta Lake with path $base_path/$folder_name
2.4 Unpersist batch
I can provide proper code examples, if necessary.
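In the meantime, here is a minimal Scala sketch of what the job does. The column name folderName, the paths, and the Event Hubs settings are placeholders for illustration, not our exact production code.

```scala
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object EventHubToDelta {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("eventhub-to-delta") // placeholder name
      .getOrCreate()

    // Placeholder connection settings
    val connectionString = ConnectionStringBuilder("<connection string>")
      .setEventHubName("<event hub name>")
      .build

    val ehConf = EventHubsConf(connectionString)
      .setStartingPosition(EventPosition.fromEndOfStream)

    // 1. Read stream from the event hub
    val stream = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()

    val basePath = "/mnt/delta/events" // placeholder base path

    // 2. For each batch ...
    val query = stream.writeStream
      .foreachBatch { (batch: DataFrame, _: Long) =>
        batch.persist() // 2.1 persist the batch

        // 2.2 collect the folder names given by a column in the batch
        val folderNames = batch
          .select("folderName")
          .distinct()
          .collect()
          .map(_.getString(0))

        // 2.3 plain Scala foreach over the folder names
        folderNames.foreach { folderName =>
          batch
            .filter(col("folderName") === folderName) // 2.3.1 filter by the folder name
            .write
            .format("delta")
            .mode("append")
            .save(s"$basePath/$folderName")           // 2.3.2 save to $base_path/$folder_name
        }

        batch.unpersist() // 2.4 unpersist the batch
        ()
      }
      .option("checkpointLocation", s"$basePath/_checkpoint") // placeholder
      .start()

    query.awaitTermination()
  }
}
```

The real job derives folderName from the event body before writing; that part is omitted here for brevity.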
Issue
After running for several hours, the program will start to slow down, and at some point will hang completely until it is manually restarted.
Taking a heap dump after the program has started to hang, we can see that org.apache.qpid.proton.reactor.impl.ReactorImpl occupies 95% of the heap. The corresponding stack trace (shown below) mentions azure.eventhubs.impl.EventHubClientImpl, leading us to believe the problem may be related to this library.
Heap dump stacktrace
at sun.nio.ch.NativeThread.current()J (Native Method)
at sun.nio.ch.SinkChannelImpl.write(Ljava/nio/ByteBuffer;)I (SinkChannelImpl.java:165)
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.signalWorkQueue()V (ReactorDispatcher.java:97)
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.invoke(ILcom/microsoft/azure/eventhubs/impl/DispatchHandler;)V (ReactorDispatcher.java:72)
at com.microsoft.azure.eventhubs.impl.Timer.schedule(Ljava/lang/Runnable;Ljava/time/Duration;)Ljava/util/concurrent/CompletableFuture; (Timer.java:26)
at com.microsoft.azure.eventhubs.impl.EventHubClientImpl.managementWithRetry(Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture; (EventHubClientImpl.java:384)
at com.microsoft.azure.eventhubs.impl.EventHubClientImpl.lambda$getPartitionRuntimeInformation$5(Ljava/util/Map;)Ljava/util/concurrent/CompletionStage; (EventHubClientImpl.java:354)
at com.microsoft.azure.eventhubs.impl.EventHubClientImpl$$Lambda$1814.apply(Ljava/lang/Object;)Ljava/lang/Object; (Unknown Source)
at java.util.concurrent.CompletableFuture.uniCompose(Ljava/util/concurrent/CompletableFuture;Ljava/util/function/Function;Ljava/util/concurrent/CompletableFuture$UniCompose;)Z (CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:940)
at java.util.concurrent.CompletableFuture.postComplete()V (CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(Ljava/util/concurrent/CompletableFuture;I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(I)Ljava/util/concurrent/CompletableFuture; (CompletableFuture.java:594)
at java.util.concurrent.CompletableFuture$Completion.run()V (CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call()Ljava/lang/Object; (Executors.java:511)
at java.util.concurrent.FutureTask.run()V (FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Ljava/util/concurrent/ScheduledThreadPoolExecutor$ScheduledFutureTask;)V (ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run()V (ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Ljava/util/concurrent/ThreadPoolExecutor$Worker;)V (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run()V (ThreadPoolExecutor.java:624)
at java.lang.Thread.run()V (Thread.java:748)
How have we tried to solve the problem?
Since we perform several actions (including several writes) in each batch, we ensure that we persist/unpersist the batch data, as per your documentation. Furthermore, we have significantly increased spark.locality.wait to ensure locality, and we can see that all tasks are executed at the PROCESS_LOCAL locality level.
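For illustration, the setting is applied roughly like this when the session is created (the value shown is a placeholder, not our actual one):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder value: our real configuration uses a different wait time.
val spark = SparkSession.builder()
  .appName("eventhub-to-delta")
  .config("spark.locality.wait", "30s")
  .getOrCreate()
```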
For reference, here are the various versions we use:
Spark version: 3.2.2
Artifact id: azure-eventhubs-spark_2.12
Package version: 2.3.22
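Expressed as sbt coordinates, this corresponds roughly to the following (our actual build file differs in the details):

```scala
// build.sbt (approximate; versions as listed above)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.2" % "provided",
  "com.microsoft.azure" % "azure-eventhubs-spark_2.12" % "2.3.22"
)
```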
Can you help with where to go from here? Is this an issue with our code, or a bug in the library?
Please let me know if you have any questions, and thanks in advance for your help 🙂