-
Notifications
You must be signed in to change notification settings - Fork 240
/
Copy pathimpl.py
548 lines (490 loc) · 20.6 KB
/
impl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
import re
import requests
import time
import base64
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Union
from typing_extensions import TypeAlias
import agate
from dbt.contracts.relation import RelationType
import dbt
import dbt.exceptions
from dbt.adapters.base import AdapterConfig
from dbt.adapters.base.impl import catch_as_completed, log_code_execution
from dbt.adapters.base.meta import available
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
from dbt.adapters.spark import SparkColumn
from dbt.adapters.base import BaseRelation
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.events import AdapterLogger
from dbt.utils import executor
# Adapter-scoped logger used by the code in this module for debug output.
logger = AdapterLogger("Spark")
# Names of the macros that SparkAdapter runs through ``execute_macro``.
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
# NOTE(review): DROP_RELATION_MACRO_NAME is not referenced anywhere in this
# file; presumably it is imported by other modules -- confirm before removing.
DROP_RELATION_MACRO_NAME = "drop_relation"
FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"
# Keys looked up in the table metadata that ``parse_describe_extended``
# builds from the rows following the column listing.
KEY_TABLE_OWNER = "Owner"
KEY_TABLE_STATISTICS = "Statistics"
@dataclass
class SparkConfig(AdapterConfig):
    """Adapter-specific configuration fields for Spark.

    ``SparkAdapter`` registers this class as its ``AdapterSpecificConfigs``.
    Every field is optional; ``file_format`` is the only one whose default is
    not ``None``.
    """

    # Defaults to "parquet" when the user does not set it.
    file_format: str = "parquet"
    # NOTE(review): presumably the base path under which tables are stored --
    # the consumer is not visible in this file, confirm against the macros.
    location_root: Optional[str] = None
    # Either a single column name or a list of column names.
    partition_by: Optional[Union[List[str], str]] = None
    # Either a single column name or a list of column names.
    clustered_by: Optional[Union[List[str], str]] = None
    # NOTE(review): presumably the bucket count that goes with `clustered_by`.
    buckets: Optional[int] = None
    # Free-form string-to-string options.
    options: Optional[Dict[str, str]] = None
    # NOTE(review): annotated as a single string here; confirm whether callers
    # pass a list of column names instead.
    merge_update_columns: Optional[str] = None
