[Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition #25225

tomlynchRNA opened this issue Jan 31, 2023 · 4 comments


@tomlynchRNA

What would you like to happen?

Hello,

I am running an Apache Beam pipeline and streaming inserts into Google BigQuery with the Python SDK.

I am facing an issue where, because Beam only creates tables once before storing the name in a list called _KNOWN_TABLES, if a table is deleted while the pipeline is running (after Beam has already created it), further inserts fail with a 404 and Beam does not attempt to re-create the table.

You can see here where the _create_table_if_needed method returns early if the table is already in _KNOWN_TABLES:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1463
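For context, the relevant logic looks roughly like this (a simplified paraphrase, not the verbatim Beam source; _KNOWN_TABLES is modeled here as a set, and `table_reference_str` stands in for the formatted `project:dataset.table` string):

```python
_KNOWN_TABLES = set()  # Beam remembers every table it has already created

def _create_table_if_needed(table_reference_str, create_disposition):
    # Early return: once a table is in _KNOWN_TABLES, Beam never checks
    # BigQuery again, so a table deleted mid-pipeline is never re-created.
    if table_reference_str in _KNOWN_TABLES:
        return
    if create_disposition == 'CREATE_IF_NEEDED':
        pass  # the actual tables.insert API call is omitted in this sketch
    _KNOWN_TABLES.add(table_reference_str)
```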

I have the pipeline's create disposition set to the default, CREATE_IF_NEEDED, so I expect that if a table needed for streaming inserts does not exist, it will be created.
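For reference, the disposition is configured on the sink like this (a sketch; the table, schema, and input data are placeholders):

```python
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([{'id': 1}])  # placeholder input
     | beam.io.WriteToBigQuery(
         table='my-project:my_dataset.my_table',  # placeholder table
         schema='id:INTEGER',                     # placeholder schema
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
```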

I propose that a mechanism be implemented to allow this behaviour, and I would be willing to make the changes and open a pull request.

Looking forward to your thoughts,
Tom

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
@tomlynchRNA tomlynchRNA changed the title [Feature Request]: Python SDK, ability to disable _KNOWN_TABLES in _create_table_if_needed [Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition Jan 31, 2023
@damccorm damccorm added the io label Jan 31, 2023
tomlynchRNA added a commit to rnadigital/beam that referenced this issue Feb 2, 2023
…ady in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition apache#25225

* Add check during `BigQueryWriteFn._flush_batch` such that if an insert fails with HttpError 404 and reason 'notFound', we remove the table_reference from _KNOWN_TABLES so that on subsequent calls to `BigQueryWriteFn._create_table_if_needed` the table may be recreated (depending on create_disposition)
tomlynchRNA added a commit to rnadigital/beam that referenced this issue Feb 6, 2023
…ady in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition apache#25225

* Add check during BigQuery insert such that if an insert fails with code 404, we remove the table_reference from _KNOWN_TABLES so that subsequent calls to `BigQueryWriteFn._create_table_if_needed` will recreate the table if needed (subject to create_disposition)
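To make the mechanism concrete, here is a minimal standalone sketch of the logic both commits describe (the helper name is hypothetical, `_KNOWN_TABLES` is assumed to be a set of `project:dataset.table` strings, and this is not the actual patch):

```python
_KNOWN_TABLES = set()  # mirrors Beam's module-level registry of created tables

def forget_table_on_not_found(status_code, table_reference_str):
    """Hypothetical helper: drop a table from _KNOWN_TABLES on an HTTP 404.

    After this, the next call to _create_table_if_needed no longer returns
    early and may re-create the table (subject to create_disposition).
    """
    if status_code == 404:
        _KNOWN_TABLES.discard(table_reference_str)

# Example: a streaming insert into an already-known table hits a 404
# because the table was deleted while the pipeline was running.
_KNOWN_TABLES.add('my-project:my_dataset.my_table')
forget_table_on_not_found(404, 'my-project:my_dataset.my_table')
assert 'my-project:my_dataset.my_table' not in _KNOWN_TABLES
```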
@ahmedabu98 (Contributor)

Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here?

> if the table is deleted while the pipeline is running (after Beam has already created it), further inserts fail with a 404 and Beam does not attempt to re-create the table

@tomlynchRNA (Author)

> Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here?
>
> > if the table is deleted while the pipeline is running (after Beam has already created it), further inserts fail with a 404 and Beam does not attempt to re-create the table

Stack trace (quite long):
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
    return self._flush_batch(destination)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
    ignore_unknown_values=self.ignore_unknown_columns)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
    ignore_unknown_values=ignore_unknown_values)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
    timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
    return call()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle
    element.timer_family_id, timer_data)
  File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.process_timer
  File "apache_beam/runners/common.py", line 1453, in apache_beam.runners.common.DoFnRunner.process_user_timer
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1450, in apache_beam.runners.common.DoFnRunner.process_user_timer
  File "apache_beam/runners/common.py", line 579, in apache_beam.runners.common.DoFnInvoker.invoke_user_timer
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
    return self._flush_batch(destination)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
    ignore_unknown_values=self.ignore_unknown_columns)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
    ignore_unknown_values=ignore_unknown_values)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
    timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
    return call()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED [while running 'Write Custom Vars to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-66']

@chamikaramj (Contributor) commented Feb 21, 2023

I think in general, for Beam source/sink I/O, we assume that the data store resources being read from or written to are not deleted by third parties while the pipeline is running. Adding this as a general feature across Beam I/O would probably need a lot of re-work (even though we might be able to fix it for this instance).

Also, I think CREATE_IF_NEEDED (as it's currently defined for the Beam BigQuery sink) means that tables will be created once per pipeline, not that tables will be re-created if they are deleted at any stage of the pipeline.

@chamikaramj (Contributor) commented Mar 9, 2023

That said, I'm OK with getting this fix in if we are clear that it does not modify the guarantees offered by CREATE_IF_NEEDED (i.e. the pipeline may still fail or get stuck if output tables get deleted by third parties during execution).
