Commit b22c10f

Update version to 0.9.3.16 and replace shrink_large_string with convert_large_types_to_normal in schema handling
1 parent d4f310c commit b22c10f

6 files changed (+84 -56 lines)

pydala/dataset.py (+7 -14)

@@ -20,7 +20,7 @@
 from .io import Writer
 from .metadata import ParquetDatasetMetadata, PydalaDatasetMetadata
 from .schema import replace_schema  # from .optimize import Optimize
-from .schema import shrink_large_string
+from .schema import convert_large_types_to_normal
 from .table import PydalaTable
 
 
@@ -630,7 +630,7 @@ def write_to_dataset(
         ts_unit: str = "us",
         tz: str | None = None,
         remove_tz: bool = False,
-        use_large_string: bool = False,
+        # use_large_string: bool = False,
         delta_subset: str | list[str] | None = None,
         alter_schema: bool = False,
         timestamp_column: str | None = None,
@@ -662,7 +662,6 @@ def write_to_dataset(
         - ts_unit: The unit of the timestamp column. Defaults to "us".
         - tz: The timezone to be used for the timestamp column. Defaults to None.
         - remove_tz: Whether to remove the timezone information from the timestamp column. Defaults to False.
-        - use_large_string: Whether to use large string type for string columns. Defaults to False.
         - delta_subset: The subset of columns to consider for delta updates. Can be a string, a list of strings, or
           None. Defaults to None.
         - alter_schema: Whether to alter the schema of the dataset. Defaults to False.
@@ -711,7 +710,7 @@ def write_to_dataset(
         )
 
         writer.cast_schema(
-            use_large_string=use_large_string,
+            # use_large_string=use_large_string,
             ts_unit=ts_unit,
             tz=tz,
             remove_tz=remove_tz,
@@ -819,7 +818,7 @@ def load(
         schema: pa.Schema | None = None,
         ts_unit: str = "us",
         tz: str | None = None,
-        use_large_string: bool = False,
+        # use_large_types: bool = False,
        format_version: str = "2.6",
         verbose: bool = False,
         **kwargs,
@@ -833,7 +832,6 @@ def load(
         schema (pa.Schema | None, optional): The schema of the data. Defaults to None.
         ts_unit (str, optional): The unit of the timestamp. Defaults to "us".
         tz (str | None, optional): The timezone. Defaults to None.
-        use_large_string (bool, optional): Whether to use large string. Defaults to False.
         format_version (str, optional): The version of the data format. Defaults to "2.6".
         **kwargs: Additional keyword arguments.
 
@@ -856,7 +854,6 @@ def load(
             schema=schema,
             ts_unit=ts_unit,
             tz=tz,
-            use_large_string=use_large_string,
             format_version=format_version,
             verbose=verbose,
             **kwargs,
@@ -974,7 +971,6 @@ def write_to_dataset(
         ts_unit: str = "us",
         tz: str | None = None,
         remove_tz: bool = False,
-        use_large_string: bool = False,
         delta_subset: str | list[str] | None = None,
         update_metadata: bool = False,
         alter_schema: bool = False,
@@ -1007,7 +1003,6 @@ def write_to_dataset(
         - ts_unit: The unit of the timestamp column. Defaults to "us".
         - tz: The timezone to be used for the timestamp column. Defaults to None.
         - remove_tz: Whether to remove the timezone information from the timestamp column. Defaults to False.
-        - use_large_string: Whether to use large string type for string columns. Defaults to False.
         - delta_subset: The subset of columns to consider for delta updates. Can be a string, a list of strings, or
           None. Defaults to None.
         - update_metadata: Whether to update the metadata table after writing. Defaults to False.
@@ -1099,7 +1094,6 @@ def write_to_dataset(
             ts_unit=ts_unit,
             tz=tz,
             remove_tz=remove_tz,
-            use_large_string=use_large_string,
            delta_subset=delta_subset,
             alter_schema=alter_schema,
             timestamp_column=timestamp_column,
@@ -1560,7 +1554,6 @@ def optimize_dtypes(
         include: str | list[str] | None = None,
         ts_unit: str | None = None,  # "us",
         tz: str | None = None,
-        use_large_string: bool = False,
         infer_schema_size: int = 10_000,
         **kwargs,
     ):
@@ -1572,8 +1565,8 @@ def optimize_dtypes(
             .to_arrow()
             .schema
         )
-        if not use_large_string:
-            optimized_schema = shrink_large_string(optimized_schema)
+
+        optimized_schema = convert_large_types_to_normal(optimized_schema)
 
         for file_path in tqdm.tqdm(self.files):
             self._optimize_dtypes(
@@ -1584,7 +1577,7 @@ def optimize_dtypes(
                 include=include,
                 ts_unit=ts_unit,
                 tz=tz,
-                use_large_string=use_large_string,
+                # use_large_string=use_large_string,
                 **kwargs,
             )

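For context, optimize_dtypes previously shrank large string types only when use_large_string was False; after this commit the inferred schema is always normalized. A minimal sketch of that normalization step (a hypothetical standalone usage, assuming pyarrow is installed and pydala.schema exports convert_large_types_to_normal as this commit introduces):

import pyarrow as pa
from pydala.schema import convert_large_types_to_normal

# a table whose schema still carries Arrow "large" types
tbl = pa.table({"name": pa.array(["a", "b"], type=pa.large_string())})

# normalize the schema and cast the table to it, as optimize_dtypes now does unconditionally
tbl = tbl.cast(convert_large_types_to_normal(tbl.schema))
print(tbl.schema)  # name: string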
pydala/filesystem.py (+4 -4)

@@ -24,7 +24,7 @@
 from loguru import logger
 
 from .helpers.misc import read_table, run_parallel
-from .schema import shrink_large_string
+from .schema import convert_large_types_to_normal
 
 
 def get_credentials_from_fssspec(fs: AbstractFileSystem) -> dict[str, str]:
@@ -452,7 +452,7 @@ def write_parquet(
 ) -> None:
     if isinstance(data, pl.DataFrame):
         data = data.to_arrow()
-        data = data.cast(shrink_large_string(data.schema))
+        data = data.cast(convert_large_types_to_normal(data.schema))
     elif isinstance(data, pd.DataFrame):
         data = pa.Table.from_pandas(data, preserve_index=False)
     elif isinstance(data, ddb.DuckDBPyRelation):
@@ -468,7 +468,7 @@ def write_json(
 ) -> None:
     if isinstance(data, pl.DataFrame):
         data = data.to_arrow()
-        data = data.cast(shrink_large_string(data.schema)).to_pydict()
+        data = data.cast(convert_large_types_to_normal(data.schema)).to_pydict()
     elif isinstance(data, pd.DataFrame):
         data = pa.Table.from_pandas(data, preserve_index=False).to_pydict()
     elif isinstance(data, ddb.DuckDBPyRelation):
@@ -523,7 +523,7 @@ def write_to_pyarrow_dataset(
 
     if isinstance(data[0], pl.DataFrame):
         data = [dd.to_arrow() for dd in data]
-        data = [dd.cast(shrink_large_string(dd.schema)) for dd in data]
+        data = [dd.cast(convert_large_types_to_normal(dd.schema)) for dd in data]
 
     elif isinstance(data[0], pd.DataFrame):
         data = [pa.Table.from_pandas(dd, preserve_index=False) for dd in data]

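All three writers (write_parquet, write_json, write_to_pyarrow_dataset) now route polars input through the new helper. A minimal sketch of why the cast matters (a hypothetical snippet; polars' to_arrow() typically emits large_string columns):

import polars as pl
from pydala.schema import convert_large_types_to_normal

df = pl.DataFrame({"name": ["a", "b"]})
tbl = df.to_arrow()  # columns usually arrive as large_string
tbl = tbl.cast(convert_large_types_to_normal(tbl.schema))
# tbl now uses standard string/binary/list types, matching non-polars inputs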
pydala/io.py (+2 -2)

@@ -17,7 +17,7 @@
 from .filesystem import clear_cache
 from .helpers.datetime import get_timestamp_column
 from .helpers.polars import pl
-from .schema import convert_timestamp, replace_schema, shrink_large_string
+from .schema import convert_timestamp, replace_schema, convert_large_types_to_normal
 from .table import PydalaTable
 
 
@@ -270,7 +270,7 @@ def cast_schema(
         self._set_schema()
         self._use_large_string = use_large_string
         if not use_large_string:
-            self.schema = shrink_large_string(self.schema)
+            self.schema = convert_large_types_to_normal(self.schema)
 
         if tz is not None or ts_unit is not None or remove_tz:
             self.schema = convert_timestamp(

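Note that cast_schema keeps its use_large_string flag here; only the helper behind it changed. Compared with the retired shrink_large_string, which rewrote only large_string, the new helper also covers large_binary and large_list. A small sketch of that behavioral difference (assuming pyarrow; the commented expectation reflects the two implementations shown in this commit's schema.py diff):

import pyarrow as pa
from pydala.schema import convert_large_types_to_normal

schema = pa.schema([("s", pa.large_string()), ("b", pa.large_binary())])
# shrink_large_string(schema) would have returned:  s: string, b: large_binary
print(convert_large_types_to_normal(schema))
# s: string
# b: binary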
pydala/metadata.py (+18 -18)

@@ -16,7 +16,7 @@
 
 # from .helpers.metadata import collect_parquet_metadata  # , remove_from_metadata
 from .helpers.misc import get_partitions_from_path, run_parallel
-from .schema import repair_schema  # unify_schemas
+from .schema import repair_schema, convert_large_types_to_normal  # unify_schemas
 
 
 def collect_parquet_metadata(
@@ -308,7 +308,12 @@ def update_file_metadata(
         new_files += files
 
         if new_files:
+            if verbose:
+                logger.info(f"Collecting metadata for {len(new_files)} new files.")
             self._collect_file_metadata(files=new_files, verbose=verbose, **kwargs)
+        else:
+            if verbose:
+                logger.info("No new files to collect metadata for.")
 
         if rm_files:
             self._rm_file_metadata(files=rm_files)
@@ -341,10 +346,6 @@ def reset(self):
     def _get_unified_schema(
         self,
         verbose: bool = False,
-        # ts_unit: str | None = None,
-        # tz: str | None = None,
-        # use_large_string: bool = False,
-        # sort: bool | list[str] = False,
     ) -> tuple[pa.Schema, bool]:
         """
         Returns the unified schema for the dataset.
@@ -370,10 +371,14 @@ def _get_unified_schema(
             if self.has_metadata:
                 schemas.insert(0, self.metadata.schema.to_arrow_schema())
 
-            unified_schema = pa.unify_schemas(schemas, promote_options="permissive")
+            unified_schema = convert_large_types_to_normal(
+                pa.unify_schemas(schemas, promote_options="permissive")
+            )
             schemas_equal = all([unified_schema == schema for schema in schemas])
         else:
-            unified_schema = self.metadata.schema.to_arrow_schema()
+            unified_schema = convert_large_types_to_normal(
+                self.metadata.schema.to_arrow_schema()
+            )
             schemas_equal = True
         if verbose:
             logger.info(f"Schema is equal: {schemas_equal}")
@@ -386,8 +391,6 @@ def _repair_file_schemas(
         format_version: str | None = None,
         tz: str | None = None,
         ts_unit: str | None = None,
-        use_large_string: bool = False,
-        # sort: bool | list[str] = False,
         alter_schema: bool = True,
         verbose: bool = False,
         **kwargs,
@@ -402,8 +405,6 @@ def _repair_file_schemas(
                 the format version from the metadata will be used. Defaults to None.
             tz (str | None, optional): The timezone to use for repairing the files. Defaults to None.
             ts_unit (str | None, optional): The timestamp unit to use for repairing the files. Defaults to None.
-            use_large_string (bool, optional): Whether to use large string type for repairing the files.
-                Defaults to False.
             alter_schema (bool, optional): Whether to alter the schema of the files. Defaults to True.
             **kwargs: Additional keyword arguments to pass to the repair_schema function.
 
@@ -451,7 +452,6 @@ def _repair_file_schemas(
             version=format_version,
             ts_unit=ts_unit,
             tz=tz,
-            use_large_string=use_large_string,
            alter_schema=alter_schema,
             **kwargs,
         )
@@ -495,8 +495,8 @@ def _update_metadata(self, reload: bool = False, verbose: bool = False, **kwargs
             (set(self.files_in_file_metadata) - set(self.files_in_metadata))
         )
         if verbose:
-            logger.info("Number of files to remove: ", len(rm_files))
-            logger.info("Number of files to add: ", len(new_files))
+            logger.info(f"Number of files to remove: {len(rm_files)}")
+            logger.info(f"Number of files to add: {len(new_files)}")
         if len(rm_files) or (len(new_files) and not self.has_metadata) or reload:
             if verbose:
                 logger.info("Updateing metadata: Rewrite metadata from file metadata")
@@ -507,12 +507,15 @@ def _update_metadata(self, reload: bool = False, verbose: bool = False, **kwargs
             for f in self.files_in_file_metadata[1:]:
                 self._metadata.append_row_groups(self._file_metadata[f])
 
-        else:
+        elif len(new_files):
             if verbose:
                 logger.info("Updateing metadata: Append new file metadata")
 
             for f in new_files:
                 self._metadata.append_row_groups(self.file_metadata[f])
+        else:
+            if verbose:
+                logger.info("Updateing metadata: No changes")
 
         self._write_metadata_file()
         self.load_files()
@@ -523,7 +526,6 @@ def update(
         schema: pa.Schema | None = None,
         ts_unit: str | None = None,
         tz: str | None = None,
-        use_large_string: bool = False,
         format_version: str | None = None,
         # sort: bool | list[str] = False,
         verbose: bool = False,
@@ -537,7 +539,6 @@ def update(
             schema (pa.Schema | None): The schema of the data source.
             ts_unit (str | None): The unit of the timestamp.
            tz (str | None): The time zone of the data source.
-            use_large_string (bool): Flag to indicate whether to use large string type.
            format_version (str | None): The version of the data format.
             **kwargs: Additional keyword arguments.
 
@@ -559,7 +560,6 @@ def update(
             format_version=format_version,
             tz=tz,
             ts_unit=ts_unit,
-            use_large_string=use_large_string,
             verbose=verbose,
             # sort=sort,
         )

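The unified schema is now normalized on both branches, so downstream comparisons never flag a mismatch that is only string vs large_string. A minimal sketch of the new unification path (hypothetical schemas; promote_options requires a reasonably recent pyarrow):

import pyarrow as pa
from pydala.schema import convert_large_types_to_normal

s1 = pa.schema([("id", pa.int32()), ("name", pa.large_string())])
s2 = pa.schema([("id", pa.int64()), ("name", pa.large_string())])

# permissive promotion widens int32 -> int64; normalization then drops the large_ prefix
unified = convert_large_types_to_normal(
    pa.unify_schemas([s1, s2], promote_options="permissive")
)
print(unified)
# id: int64
# name: string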
pydala/schema.py (+52 -17)

@@ -29,21 +29,57 @@ def sort_schema(schema: pa.Schema, names: list[str] | None = None) -> pa.Schema:
     )
 
 
-def shrink_large_string(schema: pa.Schema) -> pa.Schema:
-    """Convert all large_string types to string in a pyarrow.schema.
+# def shrink_large_string(schema: pa.Schema) -> pa.Schema:
+#     """Convert all large_string types to string in a pyarrow.schema.
 
-    Args:
-        schema (pa.Schema): pyarrow schema
+#     Args:
+#         schema (pa.Schema): pyarrow schema
 
-    Returns:
-        pa.Schema: converted pyarrow.schema
-    """
-    return pa.schema(
-        [
-            (n, pa.utf8()) if t == pa.large_string() else (n, t)
-            for n, t in list(zip(schema.names, schema.types))
-        ]
-    )
+#     Returns:
+#         pa.Schema: converted pyarrow.schema
+#     """
+#     return pa.schema(
+#         [
+#             (n, pa.utf8()) if t == pa.large_string() else (n, t)
+#             for n, t in list(zip(schema.names, schema.types))
+#         ]
+#     )
+
+
+def convert_large_types_to_normal(schema: pa.Schema) -> pa.Schema:
+    # Define mapping of large types to standard types
+    type_mapping = {
+        pa.large_string(): pa.string(),
+        pa.large_binary(): pa.binary(),
+        pa.large_list(pa.null()): pa.list_(pa.null()),
+    }
+
+    # Convert fields
+    new_fields = []
+    for field in schema:
+        field_type = field.type
+        # Check if type exists in mapping
+        if field_type in type_mapping:
+            new_field = pa.field(
+                name=field.name,
+                type=type_mapping[field_type],
+                nullable=field.nullable,
+                metadata=field.metadata,
+            )
+            new_fields.append(new_field)
+        # Handle large lists with nested types
+        elif isinstance(field_type, pa.LargeListType):
+            new_field = pa.field(
+                name=field.name,
+                type=pa.list_(field_type.value_type),
+                nullable=field.nullable,
+                metadata=field.metadata,
+            )
+            new_fields.append(new_field)
+        else:
+            new_fields.append(field)
+
+    return pa.schema(new_fields)
 
 
 def convert_timestamp(
@@ -449,7 +485,7 @@ def repair_schema(
     verbose: bool = True,
     ts_unit: str | None = None,  # "us",
     tz: str | None = None,
-    use_large_string: bool = False,
+    # use_large_types: bool = False,
     # sort: bool | list[str] = False,
     alter_schema: bool = True,
     **kwargs,
@@ -466,7 +502,6 @@ def repair_schema(
         verbose (bool, optional): Wheter to show the task progress using tqdm or not. Defaults to True.
         ts_unit (str|None): timestamp unit.
        tz (str|None): timezone for timestamp fields. Defaults to "UTC".
-        use_large_string (bool): Convert pyarrow.large_string() to pyarrow.string().
        **kwargs: Additional keyword arguments for pyarrow.parquet.write_table.
     """
     if files is None:
@@ -501,8 +536,8 @@ def repair_schema(
     if ts_unit is not None or tz is not None:
         schema = convert_timestamp(schema, unit=ts_unit, tz=tz)
 
-    if not use_large_string:
-        schema = shrink_large_string(schema)
+    # if not use_large_types:
+    schema = convert_large_types_to_normal(schema)
 
     schemas_equal = all([schema == schemas_ for schemas_ in file_schemas.values()])