MPI prototype

This commit is contained in:
André R. Brodtkorb
2018-11-21 07:49:39 +01:00
parent 074e38de84
commit c51afef9fc
7 changed files with 855 additions and 31 deletions

View File

@@ -115,15 +115,17 @@ class IPEngine(object):
# attach to a running cluster
import ipyparallel
cluster = ipyparallel.Client()#profile='mpi')
while(len(cluster.ids) != n_engines):
self.cluster = ipyparallel.Client()#profile='mpi')
while(len(self.cluster.ids) != n_engines):
time.sleep(0.5)
self.logger.info("Waiting for cluster...")
cluster = ipyparallel.Client()#profile='mpi')
self.cluster = ipyparallel.Client()#profile='mpi')
self.logger.info("Done")
def __del__(self):
self.cluster.shutdown(hub=True)
self.e.terminate()
try:
self.e.communicate(timeout=3)
@@ -137,6 +139,11 @@ class IPEngine(object):
self.logger.info("IPEngine cerr: {:s}".format(cerr))
while(len(self.cluster.ids) != 0):
time.sleep(0.5)
self.logger.info("Waiting for cluster to shutdown...")
self.c.terminate()
try:
self.c.communicate(timeout=3)
@@ -337,11 +344,19 @@ class CudaArray2D:
"""
Enables downloading data from GPU to Python
"""
def download(self, stream, async=False):
def download(self, stream, async=False, extent=None):
if (extent == None):
x = self.x_halo
y = self.y_halo
nx = self.nx
ny = self.ny
else:
x, y, nx, ny = extent
#self.logger.debug("Downloading [%dx%d] buffer", self.nx, self.ny)
#Allocate host memory
#cpu_data = cuda.pagelocked_empty((self.ny, self.nx), np.float32)
cpu_data = np.empty((self.ny, self.nx), dtype=np.float32)
cpu_data = np.empty((ny, nx), dtype=np.float32)
#Create copy object from device to host
copy = cuda.Memcpy2D()
@@ -349,20 +364,52 @@ class CudaArray2D:
copy.set_dst_host(cpu_data)
#Set offsets and pitch of source
copy.src_x_in_bytes = self.x_halo*self.data.strides[1]
copy.src_y = self.y_halo
copy.src_x_in_bytes = x*self.data.strides[1]
copy.src_y = y
copy.src_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = self.nx*cpu_data.itemsize
copy.height = self.ny
copy.width_in_bytes = nx*cpu_data.itemsize
copy.height = ny
copy(stream)
if async==False:
stream.synchronize()
return cpu_data
def upload(self, cpu_data, stream, extent=None):
    """
    Enables uploading data from Python to the GPU

    Copies the host array cpu_data into self.data using a 2D memcpy
    issued on the given stream.

    cpu_data: 2D host array; its shape must be (ny, nx) of the target region.
    stream:   stream object the copy is issued on. No stream.synchronize()
              is called here, so the caller decides when to wait.
    extent:   optional (x, y, nx, ny) sequence giving the offset and size
              (in elements) of the target region within self.data. When
              omitted, the region starts at (self.x_halo, self.y_halo) and
              has size self.nx x self.ny.

    Raises AssertionError if cpu_data does not match the region size or
    if the region does not fit within the buffer including its halo.
    """
    # Identity test on purpose: "extent == None" is evaluated element-wise
    # when extent is a NumPy array, which makes the if-test raise.
    if extent is None:
        x = self.x_halo
        y = self.y_halo
        nx = self.nx
        ny = self.ny
    else:
        x, y, nx, ny = extent

    # The host data must have exactly the shape of the target region
    assert nx == cpu_data.shape[1], \
        "Width mismatch: extent nx={} vs data width {}".format(nx, cpu_data.shape[1])
    assert ny == cpu_data.shape[0], \
        "Height mismatch: extent ny={} vs data height {}".format(ny, cpu_data.shape[0])

    # The target region must lie inside the buffer (interior plus halo on each side)
    assert x >= 0 and y >= 0, \
        "Negative offset: x={}, y={}".format(x, y)
    assert x + nx <= self.nx + 2*self.x_halo, \
        "Region exceeds buffer width: x+nx={}".format(x + nx)
    assert y + ny <= self.ny + 2*self.y_halo, \
        "Region exceeds buffer height: y+ny={}".format(y + ny)

    #Create copy object from host to device
    copy = cuda.Memcpy2D()
    copy.set_dst_device(self.data.gpudata)
    copy.set_src_host(cpu_data)

    #Set offsets and pitch of destination
    copy.dst_x_in_bytes = x*self.data.strides[1]
    copy.dst_y = y
    copy.dst_pitch = self.data.strides[0]

    #Set width in bytes to copy for each row and
    #number of rows to copy
    copy.width_in_bytes = nx*cpu_data.itemsize
    copy.height = ny

    #Issue the copy on the given stream
    copy(stream)

