Skip to content

Commit

Permalink
feat(python): Make output-schema optional on tableWrite (facebookincu…
Browse files Browse the repository at this point in the history
…bator#12296)

Summary:
Pull Request resolved: facebookincubator#12296

output schema should be optional; by default is uses whatever schema
is produced by the upstream operator.

Reviewed By: kostasxmeta

Differential Revision: D69406743

fbshipit-source-id: d47b5f24a2f2165efe5a9dd34fd7830e33185e30
  • Loading branch information
pedroerp authored and facebook-github-bot committed Feb 11, 2025
1 parent 183e3fa commit 04bfdff
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 15 deletions.
19 changes: 12 additions & 7 deletions velox/py/plan_builder/PyPlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,24 @@ std::optional<PyPlanNode> PyPlanBuilder::planNode() const {
}

PyPlanBuilder& PyPlanBuilder::tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId) {
const std::string& connectorId,
const std::optional<PyType>& outputSchema) {
exec::test::PlanBuilder::TableWriterBuilder builder(planBuilder_);

// Try to convert the output type.
auto outputRowSchema = asRowType(outputSchema.type());
if (outputRowSchema == nullptr) {
throw std::runtime_error("Output schema must be a ROW().");
RowTypePtr outputRowSchema;

if (outputSchema != std::nullopt) {
outputRowSchema = asRowType(outputSchema->type());

if (outputRowSchema == nullptr) {
throw std::runtime_error("Output schema must be a ROW().");
}
builder.outputType(outputRowSchema);
}

builder.outputType(outputRowSchema)
.outputFileName(outputFile.filePath())
builder.outputFileName(outputFile.filePath())
.fileFormat(outputFile.fileFormat())
.connectorId(connectorId)
.endTableWriter();
Expand Down
9 changes: 5 additions & 4 deletions velox/py/plan_builder/PyPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,16 @@ class PyPlanBuilder {

/// Adds a table writer node to write to an output file(s).
///
/// @param outputSchema The schema to be used when writing the file (columns
/// and types).
/// @param outputFile The output file to be written.
/// @param connectorId The id of the connector to use during the write
/// process.
/// @param outputSchema An optional schema to be used when writing the file
/// (columns and types). By default use the schema produced by the upstream
/// operator.
PyPlanBuilder& tableWrite(
const PyType& outputSchema,
const PyFile& outputFile,
const std::string& connectorId);
const std::string& connectorId,
const std::optional<PyType>& outputSchema);

// Add the provided vectors straight into the operator tree.
PyPlanBuilder& values(const std::vector<PyVector>& values);
Expand Down
7 changes: 4 additions & 3 deletions velox/py/plan_builder/plan_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,18 @@ PYBIND11_MODULE(plan_builder, m) {
.def(
"table_write",
&velox::py::PyPlanBuilder::tableWrite,
py::arg("output_schema"),
py::arg("output_file"),
py::arg("connector_id") = "hive",
py::arg("output_schema") = std::nullopt,
py::doc(R"(
Adds a table write node to the plan.
Args:
output_schema: A RowType containing the schema to be written to
the file.
output_file: Name of the file to be written.
connector_id: ID of the connector to use for this scan.
output_schema: An optional RowType containing the schema to be
written to the file. By default write the schema
produced by the operator upstream.
)"))
.def(
"values",
Expand Down
1 change: 0 additions & 1 deletion velox/py/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def test_write_read_file(self):

plan_builder = PlanBuilder()
plan_builder.values([input_batch]).table_write(
output_schema=ROW(["c0"], [BIGINT()]),
output_file=DWRF(output_file),
connector_id="hive",
)
Expand Down

0 comments on commit 04bfdff

Please sign in to comment.