Skip to content

Commit

Permalink
Merge pull request ESMCI#1601 from NCAR/ejh_read_darray
Browse files Browse the repository at this point in the history
implementation of read_darray() and some tests
  • Loading branch information
edhartnett authored Sep 6, 2019
2 parents 49ab208 + 5774d81 commit 4664ac5
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 79 deletions.
44 changes: 39 additions & 5 deletions src/clib/pio_darray.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ PIOc_write_darray(int ncid, int varid, int ioid, PIO_Offset arraylen, void *arra
return pio_err(ios, file, PIO_EINVAL, __FILE__, __LINE__);

/* Move to end of list or the entry that matches this ioid. */
hashid = ioid*10 + vdesc->rec_var;
hashid = ioid * 10 + vdesc->rec_var;
HASH_FIND_INT( file->buffer, &hashid, wmb);
if (wmb)
PLOG((3, "wmb->ioid = %d wmb->recordvar = %d", wmb->ioid, wmb->recordvar));
Expand Down Expand Up @@ -871,10 +871,11 @@ PIOc_write_darray(int ncid, int varid, int ioid, PIO_Offset arraylen, void *arra
}

/**
* Read a field from a file to the IO library.
* Read a field from a file to the IO library using distributed
* arrays.
*
* @param ncid identifies the netCDF file
* @param varid the variable ID to be read
* @param ncid identifies the netCDF file.
* @param varid the variable ID to be read.
* @param ioid the I/O description ID as passed back by
* PIOc_InitDecomp().
* @param arraylen this parameter is ignored. Nominally it is the
Expand All @@ -898,8 +899,9 @@ PIOc_read_darray(int ncid, int varid, int ioid, PIO_Offset arraylen,
io_desc_t *iodesc; /* Pointer to IO description information. */
void *iobuf = NULL; /* holds the data as read on the io node. */
size_t rlen = 0; /* the length of data in iobuf. */
int ierr; /* Return code. */
void *tmparray; /* unsorted copy of array buf if required */
int mpierr = MPI_SUCCESS, mpierr2; /* Return code from MPI function calls. */
int ierr; /* Return code. */

#ifdef USE_MPE
pio_start_mpe_log(DARRAY_READ);
Expand All @@ -913,6 +915,38 @@ PIOc_read_darray(int ncid, int varid, int ioid, PIO_Offset arraylen,
return pio_err(NULL, NULL, PIO_EBADID, __FILE__, __LINE__);
ios = file->iosystem;

/* If async is in use, and this is not an IO task, bcast the
* parameters. */
if (ios->async)
{
if (!ios->ioproc)
{
int msg = PIO_MSG_READDARRAY;

if (ios->compmaster == MPI_ROOT)
mpierr = MPI_Send(&msg, 1, MPI_INT, ios->ioroot, 1, ios->union_comm);

/* Send the function parameters and associated informaiton
* to the msg handler. */
if (!mpierr)
mpierr = MPI_Bcast(&ncid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&varid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&ioid, 1, MPI_INT, ios->compmaster, ios->intercomm);
if (!mpierr)
mpierr = MPI_Bcast(&arraylen, 1, MPI_OFFSET, ios->compmaster, ios->intercomm);
PLOG((2, "PIOc_read_darray ncid %d varid %d ioid %d arraylen %d",
ncid, varid, ioid, arraylen));
}

/* Handle MPI errors. */
if ((mpierr2 = MPI_Bcast(&mpierr, 1, MPI_INT, ios->comproot, ios->my_comm)))
return check_mpi(NULL, file, mpierr2, __FILE__, __LINE__);
if (mpierr)
return check_mpi(NULL, file, mpierr, __FILE__, __LINE__);
}

/* Get the iodesc. */
if (!(iodesc = pio_get_iodesc_from_id(ioid)))
return pio_err(ios, file, PIO_EBADID, __FILE__, __LINE__);
Expand Down
24 changes: 10 additions & 14 deletions src/clib/pio_darray_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -1137,19 +1137,19 @@ write_darray_multi_serial(file_desc_t *file, int nvars, int fndims, const int *v
}

/**
* Read an array of data from a file to the (parallel) IO library.
* Read an array of data from a file using distributed arrays.
*
* @param file a pointer to the open file descriptor for the file
* that will be written to
* @param iodesc a pointer to the defined iodescriptor for the buffer
* @param vid the variable id to be read
* that will be read from.
* @param iodesc a pointer to the defined iodescriptor for the buffer.
* @param vid the variable id to be read.
* @param iobuf the buffer to be read into from this mpi task. May be
* null. for example we have 8 ionodes and a distributed array with
* null. (For example we have 8 ionodes and a distributed array with
* global size 4, then at least 4 nodes will have a null iobuf. In
* practice the box rearranger trys to have at least blocksize bytes
* practice the box rearranger tries to have at least blocksize bytes
* on each io task and so if the total number of bytes to write is
* less than blocksize*numiotasks then some iotasks will have a NULL
* iobuf.
* iobuf.)
* @return 0 on success, error code otherwise.
* @ingroup PIO_read_darray_c
* @author Jim Edwards, Ed Hartnett
Expand Down Expand Up @@ -1189,15 +1189,14 @@ pio_read_darray_nc(file_desc_t *file, io_desc_t *iodesc, int vid, void *iobuf)
ndims = iodesc->ndims;

/* Get the number of dims for this var in the file. */
/* if ((ierr = PIOc_inq_varndims(file->pio_ncid, vid, &fndims))) */
/* return pio_err(ios, file, ierr, __FILE__, __LINE__); */
fndims = vdesc->ndims;
PLOG((3, "fndims %d vdesc->ndims %d", fndims, vdesc->ndims));
PLOG((4, "fndims %d ndims %d", fndims, ndims));

/* ??? */
#if USE_VARD_READ
if(!ios->async || !ios->ioproc)
ierr = get_gdim0(file, iodesc, vid, fndims, &gdim0);
#endif
/* PLOG((4, "fndims %d ndims %d", fndims, ndims)); */

/* IO procs will read the data. */
if (ios->ioproc)
Expand Down Expand Up @@ -1464,16 +1463,13 @@ pio_read_darray_nc_serial(file_desc_t *file, io_desc_t *iodesc, int vid,

/* Get number of dims for this var. */
fndims = vdesc->ndims;
/* if ((ierr = PIOc_inq_varndims(file->pio_ncid, vid, &fndims))) */
/* return pio_err(ios, file, ierr, __FILE__, __LINE__); */

/* If setframe was not called, use a default value of 0. This is
* required for backward compatibility. */
if (fndims == ndims + 1 && vdesc->record < 0)
vdesc->record = 0;
PLOG((3, "fndims %d ndims %d vdesc->record %d vdesc->ndims %d", fndims,
ndims, vdesc->record, vdesc->ndims));
/* pioassert(fndims == vdesc->ndims, "bad ndims", __FILE__, __LINE__); */

/* Confirm that we are being called with the correct ndims. */
pioassert((fndims == ndims && vdesc->record < 0) ||
Expand Down
33 changes: 28 additions & 5 deletions src/clib/pio_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ int create_file_handler(iosystem_desc_t *ios)
int use_ext_ncid;
char ncidp_present;
int mpierr;
int ret;

PLOG((1, "create_file_handler comproot = %d", ios->comproot));
assert(ios);
Expand Down Expand Up @@ -2259,8 +2258,7 @@ int write_darray_multi_handler(iosystem_desc_t *ios)
}

/**
* This function is run on the IO tasks to...
* NOTE: not yet implemented
* This function is run on the IO tasks to read distributed arrays.
*
* @param ios pointer to the iosystem_desc_t data.
*
Expand All @@ -2269,9 +2267,34 @@ int write_darray_multi_handler(iosystem_desc_t *ios)
* @internal
* @author Ed Hartnett
*/
int readdarray_handler(iosystem_desc_t *ios)
int read_darray_handler(iosystem_desc_t *ios)
{
int ncid;
int varid;
int ioid;
int arraylen;
void *data = NULL;
int mpierr;

PLOG((1, "read_darray_handler called"));
assert(ios);

/* Get the parameters for this function that the the comp master
* task is broadcasting. */
if ((mpierr = MPI_Bcast(&ncid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&varid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&ioid, 1, MPI_INT, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
if ((mpierr = MPI_Bcast(&arraylen, 1, MPI_OFFSET, 0, ios->intercomm)))
return check_mpi(ios, NULL, mpierr, __FILE__, __LINE__);
PLOG((2, "ncid %d varid %d ioid %d arraylen %d", ncid, varid,
ioid, arraylen));

PIOc_read_darray(ncid, varid, ioid, arraylen, data);

PLOG((1, "read_darray_handler succeeded!"));
return PIO_NOERR;
}

Expand Down Expand Up @@ -2736,7 +2759,7 @@ int pio_msg_handler2(int io_rank, int component_count, iosystem_desc_t **iosys,
ret = advanceframe_handler(my_iosys);
break;
case PIO_MSG_READDARRAY:
ret = readdarray_handler(my_iosys);
ret = read_darray_handler(my_iosys);
break;
case PIO_MSG_SETERRORHANDLING:
ret = seterrorhandling_handler(my_iosys);
Expand Down
2 changes: 1 addition & 1 deletion src/clib/pio_rearrange.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ define_iodesc_datatypes(iosystem_desc_t *ios, io_desc_t *iodesc)
int ret; /* Return value. */

pioassert(ios && iodesc, "invalid input", __FILE__, __LINE__);
PLOG((1, "define_iodesc_datatypes ios->ioproc = %d iodesc->rtype is %sNULL, "
PLOG((3, "define_iodesc_datatypes ios->ioproc = %d iodesc->rtype is %sNULL, "
"iodesc->nrecvs %d", ios->ioproc, iodesc->rtype ? "not " : "",
iodesc->nrecvs));

Expand Down
4 changes: 2 additions & 2 deletions src/clib/pioc_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -963,10 +963,10 @@ PIOc_freedecomp(int iosysid, int ioid)
}

/* Handle MPI errors. */
PLOG((3, "handline error mpierr %d ios->comproot %d", mpierr, ios->comproot));
PLOG((3, "handling mpierr %d ios->comproot %d", mpierr, ios->comproot));
if ((mpierr2 = MPI_Bcast(&mpierr, 1, MPI_INT, ios->comproot, ios->my_comm)))
return check_mpi(NULL, NULL, mpierr2, __FILE__, __LINE__);
PLOG((3, "handline error mpierr2 %d", mpierr2));
PLOG((3, "handling mpierr2 %d", mpierr2));
if (mpierr)
return check_mpi(NULL, NULL, mpierr, __FILE__, __LINE__);
}
Expand Down
7 changes: 4 additions & 3 deletions tests/cunit/test_async_1d.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int main(int argc, char **argv)
PIO_Offset compmap[MAPLEN];
int varid;
int data;
/* int data_in; */
int data_in;
int ioid;

/* Create a file. */
Expand Down Expand Up @@ -131,8 +131,9 @@ int main(int argc, char **argv)
/* Read the data. */
if ((ret = PIOc_setframe(ncid, 0, 0)))
ERR(ret);
/* if ((ret = PIOc_read_darray(ncid, 0, ioid, MAPLEN, &data_in))) */
/* ERR(ret); */
if ((ret = PIOc_read_darray(ncid, 0, ioid, MAPLEN, &data_in)))
ERR(ret);
if (data_in != data) ERR(ERR_WRONG);

/* Close the file. */
if ((ret = PIOc_closefile(ncid)))
Expand Down
52 changes: 3 additions & 49 deletions tests/ncint/tst_pio_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,52 +99,6 @@ main(int argc, char **argv)
if (nc_put_vard_int(ncid, varid, ioid, 0, my_data)) PERR;
if (nc_close(ncid)) PERR;

/* Reopen the file using normal PIO calls. */
{
int ndims, nvars, ngatts, unlimdimid;
int ioid2;
PIO_Offset *compdof2; /* The decomposition mapping. */
nc_type xtype_in;
char var_name_in[NC_MAX_NAME + 1];
char dim_name_in[NC_MAX_NAME + 1];
int natts_in;
int dimids_in[NDIM3];
int iotype = PIO_IOTYPE_NETCDF4C;
size_t dim_len_in;

/* Open the file. */
if (PIOc_openfile(iosysid, &ncid, &iotype, FILE_NAME, 0)) PERR;

/* Set up decomposition. */
if (!(compdof2 = malloc(elements_per_pe * sizeof(size_t))))
PERR;
for (i = 0; i < elements_per_pe; i++)
{
compdof2[i] = (my_rank - num_io_procs) * elements_per_pe + i;
/* printf("my_rank %d compdof2[%d]=%lld\n", my_rank, i, compdof2[i]); */
}

if (PIOc_init_decomp(iosysid, PIO_INT, NDIM2, &dimlen[1], elements_per_pe, compdof2,
&ioid2, 1, NULL, NULL)) PERR;
free(compdof2);

/* Read distributed arrays. */
if (!(data_in = malloc(elements_per_pe * sizeof(int)))) PERR;
if (PIOc_setframe(ncid, 0, 0)) PERR;
/* if (PIOc_read_darray(ncid, 0, ioid, elements_per_pe, data_in)) PERR; */

/* Check results. */
/* for (i = 0; i < elements_per_pe; i++) */
/* if (data_in[i] != my_data[i]) PERR; */

/* Close file. */
if (PIOc_closefile(ncid)) PERR;

/* Free resources. */
free(data_in);
if (PIOc_freedecomp(iosysid, ioid2)) PERR;
}

/* Reopen the file using netCDF integration. */
{
int ndims, nvars, ngatts, unlimdimid;
Expand Down Expand Up @@ -176,11 +130,11 @@ main(int argc, char **argv)

/* Read distributed arrays. */
if (!(data_in = malloc(elements_per_pe * sizeof(int)))) PERR;
/* if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) PERR; */
if (nc_get_vard_int(ncid, varid, ioid, 0, data_in)) PERR;

/* Check results. */
/* for (i = 0; i < elements_per_pe; i++) */
/* if (data_in[i] != my_data[i]) PERR; */
for (i = 0; i < elements_per_pe; i++)
if (data_in[i] != my_data[i]) PERR;

/* Close file. */
if (nc_close(ncid)) PERR;
Expand Down

0 comments on commit 4664ac5

Please sign in to comment.