Skip to content

Commit

Permalink
Avoid RecursionError when failing to pickle key in SpillBuffer and using `tblib=3` (#8404)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Dec 14, 2023
1 parent add1fa2 commit 78050cc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
26 changes: 15 additions & 11 deletions distributed/spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,24 @@ def _handle_errors(self, key: Key | None) -> Iterator[None]:
logger.error("Spill to disk failed; keeping data in memory", exc_info=True)
raise HandledError()
except PickleError as e:
key_e, orig_e = e.args
assert key_e in self.fast
assert key_e not in self.slow
if key_e == key:
assert e.key in self.fast
assert e.key not in self.slow
if e.key == key:
assert key is not None
# The key we just inserted failed to serialize.
# This happens only when the key is individually larger than target.
# The exception will be caught by Worker and logged; the status of
# the task will be set to error.
del self[key]
raise orig_e
raise
else:
# The key we just inserted is smaller than target, but it caused
# another, unrelated key to be spilled out of the LRU, and that key
# failed to serialize. There's nothing wrong with the new key. The older
# key is still in memory.
if key_e not in self.logged_pickle_errors:
logger.error("Failed to pickle %r", key_e, exc_info=True)
self.logged_pickle_errors.add(key_e)
if e.key not in self.logged_pickle_errors:
logger.error("Failed to pickle %r", e.key, exc_info=True)
self.logged_pickle_errors.add(e.key)
raise HandledError()

def __setitem__(self, key: Key, value: object) -> None:
Expand Down Expand Up @@ -267,8 +266,13 @@ class MaxSpillExceeded(Exception):
pass


class PickleError(Exception):
pass
class PickleError(TypeError):
    """Raised when a value in the SpillBuffer cannot be pickled to disk.

    The offending key is carried as the sole exception argument and is
    exposed through the :attr:`key` property. Subclassing ``TypeError``
    keeps this compatible with callers that catch serialization failures
    as ``TypeError``.
    """

    @property
    def key(self) -> Key:
        """Key whose value failed to pickle (first exception argument)."""
        return self.args[0]

    def __str__(self) -> str:
        # Derive the message from the stored key rather than a static
        # string, so the key is always current with ``self.args``.
        return f"Failed to pickle {self.key!r}"


class HandledError(Exception):
Expand Down Expand Up @@ -324,7 +328,7 @@ def __setitem__(self, key: Key, value: object) -> None:
# zict.LRU ensures that the key remains in fast if we raise.
# Wrap the exception so that it's recognizable by SpillBuffer,
# which will then unwrap it.
raise PickleError(key, e)
raise PickleError(key) from e

# Thanks to Buffer.__setitem__, we never update existing
# keys in slow, but always delete them and reinsert them.
Expand Down
5 changes: 3 additions & 2 deletions distributed/tests/test_spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ def test_spillbuffer_fail_to_serialize(tmp_path):
a = Bad(size=201)

# Exception caught in the worker
with pytest.raises(TypeError, match="Could not serialize"):
with pytest.raises(TypeError, match="Failed to pickle 'a'") as e:
with captured_logger("distributed.spill") as logs_bad_key:
buf["a"] = a
assert isinstance(e.value.__cause__.__cause__, MyError)

# spill.py must remain silent because we're already logging in worker.py
assert not logs_bad_key.getvalue()
Expand All @@ -240,7 +241,7 @@ def test_spillbuffer_fail_to_serialize(tmp_path):

# worker.py won't intercept the exception here, so spill.py must dump the traceback
logs_value = logs_bad_key_mem.getvalue()
assert "Failed to pickle" in logs_value # from distributed.spill
assert "Failed to pickle 'b'" in logs_value # from distributed.spill
assert "Traceback" in logs_value # from distributed.spill
assert_buf(buf, tmp_path, {"b": b, "c": c}, {})

Expand Down
3 changes: 2 additions & 1 deletion distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ async def test_fail_to_pickle_execute_1(c, s, a, b):

assert x.status == "error"

with pytest.raises(TypeError, match="Could not serialize"):
with pytest.raises(TypeError, match="Failed to pickle 'x'") as e:
await x
assert isinstance(e.value.__cause__.__cause__, CustomError)

await assert_basic_futures(c)

Expand Down

0 comments on commit 78050cc

Please sign in to comment.