-
Notifications
You must be signed in to change notification settings - Fork 175
/
Copy pathutil.py
99 lines (74 loc) · 3.04 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
from urllib.parse import quote_plus
from typing import Optional, Dict, List, Union, Tuple
from sqlalchemy import exc
from sqlalchemy.engine.url import _rfc_1738_quote # noqa
def _url(
host: str,
port: Optional[int] = 8080,
user: Optional[str] = None,
password: Optional[str] = None,
catalog: Optional[str] = None,
schema: Optional[str] = None,
source: Optional[str] = "trino-sqlalchemy",
session_properties: Dict[str, str] = None,
http_headers: Dict[str, Union[str, int]] = None,
extra_credential: Optional[List[Tuple[str, str]]] = None,
client_tags: Optional[List[str]] = None,
experimental_python_types: Optional[bool] = None,
access_token: Optional[str] = None,
cert: Optional[str] = None,
key: Optional[str] = None,
verify: Optional[bool] = None,
roles: Optional[Dict[str, str]] = None
) -> str:
"""
Composes a SQLAlchemy connection string from the given database connection
parameters.
Parameters containing special characters (e.g., '@', '%') need to be encoded to be parsed correctly.
"""
trino_url = "trino://"
if user is not None:
trino_url += _rfc_1738_quote(user)
if password is not None:
if user is None:
raise exc.ArgumentError("user must be specified when specifying a password.")
trino_url += f":{_rfc_1738_quote(password)}"
if user is not None:
trino_url += "@"
if not host:
raise exc.ArgumentError("host must be specified.")
trino_url += host
if not port:
raise exc.ArgumentError("port must be specified.")
trino_url += f":{port}/"
if catalog is not None:
trino_url += f"{quote_plus(catalog)}"
if schema is not None:
if catalog is None:
raise exc.ArgumentError("catalog must be specified when specifying a default schema.")
trino_url += f"/{quote_plus(schema)}"
assert source
trino_url += f"?source={quote_plus(source)}"
if session_properties is not None:
trino_url += f"&session_properties={quote_plus(json.dumps(session_properties))}"
if http_headers is not None:
trino_url += f"&http_headers={quote_plus(json.dumps(http_headers))}"
if extra_credential is not None:
# repr is used here as json.dumps converts tuples into arrays
trino_url += f"&extra_credential={quote_plus(json.dumps(extra_credential))}"
if client_tags is not None:
trino_url += f"&client_tags={quote_plus(json.dumps(client_tags))}"
if experimental_python_types is not None:
trino_url += f"&experimental_python_types={json.dumps(experimental_python_types)}"
if access_token is not None:
trino_url += f"&access_token={quote_plus(access_token)}"
if cert is not None:
trino_url += f"&cert={quote_plus(cert)}"
if key is not None:
trino_url += f"&key={quote_plus(key)}"
if verify is not None:
trino_url += f"&verify={json.dumps(verify)}"
if roles is not None:
trino_url += f"&roles={quote_plus(json.dumps(roles))}"
return trino_url