From 4bea2db6dd756d3c9d383346e0f9113de9a82d7a Mon Sep 17 00:00:00 2001 From: edwardhartnett Date: Wed, 7 Aug 2019 06:07:44 -0600 Subject: [PATCH] starting to get async working with netcdf integration --- src/clib/pio.h | 5 ++ src/ncint/ncint_pio.c | 66 +++++++++++++++++-- src/ncint/ncintdispatch.c | 15 +++-- tests/ncint/Makefile.am | 5 +- tests/ncint/pio_err_macros.h | 64 ++++++++++++++++++ tests/ncint/tst_pio_async.c | 124 +++++++++++++++++++++++++++++++++++ tests/ncint/tst_pio_udf.c | 74 ++++++++++----------- 7 files changed, 305 insertions(+), 48 deletions(-) create mode 100644 tests/ncint/pio_err_macros.h create mode 100644 tests/ncint/tst_pio_async.c diff --git a/src/clib/pio.h b/src/clib/pio.h index e19a4782212..1120aea5db0 100644 --- a/src/clib/pio.h +++ b/src/clib/pio.h @@ -1250,6 +1250,11 @@ extern "C" { int nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base, int rearr, int *iosysidp); + int nc_def_async(MPI_Comm world, int num_io_procs, int *io_proc_list, + int component_count, int *num_procs_per_comp, int **proc_list, + MPI_Comm *io_comm, MPI_Comm *comp_comm, int rearranger, + int *iosysidp); + /* Set the default IOsystem ID. */ int nc_set_iosystem(int iosysid); diff --git a/src/ncint/ncint_pio.c b/src/ncint/ncint_pio.c index 7664a176781..8ef61a8158d 100644 --- a/src/ncint/ncint_pio.c +++ b/src/ncint/ncint_pio.c @@ -13,10 +13,6 @@ /** This is te default io system id. */ extern int diosysid; -/** Have we initialized the netCDF integration layer? This is where we - * register our dispatch layer with netcdf-c. */ -extern int ncint_initialized; - /** * Same as PIOc_Init_Intracomm(). * @@ -28,6 +24,10 @@ nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base, { int ret; + /* Make sure PIO was initialized. */ + if ((ret = PIO_NCINT_initialize())) + return ret; + /* Call the PIOc_ function to initialize the intracomm. 
*/ if ((ret = PIOc_Init_Intracomm(comp_comm, num_iotasks, stride, base, rearr, iosysidp))) @@ -39,6 +39,64 @@ nc_def_iosystemm(MPI_Comm comp_comm, int num_iotasks, int stride, int base, return PIO_NOERR; } +/** + * Same as PIOc_init_async(). + * + * @param world the communicator containing all the available tasks. + * @param num_io_procs the number of processes for the IO component. + * @param io_proc_list an array of length num_io_procs with the + * processor number for each IO processor. If NULL then the IO + * processes are assigned starting at process 0. + * @param component_count number of computational components + * @param num_procs_per_comp an array of int, of length + * component_count, with the number of processors in each computation + * component. + * @param proc_list an array of arrays containing the processor + * numbers for each computation component. If NULL then the + * computation components are assigned processors sequentially + * starting with processor num_io_procs. + * @param io_comm pointer to an MPI_Comm. If not NULL, it will + * get an MPI duplicate of the IO communicator. (It is a full + * duplicate and later must be freed with MPI_Comm_free() by the caller.) + * @param comp_comm pointer to an array of MPI_Comm; + * the array is of length component_count. If not NULL, it will get an + * MPI duplicate of each computation communicator. (These are full + * duplicates and each must later be freed with MPI_Comm_free() by the + * caller.) + * @param rearranger the default rearranger to use for decompositions + * in this IO system. Only PIO_REARR_BOX is supported for + * async. Support for PIO_REARR_SUBSET will be provided in a future + * version. + * @param iosysidp pointer to array of length component_count that + * gets the iosysid for each component. + * + * @return PIO_NOERR on success, error code otherwise. 
+ * @author Ed Hartnett + */ +int +nc_def_async(MPI_Comm world, int num_io_procs, int *io_proc_list, + int component_count, int *num_procs_per_comp, int **proc_list, + MPI_Comm *io_comm, MPI_Comm *comp_comm, int rearranger, + int *iosysidp) +{ + int ret; + + /* Make sure PIO was initialized. */ + if ((ret = PIO_NCINT_initialize())) + return ret; + + /* Call the PIOc_ function to initialize the async IO system. */ + if ((ret = PIOc_init_async(world, num_io_procs, io_proc_list, + component_count, num_procs_per_comp, proc_list, + io_comm, comp_comm, rearranger, iosysidp))) + return ret; + + /* Remember the io system id of the first component as the default. */ + diosysid = *iosysidp; + + return PIO_NOERR; +} + /** * Set the default iosystemID. * diff --git a/src/ncint/ncintdispatch.c b/src/ncint/ncintdispatch.c index f6bf4aef9d3..2260137544d 100644 --- a/src/ncint/ncintdispatch.c +++ b/src/ncint/ncintdispatch.c @@ -133,14 +133,17 @@ PIO_NCINT_initialize(void) { int ret; - NCINT_dispatch_table = &NCINT_dispatcher; + if (!ncint_initialized) + { + NCINT_dispatch_table = &NCINT_dispatcher; - PLOG((1, "Adding user-defined format for netCDF PIO integration")); + PLOG((1, "Adding user-defined format for netCDF PIO integration")); - /* Add our user defined format. */ - if ((ret = nc_def_user_format(NC_UDF0, &NCINT_dispatcher, NULL))) - return ret; - ncint_initialized++; + /* Add our user defined format. */ + if ((ret = nc_def_user_format(NC_UDF0, &NCINT_dispatcher, NULL))) + return ret; + ncint_initialized++; + } return NC_NOERR; } diff --git a/tests/ncint/Makefile.am b/tests/ncint/Makefile.am index 03ebdc02ef6..07dc555c6ac 100644 --- a/tests/ncint/Makefile.am +++ b/tests/ncint/Makefile.am @@ -8,7 +8,10 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/clib LDADD = ${top_builddir}/src/clib/libpioc.la # Build the test for make check. 
-check_PROGRAMS = tst_pio_udf +check_PROGRAMS = tst_pio_udf tst_pio_async + +tst_pio_udf_SOURCES = tst_pio_udf.c pio_err_macros.h +tst_pio_async_SOURCES = tst_pio_async.c pio_err_macros.h if RUN_TESTS # Tests will run from a bash script. diff --git a/tests/ncint/pio_err_macros.h b/tests/ncint/pio_err_macros.h new file mode 100644 index 00000000000..754f12e7271 --- /dev/null +++ b/tests/ncint/pio_err_macros.h @@ -0,0 +1,64 @@ +/* This is part of the netCDF package. + Copyright 2018 University Corporation for Atmospheric Research/Unidata + See COPYRIGHT file for conditions of use. + + Common includes, defines, etc., for test code in the tests/ncint + directory. + + Ed Hartnett, Russ Rew, Dennis Heimbigner +*/ + +#ifndef _PIO_ERR_MACROS_H +#define _PIO_ERR_MACROS_H + +#include "config.h" +#include +#include +#include +#include + +/* Err is used to keep track of errors within each set of tests, + * total_err is the number of errors in the entire test program, which + * generally consists of several sets of tests. */ +static int total_err = 0, err = 0; + +/* This macro prints an error message with line number and name of + * test program. */ +#define PERR do { \ + fflush(stdout); /* Make sure our stdout is synced with stderr. */ \ + err++; \ + fprintf(stderr, "Sorry! Unexpected result, %s, line: %d\n", \ + __FILE__, __LINE__); \ + fflush(stderr); \ + return 2; \ + } while (0) + +/* After a set of tests, report the number of errors, and increment + * total_err. */ +#define PSUMMARIZE_ERR do { \ + if (err) \ + { \ + printf("%d failures\n", err); \ + total_err += err; \ + err = 0; \ + } \ + else \ + if (!my_rank) \ + printf("ok.\n"); \ + } while (0) + +/* This macro prints out our total number of errors, if any, and exits + * with a 0 if there are not, or a 2 if there were errors. Make will + * stop if a non-zero value is returned from a test program. */ +#define PFINAL_RESULTS do { \ + if (total_err) \ + { \ + printf("%d errors detected! 
Sorry!\n", total_err); \ + return 2; \ + } \ + if (!my_rank) \ + printf("*** Tests successful!\n\n"); \ + return 0; \ + } while (0) + +#endif /* _PIO_ERR_MACROS_H */ diff --git a/tests/ncint/tst_pio_async.c b/tests/ncint/tst_pio_async.c new file mode 100644 index 00000000000..e042fd737dc --- /dev/null +++ b/tests/ncint/tst_pio_async.c @@ -0,0 +1,124 @@ +/* Test netcdf integration layer. + + Ed Hartnett +*/ + +#include "config.h" +#include +#include "pio_err_macros.h" + +#define FILE_NAME "tst_pio_async.nc" +#define VAR_NAME "data_var" +#define DIM_NAME_UNLIMITED "dim_unlimited" +#define DIM_NAME_X "dim_x" +#define DIM_NAME_Y "dim_y" +#define DIM_LEN_X 4 +#define DIM_LEN_Y 4 +#define NDIM2 2 +#define NDIM3 3 + +extern NC_Dispatch NCINT_dispatcher; + +/* Number of computational components to create. */ +#define COMPONENT_COUNT 1 + +int +main(int argc, char **argv) +{ + int my_rank; + int ntasks; + + /* Initialize MPI. */ + if (MPI_Init(&argc, &argv)) PERR; + + /* Learn my rank and the total number of processors. */ + if (MPI_Comm_rank(MPI_COMM_WORLD, &my_rank)) PERR; + if (MPI_Comm_size(MPI_COMM_WORLD, &ntasks)) PERR; + + if (!my_rank) + printf("\n*** Testing netCDF integration layer.\n"); + if (!my_rank) + printf("*** testing simple async use of netCDF integration layer..."); + { + int ncid, ioid; + /* int dimid[NDIM3], varid; */ + int dimlen[NDIM3] = {NC_UNLIMITED, DIM_LEN_X, DIM_LEN_Y}; + int iosysid; + size_t elements_per_pe; + size_t *compdof; /* The decomposition mapping. */ + /* int *my_data; */ + /* int *data_in; */ + int num_procs2[COMPONENT_COUNT] = {3}; + int num_io_procs = 1; + int i; + + /* Turn on logging for PIO library. */ + PIOc_set_log_level(3); + + /* Initialize the intracomm. */ + if (nc_def_async(MPI_COMM_WORLD, num_io_procs, NULL, COMPONENT_COUNT, + num_procs2, NULL, NULL, NULL, PIO_REARR_BOX, &iosysid)) + PERR; + + if (my_rank) + { + /* Create a file with a 3D record var. 
*/ + if (nc_create(FILE_NAME, NC_UDF0, &ncid)) PERR; + /* if (nc_def_dim(ncid, DIM_NAME_UNLIMITED, dimlen[0], &dimid[0])) PERR; */ + /* if (nc_def_dim(ncid, DIM_NAME_X, dimlen[1], &dimid[1])) PERR; */ + /* if (nc_def_dim(ncid, DIM_NAME_Y, dimlen[2], &dimid[2])) PERR; */ + /* if (nc_def_var(ncid, VAR_NAME, NC_INT, NDIM3, dimid, &varid)) PERR; */ + + /* Calculate a decomposition for distributed arrays. */ + elements_per_pe = DIM_LEN_X * DIM_LEN_Y / ntasks; + if (!(compdof = malloc(elements_per_pe * sizeof(size_t)))) + PERR; + for (i = 0; i < elements_per_pe; i++) + compdof[i] = my_rank * elements_per_pe + i; + + /* Create the PIO decomposition for this test. */ + if (nc_def_decomp(iosysid, PIO_INT, NDIM2, &dimlen[1], elements_per_pe, + compdof, &ioid, 1, NULL, NULL)) PERR; + free(compdof); + + /* /\* Create some data on this processor. *\/ */ + /* if (!(my_data = malloc(elements_per_pe * sizeof(int)))) PERR; */ + /* for (i = 0; i < elements_per_pe; i++) */ + /* my_data[i] = my_rank * 10 + i; */ + + /* /\* Write some data with distributed arrays. *\/ */ + /* if (nc_put_vard_int(ncid, varid, ioid, 0, my_data)) PERR; */ + /* if (nc_close(ncid)) PERR; */ + + /* /\* Check that our user-defined format has been added. *\/ */ + /* if (nc_inq_user_format(NC_UDF0, &disp_in, NULL)) PERR; */ + /* if (disp_in != &NCINT_dispatcher) PERR; */ + + /* /\* Open the file. *\/ */ + /* if (nc_open(FILE_NAME, NC_UDF0, &ncid)) PERR; */ + + /* /\* Read distributed arrays. *\/ */ + /* if (!(data_in = malloc(elements_per_pe * sizeof(int)))) PERR; */ + /* if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) PERR; */ + + /* /\* Check results. *\/ */ + /* for (i = 0; i < elements_per_pe; i++) */ + /* if (data_in[i] != my_data[i]) PERR; */ + + /* Close file. */ + if (nc_close(ncid)) PERR; + + /* /\* Free resources. 
*\/ */ + /* free(data_in); */ + /* free(my_data); */ + if (nc_free_decomp(ioid)) PERR; + if (nc_free_iosystem(iosysid)) PERR; + } + } + if (!my_rank) + PSUMMARIZE_ERR; + + /* Finalize MPI. */ + MPI_Finalize(); + PFINAL_RESULTS; +} diff --git a/tests/ncint/tst_pio_udf.c b/tests/ncint/tst_pio_udf.c index 84203a2a183..dedcd536ee2 100644 --- a/tests/ncint/tst_pio_udf.c +++ b/tests/ncint/tst_pio_udf.c @@ -4,12 +4,8 @@ */ #include "config.h" -#include -#include "err_macros.h" -#include "netcdf.h" -#include "nc4dispatch.h" +#include "pio_err_macros.h" #include -#include #define FILE_NAME "tst_pio_udf.nc" #define VAR_NAME "data_var" @@ -20,6 +16,7 @@ #define DIM_LEN_Y 4 #define NDIM2 2 #define NDIM3 3 +#define TEST_VAL_42 42 extern NC_Dispatch NCINT_dispatcher; @@ -30,24 +27,27 @@ main(int argc, char **argv) int ntasks; /* Initialize MPI. */ - if (MPI_Init(&argc, &argv)) ERR; + if (MPI_Init(&argc, &argv)) PERR; /* Learn my rank and the total number of processors. */ - if (MPI_Comm_rank(MPI_COMM_WORLD, &my_rank)) ERR; - if (MPI_Comm_size(MPI_COMM_WORLD, &ntasks)) ERR; + if (MPI_Comm_rank(MPI_COMM_WORLD, &my_rank)) PERR; + if (MPI_Comm_size(MPI_COMM_WORLD, &ntasks)) PERR; - printf("\n*** Testing netCDF integration layer.\n"); - printf("*** testing getting/setting of default iosystemid..."); + if (!my_rank) + printf("\n*** Testing netCDF integration layer.\n"); + if (!my_rank) + printf("*** testing getting/setting of default iosystemid..."); { int iosysid; - if (nc_set_iosystem(TEST_VAL_42)) ERR; - if (nc_get_iosystem(&iosysid)) ERR; - if (iosysid != TEST_VAL_42) ERR; + if (nc_set_iosystem(TEST_VAL_42)) PERR; + if (nc_get_iosystem(&iosysid)) PERR; + if (iosysid != TEST_VAL_42) PERR; } - SUMMARIZE_ERR; + PSUMMARIZE_ERR; - printf("*** testing simple use of netCDF integration layer format..."); + if (!my_rank) + printf("*** testing simple use of netCDF integration layer format..."); { int ncid, ioid; int dimid[NDIM3], varid; @@ -64,63 +64,63 @@ main(int argc, char **argv) /* 
PIOc_set_log_level(3); */ /* Initialize the intracomm. */ - if (nc_def_iosystemm(MPI_COMM_WORLD, 1, 1, 0, 0, &iosysid)) ERR; + if (nc_def_iosystemm(MPI_COMM_WORLD, 1, 1, 0, 0, &iosysid)) PERR; /* Create a file with a 3D record var. */ - if (nc_create(FILE_NAME, NC_UDF0, &ncid)) ERR; - if (nc_def_dim(ncid, DIM_NAME_UNLIMITED, dimlen[0], &dimid[0])) ERR; - if (nc_def_dim(ncid, DIM_NAME_X, dimlen[1], &dimid[1])) ERR; - if (nc_def_dim(ncid, DIM_NAME_Y, dimlen[2], &dimid[2])) ERR; - if (nc_def_var(ncid, VAR_NAME, NC_INT, NDIM3, dimid, &varid)) ERR; + if (nc_create(FILE_NAME, NC_UDF0, &ncid)) PERR; + if (nc_def_dim(ncid, DIM_NAME_UNLIMITED, dimlen[0], &dimid[0])) PERR; + if (nc_def_dim(ncid, DIM_NAME_X, dimlen[1], &dimid[1])) PERR; + if (nc_def_dim(ncid, DIM_NAME_Y, dimlen[2], &dimid[2])) PERR; + if (nc_def_var(ncid, VAR_NAME, NC_INT, NDIM3, dimid, &varid)) PERR; /* Calculate a decomposition for distributed arrays. */ elements_per_pe = DIM_LEN_X * DIM_LEN_Y / ntasks; if (!(compdof = malloc(elements_per_pe * sizeof(size_t)))) - ERR; + PERR; for (i = 0; i < elements_per_pe; i++) compdof[i] = my_rank * elements_per_pe + i; /* Create the PIO decomposition for this test. */ if (nc_def_decomp(iosysid, PIO_INT, NDIM2, &dimlen[1], elements_per_pe, - compdof, &ioid, 1, NULL, NULL)) ERR; + compdof, &ioid, 1, NULL, NULL)) PERR; free(compdof); /* Create some data on this processor. */ - if (!(my_data = malloc(elements_per_pe * sizeof(int)))) ERR; + if (!(my_data = malloc(elements_per_pe * sizeof(int)))) PERR; for (i = 0; i < elements_per_pe; i++) my_data[i] = my_rank * 10 + i; /* Write some data with distributed arrays. */ - if (nc_put_vard_int(ncid, varid, ioid, 0, my_data)) ERR; - if (nc_close(ncid)) ERR; + if (nc_put_vard_int(ncid, varid, ioid, 0, my_data)) PERR; + if (nc_close(ncid)) PERR; /* Check that our user-defined format has been added. 
*/ - if (nc_inq_user_format(NC_UDF0, &disp_in, NULL)) ERR; - if (disp_in != &NCINT_dispatcher) ERR; + if (nc_inq_user_format(NC_UDF0, &disp_in, NULL)) PERR; + if (disp_in != &NCINT_dispatcher) PERR; /* Open the file. */ - if (nc_open(FILE_NAME, NC_UDF0, &ncid)) ERR; + if (nc_open(FILE_NAME, NC_UDF0, &ncid)) PERR; /* Read distributed arrays. */ - if (!(data_in = malloc(elements_per_pe * sizeof(int)))) ERR; - if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) ERR; + if (!(data_in = malloc(elements_per_pe * sizeof(int)))) PERR; + if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) PERR; /* Check results. */ for (i = 0; i < elements_per_pe; i++) - if (data_in[i] != my_data[i]) ERR; + if (data_in[i] != my_data[i]) PERR; /* Close file. */ - if (nc_close(ncid)) ERR; + if (nc_close(ncid)) PERR; /* Free resources. */ free(data_in); free(my_data); - if (nc_free_decomp(ioid)) ERR; - if (nc_free_iosystem(iosysid)) ERR; + if (nc_free_decomp(ioid)) PERR; + if (nc_free_iosystem(iosysid)) PERR; } - SUMMARIZE_ERR; + PSUMMARIZE_ERR; /* Finalize MPI. */ MPI_Finalize(); - FINAL_RESULTS; + PFINAL_RESULTS; }