Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring database #28

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion infrastructure/Dockerfile.extract_many
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ RUN pip3 install -r requirements.txt

# Copy necessary Python scripts from the pipeline directory into the container
COPY ../pipeline/extract_demand.py .
COPY ../pipeline/extract_price.py .
COPY ../pipeline/extract_generation.py .
COPY ../pipeline/extract_to_s3.py .
COPY ../pipeline/common.py .
Expand Down
33 changes: 20 additions & 13 deletions infrastructure/schema.sql
Original file line number Diff line number Diff line change
@@ -1,36 +1,43 @@
DROP TABLE IF EXISTS Generation ;
DROP TABLE IF EXISTS Cost ;
DROP TABLE IF EXISTS Carbon ;
DROP TABLE IF EXISTS Demand ;
DROP TABLE IF EXISTS user_data ;
DROP TABLE IF EXISTS generation_percent ;


CREATE TABLE Demand (
publish_time TIMESTAMP PRIMARY KEY,
Demand_amt INT
publish_time TIMESTAMP,
demand_amt INT,
PRIMARY KEY (publish_time)
);
CREATE TABLE Carbon (
publish_time TIMESTAMP,
forecast INT,
carbon_level VARCHAR(255),
PRIMARY KEY (publish_time)
);
CREATE TABLE Cost (

publish_date DATE,
settlement_period INT,
sell_price FLOAT,
buy_price FLOAT,
PRIMARY KEY (publish_date, settlement_period)
CREATE TABLE user_data (
users_id INT GENERATED ALWAYS AS IDENTITY,
users_name VARCHAR,
user_email VARCHAR,
user_postcode VARCHAR,
hours_to_charge INT,
user_preference VARCHAR,
PRIMARY KEY (users_id)
);
CREATE TABLE generation_percent (
fuel_type INT,
date_time TIMESTAMP,
slice_percentage FLOAT,
PRIMARY KEY (date_time, fuel_type)
);
CREATE TABLE Generation (
publish_time TIMESTAMP,
publish_date DATE,

fuel_type VARCHAR(255),
gain_loss VARCHAR(1),
generated FLOAT,
settlement_period INT,
PRIMARY KEY (publish_time, fuel_type),
FOREIGN KEY (publish_time) REFERENCES Demand(publish_time),
FOREIGN KEY (publish_date,settlement_period) REFERENCES Cost(publish_date,settlement_period)
FOREIGN KEY (publish_time) REFERENCES Demand(publish_time)
);
227 changes: 0 additions & 227 deletions pipeline/extract_price.py

This file was deleted.

7 changes: 0 additions & 7 deletions pipeline/extract_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import timeit
from extract_generation import main as extract_generation
from extract_demand import main as extract_demand
from extract_price import main as extract_price

from constants import Constants as ct
import config as cg
Expand Down Expand Up @@ -36,12 +35,6 @@ def pipeline():
extract_time = timeit.timeit(extract_demand, number=1)
logger.info("Extract script completed in %s seconds", extract_time)

logger.info("===========")
logger.info("==> Executing extract_price..")
logger.info("===========")
extract_time = timeit.timeit(extract_price, number=1)
logger.info("Extract script completed in %s seconds", extract_time)

logger.info("===============")
logger.info("==> Extract Scripts Complete!")
logger.info("=======================================|")
Expand Down
51 changes: 0 additions & 51 deletions pipeline/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,11 @@ def get_data(self) -> dict[tuple]:
formatted_data = self.demand_transform(df)
data['demand'] = formatted_data
self.logger.info("""Transformed demand data""")
if "cost" in file:
formatted_data = self.cost_transform(df)
if formatted_data:
data['cost'] = formatted_data
self.logger.info("""Transformed cost data""")
if "carbon" in file:
formatted_data = self.carbon_transform(df)
data['carbon'] = formatted_data
self.logger.info("""Transformed carbon data""")

data = self.difference_of_periods(data)

data = self.difference_of_dates(data)
self.delete_read_files(files)
return data
Expand All @@ -87,25 +80,6 @@ def difference_of_dates(self, data_conflict: dict):
break
return data_conflict

def difference_of_periods(self, data_conflict: dict):
"""
Works out the difference of settlement periods between the settlement period column of generation
and the settlement period column of cost, adds the missing dates to cost and
returns it
Mainly to fix foreign key errors
"""
diff = list(set(self.period_g) - set(self.period_c))
for period in diff:
for values in data_conflict['generation']:

if values[5] == period:
if period == 2:
yesterday = datetime.date.today() - datetime.timedelta(days=1)
data_conflict['cost'].append((yesterday, 2, 0, 0))
data_conflict['cost'].append((values[1], values[5], 0, 0))
break
return data_conflict

def generation_transform(self, df: pd.DataFrame) -> tuple:
"""
Filters and transforms the generation dataframe passed into it and returns
Expand All @@ -130,22 +104,6 @@ def demand_transform(self, df: pd.DataFrame) -> tuple:
self.time_d = df['startTime'].unique()
return list(df.itertuples(index=False, name=None))

def cost_transform(self, df: pd.DataFrame) -> tuple:
"""
Filters and transforms the cost dataframe passed into it and returns
a list of tuples
"""
df = df.get(['settlementDate', 'settlementPeriod',
'systemSellPrice', 'systemBuyPrice'])
self.period_c = df['settlementPeriod'].unique()
if 2 in self.period_c:
yesterday = datetime.date.today() - datetime.timedelta(days=1)
df.loc[len(df.index)] = [str(yesterday), 2, 0, 0]
if 1 in self.period_c:
yesterday = datetime.date.today() - datetime.timedelta(days=1)
df.loc[len(df.index)] = [str(yesterday), 1, 0, 0]
return list(df.itertuples(index=False, name=None))

def carbon_transform(self, df: pd.DataFrame) -> tuple:
"""
Filters and transforms the carbon dataframe passed into it and returns
Expand Down Expand Up @@ -207,15 +165,6 @@ def load_values(self, conn, data):
execute_values(curr, sql_query, data['demand'])
self.logger.info(
"""Loaded demand data into the database""")
if data.get('cost'):
sql_query = """INSERT INTO Cost (publish_date, settlement_period, sell_price, buy_price)
VALUES %s
ON CONFLICT (publish_date, settlement_period) DO UPDATE
SET sell_price=EXCLUDED.sell_price,
buy_price=EXCLUDED.buy_price"""
execute_values(curr, sql_query, data['cost'])
self.logger.info(
"""Loaded cost data into the database""")
if data.get('demand'):
sql_query = """INSERT INTO Carbon (publish_time, forecast, carbon_level)
VALUES %s
Expand Down
Loading
Loading