From 2da46408405d9efe2f43ea141e60faa329967fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Lilleeng=20S=C3=A6tra?= Date: Wed, 5 May 2021 15:24:34 +0000 Subject: [PATCH] Added class for managing SHMEMSimulators --- GPUSimulators/SHMEMSimulator.py | 184 +++----------- GPUSimulators/SHMEMSimulatorGroup.py | 353 +++++++++++++++++++++++++++ shmemTesting.py | 52 ++-- 3 files changed, 419 insertions(+), 170 deletions(-) create mode 100644 GPUSimulators/SHMEMSimulatorGroup.py diff --git a/GPUSimulators/SHMEMSimulator.py b/GPUSimulators/SHMEMSimulator.py index d0dcc54..5c86474 100644 --- a/GPUSimulators/SHMEMSimulator.py +++ b/GPUSimulators/SHMEMSimulator.py @@ -21,139 +21,17 @@ along with this program. If not, see . import logging -from GPUSimulators import Simulator, CudaContext +from GPUSimulators import Simulator import numpy as np -import pycuda.driver as cuda - -class SHMEMGrid(object): - """ - Class which represents an SHMEM grid of GPUs. Facilitates easy communication between - neighboring subdomains in the grid. Contains one CUDA context per subdomain. - """ - def __init__(self, ngpus=None, ndims=2): - self.logger = logging.getLogger(__name__) - - cuda.init(flags=0) - self.logger.info("Initializing CUDA") - num_cuda_devices = cuda.Device.count() - - if ngpus is None: - ngpus = num_cuda_devices - - assert ngpus <= num_cuda_devices, "Trying to allocate more GPUs than are available in the system." - assert ndims == 2, "Unsupported number of dimensions. Must be two at the moment" - assert ngpus >= 2, "Must have at least two GPUs available to run multi-GPU simulations." - - self.ngpus = ngpus - self.ndims = ndims - - self.grid = SHMEMGrid.getGrid(self.ngpus, self.ndims) - - self.logger.debug("Created {:}-dimensional SHMEM grid, using {:} GPUs".format( - self.ndims, self.ngpus)) - - self.cuda_contexts = [] - - for i in range(self.ngpus): - self.cuda_contexts.append(CudaContext.CudaContext(device=i, autotuning=False)) - - def getCoordinate(self, index): - i = (index % self.grid[0]) - j = (index // self.grid[0]) - return i, j - - def getIndex(self, i, j): - return j*self.grid[0] + i - - def getEast(self, index): - i, j = self.getCoordinate(index) - i = (i+1) % self.grid[0] - return self.getIndex(i, j) - - def getWest(self, index): - i, j = self.getCoordinate(index) - i = (i+self.grid[0]-1) % self.grid[0] - return self.getIndex(i, j) - - def getNorth(self, index): - i, j = self.getCoordinate(index) - j = (j+1) % self.grid[1] - return self.getIndex(i, j) - - def getSouth(self, index): - i, j = self.getCoordinate(index) - j = (j+self.grid[1]-1) % self.grid[1] - return self.getIndex(i, j) - - def getGrid(num_gpus, num_dims): - assert(isinstance(num_gpus, int)) - assert(isinstance(num_dims, int)) - - # Adapted from https://stackoverflow.com/questions/28057307/factoring-a-number-into-roughly-equal-factors - # Original code by https://stackoverflow.com/users/3928385/ishamael - # Factorizes a number into n roughly equal factors - - #Dictionary to remember already computed permutations - memo = {} - def dp(n, left): # returns tuple (cost, [factors]) - """ - Recursively searches through all factorizations - """ - - #Already tried: return existing result - if (n, left) in memo: - return memo[(n, left)] - - #Spent all factors: return number itself - if left == 1: - return (n, [n]) - - #Find new factor - i = 2 - best = n - bestTuple = [n] - while i * i < n: - #If factor found - if n % i == 0: - #Factorize remainder - rem = dp(n // i, left - 1) - - #If new permutation better, save it - if rem[0] + i < 
best: - best = rem[0] + i - bestTuple = [i] + rem[1] - i += 1 - - #Store calculation - memo[(n, left)] = (best, bestTuple) - return memo[(n, left)] - - - grid = dp(num_gpus, num_dims)[1] - - if (len(grid) < num_dims): - #Split problematic 4 - if (4 in grid): - grid.remove(4) - grid.append(2) - grid.append(2) - - #Pad with ones to guarantee num_dims - grid = grid + [1]*(num_dims - len(grid)) - - #Sort in descending order - grid = np.sort(grid) - grid = grid[::-1] - - return grid - class SHMEMSimulator(Simulator.BaseSimulator): """ Class which handles communication between simulators on different GPUs + + NOTE: This class is only intended to be used by SHMEMSimulatorGroup """ - def __init__(self, sim, grid): + def __init__(self, index, sim, grid): self.logger = logging.getLogger(__name__) autotuner = sim.context.autotuner @@ -168,14 +46,15 @@ class SHMEMSimulator(Simulator.BaseSimulator): sim.block_size[0], sim.block_size[1]) sim.context.autotuner = autotuner + self.index = index self.sim = sim self.grid = grid #Get neighbor subdomain ids - self.east = grid.getEast() - self.west = grid.getWest() - self.north = grid.getNorth() - self.south = grid.getSouth() + self.east = grid.getEast(self.index) + self.west = grid.getWest(self.index) + self.north = grid.getNorth(self.index) + self.south = grid.getSouth(self.index) #Get coordinate of this subdomain #and handle global boundary conditions @@ -185,7 +64,7 @@ class SHMEMSimulator(Simulator.BaseSimulator): 'east': Simulator.BoundaryCondition.Type.Dirichlet, 'west': Simulator.BoundaryCondition.Type.Dirichlet }) - gi, gj = grid.getCoordinate() + gi, gj = grid.getCoordinate(self.index) if (gi == 0 and boundary_conditions.west != Simulator.BoundaryCondition.Type.Periodic): self.west = None new_boundary_conditions.west = boundary_conditions.west; @@ -237,7 +116,7 @@ class SHMEMSimulator(Simulator.BaseSimulator): self.out_n = np.empty_like(self.in_n) self.out_s = np.empty_like(self.in_s) - self.logger.debug("Simlator subdomain {:d} initialized on {:s}".format(self.grid.comm.rank, MPI.Get_processor_name())) + self.logger.debug("Simlator subdomain {:d} initialized on context {:s}".format(self.index, sim.context)) def substep(self, dt, step_number): @@ -252,23 +131,22 @@ class SHMEMSimulator(Simulator.BaseSimulator): def check(self): return self.sim.check() - + def computeDt(self): - local_dt = np.array([np.float32(self.sim.computeDt())]); + local_dt = np.array([np.float32(self.sim.computeDt())]) global_dt = np.empty(1, dtype=np.float32) self.grid.comm.Allreduce(local_dt, global_dt, op=MPI.MIN) self.logger.debug("Local dt: {:f}, global dt: {:f}".format(local_dt[0], global_dt[0])) return global_dt[0] - def getExtent(self): """ - Function which returns the extent of node with rank - rank in the grid + Function which returns the extent of the subdomain with index + index in the grid """ width = self.sim.nx*self.sim.dx height = self.sim.ny*self.sim.dy - i, j = self.grid.getCoordinate() + i, j = self.grid.getCoordinate(self.index) x0 = i * width y0 = j * height x1 = x0 + width @@ -276,6 +154,19 @@ class SHMEMSimulator(Simulator.BaseSimulator): return [x0, x1, y0, y1] def exchange(self): + ns_download_before_exchange() + # GLOBAL SYNC + ns_do_exchange() + # GLOBAL SYNC + ns_upload_after_exchange() + + ew_download_before_exchange() + # GLOBAL SYNC + ew_do_exchange() + # GLOBAL SYNC + ew_upload_after_exchange() + + def ns_download_before_exchange(self): #### # First transfer internal cells north-south #### @@ -288,7 +179,8 @@ class 
SHMEMSimulator(Simulator.BaseSimulator): for k in range(self.nvars): self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_s[k,:,:], asynch=True, extent=self.read_s) self.sim.stream.synchronize() - + + def ns_do_exchange(self): #Send/receive to north/south neighbours comm_send = [] comm_recv = [] @@ -302,7 +194,8 @@ class SHMEMSimulator(Simulator.BaseSimulator): #Wait for incoming transfers to complete for comm in comm_recv: comm.wait() - + + def ns_upload_after_exchange(self): #Upload to the GPU if self.north is not None: for k in range(self.nvars): @@ -314,9 +207,8 @@ class SHMEMSimulator(Simulator.BaseSimulator): #Wait for sending to complete for comm in comm_send: comm.wait() - - - + + def ew_download_before_exchange(self): #### # Then transfer east-west including ghost cells that have been filled in by north-south transfer above #### @@ -329,7 +221,8 @@ class SHMEMSimulator(Simulator.BaseSimulator): for k in range(self.nvars): self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_w[k,:,:], asynch=True, extent=self.read_w) self.sim.stream.synchronize() - + + def ew_do_exchange(self): #Send/receive to east/west neighbours comm_send = [] comm_recv = [] @@ -344,7 +237,8 @@ class SHMEMSimulator(Simulator.BaseSimulator): #Wait for incoming transfers to complete for comm in comm_recv: comm.wait() - + + def ew_upload_after_exchange(self): #Upload to the GPU if self.east is not None: for k in range(self.nvars): diff --git a/GPUSimulators/SHMEMSimulatorGroup.py b/GPUSimulators/SHMEMSimulatorGroup.py new file mode 100644 index 0000000..2615e2b --- /dev/null +++ b/GPUSimulators/SHMEMSimulatorGroup.py @@ -0,0 +1,353 @@ +# -*- coding: utf-8 -*- + +""" +This python module implements SHMEM simulator group class + +Copyright (C) 2020 Norwegian Meteorological Institute + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + + +import logging +from GPUSimulators import Simulator, CudaContext +import numpy as np + +import pycuda.driver as cuda + + +class SHMEMGrid(object): + """ + Class which represents an SHMEM grid of GPUs. Facilitates easy communication between + neighboring subdomains in the grid. Contains one CUDA context per subdomain. + + XXX: Adapted to debug on a single GPU. Either remove this possibility or + make it less hacky... + """ + def __init__(self, ngpus=None, ndims=2): + self.logger = logging.getLogger(__name__) + + cuda.init(flags=0) + self.logger.info("Initializing CUDA") + num_cuda_devices = cuda.Device.count() + + if ngpus is None: + #ngpus = num_cuda_devices + ngpus = 2 + + #assert ngpus <= num_cuda_devices, "Trying to allocate more GPUs than are available in the system." + assert ndims == 2, "Unsupported number of dimensions. Must be two at the moment" + #assert ngpus >= 2, "Must have at least two GPUs available to run multi-GPU simulations." 
+ + self.ngpus = ngpus + self.ndims = ndims + + self.grid = SHMEMGrid.getGrid(self.ngpus, self.ndims) + + self.logger.debug("Created {:}-dimensional SHMEM grid, using {:} GPUs".format( + self.ndims, self.ngpus)) + + # XXX: Is this a natural place to store the contexts? Consider moving contexts out of this class. + self.cuda_contexts = [] + + for i in range(self.ngpus): + #self.cuda_contexts.append(CudaContext.CudaContext(device=i, autotuning=False)) + self.cuda_contexts.append(CudaContext.CudaContext(device=0, autotuning=False)) + + def getCoordinate(self, index): + i = (index % self.grid[0]) + j = (index // self.grid[0]) + return i, j + + def getIndex(self, i, j): + return j*self.grid[0] + i + + def getEast(self, index): + i, j = self.getCoordinate(index) + i = (i+1) % self.grid[0] + return self.getIndex(i, j) + + def getWest(self, index): + i, j = self.getCoordinate(index) + i = (i+self.grid[0]-1) % self.grid[0] + return self.getIndex(i, j) + + def getNorth(self, index): + i, j = self.getCoordinate(index) + j = (j+1) % self.grid[1] + return self.getIndex(i, j) + + def getSouth(self, index): + i, j = self.getCoordinate(index) + j = (j+self.grid[1]-1) % self.grid[1] + return self.getIndex(i, j) + + def getGrid(num_gpus, num_dims): + assert(isinstance(num_gpus, int)) + assert(isinstance(num_dims, int)) + + # Adapted from https://stackoverflow.com/questions/28057307/factoring-a-number-into-roughly-equal-factors + # Original code by https://stackoverflow.com/users/3928385/ishamael + # Factorizes a number into n roughly equal factors + + #Dictionary to remember already computed permutations + memo = {} + def dp(n, left): # returns tuple (cost, [factors]) + """ + Recursively searches through all factorizations + """ + + #Already tried: return existing result + if (n, left) in memo: + return memo[(n, left)] + + #Spent all factors: return number itself + if left == 1: + return (n, [n]) + + #Find new factor + i = 2 + best = n + bestTuple = [n] + while i * i < n: + #If factor found + if n % i == 0: + #Factorize remainder + rem = dp(n // i, left - 1) + + #If new permutation better, save it + if rem[0] + i < best: + best = rem[0] + i + bestTuple = [i] + rem[1] + i += 1 + + #Store calculation + memo[(n, left)] = (best, bestTuple) + return memo[(n, left)] + + + grid = dp(num_gpus, num_dims)[1] + + if (len(grid) < num_dims): + #Split problematic 4 + if (4 in grid): + grid.remove(4) + grid.append(2) + grid.append(2) + + #Pad with ones to guarantee num_dims + grid = grid + [1]*(num_dims - len(grid)) + + #Sort in descending order + grid = np.sort(grid) + grid = grid[::-1] + + return grid + +class SHMEMSimulatorGroup(Simulator.BaseSimulator): + """ + Class which handles communication and synchronization between simulators in different + contexts (presumably on different GPUs) + """ + def __init__(self, grid, **kwargs): + self.logger = logging.getLogger(__name__) + sims = [] + + for i in range(grid.ngpus): + local_sim = EE2D_KP07_dimsplit.EE2D_KP07_dimsplit(**kwargs) + sims[i] = SHMEMSimulator(i, local_sim, grid) + + autotuner = sims[0].context.autotuner + sims[0].context.autotuner = None; + boundary_conditions = sims[0].getBoundaryConditions() + super().__init__(sims[0].context, + sims[0].nx, sims[0].ny, + sims[0].dx, sims[0].dy, + boundary_conditions, + sims[0].cfl_scale, + sims[0].num_substeps, + sims[0].block_size[0], sims[0].block_size[1]) + sims[0].context.autotuner = autotuner + + self.nsubdomains = grid.ngpus + self.sims = sims + self.grid = grid + + #Get coordinate of this subdomain + #and handle 
global boundary conditions + new_boundary_conditions = Simulator.BoundaryCondition({ + 'north': Simulator.BoundaryCondition.Type.Dirichlet, + 'south': Simulator.BoundaryCondition.Type.Dirichlet, + 'east': Simulator.BoundaryCondition.Type.Dirichlet, + 'west': Simulator.BoundaryCondition.Type.Dirichlet + }) + gi, gj = grid.getCoordinate(self.index) + if (gi == 0 and boundary_conditions.west != Simulator.BoundaryCondition.Type.Periodic): + self.west = None + new_boundary_conditions.west = boundary_conditions.west; + if (gj == 0 and boundary_conditions.south != Simulator.BoundaryCondition.Type.Periodic): + self.south = None + new_boundary_conditions.south = boundary_conditions.south; + if (gi == grid.grid[0]-1 and boundary_conditions.east != Simulator.BoundaryCondition.Type.Periodic): + self.east = None + new_boundary_conditions.east = boundary_conditions.east; + if (gj == grid.grid[1]-1 and boundary_conditions.north != Simulator.BoundaryCondition.Type.Periodic): + self.north = None + new_boundary_conditions.north = boundary_conditions.north; + sim.setBoundaryConditions(new_boundary_conditions) + + #Get number of variables + self.nvars = len(self.getOutput().gpu_variables) + + #Shorthands for computing extents and sizes + gc_x = int(self.sim.getOutput()[0].x_halo) + gc_y = int(self.sim.getOutput()[0].y_halo) + nx = int(self.sim.nx) + ny = int(self.sim.ny) + + self.logger.debug("Initialized {:d} subdomains".format(len(self.sims))) + + + # TODO: Re-implement methods below + def substep(self, dt, step_number): + self.exchange() + self.sim.substep(dt, step_number) + + def getOutput(self): + return self.sim.getOutput() + + def synchronize(self): + self.sim.synchronize() + + def check(self): + return self.sim.check() + + def computeDt(self): + local_dt = np.array([np.float32(self.sim.computeDt())]) + global_dt = np.empty(1, dtype=np.float32) + self.grid.comm.Allreduce(local_dt, global_dt, op=MPI.MIN) + self.logger.debug("Local dt: {:f}, global dt: {:f}".format(local_dt[0], global_dt[0])) + return global_dt[0] + + def getExtent(self): + """ + Function which returns the extent of the subdomain with index + index in the grid + """ + width = self.sim.nx*self.sim.dx + height = self.sim.ny*self.sim.dy + i, j = self.grid.getCoordinate(self.index) + x0 = i * width + y0 = j * height + x1 = x0 + width + y1 = y0 + height + return [x0, x1, y0, y1] + + def exchange(self): + ns_download_before_exchange() + # GLOBAL SYNC + ns_do_exchange() + # GLOBAL SYNC + ns_upload_after_exchange() + + ew_download_before_exchange() + # GLOBAL SYNC + ew_do_exchange() + # GLOBAL SYNC + ew_upload_after_exchange() + + def ns_download_before_exchange(self): + #### + # First transfer internal cells north-south + #### + + #Download from the GPU + if self.north is not None: + for k in range(self.nvars): + self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_n[k,:,:], asynch=True, extent=self.read_n) + if self.south is not None: + for k in range(self.nvars): + self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_s[k,:,:], asynch=True, extent=self.read_s) + self.sim.stream.synchronize() + + def ns_do_exchange(self): + #Send/receive to north/south neighbours + comm_send = [] + comm_recv = [] + if self.north is not None: + comm_send += [self.grid.comm.Isend(self.out_n, dest=self.north, tag=4*self.nt + 0)] + comm_recv += [self.grid.comm.Irecv(self.in_n, source=self.north, tag=4*self.nt + 1)] + if self.south is not None: + comm_send += [self.grid.comm.Isend(self.out_s, dest=self.south, tag=4*self.nt + 1)] + comm_recv += 
[self.grid.comm.Irecv(self.in_s, source=self.south, tag=4*self.nt + 0)] + + #Wait for incoming transfers to complete + for comm in comm_recv: + comm.wait() + + def ns_upload_after_exchange(self): + #Upload to the GPU + if self.north is not None: + for k in range(self.nvars): + self.sim.u0[k].upload(self.sim.stream, self.in_n[k,:,:], extent=self.write_n) + if self.south is not None: + for k in range(self.nvars): + self.sim.u0[k].upload(self.sim.stream, self.in_s[k,:,:], extent=self.write_s) + + #Wait for sending to complete + for comm in comm_send: + comm.wait() + + def ew_download_before_exchange(self): + #### + # Then transfer east-west including ghost cells that have been filled in by north-south transfer above + #### + + #Download from the GPU + if self.east is not None: + for k in range(self.nvars): + self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_e[k,:,:], asynch=True, extent=self.read_e) + if self.west is not None: + for k in range(self.nvars): + self.sim.u0[k].download(self.sim.stream, cpu_data=self.out_w[k,:,:], asynch=True, extent=self.read_w) + self.sim.stream.synchronize() + + def ew_do_exchange(self): + #Send/receive to east/west neighbours + comm_send = [] + comm_recv = [] + if self.east is not None: + comm_send += [self.grid.comm.Isend(self.out_e, dest=self.east, tag=4*self.nt + 2)] + comm_recv += [self.grid.comm.Irecv(self.in_e, source=self.east, tag=4*self.nt + 3)] + if self.west is not None: + comm_send += [self.grid.comm.Isend(self.out_w, dest=self.west, tag=4*self.nt + 3)] + comm_recv += [self.grid.comm.Irecv(self.in_w, source=self.west, tag=4*self.nt + 2)] + + + #Wait for incoming transfers to complete + for comm in comm_recv: + comm.wait() + + def ew_upload_after_exchange(self): + #Upload to the GPU + if self.east is not None: + for k in range(self.nvars): + self.sim.u0[k].upload(self.sim.stream, self.in_e[k,:,:], extent=self.write_e) + if self.west is not None: + for k in range(self.nvars): + self.sim.u0[k].upload(self.sim.stream, self.in_w[k,:,:], extent=self.write_w) + + #Wait for sending to complete + for comm in comm_send: + comm.wait() \ No newline at end of file diff --git a/shmemTesting.py b/shmemTesting.py index 8a6ec9d..ce20f76 100644 --- a/shmemTesting.py +++ b/shmemTesting.py @@ -60,8 +60,10 @@ logger.info("File logger using level %s to %s", logging.getLevelName(log_level_f #### # Initialize SHMEM grid etc #### +nsubdomains = 2 + logger.info("Creating SHMEM grid") -grid = SHMEMSimulator.SHMEMGrid() +grid = SHMEMSimulator.SHMEMGrid(ngpus=nsubdomains) @@ -73,39 +75,39 @@ nx = 128 ny = 128 gamma = 1.4 save_times = np.linspace(0, 5.0, 10) -#outfile = "shmem_out.nc" # XXX: implement grid-support save_var_names = ['rho', 'rho_u', 'rho_v', 'E'] -sims = [] -for i in range(grid.ngpus): - outfile = "shmem_out_" + str(i) + ".nc" +outfile = outfile[i] = "shmem_out.nc" - arguments = IC.genKelvinHelmholtz(nx, ny, gamma) # XXX: implement grid-support - arguments['context'] = grid.cuda_contexts[i] - arguments['theta'] = 1.2 - #arguments['grid'] = grid # XXX: implement grid-support - - #### - # Run simulation - #### - logger.info("Running simulation") - #Helper function to create SHMEM simulator - #def genSim(grid, **kwargs): - def genSim(**kwargs): - sim = EE2D_KP07_dimsplit.EE2D_KP07_dimsplit(**kwargs) - #sim = SHMEMSimulator.SHMEMSimulator(local_sim, grid) # implement SHMEMSimulator-support - sims.append(sim) - return sim - outfile = Common.runSimulation(genSim, arguments, outfile, save_times, save_var_names) +#outfile[i] = "shmem_out_" + str(i) + ".nc" 
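+# NOTE: a single set of arguments is built below and shared by every subdomain;
+# SHMEMSimulatorGroup constructs one local simulator per GPU in the grid from
+# these arguments (see SHMEMSimulatorGroup.__init__).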
+
+#arguments = []
+local_sim = []
+#sim = []
+
+arguments = IC.genKelvinHelmholtz(nx, ny, gamma, grid=grid)
+arguments['context'] = grid.cuda_contexts[0] # XXX: per-subdomain contexts should be assigned inside SHMEMSimulatorGroup
+arguments['theta'] = 1.2
+arguments['grid'] = grid
+
+####
+# Run simulation
+####
+logger.info("Running simulation")
+#Helper function to create SHMEM simulator group
+def genSim(grid, **kwargs):
+    sim = SHMEMSimulatorGroup.SHMEMSimulatorGroup(grid, **kwargs)
+    return sim
+
+outfile = Common.runSimulation(genSim, arguments, outfile, save_times, save_var_names)
 
 ####
 # Clean shutdown
 ####
-#sim = None # implement SHMEMSimulator-support
-sims = None
-local_sims = None
+sim = None
+local_sim = None
+cuda_context = None
 arguments = None
 logging.shutdown()
 gc.collect()
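
For reference, the subdomain bookkeeping added above reduces to row-major index arithmetic on the factorized grid shape. Below is a minimal standalone sketch (plain Python, no CUDA or pycuda required) that mirrors SHMEMGrid.getCoordinate, getIndex and the east/west lookups for the [2, 1] layout that getGrid(2, 2) produces; the names outside the patch (grid_shape, get_*) are illustrative only.

    # Standalone sketch of SHMEMGrid's index arithmetic (no CUDA required).
    # grid_shape mirrors SHMEMGrid.getGrid(ngpus=2, ndims=2), which yields [2, 1].
    grid_shape = [2, 1]  # [columns, rows]

    def get_coordinate(index):
        # Row-major layout: index -> (i, j)
        return index % grid_shape[0], index // grid_shape[0]

    def get_index(i, j):
        return j * grid_shape[0] + i

    def get_east(index):
        i, j = get_coordinate(index)
        return get_index((i + 1) % grid_shape[0], j)

    def get_west(index):
        i, j = get_coordinate(index)
        return get_index((i + grid_shape[0] - 1) % grid_shape[0], j)

    if __name__ == "__main__":
        # With two subdomains side by side, east and west of subdomain 0 both
        # wrap to subdomain 1; non-periodic global boundaries are masked out
        # later by SHMEMSimulator.__init__ setting the neighbour to None.
        print(get_coordinate(0), get_coordinate(1))  # (0, 0) (1, 0)
        print(get_east(0), get_west(0))              # 1 1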