Skip to content

Commit

Permalink
Merge d571a09 into f40aa6e
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 25, 2024
2 parents f40aa6e + d571a09 commit 8de6cad
Show file tree
Hide file tree
Showing 28 changed files with 3,011 additions and 576 deletions.
17 changes: 17 additions & 0 deletions ydb/core/kqp/common/buffer/buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>

namespace NKikimr {
namespace NKqp {

struct TKqpBufferWriterSettings {
TActorId SessionActorId;
IKqpTransactionManagerPtr TxManager;
};

NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);

}
}
16 changes: 16 additions & 0 deletions ydb/core/kqp/common/buffer/events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "events.h"

namespace NKikimr {
namespace NKqp {

TEvKqpBuffer::TEvError::TEvError(
const TString& message,
NYql::NDqProto::StatusIds::StatusCode statusCode,
const NYql::TIssues& subIssues)
: Message(message)
, StatusCode(statusCode)
, SubIssues(subIssues) {
}

}
}
47 changes: 47 additions & 0 deletions ydb/core/kqp/common/buffer/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>


namespace NKikimr {
namespace NKqp {

struct TEvKqpBuffer {

struct TEvPrepare : public TEventLocal<TEvPrepare, TKqpBufferWriterEvents::EvPrepare> {
TActorId ExecuterActorId;
};

struct TEvCommit : public TEventLocal<TEvCommit, TKqpBufferWriterEvents::EvCommit> {
TActorId ExecuterActorId;
ui64 TxId;
};

struct TEvRollback : public TEventLocal<TEvRollback, TKqpBufferWriterEvents::EvRollback> {
TActorId ExecuterActorId;
};

struct TEvFlush : public TEventLocal<TEvFlush, TKqpBufferWriterEvents::EvFlush> {
TActorId ExecuterActorId;
};

struct TEvResult : public TEventLocal<TEvResult, TKqpBufferWriterEvents::EvResult> {
};

struct TEvError : public TEventLocal<TEvError, TKqpBufferWriterEvents::EvError> {
TString Message;
NYql::NDqProto::StatusIds::StatusCode StatusCode;
NYql::TIssues SubIssues;

TEvError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues);
};

struct TEvTerminate : public TEventLocal<TEvTerminate, TKqpBufferWriterEvents::EvTerminate> {
};

};

}
}
14 changes: 14 additions & 0 deletions ydb/core/kqp/common/buffer/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

SRCS(
events.cpp
)

PEERDIR(
ydb/core/kqp/common/simple
ydb/library/yql/public/issue
)

YQL_LAST_ABI_VERSION()

END()
12 changes: 6 additions & 6 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/base/feature_flags.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/kqp_tx_manager.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>

Expand Down Expand Up @@ -121,12 +122,6 @@ struct TDeferredEffects {
friend class TKqpTransactionContext;
};

struct TTableInfo {
bool IsOlap = false;
THashSet<TStringBuf> Pathes;
};


class TShardIdToTableInfo {
public:
const TTableInfo& Get(ui64 shardId) const {
Expand Down Expand Up @@ -204,6 +199,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
void Finish() final {
YQL_ENSURE(DeferredEffects.Empty());
YQL_ENSURE(!Locks.HasLocks());
YQL_ENSURE(!TxManager);
YQL_ENSURE(!BufferActorId);

FinishTime = TInstant::Now();

Expand Down Expand Up @@ -351,6 +348,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

TActorId BufferActorId;
IKqpTransactionManagerPtr TxManager = nullptr;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

Expand Down
Loading

0 comments on commit 8de6cad

Please sign in to comment.