WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS #21061

Open
damccorm opened this issue Jun 4, 2022 · 2 comments

damccorm commented Jun 4, 2022

WriteToBigQuery fails when using the FILE_LOADS method in the BundleBasedDirectRunner.

The issue appears to be in wait_for_bq_job, where the function expects job_reference to be an actual JobReference instance and not a string. However, the WaitForBQJobs DoFn appears to be passing a string as the argument. I believe this is during the copy step, and I'm not calling this code directly (so unfortunately I can't just pass a TableReference instance myself).
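The failure mode described above can be reproduced in isolation, without Beam or BigQuery. The following is a minimal sketch under stated assumptions: `JobReference` here is a hypothetical stand-in dataclass (not Beam's actual class) that only carries the three attributes read in the traceback, and `describe_job` and `describe_job_checked` are illustrative helpers, not Beam functions.

```python
from dataclasses import dataclass


@dataclass
class JobReference:
    """Hypothetical stand-in with the three attributes read in the traceback."""
    projectId: str
    jobId: str
    location: str


def describe_job(job_reference):
    # Same style of attribute access as the failing frame in the traceback.
    return (job_reference.projectId, job_reference.jobId, job_reference.location)


def describe_job_checked(job_reference):
    # Hypothetical guard: fail early with a message that names the real problem.
    if isinstance(job_reference, str):
        raise TypeError(
            f"expected a job reference object, got str: {job_reference!r}")
    return describe_job(job_reference)


ref = JobReference(projectId="my-project", jobId="job-123", location="US")
print(describe_job(ref))

try:
    describe_job("job-123")  # a plain string, as the DoFn appears to pass
except AttributeError as err:
    print(err)
```

A guard like `describe_job_checked` would not fix the underlying bug, but it would likely make the mismatch between the caller and `wait_for_bq_job` easier to spot than a bare `AttributeError`.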

Here is a traceback:

request_worker_1      | ERROR:root:Traceback (most recent call last):
request_worker_1      |   File "/app/main.py", line 209, in process_message
request_worker_1      |     construct_and_run_pipeline(request)
request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
request_worker_1      |     return result.wait_until_finish()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
request_worker_1      |     self._executor.await_completion()
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
request_worker_1      |     raise t(v).with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
request_worker_1      |     self.attempt_call(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
request_worker_1      |     evaluator.process_element(value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
request_worker_1      |     self.runner.process(element)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
request_worker_1      |     self._reraise_augmented(exn)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
request_worker_1      |     raise new_exn.with_traceback(tb)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
request_worker_1      |     self._invoke_process_per_window(
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
request_worker_1      |     self.process_method(*args_for_process),
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']

Here is the WriteToBigQuery step that is failing (note that the callable passed for table returns a TableReference instance):

WriteToBigQuery(
    table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    ignore_insert_ids=True,
    method="FILE_LOADS",  # using STREAMING_INSERTS 'fixes' the issue
    batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
    schema=schema,
)

Note that this issue does not occur when using the standard DirectRunner, nor does it occur when using the STREAMING_INSERTS method.

Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)

Imported from Jira BEAM-12659. Original Jira may contain additional context.
Reported by: milesmcc.

@blazingbhavneek

Hey there! 👋 I'm new to this repository and eager to contribute! 🌟 Could you kindly suggest some entry point or files to look into?

@damccorm

Hey, I saw you added this comment in several places. I'd recommend focusing on a single issue at first (I answered the underlying question here - #20298 (comment))
