Skip to content

Commit

Permalink
Merge task table and task log
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang committed Nov 10, 2016
1 parent 194bdb1 commit 7a88268
Show file tree
Hide file tree
Showing 27 changed files with 1,521 additions and 779 deletions.
12 changes: 6 additions & 6 deletions src/common/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ BUILD = build

all: hiredis $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o thirdparty/sha256.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_table.o thirdparty/ae/ae.o thirdparty/sha256.o
ar rcs $@ $^

$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
Expand All @@ -16,8 +16,8 @@ $(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a
$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
$(BUILD)/task_table_tests: hiredis test/task_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)
Expand All @@ -38,15 +38,15 @@ redis:
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make

test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s
./build/common_tests
./build/db_tests
./build/io_tests
./build/task_tests
./build/redis_tests
./build/task_log_tests
./build/task_table_tests
./build/object_table_tests

valgrind: test
Expand All @@ -55,7 +55,7 @@ valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/io_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_tests
valgrind --leak-check=full --error-exitcode=1 ./build/redis_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_log_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_table_tests
valgrind --leak-check=full --error-exitcode=1 ./build/object_table_tests

FORCE:
6 changes: 5 additions & 1 deletion src/common/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ unique_id globally_unique_id(void) {
}

bool object_ids_equal(object_id first_id, object_id second_id) {
return memcmp(&first_id, &second_id, sizeof(object_id)) == 0 ? true : false;
return UNIQUE_ID_EQ(first_id, second_id);
}

bool object_id_is_nil(object_id id) {
return object_ids_equal(id, NIL_OBJECT_ID);
}

char *sha1_to_hex(const unsigned char *sha1, char *buffer) {
Expand Down
14 changes: 14 additions & 0 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@

#define UNIQUE_ID_SIZE 20

#define UNIQUE_ID_EQ(id1, id2) (memcmp((id1).id, (id2).id, UNIQUE_ID_SIZE) == 0)

#define IS_NIL_ID(id) UNIQUE_ID_EQ(id, NIL_ID)

typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;

extern const UT_icd object_id_icd;
Expand All @@ -70,6 +74,8 @@ unique_id globally_unique_id(void);
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);

#define NIL_OBJECT_ID NIL_ID

typedef unique_id object_id;

/**
Expand All @@ -81,4 +87,12 @@ typedef unique_id object_id;
*/
bool object_ids_equal(object_id first_id, object_id second_id);

/**
* Compare a object ID to the nil ID.
*
* @param id The object ID to compare to nil.
* @return True if the object ID is equal to nil.
*/
bool object_id_is_nil(object_id id);

#endif
13 changes: 6 additions & 7 deletions src/common/doc/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
A *task specification* contains all information that is needed for computing
the results of a task:

- The ID of the task
- The function ID of the function that executes the task
- The arguments (either object IDs for pass by reference
or values for pass by value)
Expand All @@ -11,23 +12,21 @@ or values for pass by value)
From these, a task ID can be computed which is also stored in the task
specification.

A *task instance* represents one execution of a task specification.
A *task* represents the execution of a task specification.
It consists of:

- A scheduling state (WAITING, SCHEDULED, RUNNING, DONE)
- The target node where the task is scheduled or executed
- A unique task instance ID that identifies the particular execution
of the task.
- The task specification

The task data structures are defined in `common/task.h`.

The *task log* is a mapping from the task instance ID to a sequence of
updates to the status of the task instance. It is updated by various parts
of the system:
The *task table* is a mapping from the task ID to the *task* information. It is
updated by various parts of the system:

1. The local scheduler writes it with status WAITING when submits a task to the global scheduler
2. The global scheduler appends an update WAITING -> SCHEDULED together with the node ID when assigning the task to a local scheduler
3. The local scheduler appends an update SCHEDULED -> RUNNING when it assigns a task to a worker
4. The local scheduler appends an update RUNNING -> DONE when the task finishes execution

The task log is defined in `common/state/task_log.h`.
The task table is defined in `common/state/task_table.h`.
2 changes: 1 addition & 1 deletion src/common/lib/python/common_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static PyObject *PyTask_function_id(PyObject *self) {
}

static PyObject *PyTask_task_id(PyObject *self) {
task_id task_id = task_task_id(((PyTask *) self)->spec);
task_id task_id = task_spec_id(((PyTask *) self)->spec);
return PyObjectID_make(task_id);
}

Expand Down
21 changes: 21 additions & 0 deletions src/common/state/object_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,24 @@ void object_table_subscribe(
init_table_callback(db_handle, object_id, sub_data, retry, done_callback,
redis_object_table_subscribe, user_context);
}

void result_table_add(db_handle *db_handle,
object_id object_id,
task_id task_id_arg,
retry_info *retry,
result_table_done_callback done_callback,
void *user_context) {
task_id *task_id_copy = malloc(UNIQUE_ID_SIZE);
memcpy(task_id_copy, task_id_arg.id, UNIQUE_ID_SIZE);
init_table_callback(db_handle, object_id, task_id_copy, retry, done_callback,
redis_result_table_add, user_context);
}

void result_table_lookup(db_handle *db_handle,
object_id object_id,
retry_info *retry,
result_table_lookup_callback done_callback,
void *user_context) {
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
redis_result_table_lookup, user_context);
}
52 changes: 52 additions & 0 deletions src/common/state/object_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common.h"
#include "table.h"
#include "db.h"
#include "task.h"

/*
* ==== Lookup call and callback ====
Expand Down Expand Up @@ -123,4 +124,55 @@ typedef struct {
void *subscribe_context;
} object_table_subscribe_data;

/*
* ==== Result table ====
*/

/**
* Callback called when the add/remove operation for a result table entry
* completes. */
typedef void (*result_table_done_callback)(object_id object_id,
void *user_context);

/**
* Add information about a new object to the object table. This
* is immutable information like the ID of the task that
* created the object.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object to add.
* @param task_id ID of the task that creates this object.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Context passed by the caller.
* @return Void.
*/
void result_table_add(db_handle *db_handle,
object_id object_id,
task_id task_id,
retry_info *retry,
result_table_done_callback done_callback,
void *user_context);

/** Callback called when the result table lookup completes. */
typedef void (*result_table_lookup_callback)(object_id object_id,
task *task,
void *user_context);

/**
* Lookup the task that created an object in the result table.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object to lookup.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Context passed by the caller.
* @return Void.
*/
void result_table_lookup(db_handle *db_handle,
object_id object_id,
retry_info *retry,
result_table_lookup_callback done_callback,
void *user_context);

#endif /* OBJECT_TABLE_H */
Loading

0 comments on commit 7a88268

Please sign in to comment.