Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A cleaner version of my branch feature/atrayano/faster_routing #774

Merged
merged 4 commits into from
Jul 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ module GEOS_SurfaceGridCompMod

type( ESMF_VM ) :: VMG

! !PUBLIC MEMBER FUNCTIONS:

! !PUBLIC MEMBER FUNCTIONS:

public SetServices

Expand Down Expand Up @@ -112,18 +113,25 @@ module GEOS_SurfaceGridCompMod
type T_Routing
integer :: srcTileID, dstTileID, &
srcIndex=-1, dstIndex=-1, &
srcPE=-1, dstPE=-1
srcPE=-1, dstPE=-1, SeqIdx=-1
real :: weight
end type T_Routing

type T_RiverRouting
type(T_Routing), pointer :: LocalRoutings(:) => NULL()
integer, pointer :: karray(:)
integer, pointer :: kdx(:)
integer, pointer :: BlockSizes(:), displ(:)
end type T_RiverRouting

! Internal state and its wrapper
! ------------------------------

type T_SURFACE_STATE
private
type (MAPL_LocStreamXFORM) :: XFORM_IN (NUM_CHILDREN)
type (MAPL_LocStreamXFORM) :: XFORM_OUT(NUM_CHILDREN)
type (T_Routing), pointer :: LocalRoutings(:) => NULL()
type (T_RiverRouting), pointer :: RoutingType => NULL()
end type T_SURFACE_STATE

type SURF_WRAP
Expand Down Expand Up @@ -3648,7 +3656,7 @@ subroutine Initialize ( GC, IMPORT, EXPORT, CLOCK, RC )
VERIFY_(STATUS)

