Skip to content

Commit

Permalink
Merge 1eb9835 into 9d5ce5a
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jan 22, 2025
2 parents 9d5ce5a + 1eb9835 commit d8c79f5
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Done()
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Done()
.Done()
.Done()
.Done()
.Done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ namespace NYql::NDq {
if (retriesRemaining) {
const auto retry = RequestRetriesLimit - retriesRemaining;
const auto delay = TDuration::MilliSeconds(1u << retry); // Exponential delay from 1ms to ~0.5s
// <<< TODO tune/tweak
// << TODO tune/tweak
YQL_CLOG(WARN, ProviderGeneric) << "ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString() << ", retry " << (retry + 1) << " of " << RequestRetriesLimit << ", scheduled in " << delay;
--retriesRemaining;
if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
Expand Down Expand Up @@ -450,6 +450,19 @@ namespace NYql::NDq {
return result;
}

void AddClause(NConnector::NApi::TPredicate::TDisjunction &disjunction,
ui32 columnsCount, auto&& getter) {
NConnector::NApi::TPredicate::TConjunction& conjunction = *disjunction.mutable_operands()->Add()->mutable_conjunction();
for (ui32 c = 0; c != columnsCount; ++c) {
NConnector::NApi::TPredicate::TComparison& eq = *conjunction.mutable_operands()->Add()->mutable_comparison();
eq.set_operation(NConnector::NApi::TPredicate::TComparison::EOperation::TPredicate_TComparison_EOperation_EQ);
eq.mutable_left_value()->set_column(TString(KeyType->GetMemberName(c)));
auto rightTypedValue = eq.mutable_right_value()->mutable_typed_value();
ExportTypeToProto(KeyType->GetMemberType(c), *rightTypedValue->mutable_type());
ExportValueToProto(KeyType->GetMemberType(c), getter(c), *rightTypedValue->mutable_value());
}
}

TString FillSelect(NConnector::NApi::TSelect& select) {
auto dsi = LookupSource.data_source_instance();
auto error = TokenProvider->MaybeFillToken(dsi);
Expand All @@ -466,21 +479,20 @@ namespace NYql::NDq {

select.mutable_from()->Settable(LookupSource.table());

NConnector::NApi::TPredicate_TDisjunction disjunction;
NConnector::NApi::TPredicate::TDisjunction disjunction;
for (const auto& [k, _] : *Request) {
// TODO consider skipping already retrieved keys
// ... but careful, can we end up with zero? TODO
NConnector::NApi::TPredicate_TConjunction conjunction;
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
NConnector::NApi::TPredicate_TComparison eq;
eq.Setoperation(NConnector::NApi::TPredicate_TComparison_EOperation::TPredicate_TComparison_EOperation_EQ);
eq.mutable_left_value()->Setcolumn(TString(KeyType->GetMemberName(c)));
auto rightTypedValue = eq.mutable_right_value()->mutable_typed_value();
ExportTypeToProto(KeyType->GetMemberType(c), *rightTypedValue->mutable_type());
ExportValueToProto(KeyType->GetMemberType(c), k.GetElement(c), *rightTypedValue->mutable_value());
*conjunction.mutable_operands()->Add()->mutable_comparison() = eq;
}
*disjunction.mutable_operands()->Add()->mutable_conjunction() = conjunction;
AddClause(disjunction, KeyType->GetMembersCount(), [&k = k](auto c) {
return k.GetElement(c);
});
}
auto& k = Request->begin()->first; // Request is never empty
// Pad query with dummy clauses to improve caching
for (ui32 nRequests = Request->size(); !IsPowerOf2(nRequests) && nRequests < MaxKeysInRequest; ++nRequests) {
AddClause(disjunction, KeyType->GetMembersCount(), [&k = k](auto c) {
return k.GetElement(c);
});
}
*select.mutable_where()->mutable_filter_typed()->mutable_disjunction() = disjunction;
return {};
Expand Down

0 comments on commit d8c79f5

Please sign in to comment.