Skip to content

Commit

Permalink
fix
Browse files (browse the repository at this point in the history)
  • Loading branch information
nikvas0 committed Dec 2, 2024
1 parent 6c33f1c commit d236268
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions ydb/core/kqp/runtime/kqp_write_table.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace {
constexpr ui64 DataShardMaxOperationBytes = 8_MB;
constexpr ui64 ColumnShardMaxOperationBytes = 64_MB;

using TCharVectorPtr = std::unique_ptr<TVector<char>>;

class TColumnBatch : public IDataBatch {
public:
using TRecordBatchPtr = std::shared_ptr<arrow::RecordBatch>;
Expand Down Expand Up @@ -67,13 +69,13 @@ class TRowBatch : public IDataBatch {
return Cells.empty();
}

std::pair<std::vector<TCell>, std::vector<std::string>> Extract() {
std::pair<std::vector<TCell>, std::vector<TCharVectorPtr>> Extract() {
Size = 0;
Rows = 0;
return {std::move(Cells), std::move(Data)};
}

TRowBatch(std::vector<TCell>&& cells, std::vector<std::string>&& data, i64 size, ui32 rows, ui16 columns)
TRowBatch(std::vector<TCell>&& cells, std::vector<TCharVectorPtr>&& data, i64 size, ui32 rows, ui16 columns)
: Cells(std::move(cells))
, Data(std::move(data))
, Size(size)
Expand All @@ -83,7 +85,7 @@ class TRowBatch : public IDataBatch {

private:
std::vector<TCell> Cells;
std::vector<std::string> Data;
std::vector<TCharVectorPtr> Data;
ui64 Size = 0;
ui32 Rows = 0;
ui16 Columns = 0;
Expand Down Expand Up @@ -154,13 +156,16 @@ std::set<std::string> BuildNotNullColumns(const TConstArrayRef<NKikimrKqp::TKqpC
}

std::vector<std::pair<TString, NScheme::TTypeInfo>> BuildBatchBuilderColumns(
const std::vector<ui32>& writeIndex,
const TConstArrayRef<NKikimrKqp::TKqpColumnMetadataProto> inputColumns) {
std::vector<std::pair<TString, NScheme::TTypeInfo>> result;
for (const auto& column : inputColumns) {
std::vector<std::pair<TString, NScheme::TTypeInfo>> result(writeIndex.size());
for (size_t index = 0; index < inputColumns.size(); ++index) {
const auto& column = inputColumns[index];
YQL_ENSURE(column.HasTypeId());
auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetTypeId(),
column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr);
result.emplace_back(column.GetName(), typeInfoMod.TypeInfo);
result[writeIndex[index]].first = column.GetName();
result[writeIndex[index]].second = typeInfoMod.TypeInfo;
}
return result;
}
Expand All @@ -179,7 +184,7 @@ TVector<NScheme::TTypeInfo> BuildKeyColumnTypes(

struct TRowWithData {
TVector<TCell> Cells;
std::string Data;
TCharVectorPtr Data;
};

class TRowBuilder {
Expand Down Expand Up @@ -233,13 +238,13 @@ class TRowBuilder {
cells.reserve(CellsInfo.size());
const auto size = DataSize();
auto data = Allocate(size);
char* ptr = data.data();
char* ptr = data->data();

for (const auto& cellInfo : CellsInfo) {
cells.push_back(BuildCell(cellInfo, ptr));
}

AFL_ENSURE(ptr == data.data() + size);
AFL_ENSURE(ptr == data->data() + size);

return TRowWithData {
.Cells = std::move(cells),
Expand Down Expand Up @@ -299,8 +304,8 @@ class TRowBuilder {
return cellInfo.Value.AsStringRef().Size();
}

std::string Allocate(size_t size) {
return std::string(size, 0);
TCharVectorPtr Allocate(size_t size) {
return std::make_unique<TVector<char>>(size);
}

TVector<TCellInfo> CellsInfo;
Expand All @@ -317,7 +322,7 @@ class TColumnDataBatcher : public IDataBatcher {
, WriteIndex(std::move(writeIndex))
, BatchBuilder(arrow::Compression::UNCOMPRESSED, BuildNotNullColumns(inputColumns)) {
TString err;
if (!BatchBuilder.Start(BuildBatchBuilderColumns(inputColumns), 0, 0, err)) {
if (!BatchBuilder.Start(BuildBatchBuilderColumns(WriteIndex, inputColumns), 0, 0, err)) {
yexception() << "Failed to start batch builder: " + err;
}
}
Expand Down Expand Up @@ -563,7 +568,7 @@ class TRowsBatcher {
i64 Memory = 0;
i64 MemorySerialized = 0;
TVector<TCell> Cells;
TVector<std::string> Data;
TVector<TCharVectorPtr> Data;
};

TBatch Flush(bool force) {
Expand Down Expand Up @@ -674,6 +679,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer {
}

void AddRow(TRowWithData&& row, const TKeyDesc& keyRange) {
YQL_ENSURE(row.Cells.size() >= KeyColumnTypes.size());
auto shardIter = std::lower_bound(
std::begin(keyRange.GetPartitions()),
std::end(keyRange.GetPartitions()),
Expand Down Expand Up @@ -713,7 +719,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer {
AddRow(
TRowWithData{
TVector<TCell>(cells.begin() + (rowIndex * Columns.size()), cells.begin() + (rowIndex * Columns.size()) + Columns.size()),
data[rowIndex],
std::move(data[rowIndex]),
},
KeyDescription);
}
Expand Down

0 comments on commit d236268

Please sign in to comment.