BigQuery: Upload pandas DataFrame containing arrays #19

Closed
AETDDraper opened this issue Jul 1, 2019 · 21 comments · Fixed by #980
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@AETDDraper

The Python BigQuery API documentation indicates that arrays are supported; however, when loading a pandas DataFrame into BigQuery, pyarrow fails with a struct error.

The only workaround seems to be to drop those columns and then use json_normalize to build a separate table.

from google.cloud import bigquery

# `credentials` and the DataFrame `appended_data` are defined elsewhere.
project = 'lake'
client = bigquery.Client(credentials=credentials, project=project)
dataset_ref = client.dataset('XXX')
table_ref = dataset_ref.table('RAW_XXX')
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.write_disposition = 'WRITE_TRUNCATE'

client.load_table_from_dataframe(appended_data, table_ref, job_config=job_config).result()

This is the error received: NotImplementedError: struct

I wanted to use this API because it advertises nested array support, which is perfect for our data lake in BigQuery, but I assume this doesn't work?
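The workaround mentioned in the report (drop the array columns, then normalize them into a separate table) amounts to moving each array element into its own child row keyed by the parent row. The sketch below does this in plain Python rather than with pandas' json_normalize; the function name and the "position" column are hypothetical, chosen only for illustration.

```python
from typing import Any, Dict, List, Sequence, Tuple


def split_array_column(
    rows: Sequence[Dict[str, Any]], key: str, array_col: str
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Hypothetical helper: split an array-valued column into a child table.

    Returns (parent_rows, child_rows). Parent rows keep every column except
    `array_col`; each array element becomes one child row that carries the
    parent key and the element's position in the original array.
    """
    parents: List[Dict[str, Any]] = []
    children: List[Dict[str, Any]] = []
    for row in rows:
        # Copy the row without the array column.
        parents.append({k: v for k, v in row.items() if k != array_col})
        # One child row per element; a missing or None array yields no children.
        for position, value in enumerate(row.get(array_col) or []):
            children.append({key: row[key], "position": position, array_col: value})
    return parents, children
```

Both outputs are flat, so each can be loaded as its own table without any REPEATED fields; the cost is a join when reading the data back.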

@tswast
Contributor

tswast commented Jul 1, 2019

Thanks for the report. Struct support is reported at https://github.com/googleapis/google-cloud-python/issues/8191, but I'll keep this issue open as a feature request for arrays of scalar types.

@tswast changed the title "Arrays not supported in Bigquery Python API" to "BigQuery: Upload pandas DataFrame containing arrays" on Jul 1, 2019
@sumit-ql

I am not able to reproduce this issue with a DataFrame that contains only arrays; it doesn't raise any exception.

I do see an exception when the DataFrame contains dicts (STRUCT/RECORD).

@AETDDraper Could you please provide a sample dataframe containing arrays which causes the error.

google-cloud-bigquery 1.20.0
pyarrow 0.14.0
pandas 0.25.1

@plamut plamut transferred this issue from googleapis/google-cloud-python Feb 4, 2020
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery API. label Feb 4, 2020
@plamut plamut added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Feb 4, 2020
@JonathanBonnaud

Loading a DataFrame containing an array (a Python list) does work if you don't specify the schema in the LoadJobConfig() and just let BigQuery detect it with autodetect=True. You only get a warning from pyarrow: UserWarning: Pyarrow could not determine the type of columns: COLUMN_NAME.

import pandas as pd
from google.cloud import bigquery

bqclient = bigquery.Client()
df = pd.DataFrame([{'uuid': 'test1', 'class': ['cls1', 'cls1']}])
job_config = bigquery.LoadJobConfig(
    autodetect=True
)

bqclient.load_table_from_dataframe(
    df,
    "DATASET.TABLE",
    job_config=job_config
).result()

The detected schema looks like this:

field_name        type    mode
class             RECORD  NULLABLE
class.list        RECORD  REPEATED
class.list.item   STRING  NULLABLE
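The detected schema above is a RECORD wrapping a REPEATED "list" RECORD wrapping an "item" field, rather than the single REPEATED STRING column one would expect. The sketch below expresses both shapes as BigQuery's JSON schema representation (name/type/mode/fields) and detects the wrapper pattern; the helper name is hypothetical and only illustrates the structure.

```python
from typing import Any, Dict


def is_parquet_list_wrapper(field: Dict[str, Any]) -> bool:
    """Hypothetical check: True if `field` has the RECORD -> REPEATED "list" ->
    "item" shape produced by autodetect, instead of a plain REPEATED column."""
    if field.get("type") != "RECORD":
        return False
    subfields = field.get("fields", [])
    if len(subfields) != 1:
        return False
    inner = subfields[0]
    return (
        inner.get("name") == "list"
        and inner.get("type") == "RECORD"
        and inner.get("mode") == "REPEATED"
        and [f.get("name") for f in inner.get("fields", [])] == ["item"]
    )


# The autodetected schema from the table above, as JSON-style dicts.
detected = {
    "name": "class",
    "type": "RECORD",
    "mode": "NULLABLE",
    "fields": [
        {
            "name": "list",
            "type": "RECORD",
            "mode": "REPEATED",
            "fields": [{"name": "item", "type": "STRING", "mode": "NULLABLE"}],
        }
    ],
}

# The schema one would actually want for a list-of-strings column.
wanted = {"name": "class", "type": "STRING", "mode": "REPEATED"}
```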

There is no way to provide a schema with an ARRAY type, though, or even a RECORD or STRUCT type, which is a bit frustrating.

My config:

google-cloud-bigquery==1.24.0
pandas==1.0.1
pyarrow==0.16.0

@plamut
Contributor

plamut commented Apr 2, 2020

> There is no way to provide a schema with the ARRAY type though, which is a bit frustrating. Or even RECORD or STRUCT type.

FWIW, uploading STRUCT columns through a DataFrame will become possible once a fix in the pyarrow dependency is released. I tried it with the development version of pyarrow and it worked.

At least uploading REPEATED fields (even those consisting of STRUCTs) works with the load_table_from_json() method, if that workaround is feasible. I managed to successfully upload the following data with it:

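from google.cloud import bigquery

# `client` and `table_ref` are defined as in the original report.
schema = [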
    bigquery.SchemaField(
        "bar",
        "STRUCT",
        fields=[
            bigquery.SchemaField("aaa", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("bbb", "INTEGER", mode="REQUIRED"),
        ],
        mode="REPEATED",
    ),
]

json_data = [
    {"bar": [{"aaa": 1, "bbb": 2}, {"aaa": 10, "bbb": 20}]},
    {"bar": [{"aaa": 3, "bbb": 4}, {"aaa": 5, "bbb": 6}]},
]

job_config = bigquery.LoadJobConfig(schema=schema)
client.load_table_from_json(json_data, table_ref, job_config=job_config).result()

@aaaaahaaaaa

aaaaahaaaaa commented May 20, 2020

As far as I can tell, loading a DataFrame that contains an array (no struct) still doesn't work, even with the new version of pyarrow (0.17.1).

Dataframe looks like this:

    field_A        field_B
0   1000135057     [-0.04591, -0.00742, ...
1   1100299862     [-0.15857, -0.17590, ...
2   1100174775     [0.075828, -0.1888, ...
3   1300002087     [0.097694, 0.06120, ...
4   1100190614     [0.097450, -0.10271, ...

Loading the dataframe:

# `data` is the DataFrame shown above and `table_id` is the destination table ID.
schema = [
    bigquery.SchemaField("field_A", "INTEGER"),
    bigquery.SchemaField("field_B", "FLOAT", mode="REPEATED"),
]

table = bigquery.Table(table_id, schema=schema)
bq_client.create_table(table, exists_ok=True)

# The destination is passed to load_table_from_dataframe, not to the job config.
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition="WRITE_TRUNCATE",
)
job = bq_client.load_table_from_dataframe(data, table_id, job_config=job_config)
job.result()

Produces:

google.api_core.exceptions.BadRequest: 400 Incompatible type for field 'field_B'. Expected: 'FLOAT64', actual: 'STRUCT'.

@HemangChothani
Contributor

@aaaaahaaaaa Loading a DataFrame that contains an array is not currently supported; it fails with the error "Field is specified as REPEATED in provided schema which does not match REQUIRED as specified in the file". This is caused by the DataFrame-to-Parquet conversion; for more information, see the older related issue #17.

A note about this Parquet limitation for REPEATED fields has been added to the load_table_from_dataframe documentation; you will find it there once version 1.26.0 of the BigQuery library is released.

Instead of load_table_from_dataframe, you can use load_table_from_json.

Example:

    from google.cloud import bigquery

    client = bigquery.Client()

    schema = [bigquery.SchemaField("nested_repeated", "INTEGER", mode="REPEATED")]
    job_config = bigquery.LoadJobConfig(schema=schema)
    record = [1, 2, 3]  # the list of values for one row
    data = [{"nested_repeated": record}]

    # "table_id" is a placeholder; use your own "project.dataset.table" ID.
    client.load_table_from_json(data, "table_id", job_config=job_config).result()
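When the rows for load_table_from_json come from a DataFrame (for example via df.to_dict(orient="records")), array cells can be NumPy arrays and numbers can be NumPy scalars, which Python's json module refuses to serialize. A minimal sketch, assuming the rows end up passing through json.dumps, that converts such values by duck-typing on a tolist() method; the helper names are hypothetical, and other types (timestamps, decimals) are not handled here.

```python
import json
from typing import Any, Dict, Iterable, List


def to_json_safe(value: Any) -> Any:
    """Recursively convert a cell value into something json.dumps accepts.

    Objects exposing tolist() (such as NumPy arrays and NumPy scalars) are
    converted with it; tuples become lists; dicts and lists are walked.
    """
    if hasattr(value, "tolist"):
        value = value.tolist()
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    return value


def records_to_json_rows(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply to_json_safe to every value of every record."""
    return [{k: to_json_safe(v) for k, v in rec.items()} for rec in records]


if __name__ == "__main__":
    print(json.dumps(records_to_json_rows([{"id": "a", "targets": (1, 2)}])))
    # [{"id": "a", "targets": [1, 2]}]
```

Usage would then look like client.load_table_from_json(records_to_json_rows(df.to_dict(orient="records")), table_id, job_config=job_config).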

@abuckenheimer

So just to confirm my read here, am I correct in saying:

There is no way to use a REPEATED type with the load_table_from_dataframe function right now. This is due to how Parquet serializes DataFrames, which is ultimately how this library serializes frames for transfer to BigQuery. The current workaround is to use load_table_from_json. I put together a test to show how the different methods break down here (run with code at master af2c987):

import sys

import pytest
import typing
import pandas as pd
from google.cloud.bigquery import Client, schema, job

T = typing.TypeVar("T")


def construct_repeated(
    lst: typing.List[T],
) -> typing.Mapping[str, typing.List[typing.Mapping[str, T]]]:
    """ when bigquery reads parquet arrays it ultimately structures the schema to look like this struct<list array<struct<item i>>>  """
    return {"list": [{"item": obj} for obj in lst]}


def destruct_repeated(
    dct: typing.Mapping[str, typing.List[typing.Mapping[str, T]]]
) -> typing.List[T]:
    return [d["item"] for d in dct["list"]]


def apply_col(
    col: str, callable: typing.Callable[[T], typing.Any]
) -> typing.Callable[[pd.DataFrame], pd.DataFrame]:
    def wrapper(df: pd.DataFrame) -> pd.DataFrame:
        df[col] = df[col].apply(callable)
        return df

    return wrapper


repeated_df = pd.DataFrame({"id": ["a", "b"], "targets": [[1], [2, 3]]})

repeated_schema = [
    schema.SchemaField("id", "STRING", "NULLABLE", None, (), None),
    schema.SchemaField("targets", "INTEGER", "REPEATED", None, (), None),
]

nested_df = repeated_df.copy()
nested_df["targets"] = nested_df["targets"].apply(construct_repeated)

nested_schema = [
    schema.SchemaField("id", "STRING", "NULLABLE", None, (), None),
    schema.SchemaField(
        "targets",
        "RECORD",
        "NULLABLE",
        None,
        (
            schema.SchemaField(
                "list",
                "RECORD",
                "REPEATED",
                None,
                (schema.SchemaField("item", "INTEGER", "NULLABLE", None, (), None),),
                None,
            ),
        ),
        None,
    ),
]


@pytest.fixture
def client():
    client = Client()
    yield client
    client.close()


@pytest.fixture
def dataset(client):
    d = client.create_dataset("test_dataset")
    yield d
    client.delete_dataset(d)


@pytest.fixture
def table(client, dataset):
    table = dataset.table("test_table")
    yield table
    client.delete_table(table, not_found_ok=True)


@pytest.mark.parametrize(
    "df, schema, post",
    [
        # FAILS WITH:
        # google.api_core.exceptions.BadRequest: 400 Error while reading data,
        # error message: Provided schema is not compatible with the file
        # 'prod-scotty-81de9f48-1a4f-424e-be99-2258b706fc46'. Field 'targets' is
        # specified as REPEATED in provided schema which does not match REQUIRED as
        # specified in the file.
        (repeated_df, repeated_schema, None),
        # FAILS comparing `df.to_dict() == got_back.to_dict()`:
        # doesn't upload structs properly, got_back ->
        # id targets
        # 0  a    None
        # 1  b    None
        (nested_df, nested_schema, None),
    ],
)
def test_roundtrip_from_dataframe(df, schema, post, client, table):
    client.load_table_from_dataframe(
        df, table, job_config=job.LoadJobConfig(schema=schema)
    ).result()

    got_back = client.list_rows(table).to_dataframe()
    if post is not None:
        got_back = post(got_back)

    assert df.to_dict() == got_back.to_dict()


@pytest.mark.parametrize(
    "df, schema, post",
    [
        # FAILS WITH:
        # google.api_core.exceptions.BadRequest: 400 Error while reading data,
        # error message: Provided schema is not compatible with the file
        # 'prod-scotty-81de9f48-1a4f-424e-be99-2258b706fc46'. Field 'targets' is
        # specified as REPEATED in provided schema which does not match REQUIRED as
        # specified in the file.
        (repeated_df, repeated_schema, None),
        # FAILS comparing `df.to_dict() == got_back.to_dict()`:
        # doesn't upload structs properly, got_back ->
        # id targets
        # 0  a    None
        # 1  b    None
        (nested_df, nested_schema, None),
    ],
)
def test_roundtrip_from_dataframe_existing_table(df, schema, post, client, table):
    table.schema = schema
    client.create_table(table)
    client.load_table_from_dataframe(df, table).result()

    got_back = client.list_rows(table).to_dataframe()
    if post is not None:
        got_back = post(got_back)

    assert df.to_dict() == got_back.to_dict()


@pytest.mark.parametrize(
    "df, schema, post",
    [
        # works but effectively applies construct_repeated on targets elements
        (repeated_df, repeated_schema, apply_col("targets", destruct_repeated)),
    ],
)
def test_roundtrip_from_dataframe_no_schema(df, schema, post, client, table):
    client.load_table_from_dataframe(df, table).result()
    got_back = client.list_rows(table).to_dataframe()

    if post is not None:
        got_back = post(got_back)

    assert df.to_dict() == got_back.to_dict()


@pytest.mark.parametrize(
    "df, schema, post",
    [
        # to_dataframe here returns an `np.array` type for lists which you can't compare
        # with a `list` type so we convert them for comparison
        (repeated_df, repeated_schema, apply_col("targets", list)),
        (nested_df, nested_schema, apply_col("targets", lambda d: {"list": list(d['list'])})),
    ],
)
def test_roundtrip_from_json(df, schema, post, client, table):
    client.load_table_from_json(
        df.to_dict(orient="records"), table, job_config=job.LoadJobConfig(schema=schema)
    ).result()

    got_back = client.list_rows(table).to_dataframe()
    if post is not None:
        got_back = post(got_back)

    assert df.to_dict() == got_back.to_dict()


if __name__ == "__main__":
    pytest.main([__file__, *sys.argv[1:]])

@tswast
Contributor

tswast commented Nov 2, 2020

pyarrow 2.0 has been released with improvements to Parquet serialization. We should revisit this issue to see whether it can be resolved with pyarrow 2.0.

@HemangChothani
Contributor

@tswast I have tried the following two examples, and both work fine.

Example 1:

import pandas as pd
import numpy as np
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(autodetect=True)
table_id = "project.dataset.table"  # placeholder table ID
table = client.create_table(bigquery.Table(table_id), exists_ok=True)
df = pd.DataFrame({'A': [np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])]})
job = client.load_table_from_dataframe(df, table_id, job_config=job_config).result()

Example 2:

table_id_1 = "project.dataset.table_1"  # placeholder table ID
table_1 = client.create_table(bigquery.Table(table_id_1), exists_ok=True)
job_config = bigquery.LoadJobConfig(autodetect=True)
df = pd.DataFrame([{'uuid': 'test1', 'class': [['cls1', 'class2'], ['cls1', 'class3']]}])
job = client.load_table_from_dataframe(df, table_id_1, job_config=job_config).result()

@tswast
Contributor

tswast commented Nov 3, 2020

Let's add some system test cases and/or code samples for this (which are skipped unless we have pyarrow >=2.0) before we close this issue out.

I imagine it'd be useful to have samples which show more complex dataframes in the following docs:

@tswast
Contributor

tswast commented Nov 4, 2020

Based on the schema in #365, this feature is not yet supported. It still serializes to a strange format with an "item" column.

@emkornfield
Copy link

> Based on the schema in #365, this feature is not yet supported. It still serializes to a strange format with an "item" column.

We should look into how to integrate the --parquet_enable_list_inference flag from the bq CLI.

@judahrand
Contributor

judahrand commented Sep 20, 2021

> Based on the schema in #365, this feature is not yet supported. It still serializes to a strange format with an "item" column.
>
> We should look into how to integrate: --parquet_enable_list_inference from BQ CLI

We can already use that flag:

# `client`, `df` and TABLE_NAME are assumed to be defined already.
parquet_options = bigquery.format_options.ParquetOptions()
parquet_options.enable_list_inference = True
job_config = bigquery.LoadJobConfig()
job_config.parquet_options = parquet_options
job = client.load_table_from_dataframe(
    df, TABLE_NAME, job_config=job_config
)

@bnaul
Copy link
Contributor

bnaul commented Sep 20, 2021

^ This is true, but it only partly resolves this issue (or at least the specific complaint "It still serializes to a strange format with an "item" column.").

job = client.load_table_from_dataframe(
    pd.DataFrame({"a": [[1, 2, 3], [3, 4, 5]]}), "tmp.repeated", job_config=job_config
)
job.result()
client.list_rows(job.destination).to_dataframe()

enable_list_inference=False:

                                                   a
0  {'list': [{'item': 1}, {'item': 2}, {'item': 3}]}
1  {'list': [{'item': 3}, {'item': 4}, {'item': 5}]}

enable_list_inference=True:

                                         a
0  [{'item': 1}, {'item': 2}, {'item': 3}]
1  [{'item': 3}, {'item': 4}, {'item': 5}]
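Until the load itself produces a plain REPEATED column, the two shapes shown above can be flattened on the client side after reading. A minimal sketch, applied per cell (for example df["a"].apply(unwrap_parquet_list)); the helper name is hypothetical. It also accepts objects with tolist(), since to_dataframe() may return NumPy arrays instead of lists for repeated fields.

```python
from typing import Any


def unwrap_parquet_list(value: Any) -> Any:
    """Turn the nested forms shown above back into a plain Python list.

    Handles both shapes:
      {'list': [{'item': 1}, ...]}   (enable_list_inference=False)
      [{'item': 1}, ...]             (enable_list_inference=True)
    Any other value is returned unchanged.
    """
    if isinstance(value, dict) and set(value) == {"list"}:
        value = value["list"]
    # to_dataframe() may hand back NumPy arrays, so convert via tolist() first.
    if hasattr(value, "tolist"):
        value = value.tolist()
    if isinstance(value, (list, tuple)) and all(
        isinstance(v, dict) and set(v) == {"item"} for v in value
    ):
        return [v["item"] for v in value]
    return value
```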

@emkornfield

I think the problem has to do with the implied nullability of the Parquet schema (we would probably need to infer and remove nullness).

@judahrand
Contributor

judahrand commented Sep 20, 2021

This works as a workaround:

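import pandas as pd
import pyarrow
import pyarrow.parquet
from google.cloud import bigquery

TABLE_NAME = "project.dataset.table"  # placeholder destination table ID

df = pd.DataFrame([[[1, 2, 3]], [[4, 5, 6]]], columns=('int64_array',))

# Serialize to an in-memory Parquet file using the spec-compliant
# nested list layout.
writer = pyarrow.BufferOutputStream()
pyarrow.parquet.write_table(
    pyarrow.Table.from_pandas(df),
    writer,
    use_compliant_nested_type=True
)
reader = pyarrow.BufferReader(writer.getvalue())

client = bigquery.Client()
# Ask BigQuery to interpret Parquet LIST groups as REPEATED fields.
parquet_options = bigquery.format_options.ParquetOptions()
parquet_options.enable_list_inference = True
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.parquet_options = parquet_options

job = client.load_table_from_file(
    reader, TABLE_NAME, job_config=job_config
)
job.result()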

I promise it does for realz this time.

@emkornfield

@judahrand so the fix is using use_compliant_nested_type=True?

@judahrand
Copy link
Contributor

> @judahrand so the fix is using use_compliant_nested_type=True?

use_compliant_nested_type=True AND parquet_options.enable_list_inference = True.

Both are needed.

@bnaul
Contributor

bnaul commented Sep 21, 2021

This is great, thanks @judahrand! Since, as you point out, the enable_list_inference param is already exposed, I guess all that's needed to close this ticket is adding use_compliant_nested_type=True in

def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
    """Write dataframe as a Parquet file, according to the desired BQ schema.

    This function requires the :mod:`pyarrow` package. Arrow is used as an
    intermediate format.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame to convert to Parquet file.
        bq_schema (Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            Desired BigQuery schema. Number of columns must match number of
            columns in the DataFrame.
        filepath (str):
            Path to write Parquet file to.
        parquet_compression (Optional[str]):
            The compression codec to use by the the ``pyarrow.parquet.write_table``
            serializing method. Defaults to "SNAPPY".
            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
    """
    pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
    import pyarrow.parquet

    bq_schema = schema._to_schema_fields(bq_schema)
    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
or adding a way to configure it, depending on how much of a breaking change altering that default would be.

@judahrand
Contributor

Yeah, exposing it somewhere would be the best - maybe don't change the default just document it 😛

Only took 2+ years for an answer 🤣
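One way to expose the flag without changing the default, as suggested above, is to forward use_compliant_nested_type to pyarrow.parquet.write_table only when a caller sets it explicitly. The sketch below builds those keyword arguments; the helper name is hypothetical and this is not necessarily how the library ended up implementing it.

```python
from typing import Any, Dict, Optional


def build_write_table_kwargs(
    compression: str = "SNAPPY",
    use_compliant_nested_type: Optional[bool] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for pyarrow.parquet.write_table.

    The nested-type flag is only included when the caller sets it, so callers
    that don't opt in keep pyarrow's own default behavior.
    """
    kwargs: Dict[str, Any] = {"compression": compression}
    if use_compliant_nested_type is not None:
        kwargs["use_compliant_nested_type"] = use_compliant_nested_type
    return kwargs
```

Inside dataframe_to_parquet this would be used as pyarrow.parquet.write_table(arrow_table, filepath, **build_write_table_kwargs(parquet_compression, use_compliant_nested_type)), with use_compliant_nested_type added as a new optional parameter.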

@judahrand
Contributor

I'm happy to look into this if no one else fancies.

gcf-merge-on-green bot pushed a commit that referenced this issue Oct 7, 2021
…#980)


Fixes #19 🦕