diff --git a/GPUSimulators/Common.py b/GPUSimulators/Common.py index aba6a1f..39af310 100644 --- a/GPUSimulators/Common.py +++ b/GPUSimulators/Common.py @@ -24,6 +24,8 @@ import os import numpy as np import time +import subprocess +import tempfile import re import io import hashlib @@ -61,6 +63,93 @@ class Timer(object): + +class PopenFileBuffer(object): + """ + Simple class for holding a set of tempfiles + for communicating with a subprocess + """ + def __init__(self): + self.stdout = tempfile.TemporaryFile(mode='w+t') + self.stderr = tempfile.TemporaryFile(mode='w+t') + + def __del__(self): + self.stdout.close() + self.stderr.close() + + def read(self): + self.stdout.seek(0) + cout = self.stdout.read() + self.stdout.seek(0, 2) + + self.stderr.seek(0) + cerr = self.stderr.read() + self.stderr.seek(0, 2) + + return cout, cerr + + + + +class IPEngine(object): + """ + Class for starting IPEngines for MPI processing in IPython + """ + def __init__(self, n_engines): + self.logger = logging.getLogger(__name__) + + #Start ipcontroller + self.logger.info("Starting IPController") + self.c_buff = PopenFileBuffer() + c_cmd = ["ipcontroller", "--ip='*'"] + self.c = subprocess.Popen(c_cmd, stderr=self.c_buff.stderr, stdout=self.c_buff.stdout, shell=False) + + #Wait until controller is running + time.sleep(3) + + #Start engines + self.logger.info("Starting IPEngines") + self.e_buff = PopenFileBuffer() + e_cmd = ["mpiexec", "-n", str(n_engines), "ipengine", "--mpi"] + self.e = subprocess.Popen(e_cmd, stderr=self.e_buff.stderr, stdout=self.e_buff.stdout, shell=False) + + # attach to a running cluster + import ipyparallel + cluster = ipyparallel.Client()#profile='mpi') + while(len(cluster.ids) != n_engines): + time.sleep(0.5) + self.logger.info("Waiting for cluster...") + cluster = ipyparallel.Client()#profile='mpi') + + self.logger.info("Done") + + def __del__(self): + self.e.terminate() + try: + self.e.communicate(timeout=3) + except TimeoutExpired: + self.logger.warn("Killing IPEngine") + self.e.kill() + self.e.communicate() + + cout, cerr = self.e_buff.read() + self.logger.info("IPEngine cout: {:s}".format(cout)) + self.logger.info("IPEngine cerr: {:s}".format(cerr)) + + + self.c.terminate() + try: + self.c.communicate(timeout=3) + except TimeoutExpired: + self.logger.warn("Killing IPController") + self.c.kill() + self.c.communicate() + + cout, cerr = self.c_buff.read() + self.logger.info("IPController cout: {:s}".format(cout)) + self.logger.info("IPController cerr: {:s}".format(cerr)) + + diff --git a/GPUSimulators/IPythonMagic.py b/GPUSimulators/IPythonMagic.py index cf0a8ef..117f586 100644 --- a/GPUSimulators/IPythonMagic.py +++ b/GPUSimulators/IPythonMagic.py @@ -29,7 +29,7 @@ from GPUSimulators import Common, CudaContext @magics_class -class MyIPythonMagic(Magics): +class MagicCudaContext(Magics): @line_magic @magic_arguments.magic_arguments() @magic_arguments.argument( @@ -97,10 +97,10 @@ class MyIPythonMagic(Magics): +@magics_class +class MagicLogger(Magics): logger_initialized = False - - @line_magic @magic_arguments.magic_arguments() @magic_arguments.argument( @@ -141,6 +141,27 @@ class MyIPythonMagic(Magics): + + + +@magics_class +class MagicMPI(Magics): + cluster = None + + @line_magic + @magic_arguments.magic_arguments() + @magic_arguments.argument( + '--num_engines', '-n', type=int, default=4, help='Number of engines to start') + def setup_mpi(self, line): + args = magic_arguments.parse_argstring(self.setup_mpi, line) + + logger = logging.getLogger('') + if (self.cluster != None): + logger.warning("MPI alreay set up, resetting") + self.cluster = None + self.cluster = Common.IPEngine(args.num_engines) + + @@ -149,5 +170,7 @@ class MyIPythonMagic(Magics): # Register ip = get_ipython() -ip.register_magics(MyIPythonMagic) +ip.register_magics(MagicCudaContext) +ip.register_magics(MagicLogger) +ip.register_magics(MagicMPI)