Skip to content

Commit

Permalink
Merge pull request openzfs#495 from delphix/projects/merge-upstream/m…
Browse files Browse the repository at this point in the history
…aster

Merge remote-tracking branch '6.0/stage' into 'master'
  • Loading branch information
grwilson authored Jun 24, 2022
2 parents 3553b3d + 73ae46a commit 0574972
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 271 deletions.
29 changes: 29 additions & 0 deletions cmd/zdb/zdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -7794,6 +7794,35 @@ dump_agent_metadata(spa_t *spa)
printf("Uberblock (object agent):\n");
print_zoa_nvlist(uberblock_phys);
fnvlist_free(uberblock_phys);

nvlist_t *leaked_objects = NULL;
VERIFY0(libzoa_find_leaks(zoa_handle, &leaked_objects));
uint_t leaks_len, missing_len;
uint64_t *leaks = fnvlist_lookup_uint64_array(leaked_objects, "leaked",
&leaks_len);
uint64_t *missing = fnvlist_lookup_uint64_array(leaked_objects,
"missing", &missing_len);

if (leaks_len > 0) {
printf("Leaked object store objects detected: %u, [%llu",
leaks_len, leaks[0]);
for (uint_t i = 1; i < leaks_len; i++) {
printf(", %llu", leaks[i]);
if (i % 4 == 0)
printf("\n\t");
}
printf("]\n");
}
if (missing_len > 0) {
printf("Missing object store objects detected: %u, [%llu",
missing_len, missing[0]);
for (uint_t i = 1; i < missing_len; i++) {
printf(", %llu", missing[i]);
if (i % 4 == 0)
printf("\n\t");
}
printf("]\n");
}
}
#endif

