Details of the Data Parallel Tree Learner #5798

Closed
adfea9c0 opened this issue Mar 21, 2023 · 9 comments

@adfea9c0

I'm working to understand the data parallel tree learner included in LightGBM. I'm using dask on lightgbm==3.3.2. I have a few open questions:

  1. My understanding was that this tree learner simply distributes the computations of the local algorithm, and should produce similar results. (Conversely, the voting tree learner has the workers vote based on their local data, so the results it produces depend on the data distribution.) However, I find that the output of the data-parallel tree learner can vary between runs. I included an artificial example below. I work with fairly noisy data, so results like these are quite usual for me.
  2. If the data parallel learner is not supposed to faithfully reproduce the local algorithm, is there documentation or a paper anywhere on how it works? I found the paper for the voting tree learner but I would like to understand the data parallel learner too.
  3. I'm fairly new to dask and lightgbm so this might be poorly phrased, but is there any write-up of best practices? Profiling my code, a lot of time is spent by dask moving the data into place, and relatively little on the learning. In particular, something odd seems to happen: my client has N workers available and I use M < N partitions in my dask.dataframe, yet when I make repeated calls to model.predict, my dask dashboard shows the computations going to a different subset of M workers each time. Is the data really redistributed each time, or am I misunderstanding?

The example from my first point:

import numpy as np
import pandas as pd
import lightgbm as lgb
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import matplotlib.pyplot as plt

np.random.seed(0)

# Data
parts = 4
N = 4_000
features = ['x1', 'x2', 'x3', 'x4', 'x5', 'x6']
df = pd.DataFrame(
    data=np.random.normal(size=(N, 6)),
    columns=features,
)
df['y'] = np.where(df['x1'] * df['x2'] > df['x3'] * df['x4'], df['x5'] - df['x6'], df['x5'] * df['x6'])
df['y_with_noise'] = df['y'] + 4 * np.random.normal(size=(N,))

split = int(0.8 * len(df))
df_train, df_test = df.iloc[:split], df.iloc[split:]

# Distribute(?)
data = dd.from_pandas(df_train, npartitions=parts)
X = data[features].to_dask_array(lengths=True)
y = data['y_with_noise'].to_dask_array(lengths=True)

# Regressor
client = Client()
for it in range(10):
    model = lgb.DaskLGBMRegressor(
        client=client,
        silent=False,
        tree_learner="data",
        **{
            # Some random parameters
            "learning_rate": 0.1,
            "num_leaves": 32,
            "max_depth": 5,
            "num_iterations": 100,
            "bagging_fraction": 0.8,
            # Let's try to get the same result.
            "seed": 1,
            "deterministic": True,
        },
    )
    model.fit(X, y)
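    booster = model.booster_

    # Track the out-of-sample score after each boosting iteration:
    # 1 - SSE / sum(label^2), i.e. an R^2-like score relative to a zero prediction.
    res, pred_so_far = [0.0], 0
    label = df_test['y_with_noise']
    for i in range(booster.num_trees()):
        # Add the contribution of tree i only.
        pred_so_far += booster.predict(df_test[features], start_iteration=i, num_iteration=1)
        res.append(1.0 - np.sum((pred_so_far - label)**2) / np.sum(label**2))
    plt.plot(res, label=f"Iteration {it}")

# Show one curve per training run.
plt.legend()
plt.show()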

[Figure: Annotation 2023-03-21 150527, a screenshot of the plot produced by the code above, with one curve per training run]

@jmoralez
Collaborator

Hey @adfea9c0, thanks for using LightGBM. I'll try to answer your questions.

  1. The algorithm isn't exactly the same because each worker builds the feature histograms for a subset of features using its local data. See "Investigate how the gap between local and Dask predictions can be decreased", #3835 (comment).
  2. I'm not sure if there's a paper, because the data-parallel algorithm is probably the most common one. You can see a visual representation of the data-splitting here.
  3. The dask interface adds some overhead because it has to combine partitions, schedule the work, synchronize the training and so on. You'll only benefit from it if you really need distributed training, i.e. your data doesn't fit on a single machine and you need a cluster of machines to train. With respect to the predict scheduling: when you call predict, a local version of the model is serialized and sent to whichever worker was scheduled to run the prediction for a partition. If that partition is already in a worker's memory, the prediction will probably be scheduled there; however, if your partition is something like "read this file from s3", then any worker can be assigned. You can load your dataset into distributed memory by calling .persist() on your dask dataframe, which uses more memory but is more efficient if you're repeatedly training/predicting.

Please let us know if you have further doubts.

@adfea9c0
Author

Thank you for the response!

1./2. I understand how the data is partitioned, but I didn't really understand how the actual algorithm works. From your description it sounds like the workers do binning locally, which results in non-deterministic behaviour? Otherwise I still don't understand the source of the non-determinism.
3. I do .persist() my dask dataframe but I still see different workers doing the actual job.

One more clarification: do I understand correctly that, generally speaking, you want the number of partitions (npartitions) of your dask dataframe to be equal to the number of workers? I ask because each worker will first collect all partitions assigned to it, and only then start working.

@jmoralez
Collaborator

1./2. Yes, exactly. Even if the partitions are the same, the feature bins for the same feature might be computed using a different partition, thus they may be different.
3. We let dask decide the scheduling, so it may schedule the predict for a partition on different workers; I don't think there's a guarantee. I'd say it's likely to be scheduled on the worker that already has the partition in memory, but if the partition is small it may decide to just serialize it and let another worker do it.

Yes, that's a good value because, as you say, the first step is combining all partitions that a worker holds into a single one, so if each worker holds a single partition that step will be faster.

@adfea9c0
Author

Thanks for your help!

@adfea9c0
Author

adfea9c0 commented Mar 30, 2023

Hey sorry I had one follow-up question about this:

> @adfea9c0: 1./2. I understand how the data is partitioned -- but I didn't understand really how the actual algorithm works. From your description it sounds like the workers do binning locally, which results in non-deterministic behaviour? Otherwise I still don't understand the source of non-determinism.
>
> @jmoralez: 1./2. Yes, exactly. Even if the partitions are the same, the feature bins for the same feature might be computed using a different partition, thus they may be different.

How is this information merged to determine the best global split? Does each worker get its own subset of the data, determine the cost of splitting on each boundary, and then communicate this for central processing? If so, how is this info combined?

E.g. if Worker A has bin boundaries for some feature at 1.0 and 3.0, but Worker B has a boundary at 2.0, how can we use Worker A's histogram information to find the gain from splitting at 2.0? I find the idea of local bins very confusing.

EDIT: This would also imply that for the data parallel tree learner it is important for each worker to receive a representative subset of the data, correct? Otherwise my bins might be completely different from the ones on some other worker.

@adfea9c0 adfea9c0 reopened this Mar 30, 2023
@jmoralez
Collaborator

I believe the process is:

  1. Compute the feature bins for all features. This is where each worker is assigned a subset of the features to compute the bins.
  2. Synchronize the feature bins across all workers. Here every worker gets the feature bins for all features.
  3. Each worker computes the gradients & hessians on their local data.
  4. The gradients and hessians are synchronized and the split gain is computed.
  5. The best split is computed and then synchronized to all workers.
  6. Go back to step 3 until the training is complete.
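
As a rough illustration of steps 2 to 5 above, here is a minimal pure-Python sketch; it is not LightGBM's actual implementation. It assumes every worker already holds the same bin boundaries for a feature (the outcome of step 2), so per-bin gradient and hessian sums computed on local rows can simply be added together. The helper names (`local_histogram`, `merge`, `best_split`), the squared-error gradients, and the simplified gain formula are all assumptions made for the example.

```python
import bisect
import random


def local_histogram(values, grads, hess, boundaries):
    """Per-bin sums of gradients and hessians for one worker's rows."""
    n_bins = len(boundaries) + 1
    g_sum = [0.0] * n_bins
    h_sum = [0.0] * n_bins
    for v, g, h in zip(values, grads, hess):
        # Bin b holds values with boundaries[b-1] < v <= boundaries[b].
        b = bisect.bisect_left(boundaries, v)
        g_sum[b] += g
        h_sum[b] += h
    return g_sum, h_sum


def merge(histograms):
    """Element-wise sum of the workers' histograms (the synchronization step)."""
    g_total = [sum(col) for col in zip(*(g for g, _ in histograms))]
    h_total = [sum(col) for col in zip(*(h for _, h in histograms))]
    return g_total, h_total


def best_split(g_sum, h_sum, boundaries, reg_lambda=1.0):
    """Scan a histogram left to right and return (gain, threshold) of the best split."""
    def score(g, h):
        return g * g / (h + reg_lambda)

    g_all, h_all = sum(g_sum), sum(h_sum)
    best_gain, best_threshold = float("-inf"), None
    g_left = h_left = 0.0
    for b, threshold in enumerate(boundaries):
        g_left += g_sum[b]
        h_left += h_sum[b]
        gain = (score(g_left, h_left)
                + score(g_all - g_left, h_all - h_left)
                - score(g_all, h_all))
        if gain > best_gain:
            best_gain, best_threshold = gain, threshold
    return best_gain, best_threshold


random.seed(0)
boundaries = [-1.0, 0.0, 1.0]  # assumed identical on every worker after step 2
rows = [random.gauss(0, 1) for _ in range(1000)]
labels = [1.0 if x > 0 else -1.0 for x in rows]
grads = [0.0 - y for y in labels]  # squared-error gradient for a current prediction of 0
hess = [1.0] * len(rows)

# Four hypothetical workers, each holding every fourth row.
parts = [slice(i, None, 4) for i in range(4)]
local = [local_histogram(rows[s], grads[s], hess[s], boundaries) for s in parts]

merged = merge(local)
single = local_histogram(rows, grads, hess, boundaries)
gain, threshold = best_split(*merged, boundaries)
print(merged == single, threshold)
```

Under these assumptions the per-bin sums are additive, so the merged histogram matches the one a single machine would build from all rows. The Worker A / Worker B situation with mismatched boundaries would only arise if the bins were not synchronized first.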

> This would also imply that for the data parallel tree learner it is also important to have each worker receive a representative subset of the data, correct?

Yes, that's a very important assumption.

Please take this with a grain of salt; I may be wrong in some step. @shiyu1994 is the person who knows this for sure.

@adfea9c0
Author

> 1. Compute the feature bins for all features. This is where each worker is assigned a subset of the features to compute the bins.

And these bins are computed based on the local sample of data? I feel like it should still be possible to implement this in a deterministic way, i.e. if the data is partitioned the same way between runs, and features are distributed across workers in a consistent way (assuming a fixed seed), then bin boundaries should be consistent across runs, no?

@jmoralez
Collaborator

It is deterministic in the CLI because you manually specify the order of the machines. In the dask case, however, the order of the workers isn't deterministic, so the same worker may get assigned a different subset of features each time.
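
A minimal pure-Python sketch of why the worker order matters, under the assumptions described in this thread: each feature's bin boundaries are computed by one worker from its local rows only, and the feature-to-worker assignment follows the worker order. The round-robin assignment and the quantile-based `bin_boundaries` helper are simplifications invented for the example, not LightGBM's actual binning code.

```python
import random


def bin_boundaries(values, n_bins=4):
    """Quantile-style boundaries from one worker's local sample (hypothetical helper)."""
    ordered = sorted(values)
    return [ordered[len(ordered) * k // n_bins] for k in range(1, n_bins)]


def compute_bins(partitions, worker_order, n_features):
    """Feature j is binned by worker_order[j % n_workers], using only that worker's rows."""
    bins = {}
    for j in range(n_features):
        owner = worker_order[j % len(worker_order)]
        bins[j] = bin_boundaries([row[j] for row in partitions[owner]])
    return bins


random.seed(0)
n_features = 2
data = [[random.gauss(0, 1) for _ in range(n_features)] for _ in range(200)]

# The partitioning is identical in every run: worker "a" holds the first half
# of the rows and worker "b" the second half.
partitions = {"a": data[:100], "b": data[100:]}

run_1 = compute_bins(partitions, ["a", "b"], n_features)
run_2 = compute_bins(partitions, ["b", "a"], n_features)  # workers listed in another order
run_3 = compute_bins(partitions, ["a", "b"], n_features)

print(run_1 == run_3)  # True: same order, same boundaries
print(run_1 == run_2)  # False: each feature was binned from a different local sample
```

In this toy setup the partitions never change, yet swapping the worker order changes which local sample each feature is binned from, so the boundaries move. Fixing the order, as the CLI does through the machine list, removes that source of variation.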

@adfea9c0 adfea9c0 closed this as completed Apr 3, 2023
@github-actions

This issue has been automatically locked since there has not been any recent activity since it was closed. To start a new related discussion, open a new issue at https://github.com/microsoft/LightGBM/issues including a reference to this.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Aug 19, 2023