Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use a SQLAlchemy to generate an insert statement #2843

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ def create_engine(self) -> Engine:
pool_pre_ping=True,
)

@deprecated(
"This method is deprecated. Use or override `FullyQualifiedName` instead.",
category=SingerSDKDeprecationWarning,
)
def quote(self, name: str) -> str:
"""Quote a name if it needs quoting, using '.' as a name-part delimiter.

Expand All @@ -853,7 +857,7 @@ def quote(self, name: str) -> str:
Returns:
str: The quoted name.
"""
return ".".join(
return ".".join( # pragma: no cover
[
self._dialect.identifier_preparer.quote(name_part)
for name_part in name.split(".")
Expand Down
43 changes: 28 additions & 15 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import re
import typing as t
import warnings
from collections import defaultdict
from copy import copy
from textwrap import dedent

import sqlalchemy as sa
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import insert
from sqlalchemy.sql.expression import bindparam

from singer_sdk.connectors import SQLConnector
Expand Down Expand Up @@ -282,19 +282,26 @@ def generate_insert_statement(
Returns:
An insert statement.
"""
property_names = list(self.conform_schema(schema)["properties"].keys())
column_identifiers = [
self.connector.quote(quoted_name(name, quote=True))
for name in property_names
]
statement = dedent(
f"""\
INSERT INTO {full_table_name}
({", ".join(column_identifiers)})
VALUES ({", ".join([f":{name}" for name in property_names])})
""",
conformed_schema = self.conform_schema(schema)
property_names = list(conformed_schema["properties"])

_, schema_name, table_name = self.connector.parse_full_table_name(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Need to handle case where schema_name is None.

The Table constructor will fail if schema_name is None. Consider adding a conditional to handle this case.

full_table_name
)
return statement.rstrip()

table = sa.Table(
table_name,
sa.MetaData(),
*[
sa.Column(
name, sa.String
) # Assuming all columns are of type String for simplicity # noqa: E501
Comment on lines +296 to +298
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (performance): Using String type for all columns could cause data type mismatches and performance issues.

Consider mapping the schema types to appropriate SQLAlchemy types based on the conformed_schema['properties'] type definitions.

for name in property_names
],
schema=schema_name,
)

return insert(table)

def bulk_insert_records(
self,
Expand All @@ -321,7 +328,13 @@ def bulk_insert_records(
full_table_name,
schema,
)
if isinstance(insert_sql, str):
if isinstance(insert_sql, str): # pragma: no cover
warnings.warn(
"Generating a SQL insert statement as a string is deprecated. "
"Please return an SQLAlchemy Executable object instead.",
DeprecationWarning,
stacklevel=2,
)
insert_sql = sa.text(insert_sql)

conformed_records = [self.conform_record(record) for record in records]
Expand Down
16 changes: 9 additions & 7 deletions tests/core/sinks/test_sql_sink.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import typing as t
from textwrap import dedent

import pytest
from sqlalchemy.sql import Insert

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.sinks.sql import SQLSink
Expand Down Expand Up @@ -55,10 +55,12 @@ def sink(self, target: DuckDBTarget, schema: dict) -> DuckDBSink:

def test_generate_insert_statement(self, sink: DuckDBSink, schema: dict):
"""Test that the insert statement is generated correctly."""
expected = dedent(
"""\
INSERT INTO foo
(id, col_ts, "table")
VALUES (:id, :col_ts, :table)"""
stmt = sink.generate_insert_statement("foo", schema=schema)
assert isinstance(stmt, Insert)
assert stmt.table.name == "foo"
assert stmt.table.columns.keys() == ["id", "col_ts", "table"]

edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
# Rendered SQL should look like:
assert str(stmt) == (
'INSERT INTO foo (id, col_ts, "table") VALUES (:id, :col_ts, :table)'
)
assert sink.generate_insert_statement("foo", schema=schema) == expected
9 changes: 2 additions & 7 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,7 @@ def test_record_with_missing_properties(
},
},
[],
dedent(
"""\
INSERT INTO test_stream
(id, name, "table")
VALUES (:id, :name, :table)""",
),
'INSERT INTO test_stream (id, name, "table") VALUES (:id, :name, :table)',
),
],
ids=[
Expand All @@ -676,7 +671,7 @@ def test_sqlite_generate_insert_statement(
sink.full_table_name,
sink.schema,
)
assert dml == expected_dml
assert str(dml) == expected_dml


def test_hostile_to_sqlite(
Expand Down