Skip to content

Commit

Permalink
Add deduplication options checks (ydb-platform#2254)
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Mar 4, 2024
1 parent 523a628 commit 5357301
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 23 deletions.
19 changes: 8 additions & 11 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace NKikimr::NPQ {
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);

static const ui64 WRITE_BLOCK_SIZE = 4_KB;
static const ui64 WRITE_BLOCK_SIZE = 4_KB;

TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
return TStringBuilder() << "Success {"
Expand Down Expand Up @@ -88,7 +88,7 @@ TString TEvPartitionWriter::TEvWriteResponse::ToString() const {
class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRlHelpers {

static constexpr size_t MAX_QUOTA_INFLIGHT = 3;

static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
ui32 partitionId, const TActorId& pipeClient)
{
Expand Down Expand Up @@ -204,12 +204,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
void GetOwnership() {
auto ev = MakeRequest(PartitionId, PipeClient);

auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
if (Opts.UseDeduplication) {
cmd.SetOwner(SourceId);
} else {
cmd.SetOwner(CreateGuidAsString());
}
auto& request = *ev->Record.MutablePartitionRequest();
auto& cmd = *request.MutableCmdGetOwnership();
cmd.SetOwner(SourceId);
cmd.SetForce(true);

NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
Expand Down Expand Up @@ -630,11 +627,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
// Re-requesting the quota. We do this until we get a quota.
// We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
Expand All @@ -658,7 +655,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
, TabletId(tabletId)
, PartitionId(partitionId)
, ExpectedGeneration(expectedGeneration)
, SourceId(sourceId)
, SourceId(opts.UseDeduplication ? sourceId : CreateGuidAsString())
, Opts(opts)
{
if (Opts.MeteringMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ class TWriteSessionImpl : public IWriteSession,
};

THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action

public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
Expand Down
66 changes: 64 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,68 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
// UNIT_ASSERT(false);
}

}
Y_UNIT_TEST(ConflictingWrites) {

TTopicSdkTestSetup setup(TEST_CASE_NAME);

NTopic::TWriteSessionSettings writeSettings;
writeSettings.Path(setup.GetTopicPath()).MessageGroupId(TEST_MESSAGE_GROUP_ID);
writeSettings.Path(setup.GetTopicPath()).ProducerId(TEST_MESSAGE_GROUP_ID);
writeSettings.Codec(NTopic::ECodec::RAW);
NTopic::IExecutor::TPtr executor = new NTopic::TSyncExecutor();
writeSettings.CompressionExecutor(executor);

ui64 count = 100u;

auto client = setup.MakeClient();
auto session = client.CreateSimpleBlockingWriteSession(writeSettings);

TString messageBase = "message----";

for (auto i = 0u; i < count; i++) {
auto res = session->Write(messageBase);
UNIT_ASSERT(res);
if (i % 10 == 0) {
setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
}
}
session->Close();

auto describeTopicSettings = TDescribeTopicSettings().IncludeStats(true);
auto result = client.DescribeTopic(setup.GetTopicPath(), describeTopicSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());

auto description = result.GetTopicDescription();
UNIT_ASSERT(description.GetPartitions().size() == 1);
auto stats = description.GetPartitions().front().GetPartitionStats();
UNIT_ASSERT(stats.Defined());
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);

}
} // Y_UNIT_TEST_SUITE(BasicUsage)

Y_UNIT_TEST_SUITE(TSettingsValidation) {
Y_UNIT_TEST(TWriteSessionProducerSettings) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();

{
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.ProducerId("something")
.DeduplicationEnabled(false);
try {
auto writeSession = client.CreateWriteSession(writeSettings);
auto event = writeSession->GetEvent(true);
UNIT_ASSERT(event.Defined());
auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
UNIT_ASSERT(closedEvent);
} catch (NYdb::TContractViolation&) {
//pass
}
}
}
} // Y_UNIT_TEST_SUITE(TSettingsValidation)

} // namespace

}
26 changes: 17 additions & 9 deletions ydb/services/persqueue_v1/actors/write_session_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
// 1.2. non-empty partition_id (explicit partitioning)
// 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
// 2. Empty producer id (no deduplication, partition is selected using round-robin).
bool isScenarioSupported =
bool isScenarioSupported =
!InitRequest.producer_id().empty() && (
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_partition_id() ||
InitRequest.has_partition_with_generation()) ||
InitRequest.has_partition_with_generation())
||
InitRequest.producer_id().empty();

if (!isScenarioSupported) {
Expand Down Expand Up @@ -452,7 +453,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
}
}();

LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
if (!UseDeduplication) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
Expand Down Expand Up @@ -505,8 +505,9 @@ void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorCo
PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
} else {
Y_VERIFY(!UseDeduplication);
} else if (UseDeduplication) {
CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
return;
}

InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
Expand Down Expand Up @@ -1162,9 +1163,16 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T

OwnerCookie = result.GetResult().OwnerCookie;
const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
if (!UseDeduplication) {
Y_VERIFY(maxSeqNo == 0);
}

// ToDo: uncomment after fixing KIKIMR-21124
// if (!UseDeduplication) {
// if (maxSeqNo != 0) {
// return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
// PersQueue::ErrorCode::ERROR, ctx);
// }
// }

OwnerCookie = result.GetResult().OwnerCookie;
MakeAndSentInitResponse(maxSeqNo, ctx);

}
Expand Down
49 changes: 49 additions & 0 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6231,6 +6231,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}

Y_UNIT_TEST(DisableWrongSettings) {
NPersQueue::TTestServer server;
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
TString topicFullName = "rt3.dc1--acc--topic1";
auto driver = SetupTestAndGetDriver(server, topicFullName, 3);

std::shared_ptr<grpc::Channel> Channel_;
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
{
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
}

{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
}
{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
req.mutable_init_request()->set_producer_id("producer");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
}
}

Y_UNIT_TEST(DisableDeduplication) {
NPersQueue::TTestServer server;
TString topicFullName = "rt3.dc1--topic1";
Expand Down

0 comments on commit 5357301

Please sign in to comment.