Async mem ops

Martin Lilleeng Sætra
2022-04-26 11:34:29 +00:00
parent dcef56a1b9
commit 6c8bac6f7b
6 changed files with 314 additions and 57 deletions

@@ -26,6 +26,8 @@ import numpy as np
from mpi4py import MPI
import time
import pycuda.driver as cuda
import nvtx
@@ -239,25 +241,36 @@ class MPISimulator(Simulator.BaseSimulator):
#Get coordinate of this node
#and handle global boundary conditions
#new_boundary_conditions = Simulator.BoundaryCondition({
# 'north': Simulator.BoundaryCondition.Type.Dirichlet,
# 'south': Simulator.BoundaryCondition.Type.Dirichlet,
# 'east': Simulator.BoundaryCondition.Type.Dirichlet,
# 'west': Simulator.BoundaryCondition.Type.Dirichlet
#})
new_boundary_conditions = Simulator.BoundaryCondition({
'north': Simulator.BoundaryCondition.Type.Reflective,
'south': Simulator.BoundaryCondition.Type.Reflective,
'east': Simulator.BoundaryCondition.Type.Reflective,
'west': Simulator.BoundaryCondition.Type.Reflective
})
gi, gj = grid.getCoordinate()
print("gi: " + str(gi) + ", gj: " + str(gj))
if (gi == 0 and boundary_conditions.west != Simulator.BoundaryCondition.Type.Periodic):
self.west = None
new_boundary_conditions.west = boundary_conditions.west
print("Rank: " + str(self.grid.comm.rank) + ": WEST")
if (gj == 0 and boundary_conditions.south != Simulator.BoundaryCondition.Type.Periodic):
self.south = None
new_boundary_conditions.south = boundary_conditions.south
print("Rank: " + str(self.grid.comm.rank) + ": SOUTH")
if (gi == grid.grid[0]-1 and boundary_conditions.east != Simulator.BoundaryCondition.Type.Periodic):
self.east = None
new_boundary_conditions.east = boundary_conditions.east
print("Rank: " + str(self.grid.comm.rank) + ": EAST")
if (gj == grid.grid[1]-1 and boundary_conditions.north != Simulator.BoundaryCondition.Type.Periodic):
self.north = None
new_boundary_conditions.north = boundary_conditions.north
print("Rank: " + str(self.grid.comm.rank) + ": NORTH")
sim.setBoundaryConditions(new_boundary_conditions)
#Get number of variables
@@ -286,36 +299,52 @@ class MPISimulator(Simulator.BaseSimulator):
#Note that east and west also transfer ghost cells
#whilst north/south only transfer internal cells
#Reuses the width/height defined in the read-extents above
self.in_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_e[3], self.read_e[2]), dtype=np.float32)
self.in_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_w[3], self.read_w[2]), dtype=np.float32)
self.in_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_n[3], self.read_n[2]), dtype=np.float32)
self.in_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_s[3], self.read_s[2]), dtype=np.float32)
#Allocate data for sending
self.out_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_e)
self.out_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_w)
self.out_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_n)
self.out_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_s)
self.logger.debug("Simlator rank {:d} initialized on {:s}".format(self.grid.comm.rank, MPI.Get_processor_name()))
self.profiling_data_mpi["end"]["t_sim_mpi_init"] = time.time()
def substep(self, dt, step_number):
nvtx.mark("substep start", color="red")
self.sim.stream.synchronize() # only necessary for profiling!
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi"] += time.time()
nvtx.mark("substep internal", color="red")
self.sim.substep(dt, step_number, internal=True, external=False) # "internal ghost cells" excluded
self.profiling_data_mpi["end"]["t_step_mpi"] += time.time()
self.sim.substep(dt, step_number)
self.profiling_data_mpi["start"]["t_step_mpi"] += time.time()
nvtx.mark("substep external", color="blue")
self.sim.substep(dt, step_number, external=True, internal=False) # only "internal ghost cells"
self.profiling_data_mpi["end"]["t_step_mpi"] += time.time()
self.sim.stream.synchronize() # only necessary for profiling!
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi"] += time.time()
# NOTE: the u0<->u1 switch has not been performed yet, so swap buffers first and then download the fresh halo data from (the new) u0
nvtx.mark("download", color="red")
self.sim.swapBuffers()
self.download_for_exchange(self.sim.u0)
nvtx.mark("sync", color="red")
self.sim.stream.synchronize()
nvtx.mark("MPI", color="green")
self.exchange()
nvtx.mark("upload", color="red")
self.upload_for_exchange(self.sim.u0)
self.sim.internal_stream.synchronize()
self.profiling_data_mpi["n_time_steps"] += 1
def getOutput(self):
@@ -348,8 +377,114 @@ class MPISimulator(Simulator.BaseSimulator):
x1 = x0 + width
y1 = y0 + height
return [x0, x1, y0, y1]
def download_for_exchange(self, u):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_download"] += time.time()
# North-south
if self.north is not None:
for k in range(self.nvars):
u[k].download(self.sim.stream, cpu_data=self.out_n[k,:,:], asynch=True, extent=self.read_n)
#self.out_n[k,:,:] = u[k].download(self.sim.stream, asynch=True, extent=self.read_n)
if self.south is not None:
for k in range(self.nvars):
u[k].download(self.sim.stream, cpu_data=self.out_s[k,:,:], asynch=True, extent=self.read_s)
#self.out_s[k,:,:] = u[k].download(self.sim.stream, asynch=True, extent=self.read_s)
# East-west
if self.east is not None:
for k in range(self.nvars):
u[k].download(self.sim.stream, cpu_data=self.out_e[k,:,:], asynch=True, extent=self.read_e)
#self.out_e[k,:,:] = u[k].download(self.sim.stream, asynch=True, extent=self.read_e)
if self.west is not None:
for k in range(self.nvars):
u[k].download(self.sim.stream, cpu_data=self.out_w[k,:,:], asynch=True, extent=self.read_w)
#self.out_w[k,:,:] = u[k].download(self.sim.stream, asynch=True, extent=self.read_w)
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_download"] += time.time()
def exchange(self):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
####
# First transfer internal cells north-south
####
#Send/receive to north/south neighbours
comm_send = []
comm_recv = []
if self.north is not None:
comm_send += [self.grid.comm.Isend(self.out_n, dest=self.north, tag=4*self.nt + 0)]
comm_recv += [self.grid.comm.Irecv(self.in_n, source=self.north, tag=4*self.nt + 1)]
if self.south is not None:
comm_send += [self.grid.comm.Isend(self.out_s, dest=self.south, tag=4*self.nt + 1)]
comm_recv += [self.grid.comm.Irecv(self.in_s, source=self.south, tag=4*self.nt + 0)]
#Wait for incoming transfers to complete
for comm in comm_recv:
comm.wait()
#Wait for sending to complete
for comm in comm_send:
comm.wait()
####
# Then transfer east-west including ghost cells that have been filled in by north-south transfer above
####
#Send/receive to east/west neighbours
comm_send = []
comm_recv = []
if self.east is not None:
comm_send += [self.grid.comm.Isend(self.out_e, dest=self.east, tag=4*self.nt + 2)]
comm_recv += [self.grid.comm.Irecv(self.in_e, source=self.east, tag=4*self.nt + 3)]
if self.west is not None:
comm_send += [self.grid.comm.Isend(self.out_w, dest=self.west, tag=4*self.nt + 3)]
comm_recv += [self.grid.comm.Irecv(self.in_w, source=self.west, tag=4*self.nt + 2)]
#Wait for incoming transfers to complete
for comm in comm_recv:
comm.wait()
#Wait for sending to complete
for comm in comm_send:
comm.wait()
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
def upload_for_exchange(self, u):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_upload"] += time.time()
# North-south
if self.north is not None:
for k in range(self.nvars):
u[k].upload(self.sim.stream, self.in_n[k,:,:], extent=self.write_n)
if self.south is not None:
for k in range(self.nvars):
u[k].upload(self.sim.stream, self.in_s[k,:,:], extent=self.write_s)
# East-west
if self.east is not None:
for k in range(self.nvars):
u[k].upload(self.sim.stream, self.in_e[k,:,:], extent=self.write_e)
if self.west is not None:
for k in range(self.nvars):
u[k].upload(self.sim.stream, self.in_w[k,:,:], extent=self.write_w)
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_upload"] += time.time()
def old_exchange(self):
####
# FIXME: This function can be optimized using persistent communications,
# and by overlapping some of the north/south and east/west communications with the GPU and intra-node transfers
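For reference, the persistent-communication variant the FIXME hints at would, in mpi4py, look roughly like this sketch (set up once outside the time loop; names and tags are illustrative):

from mpi4py import MPI

def make_persistent_halo_requests(comm, north, out_n, in_n):
    # Persistent requests bind buffer, peer and tag once, then are
    # restarted every iteration instead of being re-created.
    send_req = comm.Send_init(out_n, dest=north, tag=0)
    recv_req = comm.Recv_init(in_n, source=north, tag=1)
    return [send_req, recv_req]

# Inside the time loop:
#   MPI.Prequest.Startall(reqs)
#   MPI.Request.Waitall(reqs)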
@@ -470,4 +605,3 @@ class MPISimulator(Simulator.BaseSimulator):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()