Expand Down
6 changes: 3 additions & 3 deletions cmd/zfs_object_agent/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ async fn do_list_pools(
// Lookup all objects in the pool.
if list_all_objects {
object_access
.list_objects(pool_key, None, false)
.list_objects(pool_key, false)
.for_each(|object| async move { println!(" {}", object) })
.await;
}
Expand All @@ -410,7 +410,7 @@ async fn do_destroy_old_pools(
) -> Result<(), Box<dyn Error>> {
for pool_keys in find_old_pools(object_access, min_age).await {
object_access
.delete_objects(object_access.list_objects(pool_keys, None, false))
.delete_objects(object_access.list_objects(pool_keys, false))
.await;
}
Ok(())
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn do_blob(cli_params: CliParams, count: NonZeroU32) -> Result<(), Box<dyn
println!(
"List blobs {:?}",
object_access
.list_objects("".to_string(), None, true)
.list_objects("".to_string(), true)
.collect::<Vec<String>>()
.await
);
Expand Down
6 changes: 3 additions & 3 deletions cmd/zfs_object_agent/object_perf/src/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Perf {
duration: Duration,
) {
let num_objects = object_access
.list_objects(key_prefix.clone(), None, true)
.list_objects(key_prefix.clone(), true)
.fold(0, |count, _key| async move { count + 1 })
.await;
let mut key_id = 0;
Expand Down Expand Up @@ -167,7 +167,7 @@ pub async fn write_test(
println!("{:#?}", perf.metrics.put);

object_access
.delete_objects(object_access.list_objects(key_prefix, None, false))
.delete_objects(object_access.list_objects(key_prefix, false))
.await;

Ok(())
Expand Down Expand Up @@ -199,7 +199,7 @@ pub async fn read_test(
println!("{:#?}", perf.metrics.get);

object_access
.delete_objects(object_access.list_objects(key_prefix, None, false))
.delete_objects(object_access.list_objects(key_prefix, false))
.await;

Ok(())
Expand Down
21 changes: 20 additions & 1 deletion cmd/zfs_object_agent/scripts/start_zoa
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@
# - ZETTACACHE_DIRECTORY: Directory to use for ZettaCache device discovery.
#

#
# Wait for the agent to start listening to requests. This function
# will loop until "zcache list" can communicate with the agent or
# systemd times out the service.
#
function wait_for_agent()
{
start=$SECONDS
while :; do
zcache list
if [[ $? -eq 0 ]]; then
echo "Succeded in: $((SECONDS - start)) seconds"
return
fi
echo "Elapsed wait time: $((SECONDS - start)) seconds"
done
}

ZOA_LOG_CONFIG="${ZOA_LOG_CONFIG:-/etc/zfs/zoa_log4rs.yml}"
PARAMS="-l ${ZOA_LOG_CONFIG} --clear-incompatible-cache"
echo "Logging config: ${ZOA_LOG_CONFIG}"
Expand All @@ -19,4 +37,5 @@ if [[ ! -z $ZOA_CONFIG ]]; then
fi
PARAMS="$PARAMS -d ${ZETTACACHE_DIRECTORY:-/dev}"

exec /sbin/zfs_object_agent ${PARAMS}
/sbin/zfs_object_agent ${PARAMS} &
wait_for_agent
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/util/src/unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ where
pub fn is_empty(&self) -> bool {
self.pending.is_empty()
}

pub fn last(&self) -> K {
self.first + self.pending.len()
}
}
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/zettacache/src/superblock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub const SUPERBLOCK_SIZE: u64 = util::message::SUPERBLOCK_SIZE as u64;
pub struct SuperblockPhys {
pub primary: Option<PrimaryPhys>,
pub disk: DiskId,
#[serde(alias = "guid")]
#[serde(rename = "guid")]
pub cache_guid: CacheGuid,
#[serde(default)]
pub disk_guid: Option<DiskGuid>,
Expand All @@ -36,7 +36,7 @@ pub struct SuperblockPhys {
struct SuperblockFeaturesPhys {
primary: Option<PrimaryFeaturesPhys>,
disk: DiskId,
#[serde(alias = "guid")]
#[serde(rename = "guid")]
cache_guid: CacheGuid,
}

Expand Down
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl ZettaCache {
let inner = match Inner::open(mode).await {
Ok(inner) => Some(inner),
Err(CacheOpenError::NoDevices) => None,
Err(e @ CacheOpenError::IncompatibleFeatures(_, _)) => return Err(e),
Err(e) => {
error!("could not open zettacache: {e}");
None
Expand Down
10 changes: 8 additions & 2 deletions cmd/zfs_object_agent/zettacache/src/zettacache/zcdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use crate::block_access::BlockAccess;
use crate::block_access::Disk;
use crate::block_allocator::zcdb::zcachedb_dump_slabs;
use crate::block_allocator::zcdb::zcachedb_dump_spacemaps;
use crate::features::check_features;
use crate::slab_allocator::SlabAllocatorBuilder;
use crate::superblock::PrimaryPhys;
use crate::superblock::SuperblockPhys;
use crate::superblock::SUPERBLOCK_SIZE;
use crate::CacheOpenError;
use crate::DumpSlabsOptions;
use crate::DumpStructuresOptions;

Expand Down Expand Up @@ -49,11 +51,15 @@ impl ZCacheDBHandle {

pub async fn open(paths: Vec<PathBuf>) -> Result<ZCacheDBHandle> {
let mut disks: Vec<Disk> = Vec::with_capacity(paths.len());
for path in paths {
disks.push(Disk::new(&path, true)?);
for path in &paths {
disks.push(Disk::new(path, true)?);
}
let block_access = Arc::new(BlockAccess::new(disks, true));

let feature_flags = PrimaryPhys::read_features(&block_access).await?;
check_features(&feature_flags)
.map_err(|e| CacheOpenError::IncompatibleFeatures(paths, e))?;

let (primary, primary_disk, guid, _extra_disks) = PrimaryPhys::read(&block_access).await?;
let checkpoint = Arc::new(CheckpointPhys::read(&block_access, &primary.checkpoint).await?);

Expand Down
10 changes: 10 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/base_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl Txg {
Some(Txg(self.0 - rhs))
}
}

pub fn from_key(key: &str) -> Self {
Txg(key.rsplit_once('/').unwrap().1.parse().unwrap())
}
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
Expand All @@ -43,4 +47,10 @@ impl ObjectId {
pub fn prefix(self) -> u64 {
self.0 % NUM_DATA_PREFIXES
}

/// This function parses a key into an object id. It works for any key
/// where the last path component is the object id.
pub fn from_key(key: &str) -> Self {
ObjectId(key.rsplit_once('/').unwrap().1.parse().unwrap())
}
}
39 changes: 39 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/data_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use anyhow::Context;
use anyhow::Result;
use bytes::Bytes;
use bytesize::ByteSize;
use futures::future;
use futures::stream;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use log::*;
use more_asserts::*;
use rusoto_core::ByteStream;
Expand All @@ -39,6 +42,10 @@ tunable! {
static ref DATA_OBJ_RANGED_GET: bool = false;
static ref DATA_OBJ_TRY_HEADER_SIZE: ByteSize = ByteSize::kib(16);
static ref OBJECT_CACHE_SIZE: usize = 100;

// Number of block IDs to scan in parallel for recovery phase when the agent crashes in the
// middle of a TXG.
static ref RECOVERY_SCAN_COUNT: usize = 500;
}

lazy_static_ptr! {
Expand Down Expand Up @@ -498,6 +505,38 @@ impl DataObject {
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}

pub async fn next_uncached(
object_access: &ObjectAccess,
guid: PoolGuid,
start_from: ObjectId,
end_with: ObjectId,
) -> Option<Self> {
stream::iter(
(0..)
.map(|i| ObjectId::new(start_from.as_min_block() + i))
.take_while(|&object| object <= end_with)
.map(|object| async move {
Self::get_uncached(object_access, guid, object, ObjectAccessOpType::ReadsGet)
.await
}),
)
.buffered(*RECOVERY_SCAN_COUNT)
.filter_map(|result| future::ready(result.ok()))
.next()
.await
}

pub fn list_all(
object_access: &ObjectAccess,
guid: PoolGuid,
) -> impl Stream<Item = ObjectId> + '_ {
stream::select_all(Self::prefixes(guid).map(|prefix| {
object_access
.list_objects(prefix, false)
.map(|str| ObjectId::from_key(&str))
}))
}
}

impl Display for DataObject {
Expand Down
25 changes: 25 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/debug.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use futures::StreamExt;
use log::debug;
use log::error;
use nix::errno::Errno;
Expand All @@ -13,6 +15,7 @@ use zettacache::CacheOpenMode;
use zettacache::ZettaCache;

use crate::base_types::Txg;
use crate::data_object::DataObject;
use crate::object_access::ObjectAccess;
use crate::object_access::ObjectAccessProtocol;
use crate::pool;
Expand Down Expand Up @@ -125,4 +128,26 @@ impl DebugHandle {
};
self.runtime.block_on(future)
}

pub fn find_leaks(&self) -> Result<NvList, Errno> {
let object_access = self.object_access.as_ref().unwrap();
let pool = self.pool.as_ref().unwrap();
let found = self.runtime.block_on(
DataObject::list_all(object_access, pool.state.shared_state.guid)
.collect::<BTreeSet<_>>(),
);
let map_set = pool.state.object_block_set();
let leaked = (&found - &map_set)
.into_iter()
.map(|id| id.as_min_block().0)
.collect::<Vec<_>>();
let missing = (&map_set - &found)
.into_iter()
.map(|id| id.as_min_block().0)
.collect::<Vec<_>>();
let mut nvl = NvList::new_unique_names();
nvl.insert("leaked", leaked.as_slice()).unwrap();
nvl.insert("missing", missing.as_slice()).unwrap();
Ok(nvl)
}
}
13 changes: 6 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ impl ObjectAccessTrait for BlobObjectAccess {
use_delimiter: bool,
list_prefixes: bool,
) -> Pin<Box<dyn Stream<Item = Result<String>> + Send + '_>> {
let msg = format!("list {} (after {:?})", prefix, start_after);
assert!(start_after.is_none());
let msg = format!("list {}", prefix);
let list_prefix = prefix;
let mut next_marker: Option<NextMarker> = None;

Expand Down Expand Up @@ -645,22 +646,16 @@ impl ObjectAccessTrait for BlobObjectAccess {
})
.await?;

// XXX The performance of this is likely to be quite bad. We need a better solution. DOSE-1215
let initial = start_after.clone().unwrap_or_default();
if list_prefixes {
if let Some(prefixes) = output.blobs.blob_prefix {
for blob_prefix in prefixes {
if initial < blob_prefix.name {
yield blob_prefix.name;
}
}
}
} else {
for blob in output.blobs.blobs {
if initial < blob.name {
yield blob.name;
}
}
}
next_marker = output.next_marker.clone();
if (next_marker.is_none()) {
Expand All @@ -669,6 +664,10 @@ impl ObjectAccessTrait for BlobObjectAccess {
}
})
}

fn supports_list_after(&self) -> bool {
false
}
}

// Creation of a BlobObjectAccess object with invalid credentials can cause a crash as the azure sdk
Expand Down
Loading

0 comments on commit 0574972

Please sign in to comment.