feat: Output record count metric from batch files insert #267
Conversation
target_snowflake/sinks.py (outdated):

```python
    full_table_name=full_table_name,
    schema=self.schema,
    sync_id=sync_id,
    file_format=file_format,
)

with self.record_counter_metric as counter:
    counter.increment(record_count)
```
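For context, `record_counter_metric` comes from the Singer SDK's metrics module: used as a context manager, the counter logs a `METRIC` line when the block exits. A minimal sketch of the pattern, assuming the `singer_sdk.metrics.record_counter` helper and an illustrative stream name:

```python
from singer_sdk import metrics

# Sketch of the SDK metrics pattern this change relies on. On exiting
# the with-block, the counter logs something like:
#   INFO METRIC: {"type": "counter", "metric": "record_count", "value": 42, ...}
with metrics.record_counter(stream="users") as counter:
    counter.increment(42)
```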
Wouldn't this add records to the count twice? Once when the record is processed in https://github.com/meltano/sdk/blob/409a40b48c442e1382611d3a69b2f95df2e073d3/singer_sdk/target_base.py#L362 and again when they're batched in target-snowflake/target_snowflake/sinks.py (lines 151 to 154 in 8919a66):

```python
self.insert_batch_files_via_internal_stage(
    full_table_name=full_table_name,
    files=files,
)
```
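To make the concern concrete, here is a simplified, self-contained illustration (not actual SDK or target code) of how the same records would pass through two increment sites:

```python
# Stand-ins for the two counting sites; the real code lives in the SDK's
# target_base and this target's sink, respectively.
count = 0

def process_record_message(record: dict) -> dict:
    """Mimics the SDK counting each RECORD message as it is processed."""
    global count
    count += 1  # first increment, done by the SDK core
    return record

def insert_batch_files(records: list[dict]) -> None:
    """Mimics counting again when the records are written out as batch files."""
    global count
    count += len(records)  # second increment, as added by this PR

records = [process_record_message({"id": i}) for i in range(3)]
insert_batch_files(records)
print(count)  # 6, not 3: every record was counted twice
```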
Hmm, I guess so. The intention of this change was to work with `BATCH` messages, but you're referring to when a user supplies `batch_config` on the target side specifically to batch together `RECORD` data into files for insert via internal stage? Just to check my understanding: if `batch_config` is supplied on the tap side, it emits a `BATCH` message and then doesn't hit this issue?
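For reference, a tap-side `batch_config` is supplied in the tap's JSON settings; per the SDK's batch messaging docs it looks roughly like this (the storage root and prefix are illustrative):

```json
{
  "batch_config": {
    "encoding": {
      "format": "jsonl",
      "compression": "gzip"
    },
    "storage": {
      "root": "file:///tmp/batches",
      "prefix": "batch-"
    }
  }
}
```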
Maybe the full context of the function I linked is useful, target-snowflake/target_snowflake/sinks.py (lines 109 to 156 in 7c9a1fb):

```python
def bulk_insert_records(
    self,
    full_table_name: str,
    schema: dict,
    records: t.Iterable[dict[str, t.Any]],
) -> int | None:
    """Bulk insert records to an existing destination table.

    The default implementation uses a generic SQLAlchemy bulk insert operation.
    This method may optionally be overridden by developers in order to provide
    faster, native bulk uploads.

    Args:
        full_table_name: the target table name.
        schema: the JSON schema for the new table, to be used when inferring column
            names.
        records: the input records.

    Returns:
        True if table exists, False if not, None if unsure or undetectable.
    """
    # prepare records for serialization
    processed_records = (
        conform_record_data_types(
            stream_name=self.stream_name,
            record=rcd,
            schema=schema,
            level="RECURSIVE",
            logger=self.logger,
        )
        for rcd in records
    )
    # serialize to batch files and upload
    # TODO: support other batchers
    batcher = JSONLinesBatcher(
        tap_name=self.target.name,
        stream_name=self.stream_name,
        batch_config=self.batch_config,
    )
    batches = batcher.get_batches(records=processed_records)
    for files in batches:
        self.insert_batch_files_via_internal_stage(
            full_table_name=full_table_name,
            files=files,
        )
    # if records list, we can quickly return record count.
    return len(records) if isinstance(records, list) else None
```
`RECORD` messages are processed by this logic, even with the default `batch_config`.
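For comparison, a `BATCH` message received from a tap points at already-serialized files and reaches `insert_batch_files_via_internal_stage` without going through `bulk_insert_records` above. Per the SDK docs it looks roughly like this (stream name and file path illustrative):

```json
{
  "type": "BATCH",
  "stream": "users",
  "encoding": {"format": "jsonl", "compression": "gzip"},
  "manifest": ["file:///tmp/batches/batch-users-0001.jsonl.gz"]
}
```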
I see - I wasn't aware there was a default `batch_config` here. So processing of `RECORD` and `BATCH` messages both end up calling `insert_batch_files_via_internal_stage`, where I've made this change, except `RECORD` messages are already counted by the SDK before they are batched, so they would now get counted twice.

Setting that issue aside, how do you feel in principle about this target emitting a record count when it receives a `BATCH` message? Is that OK, or does it conflate two concepts?
Force-pushed from 0dddac6 to 684d362: `RECORD` messages are already counted by the SDK, and processing of them here goes through `insert_batch_files_via_internal_stage` by default.
Force-pushed from 684d362 to fb4cc82.
```python
with self.record_counter_metric as counter:
    counter.increment(record_count)
```
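How the final revision derives `record_count` from the staged files isn't shown in this excerpt, but as a minimal self-contained sketch, assuming the batch files are gzip-compressed JSONL (one record per line), counting could look like:

```python
import gzip

def count_jsonl_records(path: str) -> int:
    """Count records in a gzip-compressed JSONL batch file, one per line."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return sum(1 for _ in f)

# e.g. record_count = sum(count_jsonl_records(f) for f in files)
```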
Yeah, this should only count records inserted from batch files!

Thanks @ReubenFrankel!
A bit of an opinionated change, since it might conflate batches and records, but it has been helpful for us to know how many rows (records) were created or updated from batch processing.