Skip to content

Commit

Permalink
Merge 2c2d300 into d79de8b
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Feb 22, 2024
2 parents d79de8b + 2c2d300 commit 1e03037
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 32 deletions.
124 changes: 121 additions & 3 deletions ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ namespace NKikimr::NStorage {
};

struct TEvStorageConfigLoaded : TEventLocal<TEvStorageConfigLoaded, EvStorageConfigLoaded> {
std::vector<std::tuple<TString, NKikimrBlobStorage::TPDiskMetadataRecord>> MetadataPerPath;
std::vector<std::tuple<TString, NKikimrBlobStorage::TPDiskMetadataRecord, std::optional<ui64>>> MetadataPerPath;
};

struct TEvStorageConfigStored : TEventLocal<TEvStorageConfigStored, EvStorageConfigStored> {
std::vector<std::tuple<TString, bool>> StatusPerPath;
std::vector<std::tuple<TString, bool, std::optional<ui64>>> StatusPerPath;
};
};

Expand Down Expand Up @@ -437,7 +437,7 @@ namespace NKikimr::NStorage {
EnumerateConfigDrives(config, 0, cb, &nodeMap);

// process responses
generateSuccessful([&](const TNodeIdentifier& node, const TString& path) {
generateSuccessful([&](const TNodeIdentifier& node, const TString& path, std::optional<ui64> /*guid*/) {
const auto it = nodeMap.find(node.NodeId());
if (it == nodeMap.end() || TNodeIdentifier(*it->second) != node) { // unexpected node answers
return;
Expand Down Expand Up @@ -498,4 +498,122 @@ namespace NKikimr::NStorage {
return ok > err;
}

// Checks whether the drives reported through generateSuccessful are sufficient for every static group described
// in the configuration to pass its quorum check.
//
//   config             -- storage configuration being examined
//   generateSuccessful -- callable accepting a callback; it invokes the callback as (node, path, guid) for each
//                         reported drive
//   nwConfig           -- provides StaticKey, which is passed to static group parsing
//   allowUnformatted   -- when set, a drive reported without a guid also confirms the VDisks located on the
//                         matching node/path
//
// Returns true when all groups pass the quorum check, or when the ServiceSet is absent and the configuration
// generation is zero. Returns false when some group lacks quorum or when the configuration is found to be
// inconsistent (the latter is additionally logged and handed to Y_DEBUG_ABORT_UNLESS).
template<typename T>
bool HasStorageQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful,
        const TNodeWardenConfig& nwConfig, bool allowUnformatted) {
    // reports an inconsistent configuration; always yields 'false' so the result can be returned directly
    auto fail = [&](TString error) -> bool {
        STLOG(PRI_CRIT, BS_NODE, NWDC41, "configuration incorrect", (Error, error));
        Y_DEBUG_ABORT_UNLESS(false, "%s", error.c_str());
        return false;
    };

    if (!config.HasBlobStorageConfig()) { // storage section is missing entirely
        return fail("no BlobStorageConfig section in config");
    }
    const auto& blobStorage = config.GetBlobStorageConfig();
    if (!blobStorage.HasServiceSet()) {
        // a missing ServiceSet is tolerated only while the generation is still zero
        if (config.GetGeneration()) {
            return fail("non-initial configuration with missing ServiceSet");
        }
        return true;
    }
    const auto& serviceSet = blobStorage.GetServiceSet();

    // per-group state: parsed group info plus the set of disks confirmed so far
    struct TGroupState {
        TIntrusivePtr<TBlobStorageGroupInfo> Info;
        TBlobStorageGroupInfo::TGroupVDisks Confirmed; // initialized from Info's topology, so Info must be declared first

        TGroupState(TIntrusivePtr<TBlobStorageGroupInfo>&& info)
            : Info(std::move(info))
            , Confirmed(&Info->GetTopology())
        {}
    };

    // step 1: parse every group of the service set
    THashMap<ui32, TGroupState> groupStates;
    for (const auto& groupProto : serviceSet.GetGroups()) {
        const ui32 groupId = groupProto.GetGroupID();
        if (TGroupID(groupId).ConfigurationType() != EGroupConfigurationType::Static) {
            return fail("nonstatic group id in static configuration section");
        }

        TStringStream parseError;
        TIntrusivePtr<TBlobStorageGroupInfo> info = TBlobStorageGroupInfo::Parse(groupProto, &nwConfig.StaticKey,
            &parseError);
        if (!info) {
            return fail(TStringBuilder() << "failed to parse static group " << groupId << ": " << parseError.Str());
        }

        const bool inserted = groupStates.emplace(groupId, std::move(info)).second;
        if (!inserted) {
            return fail("duplicate group id in static configuration section");
        }
    }

    // step 2: index PDisk paths by (nodeId, pdiskId, pdiskGuid)
    THashMap<std::tuple<ui32, ui32, ui64>, TString> pdiskIdToPath;
    for (const auto& pdisk : serviceSet.GetPDisks()) {
        const bool inserted = pdiskIdToPath.emplace(
            std::make_tuple(pdisk.GetNodeID(), pdisk.GetPDiskID(), pdisk.GetPDiskGuid()),
            pdisk.GetPath()).second;
        if (!inserted) {
            return fail("duplicate pdisk in static configuration section");
        }
    }

    // step 3: map (nodeId, path, guid) of a drive to the VDisks it is able to confirm
    THashMultiMap<std::tuple<ui32, TString, std::optional<ui64>>, TVDiskID> confirm;
    for (const auto& vdisk : serviceSet.GetVDisks()) {
        if (!(vdisk.HasVDiskID() && vdisk.HasVDiskLocation())) {
            return fail("incorrect TVDisk record");
        }

        const auto vdiskId = VDiskIDFromVDiskID(vdisk.GetVDiskID());
        const auto groupIt = groupStates.find(vdiskId.GroupID);
        if (groupIt == groupStates.end()) {
            return fail(TStringBuilder() << "VDisk " << vdiskId << " does not match any static group");
        }
        const TGroupState& owner = groupIt->second;
        if (vdiskId.GroupGeneration != owner.Info->GroupGeneration) {
            return fail(TStringBuilder() << "VDisk " << vdiskId << " group generation mismatch");
        }

        const auto& location = vdisk.GetVDiskLocation();
        const auto nodeId = location.GetNodeID();
        const auto pdiskGuid = location.GetPDiskGuid();
        const auto pdiskIt = pdiskIdToPath.find(std::make_tuple(nodeId, location.GetPDiskID(), pdiskGuid));
        if (pdiskIt == pdiskIdToPath.end()) {
            return fail(TStringBuilder() << "VDisk " << vdiskId << " points to incorrect PDisk record");
        }

        confirm.emplace(std::make_tuple(nodeId, pdiskIt->second, pdiskGuid), vdiskId);
        if (allowUnformatted) {
            // extra entry without guid, matched by drives that are reported with no guid
            confirm.emplace(std::make_tuple(nodeId, pdiskIt->second, std::nullopt), vdiskId);
        }
    }

    // step 4: mark the VDisks that belong to every reported drive
    generateSuccessful([&](const TNodeIdentifier& node, const TString& path, std::optional<ui64> guid) {
        const auto [first, last] = confirm.equal_range(std::make_tuple(node.NodeId(), path, guid));
        for (auto cur = first; cur != last; ++cur) {
            const TVDiskID& vdiskId = cur->second;
            TGroupState& state = groupStates.at(vdiskId.GroupID);
            state.Confirmed |= {&state.Info->GetTopology(), vdiskId};
        }
    });

    // step 5: a single group without quorum makes the whole check fail
    for (const auto& [groupId, state] : groupStates) {
        const auto& checker = state.Info->GetQuorumChecker();
        if (!checker.CheckQuorumForGroup(state.Confirmed)) {
            return false;
        }
    }

    return true; // all groups meet their quorums
}

// Verifies that the configuration has both disk quorum and storage quorum; when a previous configuration is
// embedded, the same two checks are applied to it as well. Unformatted drives are accepted for the current
// configuration only (allowUnformatted is true for it and false for the previous one).
template<typename T>
bool HasConfigQuorum(const NKikimrBlobStorage::TStorageConfig& config, T&& generateSuccessful,
        const TNodeWardenConfig& nwConfig) {
    // current configuration: disk quorum first, then storage quorum
    if (!HasDiskQuorum(config, generateSuccessful)) {
        return false;
    }
    if (!HasStorageQuorum(config, generateSuccessful, nwConfig, true)) {
        return false;
    }

    // nothing more to verify when there is no previous configuration
    if (!config.HasPrevConfig()) {
        return true;
    }

    const auto& prev = config.GetPrevConfig();
    return HasDiskQuorum(prev, generateSuccessful)
        && HasStorageQuorum(prev, generateSuccessful, nwConfig, false);
}

} // NKikimr::NStorage

template<>
struct THash<std::optional<ui64>> {
size_t operator ()(std::optional<ui64> x) const { return MultiHash(x.has_value(), x.value_or(0)); }
};
28 changes: 14 additions & 14 deletions ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ namespace NKikimr::NStorage {

struct TDiskConfigInfo {
NKikimrBlobStorage::TStorageConfig Config;
THashSet<std::tuple<TNodeIdentifier, TString>> HavingDisks;
THashSet<std::tuple<TNodeIdentifier, TString, std::optional<ui64>>> HavingDisks;
};
THashMap<TStorageConfigMeta, TDiskConfigInfo> committedConfigs;
THashMap<TStorageConfigMeta, TDiskConfigInfo> proposedConfigs;
Expand All @@ -162,7 +162,7 @@ namespace NKikimr::NStorage {
r.Config.CopyFrom(config.GetConfig());
}
for (const auto& disk : config.GetDisks()) {
r.HavingDisks.emplace(disk.GetNodeId(), disk.GetPath());
r.HavingDisks.emplace(disk.GetNodeId(), disk.GetPath(), disk.HasGuid() ? std::make_optional(disk.GetGuid()) : std::nullopt);
}
}
}
Expand All @@ -171,15 +171,12 @@ namespace NKikimr::NStorage {
TDiskConfigInfo& r = it->second;

auto generateSuccessful = [&](auto&& callback) {
for (const auto& [node, path] : r.HavingDisks) {
callback(node, path);
for (const auto& [node, path, guid] : r.HavingDisks) {
callback(node, path, guid);
}
};

const bool quorum = HasDiskQuorum(r.Config, generateSuccessful) &&
(!r.Config.HasPrevConfig() || HasDiskQuorum(r.Config.GetPrevConfig(), generateSuccessful));

if (quorum) {
if (HasConfigQuorum(r.Config, generateSuccessful, *Cfg)) {
++it;
} else {
set->erase(it++);
Expand Down Expand Up @@ -249,14 +246,13 @@ namespace NKikimr::NStorage {
auto generateSuccessful = [&](auto&& callback) {
for (const auto& item : res->GetStatus()) {
const TNodeIdentifier node(item.GetNodeId());
for (const TString& path : item.GetSuccessfulDrives()) {
callback(node, path);
for (const auto& drive : item.GetSuccessfulDrives()) {
callback(node, drive.GetPath(), drive.HasGuid() ? std::make_optional(drive.GetGuid()) : std::nullopt);
}
}
};

if (HasDiskQuorum(CurrentProposedStorageConfig, generateSuccessful) &&
HasDiskQuorum(CurrentProposedStorageConfig.GetPrevConfig(), generateSuccessful)) {
if (HasConfigQuorum(CurrentProposedStorageConfig, generateSuccessful, *Cfg)) {
// apply configuration and spread it
ApplyStorageConfig(CurrentProposedStorageConfig);
for (const auto& [nodeId, info] : DirectBoundNodes) {
Expand Down Expand Up @@ -507,9 +503,13 @@ namespace NKikimr::NStorage {
auto *status = task.Response.MutableProposeStorageConfig()->AddStatus();
SelfNode.Serialize(status->MutableNodeId());
status->SetStatus(TEvGather::TProposeStorageConfig::ACCEPTED);
for (const auto& [path, ok] : msg.StatusPerPath) {
for (const auto& [path, ok, guid] : msg.StatusPerPath) {
if (ok) {
status->AddSuccessfulDrives(path);
auto *drive = status->AddSuccessfulDrives();
drive->SetPath(path);
if (guid) {
drive->SetGuid(*guid);
}
}
}

Expand Down
36 changes: 26 additions & 10 deletions ydb/core/blobstorage/nodewarden/distconf_persistent_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ namespace NKikimr::NStorage {
auto ev = std::make_unique<TEvPrivate::TEvStorageConfigLoaded>();
for (const TString& path : drives) {
TRcBuf metadata;
switch (auto status = ReadPDiskMetadata(path, cfg->PDiskKey, metadata)) {
std::optional<ui64> guid;
switch (auto status = ReadPDiskMetadata(path, cfg->PDiskKey, metadata, &guid)) {
case NPDisk::EPDiskMetadataOutcome::OK:
if (NKikimrBlobStorage::TPDiskMetadataRecord m; m.ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>())) {
auto& [p, config] = ev->MetadataPerPath.emplace_back();
auto& [p, config, g] = ev->MetadataPerPath.emplace_back();
p = path;
config.Swap(&m);
g = guid;
}
break;

Expand All @@ -36,13 +38,14 @@ namespace NKikimr::NStorage {
TString data;
const bool success = record.SerializeToString(&data);
Y_ABORT_UNLESS(success);
switch (WritePDiskMetadata(path, cfg->PDiskKey, TRcBuf(std::move(data)))) {
std::optional<ui64> guid;
switch (WritePDiskMetadata(path, cfg->PDiskKey, TRcBuf(std::move(data)), &guid)) {
case NPDisk::EPDiskMetadataOutcome::OK:
ev->StatusPerPath.emplace_back(path, true);
ev->StatusPerPath.emplace_back(path, true, guid);
break;

default:
ev->StatusPerPath.emplace_back(path, false);
ev->StatusPerPath.emplace_back(path, false, std::nullopt);
break;
}
}
Expand Down Expand Up @@ -110,7 +113,7 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Handle(TEvPrivate::TEvStorageConfigStored::TPtr ev) {
ui32 numOk = 0;
ui32 numError = 0;
for (const auto& [path, status] : ev->Get()->StatusPerPath) {
for (const auto& [path, status, guid] : ev->Get()->StatusPerPath) {
++(status ? numOk : numError);
}

Expand Down Expand Up @@ -152,7 +155,7 @@ namespace NKikimr::NStorage {
proposed.try_emplace(item.GetConfig(), &item);
}

for (const auto& [path, m] : msg.MetadataPerPath) {
for (const auto& [path, m, guid] : msg.MetadataPerPath) {
auto addConfig = [&, path = path](const auto& config, auto func, auto& set) {
auto& ptr = set[config];
if (!ptr) {
Expand All @@ -162,6 +165,9 @@ namespace NKikimr::NStorage {
auto *disk = ptr->AddDisks();
SelfNode.Serialize(disk->MutableNodeId());
disk->SetPath(path);
if (guid) {
disk->SetGuid(*guid);
}
};

if (m.HasCommittedStorageConfig()) {
Expand All @@ -175,7 +181,7 @@ namespace NKikimr::NStorage {
FinishAsyncOperation(it->first);
}
} else { // just loaded the initial config, try to acquire newer configuration
for (const auto& [path, m] : msg.MetadataPerPath) {
for (const auto& [path, m, guid] : msg.MetadataPerPath) {
if (m.HasCommittedStorageConfig()) {
const auto& config = m.GetCommittedStorageConfig();
if (InitialConfig.GetGeneration() < config.GetGeneration()) {
Expand Down Expand Up @@ -251,7 +257,8 @@ namespace NKikimr {
return true;
}

NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata) {
NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata,
std::optional<ui64> *pdiskGuid) {
TFileHandle fh(VaultPath, OpenExisting | RdWr);
if (!fh.IsOpen()) {
return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
Expand Down Expand Up @@ -284,6 +291,12 @@ namespace NKikimr {
TPDiskInfo info;
const bool pdiskSuccess = ReadPDiskFormatInfo(path, key, info, false);

if (pdiskSuccess) {
pdiskGuid->emplace(info.DiskGuid);
} else {
pdiskGuid->reset();
}

if (it->GetUnformatted() && pdiskSuccess) {
it->SetPDiskGuid(info.DiskGuid);
it->SetTimestamp(info.Timestamp.GetValue());
Expand All @@ -308,7 +321,8 @@ namespace NKikimr {
return NPDisk::EPDiskMetadataOutcome::NO_METADATA;
}

NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata) {
NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata,
std::optional<ui64> *pdiskGuid) {
TFileHandle fh(VaultPath, OpenAlways);
if (!fh.IsOpen() || fh.Flock(LOCK_EX)) {
return NPDisk::EPDiskMetadataOutcome::ERROR;
Expand Down Expand Up @@ -340,8 +354,10 @@ namespace NKikimr {
it->SetPDiskGuid(info.DiskGuid);
it->SetTimestamp(info.Timestamp.GetValue());
it->ClearUnformatted();
pdiskGuid->emplace(info.DiskGuid);
} else {
it->SetUnformatted(true);
pdiskGuid->reset();
}
it->SetKey(key.Keys.back());
const bool success = it->MutableMeta()->ParseFromString(metadata.ExtractUnderlyingContainerOrCopy<TString>());
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -1519,14 +1519,16 @@ struct TEvReadMetadata : TEventLocal<TEvReadMetadata, TEvBlobStorage::EvReadMeta
struct TEvReadMetadataResult : TEventLocal<TEvReadMetadataResult, TEvBlobStorage::EvReadMetadataResult> {
EPDiskMetadataOutcome Outcome;
TRcBuf Metadata;
std::optional<ui64> PDiskGuid;

TEvReadMetadataResult(EPDiskMetadataOutcome outcome)
: Outcome(outcome)
{}

TEvReadMetadataResult(TRcBuf&& metadata)
TEvReadMetadataResult(TRcBuf&& metadata, std::optional<ui64> pdiskGuid)
: Outcome(EPDiskMetadataOutcome::OK)
, Metadata(std::move(metadata))
, PDiskGuid(pdiskGuid)
{}
};

Expand All @@ -1540,9 +1542,11 @@ struct TEvWriteMetadata : TEventLocal<TEvWriteMetadata, TEvBlobStorage::EvWriteM

struct TEvWriteMetadataResult : TEventLocal<TEvWriteMetadataResult, TEvBlobStorage::EvWriteMetadataResult> {
EPDiskMetadataOutcome Outcome;
std::optional<ui64> PDiskGuid;

TEvWriteMetadataResult(EPDiskMetadataOutcome outcome)
TEvWriteMetadataResult(EPDiskMetadataOutcome outcome, std::optional<ui64> pdiskGuid)
: Outcome(outcome)
, PDiskGuid(pdiskGuid)
{}
};

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ bool ReadPDiskFormatInfo(const TString &path, const NPDisk::TMainKey &mainKey, T
const bool doLock = false, TIntrusivePtr<NPDisk::TSectorMap> sectorMap = nullptr);

// reads metadata from PDisk (if available)
NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata);
NPDisk::EPDiskMetadataOutcome ReadPDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf& metadata,
std::optional<ui64> *pdiskGuid);

// updates PDisk metadata (when PDisk is properly formatted and supports metadata vault); size of metadata should not
// exceed 15 MiB; when function fails (even many times), previously stored metadata must be kept intact
NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata);
NPDisk::EPDiskMetadataOutcome WritePDiskMetadata(const TString& path, const NPDisk::TMainKey& key, TRcBuf&& metadata,
std::optional<ui64> *pdiskGuid);

} // NKikimr
Loading

0 comments on commit 1e03037

Please sign in to comment.