Skip to content

Commit

Permalink
fix: Use a SQLAlchemy to generate an insert statement
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jan 28, 2025
1 parent c81fa53 commit 0168c87
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
28 changes: 14 additions & 14 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import typing as t
from collections import defaultdict
from copy import copy
from textwrap import dedent

import sqlalchemy as sa
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import insert
from sqlalchemy.sql.expression import bindparam

from singer_sdk.connectors import SQLConnector
Expand Down Expand Up @@ -282,19 +281,20 @@ def generate_insert_statement(
Returns:
An insert statement.
"""
property_names = list(self.conform_schema(schema)["properties"].keys())
column_identifiers = [
self.connector.quote(quoted_name(name, quote=True))
for name in property_names
]
statement = dedent(
f"""\
INSERT INTO {full_table_name}
({", ".join(column_identifiers)})
VALUES ({", ".join([f":{name}" for name in property_names])})
""",
conformed_schema = self.conform_schema(schema)
property_names = list(conformed_schema["properties"])
table = sa.Table(
full_table_name, # type: ignore[arg-type]
sa.MetaData(),
*[
sa.Column(
name, sa.String
) # Assuming all columns are of type String for simplicity # noqa: E501
for name in property_names
],
)
return statement.rstrip()

return insert(table)

def bulk_insert_records(
self,
Expand Down
16 changes: 9 additions & 7 deletions tests/core/sinks/test_sql_sink.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import typing as t
from textwrap import dedent

import pytest
from sqlalchemy.sql import Insert

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.sinks.sql import SQLSink
Expand Down Expand Up @@ -55,10 +55,12 @@ def sink(self, target: DuckDBTarget, schema: dict) -> DuckDBSink:

def test_generate_insert_statement(self, sink: DuckDBSink, schema: dict):
"""Test that the insert statement is generated correctly."""
expected = dedent(
"""\
INSERT INTO foo
(id, col_ts, "table")
VALUES (:id, :col_ts, :table)"""
stmt = sink.generate_insert_statement("foo", schema=schema)
assert isinstance(stmt, Insert)
assert stmt.table.name == "foo"
assert stmt.table.columns.keys() == ["id", "col_ts", "table"]

# Rendered SQL should look like:
assert str(stmt) == (
'INSERT INTO foo (id, col_ts, "table") VALUES (:id, :col_ts, :table)'
)
assert sink.generate_insert_statement("foo", schema=schema) == expected

0 comments on commit 0168c87

Please sign in to comment.