if (RoutingFile /= "") then
call InitializeRiverRouting(SURF_INTERNAL_STATE%LocalRoutings, &
call InitializeRiverRouting(SURF_INTERNAL_STATE%RoutingType, &
RoutingFile, LocStream, rc=STATUS)
VERIFY_(STATUS)

Expand Down Expand Up @@ -3691,9 +3699,9 @@ subroutine Initialize ( GC, IMPORT, EXPORT, CLOCK, RC )
call MAPL_LocStreamTransform( LOCSTREAM, PCMETILE, PCME, RC=STATUS)
VERIFY_(STATUS)

call RouteRunoff(SURF_INTERNAL_STATE%LocalRoutings, PUMETILE, PUMEDISTILE, RC=STATUS)
call RouteRunoff(SURF_INTERNAL_STATE%RoutingType, PUMETILE, PUMEDISTILE, RC=STATUS)
VERIFY_(STATUS)
call RouteRunoff(SURF_INTERNAL_STATE%LocalRoutings, PCMETILE, PCMEDISTILE, RC=STATUS)
call RouteRunoff(SURF_INTERNAL_STATE%RoutingType, PCMETILE, PCMEDISTILE, RC=STATUS)
VERIFY_(STATUS)

call MAPL_GetPointer(INTERNAL, DISCHARGE_ADJUST, 'DISCHARGE_ADJUST', RC=STATUS)
Expand Down Expand Up @@ -3725,12 +3733,17 @@ subroutine Initialize ( GC, IMPORT, EXPORT, CLOCK, RC )
RETURN_(ESMF_SUCCESS)
end subroutine Initialize

subroutine InitializeRiverRouting(LocalRoutings, RoutingFile, Stream, rc)
type(T_Routing), pointer :: LocalRoutings(:)
subroutine InitializeRiverRouting(RoutingType, RoutingFile, Stream, rc)
type(T_RiverRouting), pointer :: RoutingType
character(len=*), intent(IN) :: RoutingFile
type(MAPL_LocStream), intent(IN) :: Stream
integer, optional, intent(OUT):: rc

type(T_Routing), pointer :: LocalRoutings(:) => NULL()
integer, pointer :: karray(:)
integer, pointer :: kdx(:)
integer, pointer :: BlockSizes(:), displ(:)

type(ESMF_VM) :: VM
integer :: comm, nDEs, myPE, i, numRoutings
type(T_Routing), pointer :: Routing
Expand All @@ -3739,9 +3752,11 @@ subroutine InitializeRiverRouting(LocalRoutings, RoutingFile, Stream, rc)
integer, pointer :: Active(:,:)
integer, pointer :: ActiveGlobal(:,:)
integer :: numActive, numLocalRoutings
integer, allocatable :: BlockSizes(:), displ(:)
integer, pointer :: Local_Id(:)
integer :: unit
integer :: ksum, n, nsdx
integer :: ntotal, k
integer, allocatable :: kn(:), kseq(:), tmparray(:)
#ifdef DEBUG
character(len=ESMF_MAXSTR) :: routefile
#endif
Expand Down Expand Up @@ -3812,7 +3827,8 @@ subroutine InitializeRiverRouting(LocalRoutings, RoutingFile, Stream, rc)
! assigned an index of -1.

Routing => tmpLocalRoutings(i)

Routing%seqIdx = i

call Tile2Index(Routing, Local_Id)

if(Routing%srcIndex>0 .and. Routing%dstIndex>0) then
Expand Down Expand Up @@ -3923,6 +3939,71 @@ subroutine InitializeRiverRouting(LocalRoutings, RoutingFile, Stream, rc)
close(unit)

#endif

!ALT NEW ROUTING to make communication more effective

nsdx=0
do i=1,numLocalRoutings
!notneeded if (mype == Routing(i)%dstPE) nddx = nddx+1
if (mype == LocalRoutings(i)%srcPE) nsdx = nsdx+1
end do
allocate(kdx(nsdx), blocksizes(nDEs), _STAT)
blocksizes=0

! exchange with everybody else
call MPI_AllGather(nsdx, 1, MP_Integer, &
blocksizes, 1, MP_Integer, comm, status)
_VERIFY(status)

! now everybody has blocksizes(nDEs)

ntotal = sum(blocksizes) ! should be same as # of paired sources and sinks (npairs)
_ASSERT(ntotal==numRoutings, 'Number source/sinks does not match')
allocate (karray(numRoutings), _STAT) !declare as target!!!
karray = 0
allocate (displ(0:nDEs), _STAT) !declare as target!!!

ksum = 0
displ(0)=ksum
do n=1,nDEs
ksum = ksum + blocksizes(n)
displ(n)=ksum
end do
! as another sanity check: ksum should be the same as npairs
_ASSERT(displ(nDEs)==ntotal, 'Displ source/sinks does not match')

allocate(kseq(nsdx), _STAT)
allocate(tmparray(numRoutings), _STAT)
! local k index
k=0
do i=1,size(LocalRoutings)
if (mype==LocalRoutings(i)%srcPE) then
k=k+1
kseq(k) = LocalRoutings(i)%seqIdx
kdx(k) = i
end if
end do

call MPI_AllGatherV(kseq, nsdx, MP_Integer, &
tmparray, blocksizes, displ, MP_Integer, comm, status)
_VERIFY(STATUS)

deallocate(kseq)
do n=1,nDEs
do k=1,blocksizes(n)
i=tmparray(displ(n-1)+k)
karray(i)=k
end do
end do
deallocate(tmparray)

allocate(RoutingType, _STAT)
RoutingType%LocalRoutings => LocalRoutings
RoutingType%karray => karray
RoutingType%kdx => kdx
RoutingType%BlockSizes => BlockSizes
RoutingType%displ => displ

return

contains
Expand Down Expand Up @@ -6522,7 +6603,7 @@ subroutine RUN2 ( GC, IMPORT, EXPORT, CLOCK, RC )
!-----------------------------------------------------------------------

if (associated(DISCHARGE)) then
_ASSERT(associated(SURF_INTERNAL_STATE%LocalRoutings),'needs informative message')
_ASSERT(associated(SURF_INTERNAL_STATE%RoutingType),'needs informative message')
!ALT call MAPL_GetPointer(EXPORT, RUNOFF, 'RUNOFF', alloc=.true., RC=STATUS)
!ALT VERIFY_(STATUS)
end if
Expand Down Expand Up @@ -6989,7 +7070,7 @@ subroutine RUN2 ( GC, IMPORT, EXPORT, CLOCK, RC )

!ALT call MKTILE(RUNOFF ,RUNOFFTILE ,NT,RC=STATUS); VERIFY_(STATUS)
!ALT call MKTILE(DISCHARGE,DISCHARGETILE,NT,RC=STATUS); VERIFY_(STATUS)
if (associated(SURF_INTERNAL_STATE%LocalRoutings)) then ! routing file exists
if (associated(SURF_INTERNAL_STATE%RoutingType)) then ! routing file exists
allocate(DISCHARGETILE(NT),stat=STATUS); VERIFY_(STATUS)
DISCHARGETILE=MAPL_Undef
allocate(RUNOFFTILE(NT),stat=STATUS); VERIFY_(STATUS)
Expand Down Expand Up @@ -7140,7 +7221,7 @@ subroutine RUN2 ( GC, IMPORT, EXPORT, CLOCK, RC )

! Create discharge at exit tiles by routing runoff

call RouteRunoff(SURF_INTERNAL_STATE%LocalRoutings, RUNOFFTILE, DISCHARGETILE, RC=STATUS)
call RouteRunoff(SURF_INTERNAL_STATE%RoutingType, RUNOFFTILE, DISCHARGETILE, RC=STATUS)
VERIFY_(STATUS)

!-------------------------------------------------------------------------------------
Expand Down Expand Up @@ -10222,21 +10303,27 @@ subroutine FILLOUT_UNGRIDDED(STATE, NAME, TILE, XFORM, RC)

end subroutine FILLOUT_UNGRIDDED

subroutine RouteRunoff(Routing, Runoff, Discharge, rc)
type(T_Routing), intent(IN ) :: Routing(:)
subroutine RouteRunoff(RoutingType, Runoff, Discharge, rc)
type(T_RiverRouting), intent(IN ) :: RoutingType
real, intent(IN ) :: Runoff(:)
real, intent(OUT) :: Discharge(:)
integer, optional,intent(OUT) :: rc

character(len=ESMF_MAXSTR) :: IAm="RouteRunoff"
integer :: STATUS

type(T_Routing), pointer :: Routing(:)
integer, pointer :: karray(:)
integer, pointer :: kdx(:)
integer, pointer :: BlockSizes(:), displ(:)

type(ESMF_VM) :: VM
integer :: myPE, nDEs, comm
integer :: i
real :: TileDischarge
integer :: mpstatus(MP_STATUS_SIZE)

integer :: n, k
real, allocatable :: td(:), tarray(:)

call ESMF_VMGetCurrent(VM, RC=STATUS)
VERIFY_(STATUS)
Expand All @@ -10245,29 +10332,39 @@ subroutine RouteRunoff(Routing, Runoff, Discharge, rc)
call ESMF_VMGet (VM, localpet=MYPE, petcount=nDEs, RC=STATUS)
VERIFY_(STATUS)

Routing => RoutingType%LocalRoutings
karray => RoutingType%karray
kdx => RoutingType%kdx
BlockSizes => RoutingType%BlockSizes
displ => RoutingType%displ

Discharge = 0.0

n=size(kdx)
allocate(td(n), _STAT)
allocate(tarray(displ(nDEs)), _STAT)
do k=1,n
i=kdx(k)

TileDischarge = Runoff(Routing(i)%SrcIndex)*Routing(i)%weight
TileDischarge = max(TileDischarge, 0.0)
td(k) = TileDischarge
end do
call MPI_AllGatherV(td, n, MP_Real, &
tarray, blocksizes, displ, MP_Real, comm, status)
_VERIFY(STATUS)

do i=1,size(Routing)
if(Routing(i)%SrcPE==myPE) then
TileDischarge = Runoff(Routing(i)%SrcIndex)*Routing(i)%weight
TileDischarge = max(TileDischarge, 0.0)
if(Routing(i)%DstPE==myPE) then
Discharge(Routing(i)%DstIndex) = Discharge(Routing(i)%DstIndex) + TileDischarge
else
call MPI_Send(TileDischarge,1,MP_REAL,Routing(i)%DstPE,123,comm,status)
VERIFY_(STATUS)
end if
else
if(Routing(i)%DstPE==myPE) then
call MPI_Recv(TileDischarge,1,MP_REAL, Routing(i)%SrcPE,123,comm,mpstatus,status)
VERIFY_(STATUS)
Discharge(Routing(i)%DstIndex) = Discharge(Routing(i)%DstIndex) + TileDischarge
else
_ASSERT(.false.,'needs informative message')
end if
if(Routing(i)%DstPE==myPE) then
n=Routing(i)%srcPe
k=karray(Routing(i)%seqIdx)
TileDischarge=tarray(displ(n)+k)
Discharge(Routing(i)%DstIndex) = Discharge(Routing(i)%DstIndex) + TileDischarge
end if
end do

deallocate(td, _STAT)
deallocate(tarray, _STAT)

RETURN_(ESMF_SUCCESS)
end subroutine RouteRunoff

Expand Down