diff --git a/ydb/tests/fq/pq_async_io/dq_pq_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/dq_pq_read_actor_ut.cpp new file mode 100644 index 000000000000..fc962a128d9e --- /dev/null +++ b/ydb/tests/fq/pq_async_io/dq_pq_read_actor_ut.cpp @@ -0,0 +1,352 @@ +#include "ut_helpers.h" + +#include + +#include + +#include + +namespace NYql::NDq { + +namespace { + +// We can't be sure that no extra watermarks were generated (we can't control LB receipt write time). +// So, we will check only if there is at least one watermark before each specified position. +template +void AssertDataWithWatermarks( + const std::vector>& actual, + const std::vector& expected, + const std::vector watermarkBeforePositions) +{ + auto expectedPos = 0U; + auto watermarksBeforeIter = watermarkBeforePositions.begin(); + + for (auto item : actual) { + if (std::holds_alternative(item)) { + if (watermarksBeforeIter != watermarkBeforePositions.end()) { + watermarksBeforeIter++; + } + continue; + } else { + UNIT_ASSERT_C(expectedPos < expected.size(), "Too many data items"); + UNIT_ASSERT_C( + watermarksBeforeIter == watermarkBeforePositions.end() || + *watermarksBeforeIter > expectedPos, + "Watermark before item on position " << expectedPos << " was expected"); + UNIT_ASSERT_EQUAL(std::get(item), expected.at(expectedPos)); + expectedPos++; + } + } +} + +constexpr auto defaultWatermarkPeriod = TDuration::MilliSeconds(100); +constexpr auto defaultLateArrivalDelay = TDuration::MilliSeconds(1); + +void WaitForNextWatermark(TDuration lateArrivalDelayMs = defaultLateArrivalDelay) { + // We can't control write time in LB, so just sleep for watermarkPeriod to ensure the next written data + // will obtain write_time which will move watermark forward. + Sleep(lateArrivalDelayMs); +} + +} + +Y_UNIT_TEST_SUITE(TDqPqReadActorTest) { + Y_UNIT_TEST_F(TestReadFromTopic, TPqIoTestFixture) { + const TString topicName = "ReadFromTopic"; + PQCreateStream(topicName); + InitSource(topicName); + + const std::vector data = { "1", "2", "3", "4" }; + PQWrite(data, topicName); + + auto result = SourceReadDataUntil(UVParser, 4); + AssertDataWithWatermarks(result, data, {}); + } + + Y_UNIT_TEST_F(TestReadFromTopicFromNow, TPqIoTestFixture) { + const TString topicName = "ReadFromTopicFromNow"; + PQCreateStream(topicName); + + const std::vector oldData = { "-4", "-3", "-2", "-1", "0" }; + PQWrite(oldData, topicName); + + InitSource(topicName); + + const std::vector data = { "1", "2", "3", "4" }; + PQWrite(data, topicName); + + auto result = SourceReadDataUntil(UVParser, 4); + AssertDataWithWatermarks(result, data, {}); + } + + Y_UNIT_TEST_F(ReadWithFreeSpace, TPqIoTestFixture) { + const TString topicName = "ReadWithFreeSpace"; + PQCreateStream(topicName); + InitSource(topicName); + + PQWrite({"data1", "data2", "data3"}, topicName); + + { + auto result = SourceReadDataUntil(UVParser, 1, 1); + std::vector expected {"data1"}; + AssertDataWithWatermarks(result, expected, {}); + } + + UNIT_ASSERT_EQUAL(SourceRead(UVParser, 0).size(), 0); + UNIT_ASSERT_EQUAL(SourceRead(UVParser, -1).size(), 0); + } + + Y_UNIT_TEST_F(ReadNonExistentTopic, TPqIoTestFixture) { + const TString topicName = "NonExistentTopic"; + InitSource(topicName); + + while (true) { + try { + SourceRead(UVParser); + } catch (yexception& e) { + UNIT_ASSERT_STRING_CONTAINS(e.what(), "Read session to topic \"NonExistentTopic\" was closed"); + break; + } + + sleep(1); + } + } + + Y_UNIT_TEST(TestSaveLoadPqRead) { + NDqProto::TSourceState state; + const TString topicName = "SaveLoadPqRead"; + PQCreateStream(topicName); + + { + TPqIoTestFixture setup1; + setup1.InitSource(topicName); + + std::vector data {"data"}; + PQWrite(data, topicName); + + auto result = setup1.SourceReadDataUntil(UVParser, 1); + AssertDataWithWatermarks(result, data, {}); + + auto checkpoint = CreateCheckpoint(); + setup1.SaveSourceState(checkpoint, state); + Cerr << "State saved" << Endl; + } + + NDqProto::TSourceState state2; + { + TPqIoTestFixture setup2; + setup2.InitSource(topicName); + + std::vector data {"data"}; + PQWrite({"data"}, topicName); + + setup2.LoadSource(state); + Cerr << "State loaded" << Endl; + auto result = setup2.SourceReadDataUntil(UVParser, 1); + AssertDataWithWatermarks(result, data, {}); + + auto checkpoint = CreateCheckpoint(); + setup2.SaveSourceState(checkpoint, state2); + + PQWrite({"futherData"}, topicName); + } + + NDqProto::TSourceState state3; + { + TPqIoTestFixture setup3; + setup3.InitSource(topicName); + setup3.LoadSource(state2); + + auto result = setup3.SourceReadDataUntil(UVParser, 1); + std::vector data {"futherData"}; + AssertDataWithWatermarks(result, data, {}); + + // pq session is steel alive + + PQWrite({"yetAnotherData"}, topicName); + + auto checkpoint = CreateCheckpoint(); + setup3.SaveSourceState(checkpoint, state3); + } + + // Load the first state and check it. + { + TPqIoTestFixture setup4; + setup4.InitSource(topicName); + setup4.LoadSource(state); + + auto result = setup4.SourceReadUntil(UVParser, 3); + std::vector data {"data", "futherData", "yetAnotherData"}; + AssertDataWithWatermarks(result, data, {}); + } + + // Load graphState2 and check it (offsets were saved). + { + TPqIoTestFixture setup5; + setup5.InitSource(topicName); + setup5.LoadSource(state2); + + auto result = setup5.SourceReadDataUntil(UVParser, 2); + std::vector data {"futherData", "yetAnotherData"}; + AssertDataWithWatermarks(result, data, {}); + } + + // Load graphState3 and check it (other offsets). + { + TPqIoTestFixture setup6; + setup6.InitSource(topicName); + setup6.LoadSource(state3); + + auto result = setup6.SourceReadDataUntil(UVParser, 1); + std::vector data {"yetAnotherData"}; + AssertDataWithWatermarks(result, data, {}); + } + } + + Y_UNIT_TEST(LoadCorruptedState) { + NDqProto::TSourceState state; + const TString topicName = "Invalid"; // We wouldn't read from this topic. + auto checkpoint = CreateCheckpoint(); + + { + TPqIoTestFixture setup1; + setup1.InitSource(topicName); + setup1.SaveSourceState(checkpoint, state); + } + + // Corrupt state. + TString corruptedBlob = state.GetData(0).GetStateData().GetBlob(); + corruptedBlob.append('a'); + state.MutableData(0)->MutableStateData()->SetBlob(corruptedBlob); + + { + TPqIoTestFixture setup2; + setup2.InitSource(topicName); + UNIT_ASSERT_EXCEPTION_CONTAINS(setup2.LoadSource(state), yexception, "Serialized state is corrupted"); + } + } + + Y_UNIT_TEST(TestLoadFromSeveralStates) { + const TString topicName = "LoadFromSeveralStates"; + PQCreateStream(topicName); + + NDqProto::TSourceState state2; + { + TPqIoTestFixture setup; + setup.InitSource(topicName); + + std::vector data {"data"}; + PQWrite(data, topicName); + + auto result1 = setup.SourceReadDataUntil(UVParser, 1); + AssertDataWithWatermarks(result1, data, {}); + + NDqProto::TSourceState state1; + auto checkpoint1 = CreateCheckpoint(); + setup.SaveSourceState(checkpoint1, state1); + Cerr << "State saved" << Endl; + + std::vector data2 {"data2"}; + PQWrite(data2, topicName); + + auto result2 = setup.SourceReadDataUntil(UVParser, 1); + AssertDataWithWatermarks(result2, data2, {}); + + auto checkpoint2 = CreateCheckpoint(); + setup.SaveSourceState(checkpoint2, state2); + Cerr << "State 2 saved" << Endl; + + // Add state1 to state2 + *state2.AddData() = state1.GetData(0); + } + + TPqIoTestFixture setup2; + setup2.InitSource(topicName); + setup2.LoadSource(state2); // Loads min offset + + std::vector data3 {"data3"}; + PQWrite(data3, topicName); + + auto result = setup2.SourceReadUntil(UVParser, 2); + std::vector dataResult {"data2", "data3"}; + AssertDataWithWatermarks(result, dataResult, {}); + } + + Y_UNIT_TEST_F(TestReadFromTopicFirstWatermark, TPqIoTestFixture) { + const TString topicName = "ReadFromTopicFirstWatermark"; + PQCreateStream(topicName); + + auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod); + InitSource(std::move(settings)); + + const std::vector data = { "1", "2", "3", "4" }; + PQWrite(data, topicName); + + auto result = SourceReadDataUntil(UVParser, 4); + AssertDataWithWatermarks(result, { "1", "2", "3", "4" }, {0}); + } + + Y_UNIT_TEST_F(TestReadFromTopicWatermarks1, TPqIoTestFixture) { + const TString topicName = "ReadFromTopicWatermarks1"; + PQCreateStream(topicName); + + auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod, defaultLateArrivalDelay); + InitSource(std::move(settings)); + + const std::vector data1 = { "1", "2" }; + PQWrite(data1, topicName); + + WaitForNextWatermark(); + const std::vector data2 = { "3", "4", "5" }; + PQWrite(data2, topicName); + + WaitForNextWatermark(); + const std::vector data3 = { "6" }; + PQWrite(data3, topicName); + + auto result = SourceReadDataUntil(UVParser, 6); + AssertDataWithWatermarks(result, {"1", "2", "3", "4", "5", "6"}, {0, 2, 5}); + } + + Y_UNIT_TEST(WatermarkCheckpointWithItemsInReadyBuffer) { + const TString topicName = "WatermarkCheckpointWithItemsInReadyBuffer"; + PQCreateStream(topicName); + NDqProto::TSourceState state; + + { + TPqIoTestFixture setup; + auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod); + setup.InitSource(std::move(settings)); + + std::vector data1 {"1", "2"}; + PQWrite(data1, topicName); + + auto result = setup.SourceReadDataUntil(UVParser, 2); + AssertDataWithWatermarks(result, data1, {0}); + + WaitForNextWatermark(); + std::vector data2 {"3", "4"}; + PQWrite(data2, topicName); + + // read only watermark (1-st batch), items '3', '4' will stay in ready buffer inside Source actor + result = setup.SourceReadUntil(UVParser, 1); + AssertDataWithWatermarks(result, {}, {0}); + + auto checkpoint = CreateCheckpoint(); + setup.SaveSourceState(checkpoint, state); + Cerr << "State saved" << Endl; + } + + { + TPqIoTestFixture setup; + auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod); + setup.InitSource(std::move(settings)); + setup.LoadSource(state); + + auto result = setup.SourceReadDataUntil(UVParser, 2); + // Since items '3', '4' weren't returned from source actor, they should be reread + AssertDataWithWatermarks(result, {"3", "4"}, {}); + } + } +} +} // NYql::NDq diff --git a/ydb/tests/fq/pq_async_io/dq_pq_write_actor_ut.cpp b/ydb/tests/fq/pq_async_io/dq_pq_write_actor_ut.cpp new file mode 100644 index 000000000000..7c6cd84f3185 --- /dev/null +++ b/ydb/tests/fq/pq_async_io/dq_pq_write_actor_ut.cpp @@ -0,0 +1,143 @@ +#include "ut_helpers.h" + +#include + +#include + +namespace NYql::NDq { + +constexpr TDuration WaitTimeout = TDuration::MilliSeconds(10000); + +Y_UNIT_TEST_SUITE(TPqWriterTest) { + Y_UNIT_TEST_F(TestWriteToTopic, TPqIoTestFixture) { + const TString topicName = "WriteToTopic"; + PQCreateStream(topicName); + InitAsyncOutput(topicName); + const std::vector data = { "1", "2", "3", "4" }; + + AsyncOutputWrite(data); + auto result = PQReadUntil(topicName, 4); + UNIT_ASSERT_EQUAL(result, data); + } + + Y_UNIT_TEST_F(TestWriteToTopicMultiBatch, TPqIoTestFixture) { + const TString topicName = "WriteToTopicMultiBatch"; + PQCreateStream(topicName); + InitAsyncOutput(topicName); + + const std::vector data1 = { "1" }; + const std::vector data2 = { "2" }; + const std::vector data3 = { "3" }; + + AsyncOutputWrite(data1); + AsyncOutputWrite(data2); + AsyncOutputWrite(data3); + auto result = PQReadUntil(topicName, 3); + + std::vector expected = { "1", "2", "3" }; + UNIT_ASSERT_EQUAL(result, expected); + } + + Y_UNIT_TEST_F(TestDeferredWriteToTopic, TPqIoTestFixture) { + // In this case we are checking free space overflow + const TString topicName = "DeferredWriteToTopic"; + PQCreateStream(topicName); + InitAsyncOutput(topicName, 1); + + const std::vector data = { "1", "2", "3" }; + + auto future = CaSetup->AsyncOutputPromises.ResumeExecution.GetFuture(); + AsyncOutputWrite(data); + auto result = PQReadUntil(topicName, 3); + + UNIT_ASSERT_EQUAL(result, data); + UNIT_ASSERT(future.Wait(WaitTimeout)); // Resume execution should be called + + const std::vector data2 = { "4", "5", "6" }; + + AsyncOutputWrite(data2); + auto result2 = PQReadUntil(topicName, 6); + const std::vector expected = { "1", "2", "3", "4", "5", "6" }; + UNIT_ASSERT_EQUAL(result2, expected); + } + + Y_UNIT_TEST_F(WriteNonExistentTopic, TPqIoTestFixture) { + const TString topicName = "NonExistentTopic"; + InitAsyncOutput(topicName); + + const std::vector data = { "1" }; + auto future = CaSetup->AsyncOutputPromises.Issue.GetFuture(); + AsyncOutputWrite(data); + + UNIT_ASSERT(future.Wait(WaitTimeout)); + UNIT_ASSERT_STRING_CONTAINS(future.GetValue().ToString(), "Write session to topic \"NonExistentTopic\" was closed"); + } + + Y_UNIT_TEST(TestCheckpoints) { + const TString topicName = "Checkpoints"; + PQCreateStream(topicName); + + NDqProto::TSinkState state1; + { + TPqIoTestFixture setup; + setup.InitAsyncOutput(topicName); + + const std::vector data1 = { "1" }; + setup.AsyncOutputWrite(data1); + + const std::vector data2 = { "2", "3" }; + auto checkpoint = CreateCheckpoint(); + auto future = setup.CaSetup->AsyncOutputPromises.StateSaved.GetFuture(); + setup.AsyncOutputWrite(data2, checkpoint); + + UNIT_ASSERT(future.Wait(WaitTimeout)); + state1 = future.GetValue(); + } + + { + TPqIoTestFixture setup; + setup.InitAsyncOutput(topicName); + setup.LoadSink(state1); + + const std::vector data3 = { "4", "5" }; + setup.AsyncOutputWrite(data3); + + auto result = PQReadUntil(topicName, 5); + const std::vector expected = { "1", "2", "3", "4", "5" }; + UNIT_ASSERT_EQUAL(result, expected); + } + + { + TPqIoTestFixture setup; + setup.InitAsyncOutput(topicName); + setup.LoadSink(state1); + + const std::vector data4 = { "4", "5" }; + setup.AsyncOutputWrite(data4); // This write should be deduplicated + + auto result = PQReadUntil(topicName, 4); + const std::vector expected = { "1", "2", "3", "4", "5" }; + UNIT_ASSERT_EQUAL(result, expected); + } + } + + Y_UNIT_TEST_F(TestCheckpointWithEmptyBatch, TPqIoTestFixture) { + const TString topicName = "CheckpointsWithEmptyBatch"; + PQCreateStream(topicName); + + NDqProto::TSinkState state1; + { + InitAsyncOutput(topicName); + + const std::vector data = {}; + auto checkpoint = CreateCheckpoint(); + auto future = CaSetup->AsyncOutputPromises.StateSaved.GetFuture(); + AsyncOutputWrite(data, checkpoint); + + UNIT_ASSERT(future.Wait(WaitTimeout)); + state1 = future.GetValue(); + } + } +} + +} // NYql::NDq diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp new file mode 100644 index 000000000000..532532051244 --- /dev/null +++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp @@ -0,0 +1,248 @@ +#include "ut_helpers.h" + +#include + +#include + +#include + +#include +#include + +namespace NYql::NDq { + +using namespace NActors; + +NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings( + TString topic, + TMaybe watermarksPeriod, + TDuration lateArrivalDelay, + bool idlePartitionsEnabled) +{ + NYql::NPq::NProto::TDqPqTopicSource settings; + settings.SetTopicPath(topic); + settings.SetConsumerName(DefaultPqConsumer); + settings.SetEndpoint(GetDefaultPqEndpoint()); + settings.MutableToken()->SetName("token"); + settings.SetDatabase(GetDefaultPqDatabase()); + if (watermarksPeriod) { + settings.MutableWatermarks()->SetEnabled(true); + settings.MutableWatermarks()->SetGranularityUs(watermarksPeriod->MicroSeconds()); + } + settings.MutableWatermarks()->SetIdlePartitionsEnabled(idlePartitionsEnabled); + settings.MutableWatermarks()->SetLateArrivalDelayUs(lateArrivalDelay.MicroSeconds()); + + return settings; +} + +NYql::NPq::NProto::TDqPqTopicSink BuildPqTopicSinkSettings(TString topic) { + NYql::NPq::NProto::TDqPqTopicSink settings; + settings.SetTopicPath(topic); + settings.SetEndpoint(GetDefaultPqEndpoint()); + settings.SetDatabase(GetDefaultPqDatabase()); + settings.SetClusterType(NPq::NProto::DataStreams); + settings.MutableToken()->SetName("token"); + + return settings; +} + +TPqIoTestFixture::TPqIoTestFixture() { +} + +TPqIoTestFixture::~TPqIoTestFixture() { + CaSetup = nullptr; + Driver.Stop(true); +} + +void TPqIoTestFixture::InitSource( + NYql::NPq::NProto::TDqPqTopicSource&& settings, + i64 freeSpace) +{ + CaSetup->Execute([&](TFakeActor& actor) { + NPq::NProto::TDqReadTaskParams params; + auto* partitioninigParams = params.MutablePartitioningParams(); + partitioninigParams->SetTopicPartitionsCount(1); + partitioninigParams->SetEachTopicPartitionGroupId(0); + partitioninigParams->SetDqPartitionsCount(1); + + TString serializedParams; + Y_PROTOBUF_SUPPRESS_NODISCARD params.SerializeToString(&serializedParams); + + const THashMap secureParams; + const THashMap taskParams { {"pq", serializedParams} }; + + auto [dqSource, dqSourceAsActor] = CreateDqPqReadActor( + std::move(settings), + 0, + NYql::NDq::TCollectStatsLevel::None, + "query_1", + 0, + secureParams, + taskParams, + Driver, + nullptr, + actor.SelfId(), + actor.GetHolderFactory(), + freeSpace); + + actor.InitAsyncInput(dqSource, dqSourceAsActor); + }); +} + +void TPqIoTestFixture::InitAsyncOutput( + NPq::NProto::TDqPqTopicSink&& settings, + i64 freeSpace) +{ + const THashMap secureParams; + + CaSetup->Execute([&](TFakeActor& actor) { + auto [dqAsyncOutput, dqAsyncOutputAsActor] = CreateDqPqWriteActor( + std::move(settings), + 0, + NYql::NDq::TCollectStatsLevel::None, + "query_1", + secureParams, + Driver, + nullptr, + &actor.GetAsyncOutputCallbacks(), + freeSpace); + + actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor); + }); +} + +TString GetDefaultPqEndpoint() { + auto endpoint = GetEnv("YDB_ENDPOINT"); + UNIT_ASSERT_C(endpoint, "Yds recipe is expected"); + return endpoint; +} + +TString GetDefaultPqDatabase() { + auto database = GetEnv("YDB_DATABASE"); + UNIT_ASSERT_C(database, "Yds recipe is expected"); + return database; +} + +extern const TString DefaultPqConsumer = "test_client"; + +void PQWrite( + const std::vector& sequence, + const TString& topic, + const TString& endpoint) +{ + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(endpoint); + cfg.SetDatabase(GetDefaultPqDatabase()); + cfg.SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(cfg); + NYdb::NPersQueue::TPersQueueClient client(driver); + NYdb::NPersQueue::TWriteSessionSettings sessionSettings; + sessionSettings + .Path(topic) + .MessageGroupId("src_id") + .Codec(NYdb::NPersQueue::ECodec::RAW); + auto session = client.CreateSimpleBlockingWriteSession(sessionSettings); + for (const TString& data : sequence) { + UNIT_ASSERT_C(session->Write(data), "Failed to write message with body \"" << data << "\" to topic " << topic); + Cerr << "Message '" << data << "' was written into topic '" << topic << "'" << Endl; + } + session->Close(); // Wait until all data would be written into PQ. + session = nullptr; + driver.Stop(true); +} + +std::vector PQReadUntil( + const TString& topic, + ui64 size, + const TString& endpoint, + TDuration timeout) +{ + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(endpoint); + cfg.SetDatabase(GetDefaultPqDatabase()); + cfg.SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(cfg); + NYdb::NPersQueue::TPersQueueClient client(driver); + NYdb::NPersQueue::TReadSessionSettings sessionSettings; + sessionSettings + .AppendTopics(topic) + .ConsumerName(DefaultPqConsumer) + .DisableClusterDiscovery(true); + + auto promise = NThreading::NewPromise(); + std::vector result; + + sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) { + for (const auto& message : ev.GetMessages()) { + result.emplace_back(message.GetData()); + } + if (result.size() >= size) { + promise.SetValue(); + } + }, false, false); + + std::shared_ptr session = client.CreateReadSession(sessionSettings); + UNIT_ASSERT(promise.GetFuture().Wait(timeout)); + session->Close(TDuration::Zero()); + session = nullptr; + driver.Stop(true); + return result; +} + +void PQCreateStream(const TString& streamName) +{ + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(GetDefaultPqEndpoint()); + cfg.SetDatabase(GetDefaultPqDatabase()); + cfg.SetLog(CreateLogBackend("cerr")); + NYdb::TDriver driver(cfg); + + NYdb::NDataStreams::V1::TDataStreamsClient client = NYdb::NDataStreams::V1::TDataStreamsClient( + driver, + NYdb::TCommonClientSettings().Database(GetDefaultPqDatabase())); + + auto result = client.CreateStream(streamName, + NYdb::NDataStreams::V1::TCreateStreamSettings().ShardCount(1).RetentionPeriodHours(1)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + + AddReadRule(driver, streamName); + driver.Stop(true); +} + +void AddReadRule(NYdb::TDriver& driver, const TString& streamName) { + NYdb::NPersQueue::TPersQueueClient client(driver); + + auto result = client.AddReadRule( + streamName, + NYdb::NPersQueue::TAddReadRuleSettings() + .ReadRule( + NYdb::NPersQueue::TReadRuleSettings() + .ConsumerName(DefaultPqConsumer) + .ServiceType("yandex-query") + .SupportedCodecs({ + NYdb::NPersQueue::ECodec::RAW + }) + ) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); +} + +std::vector UVParser(const NUdf::TUnboxedValue& item) { + return { TString(item.AsStringRef()) }; +} + +void TPqIoTestFixture::AsyncOutputWrite(std::vector data, TMaybe checkpoint) { + CaSetup->AsyncOutputWrite([data](NKikimr::NMiniKQL::THolderFactory& factory) { + NKikimr::NMiniKQL::TUnboxedValueBatch batch; + for (const auto& item : data) { + NUdf::TUnboxedValue* unboxedValueForData = nullptr; + batch.emplace_back(factory.CreateDirectArrayHolder(1, unboxedValueForData)); + unboxedValueForData[0] = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(item.Data(), item.Size())); + } + + return batch; + }, checkpoint); +} +} diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.h b/ydb/tests/fq/pq_async_io/ut_helpers.h new file mode 100644 index 000000000000..260bdcec0281 --- /dev/null +++ b/ydb/tests/fq/pq_async_io/ut_helpers.h @@ -0,0 +1,129 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include + +namespace NYql::NDq { + +NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings( + TString topic, + TMaybe watermarksPeriod = Nothing(), + TDuration lateArrivalDelay = TDuration::Seconds(2), + bool idlePartitionsEnabled = false); + +NYql::NPq::NProto::TDqPqTopicSink BuildPqTopicSinkSettings(TString topic); + +TString GetDefaultPqEndpoint(); +TString GetDefaultPqDatabase(); + +struct TPqIoTestFixture : public NUnitTest::TBaseFixture { + std::unique_ptr CaSetup = std::make_unique(); + NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"))); + + TPqIoTestFixture(); + ~TPqIoTestFixture(); + + void InitSource( + NYql::NPq::NProto::TDqPqTopicSource&& settings, + i64 freeSpace = 1_MB); + + void InitSource( + const TString& topic, + i64 freeSpace = 1_MB) + { + InitSource(BuildPqTopicSourceSettings(topic), freeSpace); + } + + template + std::vector> SourceRead(const TReadValueParser parser, i64 freeSpace = 12345) { + NThreading::TFuture nextDataFuture; + return CaSetup->AsyncInputRead(parser, nextDataFuture, freeSpace); + } + + template + std::vector> SourceReadUntil( + const TReadValueParser parser, + ui64 size, + i64 eachReadFreeSpace = 1000, + TDuration timeout = TDuration::Seconds(30)) + { + return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, timeout, false); + } + + template + std::vector> SourceReadDataUntil( + const TReadValueParser parser, + ui64 size, + i64 eachReadFreeSpace = 1000) + { + return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, TDuration::Seconds(30), true); + } + + + void SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) { + CaSetup->SaveSourceState(checkpoint, state); + } + + void LoadSource(const NDqProto::TSourceState& state) { + return CaSetup->LoadSource(state); + } + + + void InitAsyncOutput( + NYql::NPq::NProto::TDqPqTopicSink&& settings, + i64 freeSpace = 1_MB); + + void InitAsyncOutput( + const TString& topic, + i64 freeSpace = 1_MB) + { + InitAsyncOutput(BuildPqTopicSinkSettings(topic), freeSpace); + } + + void LoadSink(const NDqProto::TSinkState& state) { + CaSetup->LoadSink(state); + } + + void AsyncOutputWrite(std::vector data, TMaybe checkpoint = Nothing()); +}; + +extern const TString DefaultPqConsumer; +extern const TString DefaultPqCluster; + +// Write using YDB driver +void PQWrite( + const std::vector& sequence, + const TString& topic, + const TString& endpoint = GetDefaultPqEndpoint()); + +// Read using YDB driver +std::vector PQReadUntil( + const TString& topic, + ui64 size, + const TString& endpoint = GetDefaultPqEndpoint(), + TDuration timeout = TDuration::MilliSeconds(10000)); + +void PQCreateStream( + const TString& streamName); + +void AddReadRule( + NYdb::TDriver& driver, + const TString& streamName); + +std::vector UVParser(const NUdf::TUnboxedValue& item); + +} diff --git a/ydb/tests/fq/pq_async_io/ya.make b/ydb/tests/fq/pq_async_io/ya.make new file mode 100644 index 000000000000..d830123e1cdc --- /dev/null +++ b/ydb/tests/fq/pq_async_io/ya.make @@ -0,0 +1,32 @@ +UNITTEST_FOR(ydb/library/yql/providers/pq/async_io) + +OWNER( + g:yq + g:yql +) + +SIZE(MEDIUM) + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc) + +SRCS( + dq_pq_read_actor_ut.cpp + dq_pq_write_actor_ut.cpp + ut_helpers.cpp +) + +PEERDIR( + ydb/core/testlib/basics/default + ydb/library/yql/minikql/computation/llvm14 + ydb/library/yql/public/udf/service/exception_policy + ydb/library/yql/providers/common/comp_nodes + ydb/library/yql/providers/common/ut_helpers + ydb/library/yql/sql + ydb/public/sdk/cpp/client/ydb_datastreams + ydb/public/sdk/cpp/client/ydb_persqueue_public + ydb/library/yql/minikql/comp_nodes/llvm14 +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/tests/fq/ya.make b/ydb/tests/fq/ya.make index 734e38f7f709..6c4bbc466b59 100644 --- a/ydb/tests/fq/ya.make +++ b/ydb/tests/fq/ya.make @@ -5,6 +5,7 @@ RECURSE_FOR_TESTS( mem_alloc multi_plane plans + pq_async_io restarts s3 yds