This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

spark-submit must work with the local kubectl proxy endpoint #63

Closed
foxish opened this issue Jan 30, 2017 · 10 comments

Comments

@foxish
Member

foxish commented Jan 30, 2017

~/g/s/a/spark (k8s-support-alternate-incremental=) kubectl proxy
Starting to serve on 127.0.0.1:8001

~/g/s/a/spark (k8s-support-alternate-incremental=) 
~/g/s/a/spark (k8s-support-alternate-incremental=) ./bin/spark-submit \
                                                   --deploy-mode cluster \
                                                   --class org.apache.spark.examples.SparkPi \
                                                   --master k8s://http://127.0.0.1:8001 \
                                                   --conf spark.executor.instances=5 \
                                                   --conf spark.app.name=foxish \
                                                   --conf spark.kubernetes.namespace=default \
                                                   --conf spark.kubernetes.driver.docker.image=foxish/spark-driver:0.92 \
                                                   --conf spark.kubernetes.executor.docker.image=foxish/spark-executor:0.92 \
                                                   /home/original-spark-examples_2.11-2.2.0-SNAPSHOT.jar 10000
2017-01-30 15:22:11 ERROR WatchConnectionManager:167 - Exec Failure: HTTP:200. Message:
java.net.ProtocolException: Expected HTTP 101 response but was '200 OK'
	at okhttp3.ws.WebSocketCall.createWebSocket(WebSocketCall.java:122)
	at okhttp3.ws.WebSocketCall.access$000(WebSocketCall.java:41)
	at okhttp3.ws.WebSocketCall$1.onResponse(WebSocketCall.java:97)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:126)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: 
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onFailure(WatchConnectionManager.java:169)
	at okhttp3.ws.WebSocketCall$1.onResponse(WebSocketCall.java:99)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:126)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "OkHttp Dispatcher" java.lang.NullPointerException
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onClose(WatchConnectionManager.java:254)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onFailure(WatchConnectionManager.java:197)
	at okhttp3.ws.WebSocketCall$1.onResponse(WebSocketCall.java:99)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:126)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Currently, spark-submit fails against the local endpoint obtained by running kubectl proxy. Since the fabric8 client does not support the other auth providers, we need it to work against the kubectl proxy endpoint.

@iyanuobidele

+1

I believe I had this same problem while I was working on the TPR fix on your fork.

I think it has to do with a bug in k8s. See links below
issue1, issue2

@foxish
Member Author

foxish commented Jan 31, 2017

@iyanuobidele Thanks for the pointer to those two issues. I'll investigate further.

@foxish
Member Author

foxish commented Jan 31, 2017

Ah, never mind. @iyanuobidele, the authentication issue was at my end and unrelated. It looks like watches are the broken piece, and they have been for quite a long time. Maybe in the interim we need some fallback for when the watch fails.

@iyanuobidele

The exception comes from the okhttp3 client used internally by the fabric8 client in its watcher implementation. I traced it here.

Yes, I agree, but that changes a lot of things, since the create request is only sent after a websocket connection is established.

@foxish
Member Author

foxish commented Jan 31, 2017

I'm not sure why they used a websockets implementation instead of a plain hanging HTTP GET like a lot of other clients. Websocket support seems somewhat broken with kubectl proxy and the like. I wonder if they'd take a patch upstream changing how they perform watches.

@ash211

ash211 commented Feb 1, 2017

@foxish submitting through kubectl proxy isn't a mode that Matt and I rely on for our workflows, so we'll probably be more passive on this one than on some of the other issues/PRs. We'll still code-review it, of course.

What is the path forward for getting this working? Do you intend to submit a PR to fabric8 and depend on a future release of that library?

@foxish
Member Author

foxish commented Feb 1, 2017

@erikerlandson put me in contact with one of the guys who works on fabric8 kubernetes-client, who said that they would look into GKE authentication support. If that lands, we can avoid the kubectl proxy solution. If that does not seem viable, the alternatives are:

  • Allow hanging HTTP GETs, in addition to websockets, as a mechanism for performing watches in fabric8's client (I'll probably need to make this change and get it merged upstream)
  • Change our code to fall back on polling when watch fails (I discussed this in today's meeting)

I'll have a little bit of discussion with the Fabric8 guys and then we can decide on the best course of action. In either case, I can take this issue.
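The polling fallback in the second bullet could look roughly like this. A hypothetical, minimal sketch in Java (the project's own code is Scala); `fetchState` stands in for a real GET against the pod status endpoint, and the names are illustrative, not from any actual implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of a polling fallback: if the websocket watch cannot be
// established, poll the resource on an interval and emit a synthetic
// event whenever the observed state changes.
public class PollingFallback {
    // Poll fetchState up to maxPolls times, recording each state change.
    static List<String> pollForChanges(Supplier<String> fetchState,
                                       int maxPolls, long intervalMillis) {
        List<String> changes = new ArrayList<>();
        String last = null;
        for (int i = 0; i < maxPolls; i++) {
            // In a real client this would be GET /api/v1/namespaces/<ns>/pods/<name>
            String current = fetchState.get();
            if (!current.equals(last)) {
                changes.add(current);   // synthetic MODIFIED-style event
                last = current;
            }
            try { Thread.sleep(intervalMillis); } catch (InterruptedException e) { break; }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Simulated sequence of pod phases observed across five polls.
        Iterator<String> phases =
            List.of("Pending", "Pending", "Running", "Running", "Succeeded").iterator();
        System.out.println(pollForChanges(phases::next, 5, 1));
        // prints [Pending, Running, Succeeded]
    }
}
```

The obvious trade-off versus a watch is latency and load: events are only seen at the polling interval, and unchanged states cost a request each time.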

@iyanuobidele

@foxish
So about this, I was thinking, I did run into some similar issue before.

What I did was to extract the http client - like we did before - and then simply issue a GET request with readTimeout(0, TimeUnit.MILLISECONDS), spawning a new thread that blocks until there are bytes to read or the source is exhausted. We get the bytes, deserialize them to a case class or whatever, and send the result to the parent code that needs it. This is essentially a Server-Sent Events implementation.

We don't really need to send any messages to the server, so a persistent connection with one sender (the server) should work. However, feel free to pull me into any of these conversations; I'll be happy to help.

I could work on this and send in a PR on it shortly.
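A minimal sketch of that hanging-GET, SSE-style reader, in Java rather than the project's Scala, with hypothetical names and an in-memory stream standing in for the real HTTP response body:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the hanging-GET idea: the server keeps the connection open and
// writes one JSON watch event per line; a dedicated thread blocks on the
// stream and hands each event to a callback as it arrives.
public class HangingGetWatch {
    // Consume newline-delimited watch events until the stream is exhausted.
    static void consumeWatchStream(InputStream body, Consumer<String> onEvent) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) { // blocks until bytes arrive
                if (!line.isEmpty()) {
                    onEvent.accept(line);                // deserialize + dispatch here
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("watch stream failed", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated response body. A real client would open the connection
        // with readTimeout(0, TimeUnit.MILLISECONDS) so the GET can hang
        // indefinitely without the client timing it out.
        String fakeBody = "{\"type\":\"ADDED\",\"object\":\"pod-1\"}\n"
                        + "{\"type\":\"MODIFIED\",\"object\":\"pod-1\"}\n";
        List<String> events = new ArrayList<>();
        Thread watcher = new Thread(() -> consumeWatchStream(
                new ByteArrayInputStream(fakeBody.getBytes(StandardCharsets.UTF_8)),
                events::add));
        watcher.start();
        watcher.join();
        System.out.println(events.size() + " events"); // prints "2 events"
    }
}
```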

@foxish
Member Author

foxish commented Feb 2, 2017

@iyanuobidele Thanks! I think that in addition to the hanging GET, we must also do a regular GET first to fetch the latest resource version, and then start the watch from that resource version. Secondly, we'd need some robust handling of connection failures or interruptions, with a retry loop.

I think the change to allow watches using a hanging HTTP GET should live upstream in the fabric8 library itself. One of the authors said that they'd be okay with taking the patch as long as it is not too complicated. I'm planning on trying this today or tomorrow. If it turns out to be complicated, we could implement the "alternate watch" in our Spark fork rather than in fabric8, and in that case you could send the PR as you described.
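The resume-from-resource-version pattern could be sketched like this (hypothetical, with a simulated in-memory event stream in place of real API-server responses; `dropAfter` mimics the connection dying mid-watch):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: LIST once to learn the latest resourceVersion, then WATCH from
// that version, remembering the version of every event seen so a reconnect
// can resume without replaying or losing events.
public class ResumableWatch {
    // serverEvents: resourceVersions the server would emit, in order.
    // dropAfter: how many events each "connection" delivers before dropping
    // (must be >= 1, or the loop below never makes progress).
    static List<Long> watchAll(List<Long> serverEvents, int dropAfter) {
        List<Long> delivered = new ArrayList<>();
        long lastVersion = 0; // would come from the initial LIST request
        while (delivered.size() < serverEvents.size()) {
            int seenThisConnection = 0;
            for (long version : serverEvents) {
                if (version <= lastVersion) continue;       // resume: skip already-seen events
                if (seenThisConnection == dropAfter) break; // simulated connection drop
                delivered.add(version);
                lastVersion = version;                      // remember the resourceVersion
                seenThisConnection++;
            }
            // A real client would reconnect here with ?resourceVersion=<lastVersion>,
            // ideally after a backoff rather than immediately.
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Five events; the connection drops after every two. Each event is
        // still delivered exactly once across reconnects.
        System.out.println(watchAll(List.of(1L, 2L, 3L, 4L, 5L), 2));
        // prints [1, 2, 3, 4, 5]
    }
}
```

A production version also needs to handle the server rejecting a too-old resourceVersion, in which case the client must re-LIST before watching again.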

@foxish
Member Author

foxish commented Feb 7, 2017

Attempted a fix here: fabric8io/kubernetes-client#652
GKE issue was fixed in fabric8io/kubernetes-client#636.

The next release of fabric8's client should unblock us for the release.

ifilonenko pushed a commit to ifilonenko/spark that referenced this issue Apr 18, 2019
## What changes were proposed in this pull request?

In DataSourceV2Strategy, it seems we eliminate the subqueries by mistake after normalizing filters.
We have a sql with a scalar subquery:

``` scala
val plan = spark.sql("select * from t2 where t2a > (select max(t1a) from t1)")
plan.explain(true)
```

And we get the log info of DataSourceV2Strategy:
```
Pushing operators to csv:examples/src/main/resources/t2.txt
Pushed Filters:
Post-Scan Filters: isnotnull(t2a#30)
Output: t2a#30, t2b#31
```

The `Post-Scan Filters` should contain the scalar subquery, but we eliminate it by mistake.
```
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('t2a > scalar-subquery#56 [])
   :  +- 'Project [unresolvedalias('max('t1a), None)]
   :     +- 'UnresolvedRelation `t1`
   +- 'UnresolvedRelation `t2`

== Analyzed Logical Plan ==
t2a: string, t2b: string
Project [t2a#30, t2b#31]
+- Filter (t2a#30 > scalar-subquery#56 [])
   :  +- Aggregate [max(t1a#13) AS max(t1a)apache-spark-on-k8s#63]
   :     +- SubqueryAlias `t1`
   :        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
   +- SubqueryAlias `t2`
      +- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Optimized Logical Plan ==
Filter (isnotnull(t2a#30) && (t2a#30 > scalar-subquery#56 []))
:  +- Aggregate [max(t1a#13) AS max(t1a)apache-spark-on-k8s#63]
:     +- Project [t1a#13]
:        +- RelationV2[t1a#13, t1b#14] csv:examples/src/main/resources/t1.txt
+- RelationV2[t2a#30, t2b#31] csv:examples/src/main/resources/t2.txt

== Physical Plan ==
*(1) Project [t2a#30, t2b#31]
+- *(1) Filter isnotnull(t2a#30)
   +- *(1) BatchScan[t2a#30, t2b#31] class org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
```
## How was this patch tested?

Unit tests.

Closes apache#24321 from francis0407/SPARK-27411.

Authored-by: francis0407 <hanmingcong123@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

3 participants