Skip to content

Commit

Permalink
Fab data I/O (AMReX-Codes#1956)
Browse files Browse the repository at this point in the history
When Fab data are in GPU device memory (including managed memory), use
pinned memory as a buffer to avoid accessing the device data directly from the host.
Previously, the MultiFab, plotfile, and checkpoint writers (except for the async
versions) relied on the memory being managed and therefore directly usable
in I/O.
  • Loading branch information
WeiqunZhang authored Apr 18, 2021
1 parent f21f15d commit b30cae9
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 28 deletions.
72 changes: 66 additions & 6 deletions Src/Base/AMReX_FArrayBox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,15 +607,35 @@ FArrayBox::writeOn (std::ostream& os, int comp, int num_comp) const
BL_ASSERT(comp >= 0 && num_comp >= 1 && (comp+num_comp) <= nComp());
fabio->write_header(os, *this, num_comp);
os.flush(); // 2016-08-30: Titan requires this flush() (probably due to a bug).
fabio->write(os, *this, comp, num_comp);
#ifdef AMREX_USE_GPU
if (this->arena()->isManaged() || this->arena()->isDevice()) {
FArrayBox hostfab(this->box(), num_comp, The_Pinned_Arena());
Gpu::dtoh_memcpy_async(hostfab.dataPtr(), this->dataPtr(comp), hostfab.size()*sizeof(Real));
Gpu::streamSynchronize();
fabio->write(os, hostfab, 0, num_comp);
} else
#endif
{
fabio->write(os, *this, comp, num_comp);
}
}

// Reads a FAB from `is`. FABio::read_header() returns the reader object,
// which is then used to read the data and is deleted before returning.
//
// With AMREX_USE_GPU defined, if this FAB's arena reports isManaged() or
// isDevice(), the data are read into a temporary FArrayBox allocated from
// The_Pinned_Arena() and then copied into this FAB with
// Gpu::htod_memcpy_async() followed by Gpu::streamSynchronize(). Otherwise
// the reader reads into this FAB directly.
void
FArrayBox::readFrom (std::istream& is)
{
// BL_PROFILE("FArrayBox::readFrom_is");
FABio* fabrd = FABio::read_header(is, *this);
// NOTE(review): this unconditional read looks like the pre-change line from
// the diff view, which the branch below replaces -- confirm against the
// actual file before relying on it.
fabrd->read(is, *this);
#ifdef AMREX_USE_GPU
if (this->arena()->isManaged() || this->arena()->isDevice()) {
// Stage the read through a host FArrayBox with the same box and component count.
FArrayBox hostfab(this->box(), this->nComp(), The_Pinned_Arena());
fabrd->read(is, hostfab);
Gpu::htod_memcpy_async(this->dataPtr(), hostfab.dataPtr(), hostfab.size()*sizeof(Real));
// Presumably waits for the async copy to finish before hostfab is destroyed
// at the end of this scope -- TODO confirm.
Gpu::streamSynchronize();
} else
#endif
{
fabrd->read(is, *this);
}
// Release the reader obtained from read_header().
delete fabrd;
}

Expand All @@ -629,7 +649,17 @@ FArrayBox::readFrom (std::istream& is, int compIndex)
BL_ASSERT(compIndex >= 0 && compIndex < nCompAvailable);

fabrd->skip(is, *this, compIndex); // skip data up to the component we want
fabrd->read(is, *this);
#ifdef AMREX_USE_GPU
if (this->arena()->isManaged() || this->arena()->isDevice()) {
FArrayBox hostfab(this->box(), 1, The_Pinned_Arena());
fabrd->read(is, hostfab);
Gpu::htod_memcpy_async(this->dataPtr(), hostfab.dataPtr(), hostfab.size()*sizeof(Real));
Gpu::streamSynchronize();
} else
#endif
{
fabrd->read(is, *this);
}
int remainingComponents = nCompAvailable - compIndex - 1;
fabrd->skip(is, *this, remainingComponents); // skip to the end

Expand Down Expand Up @@ -710,7 +740,17 @@ void
FABio_ascii::skip (std::istream& is,
FArrayBox& f) const
{
FABio_ascii::read(is, f);
#ifdef AMREX_USE_GPU
if (f.arena()->isManaged() || f.arena()->isDevice()) {
FArrayBox hostfab(f.box(), f.nComp(), The_Pinned_Arena());
FABio_ascii::read(is, hostfab);
Gpu::htod_memcpy_async(f.dataPtr(), hostfab.dataPtr(), f.size()*sizeof(Real));
Gpu::streamSynchronize();
} else
#endif
{
FABio_ascii::read(is, f);
}
}

void
Expand Down Expand Up @@ -936,7 +976,17 @@ operator<< (std::ostream& os,
const FArrayBox& f)
{
static FABio_ascii fabio_ascii;
fabio_ascii.write(os,f,0,f.nComp());
#ifdef AMREX_USE_GPU
if (f.arena()->isManaged() || f.arena()->isDevice()) {
FArrayBox hostfab(f.box(), f.nComp(), The_Pinned_Arena());
Gpu::dtoh_memcpy_async(hostfab.dataPtr(), f.dataPtr(), f.size()*sizeof(Real));
Gpu::streamSynchronize();
fabio_ascii.write(os,hostfab,0,f.nComp());
} else
#endif
{
fabio_ascii.write(os,f,0,f.nComp());
}
return os;
}

