#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/6/3 14:47
# @Author : way
# @Site :
# @Describe: PySpark operations demo
from pyspark import SparkContext
from operator import add

# Only one SparkContext may be active per process.
sc = SparkContext("local", "test")
# sc = SparkContext("spark://host:port", "test")  # submit to a remote Spark cluster instead
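# Note: with Spark 2.x and later, SparkSession is the usual entry point.
# An equivalent setup (a sketch; the master URL and app name are the same
# illustrative values as above) would be:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local").appName("test").getOrCreate()
# sc = spark.sparkContext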
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])
# count
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
# collect
coll = words.collect()
print("Elements in RDD -> %s" % coll)
# foreach: apply a function to every element; it returns None, so no assignment
words.foreach(lambda x: print(x) if 'spark' in x else None)
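# In local mode the matching words print to this console; on a cluster,
# foreach output goes to the executors' stdout, not the driver's.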
# filter
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))
# map
words_map = words.map(lambda x: 'hello ' + x)
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
# reduce
nums = sc.parallelize(range(100))
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
# join
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % (final))