corner case: when nth is 1, n_multiplier should be 1
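The actual fix is the one-line ternary guard in task_allocator_init below. As a minimal sketch of the reasoning (the helper name here is illustrative, not part of the committed code): a single worker has no peers to steal from, so splitting its work into n_multiplier chunks only adds allocate_chunk() round-trips without any load-balancing benefit.

```c
// Sketch of the corner case this commit fixes: nth == 1 forces
// n_multiplier down to 1. effective_multiplier() is a hypothetical
// helper mirroring the guard added in task_allocator_init().
#include <assert.h>
#include <stdio.h>

static int effective_multiplier(int nth, int n_multiplier) {
    assert(nth > 0 && n_multiplier > 0);
    return nth == 1 ? 1 : n_multiplier; // same expression as in the diff
}

int main(void) {
    // Single worker: one chunk total, one allocate_chunk() call.
    printf("nth=1, m=8 -> %d\n", effective_multiplier(1, 8)); // prints 1
    // Multiple workers: keep the multiplier; nth * m chunks allow stealing.
    printf("nth=4, m=8 -> %d\n", effective_multiplier(4, 8)); // prints 8
    return 0;
}
```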
mqy committed Jun 28, 2023
1 parent 4afb12f commit 76e5e27
Showing 1 changed file with 173 additions and 24 deletions.
197 changes: 173 additions & 24 deletions examples/task-allocator/task-allocator.c
@@ -1,12 +1,12 @@
// https://github.com/ggerganov/ggml/issues/291
// https://github.com/ggerganov/llama.cpp/pull/1507

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <stdbool.h>
#include <stdint.h>

#if defined(_MSC_VER) || defined(__MINGW32__)
#include <malloc.h> // using malloc.h with MSC/MINGW
@@ -132,30 +132,36 @@ static void task_allocator_reset(struct task_allocator *a) {
atomic_store(&a->global_counter, 0);
}

// NOTE: when nth == 1, n_multiplier is effectively useless, so it is forced to 1.
static void task_allocator_init(struct task_allocator *a, int nth,
int n_multiplier) {
GGML_ASSERT(nth > 0);
GGML_ASSERT(nth <= MAX_THREADS);
GGML_ASSERT(n_multiplier > 0);

a->nth = nth;
a->n_multiplier = n_multiplier;
a->n_multiplier = nth == 1 ? 1 : n_multiplier;
task_allocator_reset(a);
}

// ith: worker id (starting from 0).
// chunk_idx and n_chunks are output parameters.
// chunk_idx is set to -1 when there is nothing to do.
static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
int *n_chunks) {
GGML_ASSERT(a->nth > 0);
GGML_ASSERT(a->n_multiplier > 0);
GGML_ASSERT(ith >= 0 && ith < a->nth);

int M = a->n_multiplier;
int nth = a->nth;
int total_chunks = M * nth;

*chunk_idx = -1;
*n_chunks = 0;
*n_chunks = total_chunks;

while (atomic_fetch_add(&a->lock, 1) != 0) { // lock
atomic_fetch_sub(&a->lock, 1);
}

int M = a->n_multiplier;
int nth = a->nth;
int total_chunks = M * nth;

// all assigned?
if (atomic_load(&a->global_counter) == total_chunks) {
GGML_PRINT_DEBUG("[#_%d] %s(): nothing to do.\n", ith, __func__);
@@ -177,7 +183,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
atomic_fetch_add(&a->thread_queue_heads[ith], 1);
atomic_fetch_add(&a->global_counter, 1);

GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th trunk of its own.\n",
GGML_PRINT_DEBUG("[#_%d] %s(): take the %3d-th chunk of its own.\n",
ith, __func__, head + 1);

*chunk_idx = idx;
@@ -188,6 +194,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
}

// steal from others.
// TODO: optimize: steal from the slowest one.
for (int i = 0; i < nth; ++i) {
if (i == ith) {
continue;
@@ -203,7 +210,7 @@ static void allocate_chunk(struct task_allocator *a, int ith, int *chunk_idx,
atomic_fetch_sub(&a->thread_queue_tails[i], 1);
atomic_fetch_add(&a->global_counter, 1);

GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th trunk from #_%d\n", ith,
GGML_PRINT_DEBUG("[#_%d] %s(): steal the %d-th chunk from #_%d\n", ith,
__func__, tail, i);

*chunk_idx = idx;
@@ -261,7 +268,7 @@ void compute_tensor(struct params params, struct ggml_tensor *node) {

while (true) {
allocate_chunk(params.task_allocator, ith, &chunk_idx, &n_chunks);
if (chunk_idx < 0 || n_chunks <= 0) {
if (chunk_idx < 0) {
break;
}

@@ -339,6 +346,144 @@ static thread_ret_t demo_compute_thread(void *data) {
return 0;
}

static void test_task_allocator_init(void) {
struct task_allocator a;

task_allocator_init(&a, 1, 2);
GGML_ASSERT(a.nth == 1);
GGML_ASSERT(a.n_multiplier == 1); // when nth == 1, n_multiplier is forced to 1

task_allocator_init(&a, 2, 2);
GGML_ASSERT(a.nth == 2);
GGML_ASSERT(a.n_multiplier == 2); // ok
}

static void task_allocator_unit_test_no_steal(void) {
int chunk_idx; // out
int n_chunks; // out

int n_threads = 2;
int n_multiplier = 2;
const int expected_n_slots = n_threads * n_multiplier;

struct task_allocator a;
task_allocator_init(&a, n_threads, n_multiplier);

struct test_data_t {
int ith; // calling worker id
int chunk_idx; // expected
int n_chunks; // expected
};

struct test_data_t test_data[] = {
//////////////////// clang format /////////////////////////
{
.ith = 0,
.chunk_idx = 0,
},
{
.ith = 1,
.chunk_idx = 2,
},
{
.ith = 0,
.chunk_idx = 1,
},
{
.ith = 1,
.chunk_idx = 3,
},
{
.ith = 0,
.chunk_idx = -1,
},
{
.ith = 1,
.chunk_idx = -1,
}};

int t_len = sizeof(test_data) / sizeof(struct test_data_t);

for (int i = 0; i < t_len; i++) {
allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
if (chunk_idx != test_data[i].chunk_idx) {
fprintf(stderr,
"%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
__func__, i, chunk_idx, test_data[i].chunk_idx);
abort();
}
if (n_chunks != expected_n_slots) {
fprintf(stderr,
"%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
__func__, i, n_chunks, expected_n_slots);
abort();
}
}
}

static void task_allocator_unit_test_steal(void) {
int chunk_idx; // out
int n_chunks; // out

int n_threads = 2;
int n_multiplier = 2;
const int expected_n_slots = n_threads * n_multiplier;

struct task_allocator a;
task_allocator_init(&a, n_threads, n_multiplier);

struct test_data_t {
int ith; // calling worker id
int chunk_idx; // expected
};

struct test_data_t test_data[] = {
//////////////////// clang format /////////////////////////
{
.ith = 0,
.chunk_idx = 0,
},
{
.ith = 0,
.chunk_idx = 1,
},
{
.ith = 1,
.chunk_idx = 2,
},
{
.ith = 0,
.chunk_idx = 4, // steal from tail
},
{
.ith = 0,
.chunk_idx = -1,
},
{
.ith = 1,
.chunk_idx = -1,
}};

int t_len = sizeof(test_data) / sizeof(struct test_data_t);

for (int i = 0; i < t_len; i++) {
allocate_chunk(&a, test_data[i].ith, &chunk_idx, &n_chunks);
if (chunk_idx != test_data[i].chunk_idx) {
fprintf(stderr,
"%s(): chunk_idx mismatch. i: %d, actual: %d, expected: %d\n",
__func__, i, chunk_idx, test_data[i].chunk_idx);
abort();
}
if (n_chunks != expected_n_slots) {
fprintf(stderr,
"%s(): n_chunks mismatch. i: %d, actual: %d, expected: %d\n",
__func__, i, n_chunks, expected_n_slots);
abort();
}
}
}

// Integration test.
static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units,
int n_multiplier) {
fprintf(stderr,
@@ -386,36 +531,40 @@ static void test_task_allocator(int n_threads, int n_nodes, int n_compute_units,
// B can steal a chunk from A only if T_a > T_b + T_b_per_chunk.
// - Saw this situation: A steals from B, while B steals from C.
// - n_chunks plays a key role, similar to choosing the best n_threads, it's
// difficult choose the ideal n_chunks value. Performance drops with per-chunk
// compute time exceeds the scheduling overhead.
// difficult to choose the ideal n_chunks value. Performance drops when
// per-chunk compute time exceeds the scheduling overhead.
// - A work-stealing chunked task allocator can reduce response time
// significantly when most threads run fast but a few are intermittently or
// constantly slow.
//
int main(void) {
if (false) { // the most simple one: only main thread, one node
test_task_allocator_init();
task_allocator_unit_test_no_steal();
task_allocator_unit_test_steal();

// Integration tests
const int n_compute_units = 64;

if (false) {
int n_threads = 1;
int n_nodes = 1;
int n_multiplier = 1; // trunks per thread.
int n_compute_units = 1;
int n_multiplier = 2; // equivalent to 1

test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
}

if (false) {
if (true) {
int n_threads = 2;
int n_nodes = 2;
int n_multiplier = 1; // trunks per thread.
int n_compute_units = 2;
int n_multiplier = 1;

test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
}

if (true) {
int n_threads = 4;
int n_threads = 2;
int n_nodes = 2;
int n_multiplier = 8; // trunks per thread.
int n_compute_units = 32;
int n_multiplier = 8;

test_task_allocator(n_threads, n_nodes, n_compute_units, n_multiplier);
}

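For readers of the new unit tests, here is a simplified single-threaded model of the allocation policy they encode. It is a sketch under assumed semantics (no lock, no atomics, illustrative names), not the committed implementation: each worker first drains its own queue of n_multiplier chunks from the head; once that is empty it steals from a peer's tail. The stolen index is read before the tail is decremented, which is why the steal test expects chunk_idx == 4 (and chunk 3 is never handed out in that sequence).

```c
// Simplified, single-threaded model of the allocator exercised by the
// tests above: fixed nth = 2 and n_multiplier = 2, no synchronization.
#include <assert.h>
#include <stdio.h>

#define NTH  2
#define MULT 2

static int heads[NTH]; // next chunk to take from a worker's own queue
static int tails[NTH]; // one past the last unassigned chunk in that queue

static void reset(void) {
    for (int i = 0; i < NTH; ++i) { heads[i] = 0; tails[i] = MULT; }
}

// Returns a chunk index, or -1 when every chunk has been assigned.
static int take(int ith) {
    if (heads[ith] < tails[ith])          // own queue first, from the head
        return ith * MULT + heads[ith]++;
    for (int i = 0; i < NTH; ++i) {       // then steal from a peer's tail
        if (i != ith && heads[i] < tails[i])
            return i * MULT + tails[i]--; // index read before the decrement
    }
    return -1;
}

int main(void) {
    // Mirrors task_allocator_unit_test_no_steal().
    reset();
    assert(take(0) == 0);
    assert(take(1) == 2);
    assert(take(0) == 1);
    assert(take(1) == 3);
    assert(take(0) == -1 && take(1) == -1);

    // Mirrors task_allocator_unit_test_steal(): worker 0 drains its own
    // queue, then steals from worker 1's tail and observes chunk_idx 4;
    // chunk 3 is never handed out in this sequence.
    reset();
    assert(take(0) == 0);
    assert(take(0) == 1);
    assert(take(1) == 2);
    assert(take(0) == 4); // steal from tail
    assert(take(0) == -1 && take(1) == -1);

    printf("sketch matches the expected unit-test sequences\n");
    return 0;
}
```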