[SPARK-2377] Python API for Streaming #2538

Closed
wants to merge 365 commits

Commits (365)
1fd12ae
WIP
giwa Aug 4, 2014
c05922c
WIP: added PythonTestInputStream
giwa Aug 5, 2014
1f68b78
WIP
giwa Aug 7, 2014
3dda31a
WIP added test case
giwa Aug 11, 2014
7f96294
added basic operation test cases
giwa Aug 11, 2014
fa75d71
delete waste file
giwa Aug 11, 2014
8efa266
fixed PEP-008 violation
giwa Aug 11, 2014
3a671cc
remove export PYSPARK_PYTHON in spark submit
giwa Aug 11, 2014
774f18d
removed unnesessary changes
giwa Aug 11, 2014
33c0f94
edited the comment to add more precise description
giwa Aug 11, 2014
4f2d7e6
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
9767712
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
35933e1
broke something
giwa Aug 14, 2014
7051a84
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
99e4bb3
basic function test cases are passed
giwa Aug 15, 2014
580fbc2
modified streaming test case to add coment
giwa Aug 15, 2014
94f2b65
remove waste duplicated code
giwa Aug 15, 2014
e9fab72
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
4aa99e4
added TODO coments
giwa Aug 16, 2014
6d8190a
add comments
giwa Aug 18, 2014
14d4c0e
removed wasted print in DStream
giwa Aug 18, 2014
97742fe
added sparkContext as input parameter in StreamingContext
giwa Aug 18, 2014
e162822
added gorupByKey testcase
giwa Aug 18, 2014
e70f706
added testcase for combineByKey
giwa Aug 18, 2014
f1798c4
merge with master
giwa Aug 18, 2014
185fdbf
merge with master
giwa Aug 19, 2014
199e37f
adopted the latest compression way of python command
giwa Aug 19, 2014
58150f5
Changed the test case to focus the test operation
giwa Aug 19, 2014
09a28bf
improve testcases
giwa Aug 19, 2014
268a6a5
Changed awaitTermination not to call awaitTermincation in Scala. Just…
giwa Aug 19, 2014
4dedd2d
change test case not to use awaitTermination
giwa Aug 19, 2014
171edeb
clean up
giwa Aug 20, 2014
f0ea311
clean up code
giwa Aug 21, 2014
1d84142
remove unimplement test
giwa Aug 21, 2014
583e66d
move tests for streaming inside streaming directory
giwa Aug 21, 2014
b7dab85
improve test case
giwa Aug 21, 2014
0d30109
fixed pep8 violation
giwa Aug 21, 2014
24f95db
clen up examples
giwa Aug 21, 2014
9c85e48
clean up exmples
giwa Aug 21, 2014
7339df2
fixed typo
giwa Aug 21, 2014
9d1de23
revert pom.xml
giwa Aug 21, 2014
4f82c89
remove duplicated import
giwa Aug 21, 2014
50fd6f9
revert pom.xml
giwa Aug 21, 2014
93f7637
fixed explanaiton
giwa Aug 21, 2014
acfcaeb
revert pom.xml
giwa Aug 21, 2014
3b27bd4
remove the last brank line
giwa Aug 21, 2014
2ea769e
added comment in dstream._test_output
giwa Aug 21, 2014
c97377c
delete inproper comments
giwa Aug 21, 2014
67473a9
delete not implemented functions
giwa Aug 21, 2014
d9d59fe
Fix scalastyle errors
Aug 26, 2014
1fd6bc7
Merge pull request #2 from mattf/giwa-master
giwa Aug 28, 2014
4afa390
clean up code
giwa Aug 31, 2014
d68b568
clean up code
giwa Aug 31, 2014
da09768
added StreamingContext.remember
giwa Aug 31, 2014
f5bfb70
added StreamingContext.sparkContext
giwa Aug 31, 2014
fdc9125
added comment for StreamingContext.sparkContext
giwa Aug 31, 2014
ee50c5a
added atexit to handle callback server
giwa Aug 31, 2014
f7bc8f9
WIP:added more test for StreamingContext
giwa Aug 31, 2014
150b94c
added some StreamingContextTestSuite
giwa Sep 1, 2014
454981d
initial commit for pySparkStreaming
giwa Jul 9, 2014
b406252
comment PythonDStream.PairwiseDStream
Jul 15, 2014
87438e2
modify dstream.py to fix indent error
Jul 16, 2014
d7b4d6f
added reducedByKey not working yet
Jul 16, 2014
1a0f065
implementing transform function in Python
Jul 16, 2014
17a74c6
modified the code base on comment in https://github.com/tdas/spark/pu…
Jul 16, 2014
494cae5
remove not implemented DStream functions in python
Jul 16, 2014
e1df940
revert pom.xml
Jul 16, 2014
5bac7ec
revert streaming/pom.xml
Jul 16, 2014
d2099d8
sorted the import following Spark coding convention
Jul 16, 2014
224fc5e
add empty line
Jul 16, 2014
bb7ccf3
remove unused import in python
Jul 16, 2014
f746109
initial commit for socketTextStream
Jul 16, 2014
0d1b954
fied input of socketTextDStream
Jul 16, 2014
ccfd214
added doctest for pyspark.streaming.duration
Jul 17, 2014
b31446a
fixed typo of network_workdcount.py
Jul 17, 2014
dc6995d
delete old file
Jul 17, 2014
c455c8d
added reducedByKey not working yet
Jul 16, 2014
6f98e50
reduceByKey is working
Jul 17, 2014
15feea9
edit python sparkstreaming example
Jul 18, 2014
d3ee86a
added count operation but this implementation need double check
Jul 19, 2014
72b9738
fix map function
Jul 20, 2014
bab31c1
clean up code
Jul 20, 2014
0a8bbbb
clean up codes
Jul 20, 2014
678e854
remove waste file
Jul 20, 2014
b1d2a30
Implemented DStream.foreachRDD in the Python API using Py4J callback …
tdas Jul 23, 2014
05e991b
Added missing file
tdas Aug 1, 2014
9ab8952
Added extra line.
tdas Aug 1, 2014
84a9668
tried to restart callback server
Aug 2, 2014
3b498e1
Kill py4j callback server properly
Aug 3, 2014
b349649
Removed the waste line
giwa Aug 3, 2014
3c45cd2
implemented reduce and count function in Dstream
giwa Aug 4, 2014
d2c01ba
clean up examples
giwa Aug 4, 2014
c462bb3
added stop in StreamingContext
giwa Aug 4, 2014
4d40d63
clean up dstream.py
giwa Aug 4, 2014
29c2bc5
initial commit for testcase
giwa Aug 4, 2014
fe648e3
WIP
giwa Aug 4, 2014
8a0fbbc
update comment
giwa Aug 4, 2014
1523b66
WIP
giwa Aug 4, 2014
1df77f5
WIP: added PythonTestInputStream
giwa Aug 5, 2014
9ad6855
WIP
giwa Aug 7, 2014
ce2acd2
WIP added test case
giwa Aug 11, 2014
878bad7
added basic operation test cases
giwa Aug 11, 2014
f21cab3
delete waste file
giwa Aug 11, 2014
3d37822
fixed PEP-008 violation
giwa Aug 11, 2014
253a863
removed unnesessary changes
giwa Aug 11, 2014
bb10956
edited the comment to add more precise description
giwa Aug 11, 2014
270a9e1
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
bcdec33
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
ff14070
broke something
giwa Aug 14, 2014
3000b2b
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
13fb44c
basic function test cases are passed
giwa Aug 15, 2014
18c8723
modified streaming test case to add coment
giwa Aug 15, 2014
f76c182
remove waste duplicated code
giwa Aug 15, 2014
74535d4
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
16aa64f
added TODO coments
giwa Aug 16, 2014
e54f986
add comments
giwa Aug 18, 2014
10b5b04
removed wasted print in DStream
giwa Aug 18, 2014
10ab87b
added sparkContext as input parameter in StreamingContext
giwa Aug 18, 2014
5625bdc
added gorupByKey testcase
giwa Aug 18, 2014
c214199
added testcase for combineByKey
giwa Aug 18, 2014
0b99bec
initial commit for pySparkStreaming
giwa Jul 9, 2014
41886c2
comment PythonDStream.PairwiseDStream
Jul 15, 2014
66fcfff
modify dstream.py to fix indent error
Jul 16, 2014
38adf95
added reducedByKey not working yet
Jul 16, 2014
4bcb318
implementing transform function in Python
Jul 16, 2014
247fd74
modified the code base on comment in https://github.com/tdas/spark/pu…
Jul 16, 2014
dd6de81
initial commit for socketTextStream
Jul 16, 2014
f485b1d
fied input of socketTextDStream
Jul 16, 2014
0df7111
delete old file
Jul 17, 2014
58591d2
reduceByKey is working
Jul 17, 2014
98c2a00
added count operation but this implementation need double check
Jul 19, 2014
eb4bf48
fix map function
Jul 20, 2014
6197a11
clean up code
Jul 20, 2014
2ad7bd3
clean up codes
Jul 20, 2014
fe02547
remove waste file
Jul 20, 2014
4f07163
Implemented DStream.foreachRDD in the Python API using Py4J callback …
tdas Jul 23, 2014
54b5358
tried to restart callback server
Aug 2, 2014
88f7506
Kill py4j callback server properly
Aug 3, 2014
1b83354
Removed the waste line
giwa Aug 3, 2014
92e333e
implemented reduce and count function in Dstream
giwa Aug 4, 2014
0b09cff
added stop in StreamingContext
giwa Aug 4, 2014
932372a
clean up dstream.py
giwa Aug 4, 2014
376e3ac
WIP
giwa Aug 4, 2014
1934726
update comment
giwa Aug 4, 2014
019ef38
WIP
giwa Aug 4, 2014
5c04a5f
WIP: added PythonTestInputStream
giwa Aug 5, 2014
bd3ba53
WIP
giwa Aug 7, 2014
9cde7c9
WIP added test case
giwa Aug 11, 2014
b3b0362
added basic operation test cases
giwa Aug 11, 2014
99410be
delete waste file
giwa Aug 11, 2014
c1d546e
fixed PEP-008 violation
giwa Aug 11, 2014
af610d3
removed unnesessary changes
giwa Aug 11, 2014
953deb0
edited the comment to add more precise description
giwa Aug 11, 2014
f67cf57
added mapValues and flatMapVaules WIP for glom and mapPartitions test
giwa Aug 11, 2014
1e126bf
WIP: solved partitioned and None is not recognized
giwa Aug 14, 2014
795b2cd
broke something
giwa Aug 14, 2014
8dcda84
all tests are passed if numSlice is 2 and the numver of each input is…
giwa Aug 15, 2014
c5ecfc1
basic function test cases are passed
giwa Aug 15, 2014
2a06cdb
remove waste duplicated code
giwa Aug 15, 2014
99ce042
added saveAsTextFiles and saveAsPickledFiles
giwa Aug 16, 2014
ddd4ee1
added TODO coments
giwa Aug 16, 2014
af336b7
add comments
giwa Aug 18, 2014
455e5af
removed wasted print in DStream
giwa Aug 18, 2014
58e41ff
merge with master
giwa Aug 18, 2014
e80647e
adopted the latest compression way of python command
giwa Aug 19, 2014
c00e091
change test case not to use awaitTermination
giwa Aug 19, 2014
3166d31
clean up
giwa Aug 20, 2014
f198d14
clean up code
giwa Aug 21, 2014
b171ec3
fixed pep8 violation
giwa Aug 21, 2014
f04882c
clen up examples
giwa Aug 21, 2014
62dc7a3
clean up exmples
giwa Aug 21, 2014
7dc7391
fixed typo
giwa Aug 21, 2014
6ae3caa
revert pom.xml
giwa Aug 21, 2014
fa4af88
remove duplicated import
giwa Aug 21, 2014
066ba90
revert pom.xml
giwa Aug 21, 2014
8ed93af
fixed explanaiton
giwa Aug 21, 2014
fbed8da
revert pom.xml
giwa Aug 21, 2014
bebb3f3
remove the last brank line
giwa Aug 21, 2014
b0f2015
added comment in dstream._test_output
giwa Aug 21, 2014
f385976
delete inproper comments
giwa Aug 21, 2014
c0a06bc
delete not implemented functions
giwa Aug 21, 2014
2fdf0de
Fix scalastyle errors
Aug 26, 2014
d542743
clean up code
giwa Aug 31, 2014
d39f102
added StreamingContext.remember
giwa Aug 31, 2014
63c881a
added StreamingContext.sparkContext
giwa Aug 31, 2014
d5f5fcb
added comment for StreamingContext.sparkContext
giwa Aug 31, 2014
8ffdbf1
added atexit to handle callback server
giwa Aug 31, 2014
4a59e1e
WIP:added more test for StreamingContext
giwa Aug 31, 2014
2d32a74
added some StreamingContextTestSuite
giwa Sep 1, 2014
e685853
meged with rebased 1.1 branch
giwa Sep 20, 2014
5cdb6fa
changed for SCCallSiteSync
giwa Sep 21, 2014
550dfd9
WIP fixing 1.1 merge
giwa Sep 21, 2014
df098fc
Merge branch 'master' into giwa
davies Sep 25, 2014
7f53086
support transform(), refactor and cleanup
davies Sep 25, 2014
7339be0
delete tests
davies Sep 25, 2014
bd27874
fix scala style
davies Sep 26, 2014
9a57685
fix python style
davies Sep 26, 2014
eec401e
refactor, combine TransformedRDD, fix reuse PythonRDD, fix union
davies Sep 26, 2014
bd13026
fix examples
davies Sep 26, 2014
d357b70
support windowed dstream
davies Sep 26, 2014
c28f520
support updateStateByKey
davies Sep 26, 2014
3f0fb4b
refactor fix tests
davies Sep 27, 2014
c499ba0
remove Time and Duration
davies Sep 27, 2014
604323f
enable streaming tests
davies Sep 27, 2014
b32774c
move java_import into streaming
davies Sep 27, 2014
74df565
fix print and docs
davies Sep 27, 2014
26ea396
refactor
davies Sep 28, 2014
7001b51
refactor of queueStream()
davies Sep 28, 2014
fce0ef5
rafactor of foreachRDD()
davies Sep 28, 2014
e059ca2
move check of window into Python
davies Sep 28, 2014
847f9b9
add more docs, add first(), take()
davies Sep 28, 2014
b983f0f
address comments
davies Sep 29, 2014
98ac6c2
support ssc.transform()
davies Sep 29, 2014
c40c52d
change first(), take(n) to has the same behavior as RDD
davies Sep 29, 2014
6ebceca
add more tests
davies Sep 29, 2014
19797f9
clean up
davies Sep 29, 2014
338580a
change _first(), _take(), _collect() as private API
davies Sep 30, 2014
069a94c
fix the number of partitions during window()
davies Sep 30, 2014
e00136b
address comments
davies Sep 30, 2014
eed6e2a
rollback not needed changes
davies Sep 30, 2014
b98d63f
change private[spark] to private[python]
davies Sep 30, 2014
9a16bd1
change number of partitions during tests
davies Sep 30, 2014
8466916
support checkpoint
davies Sep 30, 2014
a13ff34
address comments
davies Sep 30, 2014
fa7261b
refactor
davies Sep 30, 2014
6f0da2f
recover from checkpoint
davies Oct 1, 2014
d328aca
fix serializer in queueStream
davies Oct 1, 2014
ff88bec
rename RDDFunction to TransformFunction
davies Oct 1, 2014
7797c70
refactor
davies Oct 1, 2014
bd8a4c2
fix scala style
davies Oct 1, 2014
7a88f9f
rollback RDD.setContext(), use textFileStream() to test checkpointing
davies Oct 1, 2014
c2b31cb
Merge branch 'master' of github.com:apache/spark into streaming
davies Oct 1, 2014
54bd92b
improve tests
davies Oct 2, 2014
4d0ea8b
clear reference of SparkEnv after stop
davies Oct 2, 2014
6bb9d91
Merge branch 'master' of github.com:apache/spark into streaming
davies Oct 2, 2014
c7bbbce
fix sphinx docs
davies Oct 2, 2014
8071541
Merge branch 'env' into streaming
davies Oct 2, 2014
be5e5ff
merge branch of env, make tests stable.
davies Oct 2, 2014
d05871e
remove reuse of PythonRDD
davies Oct 2, 2014
37fe06f
use random port for callback server
davies Oct 2, 2014
e108ec1
address comments
davies Oct 3, 2014
52c535b
remove fix for sum()
davies Oct 3, 2014
8380064
Merge branch 'master' of github.com:apache/spark into streaming
davies Oct 6, 2014
6db00da
Merge branch 'master' of github.com:apache/spark into streaming
davies Oct 7, 2014
bebeb4a
address all comments
davies Oct 7, 2014
02d0575
add wrapper for foreachRDD()
davies Oct 10, 2014
182be73
Merge branch 'master' of github.com:apache/spark into streaming
davies Oct 10, 2014
3e2492b
change updateStateByKey() to easy API
davies Oct 10, 2014
331ecce
fix example
davies Oct 11, 2014
64561e4
fix tests
davies Oct 11, 2014
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,8 +25,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Try, Success, Failure}

import net.razorvine.pickle.{Pickler, Unpickler}

@@ -42,7 +40,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

private[spark] class PythonRDD(
parent: RDD[_],
@transient parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -55,9 +53,9 @@ private[spark] class PythonRDD(
val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)

override def getPartitions = parent.partitions
override def getPartitions = firstParent.partitions

override val partitioner = if (preservePartitoning) parent.partitioner else None
override val partitioner = if (preservePartitoning) firstParent.partitioner else None

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
@@ -234,7 +232,7 @@ private[spark] class PythonRDD(
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.flush()
} catch {
49 changes: 49 additions & 0 deletions examples/src/main/python/streaming/hdfs_wordcount.py
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in new text files created in the given directory
Usage: hdfs_wordcount.py <directory>
<directory> is the directory that Spark Streaming will use to find and read new text files.

To run this on your local machine on directory `localdir`, run this example
$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir

Then create a text file in `localdir` and the words in the file will get counted.
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
Contributor:
Similar to network_wordcount.py, can you add comments on how to run this example.

Contributor Author:
done

if len(sys.argv) != 2:
print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
exit(-1)

sc = SparkContext(appName="PythonStreamingHDFSWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.textFileStream(sys.argv[1])
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
48 changes: 48 additions & 0 deletions examples/src/main/python/streaming/network_wordcount.py
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.

To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
Contributor:
I forgot to mention, can you add instructions on how to run the example (along with nc, etc.) as doc comments? See the comments in the scala / java NetworkWordCount.

Contributor Author:
done

if len(sys.argv) != 3:
print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
57 changes: 57 additions & 0 deletions examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from the
network every second.

Usage: stateful_network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming
would connect to receive data.

To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
localhost 9999`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc)

running_counts.pprint()

ssc.start()
ssc.awaitTermination()
2 changes: 1 addition & 1 deletion python/docs/epytext.py
@@ -5,7 +5,7 @@
(r"L{([\w.()]+)}", r":class:`\1`"),
(r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
(r"C{([\w.()]+)}", r":class:`\1`"),
(r"[IBCM]{(.+)}", r"`\1`"),
(r"[IBCM]{([^}]+)}", r"`\1`"),
('pyspark.rdd.RDD', 'RDD'),
)

1 change: 1 addition & 0 deletions python/docs/index.rst
@@ -13,6 +13,7 @@ Contents:

pyspark
pyspark.sql
pyspark.streaming
pyspark.mllib


3 changes: 2 additions & 1 deletion python/docs/pyspark.rst
@@ -7,8 +7,9 @@ Subpackages
.. toctree::
:maxdepth: 1

pyspark.mllib
pyspark.sql
pyspark.streaming
pyspark.mllib

Contents
--------
8 changes: 4 additions & 4 deletions python/pyspark/context.py
@@ -68,7 +68,7 @@ class SparkContext(object):

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
gateway=None):
gateway=None, jsc=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
@@ -104,14 +104,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
SparkContext._ensure_initialized(self, gateway=gateway)
try:
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf)
conf, jsc)
except:
# If an error occurs, clean up in order to allow future SparkContext creation:
self.stop()
raise

def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf):
conf, jsc):
self.environment = environment or {}
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
@@ -154,7 +154,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self.environment[varName] = v

# Create the Java SparkContext through Py4J
self._jsc = self._initialize_context(self._conf._jconf)
self._jsc = jsc or self._initialize_context(self._conf._jconf)

# Create a single Accumulator in Java that we'll send all our updates through;
# they will be passed back to us through a TCP server
3 changes: 3 additions & 0 deletions python/pyspark/serializers.py
@@ -114,6 +114,9 @@ def __ne__(self, other):
def __repr__(self):
return "<%s object>" % self.__class__.__name__

def __hash__(self):
return hash(str(self))
Contributor:
Similar question: are the changes in this file necessary for streaming, or were they part of the refactoring?

Contributor Author:
This is necessary; we need to check the serializers of DStreams.

Contributor:
Gotcha.
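A minimal, self-contained sketch of the point above (a hypothetical FakeSerializer class for illustration only, not PySpark's actual code and not part of this diff): serializer equality is class/state based, so set and dict lookups on serializers only behave correctly if equal serializers also hash equally, which is what the added __hash__ provides.

    class FakeSerializer(object):
        def __eq__(self, other):
            return isinstance(other, self.__class__)

        def __ne__(self, other):
            return not self.__eq__(other)

        def __repr__(self):
            return "<%s object>" % self.__class__.__name__

        def __hash__(self):
            # mirrors the line added in this diff: the hash is derived from
            # the repr, so equal serializers hash identically
            return hash(str(self))

    if __name__ == "__main__":
        a, b = FakeSerializer(), FakeSerializer()
        assert a == b and hash(a) == hash(b)
        # with a consistent __hash__, serializers can be deduplicated in sets
        # or used as dict keys, e.g. when checking which DStreams share one
        assert len({a, b}) == 1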



class FramedSerializer(Serializer):

21 changes: 21 additions & 0 deletions python/pyspark/streaming/__init__.py
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.dstream import DStream

__all__ = ['StreamingContext', 'DStream']