[FEATURE REQUEST]: Add Trigger() to DataStreamWriter. #139

Closed
petmoy opened this issue Jun 14, 2019 · 13 comments
Labels: enhancement (New feature or request)

petmoy commented Jun 14, 2019

Description
Hi... I am trying to create a simple proof-of-concept application using .NET Spark that streams data from Kafka.

Facing issues:

  • I cannot find how to achieve continuous streaming. Data are processed in batches periodically every n seconds
  • In general, the way to set configurations such as triggers, batch interval, window size, and sliding window size is not clear to me (or I am looking in the wrong place :) ). The available API documentation for .NET Spark has the Option(s) methods, but I cannot find the accepted valid options...

Exception with Spark 2.4.x
Moreover, the sample code below works fine (i.e., it receives data from Kafka) with Spark version 2.3.x (microsoft-spark-2.3.x-0.3.0.jar), but throws a Java exception with Spark version 2.4.x (microsoft-spark-2.4.x-0.3.0.jar).

(screenshot: Java exception stack trace from the Spark 2.4.x run)

Sample Code

        // Requires: using System; using System.Collections.Generic;
        // using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
        static void Main(string[] args)
        {
            SparkSession spark = SparkSession
                .Builder()
                .AppName("StructuredKafkaWordCount")
                .GetOrCreate();

            Console.WriteLine("============================= CONFIGURATION ===================");

            foreach(KeyValuePair<string, string> kvp in spark.SparkContext.GetConf().GetAll())
            {
                Console.WriteLine("Key = {0}, Value = {1}", kvp.Key, kvp.Value);
            }

            Console.WriteLine("============================= OUTPUT ===================");

            /*DataFrame lines = spark
                .ReadStream()
                .Format("socket")
                .Option("host", "0.0.0.0")
                .Option("port", 9999)
                .Load();*/

            DataFrame lines = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", "10.27.0.245:9092")
                .Option("subscribe", "test")
                .Load()
                .SelectExpr("CAST(value AS STRING)");


            DataFrame words = lines
                .Select(Explode(Split(lines["value"], " "))
                    .Alias("word"));
            DataFrame wordCounts = words.GroupBy("word").Count();

            Microsoft.Spark.Sql.Streaming.StreamingQuery query = wordCounts
                .WriteStream()
                .OutputMode("complete")
                .Format("console")
                .Start();

            query.AwaitTermination();
        }

To run the code, I build the project in VS2019 and then run the following in cmd:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --class org.apache.spark.deploy.DotnetRunner --master local microsoft-spark-2.3.x-0.3.0.jar dotnet HelloSpark.dll

Also, you must have a producer sending data to the Kafka topic from which to stream data.
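
For a quick test, the console producer that ships with Kafka is sufficient; the broker address and topic below are the ones from the sample above, so adjust them to your setup:

kafka-console-producer.sh --broker-list 10.27.0.245:9092 --topic test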

petmoy added the bug (Something isn't working) label on Jun 14, 2019

petmoy commented Jun 14, 2019

@imback82

imback82 commented:

Exception with Spark 2.4.x
Moreover, the sample code below works fine (i.e., it receives data from Kafka) with Spark version 2.3.x (microsoft-spark-2.3.x-0.3.0.jar), but throws a Java exception with Spark version 2.4.x (microsoft-spark-2.4.x-0.3.0.jar).

For Spark 2.4.x, what's the command you ran? Did you make sure you are using the 2.4 version of spark-sql-kafka: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10?


imback82 commented Jun 14, 2019

  • I cannot find how to achieve continuous streaming. Data are processed in batches periodically every n seconds

This is how Spark streaming works: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model

In general, the way to set configurations such as triggers, batch interval, window size, and sliding window size is not clear to me (or I am looking in the wrong place :) ). The available API documentation for .NET Spark has the Option(s) methods, but I cannot find the accepted valid options...

Please refer to the link I pasted above.
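
For the window-related part of the question: window size and slide duration are not writer options; they are expressed in the query itself via Functions.Window (triggers are set on the writer, which is what the rest of this thread is about). A rough, untested sketch building on the sample above, using the timestamp column the Kafka source provides (window and slide values are illustrative):

    // Rough sketch (not from this thread): sliding-window word counts.
    // Assumes: using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
    DataFrame windowedCounts = spark
        .ReadStream()
        .Format("kafka")
        .Option("kafka.bootstrap.servers", "10.27.0.245:9092")
        .Option("subscribe", "test")
        .Load()
        .SelectExpr("CAST(value AS STRING)", "timestamp")
        .Select(Explode(Split(Col("value"), " ")).Alias("word"), Col("timestamp"))
        .GroupBy(
            Window(Col("timestamp"), "10 seconds", "5 seconds"),  // window size, slide
            Col("word"))
        .Count();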

imback82 added the question (Further information is requested) label and removed the bug (Something isn't working) label on Jun 14, 2019
imback82 changed the title from "[BUG]: Kafka continuous streaming and configurations" to "[Question]: Kafka continuous streaming and configurations" on Jun 14, 2019

petmoy commented Jun 14, 2019

@imback82 Yes, you are right; the spark-sql-kafka version was wrong.
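
(For anyone else hitting this: the fix amounts to matching the Kafka connector version to the Spark/worker version, along these lines; exact version numbers may differ in your setup.)

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 --class org.apache.spark.deploy.DotnetRunner --master local microsoft-spark-2.4.x-0.3.0.jar dotnet HelloSpark.dll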


petmoy commented Jun 14, 2019

@imback82 Thanks for the help regarding the versions, but I don't see how the Maven link can help me with the configurations.

As I mentioned, I am trying to achieve continuous processing with .NET Spark. Here https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#triggers I see how this can be done in other languages, and I am looking for the equivalent for .NET Spark, since a Trigger API does not seem to exist.

(screenshot: the Triggers table from the linked Spark guide, listing the default micro-batch trigger, fixed-interval micro-batches, one-time micro-batch, and continuous processing with a fixed checkpoint interval)

imback82 commented:

@imback82 Thanks for the help regarding the versions, but I don't see how the Maven link can help me with the configurations.

Sorry, I fixed the link.

As I mentioned, I am trying to achieve continuous processing with .NET Spark. Here https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#triggers I see how this can be done in other languages, and I am looking for the equivalent for .NET Spark, since a Trigger API does not seem to exist.

Got it. Looks like we are missing the API. We will add this in the next release.
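
(For reference: once added, the API should mirror the Trigger factory methods in Scala/Python, which is also what #153 further down implements. A rough sketch against the word-count sample above, with an illustrative interval; Trigger.Continuous would be the continuous-mode variant, but continuous mode only supports map-like queries, so it would not apply to this aggregation.)

    // Sketch only; assumes Trigger is exposed under Microsoft.Spark.Sql.Streaming
    // with the same factory methods as Scala's Trigger.
    Microsoft.Spark.Sql.Streaming.StreamingQuery query = wordCounts
        .WriteStream()
        .OutputMode("complete")
        .Format("console")
        .Trigger(Trigger.ProcessingTime(2000))   // fixed-interval micro-batches every 2 seconds
        .Start();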

imback82 changed the title from "[Question]: Kafka continuous streaming and configurations" to "[FEATURE REQUEST]: Add Trigger() to DataStreamWriter." on Jun 14, 2019
imback82 added the enhancement (New feature or request) label and removed the question (Further information is requested) label on Jun 14, 2019
imback82 added this to the July 2019 milestone on Jun 14, 2019
imback82 self-assigned this on Jun 15, 2019
imback82 modified the milestones: July 2019, June 2019 on Jun 15, 2019
danny8002 commented:

Waiting for this feature to be ready.

imback82 commented:

@danny8002 we expect the next release to be the second week of July.

danny8002 commented:

It is too late for me; could you please send a workable PR? I implemented Trigger() myself, see #153 (I just wrote it according to spark/Trigger.java), but when I tested it, I found that Trigger.Continuous() doesn't work (Trigger.ProcessingTime works).

Here is the code with Trigger.Continuous():

            
            // Snippet from a larger program; `args` here is a custom options object.
            // Assumes: using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Streaming;
            var kafkaServer = string.Join(",", args.Input.KafkaHost);
            var inputKafka = args.Input.Topic;
            var outputKafka = args.Output.Topic;


            SparkSession spark = SparkSession
                .Builder()
                .AppName("Play Spark")
                .GetOrCreate();

            var udfReg = spark.Udf();
            udfReg.Register<string, string, string>("verify", (a,b)=>a+b);


            DataFrame lines = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", kafkaServer)
                .Option("subscribe", inputKafka)
                .Load()
                .SelectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

            DataFrame verify = lines.Select(
                Functions.CallUDF("verify", lines.Col("key"), lines.Col("value")).Alias("value"))
                .WithColumn("topic", Functions.Lit(outputKafka));

            var toKafka = verify
                 .WriteStream()
                 .Format("kafka")
                 .Option("kafka.bootstrap.servers", kafkaServer)
                 .Option("checkpointLocation", "file:///d:/temp/abc")
                 .Trigger(Trigger.Continuous(2000))
                 .OutputMode(OutputMode.Update)
                 .Start();

            toKafka.AwaitTermination();

Microsoft.Spark.Worker.exe runs without data and then closes immediately (my other program keeps sending data to the input Kafka topic at qps = 2 records/s), see:

stderr

Spark Executor Command: "C:\Progra~1\Java\jdk1.8.0_31\\bin\java" "-cp" "D:\work\v2\spark-2.3.1-bin-hadoop2.7\bin\..\conf\;D:\work\v2\spark-2.3.1-bin-hadoop2.7\jars\*" "-Xmx16384M" "-Dspark.driver.port=5108" "-Dspark.network.timeout=300s" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@localhost:5108" "--executor-id" "0" "--hostname" "localhost" "--cores" "2" "--app-id" "app-20190627112318-0016" "--worker-url" "spark://Worker@localhost:25332"
========================================

DotnetWorker PID:[19044] Args:[-m pyspark.worker] SparkVersion:[2.3.1]
[2019-06-27T03:23:27.0367984Z] [SZ_M3] [Info] [SimpleWorker] RunSimpleWorker() is starting with port = 5240.
[2019-06-27T03:23:27.1087977Z] [SZ_M3] [Info] [TaskRunner] [0] Starting with ReuseSocket[False].
[2019-06-27T03:23:27.1658004Z] [SZ_M3] [Debug] [TaskRunner] [0] Received END_OF_STREAM signal.
[2019-06-27T03:23:27.1658004Z] [SZ_M3] [Info] [TaskRunner] [0] Processed a task: readComplete:True, entries:0
[2019-06-27T03:23:27.1668005Z] [SZ_M3] [Info] [TaskRunner] [0] Waiting for JVM side to close socket.
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [TaskRunner] [0] JVM side has closed socket.
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [TaskRunner] [0] Finished running 1 task(s).
[2019-06-27T03:23:27.2238012Z] [SZ_M3] [Info] [SimpleWorker] RunSimpleWorker() finished successfully

stdout

2019-06-27 12:10:13 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.5 KB, free 8.4 GB)
2019-06-27 12:10:18 INFO  TorrentBroadcast:54 - Reading broadcast variable 0 took 5101 ms
2019-06-27 12:10:18 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 17.3 KB, free 8.4 GB)
2019-06-27 12:10:19 ERROR Executor:91 - Exception in task 0.1 in stage 0.0 (TID 1)
org.apache.spark.sql.execution.streaming.continuous.ContinuousTaskRetryException: Continuous execution does not support task retry
	at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDDIter.scala:53)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2019-06-27 12:10:24 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 2

master log:

2019-06-27 11:24:33 INFO  AbstractCoordinator:542 - Marking the coordinator 10.0.75.1:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-f9bf6566-f878-4057-98bb-4c51066aa826--111507653-driver-0
Exception in thread "epoch update thread for [id = ecaa75ec-cfaa-422f-8f96-efb673f47cb3, runId = 0d093931-162e-417f-a30f-f17ef701e1d4]" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [300 seconds]. This timeout is controlled by spark.network.timeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2$$anonfun$run$1.apply$mcZ$sp(ContinuousExecution.scala:246)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2.run(ContinuousExecution.scala:235)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

Hoping for some help from you!

imback82 commented:

It is too late for me; could you please send a workable PR? I implemented Trigger() myself, see #153.

@danny8002 I will review this.

Microsoft.Spark.Worker.exe runs without data and then closes immediately (my other program keeps sending data to the input Kafka topic at qps = 2 records/s)

This is expected for SimpleWorker. It will be re-launched per task.

Can you first check if continuous mode works fine without UDFs (so that it doesn't involve the worker)?


danny8002 commented Jun 27, 2019

Yes, it works without the UDF:

    DataFrame verify = lines.Select(
        Functions.CallUDF("verify", lines.Col("key"), lines.Col("value")).Alias("value"))
        .WithColumn("topic", Functions.Lit(outputKafka));

=>

    DataFrame verify = lines.Select(
        Functions.Concat(lines.Col("key"), lines.Col("value")).Alias("value"))
        .WithColumn("topic", Functions.Lit(outputKafka));

As you say it is expected, how can I make the UDF work? How can I make the worker/task start again? I wrote the same program in Java and it works with a UDF, and I am curious how Python handles this...

danny8002 commented:

Any insight on running the UDF?


suhsteve commented Jul 15, 2019

@danny8002 I've reproduced your test using pyspark and I'm also encountering issues when using the continuous trigger with UDFs. This seems to be a known issue, SPARK-27234, and there is an active PR that should address it.
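
Until that upstream fix lands, a possible workaround, going by the observation earlier in this thread that Trigger.ProcessingTime works, is to keep the UDF but run the sink with a micro-batch trigger instead of continuous mode. A sketch based on the snippet above (the interval is illustrative):

    var toKafka = verify
        .WriteStream()
        .Format("kafka")
        .Option("kafka.bootstrap.servers", kafkaServer)
        .Option("checkpointLocation", "file:///d:/temp/abc")
        .Trigger(Trigger.ProcessingTime(2000))   // micro-batches instead of Trigger.Continuous
        .OutputMode(OutputMode.Update)
        .Start();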
