-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSnowflake_DW_Creation_Insertion_script.txt
315 lines (228 loc) · 8.59 KB
/
Snowflake_DW_Creation_Insertion_script.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
-- Environment setup: schema, S3 storage integration, external stage and CSV file format.
-- Every statement ends with ';' so the file can be executed as a single script.

-- Per Snowflake's CREATE SCHEMA usage notes, creating a schema also makes it the
-- current schema for the session, so the objects below are created inside it.
CREATE SCHEMA asvc_project;

-- Creates a storage integration to set up the connection between AWS and Snowflake.
CREATE OR REPLACE STORAGE INTEGRATION aws_s3_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::649069303992:role/asvc-project'
    STORAGE_ALLOWED_LOCATIONS = ('s3://asvc-project/');

SHOW INTEGRATIONS;

GRANT USAGE ON INTEGRATION aws_s3_integration TO ROLE accountadmin;

-- Creates a Snowflake stage on top of the S3 bucket.
-- NOTE(review): the stage's default file format is demo_format, which this script
-- never creates (only demo_format1 is created below) -- confirm it already exists.
CREATE OR REPLACE STAGE demo_aws_stage
    STORAGE_INTEGRATION = aws_s3_integration
    FILE_FORMAT = demo_format
    URL = 's3://asvc-project/';

-- Lists the staged files to confirm the integration works.
LIST @demo_aws_stage;

