Skip to content

Commit

Permalink
Merge remote-tracking branch 'jnke2016/branch-23.10_increase-timeout'…
Browse files Browse the repository at this point in the history
… into branch-23.10-cugraph_nx_benchmarks
  • Loading branch information
rlratzel committed Oct 6, 2023
2 parents bf74a73 + 74c63fb commit cbdbd8a
Show file tree
Hide file tree
Showing 15 changed files with 485 additions and 46 deletions.
4 changes: 4 additions & 0 deletions ci/release/update-version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; prin
DEPENDENCIES=(
cudf
cugraph
cugraph-dgl
cugraph-pyg
cugraph-service-server
cugraph-service-client
cuxfilter
dask-cuda
Expand All @@ -93,6 +96,7 @@ DEPENDENCIES=(
librmm
pylibcugraph
pylibcugraphops
pylibwholegraph
pylibraft
pyraft
raft-dask
Expand Down
5 changes: 4 additions & 1 deletion ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ popd

rapids-logger "pytest cugraph"
pushd python/cugraph/cugraph
export DASK_WORKER_DEVICES="0"
DASK_WORKER_DEVICES="0" \
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
pytest \
-v \
--benchmark-disable \
Expand Down
6 changes: 5 additions & 1 deletion ci/test_wheel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ arch=$(uname -m)
if [[ "${arch}" == "aarch64" && ${RAPIDS_BUILD_TYPE} == "pull-request" ]]; then
python ./ci/wheel_smoke_test_${package_name}.py
else
RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets python -m pytest ./python/${package_name}/${python_package_name}/tests
RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets \
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
python -m pytest ./python/${package_name}/${python_package_name}/tests
fi
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies:
- pydata-sphinx-theme
- pylibcugraphops==23.12.*
- pylibraft==23.12.*
- pylibwholegraph==23.10.*
- pytest
- pytest-benchmark
- pytest-cov
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-120_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies:
- pydata-sphinx-theme
- pylibcugraphops==23.12.*
- pylibraft==23.12.*
- pylibwholegraph==23.10.*
- pytest
- pytest-benchmark
- pytest-cov
Expand Down
3 changes: 3 additions & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ dependencies:
- *numpy
- python-louvain
- scikit-learn>=0.23.1
- output_types: [conda]
packages:
- pylibwholegraph==23.10.*
test_python_pylibcugraph:
common:
- output_types: [conda, pyproject]
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from cugraph_dgl.dataloading.dataset import (
HomogenousBulkSamplerDataset,
HetrogenousBulkSamplerDataset,
HeterogenousBulkSamplerDataset,
)
from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler
from cugraph_dgl.dataloading.dataloader import DataLoader
49 changes: 35 additions & 14 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from dask.distributed import default_client, Event
from cugraph_dgl.dataloading import (
HomogenousBulkSamplerDataset,
HetrogenousBulkSamplerDataset,
HeterogenousBulkSamplerDataset,
)
from cugraph_dgl.dataloading.utils.extract_graph_helpers import (
create_cugraph_graph_from_edges_dict,
Expand All @@ -47,19 +47,20 @@ def __init__(
graph_sampler: cugraph_dgl.dataloading.NeighborSampler,
sampling_output_dir: str,
batches_per_partition: int = 50,
seeds_per_call: int = 400_000,
seeds_per_call: int = 200_000,
device: torch.device = None,
use_ddp: bool = False,
ddp_seed: int = 0,
batch_size: int = 1024,
drop_last: bool = False,
shuffle: bool = False,
sparse_format: str = "coo",
**kwargs,
):
"""
Constructor for CuGraphStorage:
-------------------------------
graph : CuGraphStorage
graph : CuGraphStorage
The graph.
indices : Tensor or dict[ntype, Tensor]
The set of indices. It can either be a tensor of
Expand Down Expand Up @@ -89,7 +90,12 @@ def __init__(
The seed for shuffling the dataset in
:class:`torch.utils.data.distributed.DistributedSampler`.
Only effective when :attr:`use_ddp` is True.
batch_size: int,
batch_size: int
Batch size.
sparse_format: str, default = "coo"
The sparse format of the emitted sampled graphs. Choose between "csc"
and "coo". When using "csc", the graphs are of type
cugraph_dgl.nn.SparseGraph.
kwargs : dict
Key-word arguments to be passed to the parent PyTorch
:py:class:`torch.utils.data.DataLoader` class. Common arguments are:
Expand Down Expand Up @@ -123,6 +129,12 @@ def __init__(
... for input_nodes, output_nodes, blocks in dataloader:
...
"""
if sparse_format not in ["coo", "csc"]:
raise ValueError(
f"sparse_format must be one of 'coo', 'csc', "
f"but got {sparse_format}."
)
self.sparse_format = sparse_format

self.ddp_seed = ddp_seed
self.use_ddp = use_ddp
Expand Down Expand Up @@ -156,11 +168,12 @@ def __init__(
self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset(
total_number_of_nodes=graph.total_number_of_nodes,
edge_dir=self.graph_sampler.edge_dir,
sparse_format=sparse_format,
)
else:
etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()}

self.cugraph_dgl_dataset = HetrogenousBulkSamplerDataset(
self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset(
num_nodes_dict=graph.num_nodes_dict,
etype_id_dict=etype_id_to_etype_str_dict,
etype_offset_dict=graph._etype_offset_d,
Expand Down Expand Up @@ -210,14 +223,23 @@ def __iter__(self):
output_dir = os.path.join(
self._sampling_output_dir, "epoch_" + str(self.epoch_number)
)
kwargs = {}
if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset):
deduplicate_sources = True
prior_sources_behavior = "carryover"
renumber = True
kwargs["deduplicate_sources"] = True
kwargs["prior_sources_behavior"] = "carryover"
kwargs["renumber"] = True

if self.sparse_format == "csc":
kwargs["compression"] = "CSR"
kwargs["compress_per_hop"] = True
# The following kwargs will be deprecated in uniform sampler.
kwargs["use_legacy_names"] = False
kwargs["include_hop_column"] = False

else:
deduplicate_sources = False
prior_sources_behavior = None
renumber = False
kwargs["deduplicate_sources"] = False
kwargs["prior_sources_behavior"] = None
kwargs["renumber"] = False

bs = BulkSampler(
output_path=output_dir,
Expand All @@ -227,10 +249,9 @@ def __iter__(self):
seeds_per_call=self._seeds_per_call,
fanout_vals=self.graph_sampler._reversed_fanout_vals,
with_replacement=self.graph_sampler.replace,
deduplicate_sources=deduplicate_sources,
prior_sources_behavior=prior_sources_behavior,
renumber=renumber,
**kwargs,
)

if self.shuffle:
self.tensorized_indices_ds.shuffle()

Expand Down
37 changes: 25 additions & 12 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from cugraph_dgl.dataloading.utils.sampling_helpers import (
create_homogeneous_sampled_graphs_from_dataframe,
create_heterogeneous_sampled_graphs_from_dataframe,
create_homogeneous_sampled_graphs_from_dataframe_csc,
)


Expand All @@ -33,17 +34,19 @@ def __init__(
total_number_of_nodes: int,
edge_dir: str,
return_type: str = "dgl.Block",
sparse_format: str = "coo",
):
if return_type not in ["dgl.Block", "cugraph_dgl.nn.SparseGraph"]:
raise ValueError(
"return_type must be either 'dgl.Block' or \
'cugraph_dgl.nn.SparseGraph' "
"return_type must be either 'dgl.Block' or "
"'cugraph_dgl.nn.SparseGraph'."
)
# TODO: Deprecate `total_number_of_nodes`
# as it is no longer needed
# in the next release
self.total_number_of_nodes = total_number_of_nodes
self.edge_dir = edge_dir
self.sparse_format = sparse_format
self._current_batch_fn = None
self._input_files = None
self._return_type = return_type
Expand All @@ -60,10 +63,20 @@ def __getitem__(self, idx: int):

fn, batch_offset = self._batch_to_fn_d[idx]
if fn != self._current_batch_fn:
df = _load_sampled_file(dataset_obj=self, fn=fn)
self._current_batches = create_homogeneous_sampled_graphs_from_dataframe(
sampled_df=df, edge_dir=self.edge_dir, return_type=self._return_type
)
if self.sparse_format == "csc":
df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True)
self._current_batches = (
create_homogeneous_sampled_graphs_from_dataframe_csc(df)
)
else:
df = _load_sampled_file(dataset_obj=self, fn=fn)
self._current_batches = (
create_homogeneous_sampled_graphs_from_dataframe(
sampled_df=df,
edge_dir=self.edge_dir,
return_type=self._return_type,
)
)
current_offset = idx - batch_offset
return self._current_batches[current_offset]

Expand All @@ -87,7 +100,7 @@ def set_input_files(
)


class HetrogenousBulkSamplerDataset(torch.utils.data.Dataset):
class HeterogenousBulkSamplerDataset(torch.utils.data.Dataset):
def __init__(
self,
num_nodes_dict: Dict[str, int],
Expand Down Expand Up @@ -141,18 +154,18 @@ def set_input_files(
----------
input_directory: str
input_directory which contains all the files that will be
loaded by HetrogenousBulkSamplerDataset
loaded by HeterogenousBulkSamplerDataset
input_file_paths: List[str]
File names that will be loaded by the HetrogenousBulkSamplerDataset
File names that will be loaded by the HeterogenousBulkSamplerDataset
"""
_set_input_files(
self, input_directory=input_directory, input_file_paths=input_file_paths
)


def _load_sampled_file(dataset_obj, fn):
def _load_sampled_file(dataset_obj, fn, skip_rename=False):
df = cudf.read_parquet(os.path.join(fn))
if dataset_obj.edge_dir == "in":
if dataset_obj.edge_dir == "in" and not skip_rename:
df.rename(
columns={"sources": "destinations", "destinations": "sources"},
inplace=True,
Expand Down Expand Up @@ -181,7 +194,7 @@ def get_batch_to_fn_d(files):


def _set_input_files(
dataset_obj: Union[HomogenousBulkSamplerDataset, HetrogenousBulkSamplerDataset],
dataset_obj: Union[HomogenousBulkSamplerDataset, HeterogenousBulkSamplerDataset],
input_directory: Optional[str] = None,
input_file_paths: Optional[List[str]] = None,
) -> None:
Expand Down
Loading

0 comments on commit cbdbd8a

Please sign in to comment.