-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[native] Table writer 2: Add Presto write protocols #18377
Conversation
d39e15a
to
47a898e
Compare
596d4f9
to
78198b6
Compare
964c0eb
to
c84c999
Compare
9e68793
to
c069b87
Compare
6c33e23
to
80b2b95
Compare
ca52b64
to
39ca0c5
Compare
@gggrace14 There are CI failures. Are these related? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gggrace14 Looks good overall. Some comments below.
case protocol::TableType::TEMPORARY: | ||
return connector::hive::LocationHandle::TableType::kTemporary; | ||
default: | ||
throw std::invalid_argument("Unknown table type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use VELOX_USER_CHECK and include tableType in the error message (see toJsonString helper function) to simplify troubleshooting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revised here and other places. Found in this file we use VELOX_UNSUPPORTED for this case
return connector::hive::LocationHandle::WriteMode:: | ||
kDirectToTargetExistingDirectory; | ||
default: | ||
throw std::invalid_argument("Unknown write mode"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -1662,6 +1717,7 @@ VeloxQueryPlanConverter::toVeloxQueryPlan( | |||
node->columnNames, | |||
insertTableHandle, | |||
outputType, | |||
connector::WriteProtocol::CommitStrategy::kNoCommit, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is no-commit protocol hard-coded here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is where a system sets the commit strategy it's going to use. CommitStrategy::kNoCommit is set for Presto.
Also discussed with MJ previously, and for PrestoSpark, he will change here to add a condition to set kTaskCommit for PrestoSpark and kNoCommit for Presto.
class PrestoNoCommitWriteProtocol | ||
: public velox::connector::hive::HiveNoCommitWriteProtocol { | ||
public: | ||
~PrestoNoCommitWriteProtocol() override {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
= default or remove this and let compiler auto-generate
std::make_shared<PrestoNoCommitWriteProtocol>()); | ||
} | ||
|
||
// private: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove commented out code
rowWrittenVector = std::dynamic_pointer_cast<FlatVector<int64_t>>( | ||
BaseVector::create(BIGINT(), 1, pool)); | ||
rowWrittenVector->set(0, commitInfo.numWrittenRows()); | ||
columns.emplace_back(rowWrittenVector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider shortening
columns.emplace_back(BaseVector::createConstant(commitInfo.numWrittenRows(), 1, pool))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know
|
||
vector_size_t numOutputRows = 1; | ||
FlatVectorPtr<int64_t> rowWrittenVector; | ||
FlatVectorPtr<StringView> fragmentsVector; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fragmentsVector and commitContextVector are used only in the 'else' branch. Consider moving them there.
commitInfo.connectorCommitInfo()); | ||
numOutputRows = hiveCommitInfo.writerParameters().size() + 1; | ||
|
||
// Set rows column |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add period at the end of the sentence; ditto other places
numOutputRows = hiveCommitInfo.writerParameters().size() + 1; | ||
|
||
// Set rows column | ||
rowWrittenVector = std::dynamic_pointer_cast<FlatVector<int64_t>>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use BaseVector::create<FlatVector<int64_t>>(...)
ditto other places
return std::make_shared<RowVector>( | ||
pool, | ||
commitInfo.outputType(), | ||
BufferPtr(nullptr), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just "nullptr"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know
b8f071f
to
f0d3a97
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gggrace14 Looks good to me. Please, make sure all CI is green before landing.
("targetFileName", hiveWriterParameters->targetFileName()) | ||
("fileSize", 0))) | ||
("rowCount", numWrittenRows) | ||
("inMemoryDataSizeInBytes", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this accessible outside of Meta?
} | ||
}; | ||
|
||
class PrestoHiveTaskCommitWriteProtocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are inside presto::protocol namespace, "Presto" prefix can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see
cca4b30
to
ab21950
Compare
Add PrestoNoCommitWriteProtocol and PrestoTaskCommitWriteProtocol, which extends the write protocols of Hive connector and make commit() return the specific format of outputs expected by Presto from the table writer. Register the two write protocols during server start. The right protocol will be picked up by the table writer, given the CommitStrategy used by the TableWriter.
Add PrestoNoCommitWriteProtocol and PrestoTaskCommitWriteProtocol,
which extends the write protocols of Hive connector and make commit()
return the specific format of outputs expected by Presto from
the table writer. Register the two write protocols during server start.
The right protocol will be picked up by the table writer,
given the CommitStrategy used by the TableWriter.