Skip to content

Commit

Permalink
✨ improved progress support
Browse files Browse the repository at this point in the history
 * progress is now an argument to vaex.open to be used in combination with convert
 * more titles for progress values
 * rich progress support
  • Loading branch information
maartenbreddels committed Nov 25, 2021
1 parent e2db557 commit a59c3ad
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 124 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ jobs:
run: |
python -c "import vaex; df = vaex.example()"
- name: Test command line
run: |
vaex convert ~/.vaex/data/helmi-dezeeuw-2000-FeH-v2-10percent.hdf5 test.parquet
pip install rich
VAEX_PROGRESS_TYPE=rich vaex convert ~/.vaex/data/helmi-dezeeuw-2000-FeH-v2-10percent.hdf5 test.parquet
- name: Test server
run: |
vaex server --add-example --port 9999&
Expand Down
23 changes: 15 additions & 8 deletions packages/vaex-core/vaex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import vaex.dataframe
import vaex.dataset
from vaex.docstrings import docsubst
from vaex.registry import register_function
from vaex import functions, struct
from . import stat
Expand Down Expand Up @@ -93,7 +94,8 @@ def app(*args, **kwargs):
return vaex.ui.main.VaexApp()


def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kwargs):
@docsubst
def open(path, convert=False, progress=None, shuffle=False, fs_options={}, fs=None, *args, **kwargs):
"""Open a DataFrame from file given by path.
Example:
Expand All @@ -104,6 +106,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw
:param str or list path: local or absolute path to file, or glob string, or list of paths
:param convert: Uses `dataframe.export` when convert is a path. If True, ``convert=path+'.hdf5'``
The conversion is skipped if the input file or conversion argument did not change.
:param progress: (_Only applies when convert is not False_) {progress}
:param bool shuffle: shuffle converted DataFrame or not
:param dict fs_options: Extra arguments passed to an optional file system if needed:
* Amazon AWS S3
Expand Down Expand Up @@ -138,9 +141,9 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw
Examples:
>>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', fs_options={'anonymous': True})
>>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', fs_options={{'anonymous': True}})
>>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5?anon=true')
>>> df = vaex.open('s3://mybucket/path/to/file.hdf5', fs_options={'access_key': my_key, 'secret_key': my_secret_key})
>>> df = vaex.open('s3://mybucket/path/to/file.hdf5', fs_options={{'access_key': my_key, 'secret_key': my_secret_key}})
>>> df = vaex.open(f's3://mybucket/path/to/file.hdf5?access_key={{my_key}}&secret_key={{my_secret_key}}')
>>> df = vaex.open('s3://mybucket/path/to/file.hdf5?profile=myproject')
Expand All @@ -154,7 +157,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw
Examples:
>>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5', fs_options={'token': None})
>>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5', fs_options={{'token': None}})
>>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5?token=anon')
>>> df = vaex.open('gs://vaex-data/testing/xys.hdf5?token=anon&cache=False')
"""
Expand Down Expand Up @@ -212,12 +215,13 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw
# # naked_path, _ = vaex.file.split_options(path, fs_options)
_, ext, _ = vaex.file.split_ext(path)
if ext == '.csv': # special case for csv
return vaex.from_csv(path, fs_options=fs_options, fs=fs, convert=convert, **kwargs)
return vaex.from_csv(path, fs_options=fs_options, fs=fs, convert=convert, progress=progress, **kwargs)
if convert:
path_output = convert if isinstance(convert, str) else filename_hdf5
vaex.convert.convert(
path_input=path, fs_options_input=fs_options, fs_input=fs,
path_output=path_output, fs_options_output=fs_options, fs_output=fs,
progress=progress,
*args, **kwargs
)
ds = vaex.dataset.open(path_output, fs_options=fs_options, fs=fs, **kwargs)
Expand All @@ -242,7 +246,7 @@ def open(path, convert=False, shuffle=False, fs_options={}, fs=None, *args, **kw
if convert:
if shuffle:
df = df.shuffle()
df.export_hdf5(filename_hdf5)
df.export_hdf5(filename_hdf5, progress=progress)
df = vaex.open(filename_hdf5)

if df is None:
Expand Down Expand Up @@ -499,7 +503,8 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop
copy_index=copy_index)


