diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 98d670a03343..48277a3c5ff0 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 { return {}; } + + void SetShardProperties(::Ydb::DataStreams::V1::Shard* shard, + const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition, + const bool autoPartitioningEnabled, + const size_t allShardsCount, + const std::map>& offsets) { + shard->set_shard_id(GetShardName(partition.GetPartitionId())); + + + const auto& parents = partition.GetParentPartitionIds(); + if (parents.size() > 0) { + shard->set_parent_shard_id(GetShardName(parents[0])); + } + if (parents.size() > 1) { + shard->set_adjacent_parent_shard_id(GetShardName(parents[1])); + } + + auto* rangeProto = shard->mutable_hash_key_range(); + if (autoPartitioningEnabled) { + NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound() + ? NPQ::AsInt(partition.GetKeyRange().GetFromBound()) + 1: 0; + NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound() + ? NPQ::AsInt(partition.GetKeyRange().GetToBound()): -1; + rangeProto->set_starting_hash_key(Uint128ToDecimalString(from)); + rangeProto->set_ending_hash_key(Uint128ToDecimalString(to)); + } else { + auto range = RangeFromShardNumber(partition.GetPartitionId(), allShardsCount); + rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start)); + rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End)); + } + + auto it = offsets.find(partition.GetPartitionId()); + if (it != offsets.end()) { + auto* rangeProto = shard->mutable_sequence_number_range(); + rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first); + + if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus()) { + rangeProto->set_ending_sequence_number(TStringBuilder() << it->second.second); + } + } + } } @@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 { break; } else { auto* shard = description.add_shards(); - shard->set_shard_id(shardName); - - const auto& parents = partition.GetParentPartitionIds(); - if (parents.size() > 0) { - shard->set_parent_shard_id(GetShardName(parents[0])); - } - if (parents.size() > 1) { - shard->set_adjacent_parent_shard_id(GetShardName(parents[1])); - } - - auto* rangeProto = shard->mutable_hash_key_range(); - if (NPQ::SplitMergeEnabled(pqConfig)) { - NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound() ? NPQ::AsInt(partition.GetKeyRange().GetFromBound()) + 1: 0; - NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound() ? NPQ::AsInt(partition.GetKeyRange().GetToBound()): -1; - rangeProto->set_starting_hash_key(Uint128ToDecimalString(from)); - rangeProto->set_ending_hash_key(Uint128ToDecimalString(to)); - } else { - auto range = RangeFromShardNumber(partitionId, PQGroup.GetPartitions().size()); - rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start)); - rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End)); - } - auto it = StartEndOffsetsPerPartition.find(partitionId); - if (it != StartEndOffsetsPerPartition.end()) { - auto* rangeProto = shard->mutable_sequence_number_range(); - rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first); - } + SetShardProperties(shard, partition, NPQ::SplitMergeEnabled(pqConfig), PQGroup.GetPartitions().size(), StartEndOffsetsPerPartition); } } } @@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 { std::vector Shards; ui32 LeftToRead = 0; ui32 AllShardsCount = 0; + bool AutoPartitioningEnabled = false; std::atomic GotOffsetResponds; std::vector Pipes; }; @@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 { } using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition; - const auto& partitions = topicInfo.PQGroupInfo->Description.GetPartitions(); + const auto& description = topicInfo.PQGroupInfo->Description; + const auto& partitions = description.GetPartitions(); TString startingShardId = this->GetProtoRequest()->Getexclusive_start_shard_id(); ui64 startingTimepoint{0}; bool onlyOpenShards{true}; @@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 { }} }; + AutoPartitioningEnabled = NPQ::SplitMergeEnabled(description.GetPQTabletConfig()); + const auto alreadyRead = NextToken.GetAlreadyRead(); if (alreadyRead > (ui32)partitions.size()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::INVALID_ARGUMENT), @@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 { void TListShardsActor::SendResponse(const TActorContext& ctx) { Ydb::DataStreams::V1::ListShardsResult result; for (auto& shard : Shards) { - auto awsShard = result.Addshards(); - // TODO: - // awsShard->set_parent_shard_id(""); - // awsShard->set_adjacent_parent_shard_id(prevShardName); - auto range = RangeFromShardNumber(shard.GetPartitionId(), AllShardsCount); - awsShard->mutable_hash_key_range()->set_starting_hash_key( - Uint128ToDecimalString(range.Start)); - awsShard->mutable_hash_key_range()->set_ending_hash_key( - Uint128ToDecimalString(range.End)); - awsShard->mutable_sequence_number_range()->set_starting_sequence_number( - std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].first)); - //TODO: fill it only for closed partitions - //awsShard->mutable_sequence_number_range()->set_ending_sequence_number( - // std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second)); - awsShard->set_shard_id(GetShardName(shard.GetPartitionId())); + SetShardProperties(result.Addshards(), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition); } if (LeftToRead > 0) { TNextToken token(StreamName, NextToken.GetAlreadyRead() + Shards.size(), MaxResults, TInstant::Now().MilliSeconds()); diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 3824ad189088..e3d4b1cb449b 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -2756,6 +2756,18 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(description.partitioning_settings().auto_partitioning_settings().partition_write_speed().down_utilization_percent(), 13); } + { + std::vector records; + for (ui32 i = 1; i <= 30; ++i) { + TString data = Sprintf("%04u", i); + records.push_back({data, data, ""}); + } + auto result = testServer.DataStreamsClient->PutRecords(streamName, records).ExtractValueSync(); + Cerr << result.GetResult().DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { ui64 txId = 107; SplitPartition(*kikimr->GetRuntime(), txId, 1, "a"); @@ -2776,8 +2788,11 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(description.shards().size(), 5); UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().starting_sequence_number(), "0"); + UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().ending_sequence_number(), ""); UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().starting_hash_key(), "0"); UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().ending_hash_key(), "113427455640312821154458202477256070484"); + UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().starting_sequence_number(), "0"); + UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().ending_sequence_number(), "8"); UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().starting_hash_key(), "113427455640312821154458202477256070485"); UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().ending_hash_key(), "226854911280625642308916404954512140969"); UNIT_ASSERT_VALUES_EQUAL(description.shards(2).hash_key_range().starting_hash_key(), "226854911280625642308916404954512140970");