-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfinalpyscript.py
317 lines (279 loc) · 19.6 KB
/
finalpyscript.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import pyspark
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
# Spark bootstrap: build one local SparkSession and expose the legacy
# SparkContext / SQLContext handles that the rest of this script uses.
# Building the session first and deriving sc from it avoids constructing a
# second, conflicting SparkContext; the copy-pasted docs placeholder
# config("spark.some.config.option", ...) was a no-op and is dropped.
spark = (
    SparkSession.builder
    .master("local")
    .appName("app name")
    .getOrCreate()
)
sc = spark.sparkContext
# NOTE(review): SQLContext is deprecated in modern PySpark; kept only because
# the final CTAS statement below calls sqlContext.sql().
sqlContext = SQLContext(sc)
# ---- Sampling data -----------------------------------------------------
# Each monthly raw CSV shares one header in which every column after the
# first carries a leading space (e.g. " passenger_count").  The loop below
# replaces twelve near-identical copy-pasted blocks that each read one
# month, took a 50% sample, stripped the header spaces, and wrote Parquet.

# Fraction of rows to keep; seed=0 makes the sample deterministic.
SAMPLE_FRACTION = 0.50

# Clean names of the columns whose raw header has a leading space.
RAW_COLUMNS = [
    "passenger_count", "trip_distance", "pickup_longitude",
    "pickup_latitude", "rate_code", "store_and_fwd_flag",
    "dropoff_longitude", "dropoff_latitude", "payment_type",
    "fare_amount", "surcharge", "mta_tax", "tip_amount",
    "tolls_amount", "total_amount", "pickup_datetime", "dropoff_datetime",
]


def _sample_month(month_number):
    # Read one month's raw trip CSV, sample 50% without replacement,
    # normalize the column names, and persist the result as Parquet.
    src = "s3a://nycproject23/rawdata/yellow_tripdata_2014-%02d.csv" % month_number
    dst = "s3a://nycproject23/sampledrawdata/df%d.parquet" % month_number
    sampled = spark.read.option("header", True).csv(src)
    sampled = sampled.sample(False, SAMPLE_FRACTION, seed=0)
    for clean_name in RAW_COLUMNS:
        sampled = sampled.withColumnRenamed(" " + clean_name, clean_name)
    sampled.write.parquet(dst)


for _month in range(1, 13):
    _sample_month(_month)
# ---- Cleaning data -----------------------------------------------------
# Re-read all sampled months at once and drop rows/columns flagged as bad.
# The "header" option is CSV-only and was a no-op on a Parquet read, so it
# is dropped here.
df1 = spark.read.parquet("s3a://nycproject23/sampledrawdata/*.parquet")
# '208' is a corrupt passenger_count value present in the raw data.
df2 = df1.filter("passenger_count != '208'")
# Invalid rate codes; ~isin(...) keeps exactly the rows the original
# 11-term chain of != comparisons kept (NULL rate_codes evaluate to NULL
# and are filtered out by both forms, and again by na.drop() below).
BAD_RATE_CODES = ['156', '208', '210', '28', '65', '77', '7', '8', '9', '0', '16']
df3 = df2.filter(~df2.rate_code.isin(BAD_RATE_CODES))
# store_and_fwd_flag is not used downstream, so remove it before the
# whole-row NULL drop.
final_df = df3.drop('store_and_fwd_flag')
final_df = final_df.na.drop()  # drop any row that still contains a NULL
# ---- Transformations on the pickup/dropoff datetime columns ------------
# Derive hour / weekday-name / year / month features from both timestamps,
# then register the result as a table for downstream SQL analysis.
# Explicit imports replace the wildcard import the original used here.
from pyspark.sql.functions import col, date_format, hour, month, year

df1 = (
    final_df
    .withColumn("pickup_hour", hour(col("pickup_datetime")))
    .withColumn("dropoff_hour", hour(col("dropoff_datetime")))
    # "EEEE" renders the full weekday name, e.g. "Monday".
    .withColumn("pickup_day", date_format(col("pickup_datetime"), "EEEE"))
    .withColumn("dropoff_day", date_format(col("dropoff_datetime"), "EEEE"))
    .withColumn("pickup_year", year(col("pickup_datetime")))
    .withColumn("dropoff_year", year(col("dropoff_datetime")))
    .withColumn("pickup_month", month(col("pickup_datetime")))
    .withColumn("dropoff_month", month(col("dropoff_datetime")))
)
df1.createOrReplaceTempView("taxitable")
# CTAS into the metastore.  spark.sql supersedes the deprecated
# SQLContext.sql; the explicit AS is the documented CTAS form and is
# equivalent to the original statement.
spark.sql("CREATE TABLE nyctaxi AS SELECT * FROM taxitable")