961 lines
35 KiB
Python

# -*- coding: utf-8 -*-
"""
This python module implements the different helper functions and
classes
Copyright (C) 2018 SINTEF ICT
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import numpy as np
import time
import signal
import subprocess
import tempfile
import re
import io
import hashlib
import logging
import gc
import netCDF4
import json
from tqdm import tqdm
#import pycuda.compiler as cuda_compiler
#import pycuda.gpuarray
#import pycuda.driver as cuda
#from pycuda.tools import PageLockedMemoryPool
from hip import hip, hiprtc
from hip import hipblas
def hip_check(call_result):
err = call_result[0]
result = call_result[1:]
if len(result) == 1:
result = result[0]
if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess:
raise RuntimeError(str(err))
elif (
isinstance(err, hiprtc.hiprtcResult)
and err != hiprtc.hiprtcResult.HIPRTC_SUCCESS
):
raise RuntimeError(str(err))
return result
def safeCall(cmd):
logger = logging.getLogger(__name__)
try:
#git rev-parse HEAD
current_dir = os.path.dirname(os.path.realpath(__file__))
params = dict()
params['stderr'] = subprocess.STDOUT
params['cwd'] = current_dir
params['universal_newlines'] = True #text=True in more recent python
params['shell'] = False
if os.name == 'nt':
params['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
stdout = subprocess.check_output(cmd, **params)
except subprocess.CalledProcessError as e:
output = e.output
logger.error("Git failed, \nReturn code: " + str(e.returncode) + "\nOutput: " + output)
raise e
return stdout
def getGitHash():
return safeCall(["git", "rev-parse", "HEAD"])
def getGitStatus():
return safeCall(["git", "status", "--porcelain", "-uno"])
def toJson(in_dict, compressed=True):
"""
Creates JSON string from a dictionary
"""
logger = logging.getLogger(__name__)
out_dict = in_dict.copy()
for key in out_dict:
if isinstance(out_dict[key], np.ndarray):
out_dict[key] = out_dict[key].tolist()
else:
try:
json.dumps(out_dict[key])
except:
value = str(out_dict[key])
logger.warning("JSON: Converting {:s} to string ({:s})".format(key, value))
out_dict[key] = value
return json.dumps(out_dict)
def runSimulation(simulator, simulator_args, outfile, save_times, save_var_names=[], dt=None):
"""
Runs a simulation, and stores output in netcdf file. Stores the times given in
save_times, and saves all of the variables in list save_var_names. Elements in
save_var_names can be set to None if you do not want to save them
"""
profiling_data_sim_runner = { 'start': {}, 'end': {} }
profiling_data_sim_runner["start"]["t_sim_init"] = 0
profiling_data_sim_runner["end"]["t_sim_init"] = 0
profiling_data_sim_runner["start"]["t_nc_write"] = 0
profiling_data_sim_runner["end"]["t_nc_write"] = 0
profiling_data_sim_runner["start"]["t_full_step"] = 0
profiling_data_sim_runner["end"]["t_full_step"] = 0
profiling_data_sim_runner["start"]["t_sim_init"] = time.time()
logger = logging.getLogger(__name__)
assert len(save_times) > 0, "Need to specify which times to save"
with Timer("construct") as t:
sim = simulator(**simulator_args)
logger.info("Constructed in " + str(t.secs) + " seconds")
#Create netcdf file and simulate
with DataDumper(outfile, mode='w', clobber=False) as outdata:
#Create attributes (metadata)
outdata.ncfile.created = time.ctime(time.time())
outdata.ncfile.git_hash = getGitHash()
outdata.ncfile.git_status = getGitStatus()
outdata.ncfile.simulator = str(simulator)
# do not write fields to attributes (they are to large)
simulator_args_for_ncfile = simulator_args.copy()
del simulator_args_for_ncfile["rho"]
del simulator_args_for_ncfile["rho_u"]
del simulator_args_for_ncfile["rho_v"]
del simulator_args_for_ncfile["E"]
outdata.ncfile.sim_args = toJson(simulator_args_for_ncfile)
#Create dimensions
outdata.ncfile.createDimension('time', len(save_times))
outdata.ncfile.createDimension('x', simulator_args['nx'])
outdata.ncfile.createDimension('y', simulator_args['ny'])
#Create variables for dimensions
ncvars = {}
ncvars['time'] = outdata.ncfile.createVariable('time', np.dtype('float32').char, 'time')
ncvars['x'] = outdata.ncfile.createVariable( 'x', np.dtype('float32').char, 'x')
ncvars['y'] = outdata.ncfile.createVariable( 'y', np.dtype('float32').char, 'y')
#Fill variables with proper values
ncvars['time'][:] = save_times
extent = sim.getExtent()
ncvars['x'][:] = np.linspace(extent[0], extent[1], simulator_args['nx'])
ncvars['y'][:] = np.linspace(extent[2], extent[3], simulator_args['ny'])
#Choose which variables to download (prune None from list, but keep the index)
download_vars = []
for i, var_name in enumerate(save_var_names):
if var_name is not None:
download_vars += [i]
save_var_names = list(save_var_names[i] for i in download_vars)
#Create variables
for var_name in save_var_names:
ncvars[var_name] = outdata.ncfile.createVariable(var_name, np.dtype('float32').char, ('time', 'y', 'x'), zlib=True, least_significant_digit=3)
#Create step sizes between each save
t_steps = np.empty_like(save_times)
t_steps[0] = save_times[0]
t_steps[1:] = save_times[1:] - save_times[0:-1]
profiling_data_sim_runner["end"]["t_sim_init"] = time.time()
#Start simulation loop
# progress_printer = ProgressPrinter(save_times[-1], print_every=10)
for k, t_step in tqdm(enumerate(t_steps), desc="Simulation Loop"):
#Get target time and step size there
# t_step = t_steps[k]
# t_end = save_times[k]
#Sanity check simulator
try:
sim.check()
except AssertionError as e:
logger.error("Error after {:d} steps (t={:f}: {:s}".format(sim.simSteps(), sim.simTime(), str(e)))
return outdata.filename
profiling_data_sim_runner["start"]["t_full_step"] += time.time()
#Simulate
if t_step > 0.0:
sim.simulate(t_step, dt)
profiling_data_sim_runner["end"]["t_full_step"] += time.time()
profiling_data_sim_runner["start"]["t_nc_write"] += time.time()
#Download
save_vars = sim.download(download_vars)
#Save to file
for i, var_name in enumerate(save_var_names):
ncvars[var_name][k, :] = save_vars[i]
profiling_data_sim_runner["end"]["t_nc_write"] += time.time()
#Write progress to screen
# print_string = progress_printer.getPrintString(t_end)
# if (print_string):
# logger.debug(print_string)
logger.debug("Simulated to t={:f} in {:d} timesteps (average dt={:f})".format(save_times[-1], sim.simSteps(), sim.simTime() / sim.simSteps()))
return outdata.filename, profiling_data_sim_runner, sim.profiling_data_mpi
#return outdata.filename
class Timer(object):
"""
Class which keeps track of time spent for a section of code
"""
def __init__(self, tag, log_level=logging.DEBUG):
self.tag = tag
self.log_level = log_level
self.logger = logging.getLogger(__name__)
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, *args):
self.end = time.time()
self.secs = self.end - self.start
self.msecs = self.secs * 1000 # millisecs
self.logger.log(self.log_level, "%s: %f ms", self.tag, self.msecs)
def elapsed(self):
return time.time() - self.start
class PopenFileBuffer(object):
"""
Simple class for holding a set of tempfiles
for communicating with a subprocess
"""
def __init__(self):
self.stdout = tempfile.TemporaryFile(mode='w+t')
self.stderr = tempfile.TemporaryFile(mode='w+t')
def __del__(self):
self.stdout.close()
self.stderr.close()
def read(self):
self.stdout.seek(0)
cout = self.stdout.read()
self.stdout.seek(0, 2)
self.stderr.seek(0)
cerr = self.stderr.read()
self.stderr.seek(0, 2)
return cout, cerr
class IPEngine(object):
"""
Class for starting IPEngines for MPI processing in IPython
"""
def __init__(self, n_engines):
self.logger = logging.getLogger(__name__)
#Start ipcontroller
self.logger.info("Starting IPController")
self.c_buff = PopenFileBuffer()
c_cmd = ["ipcontroller", "--ip='*'"]
c_params = dict()
c_params['stderr'] = self.c_buff.stderr
c_params['stdout'] = self.c_buff.stdout
c_params['shell'] = False
if os.name == 'nt':
c_params['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
self.c = subprocess.Popen(c_cmd, **c_params)
#Wait until controller is running
time.sleep(3)
#Start engines
self.logger.info("Starting IPEngines")
self.e_buff = PopenFileBuffer()
e_cmd = ["mpiexec", "-n", str(n_engines), "ipengine", "--mpi"]
e_params = dict()
e_params['stderr'] = self.e_buff.stderr
e_params['stdout'] = self.e_buff.stdout
e_params['shell'] = False
if os.name == 'nt':
e_params['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
self.e = subprocess.Popen(e_cmd, **e_params)
# attach to a running cluster
import ipyparallel
self.cluster = ipyparallel.Client()#profile='mpi')
time.sleep(3)
while len(self.cluster.ids) != n_engines:
time.sleep(0.5)
self.logger.info("Waiting for cluster...")
self.cluster = ipyparallel.Client()#profile='mpi')
self.logger.info("Done")
def __del__(self):
self.shutdown()
def shutdown(self):
if (self.e is not None):
if (os.name == 'nt'):
self.logger.warn("Sending CTRL+C to IPEngine")
self.e.send_signal(signal.CTRL_C_EVENT)
try:
self.e.communicate(timeout=3)
self.e.kill()
except subprocess.TimeoutExpired:
self.logger.warn("Killing IPEngine")
self.e.kill()
self.e.communicate()
self.e = None
cout, cerr = self.e_buff.read()
self.logger.info("IPEngine cout: {:s}".format(cout))
self.logger.info("IPEngine cerr: {:s}".format(cerr))
self.e_buff = None
gc.collect()
if (self.c is not None):
if (os.name == 'nt'):
self.logger.warn("Sending CTRL+C to IPController")
self.c.send_signal(signal.CTRL_C_EVENT)
try:
self.c.communicate(timeout=3)
self.c.kill()
except subprocess.TimeoutExpired:
self.logger.warn("Killing IPController")
self.c.kill()
self.c.communicate()
self.c = None
cout, cerr = self.c_buff.read()
self.logger.info("IPController cout: {:s}".format(cout))
self.logger.info("IPController cerr: {:s}".format(cerr))
self.c_buff = None
gc.collect()
class DataDumper(object):
"""
Simple class for holding a netCDF4 object
(handles opening and closing in a nice way)
Use as
with DataDumper("filename") as data:
...
"""
def __init__(self, filename, *args, **kwargs):
self.logger = logging.getLogger(__name__)
#Create directory if needed
filename = os.path.abspath(filename)
dirname = os.path.dirname(filename)
if dirname and not os.path.isdir(dirname):
self.logger.info("Creating directory " + dirname)
os.makedirs(dirname)
#Get mode of file if we have that
mode = None
if (args):
mode = args[0]
elif (kwargs and 'mode' in kwargs.keys()):
mode = kwargs['mode']
#Create new unique file if writing
if (mode):
if (("w" in mode) or ("+" in mode) or ("a" in mode)):
i = 0
stem, ext = os.path.splitext(filename)
while (os.path.isfile(filename)):
filename = "{:s}_{:04d}{:s}".format(stem, i, ext)
i = i+1
self.filename = os.path.abspath(filename)
#Save arguments
self.args = args
self.kwargs = kwargs
#Log output
self.logger.info("Initialized " + self.filename)
def __enter__(self):
self.logger.info("Opening " + self.filename)
if (self.args):
self.logger.info("Arguments: " + str(self.args))
if (self.kwargs):
self.logger.info("Keyword arguments: " + str(self.kwargs))
self.ncfile = netCDF4.Dataset(self.filename, *self.args, **self.kwargs)
return self
def __exit__(self, *args):
self.logger.info("Closing " + self.filename)
self.ncfile.close()
def toJson(in_dict):
out_dict = in_dict.copy()
for key in out_dict:
if isinstance(out_dict[key], np.ndarray):
out_dict[key] = out_dict[key].tolist()
else:
try:
json.dumps(out_dict[key])
except:
out_dict[key] = str(out_dict[key])
return json.dumps(out_dict)
# class ProgressPrinter(object):
# """
# Small helper class for
# """
# def __init__(self, total_steps, print_every=5):
# self.logger = logging.getLogger(__name__)
# self.start = time.time()
# self.total_steps = total_steps
# self.print_every = print_every
# self.next_print_time = self.print_every
# self.last_step = 0
# self.secs_per_iter = None
# def getPrintString(self, step):
# elapsed = time.time() - self.start
# if (elapsed > self.next_print_time):
# dt = elapsed - (self.next_print_time - self.print_every)
# dsteps = step - self.last_step
# steps_remaining = self.total_steps - step
# if (dsteps == 0):
# return
# self.last_step = step
# self.next_print_time = elapsed + self.print_every
# if not self.secs_per_iter:
# self.secs_per_iter = dt / dsteps
# self.secs_per_iter = 0.2*self.secs_per_iter + 0.8*(dt / dsteps)
# remaining_time = steps_remaining * self.secs_per_iter
# return "{:s}. Total: {:s}, elapsed: {:s}, remaining: {:s}".format(
# ProgressPrinter.progressBar(step, self.total_steps),
# ProgressPrinter.timeString(elapsed + remaining_time),
# ProgressPrinter.timeString(elapsed),
# ProgressPrinter.timeString(remaining_time))
# def timeString(seconds):
# seconds = int(max(seconds, 1))
# minutes, seconds = divmod(seconds, 60)
# hours, minutes = divmod(minutes, 60)
# periods = [('h', hours), ('m', minutes), ('s', seconds)]
# time_string = ' '.join('{}{}'.format(value, name)
# for name, value in periods
# if value)
# return time_string
# def progressBar(step, total_steps, width=30):
# progress = np.round(width * step / total_steps).astype(np.int32)
# progressbar = "0% [" + "#"*(progress) + "="*(width-progress) + "] 100%"
# return progressbar
"""
Class that holds 2D data
"""
class CudaArray2D:
"""
Uploads initial data to the CUDA device
"""
def __init__(self, stream, nx, ny, x_halo, y_halo, cpu_data=None, dtype=np.float32):
self.logger = logging.getLogger(__name__)
self.nx = nx
self.ny = ny
self.x_halo = x_halo
self.y_halo = y_halo
nx_halo = nx + 2*x_halo
ny_halo = ny + 2*y_halo
#self.logger.debug("Allocating [%dx%d] buffer", self.nx, self.ny)
#Should perhaps use pycuda.driver.mem_alloc_data.pitch() here
#Initialize an array on GPU with zeros
#self.data = pycuda.gpuarray.zeros((ny_halo, nx_halo), dtype)
#data.strides[0] == nx_halo*np.float32().itemsize
#data.strides[1] == np.float32().itemsize
num_bytes = ny_halo*nx_halo * np.float32().itemsize
#data_h = np.zeros((ny_halo, nx_halo), dtype)
# init device array and upload host data
self.data = hip_check(hip.hipMalloc(num_bytes)).configure(
typestr="float32",shape=(ny_halo, nx_halo))
#num_bytes = ny*nx * np.float32().itemsize
#cpu_data = hip_check(hip.hipHostMalloc(num_bytes,hip.hipHostMallocPortable))
# copy data from host to device
#hip_check(hip.hipMemcpy(self.data,data_h,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice))
#https://rocm.docs.amd.com/projects/hip-python/en/latest/python_api/hip.html#hip.hip.hipMemPoolCreate
#self.memorypool = PageLockedMemoryPool()
#If we don't have any data, just allocate and return
if cpu_data is None:
return
#Make sure data is in proper format
assert cpu_data.shape == (ny_halo, nx_halo) or cpu_data.shape == (self.ny, self.nx), "Wrong shape of data %s vs %s / %s" % (str(cpu_data.shape), str((self.ny, self.nx)), str((ny_halo, nx_halo)))
assert cpu_data.itemsize == 4, "Wrong size of data type"
assert not np.isfortran(cpu_data), "Wrong datatype (Fortran, expected C)"
#Create copy object from host to device
x = (nx_halo - cpu_data.shape[1]) // 2
y = (ny_halo - cpu_data.shape[0]) // 2
self.upload(stream, cpu_data, extent=[x, y, cpu_data.shape[1], cpu_data.shape[0]])
#self.logger.debug("Buffer <%s> [%dx%d]: Allocated ", int(self.data.gpudata), self.nx, self.ny)
def __del__(self, *args):
#self.logger.debug("Buffer <%s> [%dx%d]: Releasing ", int(self.data.gpudata), self.nx, self.ny)
#self.data.gpudata.free()
#self.logger.debug("Buffer <%s> [%dx%d]: Releasing ", int(self.data), self.nx, self.ny)
hip_check(hip.hipFree(self.data))
#hip_check(hip.hipFreeAsync(self.data, self.stream))
self.data = None
"""
Enables downloading data from GPU to Python
"""
def download(self, stream, cpu_data=None, asynch=False, extent=None):
if (extent is None):
x = self.x_halo
y = self.y_halo
nx = self.nx
ny = self.ny
else:
x, y, nx, ny = extent
if (cpu_data is None):
#self.logger.debug("Downloading [%dx%d] buffer", self.nx, self.ny)
#Allocate host memory
#The following fails, don't know why (crashes python)
#allocate a pinned (page-locked) memory array
#cpu_data = cuda.pagelocked_empty((int(ny), int(nx)), dtype=np.float32, mem_flags=cuda.host_alloc_flags.PORTABLE)
#see here type of memory: https://rocm.docs.amd.com/projects/hip-python/en/latest/python_api/hip.html#hip.hip.hipMemoryType
cpu_data = np.zeros((ny, nx), dtype=np.float32)
#num_bytes = cpu_data.size * cpu_data.itemsize
#hipHostMalloc allocates pinned host memory which is mapped into the address space of all GPUs in the system, the memory can #be accessed directly by the GPU device
#hipHostMallocDefault:Memory is mapped and portable (default allocation)
#hipHostMallocPortable: memory is explicitely portable across different devices
#cpu_data = hip_check(hip.hipHostMalloc(num_bytes,hip.hipHostMallocPortable))
#Non-pagelocked: cpu_data = np.empty((ny, nx), dtype=np.float32)
#cpu_data = self.memorypool.allocate((ny, nx), dtype=np.float32)
assert nx == cpu_data.shape[1]
assert ny == cpu_data.shape[0]
assert x+nx <= self.nx + 2*self.x_halo
assert y+ny <= self.ny + 2*self.y_halo
#Cuda
"""
#Create copy object from device to host
copy = cuda.Memcpy2D()
copy.set_src_device(self.data.gpudata)
copy.set_dst_host(cpu_data)
#Set offsets and pitch of source
copy.src_x_in_bytes = int(x)*self.data.strides[1]
copy.src_y = int(y)
copy.src_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = int(nx)*cpu_data.itemsize
copy.height = int(ny)
"""
#copy from device to host
#host_array_pinned = hip_check(hip.hipHostMalloc(cpu_data.size * cpu_data.itemsize, hip.hipHostMallocDefault))
#device_pointer = hip_check(hip.hipHostGetDevicePointer(host_array_pinned,hip.hipHostMallocDefault))
copy_download = {
'srcXInBytes': int(x)*np.float32().itemsize,
'srcY': int(y),
'srcMemoryType': hip.hipMemoryType.hipMemoryTypeDevice,#hipMemoryTypeManaged
'srcDevice': self.data,
'srcPitch': self.data.shape[0]*np.float32().itemsize,
'dstXInBytes': 0,
'dstY': 0,
'dstMemoryType': hip.hipMemoryType.hipMemoryTypeHost,
'dstHost': cpu_data, #device_pointer,
'dstPitch': cpu_data.strides[0],
'WidthInBytes': int(nx)*cpu_data.itemsize,
'Height': int(ny)
}
# Perform the copy back to host
Copy = hip.hip_Memcpy2D(**copy_download)
#err = hip.hipMemcpyParam2D(Copy)
err = hip.hipMemcpyParam2DAsync(Copy, stream)
if err is None:
print("--download - DtoH: Failed to copy 2D data to Host")
print("--I stop:", err)
exit()
#copy(stream)
if asynch==False:
#stream.synchronize()
hip_check(hip.hipStreamSynchronize(stream))
return cpu_data
def upload(self, stream, cpu_data, extent=None):
if (extent is None):
x = self.x_halo
y = self.y_halo
nx = self.nx
ny = self.ny
else:
x, y, nx, ny = extent
assert(nx == cpu_data.shape[1])
assert(ny == cpu_data.shape[0])
assert(x+nx <= self.nx + 2*self.x_halo)
assert(y+ny <= self.ny + 2*self.y_halo)
#Cuda
"""
#Create copy object from device to host
#Well this copy from src:host to dst:device AND NOT from device to host
copy = cuda.Memcpy2D()
copy.set_dst_device(self.data.gpudata)
copy.set_src_host(cpu_data)
#Set offsets and pitch of source
copy.dst_x_in_bytes = int(x)*self.data.strides[1]
copy.dst_y = int(y)
copy.dst_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = int(nx)*cpu_data.itemsize
copy.height = int(ny)
"""
#Copy from host to device
#host_array_pinned = hip_check(hip.hipHostMalloc(cpu_data.size * cpu_data.itemsize, hip.hipHostMallocDefault))
#device_pointer = hip_check(hip.hipHostGetDevicePointer(host_array_pinned,hip.hipHostMallocDefault))
copy_upload = {
'srcXInBytes': 0,
'srcY': 0,
'srcMemoryType': hip.hipMemoryType.hipMemoryTypeHost,
'srcHost': cpu_data, #device_pointer
'srcPitch': cpu_data.strides[0], # assuming float32 (4 bytes)
'dstXInBytes': int(x)*np.float32().itemsize,
'dstY': int(y),
'dstMemoryType': hip.hipMemoryType.hipMemoryTypeDevice, #hipMemoryTypeManaged
'dstDevice': self.data,
'dstPitch': self.data.shape[0]*np.float32().itemsize,
'WidthInBytes': int(nx)*cpu_data.itemsize,
'Height': int(ny)
}
# Perform the copy HtoD
Copy = hip.hip_Memcpy2D(**copy_upload)
#err = hip.hipMemcpyParam2D(Copy)
err = hip.hipMemcpyParam2DAsync(Copy, stream)
if err is None:
print("--Upload - HtoD: Failed to copy 2D data to Device")
print("--I stop:", err)
exit()
#copy(stream)
"""
Class that holds 2D data
"""
class CudaArray3D:
"""
Uploads initial data to the CL device
"""
def __init__(self, stream, nx, ny, nz, x_halo, y_halo, z_halo, cpu_data=None, dtype=np.float32):
self.logger = logging.getLogger(__name__)
self.nx = nx
self.ny = ny
self.nz = nz
self.x_halo = x_halo
self.y_halo = y_halo
self.z_halo = z_halo
nx_halo = nx + 2*x_halo
ny_halo = ny + 2*y_halo
nz_halo = nz + 2*z_halo
#self.logger.debug("Allocating [%dx%dx%d] buffer", self.nx, self.ny, self.nz)
#Should perhaps use pycuda.driver.mem_alloc_data.pitch() here
#self.data = pycuda.gpuarray.zeros((nz_halo, ny_halo, nx_halo), dtype)
"""
num_bytes = nz_halo*ny_halo*nx_halo * np.float32().itemsize
# init device array and upload host data
self.data = hip_check(hip.hipMalloc(num_bytes)).configure(
typestr="float32",shape=(nz_halo, ny_halo, nx_halo))
"""
#For returning to download
#self.memorypool = PageLockedMemoryPool()
#If we don't have any data, just allocate and return
if cpu_data is None:
return
#Make sure data is in proper format
assert cpu_data.shape == (nz_halo, ny_halo, nx_halo) or cpu_data.shape == (self.nz, self.ny, self.nx), "Wrong shape of data %s vs %s / %s" % (str(cpu_data.shape), str((self.nz, self.ny, self.nx)), str((nz_halo, ny_halo, nx_halo)))
assert cpu_data.itemsize == 4, "Wrong size of data type"
assert not np.isfortran(cpu_data), "Wrong datatype (Fortran, expected C)"
#Cuda
"""
#Create copy object from host to device
copy = cuda.Memcpy3D()
copy.set_src_host(cpu_data)
copy.set_dst_device(self.data.gpudata)
#Set offsets of destination
x_offset = (nx_halo - cpu_data.shape[2]) // 2
y_offset = (ny_halo - cpu_data.shape[1]) // 2
z_offset = (nz_halo - cpu_data.shape[0]) // 2
copy.dst_x_in_bytes = x_offset*self.data.strides[1]
copy.dst_y = y_offset
copy.dst_z = z_offset
#Set pitch of destination
copy.dst_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
width = max(self.nx, cpu_data.shape[2])
height = max(self.ny, cpu_data.shape[1])
depth = max(self.nz, cpu-data.shape[0])
copy.width_in_bytes = width*cpu_data.itemsize
copy.height = height
copy.depth = depth
#Perform the copy
copy(stream)
#self.logger.debug("Buffer <%s> [%dx%d]: Allocated ", int(self.data.gpudata), self.nx, self.ny)
"""
#copy from host to device
#src
host_array_pinned = hip_check(hip.hipHostMalloc(cpu_data.size * cpu_data.itemsize, hip.hipHostMallocDefault))
src_ptr = hip_check(hip.hipHostGetDevicePointer(host_array_pinned,hip.hipHostMallocDefault))
#src_ptr = hip.hipPitchedPtr()
#dst
# Allocate 3D pitched memory on the device
self.data = hip.hipPitchedPtr()
c_extent = hip.hipExtent(nx_halo*np.float32().itemsize, ny_halo, nz_halo)
#hip.hipMalloc3D(pitchedDevPtr-OUT, extent-IN)
err, = hip.hipMalloc3D(self.data, c_extent)
dst_pitch = nx_halo * np.float32().itemsize
#include offset: do we need make_hipPitchedPtr
x_offset = (nx_halo - cpu_data.shape[2]) // 2
y_offset = (ny_halo - cpu_data.shape[1]) // 2
z_offset = (nz_halo - cpu_data.shape[0]) // 2
if err != hip.hipError_t.hipSuccess:
raise RuntimeError(f"Error from hipMalloc3D: {hip.hipGetErrorString(err)}")
copy_upload = {
'srcPos': hip.hipPos(0, 0, 0),
'srcPtr': src_ptr,
'dstPos': hip.hipPos(0, 0, 0),
'dstPtr': self.data,
'extent': c_extent,
'kind': hip.hipMemcpyKind.hipMemcpyHostToDevice
}
# Perform the copy
copy = hip.hipMemcpy3DParms(**copy_upload)
err = hip.hipMemcpy3DAsync(copy, stream)
#copy = hip_check(hip.hipMemcpyAsync(self.data,cpu_data,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice,stream))
#self.logger.debug("Buffer <%s> [%dx%d]: Allocated ", int(self.data), self.nx, self.ny)
def __del__(self, *args):
#self.logger.debug("Buffer <%s> [%dx%d]: Releasing ", int(self.data.gpudata), self.nx, self.ny)
#self.logger.debug("Buffer <%s> [%dx%d]: Releasing ", int(self.data), self.nx, self.ny)
#self.data.gpudata.free()
hip_check(hip.hipFree(self.data))
#hip_check(hip.hipFreeAsync(self.data, self.stream))
self.data = None
"""
Enables downloading data from GPU to Python
"""
def download(self, stream, asynch=False):
#self.logger.debug("Downloading [%dx%d] buffer", self.nx, self.ny)
#Allocate host memory
#cpu_data = cuda.pagelocked_empty((self.ny, self.nx), np.float32)
cpu_data = np.empty((self.nz, self.ny, self.nx), dtype=np.float32)
#cpu_data = self.memorypool.allocate((self.nz, self.ny, self.nx), dtype=np.float32)
#Cuda
"""
#Create copy object from device to host
copy = cuda.Memcpy2D()
copy.set_src_device(self.data.gpudata)
copy.set_dst_host(cpu_data)
#Set offsets and pitch of source
copy.src_x_in_bytes = self.x_halo*self.data.strides[1]
copy.src_y = self.y_halo
copy.src_z = self.z_halo
copy.src_pitch = self.data.strides[0]
#Set width in bytes to copy for each row and
#number of rows to copy
copy.width_in_bytes = self.nx*cpu_data.itemsize
copy.height = self.ny
copy.depth = self.nz
copy(stream)
"""
#copy from device to host
num_bytes = cpu_data.size * cpu_data.itemsize
#hip.hipMemcpy(dst, src, unsigned long sizeBytes, kind)
copy = hip_check(hip.hipMemcpyAsync(cpu_data,self.data,num_bytes,hip.hipMemcpyKind.hipMemcpyDeviceToHost,stream))
if asynch==False:
#stream.synchronize()
hip_check(hip.hipStreamSynchronize(stream))
return cpu_data
"""
A class representing an Arakawa A type (unstaggered, logically Cartesian) grid
"""
class ArakawaA2D:
def __init__(self, stream, nx, ny, halo_x, halo_y, cpu_variables):
"""
Uploads initial data to the GPU device
"""
self.logger = logging.getLogger(__name__)
self.gpu_variables = []
for cpu_variable in cpu_variables:
self.gpu_variables += [CudaArray2D(stream, nx, ny, halo_x, halo_y, cpu_variable)]
def __getitem__(self, key):
assert type(key) == int, "Indexing is int based"
if (key > len(self.gpu_variables) or key < 0):
raise IndexError("Out of bounds")
return self.gpu_variables[key]
def download(self, stream, variables=None):
"""
Enables downloading data from the GPU device to Python
"""
if variables is None:
variables=range(len(self.gpu_variables))
cpu_variables = []
for i in variables:
assert i < len(self.gpu_variables), "Variable {:d} is out of range".format(i)
cpu_variables += [self.gpu_variables[i].download(stream, asynch=True)]
#print("--FIN: sum:", np.array(cpu_variables).sum())
#stream.synchronize()
hip_check(hip.hipStreamSynchronize(stream))
return cpu_variables
#hipblas
def sum_hipblas(self, num_elements, data):
num_bytes_r = np.dtype(np.float32).itemsize
result_d = hip_check(hip.hipMalloc(num_bytes_r))
result_h0 = np.zeros(1, dtype=np.float32)
# call hipblasSaxpy + initialization
handle = hip_check(hipblas.hipblasCreate())
#hip_check(hipblas.hipblasSaxpy(handle, num_elements, ctypes.addressof(alpha), x_d, 1, y_d, 1))
#"incx" [int] specifies the increment for the elements of x. incx must be > 0.
hip_check(hipblas.hipblasSasum(handle, num_elements, data, 1, result_d))
# destruction of handle
hip_check(hipblas.hipblasDestroy(handle))
# copy result (stored in result_d) back to host (store in result_h)
hip_check(hip.hipMemcpy(result_h0,result_d,num_bytes_r,hip.hipMemcpyKind.hipMemcpyDeviceToHost))
result_h = result_h0[0]
# clean up
#hip_check(hip.hipFree(data))
return result_h
def check(self):
"""
Checks that data is still sane
"""
for i, gpu_variable in enumerate(self.gpu_variables):
#compute sum with hipblas
#var_sum = pycuda.gpuarray.sum(gpu_variable.data).get()
var_sum = self.sum_hipblas(gpu_variable.data.size,gpu_variable.data)
#print(f"GPU: Sum for column {i}: {var_sum}")
self.logger.debug("Data %d with size [%d x %d] has average %f", i, gpu_variable.nx, gpu_variable.ny, var_sum / (gpu_variable.nx * gpu_variable.ny))
assert np.isnan(var_sum) == False, "Data contains NaN values!"