diff --git a/dataflows/helpers/iterable_loader.py b/dataflows/helpers/iterable_loader.py index 1b1adf7..2d36da1 100644 --- a/dataflows/helpers/iterable_loader.py +++ b/dataflows/helpers/iterable_loader.py @@ -68,18 +68,23 @@ def __init__(self, iterable, name=None): super(iterable_loader, self).__init__() self.iterable = iterable self.name = name + self.exc = None def handle_iterable(self): mode = None - for x in self.iterable: - if mode is None: - assert isinstance(x, (dict, list)) - mode = dict if isinstance(x, dict) else list - assert isinstance(x, mode) - if mode == dict: - yield x - else: - yield dict(zip(('col{}'.format(i) for i in range(len(x))), x)) + try: + for x in self.iterable: + if mode is None: + assert isinstance(x, (dict, list)) + mode = dict if isinstance(x, dict) else list + assert isinstance(x, mode) + if mode == dict: + yield x + else: + yield dict(zip(('col{}'.format(i) for i in range(len(x))), x)) + except Exception as e: + self.exc = e + raise def process_datapackage(self, dp: Package): name = self.name @@ -90,6 +95,8 @@ def process_datapackage(self, dp: Package): path='{}.csv'.format(name) ), storage=iterable_storage(self.handle_iterable())) self.res.infer() + if self.exc is not None: + raise self.exc dp.descriptor.setdefault('resources', []).append(self.res.descriptor) return dp diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py new file mode 100644 index 0000000..0602c13 --- /dev/null +++ b/tests/test_edge_cases.py @@ -0,0 +1,15 @@ +import pytest + +def test_exception_in_generator(): + from dataflows import Flow, printer + + class MyException(Exception): + pass + + def generator(): + for i in range(5): + raise MyException() + yield {"i": i} + + with pytest.raises(MyException): + Flow(generator(), printer()).process()