View File

@@ -129,9 +129,16 @@ class MagicLogger(Magics):
logger.addHandler(ch)
logger.log(args.level, "Console logger using level %s", logging.getLevelName(args.level))
#Get the outfilename (try to evaluate if Python expression...)
try:
outfile = eval(args.out, self.shell.user_global_ns, self.shell.user_ns)
except:
outfile = args.out
#Add log to file
logger.log(args.level, "File logger using level %s to %s", logging.getLevelName(args.file_level), args.out)
fh = logging.FileHandler(args.out)
logger.log(args.level, "File logger using level %s to %s", logging.getLevelName(args.file_level), outfile)
fh = logging.FileHandler(outfile)
formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s: %(message)s')
fh.setFormatter(formatter)
fh.setLevel(args.file_level)
@@ -161,6 +168,12 @@ class MagicMPI(Magics):
self.cluster = None
self.cluster = Common.IPEngine(args.num_engines)
# Release the MPI cluster when exiting python
import atexit
def exitfunc():
self.logger.info("Exitfunc: Resetting MPI cluster")
self.cluster = None
atexit.register(exitfunc)

View File

@@ -143,4 +143,4 @@ class KP07 (Simulator.BaseSimulator):
def computeDt(self):
max_dt = gpuarray.min(self.cfl_data, stream=self.stream).get();
return max_dt*0.5**self.order
return max_dt*0.5**(self.order-1)

View File

@@ -138,4 +138,4 @@ class KP07_dimsplit(Simulator.BaseSimulator):
def computeDt(self):
max_dt = gpuarray.min(self.cfl_data, stream=self.stream).get();
return max_dt*0.5
return max_dt

View File

@@ -53,11 +53,11 @@ class BoundaryCondition(object):
def __init__(self, types={ \
'north': Type.Reflective, \
'south': Type.Reflective, \
'east': Type.Reflective, \
'west': Type.Reflective \
def __init__(self, types={
'north': Type.Reflective,
'south': Type.Reflective,
'east': Type.Reflective,
'west': Type.Reflective
}):
"""
Constructor
@@ -170,25 +170,24 @@ class BaseSimulator(object):
t_start = self.simTime()
t_end = t_start + t
local_dt = dt
if (local_dt == None):
local_dt = self.computeDt()
update_dt = False
if (dt == None):
update_dt = True
while(self.simTime() < t_end):
if (dt == None and self.simSteps() % 100 == 0):
local_dt = self.computeDt()
if (update_dt and (self.simSteps() % 100 == 0)):
dt = self.computeDt()*self.cfl_scale
# Compute timestep for "this" iteration (i.e., shorten last timestep)
local_dt = np.float32(min(local_dt*self.cfl_scale, t_end-self.simTime()))
dt = np.float32(min(dt, t_end-self.simTime()))
# Stop if end reached (should not happen)
if (local_dt <= 0.0):
if (dt <= 0.0):
self.logger.warning("Timestep size {:d} is less than or equal to zero!".format(self.simSteps()))
break
# Step forward in time
self.step(local_dt)
self.step(dt)
#Print info
print_string = printer.getPrintString(self.simTime() - t_start)
@@ -197,7 +196,7 @@ class BaseSimulator(object):
try:
self.check()
except AssertionError as e:
e.args += ("Step={:d}, time={:f}".format(self.simSteps(), self.simTime()))
e.args += ("Step={:d}, time={:f}".format(self.simSteps(), self.simTime()),)
raise

View File

@@ -48,7 +48,11 @@ def downsample(highres_solution, x_factor, y_factor=None):
def bump(nx, ny, width, height, bump_size=None, h_ref=0.5, h_amp=0.1, u_ref=0.0, u_amp=0.1, v_ref=0.0, v_amp=0.1, ref_nx=None, ref_ny=None):
def bump(nx, ny, width, height,
bump_size=None,
ref_nx=None, ref_ny=None,
x_center=0.5, y_center=0.5,
h_ref=0.5, h_amp=0.1, u_ref=0.0, u_amp=0.1, v_ref=0.0, v_amp=0.1):
if (ref_nx == None):
ref_nx = nx
@@ -64,8 +68,8 @@ def bump(nx, ny, width, height, bump_size=None, h_ref=0.5, h_amp=0.1, u_ref=0.0,
ref_dx = width / float(ref_nx)
ref_dy = height / float(ref_ny)
x_center = ref_dx*ref_nx/2.0
y_center = ref_dy*ref_ny/2.0
x_center = ref_dx*ref_nx*x_center
y_center = ref_dy*ref_ny*y_center
x = ref_dx*(np.arange(0, ref_nx, dtype=np.float32)+0.5) - x_center
y = ref_dy*(np.arange(0, ref_ny, dtype=np.float32)+0.5) - y_center