Added mpi utilities

This commit is contained in:
André R. Brodtkorb 2018-11-20 08:59:42 +01:00
parent cfcaa65bbe
commit 074e38de84
2 changed files with 116 additions and 4 deletions

View File

@ -24,6 +24,8 @@ import os
import numpy as np import numpy as np
import time import time
import subprocess
import tempfile
import re import re
import io import io
import hashlib import hashlib
@ -62,6 +64,93 @@ class Timer(object):
class PopenFileBuffer(object):
"""
Simple class for holding a set of tempfiles
for communicating with a subprocess
"""
def __init__(self):
self.stdout = tempfile.TemporaryFile(mode='w+t')
self.stderr = tempfile.TemporaryFile(mode='w+t')
def __del__(self):
self.stdout.close()
self.stderr.close()
def read(self):
self.stdout.seek(0)
cout = self.stdout.read()
self.stdout.seek(0, 2)
self.stderr.seek(0)
cerr = self.stderr.read()
self.stderr.seek(0, 2)
return cout, cerr
class IPEngine(object):
"""
Class for starting IPEngines for MPI processing in IPython
"""
def __init__(self, n_engines):
self.logger = logging.getLogger(__name__)
#Start ipcontroller
self.logger.info("Starting IPController")
self.c_buff = PopenFileBuffer()
c_cmd = ["ipcontroller", "--ip='*'"]
self.c = subprocess.Popen(c_cmd, stderr=self.c_buff.stderr, stdout=self.c_buff.stdout, shell=False)
#Wait until controller is running
time.sleep(3)
#Start engines
self.logger.info("Starting IPEngines")
self.e_buff = PopenFileBuffer()
e_cmd = ["mpiexec", "-n", str(n_engines), "ipengine", "--mpi"]
self.e = subprocess.Popen(e_cmd, stderr=self.e_buff.stderr, stdout=self.e_buff.stdout, shell=False)
# attach to a running cluster
import ipyparallel
cluster = ipyparallel.Client()#profile='mpi')
while(len(cluster.ids) != n_engines):
time.sleep(0.5)
self.logger.info("Waiting for cluster...")
cluster = ipyparallel.Client()#profile='mpi')
self.logger.info("Done")
def __del__(self):
self.e.terminate()
try:
self.e.communicate(timeout=3)
except TimeoutExpired:
self.logger.warn("Killing IPEngine")
self.e.kill()
self.e.communicate()
cout, cerr = self.e_buff.read()
self.logger.info("IPEngine cout: {:s}".format(cout))
self.logger.info("IPEngine cerr: {:s}".format(cerr))
self.c.terminate()
try:
self.c.communicate(timeout=3)
except TimeoutExpired:
self.logger.warn("Killing IPController")
self.c.kill()
self.c.communicate()
cout, cerr = self.c_buff.read()
self.logger.info("IPController cout: {:s}".format(cout))
self.logger.info("IPController cerr: {:s}".format(cerr))
class DataDumper(object): class DataDumper(object):

View File

@ -29,7 +29,7 @@ from GPUSimulators import Common, CudaContext
@magics_class @magics_class
class MyIPythonMagic(Magics): class MagicCudaContext(Magics):
@line_magic @line_magic
@magic_arguments.magic_arguments() @magic_arguments.magic_arguments()
@magic_arguments.argument( @magic_arguments.argument(
@ -97,10 +97,10 @@ class MyIPythonMagic(Magics):
@magics_class
class MagicLogger(Magics):
logger_initialized = False logger_initialized = False
@line_magic @line_magic
@magic_arguments.magic_arguments() @magic_arguments.magic_arguments()
@magic_arguments.argument( @magic_arguments.argument(
@ -144,10 +144,33 @@ class MyIPythonMagic(Magics):
@magics_class
class MagicMPI(Magics):
cluster = None
@line_magic
@magic_arguments.magic_arguments()
@magic_arguments.argument(
'--num_engines', '-n', type=int, default=4, help='Number of engines to start')
def setup_mpi(self, line):
args = magic_arguments.parse_argstring(self.setup_mpi, line)
logger = logging.getLogger('')
if (self.cluster != None):
logger.warning("MPI alreay set up, resetting")
self.cluster = None
self.cluster = Common.IPEngine(args.num_engines)
# Register # Register
ip = get_ipython() ip = get_ipython()
ip.register_magics(MyIPythonMagic) ip.register_magics(MagicCudaContext)
ip.register_magics(MagicLogger)
ip.register_magics(MagicMPI)