PubSub to BigQuery Javascript UDF destroys attributes #69

Closed
mollyporph opened this issue Nov 3, 2019 · 4 comments

Comments

@mollyporph

mollyporph commented Nov 3, 2019

Hi,
We're using the Pub/Sub Subscription to BigQuery template. We have data in both the PubsubMessage attributes and the body. Our body contains an array without a field name, i.e.:

[
 {"id": "item1"},
 {"id": "item2"}
]

The template had issues parsing this, so we added a simple UDF:

function process(str) {
    // Wrap the top-level array in an object so the template can parse it.
    var arrayOfItems = JSON.parse(str);
    var outObject = {items: arrayOfItems};
    return JSON.stringify(outObject);
}

When this template runs it seems like the attributes are discarded after the UDF step.

I'm not that well versed in Beam, but it seems that when the InvokeUDF step is built, it discards everything but the message payload:

 PCollectionTuple udfOut =
          input
              // Map the incoming messages into FailsafeElements so we can recover from failures
              // across multiple transforms.
              .apply("MapToRecord", ParDo.of(new PubsubMessageToFailsafeElementFn()))
              .apply(
                  "InvokeUDF",
                  FailsafeJavascriptUdf.<PubsubMessage>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setSuccessTag(UDF_OUT)
                      .setFailureTag(UDF_DEADLETTER_OUT)
                      .build());

The PubsubMessageToFailsafeElementFn looks like this

 static class PubsubMessageToFailsafeElementFn
      extends DoFn<PubsubMessage, FailsafeElement<PubsubMessage, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          FailsafeElement.of(message, new String(message.getPayload(), StandardCharsets.UTF_8)));
    }
  }

It calls message.getPayload(), so only the payload is handed to the UDF, which is probably what causes the issue.

So my question is: Am I doing something wrong, or is there some way of getting both the attributes and the payload through the UDF? Or do I have to modify the Java template?

Thanks in advance!

@Joao-Lamesa

I am currently working on the same issue.

I customized the PubsubMessageToFailsafeElementFn class. I just added a few lines that merge the attribute map into the main payload.

Hence, for me, using the JavaScript UDF wouldn't bring any gains, because we have to customize the failsafe element anyway. So I might as well stick with that and do the transforms in that class itself. That way I don't have to write and pass a UDF as well.
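
The exact lines were not posted in this thread. As a rough illustration only, a change of this kind might look like the sketch below. It assumes the payload is already valid JSON text and that the attributes arrive as a `Map<String, String>` (what Beam's `PubsubMessage.getAttributeMap()` returns). The class name `AttributeEnvelope`, the method `wrap`, and the output keys `attributes` and `items` are hypothetical names chosen for this example, not part of the template.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: merges Pub/Sub attributes and the payload into one JSON object. */
final class AttributeEnvelope {

  private AttributeEnvelope() {}

  /** Escapes a string so it can be placed inside a JSON string literal. */
  static String escapeJson(String s) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.toString();
  }

  /**
   * Builds {"attributes":{...},"items":<payload>}.
   * Assumption: payloadJson is valid JSON text and is embedded as-is.
   */
  static String wrap(String payloadJson, Map<String, String> attributes) {
    Map<String, String> safe =
        attributes == null ? Collections.<String, String>emptyMap() : attributes;
    StringBuilder sb = new StringBuilder("{\"attributes\":{");
    boolean first = true;
    // TreeMap gives a stable key order, which keeps the output deterministic.
    for (Map.Entry<String, String> e : new TreeMap<>(safe).entrySet()) {
      if (!first) {
        sb.append(',');
      }
      first = false;
      sb.append('"').append(escapeJson(e.getKey())).append("\":\"")
          .append(escapeJson(e.getValue())).append('"');
    }
    sb.append("},\"items\":").append(payloadJson).append('}');
    return sb.toString();
  }
}
```

Inside the DoFn, the string passed as the second argument of `FailsafeElement.of(...)` could then be `AttributeEnvelope.wrap(new String(message.getPayload(), StandardCharsets.UTF_8), message.getAttributeMap())`, so that later steps see the attributes alongside the body. A real change would more likely use a JSON library than hand-built strings.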

I hope it helps to clarify your work.

@dc185333

dc185333 commented Jun 9, 2022

Hi @Joao-Lamesa, could you share what changes you made to add the attribute map to the main payload? We're running into a similar issue where we can't access the attributes in the UDF, and at this point it seems like it would make more sense to just process the extension in the template instead of using a UDF.


This issue has been marked as stale due to 180 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the issue at any time. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jun 11, 2024

This issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
