From adf458dac0be0056a0abd117bff3811c95705080 Mon Sep 17 00:00:00 2001
From: Matthias Fey
Date: Mon, 13 Jan 2025 08:55:50 +0100
Subject: [PATCH] Use multi-threading and parallel hash map in `CPUHashMap`
 (#380)

---
 CMakeLists.txt                        |  1 +
 benchmark/classes/hash_map.py         | 31 +++++++++-----
 pyg_lib/csrc/classes/cpu/hash_map.cpp | 59 ++++++++++++++-------------
 pyg_lib/csrc/classes/cpu/hash_map.h   | 16 ++++--
 test/csrc/classes/test_hash_map.cpp   |  2 +-
 5 files changed, 65 insertions(+), 44 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 10afda77..c2d20125 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -72,6 +72,7 @@
 if (NOT "$ENV{EXTERNAL_PHMAP_INCLUDE_DIR}" STREQUAL "")
   include_directories($ENV{EXTERNAL_PHMAP_INCLUDE_DIR})
 else()
   set(PHMAP_DIR third_party/parallel-hashmap)
+  include_directories(${PHMAP_DIR})
   target_include_directories(${PROJECT_NAME} PRIVATE ${PHMAP_DIR})
 endif()
diff --git a/benchmark/classes/hash_map.py b/benchmark/classes/hash_map.py
index e1058307..092637f0 100644
--- a/benchmark/classes/hash_map.py
+++ b/benchmark/classes/hash_map.py
@@ -19,21 +19,28 @@
     if args.device == 'cpu':
         num_warmups, num_steps = num_warmups // 10, num_steps // 10
 
-    key = torch.randperm(args.num_keys, device=args.device)
-    query = torch.randperm(args.num_queries, device=args.device)
-    query = query[:args.num_queries]
+    max_value = torch.iinfo(torch.long).max
+
+    key1 = torch.randint(0, max_value, (args.num_keys, ), dtype=torch.long,
+                         device=args.device)
+    query1 = key1[torch.randperm(key1.size(0), device=args.device)]
+    query1 = query1[:args.num_queries]
+
+    key2 = torch.randperm(args.num_keys, device=args.device)
+    query2 = torch.randperm(args.num_queries, device=args.device)
+    query2 = query2[:args.num_queries]
 
     t_init = t_get = 0
     for i in range(num_warmups + num_steps):
         torch.cuda.synchronize()
         t_start = time.perf_counter()
-        hash_map = HashMap(key)
+        hash_map = HashMap(key1)
         torch.cuda.synchronize()
         if i >= num_warmups:
             t_init += time.perf_counter() - t_start
 
         t_start = time.perf_counter()
-        hash_map.get(query)
+        out1 = hash_map.get(query1)
         torch.cuda.synchronize()
         if i >= num_warmups:
             t_get += time.perf_counter() - t_start
@@ -48,13 +55,13 @@
         t_start = time.perf_counter()
         hash_map = torch.full((args.num_keys, ), fill_value=-1,
                               dtype=torch.long, device=args.device)
-        hash_map[key] = torch.arange(args.num_keys)
+        hash_map[key2] = torch.arange(args.num_keys)
         torch.cuda.synchronize()
         if i >= num_warmups:
             t_init += time.perf_counter() - t_start
 
         t_start = time.perf_counter()
-        hash_map[query]
+        out2 = hash_map[query2]
         torch.cuda.synchronize()
         if i >= num_warmups:
             t_get += time.perf_counter() - t_start
@@ -63,20 +70,22 @@
     print(f'  Memory Get: {t_get / num_steps:.4f}s')
     print('=====================')
 
-    if key.is_cpu:
+    if key1.is_cpu:
         t_init = t_get = 0
         for i in range(num_warmups + num_steps):
             t_start = time.perf_counter()
-            hash_map = pd.CategoricalDtype(categories=key.numpy(),
+            hash_map = pd.CategoricalDtype(categories=key1.numpy(),
                                            ordered=True)
             if i >= num_warmups:
                 t_init += time.perf_counter() - t_start
 
             t_start = time.perf_counter()
-            ser = pd.Series(query.numpy(), dtype=hash_map)
-            ser.cat.codes.to_numpy()
+            ser = pd.Series(query1.numpy(), dtype=hash_map)
+            out3 = ser.cat.codes.to_numpy()
             if i >= num_warmups:
                 t_get += time.perf_counter() - t_start
 
         print(f'  Pandas Init: {t_init / num_steps:.4f}s')
         print(f'  Pandas Get: {t_get / num_steps:.4f}s')
+
+        assert out1.equal(torch.tensor(out3))
diff --git a/pyg_lib/csrc/classes/cpu/hash_map.cpp b/pyg_lib/csrc/classes/cpu/hash_map.cpp
index d596f2f6..e7a8bb1c 100644
--- a/pyg_lib/csrc/classes/cpu/hash_map.cpp
+++ b/pyg_lib/csrc/classes/cpu/hash_map.cpp
@@ -1,32 +1,36 @@
 #include "hash_map.h"
 
+#include <ATen/Parallel.h>
 #include <torch/library.h>
 
 namespace pyg {
 namespace classes {
 
-CPUHashMap::CPUHashMap(const at::Tensor& key) {
+template <typename KeyType>
+CPUHashMap<KeyType>::CPUHashMap(const at::Tensor& key) {
   at::TensorArg key_arg{key, "key", 0};
   at::CheckedFrom c{"HashMap.init"};
   at::checkDeviceType(c, key, at::DeviceType::CPU);
   at::checkDim(c, key_arg, 1);
   at::checkContiguous(c, key_arg);
 
-  // clang-format off
-  AT_DISPATCH_ALL_TYPES_AND(at::ScalarType::Bool,
-                            key.scalar_type(),
-                            "cpu_hash_map_init",
-                            [&] {
-    const auto key_data = key.data_ptr<scalar_t>();
-    for (int64_t i = 0; i < key.numel(); ++i) {
-      auto [iterator, inserted] = map_.insert({key_data[i], i});
-      TORCH_CHECK(inserted, "Found duplicated key.");
-    }
-  });
-  // clang-format on
+  map_.reserve(key.numel());
+
+  const auto num_threads = at::get_num_threads();
+  const auto grain_size = std::max(
+      (key.numel() + num_threads - 1) / num_threads, at::internal::GRAIN_SIZE);
+  const auto key_data = key.data_ptr<KeyType>();
+
+  at::parallel_for(0, key.numel(), grain_size, [&](int64_t beg, int64_t end) {
+    for (int64_t i = beg; i < end; ++i) {
+      auto [iterator, inserted] = map_.insert({key_data[i], i});
+      TORCH_CHECK(inserted, "Found duplicated key.");
+    }
+  });
 };
 
-at::Tensor CPUHashMap::get(const at::Tensor& query) {
+template <typename KeyType>
+at::Tensor CPUHashMap<KeyType>::get(const at::Tensor& query) {
   at::TensorArg query_arg{query, "query", 0};
   at::CheckedFrom c{"HashMap.get"};
   at::checkDeviceType(c, query, at::DeviceType::CPU);
@@ -37,27 +41,26 @@ at::Tensor CPUHashMap::get(const at::Tensor& query) {
   const auto out = at::empty({query.numel()}, options);
   auto out_data = out.data_ptr<int64_t>();
 
-  // clang-format off
-  AT_DISPATCH_ALL_TYPES_AND(at::ScalarType::Bool,
-                            query.scalar_type(),
-                            "cpu_hash_map_get",
-                            [&] {
-    const auto query_data = query.data_ptr<scalar_t>();
+  const auto num_threads = at::get_num_threads();
+  const auto grain_size =
+      std::max((query.numel() + num_threads - 1) / num_threads,
+               at::internal::GRAIN_SIZE);
+  const auto query_data = query.data_ptr<KeyType>();
 
-    for (size_t i = 0; i < query.numel(); ++i) {
-      auto it = map_.find(query_data[i]);
-      out_data[i] = (it != map_.end()) ? it->second : -1;
-    }
-  });
-  // clang-format on
+  at::parallel_for(0, query.numel(), grain_size, [&](int64_t beg, int64_t end) {
+    for (int64_t i = beg; i < end; ++i) {
+      auto it = map_.find(query_data[i]);
+      out_data[i] = (it != map_.end()) ? it->second : -1;
+    }
+  });
 
   return out;
 }
 
 TORCH_LIBRARY(pyg, m) {
-  m.class_<CPUHashMap>("CPUHashMap")
+  m.class_<CPUHashMap<int64_t>>("CPUHashMap")
       .def(torch::init<at::Tensor&>())
-      .def("get", &CPUHashMap::get);
+      .def("get", &CPUHashMap<int64_t>::get);
 }
 
 }  // namespace classes
diff --git a/pyg_lib/csrc/classes/cpu/hash_map.h b/pyg_lib/csrc/classes/cpu/hash_map.h
index 2ec9169a..6540acc8 100644
--- a/pyg_lib/csrc/classes/cpu/hash_map.h
+++ b/pyg_lib/csrc/classes/cpu/hash_map.h
@@ -1,21 +1,29 @@
 #pragma once
 
 #include <torch/torch.h>
-#include <variant>
+#include "parallel_hashmap/phmap.h"
 
 namespace pyg {
 namespace classes {
 
+template <typename KeyType>
 struct CPUHashMap : torch::CustomClassHolder {
  public:
-  using KeyType = std::
-      variant<bool, uint8_t, int8_t, int16_t, int32_t, int64_t, float, double>;
+  using ValueType = int64_t;
 
   CPUHashMap(const at::Tensor& key);
   at::Tensor get(const at::Tensor& query);
 
  private:
-  std::unordered_map<KeyType, int64_t> map_;
+  phmap::parallel_flat_hash_map<
+      KeyType,
+      ValueType,
+      phmap::priv::hash_default_hash<KeyType>,
+      phmap::priv::hash_default_eq<KeyType>,
+      phmap::priv::Allocator<std::pair<const KeyType, ValueType>>,
+      8,
+      phmap::NullMutex>
+      map_;
 };
 
 }  // namespace classes
diff --git a/test/csrc/classes/test_hash_map.cpp b/test/csrc/classes/test_hash_map.cpp
index a54400b5..4705b3a8 100644
--- a/test/csrc/classes/test_hash_map.cpp
+++ b/test/csrc/classes/test_hash_map.cpp
@@ -7,7 +7,7 @@ TEST(CPUHashMapTest, BasicAssertions) {
   auto options = at::TensorOptions().dtype(at::kLong);
 
   auto key = at::tensor({0, 10, 30, 20}, options);
-  auto map = pyg::classes::CPUHashMap(key);
+  auto map = pyg::classes::CPUHashMap<int64_t>(key);
 
   auto query = at::tensor({30, 10, 20, 40}, options);
   auto expected = at::tensor({2, 1, 3, -1}, options);
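
A note on the parallelization pattern used in both `CPUHashMap` methods above
(not part of the commit itself): the index range is split into at most
`num_threads` contiguous chunks via ceiling division, but the chunk size is
clamped from below by `at::internal::GRAIN_SIZE` (32768 at the time of
writing), so small inputs stay single-threaded. A minimal standalone sketch
of that pattern follows, assuming only a translation unit compiled and linked
against libtorch; the output buffer and element count are illustrative, not
taken from the patch:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    #include <ATen/Parallel.h>

    int main() {
      const int64_t numel = 10'000'000;  // illustrative workload size
      std::vector<int64_t> out(numel);

      // Ceiling division by the thread count, clamped from below by ATen's
      // minimum per-thread grain size, as in the patch.
      const int64_t num_threads = at::get_num_threads();
      const int64_t grain_size = std::max(
          (numel + num_threads - 1) / num_threads, at::internal::GRAIN_SIZE);

      // Each invocation of the lambda receives one chunk [beg, end) and runs
      // on a single thread of ATen's intra-op thread pool.
      at::parallel_for(0, numel, grain_size, [&](int64_t beg, int64_t end) {
        for (int64_t i = beg; i < end; ++i) {
          out[i] = i;
        }
      });
      return 0;
    }

Writes into disjoint [beg, end) ranges, as in `get()` and in this sketch,
need no extra synchronization; the concurrent inserts in the constructor
instead rely on the sharded `phmap::parallel_flat_hash_map` to tolerate
parallel mutation, which is why the patch swaps out `std::unordered_map`.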