Expand All @@ -945,7 +995,17 @@ operator>> (std::istream& is,
FArrayBox& f)
{
FABio *fabrd = FABio::read_header(is,f);
fabrd->read(is,f);
#ifdef AMREX_USE_GPU
if (f.arena()->isManaged() || f.arena()->isDevice()) {
FArrayBox hostfab(f.box(), f.nComp(), The_Pinned_Arena());
fabrd->read(is,hostfab);
Gpu::htod_memcpy_async(f.dataPtr(), hostfab.dataPtr(), f.size()*sizeof(Real));
Gpu::streamSynchronize();
} else
#endif
{
fabrd->read(is,f);
}
delete fabrd;
return is;
}
Expand Down
120 changes: 98 additions & 22 deletions Src/Base/AMReX_VisMF.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,6 @@ VisMF::Write (const FabArray<FArrayBox>& mf,
}
}

amrex::prefetchToHost(mf);

// ---- check if mf has sparse data
bool useSparseFPP(false);
const Vector<int> &pmap = mf.DistributionMap().ProcessorMap();
Expand Down Expand Up @@ -1047,12 +1045,24 @@ VisMF::Write (const FabArray<FArrayBox>& mf,
auto tstr = hss.str();
memcpy(afPtr, tstr.c_str(), hLength); // ---- the fab header
}
Real const* fabdata = fab.dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab.box(), fab.nComp(),
The_Pinned_Arena());
Gpu::dtoh_memcpy_async(hostfab->dataPtr(), fab.dataPtr(),
fab.size()*sizeof(Real));
Gpu::streamSynchronize();
fabdata = hostfab->dataPtr();
}
#endif
if(doConvert) {
RealDescriptor::convertFromNativeFormat(static_cast<void *> (afPtr + hLength),
writeDataItems,
fab.dataPtr(), *whichRD);
fabdata, *whichRD);
} else { // ---- copy from the fab
memcpy(afPtr + hLength, fab.dataPtr(), writeDataSize);
memcpy(afPtr + hLength, fabdata, writeDataSize);
}
writePosition += hLength + writeDataSize;
}
Expand All @@ -1074,16 +1084,28 @@ VisMF::Write (const FabArray<FArrayBox>& mf,
nfi.Stream().write(tstr.c_str(), hLength); // ---- the fab header
nfi.Stream().flush();
}
Real const* fabdata = fab.dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab.box(), fab.nComp(),
The_Pinned_Arena());
Gpu::dtoh_memcpy_async(hostfab->dataPtr(), fab.dataPtr(),
fab.size()*sizeof(Real));
Gpu::streamSynchronize();
fabdata = hostfab->dataPtr();
}
#endif
if(doConvert) {
char *cDataPtr = new char[writeDataSize];
RealDescriptor::convertFromNativeFormat(static_cast<void *> (cDataPtr),
writeDataItems,
fab.dataPtr(), *whichRD);
fabdata, *whichRD);
nfi.Stream().write(cDataPtr, writeDataSize);
nfi.Stream().flush();
delete [] cDataPtr;
} else { // ---- copy from the fab
nfi.Stream().write((char *) fab.dataPtr(), writeDataSize);
nfi.Stream().write((char *) fabdata, writeDataSize);
nfi.Stream().flush();
}
}
Expand All @@ -1094,10 +1116,6 @@ VisMF::Write (const FabArray<FArrayBox>& mf,
coordinatorProc = nfi.CoordinatorProc();
}

if (Gpu::inLaunchRegion()) {
amrex::prefetchToDevice(mf); // CalculateMinMax might do work on device
}

