Skip to content

Commit

Permalink
Address PR comments and create duplicate physical columns for tables
Browse files Browse the repository at this point in the history
  • Loading branch information
ktmud committed Apr 20, 2022
1 parent 52e500c commit 24b8670
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 111 deletions.
4 changes: 3 additions & 1 deletion superset/columns/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
ImportExportMixin,
)

UNKOWN_TYPE = "UNKNOWN"


class Column(
Model,
Expand Down Expand Up @@ -90,7 +92,7 @@ class Column(
# [1] https://www.postgresql.org/docs/9.1/datatype-character.html
name = sa.Column(sa.Text)
# Raw type as returned and used by db engine.
type = sa.Column(sa.Text, default="UNKNOWN")
type = sa.Column(sa.Text, default=UNKOWN_TYPE)

# Columns are defined by expressions. For tables, these are the actual column names,
# and should match the ``name`` attribute. For datasets, these can be any valid SQL
Expand Down
56 changes: 31 additions & 25 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
from sqlalchemy.sql.selectable import Alias, TableClause

from superset import app, db, is_feature_enabled, security_manager
from superset.columns.models import Column as NewColumn
from superset.columns.models import Column as NewColumn, UNKOWN_TYPE
from superset.common.db_query_status import QueryStatus
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.connectors.sqla.utils import (
Expand Down Expand Up @@ -449,7 +449,7 @@ def to_sl_column(
column.created_by = self.created_by
column.changed_by = self.changed_by
column.name = self.column_name
column.type = self.type or "Unknown"
column.type = self.type or UNKOWN_TYPE
column.expression = self.expression or self.table.quote_identifier(
self.column_name
)
Expand All @@ -475,9 +475,9 @@ def after_delete( # pylint: disable=unused-argument
target: "TableColumn",
) -> None:
session = inspect(target).session
dataset = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
if dataset:
session.delete(dataset)
column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
if column:
session.delete(column)


class SqlMetric(Model, BaseMetric, CertificationMixin):
Expand Down Expand Up @@ -565,7 +565,7 @@ def to_sl_column(
column.changed_on = self.changed_on
column.created_by = self.created_by
column.changed_by = self.changed_by
column.type = "Unknown"
column.type = UNKOWN_TYPE
column.expression = self.expression
column.warning_text = self.warning_text
column.description = self.description
Expand All @@ -589,9 +589,9 @@ def after_delete( # pylint: disable=unused-argument
target: "SqlMetric",
) -> None:
session = inspect(target).session
dataset = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
if dataset:
session.delete(dataset)
column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
if column:
session.delete(column)


sqlatable_user = Table(
Expand Down Expand Up @@ -1879,7 +1879,6 @@ def fetch_metadata(self, commit: bool = True) -> MetadataResult:
new_column = TableColumn(
column_name=col["name"],
type=col["type"],
table_id=self.id,
table=self,
)
new_column.is_dttm = new_column.is_temporal
Expand Down Expand Up @@ -2055,7 +2054,7 @@ def get_sl_columns(self) -> List[NewColumn]:
if uuids:
# load those not found from db
existing_columns |= set(
session.query(NewColumn).filter(NewColumn.uuid.in_(uuids)).all()
session.query(NewColumn).filter(NewColumn.uuid.in_(uuids))
)

known_columns = {column.uuid: column for column in existing_columns}
Expand Down Expand Up @@ -2111,7 +2110,7 @@ def update_table( # pylint: disable=unused-argument
def after_insert(
mapper: Mapper,
connection: Connection,
table: "SqlaTable",
sqla_table: "SqlaTable",
) -> None:
"""
Shadow write the dataset to new models.
Expand All @@ -2125,14 +2124,14 @@ def after_insert(
For more context: https://github.com/apache/superset/issues/14909
"""
security_manager.set_perm(mapper, connection, table)
table.write_shadow_dataset()
security_manager.set_perm(mapper, connection, sqla_table)
sqla_table.write_shadow_dataset()

@staticmethod
def after_delete( # pylint: disable=unused-argument
mapper: Mapper,
connection: Connection,
target: "SqlaTable",
sqla_table: "SqlaTable",
) -> None:
"""
Shadow write the dataset to new models.
Expand All @@ -2146,8 +2145,10 @@ def after_delete( # pylint: disable=unused-argument
For more context: https://github.com/apache/superset/issues/14909
"""
session = inspect(target).session
dataset = session.query(NewDataset).filter_by(uuid=target.uuid).one_or_none()
session = inspect(sqla_table).session
dataset = (
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
)
if dataset:
session.delete(dataset)

Expand Down Expand Up @@ -2216,7 +2217,7 @@ def after_update(
or inspector.attrs.schema.history.has_changes()
or inspector.attrs.database.history.has_changes()
):
tables = NewTable.load_or_create(
tables = NewTable.bulk_load_or_create(
sqla_table.database,
[TableName(schema=sqla_table.schema, table=sqla_table.table_name)],
sync_columns=False,
Expand All @@ -2231,7 +2232,9 @@ def after_update(
# dataset columns will only be assigned to newly created tables
# existing tables should manage column syncing in another process
physical_columns = [
clone_model(column, ignore=["uuid"], additional=["changed_by"])
clone_model(
column, ignore=["uuid"], keep_relations=["changed_by"]
)
for column in dataset.columns
if column.is_physical
]
Expand All @@ -2249,7 +2252,7 @@ def after_update(
referenced_tables = extract_table_references(
sqla_table.sql, sqla_table.database.get_dialect().name
)
dataset.tables = NewTable.load_or_create(
dataset.tables = NewTable.bulk_load_or_create(
sqla_table.database,
referenced_tables,
default_schema=sqla_table.schema,
Expand Down Expand Up @@ -2287,6 +2290,9 @@ def write_shadow_dataset(
For more context: https://github.com/apache/superset/issues/14909
"""
session = inspect(self).session
# make sure database points to the right instance, in case only
# `table.database_id` is updated and the changes haven't been
# consolidated by SQLA
if self.database_id and (
not self.database or self.database.id != self.database_id
):
Expand All @@ -2307,8 +2313,8 @@ def write_shadow_dataset(

# physical dataset
if not self.sql:
# always create separate column entities for Dataset and Table
# so updating a dataset would not update the columns in the table
# always create separate column entries for Dataset and Table
# so updating a dataset would not update columns in the related table
physical_columns = [
clone_model(
column,
Expand All @@ -2317,12 +2323,12 @@ def write_shadow_dataset(
# be created via some sort of automated system.
# But keep `changed_by` in case someone manually changes
# column attributes such as `is_dttm`.
additional=["changed_by"],
keep_relations=["changed_by"],
)
for column in columns
if column.is_physical
]
tables = NewTable.load_or_create(
tables = NewTable.bulk_load_or_create(
self.database,
[TableName(schema=self.schema, table=self.table_name)],
sync_columns=False,
Expand All @@ -2345,7 +2351,7 @@ def write_shadow_dataset(
referenced_tables = extract_table_references(
self.sql, self.database.get_dialect().name
)
tables = NewTable.load_or_create(
tables = NewTable.bulk_load_or_create(
self.database,
referenced_tables,
default_schema=self.schema,
Expand Down
4 changes: 2 additions & 2 deletions superset/migrations/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def assign_uuids(
op.execute(
f"UPDATE {dialect().identifier_preparer.quote(table_name)} SET uuid = {sql}"
)
print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.\n")
return

# Otherwise use the Python uuid function
Expand All @@ -92,4 +92,4 @@ def assign_uuids(
print(f" uuid assigned to {end} out of {count}\r", end="")
start += batch_size

print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.\n")
Loading

0 comments on commit 24b8670

Please sign in to comment.