From f353b918a410fe7ce2cf1eb8973d2433949a02bc Mon Sep 17 00:00:00 2001 From: "Maarten A. Breddels" Date: Wed, 24 Nov 2021 16:58:56 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20improved=20progress=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * progress is now an argument to vaex.open to be used in combination with convert * more titles for progress values * rich progress support --- .github/workflows/pythonpackage.yml | 6 ++ packages/vaex-core/vaex/__init__.py | 23 +++-- packages/vaex-core/vaex/agg.py | 42 +++++--- packages/vaex-core/vaex/convert.py | 22 ++-- packages/vaex-core/vaex/dataframe.py | 105 ++++++++++++-------- packages/vaex-core/vaex/docstrings.py | 2 +- packages/vaex-core/vaex/expression.py | 12 ++- packages/vaex-core/vaex/groupby.py | 37 ++++--- packages/vaex-core/vaex/misc/progressbar.py | 75 ++++++++++++-- packages/vaex-core/vaex/utils.py | 41 +++++--- packages/vaex-hdf5/vaex/hdf5/writer.py | 27 ++++- packages/vaex-ml/vaex/ml/catboost.py | 2 +- packages/vaex-ml/vaex/ml/incubator/river.py | 2 +- packages/vaex-ml/vaex/ml/sklearn.py | 2 +- packages/vaex-ml/vaex/ml/transformations.py | 2 +- tests/agg_test.py | 2 +- tests/execution_test.py | 8 +- 17 files changed, 282 insertions(+), 128 deletions(-) diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 8dcc4de650..a4c522f2b0 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -205,6 +205,12 @@ jobs: run: | python -c "import vaex; df = vaex.example()" + - name: Test comand line + run: | + vaex convert ~/.vaex/data/helmi-dezeeuw-2000-FeH-v2-10percent.hdf5 test.parquet + pip install rich + VAEX_PROGRESS_TYPE=rich vaex convert ~/.vaex/data/helmi-dezeeuw-2000-FeH-v2-10percent.hdf5 test.parquet + - name: Test server run: | vaex server --add-example --port 9999& diff --git a/packages/vaex-core/vaex/__init__.py b/packages/vaex-core/vaex/__init__.py index adfba6dd63..b30ca5ed32 
100644 --- a/packages/vaex-core/vaex/__init__.py +++ b/packages/vaex-core/vaex/__init__.py @@ -40,6 +40,7 @@ import vaex.dataframe import vaex.dataset +from vaex.docstrings import docsubst from vaex.registry import register_function from vaex import functions, struct from . import stat @@ -93,7 +94,8 @@ def app(*args, **kwargs): return vaex.ui.main.VaexApp() -def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kwargs): +@docsubst +def open(path, convert=False, progress=None, shuffle=False, fs_options={}, fs=None, *args, **kwargs): """Open a DataFrame from file given by path. Example: @@ -104,6 +106,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw :param str or list path: local or absolute path to file, or glob string, or list of paths :param convert: Uses `dataframe.export` when convert is a path. If True, ``convert=path+'.hdf5'`` The conversion is skipped if the input file or conversion argument did not change. + :param progress: (_Only applies when convert is not False_) {progress} :param bool shuffle: shuffle converted DataFrame or not :param dict fs_options: Extra arguments passed to an optional file system if needed: * Amazon AWS S3 @@ -138,9 +141,9 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw Examples: - >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', fs_options={'anonymous': True}) + >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', fs_options={{'anonymous': True}}) >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5?anon=true') - >>> df = vaex.open('s3://mybucket/path/to/file.hdf5', fs_options={'access_key': my_key, 'secret_key': my_secret_key}) + >>> df = vaex.open('s3://mybucket/path/to/file.hdf5', fs_options={{'access_key': my_key, 'secret_key': my_secret_key}}) >>> df = vaex.open(f's3://mybucket/path/to/file.hdf5?access_key={{my_key}}&secret_key={{my_secret_key}}') >>> df = 
vaex.open('s3://mybucket/path/to/file.hdf5?profile=myproject') @@ -154,7 +157,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw Examples: - >>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5', fs_options={'token': None}) + >>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5', fs_options={{'token': None}}) >>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5?token=anon') >>> df = vaex.open('gs://vaex-data/testing/xys.hdf5?token=anon&cache=False') """ @@ -212,12 +215,13 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw # # naked_path, _ = vaex.file.split_options(path, fs_options) _, ext, _ = vaex.file.split_ext(path) if ext == '.csv': # special case for csv - return vaex.from_csv(path, fs_options=fs_options, fs=fs, convert=convert, **kwargs) + return vaex.from_csv(path, fs_options=fs_options, fs=fs, convert=convert, progress=progress, **kwargs) if convert: path_output = convert if isinstance(convert, str) else filename_hdf5 vaex.convert.convert( path_input=path, fs_options_input=fs_options, fs_input=fs, path_output=path_output, fs_options_output=fs_options, fs_output=fs, + progress=progress, *args, **kwargs ) ds = vaex.dataset.open(path_output, fs_options=fs_options, fs=fs, **kwargs) @@ -242,7 +246,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw if convert: if shuffle: df = df.shuffle() - df.export_hdf5(filename_hdf5) + df.export_hdf5(filename_hdf5, progress=progress) df = vaex.open(filename_hdf5) if df is None: @@ -499,7 +503,8 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop copy_index=copy_index) -def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, fs=None, **kwargs): +@docsubst +def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, fs=None, progress=None, **kwargs): 
""" Read a CSV file as a DataFrame, and optionally convert to an hdf5 file. @@ -511,13 +516,14 @@ def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=Fals >>> import vaex >>> for i, df in enumerate(vaex.from_csv('taxi.csv', chunk_size=100_000)): >>> df = df[df.passenger_count < 6] - >>> df.export_hdf5(f'taxi_{i:02}.hdf5') + >>> df.export_hdf5(f'taxi_{{i:02}}.hdf5') :param bool or str convert: convert files to an hdf5 file for optimization, can also be a path. The CSV file will be read in chunks: either using the provided chunk_size argument, or a default size. Each chunk will be saved as a separate hdf5 file, then all of them will be combined into one hdf5 file. So for a big CSV file you will need at least double of extra space on the disk. Default chunk_size for converting is 5 million rows, which corresponds to around 1Gb memory on an example of NYC Taxi dataset. + :param progress: (_Only applies when convert is not False_) {progress} :param kwargs: extra keyword arguments, currently passed to Pandas read_csv function, but the implementation might change in future versions. 
:returns: DataFrame @@ -536,6 +542,7 @@ def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=Fals path_output=path_output, fs_options_output=fs_options, fs_output=fs, chunk_size=chunk_size, copy_index=copy_index, + progress=progress, **kwargs ) return open(path_output, fs_options=fs_options, fs=fs) diff --git a/packages/vaex-core/vaex/agg.py b/packages/vaex-core/vaex/agg.py index 7322de2429..00c5e2b154 100644 --- a/packages/vaex-core/vaex/agg.py +++ b/packages/vaex-core/vaex/agg.py @@ -84,8 +84,8 @@ def edges(self): def edges(self, value): self.agg.edges = value - def add_tasks(self, df, binners): - tasks, result = self.agg.add_tasks(df, binners) + def add_tasks(self, df, binners, progress): + tasks, result = self.agg.add_tasks(df, binners, progress) @vaex.delayed def finish(value): return self.finish(value) @@ -125,9 +125,10 @@ def edges(self, value): self.agg1.edges = value self.agg2.edges = value - def add_tasks(self, df, binners): - tasks1, result1 = self.agg1.add_tasks(df, binners) - tasks2, result2 = self.agg2.add_tasks(df, binners) + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) + tasks1, result1 = self.agg1.add_tasks(df, binners, progress=progressbar) + tasks2, result2 = self.agg2.add_tasks(df, binners, progress=progressbar) @vaex.delayed def finish(value1, value2): return self.finish(value1, value2) @@ -164,8 +165,9 @@ def edges(self): def edges(self, value): self.agg.edges = value - def add_tasks(self, df, binners): - tasks, result = self.agg.add_tasks(df, binners) + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) + tasks, result = self.agg.add_tasks(df, binners, progress=progressbar) @vaex.delayed def finish(value): return self.finish(value) @@ -223,6 +225,12 @@ def __init__(self, name, expression, short_name, multi_args=False, agg_args=[], else: self.expressions = expression + def __repr__(self): + if 
self.agg_args: + return 'vaex.agg.{}({!r}, {})'.format(self.short_name, str(self.expression), ", ".join(map(str, self.agg_args))) + else: + return 'vaex.agg.{}({!r})'.format(self.short_name, str(self.expression)) + def encode(self, encoding): spec = {'aggregation': self.short_name} if len(self.expressions) == 0: @@ -251,10 +259,12 @@ def _prepare_types(self, df): if self.short_name in ['sum', 'summoment']: self.dtype_out = self.dtype_in.upcast() - def add_tasks(self, df, binners): + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress) self._prepare_types(df) task = vaex.tasks.TaskAggregation(df, binners, self) task = df.executor.schedule(task) + progressbar.add_task(task, repr(self)) @vaex.delayed def finish(value): return self.finish(value) @@ -325,14 +335,15 @@ class AggregatorDescriptorMean(AggregatorDescriptorMulti): def __init__(self, name, expression, short_name="mean", selection=None, edges=False): super(AggregatorDescriptorMean, self).__init__(name, expression, short_name, selection=selection, edges=edges) - def add_tasks(self, df, binners): + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) expression = expression_sum = expression = df[str(self.expression)] sum_agg = sum(expression_sum, selection=self.selection, edges=self.edges) count_agg = count(expression, selection=self.selection, edges=self.edges) - task_sum = sum_agg.add_tasks(df, binners)[0][0] - task_count = count_agg.add_tasks(df, binners)[0][0] + task_sum = sum_agg.add_tasks(df, binners, progress=progressbar)[0][0] + task_count = count_agg.add_tasks(df, binners, progress=progressbar)[0][0] self.dtype_in = sum_agg.dtype_in self.dtype_out = sum_agg.dtype_out @@ -359,16 +370,17 @@ def __init__(self, name, expression, short_name="var", ddof=0, selection=None, e super(AggregatorDescriptorVar, self).__init__(name, expression, short_name, selection=selection, edges=edges) self.ddof = ddof - def 
add_tasks(self, df, binners): + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) expression_sum = expression = df[str(self.expression)] expression = expression_sum = expression.astype('float64') sum_moment = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges) sum_ = sum(str(expression_sum), selection=self.selection, edges=self.edges) count_ = count(str(expression), selection=self.selection, edges=self.edges) - task_sum_moment = sum_moment.add_tasks(df, binners)[0][0] - task_sum = sum_.add_tasks(df, binners)[0][0] - task_count = count_.add_tasks(df, binners)[0][0] + task_sum_moment = sum_moment.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum = sum_.add_tasks(df, binners, progress=progressbar)[0][0] + task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0] self.dtype_in = sum_.dtype_in self.dtype_out = sum_.dtype_out @vaex.delayed diff --git a/packages/vaex-core/vaex/convert.py b/packages/vaex-core/vaex/convert.py index 7fda817807..a4c1c96aa4 100644 --- a/packages/vaex-core/vaex/convert.py +++ b/packages/vaex-core/vaex/convert.py @@ -25,7 +25,7 @@ def _convert_name(filenames, shuffle=False, suffix=None): return base + ".hdf5" -def convert(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, *args, **kwargs): +def convert(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, progress=None, *args, **kwargs): @vaex.cache.output_file( path_input=path_input, fs_options_input=fs_options_input, fs_input=fs_input, path_output=path_output, fs_options_output=fs_options_output, fs_output=fs_output) @@ -33,11 +33,11 @@ def cached_output(*args, **kwargs): ds = vaex.dataset.open(path_input, fs_options=fs_options_input, fs=fs_input, *args, **kwargs) if ds is not None: df = vaex.from_dataset(ds) - df.export(path_output, fs_options=fs_options_output, fs=fs_output) + df.export(path_output, 
fs_options=fs_options_output, fs=fs_output, progress=progress) cached_output(*args, **kwargs) -def convert_csv(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, *args, **kwargs): +def convert_csv(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, progress=None, *args, **kwargs): @vaex.cache.output_file( path_input=path_input, fs_options_input=fs_options_input, fs_input=fs_input, path_output=path_output, fs_options_output=fs_options_output, fs_output=fs_output) @@ -47,11 +47,11 @@ def cached_output(*args, **kwargs): if 'chunk_size' not in kwargs: # make it memory efficient by default kwargs['chunk_size'] = 5_000_000 - _from_csv_convert_and_read(path_input, path_output=path_output, fs_options=fs_options_input, fs=fs_input, **kwargs) + _from_csv_convert_and_read(path_input, path_output=path_output, fs_options=fs_options_input, fs=fs_input, progress=progress, **kwargs) cached_output(*args, **kwargs) -def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_options, fs=None, copy_index=False, **kwargs): +def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_options, fs=None, copy_index=False, progress=None, **kwargs): # figure out the CSV file path csv_path = vaex.file.stringyfy(filename_or_buffer) path_output_bare, ext, _ = vaex.file.split_ext(path_output) @@ -61,24 +61,32 @@ def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_o # convert CSV chunks to separate HDF5 files import pandas as pd converted_paths = [] + # we don't have indeterminate progress bars, so we cast it to truthy + progress = bool(progress) if progress is not None else False + if progress: + print("Converting csv to chunk files") with vaex.file.open(filename_or_buffer, fs_options=fs_options, fs=fs, for_arrow=True) as f: csv_reader = pd.read_csv(filename_or_buffer, chunksize=chunk_size, **kwargs) for i, df_pandas in enumerate(csv_reader): df = 
vaex.from_pandas(df_pandas, copy_index=copy_index) - chunk_name = f'{path_output_bare}_chunk_{i}.{ext}' + chunk_name = f'{path_output_bare}_chunk_{i}{ext}' df.export(chunk_name) converted_paths.append(chunk_name) log.info('saved chunk #%d to %s' % (i, chunk_name)) + if progress: + print("Saved chunk #%d to %s" % (i, chunk_name)) # combine chunks into one HDF5 file if len(converted_paths) == 1: # no need to merge several HDF5 files os.rename(converted_paths[0], path_output) else: + if progress: + print('Converting %d chunks into single file %s' % (len(converted_paths), path_output)) log.info('converting %d chunks into single file %s' % (len(converted_paths), path_output)) dfs = [vaex.open(p) for p in converted_paths] df_combined = vaex.concat(dfs) - df_combined.export(path_output) + df_combined.export(path_output, progress=progress) log.info('deleting %d chunk files' % len(converted_paths)) for df, df_path in zip(dfs, converted_paths): diff --git a/packages/vaex-core/vaex/dataframe.py b/packages/vaex-core/vaex/dataframe.py index 6516862148..f0efc362bb 100644 --- a/packages/vaex-core/vaex/dataframe.py +++ b/packages/vaex-core/vaex/dataframe.py @@ -423,7 +423,7 @@ def map_reduce(self, map, reduce, arguments, progress=False, delay=False, info=F pre_filter = pre_filter and self.filtered task = tasks.TaskMapReduce(self, arguments, map, reduce, info=info, to_numpy=to_numpy, ignore_filter=ignore_filter, selection=selection, pre_filter=pre_filter) progressbar = vaex.utils.progressbars(progress) - progressbar.add_task(task, name) + progressbar.add_task(task, f'map reduce: {name}') task = self.executor.schedule(task) return self._delay(delay, task) @@ -493,6 +493,8 @@ def _set(self, expression, progress=False, selection=None, flatten=True, delay=F expression = _ensure_string_from_expression(expression) task = vaex.tasks.TaskSetCreate(self, expression, flatten, unique_limit=unique_limit, selection=selection, return_inverse=return_inverse) task = self.executor.schedule(task) + 
progressbar = vaex.utils.progressbars(progress) + progressbar.add_task(task, f"set for {str(expression)}") return self._delay(delay, task) def _index(self, expression, progress=False, delay=False, prime_growth=False, cardinality=None): @@ -563,6 +565,7 @@ def unique(self, expression, return_inverse=False, dropna=False, dropnan=False, :param dropnan: do not count nan values :param dropna: short for any of the above, (see :func:`Expression.isna`) :param int axis: Axis over which to determine the unique elements (None will flatten arrays or lists) + :param progress: {progress} :param str array_type: {array_type} """ if dropna: @@ -596,7 +599,7 @@ def map(thread_index, i1, i2, selection_mask, blocks): inverse[i1:i2] = ordered_set.map_ordinal(ar) def reduce(a, b): pass - self.map_reduce(map, reduce, [expression], delay=delay, name='unique_return_inverse', info=True, to_numpy=False, selection=selection) + self.map_reduce(map, reduce, [expression], delay=delay, name='unique_return_inverse', progress=progress_inverse, info=True, to_numpy=False, selection=selection) # ordered_set.seal() # if array_type == 'python': if data_type_item.is_object: @@ -638,7 +641,11 @@ def reduce(a, b): return keys, inverse else: return keys - return self._delay(delay, process(self._set(expression, progress=progress, selection=selection, flatten=axis is None, delay=delay))) + progressbar = vaex.utils.progressbars(progress, title="unique") + set_result = self._set(expression, progress=progressbar, selection=selection, flatten=axis is None, delay=True) + if return_inverse: + progress_inverse = progressbar.add("find inverse") + return self._delay(delay, process(set_result)) @docsubst @@ -834,9 +841,16 @@ def _compute_agg(self, name, expression, binby=[], limits=None, shape=default_sh # TODO: temporary disabled # len(self) # fill caches and masks pass - binners = self._create_binners(binby, limits, shape, selection=selection, delay=True) + progressbar = vaex.utils.progressbars(progress, title=name) 
+ if not isinstance(binby, (list, tuple)) or len(binby) > 0: + progressbar_limits = progressbar.add("binners") + binners = self._create_binners(binby, limits, shape, selection=selection, delay=True, progress=progressbar_limits) + else: + binners = () + progressbar_agg = progressbar @delayed - def compute(expression, binners, selection, edges, progressbar): + def compute(expression, binners, selection, edges): + binners = tuple(binners) if not hasattr(self.local, '_aggregator_nest_count'): self.local._aggregator_nest_count = 0 self.local._aggregator_nest_count += 1 @@ -848,9 +862,7 @@ def compute(expression, binners, selection, edges, progressbar): agg = vaex.agg.aggregates[name](expression, *extra_expressions, selection=selection, edges=edges) else: agg = vaex.agg.aggregates[name](expression, selection=selection, edges=edges) - tasks, result = agg.add_tasks(self, binners) - for task in tasks: - progressbar.add_task(task, "%s for %s" % (name, expression)) + tasks, result = agg.add_tasks(self, binners, progress=progressbar) @delayed def finish(counts): return np.asarray(counts) @@ -882,8 +894,7 @@ def to_coord(binner): return np.asarray(vaex.utils.unlistify(expression_waslist, counts)) else: raise RuntimeError(f'Unknown array_type {format}') - progressbar = vaex.utils.progressbars(progress) - stats = [compute(expression, binners, selection=selection, edges=edges, progressbar=progressbar) for expression in expressions] + stats = [compute(expression, binners, selection=selection, edges=edges) for expression in expressions] var = finish(binners, *stats) return self._delay(delay, var) @@ -1169,7 +1180,7 @@ def calculate(limits): results.append(cov(mx, my, cxy)) return results - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="covar") covars = calculate(limits) @delayed @@ -1217,7 +1228,7 @@ def correlation(self, x, y=None, binby=[], limits=None, shape=default_shape, sor :return: {return_stat_scalar} """ selection = 
_normalize_selection(selection) - progressbar = vaex.utils.progressbars(progress, name="correlation") + progressbar = vaex.utils.progressbars(progress, title="correlation") if y is None: if not _issequence(x): raise ValueError("if y not given, x is expected to be a list or tuple, not %r" % x) @@ -1328,7 +1339,6 @@ def cov(self, x, y=None, binby=[], limits=None, shape=default_shape, selection=F N = len(expressions) binby = _ensure_list(binby) shape = _expand_shape(shape, len(binby)) - progressbar = vaex.utils.progressbars(progress) limits = self.limits(binby, limits, selection=selection, delay=True) @delayed @@ -1358,7 +1368,7 @@ def finish(values): moments2 = sums / counts cov_matrix = moments2 - meansxy return cov_matrix - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="cov") values = calculate(expressions, limits) cov_matrix = finish(values) return self._delay(delay, cov_matrix) @@ -1419,7 +1429,7 @@ def finish(*minmax_list): all_same_kind = all(isinstance(data_type.internal, np.dtype) for data_type in data_types) and all([k.kind == data_type0.kind for k in data_types]) if not (all_same_kind or all([k == data_type0 for k in data_types])): raise TypeError("cannot mix different dtypes in 1 minmax call") - progressbar = vaex.utils.progressbars(progress, name="minmaxes") + progressbar = vaex.utils.progressbars(progress, title="minmaxes") limits = self.limits(binby, limits, selection=selection, delay=True) all_tasks = [calculate(expression, limits) for expression in expressions] result = finish(*all_tasks) @@ -1660,7 +1670,7 @@ def _delay(self, delay, task, progressbar=False): return task.get() @docsubst - def limits_percentage(self, expression, percentage=99.73, square=False, selection=False, delay=False): + def limits_percentage(self, expression, percentage=99.73, square=False, selection=False, progress=None, delay=False): """Calculate the [min, max] range for expression, containing approximately a percentage 
of the data as defined by percentage. @@ -1682,6 +1692,7 @@ def limits_percentage(self, expression, percentage=99.73, square=False, selectio :return: {return_limits} """ logger.info("limits_percentage for %r, with percentage=%r", expression, percentage) + progressbar = vaex.utils.progressbars(progress, title="limits_percentage") waslist, [expressions, ] = vaex.utils.listify(expression) limits = [] for expr in expressions: @@ -1698,7 +1709,7 @@ def compute_limits(counts): return l vmin, vmax = limits_minmax size = 1024 * 16 - counts = self.count(binby=expr, shape=size, limits=limits_minmax, selection=selection, delay=delay) + counts = self.count(binby=expr, shape=size, limits=limits_minmax, selection=selection, progress=progressbar, delay=delay) return compute_limits(counts) # limits.append(l) limits_minmax = self.minmax(expr, selection=selection, delay=delay) @@ -1707,7 +1718,7 @@ def compute_limits(counts): return self._delay(delay, delayed(vaex.utils.unlistify)(waslist, limits)) @docsubst - def limits(self, expression, value=None, square=False, selection=None, delay=False, shape=None): + def limits(self, expression, value=None, square=False, selection=None, delay=False, progress=None, shape=None): """Calculate the [min, max] range for expression, as described by value, which is 'minmax' by default. 
If value is a list of the form [minvalue, maxvalue], it is simply returned, this is for convenience when using mixed @@ -1749,6 +1760,7 @@ def limits(self, expression, value=None, square=False, selection=None, delay=Fal values = value # we cannot hash arrow arrays values = [vaex.array_types.to_numpy(k) if isinstance(k, vaex.array_types.supported_arrow_array_types) else k for k in values] + progressbar = vaex.utils.progressbars(progress, title="limits") initial_expressions, initial_values = expressions, values expression_values = dict() @@ -1789,7 +1801,7 @@ def limits(self, expression, value=None, square=False, selection=None, delay=Fal else: if isinstance(value, six.string_types): if value == "minmax": - limits = self.minmax(expression, selection=selection, delay=True) + limits = self.minmax(expression, selection=selection, progress=progressbar, delay=True) else: match = re.match(r"([\d.]*)(\D*)", value) if match is None: @@ -2929,7 +2941,8 @@ def evaluate_variable(self, name): else: return self.variables[name] - def evaluate(self, expression, i1=None, i2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None): + @docsubst + def evaluate(self, expression, i1=None, i2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, progress=None): """Evaluate an expression, and return a numpy array with the results for the full column or a part of it. Note that this is not how vaex should be used, since it means a copy of the data needs to fit in memory. 
@@ -2941,15 +2954,17 @@ def evaluate(self, expression, i1=None, i2=None, out=None, selection=None, filte :param int i2: End row index, default is the length of the DataFrame :param ndarray out: Output array, to which the result may be written (may be used to reuse an array, or write to a memory mapped array) + :param progress: {{progress}} :param selection: selection to apply :return: """ if chunk_size is not None: - return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size) + return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress) else: - return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size) + return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress) - def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, prefetch=True): + @docsubst + def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, prefetch=True, progress=None): """Generator to efficiently evaluate expressions in chunks (number of rows). See :func:`DataFrame.evaluate` for other arguments. @@ -2959,22 +2974,25 @@ def evaluate_iterator(self, expression, s1=None, s2=None, out=None, selection=No >>> import vaex >>> df = vaex.example() >>> for i1, i2, chunk in df.evaluate_iterator(df.x, chunk_size=100_000): - ... print(f"Total of {i1} to {i2} = {chunk.sum()}") + ... print(f"Total of {{i1}} to {{i2}} = {{chunk.sum()}}") ... 
Total of 0 to 100000 = -7460.610158279056 Total of 100000 to 200000 = -4964.85827154921 Total of 200000 to 300000 = -7303.271340043915 Total of 300000 to 330000 = -2424.65234724951 + :param progress: {{progress}} :param prefetch: Prefetch/compute the next chunk in parallel while the current value is yielded/returned. """ - offset = 0 + progressbar = vaex.utils.progressbars(progress, title="evaluate iterator") import concurrent.futures self._fill_filter_mask() + progressbar(0) if not prefetch: # this is the simple implementation for l1, l2, i1, i2 in self._unfiltered_chunk_slices(chunk_size): yield l1, l2, self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, raw=True) + progressbar(l2/len(self)) # But this implementation is faster if the main thread work is single threaded else: with concurrent.futures.ThreadPoolExecutor(1) as executor: @@ -2990,10 +3008,12 @@ def f(i1, i2): previous_chunk = previous.result() current = executor.submit(f, i1, i2) yield previous_l1, previous_l2, previous_chunk + progressbar(previous_l2/len(self)) previous = current previous_l1, previous_l2 = l1, l2 previous_chunk = previous.result() yield previous_l1, previous_l2, previous_chunk + progressbar(previous_l2/len(self)) @docsubst def to_records(self, index=None, selection=None, column_names=None, strings=True, virtual=True, parallel=True, @@ -5356,16 +5376,16 @@ def _graphviz(self, dot=None): @docsubst @stat_1d - def _agg(self, aggregator, binners=tuple(), delay=False): + def _agg(self, aggregator, binners=tuple(), delay=False, progress=None): """ :param delay: {delay} :return: {return_stat_scalar} """ - tasks, result = aggregator.add_tasks(self, binners) + tasks, result = aggregator.add_tasks(self, binners, progress=progress) return self._delay(delay, result) - def _binner(self, expression, limits=None, shape=None, selection=None, delay=False): + def _binner(self, expression, limits=None, 
shape=None, selection=None, progress=None, delay=False): expression = str(expression) if limits is not None and not isinstance(limits, (tuple, str)): limits = tuple(limits) @@ -5378,7 +5398,7 @@ def _binner(self, expression, limits=None, shape=None, selection=None, delay=Fal @delayed def create_binner(limits): return self._binner_scalar(expression, limits, shape) - binner = create_binner(self.limits(expression, limits, selection=selection, delay=True)) + binner = create_binner(self.limits(expression, limits, selection=selection, progress=progress, delay=True)) return self._delay(delay, binner) def _binner_scalar(self, expression, limits, shape): @@ -5389,7 +5409,7 @@ def _binner_ordinal(self, expression, ordinal_count, min_value=0): dtype = self.data_type(expression) return BinnerOrdinal(expression, min_value, ordinal_count, dtype) - def _create_binners(self, binby, limits, shape, selection=None, delay=False): + def _create_binners(self, binby, limits, shape, selection=None, progress=None, delay=False): if isinstance(binby, (list, tuple)): binbys = binby else: @@ -5405,7 +5425,7 @@ def _create_binners(self, binby, limits, shape, selection=None, delay=False): limits = [] shapes = _expand_shape(shape, len(binbys)) for binby, limits1, shape in zip(binbys, limits, shapes): - binners.append(self._binner(binby, limits1, shape, selection, delay=True)) + binners.append(self._binner(binby, limits1, shape, selection, progress=progress, delay=True)) @delayed def finish(*binners): return binners @@ -6134,7 +6154,7 @@ def _unfiltered_chunk_slices(self, chunk_size): for i1, i2 in vaex.utils.subdivide(logical_length, max_length=chunk_size): yield i1, i2, i1, i2 - def _evaluate_implementation(self, expression, i1=None, i2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, chunk_size=None, raw=False): + def _evaluate_implementation(self, expression, i1=None, i2=None, out=None, selection=None, filtered=True, array_type=None, parallel=True, 
chunk_size=None, raw=False, progress=None): """The real implementation of :func:`DataFrame.evaluate` (not returning a generator). :param raw: Whether indices i1 and i2 refer to unfiltered (raw=True) or 'logical' offsets (raw=False) @@ -6236,7 +6256,7 @@ def assign(thread_index, i1, i2, selection_masks, blocks): # for primitive arrays (and no filter/selection) we directly add it to the right place in contiguous numpy array arrays[expr][i1:i2] = blocks[i] if expression_to_evaluate: - df.map_reduce(assign, lambda *_: None, expression_to_evaluate, ignore_filter=False, selection=selection, pre_filter=use_filter, info=True, to_numpy=False) + df.map_reduce(assign, lambda *_: None, expression_to_evaluate, progress=progress, ignore_filter=False, selection=selection, pre_filter=use_filter, info=True, to_numpy=False, name="evaluate") def finalize_result(expression): if expression in chunks_map: # put all chunks in order @@ -6493,7 +6513,7 @@ def export_arrow(self, to, progress=None, chunk_size=default_chunk_size, paralle :param dict fs_options: {fs_options} :return: """ - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="export(arrow)") def write(writer): progressbar(0) N = len(self) @@ -6593,7 +6613,7 @@ def export_partitioned(self, path, by, directory_format='{key}={value}', progres for name in by: columns.remove(name) - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="export(partitioned)") progressbar(0) groups = self.groupby(by) _, ext, _ = vaex.file.split_ext(path) @@ -6654,7 +6674,7 @@ def write(i, item): df = vaex.from_dict(chunks) df.export(p, chunk_size=None, parallel=False, fs_options=fs_options, fs=fs) return i2 - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="export(many)") progressbar(0) length = len(self) def update_progress(offset): @@ -6679,12 +6699,15 @@ def export_hdf5(self, path, 
byteorder="=", progress=None, chunk_size=default_chu :return: """ from vaex.hdf5.writer import Writer + progressbar = vaex.utils.progressbars(progress, title="export(hdf5)") + progressbar_layout = progressbar.add("layout file structure") + progressbar_write = progressbar.add("write data") with Writer(path=path, group=group, mode=mode, byteorder=byteorder) as writer: - writer.layout(self) + writer.layout(self, progress=progressbar_layout) writer.write( self, chunk_size=chunk_size, - progress=progress, + progress=progressbar_write, column_count=column_count, parallel=parallel, export_threads=writer_threads) @@ -6713,7 +6736,7 @@ def export_csv(self, path, progress=None, chunk_size=default_chunk_size, paralle """ import pandas as pd expressions = self.get_column_names() - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="export(csv)") dtypes = self[expressions].dtypes n_samples = len(self) if chunk_size is None: @@ -6761,7 +6784,7 @@ def selected_length(self, selection="default"): # # self.signal_selection_changed.emit(self) @docsubst - def groupby(self, by=None, agg=None, sort=False, assume_sparse='auto', row_limit=None, copy=True, delay=False): + def groupby(self, by=None, agg=None, sort=False, assume_sparse='auto', row_limit=None, copy=True, progress=None, delay=False): """Return a :class:`GroupBy` or :class:`DataFrame` object when agg is not None Examples: @@ -6820,7 +6843,7 @@ def groupby(self, by=None, agg=None, sort=False, assume_sparse='auto', row_limit :return: :class:`DataFrame` or :class:`GroupBy` object. 
""" from .groupby import GroupBy - groupby = GroupBy(self, by=by, sort=sort, combine=assume_sparse, row_limit=row_limit, copy=copy) + groupby = GroupBy(self, by=by, sort=sort, combine=assume_sparse, row_limit=row_limit, copy=copy, progress=progress) @vaex.delayed def next(_ignore): if agg is None: diff --git a/packages/vaex-core/vaex/docstrings.py b/packages/vaex-core/vaex/docstrings.py index b5a75595cc..d0f2f8ab9a 100644 --- a/packages/vaex-core/vaex/docstrings.py +++ b/packages/vaex-core/vaex/docstrings.py @@ -16,7 +16,7 @@ def docsubst(f): _doc_snippets["selection"] = """Name of selection to use (or True for the 'default'), or all the data (when selection is None or False), or a list of selections""" _doc_snippets["selection1"] = """Name of selection to use (or True for the 'default'), or all the data (when selection is None or False)""" _doc_snippets["delay"] = """Do not return the result, but a proxy for asychronous calculations (currently only for internal use)""" -_doc_snippets["progress"] = """A callable that takes one argument (a floating point value between 0 and 1) indicating the progress, calculations are cancelled when this callable returns False""" +_doc_snippets["progress"] = """True to display a progress bar, or a callable that takes one argument (a floating point value between 0 and 1) indicating the progress, calculations are cancelled when this callable returns False""" _doc_snippets["expression_limits"] = _doc_snippets["expression"] _doc_snippets["grid"] = """If grid is given, instead if compuation a statistic given by what, use this Nd-numpy array instead, this is often useful when a custom computation/statistic is calculated, but you still want to use the plotting machinery.""" _doc_snippets["edges"] = """Currently for internal use only (it includes nan's and values outside the limits at borders, nan and 0, smaller than at 1, and larger at -1""" diff --git a/packages/vaex-core/vaex/expression.py b/packages/vaex-core/vaex/expression.py index 
54ae3c06dc..185c4c8d5c 100644 --- a/packages/vaex-core/vaex/expression.py +++ b/packages/vaex-core/vaex/expression.py @@ -1003,7 +1003,8 @@ def map(thread_index, i1, i2, selection_masks, blocks): return 0 def reduce(a, b): return a+b - self.ds.map_reduce(map, reduce, [self.expression], delay=False, progress=progress, name='value_counts', info=True, to_numpy=False) + progressbar = vaex.utils.progressbars(progress, title="value counts") + self.ds.map_reduce(map, reduce, [self.expression], delay=False, progress=progressbar, name='value_counts', info=True, to_numpy=False) counters = [k for k in counters if k is not None] counter = counters[0] for other in counters[1:]: @@ -1071,18 +1072,19 @@ def reduce(a, b): return Series(counts, index=keys) @docsubst - def unique(self, dropna=False, dropnan=False, dropmissing=False, selection=None, axis=None, array_type='list', delay=False): + def unique(self, dropna=False, dropnan=False, dropmissing=False, selection=None, axis=None, array_type='list', progress=None, delay=False): """Returns all unique values. :param dropmissing: do not count missing values :param dropnan: do not count nan values :param dropna: short for any of the above, (see :func:`Expression.isna`) :param bool axis: Axis over which to determine the unique elements (None will flatten arrays or lists) + :param progress: {progress} :param bool array_type: {array_type} """ - return self.ds.unique(self, dropna=dropna, dropnan=dropnan, dropmissing=dropmissing, selection=selection, array_type=array_type, axis=axis, delay=delay) + return self.ds.unique(self, dropna=dropna, dropnan=dropnan, dropmissing=dropmissing, selection=selection, array_type=array_type, axis=axis, progress=progress, delay=delay) - def nunique(self, dropna=False, dropnan=False, dropmissing=False, selection=None, axis=None, delay=False): + def nunique(self, dropna=False, dropnan=False, dropmissing=False, selection=None, axis=None, progress=None, delay=False): """Counts number of unique values, i.e. 
`len(df.x.unique()) == df.x.nunique()`. :param dropmissing: do not count missing values @@ -1095,7 +1097,7 @@ def key_function(): return f'nunique-{fp}' @vaex.cache._memoize(key_function=key_function, delay=delay) def f(): - value = self.unique(dropna=dropna, dropnan=dropnan, dropmissing=dropmissing, selection=selection, axis=axis, array_type=None, delay=delay) + value = self.unique(dropna=dropna, dropnan=dropnan, dropmissing=dropmissing, selection=selection, axis=axis, array_type=None, progress=progress, delay=delay) if delay: return value.then(len) else: diff --git a/packages/vaex-core/vaex/groupby.py b/packages/vaex-core/vaex/groupby.py index 70590f92b8..d70cb82002 100644 --- a/packages/vaex-core/vaex/groupby.py +++ b/packages/vaex-core/vaex/groupby.py @@ -160,7 +160,7 @@ def _create_binner(self, df): class Grouper(BinnerBase): """Bins an expression to a set of unique bins, like an SQL like groupby.""" - def __init__(self, expression, df=None, sort=False, pre_sort=True, row_limit=None, df_original=None, materialize_experimental=False): + def __init__(self, expression, df=None, sort=False, pre_sort=True, row_limit=None, df_original=None, materialize_experimental=False, progress=None): self.df = df or expression.ds self.sort = sort self.pre_sort = pre_sort @@ -172,6 +172,7 @@ def __init__(self, expression, df=None, sort=False, pre_sort=True, row_limit=Non # make sure it's an expression self.expression = self.df[_ensure_string_from_expression(self.expression)] self.label = self.expression._label + self.progressbar = vaex.utils.progressbars(progress, title=f"grouper: {repr(self.label)}" ) dtype = self.expression.dtype if materialize_experimental: set, values = df_original._set(self.expression, unique_limit=row_limit, return_inverse=True) @@ -234,7 +235,7 @@ def process(set): # for datetimes, we converted to int if dtype.is_datetime: self.bin_values = dtype.create_array(self.bin_values) - self._promise = process(df_original._set(self.expression, 
unique_limit=row_limit, delay=True)) + self._promise = process(df_original._set(self.expression, unique_limit=row_limit, delay=True, progress=self.progressbar)) def _create_binner(self, df): # TODO: we modify the dataframe in place, this is not nice @@ -247,14 +248,13 @@ def _create_binner(self, df): self.binby_expression = '_ordinal_values(%s, %s)' % (self.expression, self.setname) self.binner = self.df._binner_ordinal(self.binby_expression, self.N) - class GrouperCombined(Grouper): - def __init__(self, expression, df, multipliers, parents, sort, row_limit=None): + def __init__(self, expression, df, multipliers, parents, sort, row_limit=None, progress=None): '''Will group by 1 expression, which is build up from multiple expressions. Used in the sparse/combined group by. ''' - super().__init__(expression, df, sort=sort, row_limit=row_limit) + super().__init__(expression, df, sort=sort, row_limit=row_limit, progress=progress) assert len(multipliers) == len(parents) assert multipliers[-1] == 1 @@ -263,6 +263,7 @@ def __init__(self, expression, df, multipliers, parents, sort, row_limit=None): self.expression = expression # efficient way to find the original bin values (parent.bin_value) from the 'compressed' # self.bin_values + progressbar = self.progressbar.add("extract labels from sparse set") @vaex.delayed def process(_ignore): df = vaex.from_dict({'row': vaex.vrange(0, self.N, dtype='i8'), 'bin_value': self.bin_values}) @@ -272,7 +273,7 @@ def process(_ignore): df[f'index_{i}'] = df[f'leftover_{i-1}'] // multipliers[i] df[f'leftover_{i}'] = df[f'leftover_{i-1}'] % multipliers[i] columns = [f'index_{i}' for i in range(len(multipliers))] - indices_parents = df.evaluate(columns) + indices_parents = df.evaluate(columns, progress=progressbar) def compress(ar): if vaex.dtype_of(ar).kind == 'i': ar = vaex.array_types.to_numpy(ar) @@ -436,11 +437,12 @@ def _create_binner(self, df): self.binby_expression = "_ordinal_values(%s, %s) %% %s" % (self.expression, self.setname, 
self.N) self.binner = self.df._binner_ordinal(self.binby_expression, self.N) -def _combine(df, groupers, sort, row_limit=None): +def _combine(df, groupers, sort, row_limit=None, progress=None): for grouper in groupers: if isinstance(grouper, Binner): raise NotImplementedError('Cannot combined Binner with other groupers yet') + progressbar = vaex.utils.progressbars(progress, title="find sparse entries / compress") groupers = groupers.copy() max_count_64bit = 2**63-1 first = groupers.pop(0) @@ -476,7 +478,7 @@ def _combine(df, groupers, sort, row_limit=None): binby_expression = binby_expression * cumulative_counts[i+1] binby_expressions[i] = binby_expression expression = reduce(operator.add, binby_expressions) - grouper = GrouperCombined(expression, df, multipliers=cumulative_counts[1:], parents=combine_now, sort=sort, row_limit=row_limit) + grouper = GrouperCombined(expression, df, multipliers=cumulative_counts[1:], parents=combine_now, sort=sort, row_limit=row_limit, progress=progressbar) if combine_later: @vaex.delayed def combine(_ignore): @@ -490,7 +492,7 @@ def combine(_ignore): class GroupByBase(object): - def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=None, copy=True): + def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=None, copy=True, progress=None): '''Note that row_limit only works in combination with combine=True''' df_original = df if copy: @@ -498,6 +500,9 @@ def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=Non self.df = df self.sort = sort self.expand = expand # keep as pyarrow struct? 
+ self.progressbar = vaex.utils.progressbars(progress, title="groupby/binby") + self.progressbar_groupers = self.progressbar.add("groupers") + self.progressbar_agg = self.progressbar.add("aggregation") if not isinstance(by, collections_abc.Iterable)\ or isinstance(by, six.string_types): @@ -515,7 +520,7 @@ def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=Non if dtype == np.dtype('uint8') or dtype == np.dtype('bool'): by_value = BinnerInteger(expression) # doesn't modify, always sorted else: - by_value = Grouper(expression, sort=sort, row_limit=row_limit, df_original=df_original) + by_value = Grouper(expression, sort=sort, row_limit=row_limit, df_original=df_original, progress=self.progressbar_groupers) self.by.append(by_value) @vaex.delayed def possible_combine(*binner_promises): @@ -529,7 +534,7 @@ def set_combined(combined): self.by = [combined] self.combine = True if combine is True and len(self.by) >= 2: - promise = set_combined(_combine(self.df, self.by, sort=sort, row_limit=row_limit)) + promise = set_combined(_combine(self.df, self.by, sort=sort, row_limit=row_limit, progress=self.progressbar_groupers)) elif combine == 'auto' and len(self.by) >= 2: cells = product([grouper.N for grouper in self.by]) dim = len(self.by) @@ -539,7 +544,7 @@ def set_combined(combined): # we want each cell to have a least 10x occupacy if occupancy < 10: logger.info(f'Combining {len(self.by)} groupers into 1') - promise = set_combined(_combine(self.df, self.by, sort=sort, row_limit=row_limit)) + promise = set_combined(_combine(self.df, self.by, sort=sort, row_limit=row_limit, progress=self.progressbar_groupers)) self.combine = True else: self.combine = False @@ -580,7 +585,7 @@ def add(aggregate, column_name=None, override_name=None): if column_name is None or override_name is not None: column_name = aggregate.pretty_name(override_name, df) aggregate.edges = True # is this ok to override? 
- values = df._agg(aggregate, self.binners, delay=_USE_DELAY) + values = df._agg(aggregate, self.binners, delay=_USE_DELAY, progress=self.progressbar_agg) grids[column_name] = values if isinstance(aggregate, vaex.agg.AggregatorDescriptorBasic)\ and aggregate.name == 'AggCount'\ @@ -739,8 +744,8 @@ def process(arrays): class GroupBy(GroupByBase): """Implementation of the binning and aggregation of data, see :method:`groupby`.""" - def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=None, copy=True): - super(GroupBy, self).__init__(df, by, sort=sort, combine=combine, expand=expand, row_limit=row_limit, copy=copy) + def __init__(self, df, by, sort=False, combine=False, expand=True, row_limit=None, copy=True, progress=None): + super(GroupBy, self).__init__(df, by, sort=sort, combine=combine, expand=expand, row_limit=row_limit, copy=copy, progress=progress) def agg(self, actions, delay=False): # TODO: this basically forms a cartesian product, we can do better, use a @@ -756,7 +761,7 @@ def aggregate(promise_by): if counts is None: # TODO: it seems this path is never tested count_agg = vaex.agg.count(edges=True) - counts = self.df._agg(count_agg, self.binners, delay=_USE_DELAY) + counts = self.df._agg(count_agg, self.binners, delay=_USE_DELAY, progress=self.progressbar_agg) arrays = delayed_dict(arrays) return counts, arrays diff --git a/packages/vaex-core/vaex/misc/progressbar.py b/packages/vaex-core/vaex/misc/progressbar.py index 510a1f8cd7..58a79ba747 100644 --- a/packages/vaex-core/vaex/misc/progressbar.py +++ b/packages/vaex-core/vaex/misc/progressbar.py @@ -5,10 +5,11 @@ class ProgressBarBase(object): - def __init__(self, min_value, max_value, format="%(percentage) 6.2f%% %(timeinfo)s cpu: %(cpu_usage)d%%"): + def __init__(self, min_value, max_value, title='vaex', format="%(percentage) 6.2f%% %(timeinfo)s cpu: %(cpu_usage)d%%"): self.min_value = min_value self.max_value = max_value self.format = format + self.title = title self.value = 
self.min_value self.fraction = 0 self.prevfraction = 0 @@ -54,7 +55,7 @@ def info(self): timeinfo = "estimated time: % 8.2fs = % 4.1fm = % 2.1fh" % (seconds, minutes, hours) else: timeinfo = "estimated time: unknown " - return {"percentage":percentage, "timeinfo":timeinfo, "cpu_usage": cpu_usage} + return {"percentage":percentage, "timeinfo":timeinfo, "cpu_usage": cpu_usage, "title": self.title} def __repr__(self): output = '' @@ -74,7 +75,7 @@ class ProgressBar(ProgressBarBase): By default, the progress bar writes to stderr, so it doesn't clutter up log files when piping stdout """ - def __init__(self, min_value, max_value, format="%(percentage) 6.2f%% %(timeinfo)s", width=40, barchar="#", emptychar="-", output=sys.stdout): + def __init__(self, min_value, max_value, title="vaex", format="%(percentage) 6.2f%% %(timeinfo)s", width=40, barchar="#", emptychar="-", output=sys.stdout): """ :param min_value: minimum value for update(..) :param format: format specifier for the output @@ -82,7 +83,7 @@ def __init__(self, min_value, max_value, format="%(percentage) 6.2f%% %(timeinfo :param barchar: character used to print the bar :param output: where to write the output to """ - super(ProgressBar, self).__init__(min_value, max_value, format=format) + super(ProgressBar, self).__init__(min_value, max_value, format=format, title=title) self.width = width self.barchar = barchar self.emptychar = emptychar @@ -99,13 +100,12 @@ def finish(self): print(repr(self), file=self.output, end=' ') self.output.flush() - def __repr__(self): output = '' self.update_fraction() count = int(round(self.fraction * self.width)) space = self.width - count - bar = "[" + (self.barchar * count) + (self.emptychar * space) + "]" + bar = self.title + " [" + (self.barchar * count) + (self.emptychar * space) + "]" output = "\r" + bar + super(ProgressBar, self).__repr__() if self.fraction == 1: output += "\n" # last time print a newline char @@ -113,7 +113,7 @@ def __repr__(self): class 
ProgressBarWidget(ProgressBarBase): - def __init__(self, min_value, max_value, name=None): + def __init__(self, min_value, max_value, title=None): super(ProgressBarWidget, self).__init__(min_value, max_value) import ipywidgets as widgets from IPython.display import display @@ -133,3 +133,64 @@ def update(self, value): def finish(self): self(self.max_value) + + +class ProgressBarRich(ProgressBarBase): + def __init__(self, min_value, max_value, title=None, progress=None, indent=0, parent=None): + super(ProgressBarRich, self).__init__(min_value, max_value) + import rich.progress + import rich.table + import rich.tree + self.console = rich.console.Console(record=True) + self.parent = parent + if progress is None: + self.progress = rich.progress.Progress( + rich.progress.SpinnerColumn(), + rich.progress.TextColumn("[progress.description]{task.description}"), + rich.progress.BarColumn(), + rich.progress.TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + rich.progress.TimeRemainingColumn(), + rich.progress.TimeElapsedColumn(), + console=self.console, + transient=False, + expand=False, + ) + else: + self.progress = progress + if parent is None: + self.node = rich.tree.Tree(self.progress) + from rich.live import Live + self.live = Live(self.node, refresh_per_second=5, console=self.console) + self.live.start() + else: + self.node = parent.add(self.progress) + # we do 1000 discrete steps + self.steps = 0 + self.indent = indent + + padding = max(0, 50 - (self.indent * 4) - len(title)) + self.task = self.progress.add_task(f"[red]{title}" + (" " * padding), total=1000, start=False) + self.started = False + self.subtasks = [] + + def add_child(self, parent, task, title): + return ProgressBarRich(self.min_value, self.max_value, title, indent=self.indent+1, parent=self.node) + + def __call__(self, value): + if not self.started: + self.progress.start_task(self.task) + if value > self.value: + steps = int(value * 1000) + delta = steps - self.steps + if delta > 0: + 
self.progress.update(self.task, advance=delta, refresh=False) + self.steps = steps + self.value = value + + def update(self, value): + self(value) + + def finish(self): + self(self.max_value) + if self.parent is None: + self.live.stop() diff --git a/packages/vaex-core/vaex/utils.py b/packages/vaex-core/vaex/utils.py index d0013d2815..b223128cfa 100644 --- a/packages/vaex-core/vaex/utils.py +++ b/packages/vaex-core/vaex/utils.py @@ -276,24 +276,29 @@ def _progressbar_progressbar2(type=None, name="processing", max_value=1): # FormatLabel('Processed: %(value)d lines (in: %(elapsed)s)') -def _progressbar_vaex(type=None, name="processing", max_value=1): +def _progressbar_vaex(type=None, title="processing", max_value=1): import vaex.misc.progressbar as pb - return pb.ProgressBar(0, 1) + return pb.ProgressBar(0, 1, title=title) -def _progressbar_widget(type=None, name="processing", max_value=1): +def _progressbar_widget(type=None, title="processing", max_value=1): import vaex.misc.progressbar as pb - return pb.ProgressBarWidget(0, 1, name=name) + return pb.ProgressBarWidget(0, 1, title=title) + +def _progressbar_rich(type=None, title="processing", max_value=1): + import vaex.misc.progressbar as pb + return pb.ProgressBarRich(0, 1, title=title) _progressbar_typemap = {} _progressbar_typemap['progressbar2'] = _progressbar_progressbar2 _progressbar_typemap['vaex'] = _progressbar_vaex _progressbar_typemap['widget'] = _progressbar_widget +_progressbar_typemap['rich'] = _progressbar_rich def progressbar(type_name=None, title="processing", max_value=1): - type_name = type_name or 'vaex' - return _progressbar_typemap[type_name](name=title) + type_name = type_name or _progressbar_type_default + return _progressbar_typemap[type_name](title=title) def progressbar_widget(): @@ -340,15 +345,18 @@ def __repr__(self): def add(self, name=None): pb = _progressbar_wrapper_sum(parent=self, name=name) + if self.bar and hasattr(self.bar, 'add_child'): + pb.bar = self.bar.add_child(pb, None, 
name) self.children.append(pb) + self.finished = False + self.fraction = sum([c.fraction for c in self.children]) / len(self.children) + self(self.fraction) return pb def add_task(self, task, name=None): pb = self.add(name) pb.oncancel = task.cancel task.signal_progress.connect(pb) - if self.bar and hasattr(self.bar, 'add_child'): - self.bar.add_child(pb, task, name) def __call__(self, fraction): if self.cancelled: @@ -368,8 +376,8 @@ def __call__(self, fraction): elif fraction != 1: if self.bar: self.bar.update(fraction) - if self.next: - result = self.next(fraction) + if self.next: + result = self.next(fraction) if self.parent: assert self in self.parent.children result = self.parent(None) in [None, True] and result # fraction is not used anyway.. @@ -382,9 +390,12 @@ def status(self, name): pass -def progressbars(f=True, next=None, name=None): +def progressbars(f=True, next=None, name=None, title=None): if isinstance(f, _progressbar_wrapper_sum): - return f + if title is None: + return f + else: + return f.add(title) if callable(f): next = f f = False @@ -392,9 +403,9 @@ def progressbars(f=True, next=None, name=None): return _progressbar_wrapper_sum(next=next, name=name) else: if f is True: - return _progressbar_wrapper_sum(bar=progressbar(), next=next, name=name) + return _progressbar_wrapper_sum(bar=progressbar(title=title), next=next, name=name) elif isinstance(f, six.string_types): - return _progressbar_wrapper_sum(bar=progressbar(f), next=next, name=name) + return _progressbar_wrapper_sum(bar=progressbar(f, title=title), next=next, name=name) else: return _progressbar_wrapper_sum(next=next, name=name) @@ -1071,3 +1082,5 @@ def dropnan(sequence, expect=None): if expect is not None: assert len(sequence) - len(non_nan) == 1, "expected 1 nan value" return original_type(non_nan) + +_progressbar_type_default = get_env_type(str, 'VAEX_PROGRESS_TYPE', 'vaex') diff --git a/packages/vaex-hdf5/vaex/hdf5/writer.py b/packages/vaex-hdf5/vaex/hdf5/writer.py index 
cbe65a7fdb..d7514cf4c7 100644 --- a/packages/vaex-hdf5/vaex/hdf5/writer.py +++ b/packages/vaex-hdf5/vaex/hdf5/writer.py @@ -40,7 +40,7 @@ def __enter__(self): def __exit__(self, *args): self.close() - def layout(self, df): + def layout(self, df, progress=None): assert not self._layout_called, "Layout called twice" N = len(df) if N == 0: @@ -48,18 +48,24 @@ def layout(self, df): column_names = df.get_column_names() logger.debug("layout columns(hdf5): %r" % column_names) + progressbar = vaex.utils.progressbars(progress, title="layout(hdf5)") + progressbar_strings = progressbar.add("variable-length storage requirements") + progressbar_count = progressbar.add("count missing values") + progressbar_reserve = progressbar.add("reserve disk space to be mmapped") self.column_writers = {} dtypes = df.schema() - str_byte_length = {name:df[name].str.byte_length().sum(delay=True) for name, dtype in dtypes.items() if dtype.is_string} - str_count = {name:df.count(df[name], delay=True) for name, dtype in dtypes.items() if dtype.is_string} + str_byte_length = {name:df[name].str.byte_length().sum(delay=True, progress=progressbar_strings) for name, dtype in dtypes.items() if dtype.is_string} + str_count = {name:df.count(df[name], delay=True, progress=progressbar_count) for name, dtype in dtypes.items() if dtype.is_string} df.execute() + progressbar_count(1) + progressbar_strings(1) str_byte_length = {k: v.get() for k, v in str_byte_length.items()} has_null_str = {k: N != v.get() for k, v in str_count.items()} has_null = {name:df.is_masked(name) for name, dtype in dtypes.items() if not dtype.is_string} - for name in list(column_names): + for i, name in enumerate(list(column_names)): dtype = dtypes[name] shape = (N, ) + df._shape_of(name)[1:] @@ -72,6 +78,7 @@ def layout(self, df): logger.exception("error creating dataset for %r, with type %r " % (name, dtype)) del self.columns[name] column_names.remove(name) + progressbar_reserve((i+1)/len(column_names)) 
self.columns.attrs["column_order"] = ",".join(column_names) # flush out the content @@ -101,8 +108,9 @@ def write(self, df, chunk_size=int(1e5), parallel=True, progress=None, column_co logger.debug("writing columns(hdf5): %r" % column_names) # actual writing part - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="exporting") progressbar(0) + progressbar_columns = {k: progressbar.add(f"write: {k}") for k in column_names} total = N * len(column_names) written = 0 if export_threads: @@ -112,6 +120,7 @@ def write(self, df, chunk_size=int(1e5), parallel=True, progress=None, column_co def write(arg): i, name = arg self.column_writers[name].write(values[i]) + progressbar_columns[name](self.column_writers[name].progress) # for i, name in enumerate(column_names_subgroup): if export_threads: list(pool.map(write, enumerate(column_names_subgroup))) @@ -153,6 +162,10 @@ def __init__(self, h5parent, name, dtype, shape, has_null, byteorder="="): else: self.mask = None + @property + def progress(self): + return self.to_offset/self.count + def mmap(self, mmap, file): self.to_array = h5mmap(mmap if USE_MMAP else None, file, self.array, self.mask) @@ -215,6 +228,10 @@ def __init__(self, h5parent, name, dtype, shape, byte_length, has_null): self.null_bitmap_array = None # TODO: masked support ala arrow? 
+ @property + def progress(self): + return self.to_offset/self.count + def mmap(self, mmap, file): # from now on, we only work with the mmapped array diff --git a/packages/vaex-ml/vaex/ml/catboost.py b/packages/vaex-ml/vaex/ml/catboost.py index a2b65b93a7..a08727c8fe 100644 --- a/packages/vaex-ml/vaex/ml/catboost.py +++ b/packages/vaex-ml/vaex/ml/catboost.py @@ -130,7 +130,7 @@ def fit(self, df, evals=None, early_stopping_rounds=None, verbose_eval=None, plo # Set up progressbar n_samples = len(df) - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="fit(catboost)") column_names = self.features + [self.target] iterator = df[column_names].to_pandas_df(chunk_size=self.batch_size) diff --git a/packages/vaex-ml/vaex/ml/incubator/river.py b/packages/vaex-ml/vaex/ml/incubator/river.py index a61de1a888..e067791b7c 100644 --- a/packages/vaex-ml/vaex/ml/incubator/river.py +++ b/packages/vaex-ml/vaex/ml/incubator/river.py @@ -124,7 +124,7 @@ def fit(self, df, progress=None): n_samples = len(df) - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="fit(river)") # Portions of the DataFrame to evaluate expressions = self.features + [self.target] diff --git a/packages/vaex-ml/vaex/ml/sklearn.py b/packages/vaex-ml/vaex/ml/sklearn.py index cb9b8d44c7..798ac3d3ca 100644 --- a/packages/vaex-ml/vaex/ml/sklearn.py +++ b/packages/vaex-ml/vaex/ml/sklearn.py @@ -226,7 +226,7 @@ def fit(self, df, progress=None): n_samples = len(df) - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="fit(sklearn)") # Portions of the DataFrame to evaluate expressions = self.features + [self.target] diff --git a/packages/vaex-ml/vaex/ml/transformations.py b/packages/vaex-ml/vaex/ml/transformations.py index 8eb8147063..c3fe3ac784 100644 --- a/packages/vaex-ml/vaex/ml/transformations.py +++ b/packages/vaex-ml/vaex/ml/transformations.py @@ -173,7 +173,7 
@@ def fit(self, df, progress=None): self.n_components = self.n_components or len(self.features) n_samples = len(df) - progressbar = vaex.utils.progressbars(progress) + progressbar = vaex.utils.progressbars(progress, title="fit(PCA)") pca = sklearn.decomposition.IncrementalPCA(n_components=self.n_components, batch_size=self.batch_size, whiten=self.whiten) diff --git a/tests/agg_test.py b/tests/agg_test.py index 306e3e84ea..f6e50312ec 100644 --- a/tests/agg_test.py +++ b/tests/agg_test.py @@ -174,7 +174,7 @@ def test_count_1d_ordinal(): bins = 5 binner = df._binner_ordinal('x', 5) agg = vaex.agg.count(edges=True) - tasks, result = agg.add_tasks(df, (binner,)) + tasks, result = agg.add_tasks(df, (binner,), progress=False) df.execute() assert result.get().tolist() == [0, 2, 1, 1, 0, 0, 1, 1] diff --git a/tests/execution_test.py b/tests/execution_test.py index af8eda2233..f2f8b5a88c 100644 --- a/tests/execution_test.py +++ b/tests/execution_test.py @@ -126,13 +126,13 @@ def test_merge_aggregation_tasks(): binners = df._create_binners('x', [0.5, 2.5], 2) binners2 = df._create_binners('x', [0.5, 2.5], 2) assert len(binners) == 1 - vaex.agg.count().add_tasks(df, binners) + vaex.agg.count().add_tasks(df, binners, progress=False) assert len(df.executor.tasks) == 1 assert binners is not binners2 assert binners[0] is not binners2[0] assert binners == binners2 assert binners[0] == binners2[0] - vaex.agg.sum('y').add_tasks(df, binners) + vaex.agg.sum('y').add_tasks(df, binners, progress=False) assert len(df.executor.tasks) == 2 tasks = df.executor._pop_tasks() assert len(tasks) == 2 @@ -147,8 +147,8 @@ def test_merge_same_aggregation_tasks(): binners2 = df._create_binners('x', [0.5, 2.5], 2) assert len(binners) == 1 # these two aggregations should be merged into 1 subtask - [task1], result1 = vaex.agg.count().add_tasks(df, binners) - [task2], result2 = vaex.agg.count().add_tasks(df, binners) + [task1], result1 = vaex.agg.count().add_tasks(df, binners, progress=False) + [task2], 
result2 = vaex.agg.count().add_tasks(df, binners, progress=False) assert len(df.executor.tasks) == 1 df.execute() assert task1 is task2