
[FEATURE-REQUEST] How to share results from calculations on previous rows for calculations on next rows in apply? #1706

Open
yohplala opened this issue Nov 15, 2021 · 6 comments
yohplala commented Nov 15, 2021

Description
I would like to keep results from calculations on previous rows to use in calculations on the next rows within apply. How should I do that?

With the code below, I have a shared variable buffer, but it seems rows are processed in parallel rather than sequentially. As a result, I don't get the right value when calculating the 'next rows'.
Is there any way to force a 'sequential execution' in apply (while still operating on arrays, i.e. with vectorize=True)? I thought multiprocessing=False would do the trick, but it does not force sequential processing.

import vaex as vx
import numpy as np
from functools import partial

# Setup
def sum_v(data: np.ndarray, buffer: np.ndarray):
    res = np.empty(len(data))
    for idx, val in np.ndenumerate(data):
        idx, = idx
        print(buffer)
        res[idx] = val + buffer[0]   # add the result carried over from the previous row
        buffer[0] = res[idx]         # store the current result for the next row
    return res

SIZE = 5
data = np.arange(SIZE)
buffer = np.array([0])
sum_vp = partial(sum_v, buffer=buffer)
sum_vp.__name__ = 'sum_vp'

# Test
vdf = vx.from_arrays(x=np.arange(SIZE))
vdf['res'] = vdf.apply(sum_vp, arguments=[vdf.x], vectorize=True, multiprocessing=False)

When buffer is printed (in the for loop), we can see the values it stores are not OK.

vdf
[0]
[0]
[0]
[0]
[0]
[1]
[3]
[6]
[10]
[10]
[10]
[10]
[10]
[11]
[13]
[16]
Out[6]: 
  #    x    res
  0    0     10
  1    1     11
  2    2     13
  3    3     16
  4    4     20

We should see:

# initial array: 0,  1,   2,  3,    4
# buffer:        0,  0,   1,  3,    6
Out[]: 
  #    x    res
  0    0     0
  1    1     1
  2    2     3
  3    3     6
  4    4    10

If I deactivate vectorize in apply, the function receives int / float scalars, not arrays.
But I want to work on arrays because, as a next step, I intend to speed up execution with numba's @guvectorize decorator.
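For reference, running the function sequentially with the buffer starting at 0 is exactly a cumulative sum, so the target output can be checked against numpy (a minimal sketch, independent of vaex):

```python
import numpy as np

data = np.arange(5)
# a sequential run of sum_v with buffer = [0] is equivalent to a cumulative sum
expected = np.cumsum(data)
print(expected)  # [ 0  1  3  6 10]
```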

@yohplala yohplala changed the title [FEATURE-REQUEST] How to share result from calculations on previous rows for calculations on next rows in apply? [FEATURE-REQUEST] How to share results from calculations on previous rows for calculations on next rows in apply? Nov 15, 2021
yohplala commented Nov 15, 2021

To give you a view of how @guvectorize works, here is the corresponding working example with @guvectorize + numpy.

import numpy as np
from numba import guvectorize

@guvectorize('void(float64[:], float64[:], float64[:])', '(m), (n)->(m)', nopython=True)
def sum_v(data: np.ndarray, buffer: np.ndarray, res: np.ndarray):
    for idx, val in np.ndenumerate(data):
        idx, = idx
        print(buffer)
        res[idx] = val+buffer[0]
        buffer[0] = res[idx]

def sum_vp(data):                       # does the work of partial
    buffer = np.array([0])
    res = np.empty(len(data))
    sum_v(data, buffer, res)
    return res

SIZE = 5
data = np.arange(SIZE)
res_ar = sum_vp(data)
[0.]
[0.]
[1.]
[3.]
[6.]

res_ar
Out[3]: array([ 0.,  1.,  3.,  6., 10.])

JovanVeljanoski (Member) commented
Related issue: #1313

yohplala commented

> Related issue: #1313

Hi @JovanVeljanoski
Yes, judging from the title, at first it really looks like the same need.

But looking at the provided and accepted SO answer, it is not the one I am looking for.
It seems the author of that question either worded the title wrongly, or accepted an answer that does not fulfill the title of the initial question.

The provided answer relies on a 'shift-like' approach: it uses data from a previous row, data that already exists and is unchanged during the calculation.
I am looking for a way to use the result of the calculation at the previous row (using a buffer variable: either a column of the dataframe that is modified during the calculation, or an external variable).
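To make the distinction concrete, here is a minimal plain-numpy sketch (variable names are illustrative) contrasting the 'shift-like' approach of the SO answer with the recursive one I am after:

```python
import numpy as np

x = np.arange(5)

# 'shift-like': each row uses *existing* data from the previous row
shift_res = x + np.concatenate(([0], x[:-1]))   # res[i] = x[i] + x[i-1]

# recursive: each row uses the *result* computed at the previous row
rec_res = np.empty_like(x)
prev = 0
for i, v in enumerate(x):
    rec_res[i] = v + prev
    prev = rec_res[i]                           # the buffer carries the previous result

print(shift_res)   # [0 1 3 5 7]
print(rec_res)     # [0 1 3 6 10]
```

The shift-like variant is trivially parallelizable; the recursive one has a data dependency between consecutive rows, which is why parallel chunk processing breaks it.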

yohplala commented Nov 16, 2021

I spent a couple of hours on the topic to see if I could find my way through apply(vectorize=True) or apply(vectorize=False).
I did learn more about this parameter, and I conclude that whatever its value, at some point vaex will parallelize calculations. Hence, at some point, my buffer does not hold the right value.

In both cases, I follow a similar approach:

  • trying to use external variables
  • safeguarding calculations so that they are run 'in the right conditions' by checking that:
    • the row index is the one following the previous iteration, and doing the calculation for the current row only in that case: row_idx == last_idx+1
    • the calculation for this row has not already been done in a past iteration (I noticed vaex re-processes the same rows at the beginning and at the end): processed == False

Using apply(vectorize=True)

The furthest I got is with this piece of code.

import vaex as vx
import numpy as np
from functools import partial

# Setup
def sum_v(row_idx: np.ndarray, data: np.ndarray, res: np.ndarray, processed: np.ndarray, buffer, last_idx):
#    res = np.empty(len(data))
    for idx, val in np.ndenumerate(data):
        idx, = idx
#        print(f'before: buffer: {buffer} / last_idx: {last_idx} / idx: {idx} / processed: {processed[idx]}')
        if (row_idx[idx] == last_idx[0]+1) and not processed[idx]:
            res[idx] = val+buffer[0]
            buffer[0] = res[idx]
            last_idx[0] = row_idx[idx]
            processed[idx] = True
#            print(f'after: buffer: {buffer} / last_idx: {last_idx} / idx: {idx} / processed: {processed[idx]}')

    return res

SIZE = 5
buffer = [0]
last_idx = [-1]
sum_vp = partial(sum_v, buffer=buffer, last_idx=last_idx)
sum_vp.__name__ = 'sum_vp'

# Test
vdf = vx.from_arrays(row_idx=np.arange(SIZE), x=np.arange(SIZE), res=np.empty(SIZE), processed=np.array([False]*SIZE))
vdf['res_n'] = vdf.apply(sum_vp, arguments=[vdf.row_idx, vdf.x, vdf.res, vdf.processed], vectorize=True, multiprocessing=False)

With SIZE equal to 5, it works, but as soon as it is greater than 5, it fails, including for the first rows.
'Interesting' aspects I discovered are that:

  • good: you can modify a vaex column in place (here the res column),
  • bad: your function always needs to return a result, which mine was not initially doing, because it only runs when conditions are appropriate (checking last_idx and processed)

The approach I retained is therefore to keep my results in the res column (in-place modification).
Rows of res are indeed set only conditionally (checking last_idx and processed).
To avoid an error message, the function also returns res, but it is stored in another column (res_n) so as not to overwrite the result in res.

Using apply(vectorize=False)

The furthest I got is with this piece of code.

import vaex as vx
import numpy as np
from functools import partial

# Setup
def sum_v(data: float, idx: float, res: float, buffer, last_idx, last_value, processed: np.ndarray):
    print(f'before: buffer: {buffer} // last idx: {last_idx} // idx: {idx} // res: {res}')
    if (idx == last_idx[0]+1) and not processed[idx]:
        res = data+buffer[0]
        buffer[0] = res
        last_value[idx] = res
        last_idx[0] = idx
        processed[idx] = True
        print(f'after: buffer: {buffer} // last idx: {last_idx} // idx: {idx} // res: {res}')
        return res
    else:
        return last_value[idx]


buffer = [0]
last_idx = [-1]
last_value = dict()

SIZE = 5
processed = np.array([False]*SIZE)
sum_vp = partial(sum_v, buffer=buffer, last_idx=last_idx, last_value=last_value, processed=processed)
sum_vp.__name__ = 'sum_vp'

# Test
vdf = vx.from_arrays(x=np.arange(SIZE), idx=np.arange(SIZE), res=np.zeros(SIZE))
vdf['res_t'] = vdf.apply(sum_vp, arguments=[vdf.x, vdf.idx, vdf.res], vectorize=False, multiprocessing=False)

It works until SIZE gets bigger than 1024.
When it does, I can see that when the next chunk is processed (row idx > 1024), buffer seems not to have been initialized yet. This leads me to think that even with vectorize=False, calculations are run in parallel.

Even if the above approach had worked, it would have been unrealistic, as I am using a dict that in the end will be as big as the initial array. The out-of-core benefit is thus lost. But I wanted a working case, so I tried anyway. As said, it does not work either way.

What I learned here is that:

  • unlike with vectorize=True, you cannot modify a column in place (which is why I am using a dict instead, re-creating a kind of array external to the vaex dataframe),
  • this is, here again, to cope with the fact that the function needs to return something, otherwise it raises an error

Next?

I think we can derive from these tests what is missing to conduct such sequential processing.
I have the following considerations:

  • The 1st approach, with vectorize=True, seems the most interesting as it allows modifying dataframe columns in place. Hence we can use them as buffers as well.
  • (required) A parameter to force sequential execution, i.e. still processing data by chunk, but doing one chunk at a time, one after the other.
  • (optional) It would be nice to have a parameter that makes vaex interpret a None returned by the function as 'keep existing value in result' instead of setting the value to None.
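To sketch what 'one chunk at a time' buys us (plain numpy, independent of vaex; cumsum_chunked and chunk_size are illustrative names): as long as chunks are processed strictly sequentially, a scalar buffer carried across chunk boundaries is enough to reproduce the recursive calculation, while staying vectorized within each chunk:

```python
import numpy as np

def cumsum_chunked(data, chunk_size=1024):
    """Cumulative sum computed one chunk at a time, sequentially."""
    out = np.empty(len(data), dtype=np.float64)
    buffer = 0.0                               # carries the last result across chunks
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        res = np.cumsum(chunk) + buffer        # vectorized within the chunk
        out[start:start + chunk_size] = res
        buffer = res[-1]                       # hand the state over to the next chunk
    return out

data = np.arange(5000)
assert np.array_equal(cumsum_chunked(data), np.cumsum(data))
```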

Could this be possible?

yohplala commented Nov 16, 2021

The calculation I intend is not exactly the one presented above (I intend to use min and max, store them temporarily in a buffer until I encounter a new min or max, or a limit value is hit, in which case the buffer values are reset).

But the above example shows the intent. It is actually a cumulative sum, and other tickets are indeed open on this topic @JovanVeljanoski
#743
#540 (somehow, but I think the solution of #540 will not necessarily apply to this case)

Best

yohplala commented
Thinking out loud (I think I will go this way, but I need to spend some time on other topics, so will get back to this a bit later).
An iter_apply function may answer the need. It would be a 'short' function that only gathers some 'vaex best practices' but leaves the user complete freedom to implement any function he/she would like (which the current apply does not).

@vaex.register_function
def iter_apply(func, arguments, chunk_size):
    vdf = arguments[0].join(arguments[1:])  # re-create a DataFrame of the relevant columns
    arrow_list = []
    for df in vdf.to_pandas_df(chunk_size=chunk_size):  # iterate and apply func by chunk, sequentially
        res = vx.from_pandas(func(df))
        file = '~/.vaex/cache/....arrow'  # use the cache directory to store results temporarily
        res.to_arrow(file)
        arrow_list.append(file)
    return vx.open_many(arrow_list)

This is the rough idea; I need to check it in more detail.
Basically, we keep control of the RAM used.
The returned data can be re-used lazily (because it is read from arrow) in a vaex dataflow.
The user does not need any knowledge of the cache and temporary file management.
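As a minimal, vaex-free sketch of the same idea (sequential per-chunk processing with user-controlled state), using plain pandas chunks instead of vaex's cache/arrow machinery; iter_apply, cumsum_func and state are illustrative names, not an existing API:

```python
import numpy as np
import pandas as pd

def iter_apply(df, func, chunk_size):
    """Apply func to consecutive chunks, one chunk at a time (sequentially)."""
    pieces = []
    for start in range(0, len(df), chunk_size):
        chunk = df.iloc[start:start + chunk_size]
        pieces.append(func(chunk))            # func may carry state between calls
    return pd.concat(pieces)

# a stateful function: cumulative sum carried across chunk boundaries
state = {'buffer': 0.0}

def cumsum_func(chunk):
    res = chunk['x'].cumsum() + state['buffer']
    state['buffer'] = res.iloc[-1]
    return res.to_frame('res')

df = pd.DataFrame({'x': np.arange(10)})
out = iter_apply(df, cumsum_func, chunk_size=4)
assert (out['res'].to_numpy() == np.cumsum(np.arange(10))).all()
```

Because chunks are processed strictly in order, the shared state holds exactly the value produced at the end of the previous chunk, which is the guarantee missing from the parallel apply.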
