Skip to content

Commit

Permalink
feat: set column metadata from python
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Andersson <u.martin.andersson@gmail.com>
  • Loading branch information
umartin authored and rtyler committed Feb 5, 2025
1 parent 9dc8f32 commit 43edb9d
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 8 deletions.
7 changes: 7 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ class RawDeltaTable:
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
# Type stub for the natively implemented method that sets metadata on one schema column.
# NOTE(review): the pyo3 signature for this method names the column argument
# `field_name` (not `column`) and defaults both property arguments to None;
# confirm whether this stub should mirror that.
def set_column_metadata(
self,
column: str,
metadata: dict[str, str],
commit_properties: Optional[CommitProperties],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def __datafusion_table_provider__(self) -> Any: ...

def rust_core_version() -> str: ...
Expand Down
22 changes: 22 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,28 @@ def set_table_properties(
commit_properties,
)

def set_column_metadata(
    self,
    column: str,
    metadata: dict[str, str],
    commit_properties: Optional[CommitProperties] = None,
    post_commithook_properties: Optional[PostCommitHookProperties] = None,
) -> None:
    """Add or modify metadata entries on a single column of the table schema.

    Metadata keys the column does not have yet are inserted. An error is
    raised when ``column`` does not exist in the schema.

    Args:
        column: name of the column whose metadata is updated.
        metadata: the metadata entries to add or modify on the column.
        commit_properties: properties of the transaction commit. If None, default values are used.
        post_commithook_properties: properties for the post commit hook. If None, default values are used.
    """
    # Forward all arguments positionally to the underlying raw table object.
    raw_table = self.table._table
    raw_table.set_column_metadata(
        column,
        metadata,
        commit_properties,
        post_commithook_properties,
    )


class TableOptimizer:
"""API for various table optimization commands."""
Expand Down
54 changes: 46 additions & 8 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, Duration, FixedOffset, Utc};
use datafusion_ffi::table_provider::FFI_TableProvider;
use delta_kernel::expressions::Scalar;
use delta_kernel::schema::StructField;
use delta_kernel::schema::{MetadataValue, StructField};
use deltalake::arrow::compute::concat_batches;
use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
Expand Down Expand Up @@ -63,13 +63,6 @@ use error::DeltaError;
use futures::future::join_all;
use tracing::log::*;

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
use serde_json::{Map, Value};
use uuid::Uuid;

use crate::error::DeltaProtocolError;
use crate::error::PythonError;
use crate::features::TableFeatures;
Expand All @@ -78,6 +71,14 @@ use crate::merge::PyMergeBuilder;
use crate::query::PyQueryBuilder;
use crate::schema::{schema_to_pyobject, Field};
use crate::utils::rt;
use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
use deltalake::protocol::DeltaOperation::UpdateFieldMetadata;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
use serde_json::{Map, Value};
use uuid::Uuid;

#[cfg(all(target_family = "unix", not(target_os = "emscripten")))]
use jemallocator::Jemalloc;
Expand Down Expand Up @@ -1521,6 +1522,43 @@ impl RawDeltaTable {
}
}

/// Add or update metadata entries on a single schema field and commit the change.
///
/// Every value in `metadata` is stored as a string metadata value on the field
/// named `field_name`. When commit or post-commit-hook properties are supplied
/// they are attached to the commit. After a successful commit the in-memory
/// table state is replaced with the state returned by the operation.
///
/// # Errors
///
/// Returns a Python exception when the log store or table state cannot be
/// obtained, or when the update operation itself fails (for example because
/// the column does not exist in the schema).
#[pyo3(signature = (field_name, metadata, commit_properties=None, post_commithook_properties=None))]
pub fn set_column_metadata(
    &self,
    py: Python,
    field_name: &str,
    metadata: HashMap<String, String>,
    commit_properties: Option<PyCommitProperties>,
    post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
    let table = py.allow_threads(|| {
        let log_store = self.log_store()?;
        // Read the store name once, before the handle is moved into the builder.
        let is_lakefs = log_store.name() == "LakeFSLogStore";

        let mut cmd = UpdateFieldMetadataBuilder::new(log_store, self.cloned_state()?)
            .with_field_name(field_name)
            .with_metadata(
                // The map is owned, so consume it instead of cloning every key and value.
                metadata
                    .into_iter()
                    .map(|(key, value)| (key, MetadataValue::String(value)))
                    .collect(),
            );

        if let Some(commit_properties) =
            maybe_create_commit_properties(commit_properties, post_commithook_properties)
        {
            cmd = cmd.with_commit_properties(commit_properties);
        }

        if is_lakefs {
            cmd = cmd.with_custom_execute_handler(Arc::new(LakeFSCustomExecuteHandler {}));
        }

        rt().block_on(cmd.into_future())
            .map_err(PythonError::from)
            .map_err(PyErr::from)
    })?;
    self.set_state(table.state)?;
    Ok(())
}

fn __datafusion_table_provider__<'py>(
&self,
py: Python<'py>,
Expand Down
15 changes: 15 additions & 0 deletions python/tests/test_alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,18 @@ def test_add_feautres(existing_sample_table: DeltaTable):
"v2Checkpoint",
]
) # type: ignore


def test_set_column_metadata(tmp_path: pathlib.Path, sample_table: pa.Table):
    """Metadata can be set on an existing column; an unknown column raises DeltaError."""
    write_deltalake(tmp_path, sample_table)
    table = DeltaTable(tmp_path)

    table.alter.set_column_metadata("price", {"comment": "my comment"})

    # Index the metadata of every schema field by its column name.
    metadata_by_column = {}
    for field in table.schema().fields:
        metadata_by_column[field.name] = field.metadata
    assert metadata_by_column["price"] == {"comment": "my comment"}

    # Setting metadata on a column that is not in the schema must fail.
    with pytest.raises(DeltaError):
        table.alter.set_column_metadata(
            "non_existing_column", {"comment": "my comment"}
        )

0 comments on commit 43edb9d

Please sign in to comment.