From ba7484fe873fbd30ae068ca44af532096bcbe687 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Sat, 30 Dec 2023 12:03:29 +0100 Subject: [PATCH] chore: update documentation for S3 / DynamoDb log store configuration --- crates/deltalake-aws/src/storage.rs | 5 +-- crates/deltalake-aws/tests/common.rs | 4 -- python/deltalake/writer.py | 2 +- python/docs/source/usage.rst | 56 +++++++++++++++++++++------- 4 files changed, 45 insertions(+), 22 deletions(-) diff --git a/crates/deltalake-aws/src/storage.rs b/crates/deltalake-aws/src/storage.rs index d2c3cca9a2..b71d17bb64 100644 --- a/crates/deltalake-aws/src/storage.rs +++ b/crates/deltalake-aws/src/storage.rs @@ -539,7 +539,6 @@ mod tests { s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test_id_mixed".to_string(), s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret_mixed".to_string(), s3_constants::AWS_REGION.to_string() => "us-west-2".to_string(), - "DYNAMO_LOCK_PARTITION_KEY_VALUE".to_string() => "my_lock".to_string(), "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES".to_string() => "3".to_string(), }); @@ -562,9 +561,7 @@ mod tests { s3_pool_idle_timeout: Duration::from_secs(1), sts_pool_idle_timeout: Duration::from_secs(2), s3_get_internal_server_error_retries: 3, - extra_opts: hashmap! { - "DYNAMO_LOCK_PARTITION_KEY_VALUE".to_string() => "my_lock".to_string(), - }, + extra_opts: hashmap! {}, allow_unsafe_rename: false, }, options diff --git a/crates/deltalake-aws/tests/common.rs b/crates/deltalake-aws/tests/common.rs index c294ca5d99..01aa505b1b 100644 --- a/crates/deltalake-aws/tests/common.rs +++ b/crates/deltalake-aws/tests/common.rs @@ -23,10 +23,6 @@ impl Default for S3Integration { impl StorageIntegration for S3Integration { /// Create a new bucket fn create_bucket(&self) -> std::io::Result { - set_env_if_not_set( - "DYNAMO_LOCK_PARTITION_KEY_VALUE", - format!("s3://{}", self.bucket_name()), - ); Self::create_lock_table()?; let mut child = Command::new("aws") .args(["s3", "mb", &self.root_uri()]) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 7306a5705c..af4c54bcb8 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -185,7 +185,7 @@ def write_deltalake( Additionally, you must create a DynamoDB table with the name 'delta_rs_lock_table' so that it can be automatically discovered by delta-rs. Alternatively, you can - use a table name of your choice, but you must set the `DYNAMO_LOCK_TABLE_NAME` + use a table name of your choice, but you must set the `DELTA_DYNAMO_TABLE_NAME` variable to match your chosen table name. The required schema for the DynamoDB table is as follows: diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index ed0556a176..d0349a450c 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -492,30 +492,39 @@ locking provider at the moment in delta-rs. To enable DynamoDB as the locking provider, you need to set the **AWS_S3_LOCKING_PROVIDER** to 'dynamodb' as a ``storage_options`` or as an environment variable. -Additionally, you must create a DynamoDB table with the name ``delta_rs_lock_table`` +Additionally, you must create a DynamoDB table with the name ``delta_log`` so that it can be automatically recognized by delta-rs. Alternatively, you can -use a table name of your choice, but you must set the **DYNAMO_LOCK_TABLE_NAME** +use a table name of your choice, but you must set the **DELTA_DYNAMO_TABLE_NAME** variable to match your chosen table name. The required schema for the DynamoDB table is as follows: .. 
code-block:: json
 
-    {
+  {
+    "Table": {
     "AttributeDefinitions": [
         {
-            "AttributeName": "key",
+            "AttributeName": "fileName",
+            "AttributeType": "S"
+        },
+        {
+            "AttributeName": "tablePath",
             "AttributeType": "S"
         }
     ],
-    "TableName": "delta_rs_lock_table",
+    "TableName": "delta_log",
     "KeySchema": [
         {
-            "AttributeName": "key",
+            "AttributeName": "tablePath",
             "KeyType": "HASH"
+        },
+        {
+            "AttributeName": "fileName",
+            "KeyType": "RANGE"
         }
     ]
     }
+  }
 
 Here is an example writing to s3 using this mechanism:
 
@@ -523,16 +532,37 @@ Here is an example writing to s3 using this mechanism:
 
     >>> from deltalake import write_deltalake
    >>> df = pd.DataFrame({'x': [1, 2, 3]})
-    >>> storage_options = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DYNAMO_LOCK_TABLE_NAME': 'custom_table_name'}
-    >>> write_deltalake('s3://path/to/table', df, 'storage_options'= storage_options)
+    >>> storage_options = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DELTA_DYNAMO_TABLE_NAME': 'custom_table_name'}
+    >>> write_deltalake('s3a://path/to/table', df, storage_options=storage_options)
+
+.. note::
+    This locking mechanism is compatible with the one used by Apache Spark. The ``tablePath`` property,
+    denoting the root URL of the Delta table itself, is part of the primary key, and all writers
+    intending to write to the same table must match this property precisely. In Spark, S3 URLs
+    are prefixed with ``s3a://``, so a table shared with Spark must be configured accordingly in delta-rs.
+
+The following command creates the required table using the AWS CLI:
+
+.. code-block:: sh
+
+    aws dynamodb create-table \
+    --table-name delta_log \
+    --attribute-definitions AttributeName=tablePath,AttributeType=S AttributeName=fileName,AttributeType=S \
+    --key-schema AttributeName=tablePath,KeyType=HASH AttributeName=fileName,KeyType=RANGE \
+    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
+
+You can find additional information in the `Delta Lake documentation on the multi-cluster setup
+<https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup>`_, which also includes
+recommendations on configuring a time-to-live (TTL) for the table so that it does not
+grow indefinitely; the `production configuration
+<https://docs.delta.io/latest/delta-storage.html#production-configuration-s3-multi-cluster>`_
+section describes this in more detail.
 
 .. note::
     if for some reason you don't want to use dynamodb as your locking mechanism you
    can choose to set the `AWS_S3_ALLOW_UNSAFE_RENAME` variable to ``true`` in order
    to enable S3 unsafe writes.
 
-Please note that this locking mechanism is not compatible with any other
-locking mechanisms, including the one used by Spark.
 
 Updating Delta Tables
 ---------------------
@@ -561,7 +591,7 @@ Update all the rows for the column "processed" to the value True.
 :meth:`DeltaTable.update` predicates and updates are all in string format. The predicates and expressions,
 are parsed into Apache Datafusion expressions.
 
-Apply a soft deletion based on a predicate, so update all the rows for the column "deleted" to the value 
+Apply a soft deletion based on a predicate, so update all the rows for the column "deleted" to the value
 True where x = 3
 
 .. code-block:: python