Remove from _KNOWN_TABLES on 404 insert to allow table re-creation for Python SDK #25325

Closed
Changes from 8 commits
28 changes: 18 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -405,7 +405,9 @@ def chain_after(result):
from apache_beam.utils.annotations import deprecated
from apache_beam.utils.annotations import experimental


try:
from google.api_core.exceptions import ClientError, GoogleAPICallError
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
from apache_beam.io.gcp.internal.clients.bigquery import JobReference
@@ -1295,6 +1297,7 @@ class BigQueryWriteFn(DoFn):

FAILED_ROWS = 'FailedRows'
FAILED_ROWS_WITH_ERRORS = 'FailedRowsWithErrors'
FAILED_INSERT_NOTFOUND = 'notFound'
STREAMING_API_LOGGING_FREQUENCY_SEC = 300

def __init__(
@@ -1551,17 +1554,23 @@ def _flush_batch(self, destination):
insert_ids = [None for r in rows_and_insert_ids]
else:
insert_ids = [r[1] for r in rows_and_insert_ids]

while True:
errors = []
passed = False
start = time.time()
passed, errors = self.bigquery_wrapper.insert_rows(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId,
rows=rows,
insert_ids=insert_ids,
skip_invalid_rows=True,
ignore_unknown_values=self.ignore_unknown_columns)
try:
passed, errors = self.bigquery_wrapper.insert_rows(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId,
rows=rows,
insert_ids=insert_ids,
skip_invalid_rows=True,
ignore_unknown_values=self.ignore_unknown_columns)
except (ClientError, GoogleAPICallError) as e:
if e.code == 404:
ragyabraham marked this conversation as resolved.
_KNOWN_TABLES.remove(destination)
@ahmedabu98 (Contributor) commented on Feb 9, 2023:

Suggested change: replace

    _KNOWN_TABLES.remove(destination)

with

    _KNOWN_TABLES.remove(destination)
    self._create_table_if_needed(bigquery_tools.parse_table_reference(destination), self.schema)

We will actually need to create the table again here, otherwise we will continue running into the 404 error.

ragyabraham (Author) replied:

Can you clarify? I believe after raising, the bundle will retry, _create_table_if_needed is called again and, since the table name is no longer in _KNOWN_TABLES, it will get past that check. Then the table may be created again (depending on create_disposition).

I've also realised, if the create_disposition is 'CREATE_NEVER' and the insert_retry_strategy is set to 'RETRY_ALWAYS', the pipeline will be stuck in a loop, no?
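
For reference, the gate being discussed works roughly like this (a paraphrased sketch, not the verbatim bigquery.py body; `create_fn` stands in for the wrapper's get-or-create call):

```python
_KNOWN_TABLES = set()

CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'


def create_table_if_needed(destination, create_disposition, create_fn):
  # Once a destination is in _KNOWN_TABLES this is a no-op, so removing the
  # entry on a 404 lets the retried bundle reach the create step again.
  if destination in _KNOWN_TABLES:
    return
  if create_disposition == CREATE_NEVER:
    # No create call is ever issued; if the table really is missing, each
    # retried bundle will hit the same 404 again.
    return
  create_fn(destination)
  _KNOWN_TABLES.add(destination)
```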

@ahmedabu98 (Contributor) replied on Mar 23, 2023:

> I believe after raising, the bundle will retry, _create_table_if_needed is called again

Ahh you're right, ignore my suggestion.

> I've also realised, if the create_disposition is 'CREATE_NEVER' and the insert_retry_strategy is set to 'RETRY_ALWAYS', the pipeline will be stuck in a loop, no?

The retry strategy refers to errors we receive when inserting individual rows (e.g. a schema mismatch), and those come back after BQ has tried inserting into the table. Those failed row insertions may be retried according to the strategy (that logic is handled directly in this file), but the errors don't cause the whole bundle to fail.

The error we're looking at here comes from the HTTP request itself (the table doesn't exist). It will cause the bundle to fail, and it's up to the runner to decide how to handle that (e.g. DirectRunner fails the pipeline; DataflowRunner retries a failed bundle 3 times and then fails the pipeline).
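
A minimal, self-contained model of that distinction (illustrative only; `insert_rows`, `should_retry_row` and `NotFound` are stand-ins rather than the actual Beam internals):

```python
_KNOWN_TABLES = set()


class NotFound(Exception):
  """Stand-in for an HTTP 404 raised by the insert request."""
  code = 404


def flush_batch(destination, rows, insert_rows, should_retry_row):
  while rows:
    try:
      # A successful request returns per-row errors rather than raising.
      _, errors = insert_rows(destination, rows)
    except NotFound:
      # The table itself is gone: forget it so a retried bundle will
      # re-create it, then re-raise so the runner fails/retries the bundle.
      _KNOWN_TABLES.discard(destination)
      raise
    # Row-level failures stay inside this loop and are re-queued (or dropped)
    # according to the configured insert_retry_strategy.
    rows = [rows[e['index']] for e in errors if should_retry_row(e)]
```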

raise
self.batch_latency_metric.update((time.time() - start) * 1000)

failed_rows = [(rows[entry['index']], entry["errors"])
@@ -1579,7 +1588,6 @@ def _flush_batch(self, destination):
message = (
'There were errors inserting to BigQuery. Will{} retry. '
'Errors were {}'.format(("" if should_retry else " not"), errors))

# The log level is:
# - WARNING when we are continuing to retry, and have a deadline.
# - ERROR when we will no longer retry, or MAY retry forever.