if(currentVersion == VisMF::Header::Version_v1 ||
currentVersion == VisMF::Header::NoFabHeaderMinMax_v1)
{
Expand Down Expand Up @@ -1406,26 +1424,40 @@ VisMF::readFAB (int idx,
fab->readFrom(*infs, whichComp);
}
} else {
Real* fabdata = fab->dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab->arena()->isManaged() || fab->arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab->box(), fab->nComp(), The_Pinned_Arena());
fabdata = hostfab->dataPtr();
}
#endif
if(whichComp == -1) { // ---- read all components
if(hdr.m_writtenRD == FPC::NativeRealDescriptor()) {
infs->read((char *) fab->dataPtr(), fab->nBytes());
infs->read((char *) fabdata, fab->nBytes());
} else {
Long readDataItems(fab->box().numPts() * fab->nComp());
RealDescriptor::convertToNativeFormat(fab->dataPtr(), readDataItems,
RealDescriptor::convertToNativeFormat(fabdata, readDataItems,
*infs, hdr.m_writtenRD);
}

} else {
Long bytesPerComp(fab->box().numPts() * hdr.m_writtenRD.numBytes());
infs->seekg(bytesPerComp * whichComp, std::ios::cur);
if(hdr.m_writtenRD == FPC::NativeRealDescriptor()) {
infs->read((char *) fab->dataPtr(), bytesPerComp);
infs->read((char *) fabdata, bytesPerComp);
} else {
Long readDataItems(fab->box().numPts()); // ---- one component only
RealDescriptor::convertToNativeFormat(fab->dataPtr(), readDataItems,
RealDescriptor::convertToNativeFormat(fabdata, readDataItems,
*infs, hdr.m_writtenRD);
}
}
#ifdef AMREX_USE_GPU
if (hostfab) {
Gpu::htod_memcpy_async(fab->dataPtr(), hostfab->dataPtr(), fab->size()*sizeof(Real));
Gpu::streamSynchronize();
}
#endif
}

VisMF::CloseStream(FullName);
Expand All @@ -1450,13 +1482,27 @@ VisMF::readFAB (FabArray<FArrayBox> &mf,
infs->seekg(hdr.m_fod[idx].m_head, std::ios::beg);

if(NoFabHeader(hdr)) {
Real* fabdata = fab.dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab.box(), fab.nComp(), The_Pinned_Arena());
fabdata = hostfab->dataPtr();
}
#endif
if(hdr.m_writtenRD == FPC::NativeRealDescriptor()) {
infs->read((char *) fab.dataPtr(), fab.nBytes());
infs->read((char *) fabdata, fab.nBytes());
} else {
Long readDataItems(fab.box().numPts() * fab.nComp());
RealDescriptor::convertToNativeFormat(fab.dataPtr(), readDataItems,
RealDescriptor::convertToNativeFormat(fabdata, readDataItems,
*infs, hdr.m_writtenRD);
}
#ifdef AMREX_USE_GPU
if (hostfab) {
Gpu::htod_memcpy_async(fab.dataPtr(), hostfab->dataPtr(), fab.size()*sizeof(Real));
Gpu::streamSynchronize();
}
#endif
} else {
fab.readFrom(*infs);
}
Expand Down Expand Up @@ -1693,14 +1739,30 @@ VisMF::Read (FabArray<FArrayBox> &mf,
if(myProc == frc[i].rankToRead) {
char *afPtr = allFabData + currentOffset;
FArrayBox &fab = whichFA[frc[i].faIndex];
Real* fabdata = fab.dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab.box(), fab.nComp(),
The_Pinned_Arena());
fabdata = hostfab->dataPtr();
}
#endif
Long readDataItems(fab.box().numPts() * fab.nComp());
if(doConvert) {
RealDescriptor::convertToNativeFormat(fab.dataPtr(), readDataItems,
RealDescriptor::convertToNativeFormat(fabdata, readDataItems,
afPtr, hdr.m_writtenRD);
} else {
memcpy(fab.dataPtr(), afPtr, fab.nBytes());
memcpy(fabdata, afPtr, fab.nBytes());
}
currentOffset += readDataItems * hdr.m_writtenRD.numBytes();
#ifdef AMREX_USE_GPU
if (hostfab) {
Gpu::htod_memcpy_async(fab.dataPtr(), hostfab->dataPtr(),
fab.size()*sizeof(Real));
Gpu::streamSynchronize();
}
#endif
}
}
delete [] allFabData;
Expand All @@ -1712,13 +1774,29 @@ VisMF::Read (FabArray<FArrayBox> &mf,
nfi.Stream().seekp(frc[i].fileOffset, std::ios::beg);
}
FArrayBox &fab = whichFA[frc[i].faIndex];
Real* fabdata = fab.dataPtr();
#ifdef AMREX_USE_GPU
std::unique_ptr<FArrayBox> hostfab;
if (fab.arena()->isManaged() || fab.arena()->isDevice()) {
hostfab = std::make_unique<FArrayBox>(fab.box(), fab.nComp(),
The_Pinned_Arena());
fabdata = hostfab->dataPtr();
}
#endif
Long readDataItems(fab.box().numPts() * fab.nComp());
if(doConvert) {
RealDescriptor::convertToNativeFormat(fab.dataPtr(), readDataItems,
RealDescriptor::convertToNativeFormat(fabdata, readDataItems,
nfi.Stream(), hdr.m_writtenRD);
} else {
nfi.Stream().read((char *) fab.dataPtr(), fab.nBytes());
nfi.Stream().read((char *) fabdata, fab.nBytes());
}
#ifdef AMREX_USE_GPU
if (hostfab) {
Gpu::htod_memcpy_async(fab.dataPtr(), hostfab->dataPtr(),
fab.size()*sizeof(Real));
Gpu::streamSynchronize();
}
#endif
}
}
}
Expand Down Expand Up @@ -1921,8 +1999,6 @@ VisMF::Read (FabArray<FArrayBox> &mf,
}

BL_ASSERT(mf.ok());

if (Gpu::inLaunchRegion()) amrex::prefetchToDevice(mf);
}


Expand Down

0 comments on commit b30cae9

Please sign in to comment.