Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Order by pagination #52

Merged
merged 4 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions dbt_automation/utils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
import json
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface

basicConfig(level=INFO)
Expand All @@ -30,7 +31,7 @@ def __init__(self, conn_info=None, location=None):

def execute(self, statement: str, **kwargs) -> list:
    """Run a query and return the results.

    Args:
        statement: SQL text to submit through ``self.bqclient.query``.
        **kwargs: extra keyword arguments forwarded unchanged to
            ``self.bqclient.query``.

    Returns:
        Whatever ``query_job.result()`` returns for the submitted job.
    """
    # Submit the statement exactly once. The client's configured location is
    # used as the default, but an explicit ``location=`` supplied by the
    # caller wins instead of raising a duplicate-keyword TypeError.
    kwargs.setdefault("location", self.location)
    query_job = self.bqclient.query(statement, **kwargs)
    return query_job.result()

def get_tables(self, schema: str) -> list:
Expand All @@ -50,12 +51,42 @@ def get_table_columns(self, schema: str, table: str) -> list:
column_names = [field.name for field in table.schema]
return column_names

def get_table_data(
    self,
    schema: str,
    table: str,
    limit: int,
    page: int = 1,
    order_by: str = None,
    order: int = 1,  # ASC
) -> list:
    """Return one page of rows from ``schema.table`` as a list of dicts.

    Args:
        schema: schema that contains the table.
        table: name of the table to read.
        limit: page size, i.e. the maximum number of rows returned.
        page: 1-based page number; ``(page - 1) * limit`` rows are skipped.
        order_by: optional column to sort by; no ORDER BY clause is added
            when it is omitted or empty.
        order: ``1`` sorts ascending, any other value sorts descending.

    Returns:
        One ``dict`` per row, built with ``dict(record)``.

    Raises:
        ValueError: if ``limit`` is negative or ``page`` is less than 1.
    """
    # int() guarantees that only numeric literals are interpolated into the
    # LIMIT / OFFSET clause below.
    limit = int(limit)
    page = int(page)
    if limit < 0:
        raise ValueError(f"limit must not be negative, got {limit}")
    if page < 1:
        # page 0 or below would otherwise produce a negative OFFSET
        raise ValueError(f"page must be >= 1, got {page}")
    offset = (page - 1) * limit

    # select
    query = f"""
    SELECT *
    FROM `{schema}`.`{table}`
    """

    # order
    if order_by:
        direction = "ASC" if order == 1 else "DESC"
        # NOTE(review): quote_columnname is called with only the column name
        # here; confirm it emits BigQuery-style (backtick) quoting.
        query += f"""
        ORDER BY {quote_columnname(order_by)} {direction}
        """

    # offset, limit
    query += f"""
    LIMIT {limit} OFFSET {offset}
    """

    result = self.execute(query)
    return [dict(record) for record in result]

Expand Down Expand Up @@ -86,7 +117,6 @@ def get_json_columnspec(
FROM keys
CROSS JOIN UNNEST(keys.keys) AS k
''',
location=self.location,
)
return [json_field["k"] for json_field in query]

Expand Down
10 changes: 9 additions & 1 deletion dbt_automation/utils/interfaces/warehouse_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ def get_schemas(self):
pass

@abstractmethod
def get_table_data(
    self,
    schema: str,
    table: str,
    limit: int,
    page: int = 1,
    order_by: str = None,
    order: int = 1,  # ASC
):
    """Return one page of rows from ``schema.table``.

    Implementations receive:
        schema: schema that contains the table.
        table: name of the table to read.
        limit: page size (maximum number of rows to return).
        page: 1-based page number.
        order_by: optional column to sort by.
        order: ``1`` for ascending, otherwise descending.
    """

@abstractmethod
Expand Down
41 changes: 33 additions & 8 deletions dbt_automation/utils/postgres.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""helpers for postgres"""

from logging import basicConfig, getLogger, INFO
import psycopg2
import os
from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface


Expand Down Expand Up @@ -81,16 +83,39 @@ def get_schemas(self) -> list:
)
return [x[0] for x in resultset]

def get_table_data(self, schema: str, table: str, limit: int) -> list:
"""returns limited rows from the specified table in the given schema"""
def get_table_data(
self,
schema: str,
table: str,
limit: int,
page: int = 1,
order_by: str = None,
order: int = 1, # ASC
) -> list:
"""
returns limited rows from the specified table in the given schema
"""
offset = (page - 1) * limit
# total_rows = self.execute(f"SELECT COUNT(*) FROM {schema}.{table}")[0][0]

resultset = self.execute(
f"""
SELECT *
FROM {schema}.{table}
LIMIT {limit};
# select
query = f"""
SELECT *
FROM {schema}.{table}
"""

# order
if order_by:
query += f"""
ORDER BY {quote_columnname(order_by)} {"ASC" if order == 1 else "DESC"}
"""
) # returns an array of tuples of values

# offset, limit
query += f"""
OFFSET {offset} LIMIT {limit};
"""

resultset = self.execute(query) # returns an array of tuples of values
col_names = [desc[0] for desc in self.cursor.description]
rows = [dict(zip(col_names, row)) for row in resultset]

Expand Down
Loading