class SparkAdapter(SQLAdapter):
    """Spark implementation of the dbt SQL adapter interface.

    The class covers type conversion for seeds, relation and column discovery
    (from cached ``information`` text or from a describe-extended macro),
    catalog generation, grant normalisation, and submission of python models
    through REST calls to the host configured in the profile.
    """

    # NOTE(review): not referenced elsewhere in this file; the names mirror
    # the keys of the catalog dicts built in `_get_columns_for_catalog`
    # plus per-table statistics -- confirm the consumer.
    COLUMN_NAMES = (
        "table_database",
        "table_schema",
        "table_name",
        "table_type",
        "table_comment",
        "table_owner",
        "column_name",
        "column_index",
        "column_type",
        "column_comment",
        "stats:bytes:label",
        "stats:bytes:value",
        "stats:bytes:description",
        "stats:bytes:include",
        "stats:rows:label",
        "stats:rows:value",
        "stats:rows:description",
        "stats:rows:include",
    )

    # Patterns applied to a relation's `information` text by
    # `parse_columns_from_information`.
    INFORMATION_COLUMNS_REGEX = re.compile(r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
    INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
    INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)

    # Columns removed from the result of `get_columns_in_relation`.
    HUDI_METADATA_COLUMNS = [
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
    ]

    Relation: TypeAlias = SparkRelation
    Column: TypeAlias = SparkColumn
    ConnectionManager: TypeAlias = SparkConnectionManager
    AdapterSpecificConfigs: TypeAlias = SparkConfig

    @classmethod
    def date_function(cls) -> str:
        """Return the SQL expression that yields the current timestamp."""
        return "current_timestamp()"

    @classmethod
    def convert_text_type(cls, agate_table, col_idx):
        """Return the Spark type name used for text columns."""
        return "string"

    @classmethod
    def convert_number_type(cls, agate_table, col_idx):
        """Return "double" if the column has any decimal places, else "bigint"."""
        decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
        return "double" if decimals else "bigint"

    @classmethod
    def convert_date_type(cls, agate_table, col_idx):
        """Return the Spark type name used for date columns."""
        return "date"

    @classmethod
    def convert_time_type(cls, agate_table, col_idx):
        """Return the type name used for time columns."""
        return "time"

    @classmethod
    def convert_datetime_type(cls, agate_table, col_idx):
        """Return the Spark type name used for datetime columns."""
        return "timestamp"

    def quote(self, identifier):
        """Wrap *identifier* in backticks."""
        return f"`{identifier}`"

    def add_schema_to_cache(self, schema) -> str:
        """Cache a new schema in dbt. It will show up in `list relations`.

        A ``None`` schema is reported through
        ``dbt.exceptions.raise_compiler_error``. The schema is only added to
        the cache when ``dbt.flags.USE_CACHE`` is truthy.

        Returns:
            An empty string, so that jinja renders nothing for the call.
        """
        if schema is None:
            name = self.nice_connection_name()
            dbt.exceptions.raise_compiler_error(
                "Attempted to cache a null schema for {}".format(name)
            )
        if dbt.flags.USE_CACHE:
            self.cache.add_schema(None, schema)
        # so jinja doesn't render things
        return ""

    def list_relations_without_caching(
        self, schema_relation: SparkRelation
    ) -> List[SparkRelation]:
        """List the relations of *schema_relation* by running the list macro.

        Each result row must hold four values: schema, name, an ignored value
        and the free-text ``information`` blob. The relation type and the
        delta/hudi flags are derived from markers inside that blob.

        Returns:
            The relations found, or an empty list when the macro raises a
            ``RuntimeException`` (the error is logged at debug level unless
            it is the "database not found" case).

        Raises:
            dbt.exceptions.RuntimeException: if a row does not have exactly
                four values.
        """
        kwargs = {"schema_relation": schema_relation}
        try:
            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
        except dbt.exceptions.RuntimeException as e:
            # Read the message once, defensively, and use it for both the
            # check and the log line.
            errmsg = getattr(e, "msg", "") or ""
            if f"Database '{schema_relation}' not found" not in errmsg:
                description = "Error while retrieving information about"
                logger.debug(f"{description} {schema_relation}: {errmsg}")
            return []

        relations = []
        for row in results:
            if len(row) != 4:
                raise dbt.exceptions.RuntimeException(
                    f'Invalid value from "show table extended ...", '
                    f"got {len(row)} values, expected 4"
                )
            _schema, name, _, information = row
            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
            relations.append(
                self.Relation.create(
                    schema=_schema,
                    identifier=name,
                    type=rel_type,
                    information=information,
                    is_delta="Provider: delta" in information,
                    is_hudi="Provider: hudi" in information,
                )
            )
        return relations

    def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
        """Look up a relation, dropping *database* when the include policy omits it."""
        if not self.Relation.include_policy.database:
            database = None  # type: ignore
        return super().get_relation(database, schema, identifier)

    def parse_describe_extended(
        self, relation: Relation, raw_rows: List[agate.Row]
    ) -> List[SparkColumn]:
        """Turn the rows of a describe-extended result into ``SparkColumn`` objects.

        Rows before the separator found by ``find_table_information_separator``
        are columns; rows after it are treated as ``col_name -> data_type``
        table metadata, from which owner and statistics are read.
        """
        # Convert the Row to a dict
        dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
        # Find the separator between the rows and the metadata provided
        # by the DESCRIBE TABLE EXTENDED statement
        pos = self.find_table_information_separator(dict_rows)

        # Remove rows that start with a hash, they are comments
        rows = [row for row in raw_rows[0:pos] if not row["col_name"].startswith("#")]
        metadata = {col["col_name"]: col["data_type"] for col in raw_rows[pos + 1 :]}

        raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
        return [
            SparkColumn(
                table_database=None,
                table_schema=relation.schema,
                table_name=relation.name,
                table_type=relation.type,
                table_owner=str(metadata.get(KEY_TABLE_OWNER)),
                table_stats=table_stats,
                column=column["col_name"],
                column_index=idx,
                dtype=column["data_type"],
            )
            for idx, column in enumerate(rows)
        ]

    @staticmethod
    def find_table_information_separator(rows: List[dict]) -> int:
        """Return the index of the first row whose ``col_name`` is empty or starts with "#".

        Returns ``len(rows)`` when no such row exists.
        """
        for pos, row in enumerate(rows):
            if not row["col_name"] or row["col_name"].startswith("#"):
                return pos
        return len(rows)

    def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
        """Return the columns of *relation*, without the hudi metadata columns.

        Columns are parsed from the cached relation's ``information`` when
        available; otherwise the describe-extended macro is executed. A missing
        table yields an empty list instead of an error.

        Raises:
            dbt.exceptions.RuntimeException: re-raised when the macro fails for
                a reason other than the table or view not existing.
        """
        cached_relations = self.cache.get_relations(relation.database, relation.schema)
        cached_relation = next(
            (
                cached_relation
                for cached_relation in cached_relations
                if str(cached_relation) == str(relation)
            ),
            None,
        )
        columns = []
        if cached_relation and cached_relation.information:
            columns = self.parse_columns_from_information(cached_relation)
        if not columns:
            # in open source delta 'show table extended' query output doesnt
            # return relation's schema. if columns are empty from cache,
            # use get_columns_in_relation spark macro
            # which would execute 'describe extended tablename' query
            try:
                rows: List[agate.Row] = self.execute_macro(
                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
                )
                columns = self.parse_describe_extended(relation, rows)
            except dbt.exceptions.RuntimeException as e:
                # spark would throw error when table doesn't exist, where other
                # CDW would just return and empty list, normalizing the behavior here
                errmsg = getattr(e, "msg", "") or ""
                table_missing = (
                    "Table or view not found" in errmsg or "NoSuchTableException" in errmsg
                )
                if not table_missing:
                    raise

        # strip hudi metadata columns.
        return [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]

    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
        """Parse owner, statistics and columns out of ``relation.information``.

        Returns one ``SparkColumn`` per match of ``INFORMATION_COLUMNS_REGEX``,
        indexed in match order; the list is empty when nothing matches.
        """
        information = relation.information
        owner_match = self.INFORMATION_OWNER_REGEX.findall(information)
        owner = owner_match[0] if owner_match else None
        stats_match = self.INFORMATION_STATISTICS_REGEX.findall(information)
        raw_table_stats = stats_match[0] if stats_match else None
        table_stats = SparkColumn.convert_table_stats(raw_table_stats)

        columns = []
        for match_num, match in enumerate(self.INFORMATION_COLUMNS_REGEX.finditer(information)):
            # The third group (nullable) is captured by the pattern but unused.
            column_name, column_type, _nullable = match.groups()
            columns.append(
                SparkColumn(
                    table_database=None,
                    table_schema=relation.schema,
                    table_name=relation.table,
                    table_type=relation.type,
                    column_index=match_num,
                    table_owner=owner,
                    column=column_name,
                    dtype=column_type,
                    table_stats=table_stats,
                )
            )
        return columns

    def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
        """Yield one catalog dict per column parsed from the relation's information."""
        columns = self.parse_columns_from_information(relation)

        for column in columns:
            # convert SparkColumns into catalog dicts
            as_dict = column.to_column_dict()
            as_dict["column_name"] = as_dict.pop("column", None)
            as_dict["column_type"] = as_dict.pop("dtype")
            as_dict["table_database"] = None
            yield as_dict

    def get_properties(self, relation: Relation) -> Dict[str, str]:
        """Return the result of the table-properties macro for *relation* as a dict."""
        properties = self.execute_macro(
            FETCH_TBL_PROPERTIES_MACRO_NAME, kwargs={"relation": relation}
        )
        return dict(properties)

    def get_catalog(self, manifest):
        """Build the catalog, submitting one `_get_one_catalog` task per schema.

        Returns:
            The ``(catalogs, exceptions)`` pair produced by
            ``catch_as_completed``.
        """
        schema_map = self._get_catalog_schemas(manifest)
        if len(schema_map) > 1:
            dbt.exceptions.raise_compiler_error(
                f"Expected only one database in get_catalog, found {list(schema_map)}"
            )

        with executor(self.config) as tpe:
            futures: List[Future[agate.Table]] = []
            for info, schemas in schema_map.items():
                for schema in schemas:
                    futures.append(
                        tpe.submit_connected(
                            self,
                            schema,
                            self._get_one_catalog,
                            info,
                            [schema],
                            manifest,
                        )
                    )
            catalogs, exceptions = catch_as_completed(futures)
        return catalogs, exceptions

    def _get_one_catalog(
        self,
        information_schema,
        schemas,
        manifest,
    ) -> agate.Table:
        """Return the catalog table for the single schema in *schemas*."""
        if len(schemas) != 1:
            dbt.exceptions.raise_compiler_error(
                f"Expected only one schema in spark _get_one_catalog, found {schemas}"
            )

        database = information_schema.database
        schema = list(schemas)[0]

        columns: List[Dict[str, Any]] = []
        for relation in self.list_relations(database, schema):
            logger.debug("Getting table schema for relation {}", relation)
            columns.extend(self._get_columns_for_catalog(relation))
        return agate.Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)

    def check_schema_exists(self, database, schema):
        """Return True when *schema* is the first value of a row from the list-schemas macro."""
        results = self.execute_macro(LIST_SCHEMAS_MACRO_NAME, kwargs={"database": database})
        return schema in [row[0] for row in results]

    def get_rows_different_sql(
        self,
        relation_a: BaseRelation,
        relation_b: BaseRelation,
        column_names: Optional[List[str]] = None,
        except_operator: str = "EXCEPT",
    ) -> str:
        """Generate SQL for a query that returns a single row with two
        columns: the difference in row counts between the two relations and
        the number of mismatched rows.

        When *column_names* is None the columns of *relation_a* are used.
        Column names are quoted and sorted before being joined.

        NOTE(review): *except_operator* is accepted but not used; the
        ``COLUMNS_EQUAL_SQL`` template hard-codes ``EXCEPT``.
        """
        # This method only really exists for test reasons.
        names: List[str]
        if column_names is None:
            columns = self.get_columns_in_relation(relation_a)
            names = sorted(self.quote(c.name) for c in columns)
        else:
            names = sorted(self.quote(n) for n in column_names)
        columns_csv = ", ".join(names)

        return COLUMNS_EQUAL_SQL.format(
            columns=columns_csv,
            relation_a=str(relation_a),
            relation_b=str(relation_b),
        )

    # This is for use in the test suite
    # Spark doesn't have 'commit' and 'rollback', so this override
    # doesn't include those commands.
    def run_sql_for_tests(self, sql, fetch, conn):
        """Execute *sql* on the connection's cursor and optionally fetch rows.

        *fetch* of "one" returns a single row, "all" returns every row, and
        anything else returns None. On any error the statement and the error
        are printed before re-raising. ``conn.transaction_open`` is always
        reset to False.
        """
        cursor = conn.handle.cursor()
        try:
            cursor.execute(sql)
            if fetch == "one":
                if hasattr(cursor, "fetchone"):
                    return cursor.fetchone()
                # AttributeError: 'PyhiveConnectionWrapper' object has no attribute 'fetchone'
                return cursor.fetchall()[0]
            if fetch == "all":
                return cursor.fetchall()
            return None
        except BaseException as e:
            print(sql)
            print(e)
            raise
        finally:
            conn.transaction_open = False

    @available.parse_none
    @log_code_execution
    def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None):
        """Upload *compiled_code* as a notebook, run it, and wait for the result.

        Steps, all REST calls against the profile's host with a bearer token:
        create the work directory, import the notebook, submit a one-off run
        on the profile's cluster, poll the run once per second until it
        reaches a terminal state or *timeout* seconds elapse, then fetch the
        run output. The endpoint paths match the Databricks workspace and
        jobs REST APIs.

        Args:
            parsed_model: model node; ``alias`` names the notebook and
                ``schema`` (falling back to the credentials' schema) names
                the work directory.
            compiled_code: python source uploaded as the notebook content.
            timeout: polling budget in seconds; a falsy value means 24 hours.

        Returns:
            The value of ``self.connections.get_response(None)``.

        Raises:
            ValueError: if *timeout* is negative or the profile has no user.
            dbt.exceptions.RuntimeException: if any REST call does not return
                status 200, the run does not end in state ``TERMINATED``, or
                its result state is not ``SUCCESS``.
        """
        # TODO improve the typing here.  N.B. Jinja returns a `jinja2.runtime.Undefined` instead
        # of `None` which evaluates to True!

        # TODO limit this function to run only when doing the materialization of python nodes

        # `getattr` on a dict never sees its keys, so a dict model would always
        # fall back to the credentials' schema; look the key up instead.
        default_schema = self.config.credentials.schema
        if isinstance(parsed_model, dict):
            schema = parsed_model.get("schema") or default_schema
        else:
            schema = getattr(parsed_model, "schema", default_schema)
        identifier = parsed_model["alias"]

        # assuming that for python job running over 1 day user would manually overwrite this
        if not timeout:
            timeout = 60 * 60 * 24
        if timeout <= 0:
            raise ValueError("Timeout must larger than 0")

        credentials = self.connections.profile.credentials
        auth_header = {"Authorization": f"Bearer {credentials.token}"}

        # create new dir
        if not credentials.user:
            raise ValueError("Need to supply user in profile to submit python job")
        base_url = f"https://{credentials.host}"
        # it is safe to call mkdirs even if dir already exists and have content inside
        work_dir = f"/Users/{credentials.user}/{schema}"
        notebook_path = f"{work_dir}/{identifier}"

        response = requests.post(
            f"{base_url}/api/2.0/workspace/mkdirs",
            headers=auth_header,
            json={
                "path": work_dir,
            },
        )
        if response.status_code != 200:
            raise dbt.exceptions.RuntimeException(
                f"Error creating work_dir for python notebooks\n {response.content!r}"
            )

        # add notebook
        b64_encoded_content = base64.b64encode(compiled_code.encode()).decode()
        response = requests.post(
            f"{base_url}/api/2.0/workspace/import",
            headers=auth_header,
            json={
                "path": notebook_path,
                "content": b64_encoded_content,
                "language": "PYTHON",
                "overwrite": True,
                "format": "SOURCE",
            },
        )
        if response.status_code != 200:
            raise dbt.exceptions.RuntimeException(
                f"Error creating python notebook.\n {response.content!r}"
            )

        # submit job
        submit_response = requests.post(
            f"{base_url}/api/2.1/jobs/runs/submit",
            headers=auth_header,
            json={
                "run_name": "debug task",
                "existing_cluster_id": credentials.cluster,
                "notebook_task": {
                    "notebook_path": notebook_path,
                },
            },
        )
        if submit_response.status_code != 200:
            # Report the body of the failed submit call, not the earlier
            # notebook-import response.
            raise dbt.exceptions.RuntimeException(
                f"Error creating python run.\n {submit_response.content!r}"
            )

        # poll until job finish
        run_id = submit_response.json()["run_id"]
        terminal_states = ("TERMINATED", "SKIPPED", "INTERNAL_ERROR")
        state = None
        # Kept outside the loop so the error below is well-defined even if the
        # loop body never runs.
        state_message = ""
        start = time.time()
        while state not in terminal_states and time.time() - start < timeout:
            time.sleep(1)
            resp = requests.get(
                f"{base_url}/api/2.1/jobs/runs/get?run_id={run_id}",
                headers=auth_header,
            )
            run_state = resp.json()["state"]
            state = run_state["life_cycle_state"]
            state_message = run_state.get("state_message", "")
        if state != "TERMINATED":
            raise dbt.exceptions.RuntimeException(
                "python model run ended in state "
                f"{state} with state_message\n{state_message}"
            )

        # get end state to return to user
        run_output = requests.get(
            f"{base_url}/api/2.1/jobs/runs/get-output?run_id={run_id}",
            headers=auth_header,
        )
        json_run_output = run_output.json()
        result_state = json_run_output["metadata"]["state"]["result_state"]
        if result_state != "SUCCESS":
            # Fall back to the short error text so a missing trace does not
            # replace the real failure with a KeyError.
            error_trace = json_run_output.get("error_trace", json_run_output.get("error", ""))
            raise dbt.exceptions.RuntimeException(
                "Python model failed with traceback as:\n"
                "(Note that the line number here does not "
                "match the line number in your code due to dbt templating)\n"
                f"{error_trace}"
            )
        return self.connections.get_response(None)

    def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
        """Group grantees by privilege for the rows of *grants_table*.

        Reads the ``Principal``, ``ActionType`` and ``ObjectType`` values of
        each row.

        Returns:
            A dict mapping each privilege to the list of its grantees, in row
            order.
        """
        grants_dict: Dict[str, List[str]] = {}
        for row in grants_table:
            grantee = row["Principal"]
            privilege = row["ActionType"]
            object_type = row["ObjectType"]

            # we only want to consider grants on this object
            # (view or table both appear as 'TABLE')
            # and we don't want to consider the OWN privilege
            if object_type == "TABLE" and privilege != "OWN":
                grants_dict.setdefault(privilege, []).append(grantee)
        return grants_dict
