diff --git a/ydb/library/workload/abstract/workload_query_generator.h b/ydb/library/workload/abstract/workload_query_generator.h index 9458be32c76a..4e27dbe47dcd 100644 --- a/ydb/library/workload/abstract/workload_query_generator.h +++ b/ydb/library/workload/abstract/workload_query_generator.h @@ -72,7 +72,10 @@ class IBulkDataGenerator { TString Schema; }; - using TDataType = std::variant; + struct TSkip { + }; + + using TDataType = std::variant; template TDataPortion(const TString& table, T&& data, ui64 size) diff --git a/ydb/library/workload/clickbench/data_generator.cpp b/ydb/library/workload/clickbench/data_generator.cpp index 358d2b1dcddc..65a5d0366d7f 100644 --- a/ydb/library/workload/clickbench/data_generator.cpp +++ b/ydb/library/workload/clickbench/data_generator.cpp @@ -30,7 +30,7 @@ TBulkDataGeneratorList TClickbenchWorkloadDataInitializerGenerator::DoGetBulkIni } TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TDataGenerartor(const TClickbenchWorkloadDataInitializerGenerator& owner) - : IBulkDataGenerator("hits", CalcSize(owner)) + : IBulkDataGenerator("hits", DataSetSize) , Owner(owner) { if (Owner.GetDataFiles().IsDirectory()) { @@ -52,6 +52,22 @@ IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::T if (Files.empty()) { return {}; } + if (FirstPortion) { + FirstPortion = false; + ui64 toSkip = 0; + if (Owner.StateProcessor) { + for (const auto& [file, state]: Owner.StateProcessor->GetState()) { + toSkip += state.Position; + } + } + if (toSkip) { + return { MakeIntrusive( + Owner.Params.GetFullTableName(nullptr), + TDataPortion::TSkip(), + toSkip + )}; + } + } index = std::hash{}(std::this_thread::get_id()) % Files.size(); file = Files[index]; } @@ -69,16 +85,6 @@ IBulkDataGenerator::TDataPortions TClickbenchWorkloadDataInitializerGenerator::T } } -ui64 TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::CalcSize(const TClickbenchWorkloadDataInitializerGenerator& owner) { - ui64 result = 99997497; - if (owner.StateProcessor) { - for (const auto& [file, state]: owner.StateProcessor->GetState()) { - result -= state.Position; - } - } - return result; -} - class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase: public TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TFile { public: TCsvFileBase(TDataGenerartor& owner, const TString& path, const TString& delimiter, const TString& foramt) @@ -116,7 +122,8 @@ class TClickbenchWorkloadDataInitializerGenerator::TDataGenerartor::TCsvFileBase with_lock(Lock) { TString line; if (Owner.Owner.StateProcessor && Owner.Owner.StateProcessor->GetState().contains(Path)) { - while(Owner.Owner.StateProcessor->GetState().at(Path).Position > Readed && Decompressor->ReadLine(line)) { + auto position = Owner.Owner.StateProcessor->GetState().at(Path).Position; + while(position > Readed && Decompressor->ReadLine(line)) { ++Readed; } } diff --git a/ydb/library/workload/clickbench/data_generator.h b/ydb/library/workload/clickbench/data_generator.h index f57c40d05f5f..de6b9eede5cb 100644 --- a/ydb/library/workload/clickbench/data_generator.h +++ b/ydb/library/workload/clickbench/data_generator.h @@ -36,12 +36,13 @@ class TClickbenchWorkloadDataInitializerGenerator: public TWorkloadDataInitializ class TTsvFile; class TCsvFile; void AddFile(const TFsPath& path); - static ui64 CalcSize(const TClickbenchWorkloadDataInitializerGenerator& owner); private: const TClickbenchWorkloadDataInitializerGenerator& Owner; TVector Files; TAdaptiveLock Lock; + bool FirstPortion = true; + static constexpr ui64 DataSetSize = 99997497; }; }; diff --git a/ydb/library/workload/tpcds/data_generator.cpp b/ydb/library/workload/tpcds/data_generator.cpp index 9a0657ae8b28..c0d0c472c98e 100644 --- a/ydb/library/workload/tpcds/data_generator.cpp +++ b/ydb/library/workload/tpcds/data_generator.cpp @@ -63,12 +63,10 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TPositions TTpcdsWor split_work(tableNum, &result.FirstRow, &result.Count); if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdef->name)) { result.Position = owner.StateProcessor->GetState().at(tdef->name).Position; - result.Count -= std::min(result.Count, result.Position); //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c while (result.Position && !allowedModules.contains((result.FirstRow + result.Position) % 6)) { --result.Position; - ++result.Count; } } //this magic is needed for SCD to work correctly. See setSCDKeys in ydb/library/benchmarks/gen/tpcds-dbgen/scd.c @@ -129,12 +127,11 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(c : IBulkDataGenerator(getTdefsByNumber(tableNum)->name, CalcCountToGenerate(owner, tableNum, true).Count) , TableNum(tableNum) , Owner(owner) - , TableSize(CalcCountToGenerate(owner, tableNum, false).Count) {} TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() { TDataPortions result; - if (TableSize == 0) { + if (GetSize() == 0) { return result; } TContexts ctxs; @@ -149,6 +146,11 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds auto positions = CalcCountToGenerate(Owner, TableNum, !Generated); if (!Generated) { Generated = positions.Position; + result.push_back(MakeIntrusive( + GetFullTableName(tdef->name), + TDataPortion::TSkip(), + Generated + )); if (const ui32 toSkip = positions.FirstRow + positions.Position - 1) { row_skip(TableNum, toSkip); if (tdef->flags & FL_PARENT) { @@ -159,7 +161,7 @@ TTpcdsWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpcds resetCountCount(); } } - const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0; + const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0; if (!count) { return result; } diff --git a/ydb/library/workload/tpcds/data_generator.h b/ydb/library/workload/tpcds/data_generator.h index 1de7d2a2fb47..f10c9aa8ef4e 100644 --- a/ydb/library/workload/tpcds/data_generator.h +++ b/ydb/library/workload/tpcds/data_generator.h @@ -70,7 +70,6 @@ class TTpcdsWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBas }; static TPositions CalcCountToGenerate(const TTpcdsWorkloadDataInitializerGenerator& owner, int tableNum, bool useState); const TTpcdsWorkloadDataInitializerGenerator& Owner; - ui64 TableSize; }; }; diff --git a/ydb/library/workload/tpch/data_generator.cpp b/ydb/library/workload/tpch/data_generator.cpp index df9cd2fe31b3..de493dd9a434 100644 --- a/ydb/library/workload/tpch/data_generator.cpp +++ b/ydb/library/workload/tpch/data_generator.cpp @@ -46,23 +46,19 @@ TBulkDataGeneratorList TTpchWorkloadDataInitializerGenerator::DoGetBulkInitialDa } -ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum, bool useState) { +ui64 TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) { if (tableNum == NONE) { return 0; } - ui64 position = 0; - if (useState && owner.StateProcessor && owner.StateProcessor->GetState().contains(tdefs[tableNum].name)) { - position = owner.StateProcessor->GetState().at(tdefs[tableNum].name).Position; - } if (tableNum >= NATION) { - return owner.GetProcessIndex() ? 0 : (tdefs[tableNum].base - position); + return owner.GetProcessIndex() ? 0 : tdefs[tableNum].base; } ui64 rowCount = tdefs[tableNum].base * owner.GetScale(); ui64 extraRows = 0; if (owner.GetProcessIndex() + 1 >= owner.GetProcessCount()) { extraRows = rowCount % owner.GetProcessCount(); } - return rowCount / owner.GetProcessCount() + extraRows - position; + return rowCount / owner.GetProcessCount() + extraRows; } TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TContext::TContext(const TBulkDataGenerator& owner, int tableNum, TGeneratorStateProcessor* state) @@ -112,15 +108,14 @@ TString TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GetFullTableN } TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TBulkDataGenerator(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum) - : IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum, true)) + : IBulkDataGenerator(tdefs[tableNum].name, CalcCountToGenerate(owner, tableNum)) , TableNum(tableNum) , Owner(owner) - , TableSize(CalcCountToGenerate(owner, tableNum, false)) {} TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::GenerateDataPortion() { TDataPortions result; - if (TableSize == 0) { + if (GetSize() == 0) { return result; } TContexts ctxs; @@ -138,11 +133,16 @@ TTpchWorkloadDataInitializerGenerator::TBulkDataGenerator::TDataPortions TTpchWo if (!!Owner.StateProcessor) { if (const auto* state = MapFindPtr(Owner.StateProcessor->GetState(), GetName())) { Generated = state->Position; + result.push_back(MakeIntrusive( + GetFullTableName(tdefs[TableNum].name), + TDataPortion::TSkip(), + Generated + )); GenSeed(TableNum, Generated); } } } - const auto count = TableSize > Generated ? std::min(ui64(TableSize - Generated), Owner.Params.BulkSize) : 0; + const auto count = GetSize() > Generated ? std::min(ui64(GetSize() - Generated), Owner.Params.BulkSize) : 0; if (!count) { return result; } diff --git a/ydb/library/workload/tpch/data_generator.h b/ydb/library/workload/tpch/data_generator.h index a4853af8c1a7..b2838ab0aaff 100644 --- a/ydb/library/workload/tpch/data_generator.h +++ b/ydb/library/workload/tpch/data_generator.h @@ -56,9 +56,8 @@ class TTpchWorkloadDataInitializerGenerator: public TWorkloadDataInitializerBase private: TString GetFullTableName(const char* table) const; - static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum, bool useState); + static ui64 CalcCountToGenerate(const TTpchWorkloadDataInitializerGenerator& owner, int tableNum); const TTpchWorkloadDataInitializerGenerator& Owner; - ui64 TableSize; }; }; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp index f65f1a8d66f8..97a4e178e990 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp @@ -84,6 +84,9 @@ class TWorkloadCommandImport::TUploadCommand::TDbWriter: public IWriter { auto convertResult = [](const NTable::TAsyncBulkUpsertResult& result) { return TStatus(result.GetValueSync()); }; + if (std::holds_alternative(portion->MutableData())) { + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); + } if (auto* value = std::get_if(&portion->MutableData())) { return Owner.TableClient->BulkUpsert(portion->GetTable(), std::move(*value)).Apply(convertResult); } @@ -118,6 +121,9 @@ class TWorkloadCommandImport::TUploadCommand::TFileWriter: public IWriter { } TAsyncStatus WriteDataPortion(NYdbWorkload::IBulkDataGenerator::TDataPortionPtr portion) override { + if (std::holds_alternative(portion->MutableData())) { + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, NYql::TIssues())); + } if (auto* value = std::get_if(&portion->MutableData())) { return NThreading::MakeErrorFuture(std::make_exception_ptr(yexception() << "Not implemented")); } @@ -165,7 +171,7 @@ void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_pt if (!res.IsSuccess()) { Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl; AtomicIncrement(ErrorsCount); - } else { + } else if (data->GetSize()) { Bar->AddProgress(data->GetSize()); } AtomicDecrement(counter);