-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbUtils.py
76 lines (68 loc) · 2.76 KB
/
dbUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
'''
This small module contains utility functions to run database queries and other long calculations. Its roots lay in the DataExplorer notebook.
'''
import psycopg2
import sqlalchemy
from sqlalchemy import MetaData, Table
import pandas
from IPython.display import Audio
import time
import multiprocessing
from datetime import datetime
defaultSchema = 'crm20'
engine = sqlalchemy.create_engine('postgresql://crm20:crm20@localhost:5433/github')
lastResult = None
'''
The `runQuery` method provides several features:
- It acts as wrapper to run queries against the database
- It (optionally) beeps at the end of a database run, which helped a lot in development, as I could focus on other things while waiting for queries to end, without having to look if the query finished every time
- It can save the last result to memory, as a fallback if the display session crashed while running the query (or the query just took too long to keep the session up)
- It wraps a pandas.read, which enforces each query to have a return type; this strongly encourages a exploratory way of developing
'''
def runQuery(query, mute=False):
start = time.time()
result = pandas.read_sql_query(query, engine)
global lastResult
lastResult = result
end = time.time()
print('Time used: '+str(end - start))
if not mute:
display(Audio('./beep.mp3', autoplay=True))
return result
'''
The `log` function is meant to run very long running calculation. For those, it is impossible to keep up a single display session, which is why the output has to be redirected. Timestamps additionally show when progress has been made.
'''
logSemaphore = multiprocessing.Semaphore()
def log(text, file='log.txt'):
with logSemaphore:
with open(file, 'a') as file:
file.write('========= '+str(datetime.now())+' ==========\n'+str(text)+'\n')
'''
Wraps table creation (for modularity); originates from the RepoAnalysis notebook
'''
def createTable(tableName, columns):
meta = MetaData(schema=defaultSchema)
table = Table(
tableName, meta,
*columns
)
meta.create_all(engine)
engine.dispose()
'''
Wraps table deletion, exception handling catches error that is thrown because the query does not return data; originates from the RepoAnalysis notebook
'''
def deleteTable(tableName):
try:
runQuery('''
DROP TABLE '''+defaultSchema+'''.'''+tableName+'''
''')
except:
pass
'''
Safely writes a pandas dataframe to a given table; originates from the RepoAnalysis notebook
'''
dataBaseSemaphore = multiprocessing.Semaphore()
def writeDataToDb(data, tableName):
with dataBaseSemaphore:
data.to_sql(tableName, schema=defaultSchema, con=engine, if_exists='append', index=False)
engine.dispose()