From 00ccb7a5c1f246b5099265058a5e9875e6627024 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Thu, 20 Jun 2024 13:21:24 +0530 Subject: [PATCH] feat(spanner): add support for txn changstream exclusion (#1152) * feat(spanner): add support for txn changstream exclusion * feat(spanner): add tests for txn change streams exclusion * chore(spanner): lint fix * feat(spanner): add docs * feat(spanner): add test for ILB with change stream exclusion * feat(spanner): update default value and add optional --- google/cloud/spanner_v1/batch.py | 21 ++- google/cloud/spanner_v1/database.py | 43 +++++- google/cloud/spanner_v1/session.py | 8 ++ google/cloud/spanner_v1/transaction.py | 11 +- tests/unit/test_batch.py | 42 +++++- tests/unit/test_database.py | 16 ++- tests/unit/test_session.py | 185 +++++++++++++++++++++++++ tests/unit/test_spanner.py | 37 ++++- 8 files changed, 346 insertions(+), 17 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 9cb2afbc2c..e3d681189c 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -147,7 +147,11 @@ def _check_state(self): raise ValueError("Batch already committed") def commit( - self, return_commit_stats=False, request_options=None, max_commit_delay=None + self, + return_commit_stats=False, + request_options=None, + max_commit_delay=None, + exclude_txn_from_change_streams=False, ): """Commit mutations to the database. @@ -178,7 +182,10 @@ def commit( metadata.append( _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) - txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + txn_options = TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=exclude_txn_from_change_streams, + ) trace_attributes = {"num_mutations": len(self._mutations)} if request_options is None: @@ -270,7 +277,7 @@ def group(self): self._mutation_groups.append(mutation_group) return MutationGroup(self._session, mutation_group.mutations) - def batch_write(self, request_options=None): + def batch_write(self, request_options=None, exclude_txn_from_change_streams=False): """Executes batch_write. :type request_options: @@ -280,6 +287,13 @@ def batch_write(self, request_options=None): If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type exclude_txn_from_change_streams: bool + :param exclude_txn_from_change_streams: + (Optional) If true, instructs the transaction to be excluded from being recorded in change streams + with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from + being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or + unset. + :rtype: :class:`Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]` :returns: a sequence of responses for each batch. """ @@ -302,6 +316,7 @@ def batch_write(self, request_options=None): session=self._session.name, mutation_groups=self._mutation_groups, request_options=request_options, + exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes): method = functools.partial( diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 356bec413c..5b7c27b236 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -619,6 +619,7 @@ def execute_partitioned_dml( param_types=None, query_options=None, request_options=None, + exclude_txn_from_change_streams=False, ): """Execute a partitionable DML statement. @@ -651,6 +652,13 @@ def execute_partitioned_dml( Please note, the `transactionTag` setting will be ignored as it is not supported for partitioned DML. + :type exclude_txn_from_change_streams: bool + :param exclude_txn_from_change_streams: + (Optional) If true, instructs the transaction to be excluded from being recorded in change streams + with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from + being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or + unset. + :rtype: int :returns: Count of rows affected by the DML statement. """ @@ -673,7 +681,8 @@ def execute_partitioned_dml( api = self.spanner_api txn_options = TransactionOptions( - partitioned_dml=TransactionOptions.PartitionedDml() + partitioned_dml=TransactionOptions.PartitionedDml(), + exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) metadata = _metadata_with_prefix(self.name) @@ -752,7 +761,12 @@ def snapshot(self, **kw): """ return SnapshotCheckout(self, **kw) - def batch(self, request_options=None, max_commit_delay=None): + def batch( + self, + request_options=None, + max_commit_delay=None, + exclude_txn_from_change_streams=False, + ): """Return an object which wraps a batch. The wrapper *must* be used as a context manager, with the batch @@ -771,10 +785,19 @@ def batch(self, request_options=None, max_commit_delay=None): in order to improve throughput. Value must be between 0ms and 500ms. + :type exclude_txn_from_change_streams: bool + :param exclude_txn_from_change_streams: + (Optional) If true, instructs the transaction to be excluded from being recorded in change streams + with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from + being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or + unset. + :rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout` :returns: new wrapper """ - return BatchCheckout(self, request_options, max_commit_delay) + return BatchCheckout( + self, request_options, max_commit_delay, exclude_txn_from_change_streams + ) def mutation_groups(self): """Return an object which wraps a mutation_group. @@ -840,6 +863,10 @@ def run_in_transaction(self, func, *args, **kw): "max_commit_delay" will be removed and used to set the max_commit_delay for the request. Value must be between 0ms and 500ms. + "exclude_txn_from_change_streams" if true, instructs the transaction to be excluded + from being recorded in change streams with the DDL option `allow_txn_exclusion=true`. + This does not exclude the transaction from being recorded in the change streams with + the DDL option `allow_txn_exclusion` being false or unset. :rtype: Any :returns: The return value of ``func``. @@ -1103,7 +1130,13 @@ class BatchCheckout(object): in order to improve throughput. """ - def __init__(self, database, request_options=None, max_commit_delay=None): + def __init__( + self, + database, + request_options=None, + max_commit_delay=None, + exclude_txn_from_change_streams=False, + ): self._database = database self._session = self._batch = None if request_options is None: @@ -1113,6 +1146,7 @@ def __init__(self, database, request_options=None, max_commit_delay=None): else: self._request_options = request_options self._max_commit_delay = max_commit_delay + self._exclude_txn_from_change_streams = exclude_txn_from_change_streams def __enter__(self): """Begin ``with`` block.""" @@ -1130,6 +1164,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): return_commit_stats=self._database.log_commit_stats, request_options=self._request_options, max_commit_delay=self._max_commit_delay, + exclude_txn_from_change_streams=self._exclude_txn_from_change_streams, ) finally: if self._database.log_commit_stats and self._batch.commit_stats: diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 52994e58e2..28280282f4 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -387,6 +387,10 @@ def run_in_transaction(self, func, *args, **kw): request options for the commit request. "max_commit_delay" will be removed and used to set the max commit delay for the request. "transaction_tag" will be removed and used to set the transaction tag for the request. + "exclude_txn_from_change_streams" if true, instructs the transaction to be excluded + from being recorded in change streams with the DDL option `allow_txn_exclusion=true`. + This does not exclude the transaction from being recorded in the change streams with + the DDL option `allow_txn_exclusion` being false or unset. :rtype: Any :returns: The return value of ``func``. @@ -398,12 +402,16 @@ def run_in_transaction(self, func, *args, **kw): commit_request_options = kw.pop("commit_request_options", None) max_commit_delay = kw.pop("max_commit_delay", None) transaction_tag = kw.pop("transaction_tag", None) + exclude_txn_from_change_streams = kw.pop( + "exclude_txn_from_change_streams", None + ) attempts = 0 while True: if self._transaction is None: txn = self.transaction() txn.transaction_tag = transaction_tag + txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams else: txn = self._transaction diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index b02a43e8d2..ee1dd8ef3b 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -55,6 +55,7 @@ class Transaction(_SnapshotBase, _BatchBase): _execute_sql_count = 0 _lock = threading.Lock() _read_only = False + exclude_txn_from_change_streams = False def __init__(self, session): if session._transaction is not None: @@ -86,7 +87,10 @@ def _make_txn_selector(self): if self._transaction_id is None: return TransactionSelector( - begin=TransactionOptions(read_write=TransactionOptions.ReadWrite()) + begin=TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=self.exclude_txn_from_change_streams, + ) ) else: return TransactionSelector(id=self._transaction_id) @@ -137,7 +141,10 @@ def begin(self): metadata.append( _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) - txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) + txn_options = TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=self.exclude_txn_from_change_streams, + ) with trace_call("CloudSpanner.BeginTransaction", self._session): method = functools.partial( api.begin_transaction, diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 1c02e93f1d..ee96decf5e 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -259,7 +259,12 @@ def test_commit_ok(self): "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) - def _test_commit_with_options(self, request_options=None, max_commit_delay_in=None): + def _test_commit_with_options( + self, + request_options=None, + max_commit_delay_in=None, + exclude_txn_from_change_streams=False, + ): import datetime from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import TransactionOptions @@ -276,7 +281,9 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_in=No batch.transaction_tag = self.TRANSACTION_TAG batch.insert(TABLE_NAME, COLUMNS, VALUES) committed = batch.commit( - request_options=request_options, max_commit_delay=max_commit_delay_in + request_options=request_options, + max_commit_delay=max_commit_delay_in, + exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) self.assertEqual(committed, now) @@ -301,6 +308,10 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_in=No self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) + self.assertEqual( + single_use_txn.exclude_txn_from_change_streams, + exclude_txn_from_change_streams, + ) self.assertEqual( metadata, [ @@ -355,6 +366,14 @@ def test_commit_w_max_commit_delay(self): max_commit_delay_in=datetime.timedelta(milliseconds=100), ) + def test_commit_w_exclude_txn_from_change_streams(self): + request_options = RequestOptions( + request_tag="tag-1", + ) + self._test_commit_with_options( + request_options=request_options, exclude_txn_from_change_streams=True + ) + def test_context_mgr_already_committed(self): import datetime from google.cloud._helpers import UTC @@ -499,7 +518,9 @@ def test_batch_write_grpc_error(self): attributes=dict(BASE_ATTRIBUTES, num_mutation_groups=1), ) - def _test_batch_write_with_request_options(self, request_options=None): + def _test_batch_write_with_request_options( + self, request_options=None, exclude_txn_from_change_streams=False + ): import datetime from google.cloud.spanner_v1 import BatchWriteResponse from google.cloud._helpers import UTC @@ -519,7 +540,10 @@ def _test_batch_write_with_request_options(self, request_options=None): group = groups.group() group.insert(TABLE_NAME, COLUMNS, VALUES) - response_iter = groups.batch_write(request_options) + response_iter = groups.batch_write( + request_options, + exclude_txn_from_change_streams=exclude_txn_from_change_streams, + ) self.assertEqual(len(response_iter), 1) self.assertEqual(response_iter[0], response) @@ -528,6 +552,7 @@ def _test_batch_write_with_request_options(self, request_options=None): mutation_groups, actual_request_options, metadata, + request_exclude_txn_from_change_streams, ) = api._batch_request self.assertEqual(session, self.SESSION_NAME) self.assertEqual(mutation_groups, groups._mutation_groups) @@ -545,6 +570,9 @@ def _test_batch_write_with_request_options(self, request_options=None): else: expected_request_options = request_options self.assertEqual(actual_request_options, expected_request_options) + self.assertEqual( + request_exclude_txn_from_change_streams, exclude_txn_from_change_streams + ) self.assertSpanAttributes( "CloudSpanner.BatchWrite", @@ -567,6 +595,11 @@ def test_batch_write_w_incorrect_tag_dictionary_error(self): with self.assertRaises(ValueError): self._test_batch_write_with_request_options({"incorrect_tag": "tag-1-1"}) + def test_batch_write_w_exclude_txn_from_change_streams(self): + self._test_batch_write_with_request_options( + exclude_txn_from_change_streams=True + ) + class _Session(object): def __init__(self, database=None, name=TestBatch.SESSION_NAME): @@ -625,6 +658,7 @@ def batch_write( request.mutation_groups, request.request_options, metadata, + request.exclude_txn_from_change_streams, ) if self._rpc_error: raise Unknown("error") diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index ec2983ff7e..90fa0c269f 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -1083,6 +1083,7 @@ def _execute_partitioned_dml_helper( query_options=None, request_options=None, retried=False, + exclude_txn_from_change_streams=False, ): from google.api_core.exceptions import Aborted from google.api_core.retry import Retry @@ -1129,13 +1130,19 @@ def _execute_partitioned_dml_helper( api.execute_streaming_sql.return_value = iterator row_count = database.execute_partitioned_dml( - dml, params, param_types, query_options, request_options + dml, + params, + param_types, + query_options, + request_options, + exclude_txn_from_change_streams, ) self.assertEqual(row_count, 2) txn_options = TransactionOptions( - partitioned_dml=TransactionOptions.PartitionedDml() + partitioned_dml=TransactionOptions.PartitionedDml(), + exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) api.begin_transaction.assert_called_with( @@ -1250,6 +1257,11 @@ def test_execute_partitioned_dml_w_req_tag_used(self): def test_execute_partitioned_dml_wo_params_retry_aborted(self): self._execute_partitioned_dml_helper(dml=DML_WO_PARAM, retried=True) + def test_execute_partitioned_dml_w_exclude_txn_from_change_streams(self): + self._execute_partitioned_dml_helper( + dml=DML_WO_PARAM, exclude_txn_from_change_streams=True + ) + def test_session_factory_defaults(self): from google.cloud.spanner_v1.session import Session diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 917e875f22..d4052f0ae3 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -1696,6 +1696,191 @@ def unit_of_work(txn, *args, **kw): ], ) + def test_run_in_transaction_w_exclude_txn_from_change_streams(self): + import datetime + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import CommitResponse + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + TransactionOptions, + ) + from google.cloud._helpers import UTC + from google.cloud._helpers import _datetime_to_pb_timestamp + from google.cloud.spanner_v1.transaction import Transaction + + TABLE_NAME = "citizens" + COLUMNS = ["email", "first_name", "last_name", "age"] + VALUES = [ + ["phred@exammple.com", "Phred", "Phlyntstone", 32], + ["bharney@example.com", "Bharney", "Rhubble", 31], + ] + TRANSACTION_ID = b"FACEDACE" + transaction_pb = TransactionPB(id=TRANSACTION_ID) + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_pb = _datetime_to_pb_timestamp(now) + commit_stats = CommitResponse.CommitStats(mutation_count=4) + response = CommitResponse(commit_timestamp=now_pb, commit_stats=commit_stats) + gax_api = self._make_spanner_api() + gax_api.begin_transaction.return_value = transaction_pb + gax_api.commit.return_value = response + database = self._make_database() + database.spanner_api = gax_api + session = self._make_one(database) + session._session_id = self.SESSION_ID + + called_with = [] + + def unit_of_work(txn, *args, **kw): + called_with.append((txn, args, kw)) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + return 42 + + return_value = session.run_in_transaction( + unit_of_work, "abc", exclude_txn_from_change_streams=True + ) + + self.assertIsNone(session._transaction) + self.assertEqual(len(called_with), 1) + txn, args, kw = called_with[0] + self.assertIsInstance(txn, Transaction) + self.assertEqual(return_value, 42) + self.assertEqual(args, ("abc",)) + + expected_options = TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=True, + ) + gax_api.begin_transaction.assert_called_once_with( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + request_options=RequestOptions(), + ) + gax_api.commit.assert_called_once_with( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + + def test_run_in_transaction_w_abort_w_retry_metadata_w_exclude_txn_from_change_streams( + self, + ): + import datetime + from google.api_core.exceptions import Aborted + from google.protobuf.duration_pb2 import Duration + from google.rpc.error_details_pb2 import RetryInfo + from google.cloud.spanner_v1 import CommitRequest + from google.cloud.spanner_v1 import CommitResponse + from google.cloud.spanner_v1 import ( + Transaction as TransactionPB, + TransactionOptions, + ) + from google.cloud._helpers import UTC + from google.cloud._helpers import _datetime_to_pb_timestamp + from google.cloud.spanner_v1.transaction import Transaction + + TABLE_NAME = "citizens" + COLUMNS = ["email", "first_name", "last_name", "age"] + VALUES = [ + ["phred@exammple.com", "Phred", "Phlyntstone", 32], + ["bharney@example.com", "Bharney", "Rhubble", 31], + ] + TRANSACTION_ID = b"FACEDACE" + RETRY_SECONDS = 12 + RETRY_NANOS = 3456 + retry_info = RetryInfo( + retry_delay=Duration(seconds=RETRY_SECONDS, nanos=RETRY_NANOS) + ) + trailing_metadata = [ + ("google.rpc.retryinfo-bin", retry_info.SerializeToString()) + ] + aborted = _make_rpc_error(Aborted, trailing_metadata=trailing_metadata) + transaction_pb = TransactionPB(id=TRANSACTION_ID) + now = datetime.datetime.utcnow().replace(tzinfo=UTC) + now_pb = _datetime_to_pb_timestamp(now) + response = CommitResponse(commit_timestamp=now_pb) + gax_api = self._make_spanner_api() + gax_api.begin_transaction.return_value = transaction_pb + gax_api.commit.side_effect = [aborted, response] + database = self._make_database() + database.spanner_api = gax_api + session = self._make_one(database) + session._session_id = self.SESSION_ID + + called_with = [] + + def unit_of_work(txn, *args, **kw): + called_with.append((txn, args, kw)) + txn.insert(TABLE_NAME, COLUMNS, VALUES) + + with mock.patch("time.sleep") as sleep_mock: + session.run_in_transaction( + unit_of_work, + "abc", + some_arg="def", + exclude_txn_from_change_streams=True, + ) + + sleep_mock.assert_called_once_with(RETRY_SECONDS + RETRY_NANOS / 1.0e9) + self.assertEqual(len(called_with), 2) + + for index, (txn, args, kw) in enumerate(called_with): + self.assertIsInstance(txn, Transaction) + if index == 1: + self.assertEqual(txn.committed, now) + else: + self.assertIsNone(txn.committed) + self.assertEqual(args, ("abc",)) + self.assertEqual(kw, {"some_arg": "def"}) + + expected_options = TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=True, + ) + self.assertEqual( + gax_api.begin_transaction.call_args_list, + [ + mock.call( + session=self.SESSION_NAME, + options=expected_options, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + ] + * 2, + ) + request = CommitRequest( + session=self.SESSION_NAME, + mutations=txn._mutations, + transaction_id=TRANSACTION_ID, + request_options=RequestOptions(), + ) + self.assertEqual( + gax_api.commit.call_args_list, + [ + mock.call( + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + ] + * 2, + ) + def test_delay_helper_w_no_delay(self): from google.cloud.spanner_v1.session import _delay_until_retry diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 0c7feed5ac..ab5479eb3c 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -137,6 +137,7 @@ def _execute_update_helper( api, count=0, query_options=None, + exclude_txn_from_change_streams=False, ): stats_pb = ResultSetStats(row_count_exact=1) @@ -145,6 +146,7 @@ def _execute_update_helper( api.execute_sql.return_value = ResultSet(stats=stats_pb, metadata=metadata_pb) transaction.transaction_tag = self.TRANSACTION_TAG + transaction.exclude_txn_from_change_streams = exclude_txn_from_change_streams transaction._execute_sql_count = count row_count = transaction.execute_update( @@ -160,11 +162,19 @@ def _execute_update_helper( self.assertEqual(row_count, count + 1) def _execute_update_expected_request( - self, database, query_options=None, begin=True, count=0 + self, + database, + query_options=None, + begin=True, + count=0, + exclude_txn_from_change_streams=False, ): if begin is True: expected_transaction = TransactionSelector( - begin=TransactionOptions(read_write=TransactionOptions.ReadWrite()) + begin=TransactionOptions( + read_write=TransactionOptions.ReadWrite(), + exclude_txn_from_change_streams=exclude_txn_from_change_streams, + ) ) else: expected_transaction = TransactionSelector(id=self.TRANSACTION_ID) @@ -560,6 +570,29 @@ def test_transaction_should_include_begin_with_first_batch_update(self): timeout=TIMEOUT, ) + def test_transaction_should_include_begin_w_exclude_txn_from_change_streams_with_first_update( + self, + ): + database = _Database() + session = _Session(database) + api = database.spanner_api = self._make_spanner_api() + transaction = self._make_one(session) + self._execute_update_helper( + transaction=transaction, api=api, exclude_txn_from_change_streams=True + ) + + api.execute_sql.assert_called_once_with( + request=self._execute_update_expected_request( + database=database, exclude_txn_from_change_streams=True + ), + retry=RETRY, + timeout=TIMEOUT, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) + def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( self, ):