Skip to content

Commit

Permalink
fix: Use a SQLAlchemy to generate an insert statement
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jan 28, 2025
1 parent c81fa53 commit bbf9e43
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
34 changes: 20 additions & 14 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import typing as t
from collections import defaultdict
from copy import copy
from textwrap import dedent

import sqlalchemy as sa
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import insert
from sqlalchemy.sql.expression import bindparam

from singer_sdk.connectors import SQLConnector
Expand Down Expand Up @@ -282,19 +281,26 @@ def generate_insert_statement(
Returns:
An insert statement.
"""
property_names = list(self.conform_schema(schema)["properties"].keys())
column_identifiers = [
self.connector.quote(quoted_name(name, quote=True))
for name in property_names
]
statement = dedent(
f"""\
INSERT INTO {full_table_name}
({", ".join(column_identifiers)})
VALUES ({", ".join([f":{name}" for name in property_names])})
""",
conformed_schema = self.conform_schema(schema)
property_names = list(conformed_schema["properties"])

_, schema_name, table_name = self.connector.parse_full_table_name(
full_table_name
)
return statement.rstrip()

table = sa.Table(
table_name,
sa.MetaData(),
*[
sa.Column(
name, sa.String
) # Assuming all columns are of type String for simplicity # noqa: E501
for name in property_names
],
schema=schema_name,
)

return insert(table)

def bulk_insert_records(
self,
Expand Down
16 changes: 9 additions & 7 deletions tests/core/sinks/test_sql_sink.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import typing as t
from textwrap import dedent

import pytest
from sqlalchemy.sql import Insert

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.sinks.sql import SQLSink
Expand Down Expand Up @@ -55,10 +55,12 @@ def sink(self, target: DuckDBTarget, schema: dict) -> DuckDBSink:

def test_generate_insert_statement(self, sink: DuckDBSink, schema: dict):
"""Test that the insert statement is generated correctly."""
expected = dedent(
"""\
INSERT INTO foo
(id, col_ts, "table")
VALUES (:id, :col_ts, :table)"""
stmt = sink.generate_insert_statement("foo", schema=schema)
assert isinstance(stmt, Insert)
assert stmt.table.name == "foo"
assert stmt.table.columns.keys() == ["id", "col_ts", "table"]

# Rendered SQL should look like:
assert str(stmt) == (
'INSERT INTO foo (id, col_ts, "table") VALUES (:id, :col_ts, :table)'
)
assert sink.generate_insert_statement("foo", schema=schema) == expected
9 changes: 2 additions & 7 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,7 @@ def test_record_with_missing_properties(
},
},
[],
dedent(
"""\
INSERT INTO test_stream
(id, name, "table")
VALUES (:id, :name, :table)""",
),
'INSERT INTO test_stream (id, name, "table") VALUES (:id, :name, :table)',
),
],
ids=[
Expand All @@ -676,7 +671,7 @@ def test_sqlite_generate_insert_statement(
sink.full_table_name,
sink.schema,
)
assert dml == expected_dml
assert str(dml) == expected_dml


def test_hostile_to_sqlite(
Expand Down

0 comments on commit bbf9e43

Please sign in to comment.