Bugfix for ghost cell exchange and nc atts

This commit is contained in:
Martin Lilleeng Sætra
2022-05-25 11:25:10 +00:00
parent 4d03e6cfbc
commit a588948e77
4 changed files with 124 additions and 86 deletions

View File

@@ -208,18 +208,17 @@ class MPISimulator(Simulator.BaseSimulator):
"""
def __init__(self, sim, grid):
self.profiling_data_mpi = { 'start': {}, 'end': {} }
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange"] = 0
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange"] = 0
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_download"] = 0
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_download"] = 0
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_upload"] = 0
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_upload"] = 0
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] = 0
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] = 0
self.profiling_data_mpi["start"]["t_step_mpi"] = 0
self.profiling_data_mpi["end"]["t_step_mpi"] = 0
self.profiling_data_mpi["start"]["t_mpi_halo_exchange"] = 0
self.profiling_data_mpi["end"]["t_mpi_halo_exchange"] = 0
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_download"] = 0
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_download"] = 0
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_upload"] = 0
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_upload"] = 0
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] = 0
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] = 0
self.profiling_data_mpi["start"]["t_mpi_step"] = 0
self.profiling_data_mpi["end"]["t_mpi_step"] = 0
self.profiling_data_mpi["n_time_steps"] = 0
self.profiling_data_mpi["start"]["t_sim_mpi_init"] = time.time()
self.logger = logging.getLogger(__name__)
autotuner = sim.context.autotuner
@@ -297,43 +296,43 @@ class MPISimulator(Simulator.BaseSimulator):
#Note that east and west also transfer ghost cells
#whilst north/south only transfer internal cells
#Reuses the width/height defined in the read-extets above
self.in_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_e[3], self.read_e[2]), dtype=np.float32)
self.in_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_w[3], self.read_w[2]), dtype=np.float32)
self.in_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_n[3], self.read_n[2]), dtype=np.float32)
self.in_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty((self.nvars, self.read_s[3], self.read_s[2]), dtype=np.float32)
self.in_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32) #np.empty((self.nvars, self.read_e[3], self.read_e[2]), dtype=np.float32)
self.in_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32) #np.empty((self.nvars, self.read_w[3], self.read_w[2]), dtype=np.float32)
self.in_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32) #np.empty((self.nvars, self.read_n[3], self.read_n[2]), dtype=np.float32)
self.in_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32) #np.empty((self.nvars, self.read_s[3], self.read_s[2]), dtype=np.float32)
#Allocate data for sending
self.out_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_e)
self.out_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_w)
self.out_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_n)
self.out_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE) #np.empty_like(self.in_s)
self.out_e = cuda.pagelocked_empty((int(self.nvars), int(self.read_e[3]), int(self.read_e[2])), dtype=np.float32) #np.empty_like(self.in_e)
self.out_w = cuda.pagelocked_empty((int(self.nvars), int(self.read_w[3]), int(self.read_w[2])), dtype=np.float32) #np.empty_like(self.in_w)
self.out_n = cuda.pagelocked_empty((int(self.nvars), int(self.read_n[3]), int(self.read_n[2])), dtype=np.float32) #np.empty_like(self.in_n)
self.out_s = cuda.pagelocked_empty((int(self.nvars), int(self.read_s[3]), int(self.read_s[2])), dtype=np.float32) #np.empty_like(self.in_s)
self.logger.debug("Simlator rank {:d} initialized on {:s}".format(self.grid.comm.rank, MPI.Get_processor_name()))
self.profiling_data_mpi["end"]["t_sim_mpi_init"] = time.time()
self.old_exchange()
self.full_exchange()
sim.context.synchronize()
def substep(self, dt, step_number):
nvtx.mark("substep start", color="yellow")
self.profiling_data_mpi["start"]["t_step_mpi"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_step"] += time.time()
nvtx.mark("substep external", color="blue")
self.sim.substep(dt, step_number, external=True, internal=False) # only "internal ghost cells"
nvtx.mark("substep internal", color="red")
self.sim.substep(dt, step_number, internal=True, external=False) # "internal ghost cells" excluded
nvtx.mark("substep external", color="blue")
self.sim.substep(dt, step_number, external=True, internal=False) # only "internal ghost cells"
#nvtx.mark("substep full", color="blue")
#self.sim.substep(dt, step_number, external=True, internal=True)
self.sim.swapBuffers()
self.profiling_data_mpi["end"]["t_step_mpi"] += time.time()
self.profiling_data_mpi["end"]["t_mpi_step"] += time.time()
nvtx.mark("exchange", color="blue")
self.old_exchange()
self.full_exchange()
#nvtx.mark("download", color="blue")
#self.download_for_exchange(self.sim.u0)
@@ -383,8 +382,7 @@ class MPISimulator(Simulator.BaseSimulator):
return [x0, x1, y0, y1]
def download_for_exchange(self, u):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_download"] += time.time()
# North-south
if self.north is not None:
@@ -406,12 +404,10 @@ class MPISimulator(Simulator.BaseSimulator):
u[k].download(self.sim.stream, cpu_data=self.out_w[k,:,:], asynch=True, extent=self.read_w)
#self.out_w[k,:,:] = u[k].download(self.sim.stream, asynch=True, extent=self.read_w)
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_download"] += time.time()
def exchange(self):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] += time.time()
#Send/receive to north/south neighbours
comm_send = []
@@ -441,12 +437,10 @@ class MPISimulator(Simulator.BaseSimulator):
for comm in comm_send:
comm.wait()
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] += time.time()
def upload_for_exchange(self, u):
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_upload"] += time.time()
# North-south
if self.north is not None:
@@ -464,15 +458,11 @@ class MPISimulator(Simulator.BaseSimulator):
for k in range(self.nvars):
u[k].upload(self.sim.stream, self.in_w[k,:,:], extent=self.write_w)
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_upload"] += time.time()
def old_exchange(self):
def full_exchange(self):
####
# FIXME: This function can be optimized using persitent communications.
# Also by overlapping some of the communications north/south and east/west of GPU and intra-node
@@ -484,8 +474,7 @@ class MPISimulator(Simulator.BaseSimulator):
####
#Download from the GPU
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_download"] += time.time()
if self.north is not None:
for k in range(self.nvars):
@@ -495,10 +484,10 @@ class MPISimulator(Simulator.BaseSimulator):
self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_s[k,:,:], asynch=True, extent=self.read_s)
self.sim.stream.synchronize()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_download"] += time.time()
#Send/receive to north/south neighbours
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] += time.time()
comm_send = []
comm_recv = []
@@ -513,10 +502,10 @@ class MPISimulator(Simulator.BaseSimulator):
for comm in comm_recv:
comm.wait()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] += time.time()
#Upload to the GPU
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_upload"] += time.time()
if self.north is not None:
for k in range(self.nvars):
@@ -524,25 +513,23 @@ class MPISimulator(Simulator.BaseSimulator):
if self.south is not None:
for k in range(self.nvars):
self.sim.u0[k].upload(self.sim.stream, self.in_s[k,:,:], extent=self.write_s)
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_upload"] += time.time()
#Wait for sending to complete
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] += time.time()
for comm in comm_send:
comm.wait()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] += time.time()
####
# Then transfer east-west including ghost cells that have been filled in by north-south transfer above
####
#Download from the GPU
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_download"] += time.time()
if self.east is not None:
for k in range(self.nvars):
@@ -552,10 +539,10 @@ class MPISimulator(Simulator.BaseSimulator):
self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_w[k,:,:], asynch=True, extent=self.read_w)
self.sim.stream.synchronize()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_download"] += time.time()
#Send/receive to east/west neighbours
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_download"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] += time.time()
comm_send = []
comm_recv = []
@@ -566,15 +553,14 @@ class MPISimulator(Simulator.BaseSimulator):
comm_send += [self.grid.comm.Isend(self.out_w, dest=self.west, tag=4*self.nt + 3)]
comm_recv += [self.grid.comm.Irecv(self.in_w, source=self.west, tag=4*self.nt + 2)]
#Wait for incoming transfers to complete
for comm in comm_recv:
comm.wait()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] += time.time()
#Upload to the GPU
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_upload"] += time.time()
if self.east is not None:
for k in range(self.nvars):
@@ -583,13 +569,12 @@ class MPISimulator(Simulator.BaseSimulator):
for k in range(self.nvars):
self.sim.u0[k].upload(self.sim.stream, self.in_w[k,:,:], extent=self.write_w)
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_upload"] += time.time()
#Wait for sending to complete
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_upload"] += time.time()
self.profiling_data_mpi["start"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["start"]["t_mpi_halo_exchange_sendreceive"] += time.time()
for comm in comm_send:
comm.wait()
if self.profiling_data_mpi["n_time_steps"] > 0:
self.profiling_data_mpi["end"]["t_step_mpi_halo_exchange_sendreceive"] += time.time()
self.profiling_data_mpi["end"]["t_mpi_halo_exchange_sendreceive"] += time.time()