From aefe961da1c0ec60e53e81596ebfb9b4a0be260b Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 18 Jul 2024 14:58:03 +0000 Subject: [PATCH 01/12] libfaasm: add support for general s3 API --- func/CMakeLists.txt | 1 + func/s3/CMakeLists.txt | 11 +++++++++++ func/s3/list_buckets.cpp | 11 +++++++++++ libfaasm/faasm/host_interface.h | 7 +++++++ libfaasm/libfaasm.imports | 10 ++++++++++ 5 files changed, 40 insertions(+) create mode 100644 func/s3/CMakeLists.txt create mode 100644 func/s3/list_buckets.cpp diff --git a/func/CMakeLists.txt b/func/CMakeLists.txt index 47f4f86..2a02fd8 100644 --- a/func/CMakeLists.txt +++ b/func/CMakeLists.txt @@ -74,4 +74,5 @@ add_subdirectory(dynlink) add_subdirectory(errors) add_subdirectory(mpi) add_subdirectory(omp) +add_subdirectory(s3) add_subdirectory(threads) diff --git a/func/s3/CMakeLists.txt b/func/s3/CMakeLists.txt new file mode 100644 index 0000000..51f8dd5 --- /dev/null +++ b/func/s3/CMakeLists.txt @@ -0,0 +1,11 @@ +set(FAASM_USER s3) + +function(s3_func exec_name dir_path) + faasm_func(${exec_name} ${dir_path}) + set(ALL_DEMO_FUNCS ${ALL_DEMO_FUNCS} ${exec_name} PARENT_SCOPE) +endfunction(s3_func) + +s3_func(list_buckets list_buckets.cpp) + +# Custom target to group all the demo functions +add_custom_target(s3_all_funcs DEPENDS ${ALL_DEMO_FUNCS}) diff --git a/func/s3/list_buckets.cpp b/func/s3/list_buckets.cpp new file mode 100644 index 0000000..ecf22b2 --- /dev/null +++ b/func/s3/list_buckets.cpp @@ -0,0 +1,11 @@ +#include "faasm/faasm.h" + +// TODO: get the number of expected buckets as input +int main(int argc, char* argv[]) +{ + int numBuckets = __faasm_s3_get_num_buckets(); + + printf("Got %i buckets!", numBuckets); + + return 0; +} diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index 00a7573..fdd9958 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -160,4 +160,11 @@ void __faasm_migrate_point(FaasmMigrateEntryPoint f, int arg); HOST_IFACE_FUNC void __faasm_host_interface_test(int testNum); + +// S3 +HOST_IFACE_FUNC +int __faasm_s3_get_num_buckets(); + +HOST_IFACE_FUNC +void __faasm_s3_list_buckets(char** bucketsBuffer, long* bucketsBufferLens); #endif diff --git a/libfaasm/libfaasm.imports b/libfaasm/libfaasm.imports index 6ca3203..1eaaca7 100644 --- a/libfaasm/libfaasm.imports +++ b/libfaasm/libfaasm.imports @@ -42,5 +42,15 @@ __faasm_sm_critical_local_end # Migration __faasm_migrate_point +# S3 +__faasm_s3_get_num_buckets +__faasm_s3_list_buckets +__faasm_s3_create_bucket +__faasm_s3_delete_bucket +__faasm_s3_get_num_keys +__faasm_s3_list_keys +__faasm_s3_add_key_bytes +__faasm_s3_get_key_bytes + # Test __faasm_host_interface_test From 5f197a94291b9372943f434d325c3a6b6cf31e6a Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 18 Jul 2024 18:00:00 +0000 Subject: [PATCH 02/12] s3: add function to get number of buckets --- func/s3/list_buckets.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/func/s3/list_buckets.cpp b/func/s3/list_buckets.cpp index ecf22b2..003041b 100644 --- a/func/s3/list_buckets.cpp +++ b/func/s3/list_buckets.cpp @@ -1,4 +1,9 @@ -#include "faasm/faasm.h" +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include // TODO: get the number of expected buckets as input int main(int argc, char* argv[]) From 0206145c694ce0ed0dccb9a9f5addd62869f8cc5 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Jul 2024 11:31:42 +0000 Subject: [PATCH 03/12] s3: add remaining basic functions --- func/s3/CMakeLists.txt | 5 ++++ func/s3/add_key_bytes.cpp | 28 ++++++++++++++++++++ func/s3/get_key_bytes.cpp | 28 ++++++++++++++++++++ func/s3/get_num_buckets.cpp | 20 ++++++++++++++ func/s3/get_num_keys.cpp | 25 ++++++++++++++++++ func/s3/list_buckets.cpp | 30 +++++++++++++++++++-- func/s3/list_keys.cpp | 47 +++++++++++++++++++++++++++++++++ libfaasm/faasm/host_interface.h | 20 ++++++++++++-- 8 files changed, 199 insertions(+), 4 deletions(-) create mode 100644 func/s3/add_key_bytes.cpp create mode 100644 func/s3/get_key_bytes.cpp create mode 100644 func/s3/get_num_buckets.cpp create mode 100644 func/s3/get_num_keys.cpp create mode 100644 func/s3/list_keys.cpp diff --git a/func/s3/CMakeLists.txt b/func/s3/CMakeLists.txt index 51f8dd5..a082f11 100644 --- a/func/s3/CMakeLists.txt +++ b/func/s3/CMakeLists.txt @@ -5,7 +5,12 @@ function(s3_func exec_name dir_path) set(ALL_DEMO_FUNCS ${ALL_DEMO_FUNCS} ${exec_name} PARENT_SCOPE) endfunction(s3_func) +s3_func(get_num_buckets get_num_buckets.cpp) s3_func(list_buckets list_buckets.cpp) +s3_func(get_num_keys get_num_keys.cpp) +s3_func(list_keys list_keys.cpp) +s3_func(add_key_bytes add_key_bytes.cpp) +s3_func(get_key_bytes get_key_bytes.cpp) # Custom target to group all the demo functions add_custom_target(s3_all_funcs DEPENDS ${ALL_DEMO_FUNCS}) diff --git a/func/s3/add_key_bytes.cpp b/func/s3/add_key_bytes.cpp new file mode 100644 index 0000000..43080bc --- /dev/null +++ b/func/s3/add_key_bytes.cpp @@ -0,0 +1,28 @@ +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#include + +int main(int argc, char* argv[]) +{ + // Get bucket and key from command line + if (argc != 3) { + printf("error: must invoke function with two arguments: bucketName keyName\n"); + return 1; + } + + char* bucketName = argv[1]; + char* keyName = argv[2]; + + // Get the bytes to add as input + int inputSize = faasmGetInputSize(); + uint8_t keyBytes[inputSize]; + faasmGetInput(keyBytes, inputSize); + + int ret = __faasm_s3_add_key_bytes(bucketName, keyName, (void*) keyBytes, inputSize); + + return ret; +} diff --git a/func/s3/get_key_bytes.cpp b/func/s3/get_key_bytes.cpp new file mode 100644 index 0000000..b7dfa44 --- /dev/null +++ b/func/s3/get_key_bytes.cpp @@ -0,0 +1,28 @@ +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#include + +int main(int argc, char* argv[]) +{ + // Get bucket and key from command line + if (argc != 3) { + printf("error: must invoke function with two arguments: bucketName keyName\n"); + return 1; + } + + char* bucketName = argv[1]; + char* keyName = argv[2]; + + uint8_t* keyBytes; + int keyBytesLen; + + int ret = __faasm_s3_get_key_bytes(bucketName, keyName, &keyBytes, &keyBytesLen); + printf("Got %s/%s: %s\n", bucketName, keyName, (char*) keyBytes); + faasmSetOutput((char*) keyBytes, keyBytesLen); + + return ret; +} diff --git a/func/s3/get_num_buckets.cpp b/func/s3/get_num_buckets.cpp new file mode 100644 index 0000000..b092767 --- /dev/null +++ b/func/s3/get_num_buckets.cpp @@ -0,0 +1,20 @@ +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#include +#include + +int main(int argc, char* argv[]) +{ + int numBuckets = __faasm_s3_get_num_buckets(); + + printf("Got %i buckets!\n", numBuckets); + + std::string numBucketsStr = std::to_string(numBuckets); + faasmSetOutput(numBucketsStr.c_str(), numBucketsStr.size()); + + return 0; +} diff --git a/func/s3/get_num_keys.cpp b/func/s3/get_num_keys.cpp new file mode 100644 index 0000000..d16780c --- /dev/null +++ b/func/s3/get_num_keys.cpp @@ -0,0 +1,25 @@ +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#include +#include + +int main(int argc, char* argv[]) +{ + // Get the bucket name as an input + int inputSize = faasmGetInputSize(); + char bucketName[inputSize]; + faasmGetInput((uint8_t*) bucketName, inputSize); + + int numKeys = __faasm_s3_get_num_keys(bucketName); + + printf("Bucket %s has %i keys!\n", bucketName, numKeys); + + std::string numKeysStr = std::to_string(numKeys); + faasmSetOutput(numKeysStr.c_str(), numKeysStr.size()); + + return 0; +} diff --git a/func/s3/list_buckets.cpp b/func/s3/list_buckets.cpp index 003041b..33d56e8 100644 --- a/func/s3/list_buckets.cpp +++ b/func/s3/list_buckets.cpp @@ -3,14 +3,40 @@ extern "C" #include "faasm/host_interface.h" } +#include #include +#include -// TODO: get the number of expected buckets as input int main(int argc, char* argv[]) { int numBuckets = __faasm_s3_get_num_buckets(); - printf("Got %i buckets!", numBuckets); + char* bucketsBuffer[numBuckets]; + int bucketsBufferLens[numBuckets]; + __faasm_s3_list_buckets(bucketsBuffer, bucketsBufferLens); + + int totalSize = 0; + for (int i = 0; i < numBuckets; i++) { + totalSize += bucketsBufferLens[i]; + } + totalSize += numBuckets - 1; + + // Prepare the output: instead of a newline use a '|' character + char outBuffer[totalSize]; + + printf("Got %i buckets!\n", numBuckets); + int offset = 0; + for (int i = 0; i < numBuckets; i++) { + strncpy(outBuffer + offset, bucketsBuffer[i], bucketsBufferLens[i]); + offset += bucketsBufferLens[i]; + if (i < numBuckets - 1) { + outBuffer[offset] = (char) '|'; + offset += 1; + } + printf("Bucket %i: %s\n", i, bucketsBuffer[i]); + } + + faasmSetOutput(outBuffer, totalSize); return 0; } diff --git a/func/s3/list_keys.cpp b/func/s3/list_keys.cpp new file mode 100644 index 0000000..dac24a5 --- /dev/null +++ b/func/s3/list_keys.cpp @@ -0,0 +1,47 @@ +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#include +#include + +int main(int argc, char* argv[]) +{ + // Get the bucket name as an input + int inputSize = faasmGetInputSize(); + char bucketName[inputSize]; + faasmGetInput((uint8_t*) bucketName, inputSize); + + int numKeys = __faasm_s3_get_num_keys(bucketName); + + char* keysBuffer[numKeys]; + int keysBufferLens[numKeys]; + __faasm_s3_list_keys(bucketName, keysBuffer, keysBufferLens); + + int totalSize = 0; + for (int i = 0; i < numKeys; i++) { + totalSize += keysBufferLens[i]; + } + totalSize += numKeys - 1; + + // Prepare the output: instead of a newline use a '|' character + char outBuffer[totalSize]; + + printf("Bucket %s has %i keys!\n", bucketName, numKeys); + int offset = 0; + for (int i = 0; i < numKeys; i++) { + strncpy(outBuffer + offset, keysBuffer[i], keysBufferLens[i]); + offset += keysBufferLens[i]; + if (i < numKeys - 1) { + outBuffer[offset] = (char) '|'; + offset += 1; + } + printf("Key %i: %s\n", i, keysBuffer[i]); + } + + faasmSetOutput(outBuffer, totalSize); + + return 0; +} diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index fdd9958..7034341 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -161,10 +161,26 @@ void __faasm_migrate_point(FaasmMigrateEntryPoint f, int arg); HOST_IFACE_FUNC void __faasm_host_interface_test(int testNum); -// S3 +// ----- S3 ----- + HOST_IFACE_FUNC int __faasm_s3_get_num_buckets(); +// Note that bucketsBuffer is, in reality, a char** populated by the host +HOST_IFACE_FUNC +void __faasm_s3_list_buckets(void* bucketsBuffer, int* bucketsBufferLens); + +HOST_IFACE_FUNC +int __faasm_s3_get_num_keys(const char* bucketName); + +// Note that keysBuffer is, in reality, a char** populated by the host +HOST_IFACE_FUNC +void __faasm_s3_list_keys(const char* bucketName, void* keysBuffer, int* keysBufferLens); + +HOST_IFACE_FUNC +int __faasm_s3_add_key_bytes(const char* bucketName, const char* keyName, void* keyBuffer, int keyBufferLen); + +// Note that keyBuffer is, in reality, a uint8_t** populated by the host HOST_IFACE_FUNC -void __faasm_s3_list_buckets(char** bucketsBuffer, long* bucketsBufferLens); +int __faasm_s3_get_key_bytes(const char* bucketName, const char* keyName, void* keyBuffer, int* keyBufferLen); #endif From 3906d8abe42e271b8c7064c45cdb132ea3330847 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Jul 2024 14:59:26 +0000 Subject: [PATCH 04/12] nits: run clang-format --- func/s3/add_key_bytes.cpp | 6 ++++-- func/s3/get_key_bytes.cpp | 10 ++++++---- func/s3/get_num_keys.cpp | 2 +- func/s3/list_buckets.cpp | 2 +- func/s3/list_keys.cpp | 4 ++-- libfaasm/faasm/host_interface.h | 14 +++++++++++--- 6 files changed, 25 insertions(+), 13 deletions(-) diff --git a/func/s3/add_key_bytes.cpp b/func/s3/add_key_bytes.cpp index 43080bc..67708c7 100644 --- a/func/s3/add_key_bytes.cpp +++ b/func/s3/add_key_bytes.cpp @@ -10,7 +10,8 @@ int main(int argc, char* argv[]) { // Get bucket and key from command line if (argc != 3) { - printf("error: must invoke function with two arguments: bucketName keyName\n"); + printf("error: must invoke function with two arguments: bucketName " + "keyName\n"); return 1; } @@ -22,7 +23,8 @@ int main(int argc, char* argv[]) uint8_t keyBytes[inputSize]; faasmGetInput(keyBytes, inputSize); - int ret = __faasm_s3_add_key_bytes(bucketName, keyName, (void*) keyBytes, inputSize); + int ret = + __faasm_s3_add_key_bytes(bucketName, keyName, (void*)keyBytes, inputSize); return ret; } diff --git a/func/s3/get_key_bytes.cpp b/func/s3/get_key_bytes.cpp index b7dfa44..29c0ecf 100644 --- a/func/s3/get_key_bytes.cpp +++ b/func/s3/get_key_bytes.cpp @@ -10,7 +10,8 @@ int main(int argc, char* argv[]) { // Get bucket and key from command line if (argc != 3) { - printf("error: must invoke function with two arguments: bucketName keyName\n"); + printf("error: must invoke function with two arguments: bucketName " + "keyName\n"); return 1; } @@ -20,9 +21,10 @@ int main(int argc, char* argv[]) uint8_t* keyBytes; int keyBytesLen; - int ret = __faasm_s3_get_key_bytes(bucketName, keyName, &keyBytes, &keyBytesLen); - printf("Got %s/%s: %s\n", bucketName, keyName, (char*) keyBytes); - faasmSetOutput((char*) keyBytes, keyBytesLen); + int ret = + __faasm_s3_get_key_bytes(bucketName, keyName, &keyBytes, &keyBytesLen); + printf("Got %s/%s: %s\n", bucketName, keyName, (char*)keyBytes); + faasmSetOutput((char*)keyBytes, keyBytesLen); return ret; } diff --git a/func/s3/get_num_keys.cpp b/func/s3/get_num_keys.cpp index d16780c..8f9ed92 100644 --- a/func/s3/get_num_keys.cpp +++ b/func/s3/get_num_keys.cpp @@ -12,7 +12,7 @@ int main(int argc, char* argv[]) // Get the bucket name as an input int inputSize = faasmGetInputSize(); char bucketName[inputSize]; - faasmGetInput((uint8_t*) bucketName, inputSize); + faasmGetInput((uint8_t*)bucketName, inputSize); int numKeys = __faasm_s3_get_num_keys(bucketName); diff --git a/func/s3/list_buckets.cpp b/func/s3/list_buckets.cpp index 33d56e8..46a1787 100644 --- a/func/s3/list_buckets.cpp +++ b/func/s3/list_buckets.cpp @@ -30,7 +30,7 @@ int main(int argc, char* argv[]) strncpy(outBuffer + offset, bucketsBuffer[i], bucketsBufferLens[i]); offset += bucketsBufferLens[i]; if (i < numBuckets - 1) { - outBuffer[offset] = (char) '|'; + outBuffer[offset] = (char)'|'; offset += 1; } printf("Bucket %i: %s\n", i, bucketsBuffer[i]); diff --git a/func/s3/list_keys.cpp b/func/s3/list_keys.cpp index dac24a5..ef9b224 100644 --- a/func/s3/list_keys.cpp +++ b/func/s3/list_keys.cpp @@ -12,7 +12,7 @@ int main(int argc, char* argv[]) // Get the bucket name as an input int inputSize = faasmGetInputSize(); char bucketName[inputSize]; - faasmGetInput((uint8_t*) bucketName, inputSize); + faasmGetInput((uint8_t*)bucketName, inputSize); int numKeys = __faasm_s3_get_num_keys(bucketName); @@ -35,7 +35,7 @@ int main(int argc, char* argv[]) strncpy(outBuffer + offset, keysBuffer[i], keysBufferLens[i]); offset += keysBufferLens[i]; if (i < numKeys - 1) { - outBuffer[offset] = (char) '|'; + outBuffer[offset] = (char)'|'; offset += 1; } printf("Key %i: %s\n", i, keysBuffer[i]); diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index 7034341..9578245 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -175,12 +175,20 @@ int __faasm_s3_get_num_keys(const char* bucketName); // Note that keysBuffer is, in reality, a char** populated by the host HOST_IFACE_FUNC -void __faasm_s3_list_keys(const char* bucketName, void* keysBuffer, int* keysBufferLens); +void __faasm_s3_list_keys(const char* bucketName, + void* keysBuffer, + int* keysBufferLens); HOST_IFACE_FUNC -int __faasm_s3_add_key_bytes(const char* bucketName, const char* keyName, void* keyBuffer, int keyBufferLen); +int __faasm_s3_add_key_bytes(const char* bucketName, + const char* keyName, + void* keyBuffer, + int keyBufferLen); // Note that keyBuffer is, in reality, a uint8_t** populated by the host HOST_IFACE_FUNC -int __faasm_s3_get_key_bytes(const char* bucketName, const char* keyName, void* keyBuffer, int* keyBufferLen); +int __faasm_s3_get_key_bytes(const char* bucketName, + const char* keyName, + void* keyBuffer, + int* keyBufferLen); #endif From 1d7661aec50cb847e09c1147b0303140fdb1f845 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Jul 2024 14:59:51 +0000 Subject: [PATCH 05/12] gh: bump minor code version --- .env | 4 ++-- .github/workflows/tests.yml | 2 +- VERSION | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.env b/.env index 52b3b94..a926869 100644 --- a/.env +++ b/.env @@ -1,3 +1,3 @@ -SYSROOT_VERSION=0.5.0 -SYSROOT_CLI_IMAGE=faasm.azurecr.io/cpp-sysroot:0.5.0 +SYSROOT_VERSION=0.6.0 +SYSROOT_CLI_IMAGE=faasm.azurecr.io/cpp-sysroot:0.6.0 COMPOSE_PROJECT_NAME=cpp-dev diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index dc0ae58..8c60ba0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -17,7 +17,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/cpp-sysroot:0.5.0 + image: faasm.azurecr.io/cpp-sysroot:0.6.0 steps: # --- Update code --- - name: "Checkout code" diff --git a/VERSION b/VERSION index 8f0916f..a918a2a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.0 +0.6.0 From 9379ae997f7de023a495e53981a6852ce921e1df Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Jul 2024 15:02:29 +0000 Subject: [PATCH 06/12] libfaasm: remove un-implemented symbols --- libfaasm/libfaasm.imports | 2 -- 1 file changed, 2 deletions(-) diff --git a/libfaasm/libfaasm.imports b/libfaasm/libfaasm.imports index 1eaaca7..76ff179 100644 --- a/libfaasm/libfaasm.imports +++ b/libfaasm/libfaasm.imports @@ -45,8 +45,6 @@ __faasm_migrate_point # S3 __faasm_s3_get_num_buckets __faasm_s3_list_buckets -__faasm_s3_create_bucket -__faasm_s3_delete_bucket __faasm_s3_get_num_keys __faasm_s3_list_keys __faasm_s3_add_key_bytes From d6709c58323ad049f67d4a58c2e590113b4db400 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 24 Jul 2024 17:23:53 +0000 Subject: [PATCH 07/12] s3: temporarily commit the workflow functions --- func/s3/CMakeLists.txt | 3 ++ func/s3/wc_driver.cpp | 92 +++++++++++++++++++++++++++++++++ func/s3/wc_mapper.cpp | 115 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 210 insertions(+) create mode 100644 func/s3/wc_driver.cpp create mode 100644 func/s3/wc_mapper.cpp diff --git a/func/s3/CMakeLists.txt b/func/s3/CMakeLists.txt index a082f11..a79c39a 100644 --- a/func/s3/CMakeLists.txt +++ b/func/s3/CMakeLists.txt @@ -11,6 +11,9 @@ s3_func(get_num_keys get_num_keys.cpp) s3_func(list_keys list_keys.cpp) s3_func(add_key_bytes add_key_bytes.cpp) s3_func(get_key_bytes get_key_bytes.cpp) +# TODO: move elsewhere when it works +s3_func(wc_driver wc_driver.cpp) +s3_func(wc_mapper wc_mapper.cpp) # Custom target to group all the demo functions add_custom_target(s3_all_funcs DEPENDS ${ALL_DEMO_FUNCS}) diff --git a/func/s3/wc_driver.cpp b/func/s3/wc_driver.cpp new file mode 100644 index 0000000..4bf0e1b --- /dev/null +++ b/func/s3/wc_driver.cpp @@ -0,0 +1,92 @@ +#ifdef __faasm +#include +#endif + +#include +#include +#include + +std::vector splitByDelimiter(std::string stringCopy, const std::string& delimiter) +{ + std::vector splitString; + + size_t pos = 0; + std::string token; + while ((pos = stringCopy.find(delimiter)) != std::string::npos) { + splitString.push_back(stringCopy.substr(0, pos)); + stringCopy.erase(0, pos + delimiter.length()); + } + splitString.push_back(stringCopy); + + return splitString; +} + +/* Driver Function - MapReduce workflow + * + * This function acts as a "coordinator" for the MapReduce workflow. It + * reduces the amount of workflow-specific logic that we need to implement in + * Faasm. + * + * As an input, this workflow gets the S3 path read data from. + */ +int main(int argc, char** argv) +{ + if (argc != 2) { + printf("WC workflow must be invoked with one parameter: \n"); + return 1; + } + std::string s3prefix = argv[1]; + + // 1. Invoke one instance of the splitter function with an S3 path as an + // input. The splitter function will, in turn, traverse the S3 directory + // and invoke one mapper function per file in the directory. + // + // The return value of the splitter function is a list of message ids for + // the mapper function +#ifdef __faasm + // Call splitter + int splitterId = faasmChainNamed("wc_splitter", (uint8_t*) s3prefix.c_str(), s3prefix.size()); +#endif + + // Wait for splitter to finish invoking all mapper functions + // TODO: consider making this dynamic by calling malloc from Faasm + size_t maxOutput = 2048; + char splitterOutput[maxOutput]; + +#ifdef __faasm + int result = faasmAwaitCallOutput(splitterId, splitterOutput, maxOutput); + if (result != 0) { + printf("error: splitter execution failed with rc %i\n", result); + return 1; + } +#endif + + // Get all message ids from output + std::vector mapperIds = splitByDelimiter(splitterOutput, ","); + + // 2. Wait for all mapper functions to have finished + for (auto mapperIdStr : mapperIds) { + int mapperId = std::stoi(mapperIdStr); +#ifdef __faasm + result = faasmAwaitCall(mapperId); + if (result != 0) { + printf("error: mapper execution (id: %i) failed with rc %i\n", mapperId, result); + return 1; + } +#endif + } + + // 3. Invoke one reducer function to aggreagate all results + std::string s3result = s3prefix + "/mapper-results"; +#ifdef __faasm + // Call reducer and await + int reducerId = faasmChainNamed("wc_reducer", (uint8_t*) s3result.c_str(), s3result.size()); + result = faasmAwaitCall(reducerId); + if (result != 0) { + printf("error: reducer failed with rc %i\n", result); + return 1; + } +#endif + + return 0; +} diff --git a/func/s3/wc_mapper.cpp b/func/s3/wc_mapper.cpp new file mode 100644 index 0000000..74093a9 --- /dev/null +++ b/func/s3/wc_mapper.cpp @@ -0,0 +1,115 @@ +#ifdef __faasm +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#endif + +#include +#include +#include + +std::map wordCount = { + {"JavaScript", 0}, + {"Java", 0}, + {"PHP", 0}, + {"Python", 0}, + {"C#", 0}, + {"C++", 0}, + {"Ruby", 0}, + {"CSS", 0}, + {"Objective-C", 0}, + {"Perl", 0}, + {"Scala", 0}, + {"Haskell", 0}, + {"MATLAB", 0}, + {"Clojure", 0}, + {"Groovy", 0} +}; + +std::string serialiseWordCount() +{ + std::string result; + + for (const auto& [key, val] : wordCount) { + result += key + ":" + std::to_string(val) + ","; + } + result.pop_back(); + + return result; +} + +/* Mapper Function - Step 2 of MapReduce Workflow + * + * The mapper function taks as an input an S3 path, and, as an output, writes + * a serialized JSON to S3 with the partial counts of different programming + * languages. + */ +int main(int argc, char** argv) +{ + // TODO: this is currently hardcoded + std::string bucketName = "tless"; + std::string s3ObjectKey; + +#ifdef __faasm + // Get the object key as an input + int inputSize = faasmGetInputSize(); + char keyName[inputSize]; + faasmGetInput((uint8_t*)keyName, inputSize); + s3ObjectKey.assign(keyName, keyName + inputSize); +#endif + + // Read object from S3 + uint8_t* keyBytes; +#ifdef __faasm + int keyBytesLen; + + int ret = + __faasm_s3_get_key_bytes(bucketName.c_str(), s3ObjectKey.c_str(), &keyBytes, &keyBytesLen); + if (ret != 0) { + printf("error: error getting bytes from key: %s (bucket: %s)\n", + s3ObjectKey.c_str(), + bucketName.c_str()); + } +#endif + + std::stringstream stringStream((char*) keyBytes); + std::string currentLine; + while (std::getline(stringStream, currentLine, '\n')) { + for (auto& [key, val] : wordCount) { + if (currentLine.find(key) != std::string::npos) { + val += 1; + } + } + } + + printf("final count:\n"); + for (const auto& [key, val] : wordCount) { + printf("\t- %s: %i\n", key.c_str(), val); + } + + // Work-out the serialised payload and directory + auto serialisedWordCount = serialiseWordCount(); + size_t lastSlash = s3ObjectKey.rfind("/"); + if (lastSlash != std::string::npos) { + s3ObjectKey.insert(lastSlash + 1, "mapper-results/"); + } else { + s3ObjectKey = "mapper-results/" + s3ObjectKey; + } + + printf("would write result to: %s\n", s3ObjectKey.c_str()); + // TODO: write serialised result +/* +#ifdef __faasm + int ret = + __faasm_s3_add_key_bytes(bucketName.c_str(), + s3ObjectKey.c_str(), + (void*) serialiseWordCount.c_str(), + serialiseWordCount.size()); +#endif +*/ + + return 0; +} From e6bf766c8a5d867934a1802c01110ec4a74b1095 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Thu, 25 Jul 2024 16:55:44 +0000 Subject: [PATCH 08/12] wc: more functions --- func/s3/CMakeLists.txt | 2 + func/s3/wc_driver.cpp | 17 ++++--- func/s3/wc_mapper.cpp | 13 ++--- func/s3/wc_reducer.cpp | 63 +++++++++++++++++++++++ func/s3/wc_splitter.cpp | 90 +++++++++++++++++++++++++++++++++ libfaasm/core.cpp | 4 +- libfaasm/faasm/core.h | 4 +- libfaasm/faasm/host_interface.h | 4 +- 8 files changed, 176 insertions(+), 21 deletions(-) create mode 100644 func/s3/wc_reducer.cpp create mode 100644 func/s3/wc_splitter.cpp diff --git a/func/s3/CMakeLists.txt b/func/s3/CMakeLists.txt index a79c39a..1c7774a 100644 --- a/func/s3/CMakeLists.txt +++ b/func/s3/CMakeLists.txt @@ -13,7 +13,9 @@ s3_func(add_key_bytes add_key_bytes.cpp) s3_func(get_key_bytes get_key_bytes.cpp) # TODO: move elsewhere when it works s3_func(wc_driver wc_driver.cpp) +s3_func(wc_splitter wc_splitter.cpp) s3_func(wc_mapper wc_mapper.cpp) +s3_func(wc_reducer wc_reducer.cpp) # Custom target to group all the demo functions add_custom_target(s3_all_funcs DEPENDS ${ALL_DEMO_FUNCS}) diff --git a/func/s3/wc_driver.cpp b/func/s3/wc_driver.cpp index 4bf0e1b..7e22218 100644 --- a/func/s3/wc_driver.cpp +++ b/func/s3/wc_driver.cpp @@ -27,12 +27,15 @@ std::vector splitByDelimiter(std::string stringCopy, const std::str * reduces the amount of workflow-specific logic that we need to implement in * Faasm. * + * In a TLess context, the coordinator can be interpreted as "all the things + * that could go wrong" during execution of a confidential serverless workflow. + * * As an input, this workflow gets the S3 path read data from. */ int main(int argc, char** argv) { if (argc != 2) { - printf("WC workflow must be invoked with one parameter: \n"); + printf("word-count(driver): error: workflow must be invoked with one parameter: \n"); return 1; } std::string s3prefix = argv[1]; @@ -43,18 +46,16 @@ int main(int argc, char** argv) // // The return value of the splitter function is a list of message ids for // the mapper function + printf("word-count(driver): invoking one splitter function\n"); #ifdef __faasm // Call splitter int splitterId = faasmChainNamed("wc_splitter", (uint8_t*) s3prefix.c_str(), s3prefix.size()); #endif - // Wait for splitter to finish invoking all mapper functions - // TODO: consider making this dynamic by calling malloc from Faasm - size_t maxOutput = 2048; - char splitterOutput[maxOutput]; - + char* splitterOutput; + int splitterOutputLen; #ifdef __faasm - int result = faasmAwaitCallOutput(splitterId, splitterOutput, maxOutput); + int result = faasmAwaitCallOutput(splitterId, &splitterOutput, &splitterOutputLen); if (result != 0) { printf("error: splitter execution failed with rc %i\n", result); return 1; @@ -65,6 +66,7 @@ int main(int argc, char** argv) std::vector mapperIds = splitByDelimiter(splitterOutput, ","); // 2. Wait for all mapper functions to have finished + printf("word-count(driver): waiting for %zu mapper functions...\n", mapperIds.size()); for (auto mapperIdStr : mapperIds) { int mapperId = std::stoi(mapperIdStr); #ifdef __faasm @@ -78,6 +80,7 @@ int main(int argc, char** argv) // 3. Invoke one reducer function to aggreagate all results std::string s3result = s3prefix + "/mapper-results"; + printf("word-count(driver): invoking one reducer function\n"); #ifdef __faasm // Call reducer and await int reducerId = faasmChainNamed("wc_reducer", (uint8_t*) s3result.c_str(), s3result.size()); diff --git a/func/s3/wc_mapper.cpp b/func/s3/wc_mapper.cpp index 74093a9..22a1cea 100644 --- a/func/s3/wc_mapper.cpp +++ b/func/s3/wc_mapper.cpp @@ -91,7 +91,7 @@ int main(int argc, char** argv) } // Work-out the serialised payload and directory - auto serialisedWordCount = serialiseWordCount(); + auto thisWordCount = serialiseWordCount(); size_t lastSlash = s3ObjectKey.rfind("/"); if (lastSlash != std::string::npos) { s3ObjectKey.insert(lastSlash + 1, "mapper-results/"); @@ -99,17 +99,14 @@ int main(int argc, char** argv) s3ObjectKey = "mapper-results/" + s3ObjectKey; } - printf("would write result to: %s\n", s3ObjectKey.c_str()); - // TODO: write serialised result -/* + printf("word-count(mapper): writting result to %s\n", s3ObjectKey.c_str()); #ifdef __faasm - int ret = + ret = __faasm_s3_add_key_bytes(bucketName.c_str(), s3ObjectKey.c_str(), - (void*) serialiseWordCount.c_str(), - serialiseWordCount.size()); + (void*) thisWordCount.c_str(), + thisWordCount.size()); #endif -*/ return 0; } diff --git a/func/s3/wc_reducer.cpp b/func/s3/wc_reducer.cpp new file mode 100644 index 0000000..2cc3544 --- /dev/null +++ b/func/s3/wc_reducer.cpp @@ -0,0 +1,63 @@ +#ifdef __faasm +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#endif +#include +#include +#include +#include + +/* Reducer Function - Word Count Workflow + * + * This function takes a path to a directory as an input, reads the serialised + * counts from each file in the directory, and then aggreagates the results + * to one final count. + */ +int main(int argc, char** argv) +{ + // TODO: the bucket name is currently hardcoded + std::string bucketName = "tless"; + std::string s3dir; + +#ifdef __faasm + // Get the results dir as an input + int inputSize = faasmGetInputSize(); + char s3dirChar[inputSize]; + faasmGetInput((uint8_t*)s3dirChar, inputSize); + s3dir.assign(s3dirChar, s3dirChar + inputSize); +#endif + + // Get the list of files in the s3 dir + std::vector s3files; + +#ifdef __faasm + // In Faasm we need to do a bit of work because: (i) we can not pass + // structured objects (i.e. vectors) through the WASM calling interface, + // and (ii) we have not implmented prefix listing, so we need to filter + // out entries manually + int numKeys = __faasm_s3_get_num_keys(bucketName.c_str()); + + char* keysBuffer[numKeys]; + int keysBufferLens[numKeys]; + __faasm_s3_list_keys(bucketName.c_str(), keysBuffer, keysBufferLens); + + for (int i = 0; i < numKeys; i++) { + std::string tmpString; + tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); + + // Filter by prefix + if (tmpString.rfind(s3dir, 0) == 0) { + printf("word-count(reducer): using this output file %s\n", tmpString.c_str()); + s3files.push_back(tmpString); + } + } +#endif + + // TODO: for each output file, de-serialise results and aggreagate + + return 0; +} diff --git a/func/s3/wc_splitter.cpp b/func/s3/wc_splitter.cpp new file mode 100644 index 0000000..dcda885 --- /dev/null +++ b/func/s3/wc_splitter.cpp @@ -0,0 +1,90 @@ +#ifdef __faasm +extern "C" +{ +#include "faasm/host_interface.h" +} + +#include +#endif + +#include +#include +#include + +/* Spliiter Function - MapReduce Workflow + * + * This function takes as an input an S3 path, and invokes one mapper function + * for each file in the S3 path. The chaining is asynchronous. + * + * The function returns a comma-separated list of the message ids corresponding + * to all the invoked functions. + */ +int main(int argc, char** argv) +{ + // TODO: the bucket name is currently hardcoded + std::string bucketName = "tless"; + std::string s3dir; + +#ifdef __faasm + // Get the object key as an input + int inputSize = faasmGetInputSize(); + char s3dirChar[inputSize]; + faasmGetInput((uint8_t*)s3dirChar, inputSize); + s3dir.assign(s3dirChar, s3dirChar + inputSize); +#endif + + // Get the list of files in the s3 dir + std::vector s3files; + +#ifdef __faasm + // In Faasm we need to do a bit of work because: (i) we can not pass + // structured objects (i.e. vectors) through the WASM calling interface, + // and (ii) we have not implmented prefix listing, so we need to filter + // out entries manually + int numKeys = __faasm_s3_get_num_keys(bucketName.c_str()); + + char* keysBuffer[numKeys]; + int keysBufferLens[numKeys]; + __faasm_s3_list_keys(bucketName.c_str(), keysBuffer, keysBufferLens); + + int totalSize = 0; + for (int i = 0; i < numKeys; i++) { + std::string tmpString; + tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); + + // Filter by prefix + if (tmpString.rfind(s3dir, 0) == 0) { + // Filter-out sub-directories to store results + if (tmpString.find("results") == std::string::npos) { + s3files.push_back(tmpString); + } + } + } +#endif + + // Chain to one mapper function per file, and store the message id to be + // able to wait on it + std::vector splitterCallIds; + for (const auto& s3file : s3files) { +#ifdef __faasm + printf("word-count(splitter): chaining to mapper with file %s\n", s3file.c_str()); + int splitterId = faasmChainNamed("wc_mapper", (uint8_t*) s3file.c_str(), s3file.size()); + splitterCallIds.push_back(splitterId); +#endif + } + + + // Prepare the output: comma separated list of message ids + std::string outputStr; + for (const auto& splitterId : splitterCallIds) { + outputStr += std::to_string(splitterId) + ","; + } + outputStr.pop_back(); + +#ifdef __faasm + printf("word-count(splitter): my output is %s\n", outputStr.c_str()); + faasmSetOutput(outputStr.c_str(), outputStr.size()); +#endif + + return 0; +} diff --git a/libfaasm/core.cpp b/libfaasm/core.cpp index a7b29bd..0fef908 100644 --- a/libfaasm/core.cpp +++ b/libfaasm/core.cpp @@ -152,8 +152,8 @@ unsigned int faasmAwaitCall(unsigned int messageId) } unsigned int faasmAwaitCallOutput(unsigned int messageId, - const char* output, - long outputLen) + char** output, + int* outputLen) { return __faasm_await_call_output(messageId, output, outputLen); } diff --git a/libfaasm/faasm/core.h b/libfaasm/faasm/core.h index 351b700..bb755eb 100644 --- a/libfaasm/faasm/core.h +++ b/libfaasm/faasm/core.h @@ -188,8 +188,8 @@ extern "C" * Gets the output from the given call into the buffer */ unsigned int faasmAwaitCallOutput(unsigned int messageId, - const char* output, - long outputLen); + char** output, + int* outputLen); /** * Returns the python user diff --git a/libfaasm/faasm/host_interface.h b/libfaasm/faasm/host_interface.h index 9578245..ba7f4b7 100644 --- a/libfaasm/faasm/host_interface.h +++ b/libfaasm/faasm/host_interface.h @@ -114,8 +114,8 @@ int __faasm_await_call(unsigned int messageId); HOST_IFACE_FUNC int __faasm_await_call_output(unsigned int messageId, - const char* output, - long outputLen); + char** output, + int* outputLen); HOST_IFACE_FUNC void __faasm_get_py_user(unsigned char* buffer, long bufferLen); From 0fa589ce91ec54e4bf8afceec9cfd121910ca8e3 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 26 Jul 2024 11:17:00 +0000 Subject: [PATCH 09/12] s3: untrack wc functions --- func/s3/wc_driver.cpp | 95 ---------------------------------- func/s3/wc_mapper.cpp | 112 ---------------------------------------- func/s3/wc_reducer.cpp | 63 ---------------------- func/s3/wc_splitter.cpp | 90 -------------------------------- 4 files changed, 360 deletions(-) delete mode 100644 func/s3/wc_driver.cpp delete mode 100644 func/s3/wc_mapper.cpp delete mode 100644 func/s3/wc_reducer.cpp delete mode 100644 func/s3/wc_splitter.cpp diff --git a/func/s3/wc_driver.cpp b/func/s3/wc_driver.cpp deleted file mode 100644 index 7e22218..0000000 --- a/func/s3/wc_driver.cpp +++ /dev/null @@ -1,95 +0,0 @@ -#ifdef __faasm -#include -#endif - -#include -#include -#include - -std::vector splitByDelimiter(std::string stringCopy, const std::string& delimiter) -{ - std::vector splitString; - - size_t pos = 0; - std::string token; - while ((pos = stringCopy.find(delimiter)) != std::string::npos) { - splitString.push_back(stringCopy.substr(0, pos)); - stringCopy.erase(0, pos + delimiter.length()); - } - splitString.push_back(stringCopy); - - return splitString; -} - -/* Driver Function - MapReduce workflow - * - * This function acts as a "coordinator" for the MapReduce workflow. It - * reduces the amount of workflow-specific logic that we need to implement in - * Faasm. - * - * In a TLess context, the coordinator can be interpreted as "all the things - * that could go wrong" during execution of a confidential serverless workflow. - * - * As an input, this workflow gets the S3 path read data from. - */ -int main(int argc, char** argv) -{ - if (argc != 2) { - printf("word-count(driver): error: workflow must be invoked with one parameter: \n"); - return 1; - } - std::string s3prefix = argv[1]; - - // 1. Invoke one instance of the splitter function with an S3 path as an - // input. The splitter function will, in turn, traverse the S3 directory - // and invoke one mapper function per file in the directory. - // - // The return value of the splitter function is a list of message ids for - // the mapper function - printf("word-count(driver): invoking one splitter function\n"); -#ifdef __faasm - // Call splitter - int splitterId = faasmChainNamed("wc_splitter", (uint8_t*) s3prefix.c_str(), s3prefix.size()); -#endif - - char* splitterOutput; - int splitterOutputLen; -#ifdef __faasm - int result = faasmAwaitCallOutput(splitterId, &splitterOutput, &splitterOutputLen); - if (result != 0) { - printf("error: splitter execution failed with rc %i\n", result); - return 1; - } -#endif - - // Get all message ids from output - std::vector mapperIds = splitByDelimiter(splitterOutput, ","); - - // 2. Wait for all mapper functions to have finished - printf("word-count(driver): waiting for %zu mapper functions...\n", mapperIds.size()); - for (auto mapperIdStr : mapperIds) { - int mapperId = std::stoi(mapperIdStr); -#ifdef __faasm - result = faasmAwaitCall(mapperId); - if (result != 0) { - printf("error: mapper execution (id: %i) failed with rc %i\n", mapperId, result); - return 1; - } -#endif - } - - // 3. Invoke one reducer function to aggreagate all results - std::string s3result = s3prefix + "/mapper-results"; - printf("word-count(driver): invoking one reducer function\n"); -#ifdef __faasm - // Call reducer and await - int reducerId = faasmChainNamed("wc_reducer", (uint8_t*) s3result.c_str(), s3result.size()); - result = faasmAwaitCall(reducerId); - if (result != 0) { - printf("error: reducer failed with rc %i\n", result); - return 1; - } -#endif - - return 0; -} diff --git a/func/s3/wc_mapper.cpp b/func/s3/wc_mapper.cpp deleted file mode 100644 index 22a1cea..0000000 --- a/func/s3/wc_mapper.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#ifdef __faasm -extern "C" -{ -#include "faasm/host_interface.h" -} - -#include -#endif - -#include -#include -#include - -std::map wordCount = { - {"JavaScript", 0}, - {"Java", 0}, - {"PHP", 0}, - {"Python", 0}, - {"C#", 0}, - {"C++", 0}, - {"Ruby", 0}, - {"CSS", 0}, - {"Objective-C", 0}, - {"Perl", 0}, - {"Scala", 0}, - {"Haskell", 0}, - {"MATLAB", 0}, - {"Clojure", 0}, - {"Groovy", 0} -}; - -std::string serialiseWordCount() -{ - std::string result; - - for (const auto& [key, val] : wordCount) { - result += key + ":" + std::to_string(val) + ","; - } - result.pop_back(); - - return result; -} - -/* Mapper Function - Step 2 of MapReduce Workflow - * - * The mapper function taks as an input an S3 path, and, as an output, writes - * a serialized JSON to S3 with the partial counts of different programming - * languages. - */ -int main(int argc, char** argv) -{ - // TODO: this is currently hardcoded - std::string bucketName = "tless"; - std::string s3ObjectKey; - -#ifdef __faasm - // Get the object key as an input - int inputSize = faasmGetInputSize(); - char keyName[inputSize]; - faasmGetInput((uint8_t*)keyName, inputSize); - s3ObjectKey.assign(keyName, keyName + inputSize); -#endif - - // Read object from S3 - uint8_t* keyBytes; -#ifdef __faasm - int keyBytesLen; - - int ret = - __faasm_s3_get_key_bytes(bucketName.c_str(), s3ObjectKey.c_str(), &keyBytes, &keyBytesLen); - if (ret != 0) { - printf("error: error getting bytes from key: %s (bucket: %s)\n", - s3ObjectKey.c_str(), - bucketName.c_str()); - } -#endif - - std::stringstream stringStream((char*) keyBytes); - std::string currentLine; - while (std::getline(stringStream, currentLine, '\n')) { - for (auto& [key, val] : wordCount) { - if (currentLine.find(key) != std::string::npos) { - val += 1; - } - } - } - - printf("final count:\n"); - for (const auto& [key, val] : wordCount) { - printf("\t- %s: %i\n", key.c_str(), val); - } - - // Work-out the serialised payload and directory - auto thisWordCount = serialiseWordCount(); - size_t lastSlash = s3ObjectKey.rfind("/"); - if (lastSlash != std::string::npos) { - s3ObjectKey.insert(lastSlash + 1, "mapper-results/"); - } else { - s3ObjectKey = "mapper-results/" + s3ObjectKey; - } - - printf("word-count(mapper): writting result to %s\n", s3ObjectKey.c_str()); -#ifdef __faasm - ret = - __faasm_s3_add_key_bytes(bucketName.c_str(), - s3ObjectKey.c_str(), - (void*) thisWordCount.c_str(), - thisWordCount.size()); -#endif - - return 0; -} diff --git a/func/s3/wc_reducer.cpp b/func/s3/wc_reducer.cpp deleted file mode 100644 index 2cc3544..0000000 --- a/func/s3/wc_reducer.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#ifdef __faasm -extern "C" -{ -#include "faasm/host_interface.h" -} - -#include -#endif -#include -#include -#include -#include - -/* Reducer Function - Word Count Workflow - * - * This function takes a path to a directory as an input, reads the serialised - * counts from each file in the directory, and then aggreagates the results - * to one final count. - */ -int main(int argc, char** argv) -{ - // TODO: the bucket name is currently hardcoded - std::string bucketName = "tless"; - std::string s3dir; - -#ifdef __faasm - // Get the results dir as an input - int inputSize = faasmGetInputSize(); - char s3dirChar[inputSize]; - faasmGetInput((uint8_t*)s3dirChar, inputSize); - s3dir.assign(s3dirChar, s3dirChar + inputSize); -#endif - - // Get the list of files in the s3 dir - std::vector s3files; - -#ifdef __faasm - // In Faasm we need to do a bit of work because: (i) we can not pass - // structured objects (i.e. vectors) through the WASM calling interface, - // and (ii) we have not implmented prefix listing, so we need to filter - // out entries manually - int numKeys = __faasm_s3_get_num_keys(bucketName.c_str()); - - char* keysBuffer[numKeys]; - int keysBufferLens[numKeys]; - __faasm_s3_list_keys(bucketName.c_str(), keysBuffer, keysBufferLens); - - for (int i = 0; i < numKeys; i++) { - std::string tmpString; - tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); - - // Filter by prefix - if (tmpString.rfind(s3dir, 0) == 0) { - printf("word-count(reducer): using this output file %s\n", tmpString.c_str()); - s3files.push_back(tmpString); - } - } -#endif - - // TODO: for each output file, de-serialise results and aggreagate - - return 0; -} diff --git a/func/s3/wc_splitter.cpp b/func/s3/wc_splitter.cpp deleted file mode 100644 index dcda885..0000000 --- a/func/s3/wc_splitter.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#ifdef __faasm -extern "C" -{ -#include "faasm/host_interface.h" -} - -#include -#endif - -#include -#include -#include - -/* Spliiter Function - MapReduce Workflow - * - * This function takes as an input an S3 path, and invokes one mapper function - * for each file in the S3 path. The chaining is asynchronous. - * - * The function returns a comma-separated list of the message ids corresponding - * to all the invoked functions. - */ -int main(int argc, char** argv) -{ - // TODO: the bucket name is currently hardcoded - std::string bucketName = "tless"; - std::string s3dir; - -#ifdef __faasm - // Get the object key as an input - int inputSize = faasmGetInputSize(); - char s3dirChar[inputSize]; - faasmGetInput((uint8_t*)s3dirChar, inputSize); - s3dir.assign(s3dirChar, s3dirChar + inputSize); -#endif - - // Get the list of files in the s3 dir - std::vector s3files; - -#ifdef __faasm - // In Faasm we need to do a bit of work because: (i) we can not pass - // structured objects (i.e. vectors) through the WASM calling interface, - // and (ii) we have not implmented prefix listing, so we need to filter - // out entries manually - int numKeys = __faasm_s3_get_num_keys(bucketName.c_str()); - - char* keysBuffer[numKeys]; - int keysBufferLens[numKeys]; - __faasm_s3_list_keys(bucketName.c_str(), keysBuffer, keysBufferLens); - - int totalSize = 0; - for (int i = 0; i < numKeys; i++) { - std::string tmpString; - tmpString.assign(keysBuffer[i], keysBuffer[i] + keysBufferLens[i]); - - // Filter by prefix - if (tmpString.rfind(s3dir, 0) == 0) { - // Filter-out sub-directories to store results - if (tmpString.find("results") == std::string::npos) { - s3files.push_back(tmpString); - } - } - } -#endif - - // Chain to one mapper function per file, and store the message id to be - // able to wait on it - std::vector splitterCallIds; - for (const auto& s3file : s3files) { -#ifdef __faasm - printf("word-count(splitter): chaining to mapper with file %s\n", s3file.c_str()); - int splitterId = faasmChainNamed("wc_mapper", (uint8_t*) s3file.c_str(), s3file.size()); - splitterCallIds.push_back(splitterId); -#endif - } - - - // Prepare the output: comma separated list of message ids - std::string outputStr; - for (const auto& splitterId : splitterCallIds) { - outputStr += std::to_string(splitterId) + ","; - } - outputStr.pop_back(); - -#ifdef __faasm - printf("word-count(splitter): my output is %s\n", outputStr.c_str()); - faasmSetOutput(outputStr.c_str(), outputStr.size()); -#endif - - return 0; -} From 120dcd2d32ee9615a1ef285db31d318d65024004 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 26 Jul 2024 11:21:43 +0000 Subject: [PATCH 10/12] s3: build functions as part of gha --- func/s3/CMakeLists.txt | 5 ----- tasks/func.py | 1 + 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/func/s3/CMakeLists.txt b/func/s3/CMakeLists.txt index 1c7774a..a082f11 100644 --- a/func/s3/CMakeLists.txt +++ b/func/s3/CMakeLists.txt @@ -11,11 +11,6 @@ s3_func(get_num_keys get_num_keys.cpp) s3_func(list_keys list_keys.cpp) s3_func(add_key_bytes add_key_bytes.cpp) s3_func(get_key_bytes get_key_bytes.cpp) -# TODO: move elsewhere when it works -s3_func(wc_driver wc_driver.cpp) -s3_func(wc_splitter wc_splitter.cpp) -s3_func(wc_mapper wc_mapper.cpp) -s3_func(wc_reducer wc_reducer.cpp) # Custom target to group all the demo functions add_custom_target(s3_all_funcs DEPENDS ${ALL_DEMO_FUNCS}) diff --git a/tasks/func.py b/tasks/func.py index 2461cd6..cbb6c1f 100644 --- a/tasks/func.py +++ b/tasks/func.py @@ -183,4 +183,5 @@ def local(ctx, clean=False, debug=False): user(ctx, "errors", clean, debug) user(ctx, "mpi", clean, debug) user(ctx, "omp", clean, debug) + user(ctx, "s3", clean, debug) user(ctx, "threads", clean, debug) From 26c268ce3c5c4de0083e15e8583860a0910df799 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 26 Jul 2024 11:40:18 +0000 Subject: [PATCH 11/12] func(demo): update chain_output function for new signature --- func/demo/chain_output.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/func/demo/chain_output.cpp b/func/demo/chain_output.cpp index 8839b5a..2b8e524 100644 --- a/func/demo/chain_output.cpp +++ b/func/demo/chain_output.cpp @@ -26,17 +26,22 @@ int main(int argc, char* argv[]) unsigned int callIdB = faasmChain(otherB, nullptr, 0); std::string expectedA = "expected A"; - std::string actualA; - actualA.reserve(expectedA.size()); std::string expectedB = "longer expected B"; - std::string actualB; - actualB.reserve(expectedB.size()); + std::string actualA; + char* actualABuf; + int actualABufSize; unsigned int resA = - faasmAwaitCallOutput(callIdA, actualA.c_str(), actualA.size()); + faasmAwaitCallOutput(callIdA, &actualABuf, &actualABufSize); + actualA.assign(actualABuf, actualABuf + actualABufSize); + + std::string actualB; + char* actualBBuf; + int actualBBufSize; unsigned int resB = - faasmAwaitCallOutput(callIdB, actualB.c_str(), actualB.size()); + faasmAwaitCallOutput(callIdB, &actualBBuf, &actualBBufSize); + actualB.assign(actualBBuf, actualBBuf + actualBBufSize); if (resA != 0 || resB != 0) { printf("One or more chained calls failed: %i %i\n", resA, resB); @@ -44,10 +49,14 @@ int main(int argc, char* argv[]) } if (actualA != expectedA) { + printf( + "Output mismatch: %s != %s\n", actualA.c_str(), expectedA.c_str()); return 1; } if (actualB != expectedB) { + printf( + "Output mismatch: %s != %s\n", actualB.c_str(), expectedB.c_str()); return 1; } From 1a88997850743e9faf9ca93054af8eca1da0a99d Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 26 Jul 2024 11:49:44 +0000 Subject: [PATCH 12/12] func: separate threaded/non-threaded targets --- tasks/func.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tasks/func.py b/tasks/func.py index cbb6c1f..409d450 100644 --- a/tasks/func.py +++ b/tasks/func.py @@ -179,9 +179,16 @@ def local(ctx, clean=False, debug=False): """ Compile all functions used in the tests """ + # Mixing users that use the threads sysroot and the non-threads sysroot + # seems to give some rather obscure compilation errors, so just make sure + # to separate them + + # Non-threaded users user(ctx, "demo", clean, debug) user(ctx, "errors", clean, debug) user(ctx, "mpi", clean, debug) - user(ctx, "omp", clean, debug) user(ctx, "s3", clean, debug) + + # Threaded users + user(ctx, "omp", clean, debug) user(ctx, "threads", clean, debug)