Skip to content

Commit

Permalink
Merge branch 'main' into sink-settings
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 16, 2024
2 parents d866efc + e6d795f commit a65adad
Show file tree
Hide file tree
Showing 95 changed files with 2,557 additions and 1,453 deletions.
8 changes: 6 additions & 2 deletions ROADMAP.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# YDB Roadmap
## Intro
The document contains high-level roadmap for YDB. Take a look at [👑 Epics Project](https://github.com/orgs/ydb-platform/projects/46/) also.
## Legend
We use the following symbols as abbreviations:

Expand Down Expand Up @@ -48,9 +50,11 @@ We use the following symbols as abbreviations:
1. ✅ ㉓ **DDL for column-oriented tables**

## Database Core (Tablets, etc)
1. ✅ ㉔ **Exact Nearest Neighbor Vector Search**
1.**Approximate Nearest Neighbor Vector Search**. [Global vector index](https://github.com/ydb-platform/ydb/issues/8967)
1.**Volatile transactions**. YDB Distributed transactions 2.0, minimize network round trips in happy path
1.**Table statistics** for cost-based optimizer
1.**Memory optimization for row tables** (avoid full SST index loading, dynamic cache adjusting)
1.**Memory optimization for row tables** (avoid full [SST index loading](https://github.com/ydb-platform/ydb/issues/1483), dynamic cache adjusting)
1. ㉔ Reduce minimum requirements for **the number of cores to 2** for YDB node
1.**Incremental backup** and **Point-in-time recovery**
1.**``ALTER CHANGEFEED``**
Expand Down Expand Up @@ -129,7 +133,7 @@ Detailed roadmap could be found at [YDB Embedded UI repo](https://github.com/ydb
## Tests and Benchmarks
1.**Built-in load test for DataShards** in YCSB manner
1. ✅ ㉓ **`ydb workload` for topics**
1. **Jepsen tests support**
1. ✅ ㉔ **Jepsen tests support** [Blog post](https://blog.ydb.tech/hardening-ydb-with-jepsen-lessons-learned-e3238a7ef4f2)

## Experiments
1.*(refused)* Try **RTMR-tablet** for key-value workload
22 changes: 5 additions & 17 deletions ydb/core/backup/impl/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
TSerializationContext(const TSerializationContext& other) = default;
};

const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto;

ui64 GetGroup() const override {
return ProtoBody.GetGroup();
}
Expand All @@ -43,9 +41,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
EKind GetKind() const override {
return EKind::CdcDataChange;
}
TString GetSourceId() const {
return SourceId;
}

void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TSerializationContext& ctx) const {
switch (ctx.Type) {
Expand All @@ -67,8 +62,12 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
Y_ABORT_UNLESS(Key);
return Key->GetCells();
}

void Accept(NChangeExchange::IVisitor& visitor) const override {
return visitor.Visit(*this);
}

private:
TString SourceId;
NKikimrChangeExchange::TChangeRecord ProtoBody;
NReplication::NService::TLightweightSchema::TCPtr Schema;

Expand Down Expand Up @@ -177,14 +176,3 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder<TChange
}; // TChangeRecordBuilder

}

namespace NKikimr {

template <>
struct TChangeRecordContainer<NBackup::NImpl::TChangeRecord>
: public TBaseChangeRecordContainer<NBackup::NImpl::TChangeRecord>
{
using TBaseChangeRecordContainer<NBackup::NImpl::TChangeRecord>::TBaseChangeRecordContainer;
};

}
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ struct TKikimrEvents : TEvents {
ES_LIMITER = 4258,
ES_MEMORY = 4259,
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
ES_INCREMENTAL_RESTORE_SCAN = 4261,
};
};

Expand Down
20 changes: 12 additions & 8 deletions ydb/core/change_exchange/change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "change_exchange.h"

#include <util/string/builder.h>
#include <util/string/join.h>

namespace NKikimr::NChangeExchange {

/// TEvEnqueueRecords
Expand Down Expand Up @@ -86,19 +89,20 @@ TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
}

// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(TChangeRecordVector&& records)
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRecords::ToString() const {
auto visitor = [&](auto& records){
return TStringBuilder() << ToStringHeader() << " {"
<< " Records " << reinterpret_cast<IChangeRecordContainer*>(records.get())->Out()
<< " }";
};

return std::visit(visitor, Records);
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvForgetRecords
Expand Down
65 changes: 3 additions & 62 deletions ydb/core/change_exchange/change_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,6 @@
#include <ydb/core/scheme/scheme_pathid.h>

#include <util/generic/vector.h>
#include <util/string/builder.h>
#include <util/string/join.h>

#include <memory>
#include <variant>

namespace NKikimr {

namespace NDataShard {
class TChangeRecord;
}

namespace NReplication::NService {
class TChangeRecord;
}

namespace NBackup::NImpl {
class TChangeRecord;
}

struct IChangeRecordContainer {
virtual ~IChangeRecordContainer() = default;
virtual TString Out() const = 0;
};

template <typename T>
struct TBaseChangeRecordContainer: public IChangeRecordContainer {
TVector<typename T::TPtr> Records;

TBaseChangeRecordContainer() = default;

explicit TBaseChangeRecordContainer(TVector<typename T::TPtr>&& records)
: Records(std::move(records))
{}

TString Out() const override {
return TStringBuilder() << "[" << JoinSeq(",", Records) << "]";
}
};

template <typename T>
struct TChangeRecordContainer {};

}

namespace NKikimr::NChangeExchange {

Expand Down Expand Up @@ -117,26 +73,11 @@ struct TEvChangeExchange {
};

struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
using TChangeRecordVector = std::variant<
std::shared_ptr<TChangeRecordContainer<NDataShard::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NReplication::NService::TChangeRecord>>,
std::shared_ptr<TChangeRecordContainer<NBackup::NImpl::TChangeRecord>>
>;

TChangeRecordVector Records;
TVector<IChangeRecord::TPtr> Records;

explicit TEvRecords(TChangeRecordVector&& records);
explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records);
explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records);
TString ToString() const override;

template <typename T>
static TEvRecords* New(TVector<typename T::TPtr>&& records) {
return new TEvRecords(std::make_shared<TChangeRecordContainer<T>>(std::move(records)));
}

template <typename T>
inline auto& GetRecords() {
return std::get<std::shared_ptr<TChangeRecordContainer<T>>>(Records)->Records;
}
};

struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/change_exchange/change_record.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#pragma once

#include <ydb/core/change_exchange/visitor.h>

#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/stream/output.h>

namespace NKikimr::NChangeExchange {

class IChangeSenderResolver;

class IChangeRecord: public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IChangeRecord>;
Expand All @@ -32,8 +32,11 @@ class IChangeRecord: public TThrRefBase {
virtual EKind GetKind() const = 0;
virtual const TString& GetBody() const = 0;
virtual ESource GetSource() const = 0;
virtual const TString& GetSourceId() const = 0;
virtual bool IsBroadcast() const = 0;

virtual void Accept(IVisitor& visitor) const = 0;

virtual TString ToString() const = 0;
virtual void Out(IOutputStream& out) const = 0;

Expand All @@ -43,12 +46,14 @@ template <typename T, typename TDerived>
class TChangeRecordBuilder;

class TChangeRecordBase: public IChangeRecord {
template <typename T, typename TDerived> friend class TChangeRecordBuilder;
template <typename T, typename TDerived>
friend class TChangeRecordBuilder;

public:
ui64 GetOrder() const override { return Order; }
const TString& GetBody() const override { return Body; }
ESource GetSource() const override { return Source; }
const TString& GetSourceId() const override { return SourceId; }
bool IsBroadcast() const override { return false; }

TString ToString() const override;
Expand All @@ -58,6 +63,7 @@ class TChangeRecordBase: public IChangeRecord {
ui64 Order = Max<ui64>();
TString Body;
ESource Source = ESource::Unspecified;
TString SourceId;

}; // TChangeRecordBase

Expand Down
Loading

0 comments on commit a65adad

Please sign in to comment.