def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, fs=None, **kwargs):
@docsubst
def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, fs=None, progress=None, **kwargs):
"""
Read a CSV file as a DataFrame, and optionally convert to an hdf5 file.
Expand All @@ -511,13 +516,14 @@ def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=Fals
>>> import vaex
>>> for i, df in enumerate(vaex.from_csv('taxi.csv', chunk_size=100_000)):
>>> df = df[df.passenger_count < 6]
>>> df.export_hdf5(f'taxi_{i:02}.hdf5')
>>> df.export_hdf5(f'taxi_{{i:02}}.hdf5')
:param bool or str convert: convert files to an hdf5 file for optimization, can also be a path. The CSV
file will be read in chunks: either using the provided chunk_size argument, or a default size. Each chunk will
be saved as a separate hdf5 file, then all of them will be combined into one hdf5 file. So for a big CSV file
you will need at least double of extra space on the disk. Default chunk_size for converting is 5 million rows,
which corresponds to around 1Gb memory on an example of NYC Taxi dataset.
:param progress: (_Only applies when convert is not False_) {progress}
:param kwargs: extra keyword arguments, currently passed to Pandas read_csv function, but the implementation might
change in future versions.
:returns: DataFrame
Expand All @@ -536,6 +542,7 @@ def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=Fals
path_output=path_output, fs_options_output=fs_options, fs_output=fs,
chunk_size=chunk_size,
copy_index=copy_index,
progress=progress,
**kwargs
)
return open(path_output, fs_options=fs_options, fs=fs)
Expand Down
38 changes: 25 additions & 13 deletions packages/vaex-core/vaex/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ def edges(self, value):
self.agg1.edges = value
self.agg2.edges = value

def add_tasks(self, df, binners):
tasks1, result1 = self.agg1.add_tasks(df, binners)
tasks2, result2 = self.agg2.add_tasks(df, binners)
def add_tasks(self, df, binners, progress):
progressbar = vaex.utils.progressbars(progress, title=repr(self))
tasks1, result1 = self.agg1.add_tasks(df, binners, progress=progressbar)
tasks2, result2 = self.agg2.add_tasks(df, binners, progress=progressbar)
@vaex.delayed
def finish(value1, value2):
return self.finish(value1, value2)
Expand Down Expand Up @@ -164,8 +165,9 @@ def edges(self):
def edges(self, value):
self.agg.edges = value

def add_tasks(self, df, binners):
tasks, result = self.agg.add_tasks(df, binners)
def add_tasks(self, df, binners, progress):
progressbar = vaex.utils.progressbars(progress, title=repr(self))
tasks, result = self.agg.add_tasks(df, binners, progress=progressbar)
@vaex.delayed
def finish(value):
return self.finish(value)
Expand Down Expand Up @@ -223,6 +225,12 @@ def __init__(self, name, expression, short_name, multi_args=False, agg_args=[],
else:
self.expressions = expression

def __repr__(self):
    """Return an eval-style representation, e.g. ``vaex.agg.sum('x')``.

    Extra aggregation arguments (``self.agg_args``), when present, are
    appended after the expression, comma separated.
    """
    expr = str(self.expression)
    if self.agg_args:
        extra = ", ".join(str(arg) for arg in self.agg_args)
        return f'vaex.agg.{self.short_name}({expr!r}, {extra})'
    return f'vaex.agg.{self.short_name}({expr!r})'

def encode(self, encoding):
spec = {'aggregation': self.short_name}
if len(self.expressions) == 0:
Expand Down Expand Up @@ -251,10 +259,12 @@ def _prepare_types(self, df):
if self.short_name in ['sum', 'summoment']:
self.dtype_out = self.dtype_in.upcast()

def add_tasks(self, df, binners):
def add_tasks(self, df, binners, progress):
progressbar = vaex.utils.progressbars(progress)
self._prepare_types(df)
task = vaex.tasks.TaskAggregation(df, binners, self)
task = df.executor.schedule(task)
progressbar.add_task(task, repr(self))
@vaex.delayed
def finish(value):
return self.finish(value)
Expand Down Expand Up @@ -325,14 +335,15 @@ class AggregatorDescriptorMean(AggregatorDescriptorMulti):
def __init__(self, name, expression, short_name="mean", selection=None, edges=False):
super(AggregatorDescriptorMean, self).__init__(name, expression, short_name, selection=selection, edges=edges)

def add_tasks(self, df, binners):
def add_tasks(self, df, binners, progress):
progressbar = vaex.utils.progressbars(progress, title=repr(self))
expression = expression_sum = expression = df[str(self.expression)]

sum_agg = sum(expression_sum, selection=self.selection, edges=self.edges)
count_agg = count(expression, selection=self.selection, edges=self.edges)

task_sum = sum_agg.add_tasks(df, binners)[0][0]
task_count = count_agg.add_tasks(df, binners)[0][0]
task_sum = sum_agg.add_tasks(df, binners, progress=progressbar)[0][0]
task_count = count_agg.add_tasks(df, binners, progress=progressbar)[0][0]
self.dtype_in = sum_agg.dtype_in
self.dtype_out = sum_agg.dtype_out

Expand All @@ -359,16 +370,17 @@ def __init__(self, name, expression, short_name="var", ddof=0, selection=None, e
super(AggregatorDescriptorVar, self).__init__(name, expression, short_name, selection=selection, edges=edges)
self.ddof = ddof

def add_tasks(self, df, binners):
def add_tasks(self, df, binners, progress):
progressbar = vaex.utils.progressbars(progress, title=repr(self))
expression_sum = expression = df[str(self.expression)]
expression = expression_sum = expression.astype('float64')
sum_moment = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges)
sum_ = sum(str(expression_sum), selection=self.selection, edges=self.edges)
count_ = count(str(expression), selection=self.selection, edges=self.edges)

task_sum_moment = sum_moment.add_tasks(df, binners)[0][0]
task_sum = sum_.add_tasks(df, binners)[0][0]
task_count = count_.add_tasks(df, binners)[0][0]
task_sum_moment = sum_moment.add_tasks(df, binners, progress=progressbar)[0][0]
task_sum = sum_.add_tasks(df, binners, progress=progressbar)[0][0]
task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0]
self.dtype_in = sum_.dtype_in
self.dtype_out = sum_.dtype_out
@vaex.delayed
Expand Down
22 changes: 15 additions & 7 deletions packages/vaex-core/vaex/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ def _convert_name(filenames, shuffle=False, suffix=None):
return base + ".hdf5"


def convert(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, progress=None, *args, **kwargs):
    """Open the dataset at ``path_input`` and export it to ``path_output``.

    The export is wrapped in :func:`vaex.cache.output_file`, so the work is
    skipped when neither the input file nor the arguments changed.

    :param path_input: path of the dataset to read
    :param fs_options_input: file-system options for reading the input
    :param fs_input: optional file system object for the input
    :param path_output: path the converted dataset is written to
    :param fs_options_output: file-system options for writing the output
    :param fs_output: optional file system object for the output
    :param progress: progress callback/flag forwarded to ``DataFrame.export``
    """
    @vaex.cache.output_file(
        path_input=path_input, fs_options_input=fs_options_input, fs_input=fs_input,
        path_output=path_output, fs_options_output=fs_options_output, fs_output=fs_output)
    def cached_output(*args, **kwargs):
        dataset = vaex.dataset.open(path_input, fs_options=fs_options_input, fs=fs_input, *args, **kwargs)
        if dataset is not None:
            vaex.from_dataset(dataset).export(path_output, fs_options=fs_options_output, fs=fs_output, progress=progress)
    cached_output(*args, **kwargs)


def convert_csv(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, *args, **kwargs):
def convert_csv(path_input, fs_options_input, fs_input, path_output, fs_options_output, fs_output, progress=None, *args, **kwargs):
@vaex.cache.output_file(
path_input=path_input, fs_options_input=fs_options_input, fs_input=fs_input,
path_output=path_output, fs_options_output=fs_options_output, fs_output=fs_output)
Expand All @@ -47,11 +47,11 @@ def cached_output(*args, **kwargs):
if 'chunk_size' not in kwargs:
# make it memory efficient by default
kwargs['chunk_size'] = 5_000_000
_from_csv_convert_and_read(path_input, path_output=path_output, fs_options=fs_options_input, fs=fs_input, **kwargs)
_from_csv_convert_and_read(path_input, path_output=path_output, fs_options=fs_options_input, fs=fs_input, progress=progress, **kwargs)
cached_output(*args, **kwargs)


def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_options, fs=None, copy_index=False, **kwargs):
def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_options, fs=None, copy_index=False, progress=None, **kwargs):
# figure out the CSV file path
csv_path = vaex.file.stringyfy(filename_or_buffer)
path_output_bare, ext, _ = vaex.file.split_ext(path_output)
Expand All @@ -61,24 +61,32 @@ def _from_csv_convert_and_read(filename_or_buffer, path_output, chunk_size, fs_o
# convert CSV chunks to separate HDF5 files
import pandas as pd
converted_paths = []
# we don't have indeterminate progress bars, so we cast it to a truthy/falsy bool
progress = bool(progress) if progress is not None else False
if progress:
print("Converting csv to chunk files")
with vaex.file.open(filename_or_buffer, fs_options=fs_options, fs=fs, for_arrow=True) as f:
csv_reader = pd.read_csv(filename_or_buffer, chunksize=chunk_size, **kwargs)
for i, df_pandas in enumerate(csv_reader):
df = vaex.from_pandas(df_pandas, copy_index=copy_index)
chunk_name = f'{path_output_bare}_chunk_{i}.{ext}'
chunk_name = f'{path_output_bare}_chunk_{i}{ext}'
df.export(chunk_name)
converted_paths.append(chunk_name)
log.info('saved chunk #%d to %s' % (i, chunk_name))
if progress:
print("Saved chunk #%d to %s" % (i, chunk_name))

# combine chunks into one HDF5 file
if len(converted_paths) == 1:
# no need to merge several HDF5 files
os.rename(converted_paths[0], path_output)
else:
if progress:
print('Converting %d chunks into single file %s' % (len(converted_paths), path_output))
log.info('converting %d chunks into single file %s' % (len(converted_paths), path_output))
dfs = [vaex.open(p) for p in converted_paths]
df_combined = vaex.concat(dfs)
df_combined.export(path_output)
df_combined.export(path_output, progress=progress)

log.info('deleting %d chunk files' % len(converted_paths))
for df, df_path in zip(dfs, converted_paths):
Expand Down
Loading

0 comments on commit a59c3ad

Please sign in to comment.