Fixed multigpu for euler

André R. Brodtkorb 2018-11-29 14:33:13 +01:00
parent b03afc3d81
commit f9f0f20df8
7 changed files with 463 additions and 637 deletions

View File

@@ -24,6 +24,7 @@ import os
import numpy as np
import time
import signal
import subprocess
import tempfile
import re
@@ -102,7 +103,11 @@ class IPEngine(object):
self.logger.info("Starting IPController")
self.c_buff = PopenFileBuffer()
c_cmd = ["ipcontroller", "--ip='*'"]
self.c = subprocess.Popen(c_cmd, stderr=self.c_buff.stderr, stdout=self.c_buff.stdout, shell=False)
self.c = subprocess.Popen(c_cmd,
stderr=self.c_buff.stderr,
stdout=self.c_buff.stdout,
shell=False,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
#Wait until controller is running
time.sleep(3)
@@ -111,11 +116,16 @@ class IPEngine(object):
self.logger.info("Starting IPEngines")
self.e_buff = PopenFileBuffer()
e_cmd = ["mpiexec", "-n", str(n_engines), "ipengine", "--mpi"]
self.e = subprocess.Popen(e_cmd, stderr=self.e_buff.stderr, stdout=self.e_buff.stdout, shell=False)
self.e = subprocess.Popen(e_cmd,
stderr=self.e_buff.stderr,
stdout=self.e_buff.stdout,
shell=False,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
# attach to a running cluster
import ipyparallel
self.cluster = ipyparallel.Client()#profile='mpi')
time.sleep(3)
while(len(self.cluster.ids) != n_engines):
time.sleep(0.5)
self.logger.info("Waiting for cluster...")
@@ -124,37 +134,51 @@ class IPEngine(object):
self.logger.info("Done")
def __del__(self):
self.cluster.shutdown(hub=True)
self.e.terminate()
try:
self.e.communicate(timeout=3)
except TimeoutExpired:
self.logger.warn("Killing IPEngine")
self.e.kill()
self.e.communicate()
self.shutdown()
def shutdown(self):
if (self.e is not None):
if (os.name == 'nt'):
self.logger.warn("Sending CTRL+C to IPEngine")
self.e.send_signal(signal.CTRL_C_EVENT)
try:
self.e.communicate(timeout=3)
self.e.kill()
except subprocess.TimeoutExpired:
self.logger.warn("Killing IPEngine")
self.e.kill()
self.e.communicate()
self.e = None
cout, cerr = self.e_buff.read()
self.logger.info("IPEngine cout: {:s}".format(cout))
self.logger.info("IPEngine cerr: {:s}".format(cerr))
self.e_buff = None
cout, cerr = self.e_buff.read()
self.logger.info("IPEngine cout: {:s}".format(cout))
self.logger.info("IPEngine cerr: {:s}".format(cerr))
while(len(self.cluster.ids) != 0):
time.sleep(0.5)
self.logger.info("Waiting for cluster to shutdown...")
gc.collect()
if (self.c is not None):
if (os.name == 'nt'):
self.logger.warn("Sending CTRL+C to IPController")
self.c.send_signal(signal.CTRL_C_EVENT)
try:
self.c.communicate(timeout=3)
self.c.kill()
except subprocess.TimeoutExpired:
self.logger.warn("Killing IPController")
self.c.kill()
self.c.communicate()
self.c = None
cout, cerr = self.c_buff.read()
self.logger.info("IPController cout: {:s}".format(cout))
self.logger.info("IPController cerr: {:s}".format(cerr))
self.c_buff = None
gc.collect()
self.c.terminate()
try:
self.c.communicate(timeout=3)
except TimeoutExpired:
self.logger.warn("Killing IPController")
self.c.kill()
self.c.communicate()
cout, cerr = self.c_buff.read()
self.logger.info("IPController cout: {:s}".format(cout))
self.logger.info("IPController cerr: {:s}".format(cerr))
@@ -348,14 +372,14 @@ class CudaArray2D:
copy.set_dst_host(cpu_data)
#Set offsets and pitch of source
copy.src_x_in_bytes = x*self.data.strides[1]
copy.src_y = y
copy.src_x_in_bytes = int(x)*self.data.strides[1]
copy.src_y = int(y)
copy.src_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = nx*cpu_data.itemsize
copy.height = ny
copy.width_in_bytes = int(nx)*cpu_data.itemsize
copy.height = int(ny)
copy(stream)
if async==False:
@@ -384,14 +408,14 @@ class CudaArray2D:
copy.set_src_host(cpu_data)
#Set offsets and pitch of source
copy.dst_x_in_bytes = x*self.data.strides[1]
copy.dst_y = y
copy.dst_x_in_bytes = int(x)*self.data.strides[1]
copy.dst_y = int(y)
copy.dst_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = nx*cpu_data.itemsize
copy.height = ny
copy.width_in_bytes = int(nx)*cpu_data.itemsize
copy.height = int(ny)
copy(stream)
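The int() casts matter because x, y, nx and ny may arrive as numpy scalars or as floats after a division, which the driver bindings will not accept for these byte offsets and extents. A condensed sketch of the same pitched-copy pattern, assuming a 2D float32 pycuda GPUArray named data and an existing stream:

import numpy as np
import pycuda.driver as cuda

def download_region(data, stream, x, y, nx, ny):
    # Host buffer for the requested sub-region
    cpu_data = np.empty((int(ny), int(nx)), dtype=np.float32)
    copy = cuda.Memcpy2D()
    copy.set_src_device(data.gpudata)
    copy.set_dst_host(cpu_data)
    # Offsets and pitches must be plain Python ints, not numpy scalars
    copy.src_x_in_bytes = int(x) * data.strides[1]
    copy.src_y = int(y)
    copy.src_pitch = data.strides[0]
    copy.dst_pitch = cpu_data.strides[0]
    copy.width_in_bytes = int(nx) * cpu_data.itemsize
    copy.height = int(ny)
    copy(stream)
    return cpu_data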

View File

@@ -71,6 +71,7 @@ class EE2D_KP07_dimsplit (BaseSimulator):
nx, ny,
dx, dy,
cfl_scale,
2,
block_width, block_height)
self.g = np.float32(g)
self.gamma = np.float32(gamma)
@@ -108,11 +109,8 @@ class EE2D_KP07_dimsplit (BaseSimulator):
self.cfl_data.fill(dt, stream=self.stream)
def step(self, dt):
self.substepDimsplit(0.5*dt, 0)
self.substepDimsplit(0.5*dt, 1)
self.t += dt
self.nt += 2
def substep(self, dt, step_number):
self.substepDimsplit(0.5*dt, step_number)
def substepDimsplit(self, dt, substep):
self.kernel.prepared_async_call(self.grid_size, self.block_size, self.stream,
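With num_substeps now passed as 2 to the base class, step control lives in BaseSimulator and substep is called twice per step; the dimsplit kernel presumably alternates the sweep order on odd substeps to retain second-order accuracy. A toy illustration of the resulting call pattern:

class ToyDimsplitSim:
    num_substeps = 2  # matches the value passed to BaseSimulator above

    def __init__(self):
        self.sweeps = []

    def substep(self, dt, step_number):
        # Even substeps sweep x then y, odd substeps y then x
        self.sweeps.append('xy' if step_number % 2 == 0 else 'yx')

    def step(self, dt):
        for i in range(self.num_substeps):
            self.substep(0.5 * dt, i)

sim = ToyDimsplitSim()
sim.step(0.1)
print(sim.sweeps)  # ['xy', 'yx']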

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import logging
import gc
from IPython.core import magic_arguments
from IPython.core.magic import line_magic, Magics, magics_class
@@ -111,7 +112,7 @@ class MagicLogger(Magics):
'--file_level', '-f', type=int, default=10, help='The level of logging to file [0, 50]')
def setup_logging(self, line):
if (self.logger_initialized):
logging.getLogger('').info("Global logger already initialized!")
logging.getLogger('GPUSimulators').info("Global logger already initialized!")
return;
else:
self.logger_initialized = True
@@ -120,7 +121,7 @@ class MagicLogger(Magics):
import sys
#Get root logger
logger = logging.getLogger('')
logger = logging.getLogger('GPUSimulators')
logger.setLevel(min(args.level, args.file_level))
#Add log to screen
@@ -153,27 +154,23 @@ class MagicLogger(Magics):
@magics_class
class MagicMPI(Magics):
cluster = None
@line_magic
@magic_arguments.magic_arguments()
@magic_arguments.argument(
'name', type=str, help='Name of context to create')
@magic_arguments.argument(
'--num_engines', '-n', type=int, default=4, help='Number of engines to start')
def setup_mpi(self, line):
args = magic_arguments.parse_argstring(self.setup_mpi, line)
logger = logging.getLogger('')
if (self.cluster != None):
logger = logging.getLogger('GPUSimulators')
if args.name in self.shell.user_ns.keys():
logger.warning("MPI alreay set up, resetting")
self.cluster = None
self.cluster = Common.IPEngine(args.num_engines)
self.shell.user_ns[args.name].shutdown()
self.shell.user_ns[args.name] = None
gc.collect()
self.shell.user_ns[args.name] = Common.IPEngine(args.num_engines)
# Handle CUDA context when exiting python
import atexit
def exitfunc():
self.logger.info("Exitfunc: Resetting MPI cluster")
self.cluster = None
atexit.register(exitfunc)

View File

@@ -57,7 +57,7 @@ class MPISimulator(Simulator.BaseSimulator):
self.gc_y = int(self.sim.u0[0].y_halo)
self.nx = int(self.sim.nx)
self.ny = int(self.sim.ny)
self.nvars = 3
self.nvars = len(self.sim.u0.gpu_variables)
#Allocate data for receiving
#Note that east and west also transfer ghost cells
@@ -68,44 +68,45 @@ class MPISimulator(Simulator.BaseSimulator):
self.in_s = np.empty((self.nvars, self.gc_y, self.nx), dtype=np.float32)
#Allocate data for sending
self.out_e = np.empty((self.nvars, self.ny + 2*self.gc_y, self.gc_x), dtype=np.float32)
self.out_w = np.empty((self.nvars, self.ny + 2*self.gc_y, self.gc_x), dtype=np.float32)
self.out_n = np.empty((self.nvars, self.gc_y, self.nx), dtype=np.float32)
self.out_s = np.empty((self.nvars, self.gc_y, self.nx), dtype=np.float32)
self.out_e = np.empty_like(self.in_e)
self.out_w = np.empty_like(self.in_w)
self.out_n = np.empty_like(self.in_n)
self.out_s = np.empty_like(self.in_s)
#Set regions for ghost cells to read from
self.read_e = [ self.nx, 0, self.gc_x, self.ny + 2*self.gc_y]
self.read_w = [self.gc_x, 0, self.gc_x, self.ny + 2*self.gc_y]
self.read_n = [self.gc_x, self.ny, self.nx, self.gc_y]
self.read_s = [self.gc_x, self.gc_y, self.nx, self.gc_y]
self.read_e = np.array([ self.nx, 0, self.gc_x, self.ny + 2*self.gc_y])
self.read_w = np.array([self.gc_x, 0, self.gc_x, self.ny + 2*self.gc_y])
self.read_n = np.array([self.gc_x, self.ny, self.nx, self.gc_y])
self.read_s = np.array([self.gc_x, self.gc_y, self.nx, self.gc_y])
#Set regions for ghost cells to write to
self.write_e = [self.nx+self.gc_x, 0, self.gc_x, self.ny + 2*self.gc_y]
self.write_w = [ 0, 0, self.gc_x, self.ny + 2*self.gc_y]
self.write_n = [ self.gc_x, self.ny+self.gc_y, self.nx, self.gc_y]
self.write_s = [ self.gc_x, 0, self.nx, self.gc_y]
#Initialize ghost cells
self.exchange()
self.write_e = self.read_e + np.array([self.gc_x, 0, 0, 0])
self.write_w = self.read_w - np.array([self.gc_x, 0, 0, 0])
self.write_n = self.read_n + np.array([0, self.gc_y, 0, 0])
self.write_s = self.read_s - np.array([0, self.gc_y, 0, 0])
self.logger.debug("Simlator rank {:d} created ".format(self.rank))
def substep(self, dt, step_number):
self.sim.substep(dt, step_number)
self.exchange()
self.sim.substep(dt, step_number)
def download(self):
raise(NotImplementedError("Needs to be implemented!"))
return self.sim.download()
def synchronize(self):
raise(NotImplementedError("Needs to be implemented!"))
self.sim.synchronize()
def check(self):
return self.sim.check()
def computeDt(self):
raise(NotImplementedError("Needs to be implemented!"))
local_dt = np.array([np.float32(self.sim.computeDt())]);
global_dt = np.empty(1, dtype=np.float32)
self.comm.Allreduce(local_dt, global_dt, op=MPI.MIN)
self.logger.debug("Local dt: {:f}, global dt: {:f}".format(local_dt[0], global_dt[0]))
return global_dt[0]
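computeDt is now a collective operation: every rank proposes the largest stable timestep for its own subdomain, and Allreduce with MPI.MIN makes all ranks agree on the smallest one. A standalone illustration with made-up local values:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

# Pretend each rank computed a different local CFL limit
local_dt = np.array([0.01 * (comm.Get_rank() + 1)], dtype=np.float32)
global_dt = np.empty(1, dtype=np.float32)
comm.Allreduce(local_dt, global_dt, op=MPI.MIN)

# Every rank now steps with the same, globally stable dt (0.01 here)
print("rank {:d}: dt = {:f}".format(comm.Get_rank(), global_dt[0]))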
def exchange(self):
#Shorthands for dimensions
@@ -126,13 +127,13 @@ class MPISimulator(Simulator.BaseSimulator):
#Send to north/south neighbours
comm_send = []
comm_send += [self.comm.Isend(self.out_n, dest=self.north, tag=0)]
comm_send += [self.comm.Isend(self.out_s, dest=self.south, tag=1)]
comm_send += [self.comm.Isend(self.out_n, dest=self.north, tag=4*self.nt + 0)]
comm_send += [self.comm.Isend(self.out_s, dest=self.south, tag=4*self.nt + 1)]
#Receive from north/south neighbors
comm_recv = []
comm_recv += [self.comm.Irecv(self.in_n, source=self.north, tag=1)]
comm_recv += [self.comm.Irecv(self.in_s, source=self.south, tag=0)]
comm_recv += [self.comm.Irecv(self.in_s, source=self.south, tag=4*self.nt + 0)]
comm_recv += [self.comm.Irecv(self.in_n, source=self.north, tag=4*self.nt + 1)]
#Wait for incoming transfers to complete
for comm in comm_recv:
@@ -163,13 +164,13 @@ class MPISimulator(Simulator.BaseSimulator):
#Send to east/west neighbours
comm_send = []
comm_send += [self.comm.Isend(self.out_e, dest=self.east, tag=2)]
comm_send += [self.comm.Isend(self.out_w, dest=self.west, tag=3)]
comm_send += [self.comm.Isend(self.out_e, dest=self.east, tag=4*self.nt + 2)]
comm_send += [self.comm.Isend(self.out_w, dest=self.west, tag=4*self.nt + 3)]
#Receive from east/west neighbors
comm_recv = []
comm_recv += [self.comm.Irecv(self.in_e, source=self.east, tag=3)]
comm_recv += [self.comm.Irecv(self.in_w, source=self.west, tag=2)]
comm_recv += [self.comm.Irecv(self.in_w, source=self.west, tag=4*self.nt + 2)]
comm_recv += [self.comm.Irecv(self.in_e, source=self.east, tag=4*self.nt + 3)]
#Wait for incoming transfers to complete
for comm in comm_recv:
@@ -266,4 +267,8 @@ class MPISimulator(Simulator.BaseSimulator):
#Pad with ones to guarantee num_factors
factors = factors + [1]*(num_factors - len(factors))
#Sort in descending order
factors = np.flip(np.sort(factors))
return factors
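The tag changes in the exchange hunks above make each message tag unique per timestep (4*self.nt plus a direction index), so a receive posted at step nt can never match a stale send from a neighbour that is one step ahead. A minimal ring-exchange sketch of the same idea:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
nt = 7  # hypothetical current timestep number

TAG_E = 2  # direction index, as in the exchange code above
send_buf = np.full(4, rank, dtype=np.float32)
recv_buf = np.empty_like(send_buf)

# Send east, receive from the west; both sides use tag 4*nt + TAG_E,
# so messages from other timesteps cannot be matched by mistake
reqs = [comm.Isend(send_buf, dest=(rank + 1) % size, tag=4*nt + TAG_E),
        comm.Irecv(recv_buf, source=(rank - 1) % size, tag=4*nt + TAG_E)]
MPI.Request.Waitall(reqs)

Note that MPI only guarantees tag values up to MPI_TAG_UB, so a long run would eventually need to wrap the step counter.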

View File

@@ -134,6 +134,7 @@ class BaseSimulator(object):
self.ny = np.int32(ny)
self.dx = np.float32(dx)
self.dy = np.float32(dy)
self.dt = None
self.cfl_scale = cfl_scale
self.num_substeps = num_substeps
@@ -174,24 +175,27 @@ class BaseSimulator(object):
t_start = self.simTime()
t_end = t_start + t
update_dt = False
if (dt == None):
update_dt = True
update_dt = True
if (dt is not None):
update_dt = False
self.dt = dt
while(self.simTime() < t_end):
# Update dt every 100 timesteps and cross your fingers it works
# for the next 100
if (update_dt and (self.simSteps() % 100 == 0)):
dt = self.computeDt()*self.cfl_scale
self.dt = self.computeDt()*self.cfl_scale
# Compute timestep for "this" iteration (i.e., shorten last timestep)
dt = np.float32(min(dt, t_end-self.simTime()))
current_dt = np.float32(min(self.dt, t_end-self.simTime()))
# Stop if end reached (should not happen)
if (dt <= 0.0):
if (current_dt <= 0.0):
self.logger.warning("Timestep size {:d} is less than or equal to zero!".format(self.simSteps()))
break
# Step forward in time
self.step(dt)
self.step(current_dt)
#Print info
print_string = printer.getPrintString(self.simTime() - t_start)
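The reworked loop stores dt on the simulator: a caller-supplied dt is used as-is, otherwise dt is refreshed from computeDt every 100 steps, and the final step is always clamped so the run lands exactly on t_end. A condensed sketch of that control flow, assuming the simulator interface shown above:

import numpy as np

def simulate(sim, t, dt=None):
    update_dt = dt is None
    if not update_dt:
        sim.dt = dt
    t_end = sim.simTime() + t
    while sim.simTime() < t_end:
        # Refresh dt periodically unless the caller pinned it
        if update_dt and sim.simSteps() % 100 == 0:
            sim.dt = sim.computeDt() * sim.cfl_scale
        # Clamp the last step so we stop exactly at t_end
        current_dt = np.float32(min(sim.dt, t_end - sim.simTime()))
        if current_dt <= 0.0:
            break
        sim.step(current_dt)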

View File

@@ -185,7 +185,7 @@ def genKelvinHelmholtz(nx, ny, gamma, roughness=0.125):
dy = 1.0 / ny
def genSmoothRandom(nx, n):
assert (n <= nx), "Number of generated points nx must be larger than n"
n = max(1, min(n, nx))
if n == nx:
return np.random.random(nx)-0.5
@@ -195,7 +195,17 @@ def genKelvinHelmholtz(nx, ny, gamma, roughness=0.125):
#Control points and interpolator
xp = np.linspace(0.0, 1.0, n)
yp = np.random.random(n) - 0.5
f = interp1d(xp, yp, kind='cubic')
if (n == 1):
kind = 'nearest'
elif (n == 2):
kind = 'linear'
elif (n == 3):
kind = 'quadratic'
else:
kind = 'cubic'
f = interp1d(xp, yp, kind=kind)
#Interpolation points
x = np.linspace(0.0, 1.0, nx)
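scipy's interp1d needs at least order+1 control points (two for linear, three for quadratic, four for cubic), which is what the kind selection above guards against once n has been clamped. The same logic as a self-contained helper; here the one-point case simply returns a constant, since interp1d itself needs at least two points:

import numpy as np
from scipy.interpolate import interp1d

def gen_smooth_random(nx, n):
    n = max(1, min(n, nx))
    if n == nx:
        return np.random.random(nx) - 0.5
    if n == 1:
        # A single control point is just a constant offset
        return np.full(nx, np.random.random() - 0.5)
    # Pick the highest spline order the point count supports:
    # cubic needs 4 points, quadratic 3, linear 2
    kind = {2: 'linear', 3: 'quadratic'}.get(n, 'cubic')
    xp = np.linspace(0.0, 1.0, n)
    yp = np.random.random(n) - 0.5
    return interp1d(xp, yp, kind=kind)(np.linspace(0.0, 1.0, nx))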

File diff suppressed because one or more lines are too long