# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

###################################
## INSERT to external table tests ##
###################################
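
# set up a source table backed by the standard aggregate_test_100 CSV test
# data; it feeds the INSERT ... SELECT tests below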
statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
)
STORED AS CSV
WITH HEADER ROW
LOCATION '../../testing/data/csv/aggregate_test_100.csv'

# test_insert_into
statement ok
set datafusion.execution.target_partitions = 8;

statement ok
CREATE EXTERNAL TABLE
ordered_insert_test(a bigint, b bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
WITH ORDER (a ASC, b DESC)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);
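
# WITH ORDER (a ASC, b DESC) declares the expected sort of the table's data,
# so the EXPLAIN below should show a SortExec enforcing that order before the
# CSV sink; insert_mode 'append_new_files' should write each INSERT as new
# files under LOCATION rather than rewriting existing ones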
query TT
EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----
logical_plan
Dml: op=[Insert Into] table=[ordered_insert_test]
--Projection: column1 AS a, column2 AS b
----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
physical_plan
InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
------ValuesExec
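
# since rows are sorted to (a ASC, b DESC) before being written, the SELECT
# after this INSERT should return them in that order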
query II
INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----
9

query II
SELECT * from ordered_insert_test;
----
1 5
2 4
3 3
4 2
5 1
7 10
7 9
7 8
7 7

statement ok
CREATE EXTERNAL TABLE
single_file_test(a bigint, b bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv'
OPTIONS(
create_local_path 'true',
single_file 'true',
);
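
# single_file 'true' treats LOCATION as a single CSV file rather than a
# directory, so all inserted rows should land in that one file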
query II
INSERT INTO single_file_test values (1, 2), (3, 4);
----
2

query II
select * from single_file_test;
----
1 2
3 4

statement ok
CREATE EXTERNAL TABLE
directory_test(a bigint, b bigint)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0'
OPTIONS(
create_local_path 'true',
);
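
# no single_file option here: LOCATION is a directory, and each INSERT is
# expected to add new parquet file(s) beneath it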
query II
INSERT INTO directory_test values (1, 2), (3, 4);
----
2

query II
select * from directory_test;
----
1 2
3 4

statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1'
OPTIONS (create_local_path 'true');
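
# the ORDER BY c1 below should be preserved all the way to the sink: expect a
# SortPreservingMergeExec merging the 8 hash partitions in c1 order in the
# physical plan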
query TT
EXPLAIN
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
FROM aggregate_test_100
ORDER by c1
----
logical_plan
Dml: op=[Insert Into] table=[table_without_values]
--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1]
--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true

query II
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
FROM aggregate_test_100
ORDER by c1
----
100

# verify there is data now in the table
query I
SELECT COUNT(*) from table_without_values;
----
100

# verify the inserted values (five smallest, ordered by field1, field2)
query II
SELECT *
FROM table_without_values
ORDER BY field1, field2
LIMIT 5;
----
-70111 3
-65362 3
-62295 3
-56721 3
-55414 3

statement ok
drop table table_without_values;

# test_insert_into_as_select_multi_partitioned
statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2'
OPTIONS (create_local_path 'true');
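
# same window query as above but without ORDER BY; the plan below should not
# need a SortPreservingMergeExec, leaving the 8 partitions independent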
query TT
EXPLAIN
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
FROM aggregate_test_100
----
logical_plan
Dml: op=[Insert Into] table=[table_without_values]
--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true

query II
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
FROM aggregate_test_100
----
100

statement ok
drop table table_without_values;

# test_insert_into_with_sort
statement ok
CREATE EXTERNAL TABLE
table_without_values(c1 varchar NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3'
OPTIONS (create_local_path 'true');

# verify that the sort order of the insert query is maintained into the
# insert (there should be a SortExec in the following plan)
# See https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 for more background
query TT
explain insert into table_without_values select c1 from aggregate_test_100 order by c1;
----
logical_plan
Dml: op=[Insert Into] table=[table_without_values]
--Projection: aggregate_test_100.c1 AS c1
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------TableScan: aggregate_test_100 projection=[c1]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
--ProjectionExec: expr=[c1@0 as c1]
----SortExec: expr=[c1@0 ASC NULLS LAST]
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true

query T
insert into table_without_values select c1 from aggregate_test_100 order by c1;
----
100

query I
select count(*) from table_without_values;
----
100

statement ok
drop table table_without_values;

# test insert with column names
statement ok
CREATE EXTERNAL TABLE
table_without_values(id BIGINT, name varchar)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4'
OPTIONS (create_local_path 'true');
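
# the column list may name columns in any order; values are matched to the
# listed columns, so both inserts below should succeed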
query IT
insert into table_without_values(id, name) values(1, 'foo');
----
1

query IT
insert into table_without_values(name, id) values('bar', 2);
----
1

statement error Schema error: Schema contains duplicate unqualified field name id
insert into table_without_values(id, id) values(3, 3);

statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
insert into table_without_values(name, id) values(4, 'zoo');

statement error Error during planning: Column count doesn't match insert query!
insert into table_without_values(id) values(4, 'zoo');

statement error Error during planning: Inserting query must have the same schema with the table.
insert into table_without_values(id) values(4);
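
# note: an INSERT here must cover the table's full schema; omitted nullable
# columns are not auto-filled with NULL, hence the error above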
query IT rowsort
select * from table_without_values;
----
1 foo
2 bar

statement ok
drop table table_without_values;

# test insert with non-nullable column
statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5'
OPTIONS (create_local_path 'true');
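
# field1 is NOT NULL while field2 is nullable: NULL should be accepted for
# field2 but rejected for field1 at execution time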
query II
insert into table_without_values values(1, 100);
----
1

query II
insert into table_without_values values(2, NULL);
----
1

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values values(NULL, 300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values values(3, 300), (NULL, 400);
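
# the failed inserts above should not leave partial data behind; only the two
# successfully inserted rows are expected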
query II rowsort
select * from table_without_values;
----
1 100
2 NULL

statement ok
drop table table_without_values;