Skip to content

Commit

Permalink
Returns ArchiveSnapshotPackageError from archive_snapshot_package() (s…
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo authored Jan 19, 2024
1 parent 2b0b5ae commit a915e2f
Showing 1 changed file with 131 additions and 51 deletions.
182 changes: 131 additions & 51 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ pub enum SnapshotError {
#[error("archive generation failure {0}")]
ArchiveGenerationFailure(ExitStatus),

#[error("storage path symlink is invalid '{0}'")]
StoragePathSymlinkInvalid(PathBuf),

#[error("Unpack error: {0}")]
UnpackError(#[from] UnpackError),

Expand Down Expand Up @@ -351,6 +348,9 @@ pub enum SnapshotError {

#[error("failed to add bank snapshot for slot {1}: {0}")]
AddBankSnapshot(#[source] AddBankSnapshotError, Slot),

#[error("failed to archive snapshot package: {0}")]
ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -426,6 +426,73 @@ pub enum AddBankSnapshotError {
CreateStateCompleteFile(#[source] IoError),
}

/// Errors that can happen in `archive_snapshot_package()`
#[derive(Error, Debug)]
pub enum ArchiveSnapshotPackageError {
#[error("failed to create archive path '{1}': {0}")]
CreateArchiveDir(#[source] IoError, PathBuf),

#[error("failed to create staging dir inside '{1}': {0}")]
CreateStagingDir(#[source] IoError, PathBuf),

#[error("failed to create accounts staging dir '{1}': {0}")]
CreateAccountsStagingDir(#[source] IoError, PathBuf),

#[error("failed to create snapshot staging dir '{1}': {0}")]
CreateSnapshotStagingDir(#[source] IoError, PathBuf),

#[error("failed to canonicalize snapshot source dir '{1}': {0}")]
CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),

#[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),

#[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),

#[error("failed to symlink version file from '{1}' to '{2}': {0}")]
SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),

#[error("failed to flush account storage file '{1}': {0}")]
FlushAccountStorageFile(#[source] AccountsFileError, PathBuf),

#[error("failed to canonicalize account storage file '{1}': {0}")]
CanonicalizeAccountStorageFile(#[source] IoError, PathBuf),

#[error("failed to symlink account storage file from '{1}' to '{2}': {0}")]
SymlinkAccountStorageFile(#[source] IoError, PathBuf, PathBuf),

#[error("account storage staging file is invalid '{0}'")]
InvalidAccountStorageStagingFile(PathBuf),

#[error("failed to create archive file '{1}': {0}")]
CreateArchiveFile(#[source] IoError, PathBuf),

#[error("failed to archive version file: {0}")]
ArchiveVersionFile(#[source] IoError),

#[error("failed to archive snapshots dir: {0}")]
ArchiveSnapshotsDir(#[source] IoError),

#[error("failed to archive accounts dir: {0}")]
ArchiveAccountsDir(#[source] IoError),

#[error("failed to archive snapshot: {0}")]
FinishArchive(#[source] IoError),

#[error("failed to create encoder: {0}")]
CreateEncoder(#[source] IoError),

#[error("failed to encode archive: {0}")]
FinishEncoder(#[source] IoError),

#[error("failed to query archive metadata '{1}': {0}")]
QueryArchiveMetadata(#[source] IoError, PathBuf),

#[error("failed to move archive from '{1}' to '{2}': {0}")]
MoveArchive(#[source] IoError, PathBuf, PathBuf),
}

/// Errors that can happen in `hard_link_storages_to_snapshot()`
#[derive(Error, Debug)]
pub enum HardLinkStoragesToSnapshotError {
Expand Down Expand Up @@ -664,6 +731,9 @@ pub fn archive_snapshot_package(
maximum_full_snapshot_archives_to_retain: NonZeroUsize,
maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
) -> Result<()> {
use ArchiveSnapshotPackageError as E;
const SNAPSHOTS_DIR: &str = "snapshots";
const ACCOUNTS_DIR: &str = "accounts";
info!(
"Generating snapshot archive for slot {}",
snapshot_package.slot()
Expand All @@ -675,8 +745,7 @@ pub fn archive_snapshot_package(
.parent()
.expect("Tar output path is invalid");

fs_err::create_dir_all(tar_dir)
.map_err(|err| SnapshotError::IoWithSource(err, "create archive path"))?;
fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;

// Create the staging directories
let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
Expand All @@ -687,61 +756,65 @@ pub fn archive_snapshot_package(
snapshot_package.slot()
))
.tempdir_in(tar_dir)
.map_err(|e| SnapshotError::IoWithSource(e, "create archive tempdir"))?;
.map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;

let staging_accounts_dir = staging_dir.path().join("accounts");
let staging_snapshots_dir = staging_dir.path().join("snapshots");
let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
let staging_accounts_dir = staging_dir.path().join(ACCOUNTS_DIR);

// Create staging/accounts/
fs_err::create_dir_all(&staging_accounts_dir)
.map_err(|err| SnapshotError::IoWithSource(err, "create staging accounts path"))?;
fs::create_dir_all(&staging_accounts_dir)
.map_err(|err| E::CreateAccountsStagingDir(err, staging_accounts_dir.clone()))?;

let slot_str = snapshot_package.slot().to_string();
let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
// Creates staging snapshots/<slot>/
fs_err::create_dir_all(&staging_snapshot_dir)
.map_err(|err| SnapshotError::IoWithSource(err, "create staging snapshots path"))?;
fs::create_dir_all(&staging_snapshot_dir)
.map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;

let src_snapshot_dir = &snapshot_package.bank_snapshot_dir;
// To be a source for symlinking and archiving, the path need to be an absolute path
let src_snapshot_dir = src_snapshot_dir
.canonicalize()
.map_err(|_e| SnapshotError::InvalidSnapshotDirPath(src_snapshot_dir.clone()))?;
.map_err(|err| E::CanonicalizeSnapshotSourceDir(err, src_snapshot_dir.clone()))?;
let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
let src_snapshot_file = src_snapshot_dir.join(slot_str);
symlink::symlink_file(src_snapshot_file, staging_snapshot_file)
.map_err(|e| SnapshotError::IoWithSource(e, "create snapshot symlink"))?;
symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
.map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;

// Following the existing archive format, the status cache is under snapshots/, not under <slot>/
// like in the snapshot dir.
let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
symlink::symlink_file(src_status_cache, staging_status_cache)
.map_err(|e| SnapshotError::IoWithSource(e, "create status cache symlink"))?;
symlink::symlink_file(&src_status_cache, &staging_status_cache)
.map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;

// The bank snapshot has the version file, so symlink it to the correct staging path
let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
symlink::symlink_file(src_version_file, staging_version_file)
.map_err(|e| SnapshotError::IoWithSource(e, "create version file symlink"))?;
symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
})?;

// Add the AppendVecs into the compressible list
for storage in snapshot_package.snapshot_storages.iter() {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(AppendVec::file_name(
storage
.flush()
.map_err(|err| E::FlushAccountStorageFile(err, storage_path.clone()))?;
let staging_storage_path = staging_accounts_dir.join(AppendVec::file_name(
storage.slot(),
storage.append_vec_id(),
));

// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The file path where the AppendVec will be placed in the staging directory.
let storage_path =
fs_err::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_file(storage_path, &output_path)
.map_err(|e| SnapshotError::IoWithSource(e, "create storage symlink"))?;
if !output_path.is_file() {
return Err(SnapshotError::StoragePathSymlinkInvalid(output_path));
// `src_storage_path` - The file path where the AppendVec itself is located
// `staging_storage_path` - The file path where the AppendVec will be placed in the staging directory.
let src_storage_path = fs::canonicalize(&storage_path)
.map_err(|err| E::CanonicalizeAccountStorageFile(err, storage_path))?;
symlink::symlink_file(&src_storage_path, &staging_storage_path).map_err(|err| {
E::SymlinkAccountStorageFile(err, src_storage_path, staging_storage_path.clone())
})?;
if !staging_storage_path.is_file() {
return Err(E::InvalidAccountStorageStagingFile(staging_storage_path).into());
}
}

Expand All @@ -754,20 +827,23 @@ pub fn archive_snapshot_package(
));

{
let mut archive_file = fs_err::File::create(&archive_path)?;
let mut archive_file = fs::File::create(&archive_path)
.map_err(|err| E::CreateArchiveFile(err, archive_path.clone()))?;

let do_archive_files = |encoder: &mut dyn Write| -> Result<()> {
let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
let mut archive = tar::Builder::new(encoder);
// Serialize the version and snapshots files before accounts so we can quickly determine the version
// and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
archive.append_path_with_name(
staging_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME),
SNAPSHOT_VERSION_FILENAME,
)?;
for dir in ["snapshots", "accounts"] {
archive.append_dir_all(dir, staging_dir.as_ref().join(dir))?;
}
archive.into_inner()?;
archive
.append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
.map_err(E::ArchiveVersionFile)?;
archive
.append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
.map_err(E::ArchiveSnapshotsDir)?;
archive
.append_dir_all(ACCOUNTS_DIR, &staging_accounts_dir)
.map_err(E::ArchiveAccountsDir)?;
archive.into_inner().map_err(E::FinishArchive)?;
Ok(())
};

Expand All @@ -776,24 +852,28 @@ pub fn archive_snapshot_package(
let mut encoder =
bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
do_archive_files(&mut encoder)?;
encoder.finish()?;
encoder.finish().map_err(E::FinishEncoder)?;
}
ArchiveFormat::TarGzip => {
let mut encoder =
flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
do_archive_files(&mut encoder)?;
encoder.finish()?;
encoder.finish().map_err(E::FinishEncoder)?;
}
ArchiveFormat::TarZstd => {
let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?;
let mut encoder =
zstd::stream::Encoder::new(archive_file, 0).map_err(E::CreateEncoder)?;
do_archive_files(&mut encoder)?;
encoder.finish()?;
encoder.finish().map_err(E::FinishEncoder)?;
}
ArchiveFormat::TarLz4 => {
let mut encoder = lz4::EncoderBuilder::new().level(1).build(archive_file)?;
let mut encoder = lz4::EncoderBuilder::new()
.level(1)
.build(archive_file)
.map_err(E::CreateEncoder)?;
do_archive_files(&mut encoder)?;
let (_output, result) = encoder.finish();
result?
result.map_err(E::FinishEncoder)?;
}
ArchiveFormat::Tar => {
do_archive_files(&mut archive_file)?;
Expand All @@ -802,10 +882,10 @@ pub fn archive_snapshot_package(
}

// Atomically move the archive into position for other validators to find
let metadata = fs_err::metadata(&archive_path)
.map_err(|err| SnapshotError::IoWithSource(err, "archive path stat"))?;
fs_err::rename(&archive_path, snapshot_package.path())
.map_err(|err| SnapshotError::IoWithSource(err, "archive path rename"))?;
let metadata = fs::metadata(&archive_path)
.map_err(|err| E::QueryArchiveMetadata(err, archive_path.clone()))?;
fs::rename(&archive_path, snapshot_package.path())
.map_err(|err| E::MoveArchive(err, archive_path, snapshot_package.path().clone()))?;

purge_old_snapshot_archives(
full_snapshot_archives_dir,
Expand All @@ -816,8 +896,8 @@ pub fn archive_snapshot_package(

timer.stop();
info!(
"Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
snapshot_package.path(),
"Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
snapshot_package.path().display(),
snapshot_package.slot(),
timer.as_ms(),
metadata.len()
Expand Down

0 comments on commit a915e2f

Please sign in to comment.