diff --git a/CHANGES b/CHANGES index f954a07777..b7e238ebb3 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,7 @@ * Remove verbose logging when initializing ClusterPubSub, ClusterPipeline or RedisCluster * Fix broken connection writer lock-up for asyncio (#2065) * Fix auth bug when provided with no username (#2086) + * Fix missing ClusterPipeline._lock (#2189) * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) diff --git a/redis/cluster.py b/redis/cluster.py index fa1322f103..0b9c543afb 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -754,6 +754,7 @@ def pipeline(self, transaction=None, shard_hint=None): cluster_error_retry_attempts=self.cluster_error_retry_attempts, read_from_replicas=self.read_from_replicas, reinitialize_steps=self.reinitialize_steps, + lock=self._lock, ) def lock( @@ -1754,6 +1755,7 @@ def __init__( read_from_replicas=False, cluster_error_retry_attempts=5, reinitialize_steps=10, + lock=None, **kwargs, ): """ """ @@ -1776,6 +1778,9 @@ def __init__( kwargs.get("encoding_errors", "strict"), kwargs.get("decode_responses", False), ) + if lock is None: + lock = threading.Lock() + self._lock = lock def __repr__(self): """ """ diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 638e4eb166..39983be9cb 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -113,6 +113,7 @@ def pipeline(self, transaction=True, shard_hint=None): cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, read_from_replicas=self.client.read_from_replicas, reinitialize_steps=self.client.reinitialize_steps, + lock=self.client._lock, ) else: diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py index 4720a430f8..4a6886f237 100644 --- a/redis/commands/timeseries/__init__.py +++ b/redis/commands/timeseries/__init__.py @@ -77,6 +77,7 @@ def pipeline(self, transaction=True, shard_hint=None): cluster_error_retry_attempts=self.client.cluster_error_retry_attempts, read_from_replicas=self.client.read_from_replicas, reinitialize_steps=self.client.reinitialize_steps, + lock=self.client._lock, ) else: