refactor: rename table to recordbatch (#3771)
Renames 'table' to 'recordbatch' to free up 'table' for the upcoming catalog & table functionality.

Note for reviewers: a lot of files changed, but this was mostly a find-and-replace, so there shouldn't be much "logic" to actually review, just a sanity check to make sure I didn't badly mess something up.
universalmind303 authored Feb 7, 2025
1 parent 610bde4 commit 7f2e9b5
Showing 226 changed files with 1,382 additions and 1,275 deletions.
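At the Python layer the rename maps the daft.table module onto daft.recordbatch and the Table class onto RecordBatch. Below is a rough before/after sketch based only on call sites that appear in the diffs in this commit; the example column data is made up for illustration.

# Before this commit:
#   from daft.table.table import Table
#   batch = Table.from_pydict({"ints": [1, 2, 3]})
#
# After this commit, the same code uses the renamed module and class:
from daft.recordbatch.recordbatch import RecordBatch

batch = RecordBatch.from_pydict({"ints": [1, 2, 3]})  # construction call is unchanged; only the names differ

# Module-level helpers move the same way, e.g. in benchmarking/parquet/conftest.py below:
#   daft.table.read_parquet_into_pyarrow(...)  ->  daft.recordbatch.read_parquet_into_pyarrow(...)
#   daft.table.MicroPartition                  ->  daft.recordbatch.MicroPartition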
4 changes: 2 additions & 2 deletions .github/working-dir/shuffle_testing.py
@@ -14,7 +14,7 @@
import daft
import daft.context
from daft.io._generator import read_generator
-from daft.table.table import Table
+from daft.recordbatch.recordbatch import RecordBatch

# Constants
GB = 1 << 30
@@ -110,7 +110,7 @@ def generate(
delay = random.uniform(0, timing_variation)
time.sleep(delay)

-yield Table.from_pydict(data)
+yield RecordBatch.from_pydict(data)


def generator(
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -33,11 +33,11 @@ daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-py-runners = {path = "src/daft-py-runners", default-features = false}
+daft-recordbatch = {path = "src/daft-recordbatch", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-sql = {path = "src/daft-sql", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
-daft-table = {path = "src/daft-table", default-features = false}
daft-writers = {path = "src/daft-writers", default-features = false}
lazy_static = {workspace = true}
log = {workspace = true}
@@ -76,7 +76,7 @@ python = [
"daft-scheduler/python",
"daft-sql/python",
"daft-stats/python",
"daft-table/python",
"daft-recordbatch/python",
"daft-writers/python",
"daft-context/python",
"dep:daft-catalog-python-catalog",
@@ -168,7 +168,7 @@ members = [
"src/daft-scheduler",
"src/daft-sketch",
"src/daft-sql",
"src/daft-table",
"src/daft-recordbatch",
"src/daft-writers",
"src/hyperloglog",
"src/daft-connect",
@@ -207,10 +207,10 @@ daft-local-execution = {path = "src/daft-local-execution"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-py-runners = {path = "src/daft-py-runners"}
+daft-recordbatch = {path = "src/daft-recordbatch"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-sql = {path = "src/daft-sql"}
-daft-table = {path = "src/daft-table"}
derivative = "2.2.0"
derive_builder = "0.20.2"
futures = "0.3.30"
8 changes: 4 additions & 4 deletions benchmarking/parquet/conftest.py
@@ -41,12 +41,12 @@ def boto3_get_object_read(path: str, columns: list[str] | None = None) -> pa.Tab


def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table:
-tbl = daft.table.MicroPartition.read_parquet(path, columns=columns)
+tbl = daft.recordbatch.MicroPartition.read_parquet(path, columns=columns)
return tbl.to_arrow()


def daft_native_read_to_arrow(path: str, columns: list[str] | None = None) -> pa.Table:
-return daft.table.read_parquet_into_pyarrow(path, columns=columns)
+return daft.recordbatch.read_parquet_into_pyarrow(path, columns=columns)


def daft_dataframe_read(path: str, columns: list[str] | None = None) -> pa.Table:
@@ -85,12 +85,12 @@ def fn(files: list[str]) -> list[pa.Table]:


def daft_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
-tables = daft.table.Table.read_parquet_bulk(paths, columns=columns)
+tables = daft.recordbatch.read_parquet_bulk(paths, columns=columns)
return [t.to_arrow() for t in tables]


def daft_into_pyarrow_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
-return daft.table.read_parquet_into_pyarrow_bulk(paths, columns=columns)
+return daft.recordbatch.read_parquet_into_pyarrow_bulk(paths, columns=columns)


def pyarrow_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
72 changes: 36 additions & 36 deletions daft/daft/__init__.pyi
@@ -622,8 +622,8 @@ class ScanTask:
size_bytes: int | None,
iceberg_delete_files: list[str] | None,
pushdowns: Pushdowns | None,
-partition_values: PyTable | None,
-stats: PyTable | None,
+partition_values: PyRecordBatch | None,
+stats: PyRecordBatch | None,
) -> ScanTask | None:
"""Create a Catalog Scan Task."""
...
@@ -637,7 +637,7 @@
num_rows: int | None,
size_bytes: int | None,
pushdowns: Pushdowns | None,
-stats: PyTable | None,
+stats: PyRecordBatch | None,
) -> ScanTask:
"""Create a SQL Scan Task."""
...
@@ -651,7 +651,7 @@
num_rows: int | None,
size_bytes: int | None,
pushdowns: Pushdowns | None,
-stats: PyTable | None,
+stats: PyRecordBatch | None,
) -> ScanTask:
"""Create a Python factory function Scan Task."""
...
@@ -1369,48 +1369,48 @@ class PySeries:
@staticmethod
def _debug_bincode_deserialize(b: bytes) -> PySeries: ...

-class PyTable:
+class PyRecordBatch:
def schema(self) -> PySchema: ...
-def cast_to_schema(self, schema: PySchema) -> PyTable: ...
-def eval_expression_list(self, exprs: list[PyExpr]) -> PyTable: ...
-def take(self, idx: PySeries) -> PyTable: ...
-def filter(self, exprs: list[PyExpr]) -> PyTable: ...
-def sort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PyTable: ...
+def cast_to_schema(self, schema: PySchema) -> PyRecordBatch: ...
+def eval_expression_list(self, exprs: list[PyExpr]) -> PyRecordBatch: ...
+def take(self, idx: PySeries) -> PyRecordBatch: ...
+def filter(self, exprs: list[PyExpr]) -> PyRecordBatch: ...
+def sort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PyRecordBatch: ...
def argsort(self, sort_keys: list[PyExpr], descending: list[bool], nulls_first: list[bool]) -> PySeries: ...
-def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyTable: ...
+def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyRecordBatch: ...
def pivot(
self,
group_by: list[PyExpr],
pivot_column: PyExpr,
values_column: PyExpr,
names: list[str],
-) -> PyTable: ...
+) -> PyRecordBatch: ...
def hash_join(
self,
-right: PyTable,
+right: PyRecordBatch,
left_on: list[PyExpr],
right_on: list[PyExpr],
how: JoinType,
-) -> PyTable: ...
+) -> PyRecordBatch: ...
def sort_merge_join(
self,
-right: PyTable,
+right: PyRecordBatch,
left_on: list[PyExpr],
right_on: list[PyExpr],
is_sorted: bool,
-) -> PyTable: ...
-def explode(self, to_explode: list[PyExpr]) -> PyTable: ...
-def head(self, num: int) -> PyTable: ...
-def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyTable: ...
-def sample_by_size(self, size: int, with_replacement: bool, seed: int | None) -> PyTable: ...
-def quantiles(self, num: int) -> PyTable: ...
-def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyTable]: ...
-def partition_by_random(self, num_partitions: int, seed: int) -> list[PyTable]: ...
+) -> PyRecordBatch: ...
+def explode(self, to_explode: list[PyExpr]) -> PyRecordBatch: ...
+def head(self, num: int) -> PyRecordBatch: ...
+def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyRecordBatch: ...
+def sample_by_size(self, size: int, with_replacement: bool, seed: int | None) -> PyRecordBatch: ...
+def quantiles(self, num: int) -> PyRecordBatch: ...
+def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyRecordBatch]: ...
+def partition_by_random(self, num_partitions: int, seed: int) -> list[PyRecordBatch]: ...
def partition_by_range(
-self, partition_keys: list[PyExpr], boundaries: PyTable, descending: list[bool]
-) -> list[PyTable]: ...
-def partition_by_value(self, partition_keys: list[PyExpr]) -> tuple[list[PyTable], PyTable]: ...
-def add_monotonically_increasing_id(self, partition_num: int, column_name: str) -> PyTable: ...
+self, partition_keys: list[PyExpr], boundaries: PyRecordBatch, descending: list[bool]
+) -> list[PyRecordBatch]: ...
+def partition_by_value(self, partition_keys: list[PyExpr]) -> tuple[list[PyRecordBatch], PyRecordBatch]: ...
+def add_monotonically_increasing_id(self, partition_num: int, column_name: str) -> PyRecordBatch: ...
def __repr__(self) -> str: ...
def _repr_html_(self) -> str: ...
def __len__(self) -> int: ...
@@ -1419,17 +1419,17 @@ class PyTable:
def get_column(self, name: str) -> PySeries: ...
def get_column_by_index(self, idx: int) -> PySeries: ...
@staticmethod
-def concat(tables: list[PyTable]) -> PyTable: ...
-def slice(self, start: int, end: int) -> PyTable: ...
+def concat(tables: list[PyRecordBatch]) -> PyRecordBatch: ...
+def slice(self, start: int, end: int) -> PyRecordBatch: ...
@staticmethod
-def from_arrow_record_batches(record_batches: list[pa.RecordBatch], schema: PySchema) -> PyTable: ...
+def from_arrow_record_batches(record_batches: list[pa.RecordBatch], schema: PySchema) -> PyRecordBatch: ...
@staticmethod
-def from_pylist_series(dict: dict[str, PySeries]) -> PyTable: ...
+def from_pylist_series(dict: dict[str, PySeries]) -> PyRecordBatch: ...
def to_arrow_record_batch(self) -> pa.RecordBatch: ...
@staticmethod
-def empty(schema: PySchema | None = None) -> PyTable: ...
+def empty(schema: PySchema | None = None) -> PyRecordBatch: ...
@staticmethod
-def from_file_infos(file_infos: FileInfos) -> PyTable: ...
+def from_file_infos(file_infos: FileInfos) -> PyRecordBatch: ...
def to_file_infos(self) -> FileInfos: ...

class PyMicroPartition:
@@ -1443,13 +1443,13 @@ class PyMicroPartition:
@staticmethod
def from_scan_task(scan_task: ScanTask) -> PyMicroPartition: ...
@staticmethod
-def from_tables(tables: list[PyTable]) -> PyMicroPartition: ...
+def from_tables(tables: list[PyRecordBatch]) -> PyMicroPartition: ...
@staticmethod
def from_arrow_record_batches(record_batches: list[pa.RecordBatch], schema: PySchema) -> PyMicroPartition: ...
@staticmethod
def concat(tables: list[PyMicroPartition]) -> PyMicroPartition: ...
def slice(self, start: int, end: int) -> PyMicroPartition: ...
-def to_table(self) -> PyTable: ...
+def to_table(self) -> PyRecordBatch: ...
def cast_to_schema(self, schema: PySchema) -> PyMicroPartition: ...
def eval_expression_list(self, exprs: list[PyExpr]) -> PyMicroPartition: ...
def take(self, idx: PySeries) -> PyMicroPartition: ...
@@ -1499,7 +1499,7 @@ class PyMicroPartition:
def partition_by_hash(self, exprs: list[PyExpr], num_partitions: int) -> list[PyMicroPartition]: ...
def partition_by_random(self, num_partitions: int, seed: int) -> list[PyMicroPartition]: ...
def partition_by_range(
-self, partition_keys: list[PyExpr], boundaries: PyTable, descending: list[bool]
+self, partition_keys: list[PyExpr], boundaries: PyRecordBatch, descending: list[bool]
) -> list[PyMicroPartition]: ...
def partition_by_value(self, exprs: list[PyExpr]) -> tuple[list[PyMicroPartition], PyMicroPartition]: ...
def add_monotonically_increasing_id(self, partition_num: int, column_name: str) -> PyMicroPartition: ...
(Diffs for the remaining changed files are not shown.)
