Introduce DeltaConfig and tombstones retention policy #420

Merged
2 commits merged on Sep 9, 2021
Changes from 1 commit
9 changes: 5 additions & 4 deletions python/deltalake/table.py
@@ -210,16 +210,17 @@ def metadata(self) -> Metadata:
"""
return self._metadata

def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
def vacuum(self, retention_hours: Optional[int] = None, dry_run: bool = True) -> List[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.

:param retention_hours: the retention threshold in hours
:param retention_hours: the retention threshold in hours; if not specified, the value from `configuration.deletedFileRetentionDuration` is used, or a default of 1 week otherwise.
:param dry_run: when activated, list only the files, delete otherwise
:return: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
"""
if retention_hours < 0:
raise ValueError("The retention period should be positive.")
if retention_hours is not None:
if retention_hours < 0:
raise ValueError("The retention period should be positive.")

return self._table.vacuum(dry_run, retention_hours)

2 changes: 1 addition & 1 deletion python/src/lib.rs
@@ -191,7 +191,7 @@ impl RawDeltaTable {
}

/// Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
pub fn vacuum(&mut self, dry_run: bool, retention_hours: Option<u64>) -> PyResult<Vec<String>> {
rt()?
.block_on(self._table.vacuum(retention_hours, dry_run))
.map_err(PyDeltaTableError::from_raw)
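For context, a minimal sketch of how the new optional threshold reads from the Rust API after this change; the table path is hypothetical, `open_table` is the crate's usual entry point, and the `vacuum` signature is the one introduced in this PR. Passing `None` defers to the table's `deletedFileRetentionDuration` configuration (or the 7-day default), while `Some(hours)` overrides it.

```rust
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local table path.
    let mut table = open_table("./data/events").await?;

    // Dry run with the configured (or default 1-week) retention:
    // only lists the files that would be removed.
    let stale = table.vacuum(None, true).await?;
    println!("{} stale files", stale.len());

    // Explicit 2-week retention (336 hours), actually deleting files.
    table.vacuum(Some(336), false).await?;
    Ok(())
}
```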
3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -78,4 +78,5 @@ glibc_version = "0"
utime = "0.3"
serial_test = "0"
pretty_assertions = "0"
tempdir = "0"
tempdir = "0"
maplit = { version = "1" }
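`maplit` lands here as a dev-dependency; a plausible use (sketched below, with the config key taken from the Python docstring above) is building `String -> String` configuration maps tersely in tests:

```rust
use maplit::hashmap;

fn main() {
    // Terse construction of a config map, as a test might do it.
    let config = hashmap! {
        "deletedFileRetentionDuration".to_string() => "interval 1 day".to_string(),
    };
    assert_eq!(config["deletedFileRetentionDuration"], "interval 1 day");
}
```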
9 changes: 5 additions & 4 deletions rust/src/checkpoints.rs
@@ -167,6 +167,8 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, Checkpoi
let mut stats_conversions: Vec<(SchemaPath, SchemaDataType)> = Vec::new();
collect_stats_conversions(&mut stats_conversions, current_metadata.schema.get_fields());

let tombstones = state.unexpired_tombstones();

// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
@@ -191,10 +193,9 @@
)
// removes
.chain(
state
.tombstones()
tombstones
.iter()
.map(|f| action::Action::remove(f.clone())),
.map(|f| action::Action::remove((*f).clone())),
)
.map(|a| serde_json::to_value(a).map_err(ArrowError::from))
// adds
@@ -213,7 +214,7 @@
let writeable_cursor = InMemoryWriteableCursor::default();
let mut writer = ArrowWriter::try_new(writeable_cursor.clone(), arrow_schema.clone(), None)?;
let batch_size =
state.app_transaction_version().len() + state.tombstones().len() + state.files().len() + 2; // 1 (protocol) + 1 (metadata)
state.app_transaction_version().len() + tombstones.len() + state.files().len() + 2; // 1 (protocol) + 1 (metadata)
let decoder = Decoder::new(arrow_schema, batch_size, None);
while let Some(batch) = decoder.next_batch(&mut jsons)? {
writer.write(&batch)?;
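The switch from `f.clone()` to `(*f).clone()` falls out of the new return type: `unexpired_tombstones()` yields a `Vec<&Remove>` rather than a `&Vec<Remove>`, so each iterator item is a double reference. A self-contained sketch of the pattern, using a simplified stand-in for `action::Remove`:

```rust
#[derive(Clone, Debug)]
struct Remove {
    path: String,
}

fn main() {
    let owned = vec![Remove { path: "part-00001.parquet".to_string() }];

    // Like `unexpired_tombstones()`: a vector of borrows into the state.
    let borrowed: Vec<&Remove> = owned.iter().collect();

    // Iterating over `&Vec<&Remove>` yields `&&Remove`; `(*f)` is `&Remove`,
    // and `.clone()` on that resolves to `Remove::clone`, giving an owned value.
    let cloned: Vec<Remove> = borrowed.iter().map(|f| (*f).clone()).collect();
    println!("{:?}", cloned);
}
```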
85 changes: 59 additions & 26 deletions rust/src/delta.rs
@@ -20,18 +20,20 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
use std::io::{BufRead, BufReader, Cursor};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::Ordering, collections::HashSet};
use uuid::Uuid;

use crate::action::Stats;

use super::action;
use super::action::{Action, DeltaOperation};
use super::delta_config;
use super::partitions::{DeltaTablePartition, PartitionFilter};
use super::schema::*;
use super::storage;
use super::storage::{parse_uri, StorageBackend, StorageError, UriError};
use crate::delta_config::DeltaConfigError;

/// Metadata for a checkpoint file
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
@@ -311,6 +313,13 @@ pub enum ApplyLogError {
/// Storage error details returned while reading the log content.
source: StorageError,
},
/// Error returned when reading delta config failed.
#[error("Failed to read delta config: {}", .source)]
Config {
/// Delta config error returned when reading delta config failed.
#[from]
source: DeltaConfigError,
},
/// Error returned when a line from log record is invalid.
#[error("Failed to read line from log record")]
Io {
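The new `Config` variant leans on thiserror's `#[from]` attribute, which derives a `From<DeltaConfigError>` impl so that `?` can lift config errors into `ApplyLogError`. A minimal sketch of the mechanism, with simplified stand-in types:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum DeltaConfigError {
    #[error("invalid interval: {0}")]
    Invalid(String),
}

#[derive(Error, Debug)]
enum ApplyLogError {
    #[error("Failed to read delta config: {source}")]
    Config {
        #[from]
        source: DeltaConfigError,
    },
}

fn read_retention() -> Result<u64, DeltaConfigError> {
    Err(DeltaConfigError::Invalid("interval 1 fortnight".to_string()))
}

fn apply_log() -> Result<(), ApplyLogError> {
    let _ = read_retention()?; // auto-converts via the derived From impl
    Ok(())
}

fn main() {
    println!("{}", apply_log().unwrap_err());
}
```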
@@ -371,14 +380,25 @@ pub struct DeltaTableState {
min_reader_version: i32,
min_writer_version: i32,
current_metadata: Option<DeltaTableMetaData>,
tombstone_retention_millis: DeltaDataTypeLong,
}

impl DeltaTableState {
/// Full list of tombstones (remove actions) representing files removed from table state.
pub fn tombstones(&self) -> &Vec<action::Remove> {
pub fn all_tombstones(&self) -> &Vec<action::Remove> {
self.tombstones.as_ref()
}

/// List of unexpired tombstones (remove actions) representing files removed from table state.
/// The retention period is set by `deletedFileRetentionDuration` with a default value of 1 week.
pub fn unexpired_tombstones(&self) -> Vec<&action::Remove> {
let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis;
self.tombstones
.iter()
.filter(|t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp)
.collect()
}

/// Full list of add actions representing all parquet files that are part of the current
/// delta table state.
pub fn files(&self) -> &Vec<action::Add> {
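The retention filter itself is simple: a tombstone survives only if its deletion timestamp is newer than `now - retention`. A standalone sketch of the rule, with a simplified `Remove` stand-in and timestamps in epoch milliseconds:

```rust
// Simplified stand-in for `action::Remove`: only the field the filter reads.
struct Remove {
    deletion_timestamp: Option<i64>, // epoch millis, as recorded in the Delta log
}

/// Keep tombstones whose deletion timestamp is newer than `now - retention`;
/// anything older has expired and may be vacuumed.
fn unexpired(tombstones: &[Remove], now_millis: i64, retention_millis: i64) -> Vec<&Remove> {
    let retention_timestamp = now_millis - retention_millis;
    tombstones
        .iter()
        .filter(|t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp)
        .collect()
}

fn main() {
    let tombstones = vec![
        Remove { deletion_timestamp: Some(1_000) },
        Remove { deletion_timestamp: Some(9_000) },
    ];
    // now = 10_000 and retention = 5_000 put the cutoff at 5_000:
    // only the second tombstone survives.
    assert_eq!(unexpired(&tombstones, 10_000, 5_000).len(), 1);
}
```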
@@ -867,9 +887,9 @@ impl DeltaTable {
.ok_or(DeltaTableError::NoMetadata)
}

/// Returns a vector of tombstones (i.e. `Remove` actions present in the current delta log.
pub fn get_tombstones(&self) -> &Vec<action::Remove> {
&self.state.tombstones
/// Returns a vector of tombstones (i.e. `Remove` actions present in the current delta log).
pub fn get_tombstones(&self) -> Vec<&action::Remove> {
self.state.unexpired_tombstones()
}

/// Returns the current version of the DeltaTable based on the loaded metadata.
@@ -890,21 +910,29 @@ impl DeltaTable {
}

/// List files that are no longer referenced by a Delta table and are older than the retention threshold.
fn get_stale_files(&self, retention_hours: u64) -> Result<HashSet<&str>, DeltaTableError> {
if retention_hours < 168 {
fn get_stale_files(
&self,
retention_hours: Option<u64>,
) -> Result<HashSet<&str>, DeltaTableError> {
let retention_millis = retention_hours
.map(|hours| 3600000 * hours as i64)
.unwrap_or(self.state.tombstone_retention_millis);

if retention_millis < self.state.tombstone_retention_millis {
return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
}
let before_duration = (SystemTime::now() - Duration::from_secs(3600 * retention_hours))
.duration_since(UNIX_EPOCH);
let delete_before_timestamp = match before_duration {
Ok(duration) => duration.as_millis() as i64,
Err(_) => return Err(DeltaTableError::InvalidVacuumRetentionPeriod),
};

let tombstone_retention_timestamp = Utc::now().timestamp_millis() - retention_millis;

Ok(self
.get_tombstones()
.state
.all_tombstones()
.iter()
.filter(|tombstone| tombstone.deletion_timestamp.unwrap_or(0) < delete_before_timestamp)
.filter(|tombstone| {
// if the tombstone's deletion time is before the `tombstone_retention_timestamp`
// then its file is considered stale
tombstone.deletion_timestamp.unwrap_or(0) < tombstone_retention_timestamp
})
.map(|tombstone| tombstone.path.as_str())
.collect::<HashSet<_>>())
}
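The override logic above boils down to: an explicit `retention_hours` is converted to milliseconds and must be at least the table's configured retention, while `None` simply falls back to the table value. A small sketch of that rule in isolation:

```rust
// Sketch of the retention fallback/validation introduced here.
fn effective_retention_millis(
    retention_hours: Option<u64>,
    table_retention_millis: i64,
) -> Result<i64, String> {
    let retention_millis = retention_hours
        .map(|hours| 3_600_000 * hours as i64)
        .unwrap_or(table_retention_millis);
    if retention_millis < table_retention_millis {
        return Err("retention period below the table's configured minimum".to_string());
    }
    Ok(retention_millis)
}

fn main() {
    let week = 7 * 24 * 3_600_000i64;
    assert_eq!(effective_retention_millis(None, week), Ok(week));
    assert_eq!(effective_retention_millis(Some(336), week), Ok(2 * week));
    assert!(effective_retention_millis(Some(1), week).is_err());
}
```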
@@ -928,10 +956,15 @@ impl DeltaTable {
}

/// Run the Vacuum command on the Delta Table: delete files that are no longer referenced by a Delta table and are older than the retention threshold.
/// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
/// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots
/// and uncommitted files can still be in use by concurrent readers or writers to the table.
/// If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be
/// corrupted when vacuum deletes files that have not yet been committed.
/// If `retention_hours` is not set, then the `configuration.deletedFileRetentionDuration` of the
/// Delta table is used, or a default value of 7 days if that is missing too.
pub async fn vacuum(
&mut self,
retention_hours: u64,
retention_hours: Option<u64>,
dry_run: bool,
) -> Result<Vec<String>, DeltaTableError> {
let expired_tombstones = self.get_stale_files(retention_hours)?;
@@ -1024,9 +1057,7 @@ impl DeltaTable {
.await
.map_err(|e| DeltaTableError::from(DeltaTransactionError::from(e)))?;

// NOTE: since we have the log entry in memory already,
// we could optimize this further by merging the log entry instead of updating from storage.
self.update_incremental().await?;
self.update().await?;

Ok(version)
}
@@ -1418,7 +1449,7 @@ impl<'a> DeltaTransaction<'a> {
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
let mut attempt_number: u32 = 0;
loop {
self.delta_table.update_incremental().await?;
self.delta_table.update().await?;

let version = self.delta_table.version + 1;

@@ -1472,10 +1503,7 @@ fn log_entry_from_actions(actions: &[Action]) -> Result<String, serde_json::Erro
Ok(jsons.join("\n"))
}

fn process_action(
state: &mut DeltaTableState,
action: Action,
) -> Result<(), serde_json::error::Error> {
fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), ApplyLogError> {
match action {
Action::add(v) => {
state.files.push(v);
@@ -1489,7 +1517,11 @@ fn process_action(
state.min_writer_version = v.min_writer_version;
}
Action::metaData(v) => {
state.current_metadata = Some(DeltaTableMetaData::try_from(v)?);
let md = DeltaTableMetaData::try_from(v)?;
state.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
state.current_metadata = Some(md);
}
Action::txn(v) => {
*state
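Here `tombstone_retention_millis` is refreshed on every `metaData` action via `delta_config::TOMBSTONE_RETENTION`. The configured value is an interval string; below is a hedged sketch of how a value like `"interval 1 week"` could be turned into a `Duration`. The real parsing lives in `delta_config` and may accept more units and spellings than shown.

```rust
use std::time::Duration;

// Toy parser for "interval <n> <unit>" strings; illustrative only.
fn parse_interval(value: &str) -> Result<Duration, String> {
    let mut it = value.split_whitespace();
    if it.next() != Some("interval") {
        return Err(format!("not an interval: {}", value));
    }
    let number: u64 = it
        .next()
        .ok_or("missing number")?
        .parse()
        .map_err(|e| format!("{}", e))?;
    let seconds = match it.next() {
        Some("hour") | Some("hours") => 3_600,
        Some("day") | Some("days") => 86_400,
        Some("week") | Some("weeks") => 604_800,
        other => return Err(format!("unsupported unit: {:?}", other)),
    };
    Ok(Duration::from_secs(number * seconds))
}

fn main() {
    assert_eq!(parse_interval("interval 1 week"), Ok(Duration::from_secs(604_800)));
}
```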
@@ -1565,6 +1597,7 @@ mod tests {
min_reader_version: 1,
min_writer_version: 2,
app_transaction_version,
tombstone_retention_millis: 0,
};

let txn_action = Action::txn(action::Txn {