# SQL template used by SparkAdapter.get_rows_different_sql; it is filled in
# with str.format using the placeholders {columns}, {relation_a} and
# {relation_b}. The query returns a single row with two columns:
# row_count_difference (rows in a minus rows in b) and num_mismatched (rows
# present in only one of the two relations).
#
# Spark does something interesting with joins when both tables have the same
# static values for the join condition and complains that the join condition is
# "trivial". Which is true, though it seems like an unreasonable cause for
# failure! It also doesn't like the `from foo, bar` syntax as opposed to
# `from foo cross join bar`.
COLUMNS_EQUAL_SQL = """
with diff_count as (
SELECT
1 as id,
COUNT(*) as num_missing FROM (
(SELECT {columns} FROM {relation_a} EXCEPT
SELECT {columns} FROM {relation_b})
UNION ALL
(SELECT {columns} FROM {relation_b} EXCEPT
SELECT {columns} FROM {relation_a})
) as a
), table_a as (
SELECT COUNT(*) as num_rows FROM {relation_a}
), table_b as (
SELECT COUNT(*) as num_rows FROM {relation_b}
), row_count_diff as (
select
1 as id,
table_a.num_rows - table_b.num_rows as difference
from table_a
cross join table_b
)
select
row_count_diff.difference as row_count_difference,
diff_count.num_missing as num_mismatched
from row_count_diff
cross join diff_count
""".strip()