Skip to content

Commit

Permalink
VIEW: import (#13361)
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored Jan 29, 2025
1 parent 54406f6 commit ee426f6
Show file tree
Hide file tree
Showing 26 changed files with 1,171 additions and 309 deletions.
13 changes: 13 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4441,6 +4441,19 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Scheme = scheme;
}

if (rowset.HaveValue<Schema::ImportItems::CreationQuery>()) {
item.CreationQuery = rowset.GetValue<Schema::ImportItems::CreationQuery>();
}

if (rowset.HaveValue<Schema::ImportItems::PreparedCreationQuery>()) {
NKikimrSchemeOp::TModifyScheme preparedQuery;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(
preparedQuery,
rowset.GetValue<Schema::ImportItems::PreparedCreationQuery>()
));
item.PreparedCreationQuery = std::move(preparedQuery);
}

if (rowset.HaveValue<Schema::ImportItems::Permissions>()) {
Ydb::Scheme::ModifyPermissionsRequest permissions;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue<Schema::ImportItems::Permissions>()));
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,8 +988,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
NIceDb::TNiceDb db(txc.DB);

auto& item = exportInfo->Items[itemIdx];
Self->RunningExportSchemeUploaders.erase(item.SchemeUploader);
item.SchemeUploader = TActorId();
Self->RunningExportSchemeUploaders.erase(std::exchange(item.SchemeUploader, {}));

if (!result.Success) {
item.State = EState::Cancelled;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4558,6 +4558,12 @@ void TSchemeShard::Die(const TActorContext &ctx) {
for (TActorId schemeUploader : RunningExportSchemeUploaders) {
ctx.Send(schemeUploader, new TEvents::TEvPoisonPill());
}
for (TActorId schemeGetter : RunningImportSchemeGetters) {
ctx.Send(schemeGetter, new TEvents::TEvPoisonPill());
}
for (TActorId schemeQueryExecutor : RunningImportSchemeQueryExecutors) {
ctx.Send(schemeQueryExecutor, new TEvents::TEvPoisonPill());
}

IndexBuildPipes.Shutdown(ctx);
CdcStreamScanPipes.Shutdown(ctx);
Expand Down Expand Up @@ -4855,6 +4861,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvImport::TEvForgetImportRequest, Handle);
HFuncTraced(TEvImport::TEvListImportsRequest, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeReady, Handle);
HFuncTraced(TEvPrivate::TEvImportSchemeQueryResult, Handle);
// } // NImport

// namespace NBackup {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,8 @@ class TSchemeShard
THashMap<ui64, TImportInfo::TPtr> Imports;
THashMap<TString, TImportInfo::TPtr> ImportsByUid;
THashMap<TTxId, std::pair<ui64, ui32>> TxIdToImport;
THashSet<TActorId> RunningImportSchemeGetters;
THashSet<TActorId> RunningImportSchemeQueryExecutors;

void FromXxportInfo(NKikimrImport::TImport& exprt, const TImportInfo::TPtr importInfo);

Expand All @@ -1268,6 +1270,7 @@ class TSchemeShard
static void PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo);
static void PersistImportItemState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);
static void PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx);

struct TImport {
Expand All @@ -1291,6 +1294,7 @@ class TSchemeShard

NTabletFlatExecutor::ITransaction* CreateTxProgressImport(ui64 id, const TMaybe<ui32>& itemIdx = Nothing());
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeReady::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressImport(TEvIndexBuilder::TEvCreateResponse::TPtr& ev);
Expand All @@ -1302,6 +1306,7 @@ class TSchemeShard
void Handle(TEvImport::TEvForgetImportRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvImport::TEvListImportsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx);

void ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx);
// } // NImport
Expand Down
28 changes: 25 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
case TImportInfo::EState::Waiting:
switch (GetMinState(importInfo)) {
case TImportInfo::EState::GetScheme:
case TImportInfo::EState::CreateTable:
case TImportInfo::EState::CreateSchemeObject:
import.SetProgress(Ydb::Import::ImportProgress::PROGRESS_PREPARING);
break;
case TImportInfo::EState::Transferring:
Expand Down Expand Up @@ -171,11 +171,18 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);

db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
auto record = db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx);
record.Update(
NIceDb::TUpdate<Schema::ImportItems::Scheme>(item.Scheme.SerializeAsString())
);

if (!item.CreationQuery.empty()) {
record.Update(
NIceDb::TUpdate<Schema::ImportItems::CreationQuery>(item.CreationQuery)
);
}
if (item.Permissions.Defined()) {
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
record.Update(
NIceDb::TUpdate<Schema::ImportItems::Permissions>(item.Permissions->SerializeAsString())
);
}
Expand All @@ -184,6 +191,17 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
);
}

void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items[itemIdx];

if (item.PreparedCreationQuery) {
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::PreparedCreationQuery>(item.PreparedCreationQuery->SerializeAsString())
);
}
}

void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
const auto& item = importInfo->Items.at(itemIdx);
Expand Down Expand Up @@ -218,6 +236,10 @@ void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeReady::TPtr& ev, const TAct
Execute(CreateTxProgressImport(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvImportSchemeQueryResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressImport(ev), ctx);
}

void TSchemeShard::ResumeImports(const TVector<ui64>& ids, const TActorContext& ctx) {
for (const ui64 id : ids) {
Execute(CreateTxProgressImport(id), ctx);
Expand Down
Loading

0 comments on commit ee426f6

Please sign in to comment.