Refactoring

André R. Brodtkorb 2018-12-03 12:09:15 +01:00
parent b266567d09
commit ae6404f05e
11 changed files with 239 additions and 99 deletions

View File

@@ -33,12 +33,154 @@ import hashlib
import logging
import gc
import netCDF4
import json
import pycuda.compiler as cuda_compiler
import pycuda.gpuarray
import pycuda.driver as cuda
def safeCall(cmd):
    logger = logging.getLogger(__name__)
    try:
        #git rev-parse HEAD
        current_dir = os.path.dirname(os.path.realpath(__file__))
        stdout = subprocess.check_output(cmd,
                stderr=subprocess.STDOUT,
                cwd=current_dir,
                universal_newlines=True, #text=True in more recent python
                shell=False,
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    except subprocess.CalledProcessError as e:
        output = e.output
        logger.error("Git failed, \nReturn code: " + str(e.returncode) + "\nOutput: " + output)
        raise e

    return stdout

def getGitHash():
    return safeCall(["git", "rev-parse", "HEAD"])

def getGitStatus():
    return safeCall(["git", "status", "--porcelain", "-uno"])
def toJson(in_dict, compressed=True):
    """
    Creates JSON string from a dictionary
    """
    logger = logging.getLogger(__name__)
    out_dict = in_dict.copy()
    for key in out_dict:
        if isinstance(out_dict[key], np.ndarray):
            out_dict[key] = out_dict[key].tolist()
        else:
            try:
                json.dumps(out_dict[key])
            except:
                value = str(out_dict[key])
                logger.warning("JSON: Converting {:s} to string ({:s})".format(key, value))
                out_dict[key] = value
    return json.dumps(out_dict)
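A short sketch of what toJson does with a mixed dictionary (hypothetical values, not from this commit): numpy arrays become nested lists, JSON-serializable values pass through unchanged, and anything else is stringified with a warning.

sim_args = {
    'nx': 128,                                  # plain int: kept as-is
    'g': 9.81,                                  # plain float: kept as-is
    'u0': np.zeros((2, 2), dtype=np.float32),   # ndarray: converted via tolist()
    'dtype': np.float32,                        # not JSON-serializable: becomes its str(...) form
}
json_string = toJson(sim_args)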
def runSimulation(simulator, simulator_args, outfile, save_times, save_var_names=[]):
    """
    Runs a simulation, and stores output in a netCDF file. Stores the times given in
    save_times, and saves all of the variables in the list save_var_names. Elements in
    save_var_names can be set to None if you do not want to save them.
    """
    assert len(save_times) > 0, "Need to specify which times to save"

    with Timer("construct") as t:
        sim = simulator(**simulator_args)
    print("Constructed in " + str(t.secs) + " seconds")

    #Create netcdf file and simulate
    with DataDumper(outfile, mode='w', clobber=False) as outdata:
        #Create attributes (metadata)
        outdata.ncfile.created = time.ctime(time.time())
        outdata.ncfile.git_hash = getGitHash()
        outdata.ncfile.git_status = getGitStatus()
        outdata.ncfile.simulator = str(simulator)
        outdata.ncfile.sim_args = toJson(simulator_args)

        #Create dimensions
        outdata.ncfile.createDimension('time', len(save_times))
        outdata.ncfile.createDimension('x', simulator_args['nx'])
        outdata.ncfile.createDimension('y', simulator_args['ny'])

        #Create variables for dimensions
        ncvars = {}
        ncvars['time'] = outdata.ncfile.createVariable('time', np.dtype('float32').char, 'time')
        ncvars['x'] = outdata.ncfile.createVariable('x', np.dtype('float32').char, 'x')
        ncvars['y'] = outdata.ncfile.createVariable('y', np.dtype('float32').char, 'y')

        #Fill variables with proper values
        ncvars['time'][:] = save_times
        extent = sim.getExtent()
        ncvars['x'][:] = np.linspace(extent[0], extent[1], simulator_args['nx'])
        ncvars['y'][:] = np.linspace(extent[2], extent[3], simulator_args['ny'])

        #Choose which variables to download (prune None from list, but keep the index)
        download_vars = []
        for i, var_name in enumerate(save_var_names):
            if var_name is not None:
                download_vars += [i]
        save_var_names = list(save_var_names[i] for i in download_vars)

        #Create variables
        for var_name in save_var_names:
            ncvars[var_name] = outdata.ncfile.createVariable(var_name, np.dtype('float32').char, ('time', 'y', 'x'), zlib=True, least_significant_digit=3)

        #Create step sizes between each save
        t_steps = np.empty_like(save_times)
        t_steps[0] = save_times[0]
        t_steps[1:] = save_times[1:] - save_times[0:-1]

        #Start simulation loop
        progress_printer = ProgressPrinter(save_times[-1], print_every=10)
        for k in range(len(save_times)):
            #Get target time and step size there
            t_step = t_steps[k]
            t_end = save_times[k]

            #Sanity check simulator
            try:
                sim.check()
            except AssertionError as e:
                print("Error after {:d} steps (t={:f}): {:s}".format(sim.simSteps(), sim.simTime(), str(e)))
                return outdata.filename

            #Simulate
            if (t_step > 0.0):
                sim.simulate(t_step)

            #Download
            save_vars = sim.download(download_vars)

            #Save to file
            for i, var_name in enumerate(save_var_names):
                ncvars[var_name][k, :] = save_vars[i]

            #Write progress to screen
            print_string = progress_printer.getPrintString(t_end)
            if (print_string):
                print(print_string)

        print("Simulated to t={:f} in {:d} timesteps (average dt={:f})".format(t_end, sim.simSteps(), sim.simTime() / sim.simSteps()))

    return outdata.filename
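A hedged example of how runSimulation might be called. MySimulator and its argument list are placeholders (any BaseSimulator subclass taking nx/ny would do); a None entry in save_var_names skips that field while keeping the index mapping passed to sim.download().

sim_args = {
    'nx': 256, 'ny': 256,      # grid size (required by runSimulation)
    'dx': 100.0, 'dy': 100.0,  # hypothetical cell sizes
    'g': 9.81,                 # hypothetical physical constant
}
save_times = np.linspace(0.0, 3600.0, 13)
outfile = runSimulation(MySimulator, sim_args, "mysim_output.nc", save_times,
                        save_var_names=['h', 'hu', None])   # skip the third variable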
class Timer(object):
"""
Class which keeps track of time spent for a section of code
@@ -196,8 +338,9 @@ class DataDumper(object):
        self.logger = logging.getLogger(__name__)

        #Create directory if needed
        filename = os.path.abspath(filename)
        dirname = os.path.dirname(filename)
-        if not os.path.isdir(dirname):
+        if dirname and not os.path.isdir(dirname):
            self.logger.info("Creating directory " + dirname)
            os.makedirs(dirname)
@@ -223,7 +366,7 @@ class DataDumper(object):
        self.kwargs = kwargs

        #Log output
-        self.logger.info("Writing output to " + self.filename)
+        self.logger.info("Initialized " + self.filename)

    def __enter__(self):
@@ -240,6 +383,22 @@ class DataDumper(object):
        self.ncfile.close()

    def toJson(in_dict):
        out_dict = in_dict.copy()
        for key in out_dict:
            if isinstance(out_dict[key], np.ndarray):
                out_dict[key] = out_dict[key].tolist()
            else:
                try:
                    json.dumps(out_dict[key])
                except:
                    out_dict[key] = str(out_dict[key])
        return json.dumps(out_dict)
class ProgressPrinter(object):
@@ -448,7 +607,7 @@ class CudaArray3D:
        #self.logger.debug("Allocating [%dx%dx%d] buffer", self.nx, self.ny, self.nz)
        #Should perhaps use pycuda.driver.mem_alloc_data.pitch() here
-        self.data = pycuda.gpuarray.empty((nz_halo, ny_halo, nx_halo), dtype)
+        self.data = pycuda.gpuarray.zeros((nz_halo, ny_halo, nx_halo), dtype)

        #If we don't have any data, just allocate and return
        if cpu_data is None:
@@ -539,10 +698,10 @@ class CudaArray3D:
A class representing an Arakawa A type (unstaggered, logically Cartesian) grid
"""
class ArakawaA2D:
-    """
-    Uploads initial data to the CL device
-    """
    def __init__(self, stream, nx, ny, halo_x, halo_y, cpu_variables):
+        """
+        Uploads initial data to the GPU device
+        """
        self.logger = logging.getLogger(__name__)
        self.gpu_variables = []
        for cpu_variable in cpu_variables:
@@ -554,22 +713,27 @@
            raise IndexError("Out of bounds")
        return self.gpu_variables[key]

-    """
-    Enables downloading data from CL device to Python
-    """
-    def download(self, stream):
+    def download(self, stream, variables=None):
+        """
+        Enables downloading data from the GPU device to Python
+        """
+        if variables is None:
+            variables=range(len(self.gpu_variables))
        cpu_variables = []
-        for gpu_variable in self.gpu_variables:
-            cpu_variables += [gpu_variable.download(stream, async=True)]
+        for i in variables:
+            assert i < len(self.gpu_variables), "Variable {:d} is out of range".format(i)
+            cpu_variables += [self.gpu_variables[i].download(stream, async=True)]
        stream.synchronize()
        return cpu_variables

-    """
-    Checks that data is still sane
-    """
    def check(self):
+        """
+        Checks that data is still sane
+        """
        for i, gpu_variable in enumerate(self.gpu_variables):
            var_sum = pycuda.gpuarray.sum(gpu_variable.data).get()
-            self.logger.debug("Data %d with size [%d x %d] has sum %f", i, gpu_variable.nx, gpu_variable.ny, var_sum)
+            self.logger.debug("Data %d with size [%d x %d] has average %f", i, gpu_variable.nx, gpu_variable.ny, var_sum / (gpu_variable.nx * gpu_variable.ny))
            assert np.isnan(var_sum) == False, "Data contains NaN values!"
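A sketch of how the new variables argument changes downloads from an ArakawaA2D instance (stream, sizes and the initial CPU arrays below are placeholders): passing a list of indices downloads only those GPU arrays, while omitting it keeps the old download-everything behaviour.

# h0, hu0, hv0: numpy arrays of shape (ny, nx); stream: a pycuda stream (placeholders)
arrays = ArakawaA2D(stream, nx, ny, 2, 2, [h0, hu0, hv0])
h, hu, hv = arrays.download(stream)                 # all three variables
h_only, = arrays.download(stream, variables=[0])    # only variable index 0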

View File

@@ -132,8 +132,8 @@ class EE2D_KP07_dimsplit (BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -117,8 +117,8 @@ class FORCE (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -112,8 +112,8 @@ class HLL (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -122,8 +122,8 @@ class HLL2 (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -126,9 +126,8 @@ class KP07 (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -125,8 +125,8 @@ class KP07_dimsplit(Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -113,8 +113,8 @@ class LxF (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()

View File

@@ -137,56 +137,12 @@ class MPIGrid(object):
        return grid

-    def getExtent(self, width, height, rank):
-        """
-        Function which returns the extent of node with rank
-        rank in the grid
-        """
-        i, j = self.getCoordinate(rank)
-        x0 = i * width
-        y0 = j * height
-        x1 = x0+width
-        y1 = y0+height
-        return [x0, x1, y0, y1]
-
-    def gatherData(self, data, rank=0):
-        """
-        Function which gathers the data onto node with rank
-        rank
-        """
-        #Get shape of data
-        ny, nx = data.shape
-
-        #Create list of buffers to return
-        retval = []
-
-        #If we are the target node, recieve from others
-        #otherwise send to target
-        if (self.comm.rank == rank):
-            mpi_requests = []
-            retval = []
-
-            #Loop over all nodes
-            for k in range(0, self.comm.size):
-                #If k equal target node, add our own data
-                #Otherwise receive it from node k
-                if (k == rank):
-                    retval += [data]
-                else:
-                    buffer = np.empty((ny, nx), dtype=np.float32)
-                    retval += [buffer]
-                    mpi_requests += [self.comm.Irecv(buffer, source=k, tag=k)]
-
-            #Wait for transfers to complete
-            for mpi_request in mpi_requests:
-                mpi_request.wait()
-        else:
-            mpi_request = self.comm.Isend(data, dest=rank, tag=self.comm.rank)
-            mpi_request.wait()
-
-        return retval
+    def gather(self, data, root=0):
+        out_data = None
+        if (self.comm.rank == root):
+            out_data = np.empty([self.comm.size] + list(data.shape), dtype=data.dtype)
+        self.comm.Gather(data, out_data, root)
+        return out_data
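The hand-rolled Isend/Irecv gathering above is replaced by a single MPI collective. A usage sketch, assuming grid is an already-constructed MPIGrid and ny, nx and the field contents are placeholders:

local_data = np.full((ny, nx), grid.comm.rank, dtype=np.float32)  # each rank's local field
gathered = grid.gather(local_data, root=0)   # ndarray on root, None on the other ranks
if grid.comm.rank == 0:
    print(gathered.shape)   # (grid.comm.size, ny, nx)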
class MPISimulator(Simulator.BaseSimulator):
@@ -259,8 +215,8 @@ class MPISimulator(Simulator.BaseSimulator):
        self.exchange()
        self.sim.substep(dt, step_number)

-    def download(self):
-        return self.sim.download()
+    def getOutput(self):
+        return self.sim.getOutput()

    def synchronize(self):
        self.sim.synchronize()
@@ -275,6 +231,21 @@ class MPISimulator(Simulator.BaseSimulator):
        self.logger.debug("Local dt: {:f}, global dt: {:f}".format(local_dt[0], global_dt[0]))
        return global_dt[0]

    def getExtent(self):
        """
        Function which returns the extent of node with rank
        rank in the grid
        """
        width = self.sim.nx*self.sim.dx
        height = self.sim.ny*self.sim.dy
        i, j = self.grid.getCoordinate()
        x0 = i * width
        y0 = j * height
        x1 = x0 + width
        y1 = y0 + height
        return [x0, x1, y0, y1]
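A small worked example of the extent computation in getExtent(), using hypothetical numbers:

# Rank at grid coordinate (i, j) = (1, 0) in a 2x2 grid, local domain of 100x100 cells of size 1.0:
nx, ny, dx, dy = 100, 100, 1.0, 1.0
i, j = 1, 0
width, height = nx * dx, ny * dy
extent = [i * width, i * width + width, j * height, j * height + height]
print(extent)   # [100.0, 200.0, 0.0, 100.0]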
    def exchange(self):
        ####
        # FIXME: This function can be optimized using persitent communications.

View File

@@ -218,28 +218,34 @@ class BaseSimulator(object):
        self.t += dt
        self.nt += 1

-    def substep(self, dt, step_number):
-        """
-        Function which performs one single substep with stepsize dt
-        """
-        raise(NotImplementedError("Needs to be implemented in subclass"))
-
-    def download(self):
-        raise(NotImplementedError("Needs to be implemented in subclass"))
+    def download(self, variables=None):
+        return self.getOutput().download(self.stream, variables)

    def synchronize(self):
        self.stream.synchronize()

-    def check(self):
-        self.logger.warning("check() is not implemented - please implement")
-        #raise(NotImplementedError("Needs to be implemented in subclass"))

    def simTime(self):
        return self.t

    def simSteps(self):
        return self.nt

+    def getExtent(self):
+        return [0, 0, self.nx*self.dx, self.ny*self.dy]
+
+    def substep(self, dt, step_number):
+        """
+        Function which performs one single substep with stepsize dt
+        """
+        raise(NotImplementedError("Needs to be implemented in subclass"))
+
+    def getOutput(self):
+        raise(NotImplementedError("Needs to be implemented in subclass"))
+
+    def check(self):
+        self.logger.warning("check() is not implemented - please implement")
+        #raise(NotImplementedError("Needs to be implemented in subclass"))

    def computeDt(self):
        raise(NotImplementedError("Needs to be implemented in subclass"))
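The net effect of this refactoring: concrete simulators no longer implement download() themselves. They expose their state through getOutput(), and the base class forwards the optional list of variable indices to ArakawaA2D.download(). A hedged sketch of the pattern (the class name, constructor arguments and variable layout below are illustrative, not taken from a specific file):

class MyScheme(BaseSimulator):
    def substep(self, dt, step_number):
        # ... launch kernels writing self.u1 from self.u0, then swap buffers ...
        self.u0, self.u1 = self.u1, self.u0

    def getOutput(self):
        return self.u0            # an ArakawaA2D holding e.g. [h, hu, hv]

sim = MyScheme(**sim_args)        # sim_args is a placeholder
h, hu, hv = sim.download()        # all variables
h_only, = sim.download([0])       # only variable index 0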

View File

@@ -115,8 +115,8 @@ class WAF (Simulator.BaseSimulator):
                self.cfl_data.gpudata)
        self.u0, self.u1 = self.u1, self.u0

-    def download(self):
-        return self.u0.download(self.stream)
+    def getOutput(self):
+        return self.u0

    def check(self):
        self.u0.check()