-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSQLETL.py
81 lines (59 loc) · 3.04 KB
/
SQLETL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import pandas as pd
from sqlalchemy import create_engine as eng, MetaData
from datetime import datetime, timedelta
def does_table_exists(engine, table_name):
    """Report whether ``table_name`` is among the tables reflected from ``engine``.

    Args:
        engine: SQLAlchemy engine (or other bind accepted by
            ``MetaData.reflect``) pointing at the database to inspect.
        table_name: Key to look up in the reflected ``tables`` mapping.

    Returns:
        True when ``table_name`` is a key of the reflected metadata's
        ``tables`` collection, otherwise False.
    """
    reflected = MetaData()
    # reflect() is called without a schema argument, so only what the
    # bind exposes by default ends up in ``reflected.tables``.
    reflected.reflect(bind=engine)
    if table_name in reflected.tables:
        return True
    return False
def getDatatoModel(database, schema, table_name, startDttm):
    """Load rows from ``database.schema.table_name`` one calendar year at a time.

    The function looks up the latest ``Activity_DTTM`` that is at or after
    ``startDttm``, then runs one query per year from ``startDttm.year``
    through the year of that latest value and concatenates the yearly
    results.  Every yearly query is joined against a ticker filter built
    from ``[Nasdaq].[dbo].[NasdaqHistory]`` (tickers with at least 2500 rows
    since 1/1/2010 whose latest activity is on or after 2024-01-10).

    Args:
        database: Database name; used in the connection URL and the SQL text.
        schema: Schema name; used in the connection URL and the SQL text.
        table_name: Table to read from.
        startDttm: Lower bound for the max-date lookup.  Must expose a
            ``.year`` attribute; it is also formatted into the SQL text.

    Returns:
        A DataFrame holding the concatenated yearly results.  An empty
        DataFrame is returned when the max-date lookup fails or finds no
        rows at or after ``startDttm``.

    Note:
        ``database``, ``schema``, ``table_name`` and ``startDttm`` are
        interpolated directly into the SQL, so only trusted values should
        be passed.  The yearly queries filter on the whole calendar year,
        so the first year can include rows earlier than ``startDttm``.
    """
    # Raw string: "\A" is not a valid escape sequence, so a plain literal
    # produces an invalid-escape warning.  The runtime value is unchanged.
    server = r'ANDREW_PC\ANDREWSSQLSEVER'
    trusted_connection = 'yes'
    Engine = eng(f'mssql+pyodbc://{server}/{database}?trusted_connection={trusted_connection}&driver=ODBC+Driver+17+for+SQL+Server&schema={schema}')

    def get_date_value(query):
        """Return the first cell of ``query``'s result, or None.

        None is returned when the query raises, returns no rows, or the
        first cell is NULL/NaT (as ``max()`` over zero rows produces).
        """
        try:
            data = pd.read_sql(sql=query, con=Engine)
        except Exception as e:
            print(f"Error: {str(e)}")
            # Without this return, ``data`` would be unbound below.
            return None
        if data.empty:
            return None
        value = data.iloc[0, 0]
        return None if pd.isna(value) else value

    try:
        maxDatequery = f"SELECT max(Activity_DTTM) as maxActivity_DTTM FROM {database}.{schema}.{table_name} where Activity_DTTM >= '{startDttm}' "
        maxDateValue = get_date_value(maxDatequery)

        query_results_object = []
        if maxDateValue is not None:
            # One query per calendar year keeps each result set bounded.
            for datecounter in range(startDttm.year, maxDateValue.year + 1):
                query = f"""
                with cte as (
                select ticker,count(1) as cnt
                from [Nasdaq].[dbo].[NasdaqHistory]
                where activity_dttm >= '1/1/2010'
                group by ticker
                having count(1) >= 2500 and max(activity_dttm) >= '2024-01-10'
                )
                SELECT a.*
                FROM {database}.{schema}.{table_name} as a
                join cte as b
                on a.ticker = b.ticker
                where year(Activity_DTTM) = '{datecounter}' """
                data = pd.read_sql(sql=query, con=Engine)
                query_results_object.append(data)
                print("Execution results for: ", datecounter, " with ", data.shape[0], " size")
    finally:
        # Release pooled connections whether or not the queries succeeded.
        Engine.dispose()

    # pd.concat raises ValueError on an empty list, so fall back to an
    # empty frame when there was nothing to load.
    df = pd.concat(query_results_object) if query_results_object else pd.DataFrame()
    print(f"Shape: {df.shape[0]}")
    return df
def uploadtoSQL(importdf, Database, Schema, Table):
    """Append ``importdf`` to the table ``Schema.Table`` in ``Database``.

    Args:
        importdf: DataFrame to write; its index is not written.
        Database: Target database name (e.g. 'Nasdaq').
        Schema: Target schema name (e.g. 'dbo').
        Table: Target table name (e.g. 'NasdaqModeledData').

    Returns:
        None.  Rows are appended to the table if it already exists
        (``if_exists='append'``).
    """
    # Raw string: "\A" is not a valid escape sequence, so a plain literal
    # produces an invalid-escape warning.  The runtime value is unchanged.
    server = r'ANDREW_PC\ANDREWSSQLSEVER'
    trusted_connection = 'yes'
    NasdaqEngine = eng(f'mssql+pyodbc://{server}/{Database}?trusted_connection={trusted_connection}&driver=ODBC+Driver+17+for+SQL+Server&schema={Schema}')
    try:
        importdf.to_sql(schema=Schema, name=Table, con=NasdaqEngine, if_exists='append', index=False)
    finally:
        # Release pooled connections whether or not the write succeeded.
        NasdaqEngine.dispose()
# -- End uploadtoSQL()