diff --git a/GPUSimulators/Common.py b/GPUSimulators/Common.py index 05239c6..50ba925 100644 --- a/GPUSimulators/Common.py +++ b/GPUSimulators/Common.py @@ -33,12 +33,154 @@ import hashlib import logging import gc import netCDF4 +import json import pycuda.compiler as cuda_compiler import pycuda.gpuarray import pycuda.driver as cuda + + + + +def safeCall(cmd): + logger = logging.getLogger(__name__) + try: + #git rev-parse HEAD + current_dir = os.path.dirname(os.path.realpath(__file__)) + stdout = subprocess.check_output(cmd, + stderr=subprocess.STDOUT, + cwd=current_dir, + universal_newlines=True, #text=True in more recent python + shell=False, + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) + except subprocess.CalledProcessError as e: + output = e.output + logger.error("Git failed, \nReturn code: " + str(e.returncode) + "\nOutput: " + output) + raise e + + return stdout + +def getGitHash(): + return safeCall(["git", "rev-parse", "HEAD"]) + +def getGitStatus(): + return safeCall(["git", "status", "--porcelain", "-uno"]) + +def toJson(in_dict, compressed=True): + """ + Creates JSON string from a dictionary + """ + logger = logging.getLogger(__name__) + out_dict = in_dict.copy() + for key in out_dict: + if isinstance(out_dict[key], np.ndarray): + out_dict[key] = out_dict[key].tolist() + else: + try: + json.dumps(out_dict[key]) + except: + value = str(out_dict[key]) + logger.warning("JSON: Converting {:s} to string ({:s})".format(key, value)) + out_dict[key] = value + return json.dumps(out_dict) + +def runSimulation(simulator, simulator_args, outfile, save_times, save_var_names=[]): + """ + Runs a simulation, and stores output in netcdf file. Stores the times given in + save_times, and saves all of the variables in list save_var_names. Elements in + save_var_names can be set to None if you do not want to save them + """ + + assert len(save_times) > 0, "Need to specify which times to save" + + with Timer("construct") as t: + sim = simulator(**simulator_args) + print("Constructed in " + str(t.secs) + " seconds") + + #Create netcdf file and simulate + with DataDumper(outfile, mode='w', clobber=False) as outdata: + + #Create attributes (metadata) + outdata.ncfile.created = time.ctime(time.time()) + outdata.ncfile.git_hash = getGitHash() + outdata.ncfile.git_status = getGitStatus() + outdata.ncfile.simulator = str(simulator) + outdata.ncfile.sim_args = toJson(simulator_args) + + #Create dimensions + outdata.ncfile.createDimension('time', len(save_times)) + outdata.ncfile.createDimension('x', simulator_args['nx']) + outdata.ncfile.createDimension('y', simulator_args['ny']) + + #Create variables for dimensions + ncvars = {} + ncvars['time'] = outdata.ncfile.createVariable('time', np.dtype('float32').char, 'time') + ncvars['x'] = outdata.ncfile.createVariable( 'x', np.dtype('float32').char, 'x') + ncvars['y'] = outdata.ncfile.createVariable( 'y', np.dtype('float32').char, 'y') + + #Fill variables with proper values + ncvars['time'][:] = save_times + extent = sim.getExtent() + ncvars['x'][:] = np.linspace(extent[0], extent[1], simulator_args['nx']) + ncvars['y'][:] = np.linspace(extent[2], extent[3], simulator_args['ny']) + + #Choose which variables to download (prune None from list, but keep the index) + download_vars = [] + for i, var_name in enumerate(save_var_names): + if var_name is not None: + download_vars += [i] + save_var_names = list(save_var_names[i] for i in download_vars) + + #Create variables + for var_name in save_var_names: + ncvars[var_name] = outdata.ncfile.createVariable(var_name, np.dtype('float32').char, ('time', 'y', 'x'), zlib=True, least_significant_digit=3) + + #Create step sizes between each save + t_steps = np.empty_like(save_times) + t_steps[0] = save_times[0] + t_steps[1:] = save_times[1:] - save_times[0:-1] + + #Start simulation loop + progress_printer = ProgressPrinter(save_times[-1], print_every=10) + for k in range(len(save_times)): + #Get target time and step size there + t_step = t_steps[k] + t_end = save_times[k] + + #Sanity check simulator + try: + sim.check() + except AssertionError as e: + print("Error after {:d} steps (t={:f}: {:s}".format(sim.simSteps(), sim.simTime(), str(e))) + return outdata.filename + + #Simulate + if (t_step > 0.0): + sim.simulate(t_step) + + #Download + save_vars = sim.download(download_vars) + + #Save to file + for i, var_name in enumerate(save_var_names): + ncvars[var_name][k, :] = save_vars[i] + + #Write progress to screen + print_string = progress_printer.getPrintString(t_end) + if (print_string): + print(print_string) + + print("Simulated to t={:f} in {:d} timesteps (average dt={:f})".format(t_end, sim.simSteps(), sim.simTime() / sim.simSteps())) + + return outdata.filename + + + + + + class Timer(object): """ Class which keeps track of time spent for a section of code @@ -196,8 +338,9 @@ class DataDumper(object): self.logger = logging.getLogger(__name__) #Create directory if needed + filename = os.path.abspath(filename) dirname = os.path.dirname(filename) - if not os.path.isdir(dirname): + if dirname and not os.path.isdir(dirname): self.logger.info("Creating directory " + dirname) os.makedirs(dirname) @@ -223,7 +366,7 @@ class DataDumper(object): self.kwargs = kwargs #Log output - self.logger.info("Writing output to " + self.filename) + self.logger.info("Initialized " + self.filename) def __enter__(self): @@ -238,6 +381,22 @@ class DataDumper(object): def __exit__(self, *args): self.logger.info("Closing " + self.filename) self.ncfile.close() + + + def toJson(in_dict): + out_dict = in_dict.copy() + + for key in out_dict: + if isinstance(out_dict[key], np.ndarray): + out_dict[key] = out_dict[key].tolist() + else: + try: + json.dumps(out_dict[key]) + except: + out_dict[key] = str(out_dict[key]) + + return json.dumps(out_dict) + @@ -448,7 +607,7 @@ class CudaArray3D: #self.logger.debug("Allocating [%dx%dx%d] buffer", self.nx, self.ny, self.nz) #Should perhaps use pycuda.driver.mem_alloc_data.pitch() here - self.data = pycuda.gpuarray.empty((nz_halo, ny_halo, nx_halo), dtype) + self.data = pycuda.gpuarray.zeros((nz_halo, ny_halo, nx_halo), dtype) #If we don't have any data, just allocate and return if cpu_data is None: @@ -539,10 +698,10 @@ class CudaArray3D: A class representing an Arakawa A type (unstaggered, logically Cartesian) grid """ class ArakawaA2D: - """ - Uploads initial data to the CL device - """ def __init__(self, stream, nx, ny, halo_x, halo_y, cpu_variables): + """ + Uploads initial data to the GPU device + """ self.logger = logging.getLogger(__name__) self.gpu_variables = [] for cpu_variable in cpu_variables: @@ -554,22 +713,27 @@ class ArakawaA2D: raise IndexError("Out of bounds") return self.gpu_variables[key] - """ - Enables downloading data from CL device to Python - """ - def download(self, stream): + def download(self, stream, variables=None): + """ + Enables downloading data from the GPU device to Python + """ + if variables is None: + variables=range(len(self.gpu_variables)) + cpu_variables = [] - for gpu_variable in self.gpu_variables: - cpu_variables += [gpu_variable.download(stream, async=True)] + for i in variables: + assert i < len(self.gpu_variables), "Variable {:d} is out of range".format(i) + cpu_variables += [self.gpu_variables[i].download(stream, async=True)] + stream.synchronize() return cpu_variables - """ - Checks that data is still sane - """ def check(self): + """ + Checks that data is still sane + """ for i, gpu_variable in enumerate(self.gpu_variables): var_sum = pycuda.gpuarray.sum(gpu_variable.data).get() - self.logger.debug("Data %d with size [%d x %d] has sum %f", i, gpu_variable.nx, gpu_variable.ny, var_sum) + self.logger.debug("Data %d with size [%d x %d] has average %f", i, gpu_variable.nx, gpu_variable.ny, var_sum / (gpu_variable.nx * gpu_variable.ny)) assert np.isnan(var_sum) == False, "Data contains NaN values!" \ No newline at end of file diff --git a/GPUSimulators/EE2D_KP07_dimsplit.py b/GPUSimulators/EE2D_KP07_dimsplit.py index ad2803d..47d3235 100644 --- a/GPUSimulators/EE2D_KP07_dimsplit.py +++ b/GPUSimulators/EE2D_KP07_dimsplit.py @@ -132,8 +132,8 @@ class EE2D_KP07_dimsplit (BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/FORCE.py b/GPUSimulators/FORCE.py index 0885fcf..35d593e 100644 --- a/GPUSimulators/FORCE.py +++ b/GPUSimulators/FORCE.py @@ -117,8 +117,8 @@ class FORCE (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/HLL.py b/GPUSimulators/HLL.py index 01a7568..41968c5 100644 --- a/GPUSimulators/HLL.py +++ b/GPUSimulators/HLL.py @@ -112,8 +112,8 @@ class HLL (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/HLL2.py b/GPUSimulators/HLL2.py index ff04c78..ddecba6 100644 --- a/GPUSimulators/HLL2.py +++ b/GPUSimulators/HLL2.py @@ -122,8 +122,8 @@ class HLL2 (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/KP07.py b/GPUSimulators/KP07.py index 122558b..e91ba6d 100644 --- a/GPUSimulators/KP07.py +++ b/GPUSimulators/KP07.py @@ -126,9 +126,8 @@ class KP07 (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/KP07_dimsplit.py b/GPUSimulators/KP07_dimsplit.py index f0c8a1e..6052c9a 100644 --- a/GPUSimulators/KP07_dimsplit.py +++ b/GPUSimulators/KP07_dimsplit.py @@ -125,8 +125,8 @@ class KP07_dimsplit(Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/LxF.py b/GPUSimulators/LxF.py index 9c54096..91a715a 100644 --- a/GPUSimulators/LxF.py +++ b/GPUSimulators/LxF.py @@ -113,8 +113,8 @@ class LxF (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check() diff --git a/GPUSimulators/MPISimulator.py b/GPUSimulators/MPISimulator.py index 1b8543e..950fc24 100644 --- a/GPUSimulators/MPISimulator.py +++ b/GPUSimulators/MPISimulator.py @@ -135,58 +135,14 @@ class MPIGrid(object): grid = np.flip(np.sort(grid)) return grid - - - def getExtent(self, width, height, rank): - """ - Function which returns the extent of node with rank - rank in the grid - """ - i, j = self.getCoordinate(rank) - x0 = i * width - y0 = j * height - x1 = x0+width - y1 = y0+height - return [x0, x1, y0, y1] - def gatherData(self, data, rank=0): - """ - Function which gathers the data onto node with rank - rank - """ - #Get shape of data - ny, nx = data.shape - - #Create list of buffers to return - retval = [] - - #If we are the target node, recieve from others - #otherwise send to target - if (self.comm.rank == rank): - mpi_requests = [] - retval = [] - - #Loop over all nodes - for k in range(0, self.comm.size): - #If k equal target node, add our own data - #Otherwise receive it from node k - if (k == rank): - retval += [data] - else: - buffer = np.empty((ny, nx), dtype=np.float32) - retval += [buffer] - mpi_requests += [self.comm.Irecv(buffer, source=k, tag=k)] - - #Wait for transfers to complete - for mpi_request in mpi_requests: - mpi_request.wait() - else: - mpi_request = self.comm.Isend(data, dest=rank, tag=self.comm.rank) - mpi_request.wait() - - return retval - + def gather(self, data, root=0): + out_data = None + if (self.comm.rank == root): + out_data = np.empty([self.comm.size] + list(data.shape), dtype=data.dtype) + self.comm.Gather(data, out_data, root) + return out_data class MPISimulator(Simulator.BaseSimulator): @@ -259,8 +215,8 @@ class MPISimulator(Simulator.BaseSimulator): self.exchange() self.sim.substep(dt, step_number) - def download(self): - return self.sim.download() + def getOutput(self): + return self.sim.getOutput() def synchronize(self): self.sim.synchronize() @@ -273,7 +229,22 @@ class MPISimulator(Simulator.BaseSimulator): global_dt = np.empty(1, dtype=np.float32) self.grid.comm.Allreduce(local_dt, global_dt, op=MPI.MIN) self.logger.debug("Local dt: {:f}, global dt: {:f}".format(local_dt[0], global_dt[0])) - return global_dt[0] + return global_dt[0] + + + def getExtent(self): + """ + Function which returns the extent of node with rank + rank in the grid + """ + width = self.sim.nx*self.sim.dx + height = self.sim.ny*self.sim.dy + i, j = self.grid.getCoordinate() + x0 = i * width + y0 = j * height + x1 = x0 + width + y1 = y0 + height + return [x0, x1, y0, y1] def exchange(self): #### diff --git a/GPUSimulators/Simulator.py b/GPUSimulators/Simulator.py index af62cae..a9f8c5a 100644 --- a/GPUSimulators/Simulator.py +++ b/GPUSimulators/Simulator.py @@ -217,32 +217,38 @@ class BaseSimulator(object): self.t += dt self.nt += 1 - - def substep(self, dt, step_number): - """ - Function which performs one single substep with stepsize dt - """ - raise(NotImplementedError("Needs to be implemented in subclass")) - def download(self): - raise(NotImplementedError("Needs to be implemented in subclass")) + def download(self, variables=None): + return self.getOutput().download(self.stream, variables) def synchronize(self): self.stream.synchronize() - - def check(self): - self.logger.warning("check() is not implemented - please implement") - #raise(NotImplementedError("Needs to be implemented in subclass")) def simTime(self): return self.t def simSteps(self): return self.nt + + def getExtent(self): + return [0, 0, self.nx*self.dx, self.ny*self.dy] + + def substep(self, dt, step_number): + """ + Function which performs one single substep with stepsize dt + """ + raise(NotImplementedError("Needs to be implemented in subclass")) + + def getOutput(self): + raise(NotImplementedError("Needs to be implemented in subclass")) + + def check(self): + self.logger.warning("check() is not implemented - please implement") + #raise(NotImplementedError("Needs to be implemented in subclass")) def computeDt(self): raise(NotImplementedError("Needs to be implemented in subclass")) - + diff --git a/GPUSimulators/WAF.py b/GPUSimulators/WAF.py index 83acff5..89886ec 100644 --- a/GPUSimulators/WAF.py +++ b/GPUSimulators/WAF.py @@ -115,8 +115,8 @@ class WAF (Simulator.BaseSimulator): self.cfl_data.gpudata) self.u0, self.u1 = self.u1, self.u0 - def download(self): - return self.u0.download(self.stream) + def getOutput(self): + return self.u0 def check(self): self.u0.check()