Commit

Use multi-threading and parallel hash map in CPUHashMap (#380)
rusty1s authored Jan 13, 2025
1 parent b4855b6 commit adf458d
Showing 5 changed files with 65 additions and 44 deletions.
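
For orientation, here is a minimal, self-contained sketch (not the library code itself) of the pattern this commit moves to: keys live in a parallel-hashmap container and lookups are chunked across the ATen thread pool with at::parallel_for. The function names build_map and lookup are illustrative only, and the sketch builds the map serially; the commit additionally parallelizes the build and wraps both steps in the templated CPUHashMap custom class shown below.

#include <ATen/ATen.h>
#include <ATen/Parallel.h>
#include "parallel_hashmap/phmap.h"

// Illustrative sketch: map each key to its position, then resolve queries in
// parallel; queries without a match yield -1 (the CPUHashMap::get semantics).
using IndexMap = phmap::flat_hash_map<int64_t, int64_t>;

IndexMap build_map(const at::Tensor& key) {
  IndexMap map;
  map.reserve(key.numel());
  const auto key_data = key.data_ptr<int64_t>();
  for (int64_t i = 0; i < key.numel(); ++i)
    map.insert({key_data[i], i});
  return map;
}

at::Tensor lookup(const IndexMap& map, const at::Tensor& query) {
  auto out = at::empty({query.numel()}, query.options().dtype(at::kLong));
  auto out_data = out.data_ptr<int64_t>();
  const auto query_data = query.data_ptr<int64_t>();
  // Read-only lookups can run concurrently; chunks of at least GRAIN_SIZE
  // elements are distributed over the ATen thread pool.
  at::parallel_for(0, query.numel(), at::internal::GRAIN_SIZE,
                   [&](int64_t beg, int64_t end) {
                     for (int64_t i = beg; i < end; ++i) {
                       const auto it = map.find(query_data[i]);
                       out_data[i] = (it != map.end()) ? it->second : -1;
                     }
                   });
  return out;
}
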
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -72,6 +72,7 @@ if (NOT "$ENV{EXTERNAL_PHMAP_INCLUDE_DIR}" STREQUAL "")
include_directories($ENV{EXTERNAL_PHMAP_INCLUDE_DIR})
else()
set(PHMAP_DIR third_party/parallel-hashmap)
include_directories(${PHMAP_DIR})
target_include_directories(${PROJECT_NAME} PRIVATE ${PHMAP_DIR})
endif()

31 changes: 20 additions & 11 deletions benchmark/classes/hash_map.py
@@ -19,21 +19,28 @@
if args.device == 'cpu':
num_warmups, num_steps = num_warmups // 10, num_steps // 10

key = torch.randperm(args.num_keys, device=args.device)
query = torch.randperm(args.num_queries, device=args.device)
query = query[:args.num_queries]
max_value = torch.iinfo(torch.long).max

key1 = torch.randint(0, max_value, (args.num_keys, ), dtype=torch.long,
device=args.device)
query1 = key1[torch.randperm(key1.size(0), device=args.device)]
query1 = query1[:args.num_queries]

key2 = torch.randperm(args.num_keys, device=args.device)
query2 = torch.randperm(args.num_queries, device=args.device)
query2 = query2[:args.num_queries]

t_init = t_get = 0
for i in range(num_warmups + num_steps):
torch.cuda.synchronize()
t_start = time.perf_counter()
hash_map = HashMap(key)
hash_map = HashMap(key1)
torch.cuda.synchronize()
if i >= num_warmups:
t_init += time.perf_counter() - t_start

t_start = time.perf_counter()
hash_map.get(query)
out1 = hash_map.get(query1)
torch.cuda.synchronize()
if i >= num_warmups:
t_get += time.perf_counter() - t_start
@@ -48,13 +55,13 @@
t_start = time.perf_counter()
hash_map = torch.full((args.num_keys, ), fill_value=-1,
dtype=torch.long, device=args.device)
hash_map[key] = torch.arange(args.num_keys)
hash_map[key2] = torch.arange(args.num_keys)
torch.cuda.synchronize()
if i >= num_warmups:
t_init += time.perf_counter() - t_start

t_start = time.perf_counter()
hash_map[query]
out2 = hash_map[query2]
torch.cuda.synchronize()
if i >= num_warmups:
t_get += time.perf_counter() - t_start
@@ -63,20 +70,22 @@
print(f' Memory Get: {t_get / num_steps:.4f}s')
print('=====================')

if key.is_cpu:
if key1.is_cpu:
t_init = t_get = 0
for i in range(num_warmups + num_steps):
t_start = time.perf_counter()
hash_map = pd.CategoricalDtype(categories=key.numpy(),
hash_map = pd.CategoricalDtype(categories=key1.numpy(),
ordered=True)
if i >= num_warmups:
t_init += time.perf_counter() - t_start

t_start = time.perf_counter()
ser = pd.Series(query.numpy(), dtype=hash_map)
ser.cat.codes.to_numpy()
ser = pd.Series(query1.numpy(), dtype=hash_map)
out3 = ser.cat.codes.to_numpy()
if i >= num_warmups:
t_get += time.perf_counter() - t_start

print(f' Pandas Init: {t_init / num_steps:.4f}s')
print(f' Pandas Get: {t_get / num_steps:.4f}s')

assert out1.equal(torch.tensor(out3))
59 changes: 31 additions & 28 deletions pyg_lib/csrc/classes/cpu/hash_map.cpp
@@ -1,32 +1,36 @@
#include "hash_map.h"

#include <ATen/Parallel.h>
#include <torch/library.h>

namespace pyg {
namespace classes {

CPUHashMap::CPUHashMap(const at::Tensor& key) {
template <typename KeyType>
CPUHashMap<KeyType>::CPUHashMap(const at::Tensor& key) {
at::TensorArg key_arg{key, "key", 0};
at::CheckedFrom c{"HashMap.init"};
at::checkDeviceType(c, key, at::DeviceType::CPU);
at::checkDim(c, key_arg, 1);
at::checkContiguous(c, key_arg);

// clang-format off
AT_DISPATCH_ALL_TYPES_AND(at::ScalarType::Bool,
key.scalar_type(),
"cpu_hash_map_init",
[&] {
const auto key_data = key.data_ptr<scalar_t>();
for (int64_t i = 0; i < key.numel(); ++i) {
auto [iterator, inserted] = map_.insert({key_data[i], i});
TORCH_CHECK(inserted, "Found duplicated key.");
}
});
// clang-format on
map_.reserve(key.numel());

const auto num_threads = at::get_num_threads();
const auto grain_size = std::max(
(key.numel() + num_threads - 1) / num_threads, at::internal::GRAIN_SIZE);
const auto key_data = key.data_ptr<KeyType>();

at::parallel_for(0, key.numel(), grain_size, [&](int64_t beg, int64_t end) {
for (int64_t i = beg; i < end; ++i) {
auto [iterator, inserted] = map_.insert({key_data[i], i});
TORCH_CHECK(inserted, "Found duplicated key.");
}
});
};

at::Tensor CPUHashMap::get(const at::Tensor& query) {
template <typename KeyType>
at::Tensor CPUHashMap<KeyType>::get(const at::Tensor& query) {
at::TensorArg query_arg{query, "query", 0};
at::CheckedFrom c{"HashMap.get"};
at::checkDeviceType(c, query, at::DeviceType::CPU);
@@ -37,27 +41,26 @@ at::Tensor CPUHashMap::get(const at::Tensor& query) {
const auto out = at::empty({query.numel()}, options);
auto out_data = out.data_ptr<int64_t>();

// clang-format off
AT_DISPATCH_ALL_TYPES_AND(at::ScalarType::Bool,
query.scalar_type(),
"cpu_hash_map_get",
[&] {
const auto query_data = query.data_ptr<scalar_t>();
const auto num_threads = at::get_num_threads();
const auto grain_size =
std::max((query.numel() + num_threads - 1) / num_threads,
at::internal::GRAIN_SIZE);
const auto query_data = query.data_ptr<int64_t>();

for (size_t i = 0; i < query.numel(); ++i) {
auto it = map_.find(query_data[i]);
out_data[i] = (it != map_.end()) ? it->second : -1;
}
});
// clang-format on
at::parallel_for(0, query.numel(), grain_size, [&](int64_t beg, int64_t end) {
for (int64_t i = beg; i < end; ++i) {
auto it = map_.find(query_data[i]);
out_data[i] = (it != map_.end()) ? it->second : -1;
}
});

return out;
}

TORCH_LIBRARY(pyg, m) {
m.class_<CPUHashMap>("CPUHashMap")
m.class_<CPUHashMap<int64_t>>("CPUHashMap")
.def(torch::init<at::Tensor&>())
.def("get", &CPUHashMap::get);
.def("get", &CPUHashMap<int64_t>::get);
}

} // namespace classes
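
A detail worth noting in both hunks above is the grain size handed to at::parallel_for. A small sketch of that computation (the helper name grain_for is mine, not the library's):

#include <ATen/Parallel.h>
#include <algorithm>
#include <cstdint>

// max(ceil(n / num_threads), GRAIN_SIZE): at most one chunk per ATen thread,
// and inputs smaller than GRAIN_SIZE run entirely on the calling thread, so
// small tensors avoid threading overhead.
int64_t grain_for(int64_t n) {
  const int64_t num_threads = at::get_num_threads();
  return std::max((n + num_threads - 1) / num_threads,
                  at::internal::GRAIN_SIZE);
}
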
16 changes: 12 additions & 4 deletions pyg_lib/csrc/classes/cpu/hash_map.h
@@ -1,21 +1,29 @@
#pragma once

#include <ATen/ATen.h>
#include <variant>
#include "parallel_hashmap/phmap.h"

namespace pyg {
namespace classes {

template <typename KeyType>
struct CPUHashMap : torch::CustomClassHolder {
public:
using KeyType = std::
variant<bool, uint8_t, int8_t, int16_t, int32_t, int64_t, float, double>;
using ValueType = int64_t;

CPUHashMap(const at::Tensor& key);
at::Tensor get(const at::Tensor& query);

private:
std::unordered_map<KeyType, int64_t> map_;
phmap::parallel_flat_hash_map<
KeyType,
ValueType,
phmap::priv::hash_default_hash<KeyType>,
phmap::priv::hash_default_eq<KeyType>,
phmap::priv::Allocator<std::pair<const KeyType, ValueType>>,
8,
phmap::NullMutex>
map_;
};

} // namespace classes
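
For readers unfamiliar with parallel-hashmap: as I understand the library, the seven template arguments above are the key type, mapped type, hasher, equality functor, allocator, the log2 of the submap count (8, i.e. 256 submaps), and the per-submap mutex. A rough equivalent spelled out as a standalone alias, with std::mutex substituted to show where internal locking would plug in (a sketch, not the member as committed):

#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>
#include "parallel_hashmap/phmap.h"

// Same shape as the map_ member above. With std::mutex each submap guards its
// own writes, so the map can be filled from several threads; phmap::NullMutex
// (as used in the commit) compiles that locking away.
using ParallelIndexMap = phmap::parallel_flat_hash_map<
    int64_t,                                             // key
    int64_t,                                             // mapped value (position)
    phmap::priv::hash_default_hash<int64_t>,             // hasher
    phmap::priv::hash_default_eq<int64_t>,               // equality
    std::allocator<std::pair<const int64_t, int64_t>>,   // allocator
    8,                                                   // 2^8 = 256 submaps
    std::mutex>;                                         // per-submap lock
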
2 changes: 1 addition & 1 deletion test/csrc/classes/test_hash_map.cpp
@@ -7,7 +7,7 @@ TEST(CPUHashMapTest, BasicAssertions) {
auto options = at::TensorOptions().dtype(at::kLong);
auto key = at::tensor({0, 10, 30, 20}, options);

auto map = pyg::classes::CPUHashMap(key);
auto map = pyg::classes::CPUHashMap<int64_t>(key);

auto query = at::tensor({30, 10, 20, 40}, options);
auto expected = at::tensor({2, 1, 3, -1}, options);