-- Semicolon-delimited CSV with one header row and ISO dates.
CREATE FILE FORMAT demo_format1
    TYPE = CSV
    FIELD_DELIMITER = ';'
    RECORD_DELIMITER = '\n'
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    DATE_FORMAT = 'YYYY-MM-DD';
--Start of DDL
-- Category dimension: identifier, name and an optional description.
CREATE TABLE dim_asvc_ctgry (
    cat_id      VARCHAR2(10)    NOT NULL,
    cat_name    VARCHAR2(20)    NOT NULL,
    descp       VARCHAR2(50)
);
-- Customer dimension: customer name parts, postal address fields and segment.
-- Every column is nullable, including cust_id.
CREATE TABLE dim_asvc_customer (
    cust_id       VARCHAR(50)     NULL,
    address_id    VARCHAR(50),
    cust_fname    VARCHAR2(200)   NULL,
    cust_mname    VARCHAR2(100),
    cust_lname    VARCHAR2(200),
    street        VARCHAR2(200),
    city          VARCHAR2(100),
    state         VARCHAR2(100),
    zipcode       VARCHAR2(10),
    country       VARCHAR2(100),
    segment       VARCHAR2(50)
);
-- Order fact table: one row per order line, with measures (qty, ship_cost,
-- discount, profit, sale_price) and the keys pointing at the dimensions.
-- NOTE(review): market_id, region_id and prdct_id are VARCHAR here but NUMBER(10)
-- in dim_asvc_market / dim_asvc_region / dim_asvc_prdct -- confirm the intended types.
CREATE TABLE fact_asvc_order (
    ordr_id           VARCHAR(50)      NOT NULL,
    order_date        DATE             NOT NULL,
    order_priority    VARCHAR(20)      NOT NULL,
    qty               NUMERIC(10)      NOT NULL,
    ship_date         DATE,
    ship_cost         DECIMAL(10,4)    NOT NULL,
    mode_of_ship      VARCHAR(15),
    discount          DECIMAL(10,4)    NOT NULL,
    profit            DECIMAL(10,4)    NOT NULL,
    sale_price        DECIMAL(10,4)    NOT NULL,
    address_id        VARCHAR(50),
    market_id         VARCHAR(20),
    region_id         VARCHAR(15),
    prdct_id          VARCHAR(20)      NOT NULL,
    cat_id            VARCHAR(10)      NOT NULL,
    cust_id           VARCHAR(50)      NOT NULL,
    subcat_id         VARCHAR(35)      NOT NULL,
    subcat_name       VARCHAR(20),
    returned          CHAR(4)
);
-- Date dimension keyed by date_id; the primary key is declared as a named
-- out-of-line constraint in the table definition.
CREATE TABLE dim_asvc_date (
    date_id              NUMBER(10)      NOT NULL,
    entry_id             NUMBER(10),
    full_date            DATE,
    days                 NUMBER(5),
    month_short          VARCHAR2(5),
    month_num            NUMBER(3),
    month_long           VARCHAR2(10),
    day_of_week_short    VARCHAR2(5),
    day_of_week_long     VARCHAR2(10),
    year                 VARCHAR2(5),
    quarter              VARCHAR2(10),
    CONSTRAINT dim_asvc_date_pk PRIMARY KEY (date_id)
);
-- Market dimension keyed by market_id.
CREATE TABLE dim_asvc_market (
    market_id      NUMBER(10)      NOT NULL,
    market_name    VARCHAR2(15)    NOT NULL,
    CONSTRAINT dim_asvc_market_pk PRIMARY KEY (market_id)
);
-- Product dimension keyed by prdct_id, with a name and optional description.
CREATE TABLE dim_asvc_prdct (
    prdct_id       NUMBER(10)      NOT NULL,
    prdct_name     VARCHAR2(15)    NOT NULL,
    prdct_descp    VARCHAR2(50),
    CONSTRAINT dim_asvc_prdct_pk PRIMARY KEY (prdct_id)
);
-- Region dimension; every column is mandatory. No primary key is declared.
-- NOTE(review): market_id is VARCHAR2(4) here but NUMBER(10) in dim_asvc_market --
-- confirm which type is intended.
CREATE TABLE dim_asvc_region (
    region_id      NUMBER(10)      NOT NULL,
    region_name    VARCHAR2(20)    NOT NULL,
    tbl_last_dt    DATE            NOT NULL,
    market_id      VARCHAR2(4)     NOT NULL
);
-- End OF DDL
-- Bulk-load the category dimension from the staged CSV. Column-count mismatches
-- are not treated as errors, and ON_ERROR = 'CONTINUE' keeps loading past bad rows.
COPY INTO dim_asvc_ctgry
    FROM @demo_aws_stage/PROJECT/asvc_ctgry.csv
    FILE_FORMAT = (
        FORMAT_NAME = demo_format1
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE';

-- Inspect what was loaded.
SELECT *
FROM dim_asvc_ctgry;
-- Load the region dimension, taking file columns 1, 3, 2, 4 in that order so they
-- line up with the table's four columns.
-- NOTE(review): file format dm3 is not created anywhere in this script -- confirm
-- it exists in the target schema.
COPY INTO dim_asvc_region
    FROM (
        SELECT
            src.$1,
            src.$3,
            src.$2,
            src.$4
        FROM @demo_aws_stage/PROJECT/asvc_region.csv src
    )
    FILE_FORMAT = (
        FORMAT_NAME = dm3
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE';
-- Bulk-load the market dimension from the staged CSV, continuing past bad rows.
-- NOTE(review): file format demo_format2 is not created in this script -- confirm
-- it exists in the target schema.
COPY INTO dim_asvc_market
    FROM @demo_aws_stage/PROJECT/asvc_market.csv
    FILE_FORMAT = (
        FORMAT_NAME = demo_format2
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE';

-- Inspect what was loaded.
SELECT *
FROM dim_asvc_market;
-- Load the region dimension from file columns 1, 4 and 2.
-- NOTE(review): this selects three source columns, while dim_asvc_region is
-- declared with four NOT NULL columns, and the same file is also loaded by an
-- earlier COPY that uses format dm3 -- confirm which of the two loads is intended.
COPY INTO dim_asvc_region
    FROM (
        SELECT
            src.$1,
            src.$4,
            src.$2
        FROM @demo_aws_stage/PROJECT/asvc_region.csv src
    )
    FILE_FORMAT = (
        FORMAT_NAME = demo_format2
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE';

-- Inspect what was loaded.
SELECT *
FROM dim_asvc_region;
-- Bulk-load the order fact table from every staged file whose path starts with
-- PROJECT/asvc_ordr_prdct, continuing past rows that fail to load.
COPY INTO fact_asvc_order
FROM @demo_aws_stage/PROJECT/asvc_ordr_prdct
FILE_FORMAT = (FORMAT_NAME = demo_format2 ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE)
ON_ERROR = 'CONTINUE';

-- Inspect what was loaded. The terminating ';' keeps this query from running
-- into the statement that follows it in the script.
SELECT * FROM fact_asvc_order;
-- Load the sub-category dimension from file columns 1, 3 and 4.
-- NOTE(review): no CREATE TABLE for dim_asvc_subcat appears in this script --
-- confirm the table is created elsewhere before this load runs.
COPY INTO dim_asvc_subcat
FROM (
    SELECT c.$1, c.$3, c.$4
    FROM @demo_aws_stage/PROJECT/asvc_subcat.csv c
)
FILE_FORMAT = (FORMAT_NAME = demo_format2 ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE)
ON_ERROR = 'CONTINUE';

-- Inspect what was loaded. The terminating ';' keeps this query from running
-- into the statement that follows it in the script.
SELECT * FROM dim_asvc_subcat;
-- Load the product dimension from file columns 1, 3 and 4, continuing past
-- rows that fail to load.
COPY INTO dim_asvc_prdct
    FROM (
        SELECT
            src.$1,
            src.$3,
            src.$4
        FROM @demo_aws_stage/PROJECT/asvc_prdct.csv src
    )
    FILE_FORMAT = (
        FORMAT_NAME = demo_format2
        ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
    )
    ON_ERROR = 'CONTINUE';
-- External table over the staged order/product CSV files. Each virtual column is
-- an expression on the VALUE variant, whose fields c1, c2, ... hold the file's
-- columns by position.
CREATE OR REPLACE EXTERNAL TABLE external_asvc_ordr_prdct (
    rownum      INT          AS (value:c1::INT),
    qty         INT          AS (value:c2::INT),
    ship_cost   FLOAT        AS (value:c3::FLOAT),
    discount    FLOAT        AS (value:c4::FLOAT),
    profit      FLOAT        AS (value:c5::FLOAT),
    sale_price  FLOAT        AS (value:c6::FLOAT),
    prdct_id    VARCHAR(15)  AS (value:c8::VARCHAR)
)
WITH LOCATION = @demo_aws_stage
FILE_FORMAT = demo_format2
PATTERN = '.*asvc_ordr_prdct.csv';

-- Query the external table that was just created. The script defines no object
-- named asvc_ordr_prdct, so the verification query must use the full name.
SELECT * FROM external_asvc_ordr_prdct;
-- Tables in schema ASVC_PROJECT that were altered within the last 15 days,
-- most recently altered first.
SELECT *
FROM information_schema.tables
WHERE last_altered >= DATEADD(day, -15, CURRENT_TIMESTAMP())
  AND table_schema = 'ASVC_PROJECT'
ORDER BY last_altered DESC;
-- Incremental ETL
-- Look at a sample order before the incremental load is applied. The terminating
-- ';' keeps this query from running into the CREATE statement that follows.
SELECT * FROM fact_asvc_order
WHERE ordr_id = 'AE-2012-PO8865138-41184';

-- Create a staging table to hold the new changes: same columns as the fact
-- table, but no rows (WHERE 1=0 filters everything out).
CREATE OR REPLACE TABLE stg_incrmntl_fact_table AS
SELECT *
FROM fact_asvc_order
WHERE 1 = 0;

-- Load the incremental file into the staging table, continuing past bad rows.
-- NOTE(review): file format FIN is not created anywhere in this script --
-- confirm it exists in the target schema.
COPY INTO stg_incrmntl_fact_table
FROM @demo_aws_stage/PROJECT/incremental.csv
FILE_FORMAT = (FORMAT_NAME = FIN ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE)
ON_ERROR = 'CONTINUE';

-- Inspect the staged rows.
SELECT * FROM stg_incrmntl_fact_table;
-- Performing merging between the stage and actual fact table.
-- Upserts the staged incremental rows into fact_asvc_order.
-- Rows are matched on (ordr_id, prdct_id): matches have every other column
-- overwritten from the staging row, non-matches are inserted.
-- Returns the first column of the MERGE result row as text, 'Success' when the
-- result set is empty, or an 'Error code <code>: <message>' string when the
-- statement throws (the error is reported through the return value, not re-raised).
CREATE OR REPLACE PROCEDURE load_merge_fact_asvc_order()
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
AS $$
try {
    // The statement text is kept exactly as written; only the wrapper differs.
    var mergeSql = `
MERGE INTO fact_asvc_order a
USING stg_incrmntl_fact_table b
ON (a.ordr_id = b.ordr_id) AND (a.prdct_id = b.prdct_id)
WHEN MATCHED THEN
UPDATE SET
a.order_date = b.order_date,
a.order_priority = b.order_priority,
a.qty = b.qty,
a.ship_date = b.ship_date,
a.ship_cost = b.ship_cost,
a.mode_of_ship = b.mode_of_ship,
a.discount = b.discount,
a.profit = b.profit,
a.sale_price = b.sale_price,
a.address_id = b.address_id,
a.market_id = b.market_id,
a.region_id = b.region_id,
a.cat_id = b.cat_id,
a.cust_id = b.cust_id,
a.subcat_id = b.subcat_id,
a.subcat_name = b.subcat_name,
a.returned = b.returned
WHEN NOT MATCHED THEN
INSERT (ordr_id, order_date, order_priority, qty, ship_date, ship_cost, mode_of_ship, discount, profit, sale_price, address_id, market_id, region_id, prdct_id, cat_id, cust_id, subcat_id, subcat_name, returned)
VALUES (b.ordr_id, b.order_date, b.order_priority, b.qty, b.ship_date, b.ship_cost, b.mode_of_ship, b.discount, b.profit, b.sale_price, b.address_id, b.market_id, b.region_id, b.prdct_id, b.cat_id, b.cust_id, b.subcat_id, b.subcat_name, b.returned);
`;
    var mergeResult = snowflake.createStatement({sqlText: mergeSql}).execute();
    if (mergeResult.next()) {
        return mergeResult.getColumnValue(1).toString();
    }
    return 'Success';
} catch (err) {
    return 'Error code ' + err.code + ': ' + err.message;
}
$$;
-- Run the merge, then re-check the sample order and the whole fact table.
-- Each query ends with ';' so the statements do not run into one another.
CALL load_merge_fact_asvc_order();

SELECT * FROM fact_asvc_order
WHERE ordr_id = 'AE-2012-PO8865138-41184';

SELECT * FROM fact_asvc_order;
-- Table Partitioning
-- select * from dim_asvc_customer
-- Copy the fact table into a new table clustered on market_id, then inspect the
-- clustering statistics and the table's metadata.
CREATE TABLE partitioning_fact_asvc
    CLUSTER BY (market_id)
AS
    SELECT *
    FROM fact_asvc_order;

SELECT SYSTEM$CLUSTERING_INFORMATION('partitioning_fact_asvc');

SHOW TABLES LIKE 'partitioning_fact_asvc';
-- Code to verify that the number of clusters depends on the data.
-- CLUSTER BY is placed after the column list, which is where the documented
-- CREATE TABLE grammar expects it.
CREATE OR REPLACE TABLE CLUSTERING_DEMO (
    ID          NUMBER(10,0),
    TENANT_ID   NUMBER(4,0),
    KEY         VARCHAR(4),
    VALUE       VARCHAR(255)
)
CLUSTER BY (TENANT_ID);

-- Insert dummy data into the dummy table: 250,000 generated rows with a random
-- TENANT_ID between 1 and 1000 and random strings for KEY and VALUE.
-- OVERWRITE replaces whatever rows the table already holds.
INSERT OVERWRITE INTO CLUSTERING_DEMO
SELECT
    seq4()                       AS ID,
    uniform(1, 1000, random())   AS TENANT_ID,
    randstr(4, random())         AS KEY,
    randstr(255, random())       AS VALUE
FROM table(generator(rowcount => 250000));

-- Inspect the clustering statistics for the generated data.
SELECT SYSTEM$CLUSTERING_INFORMATION('CLUSTERING_DEMO');