FiniteVolumeGPU/mpi_testing_cuda.py

# -*- coding: utf-8 -*-
"""
This Python module implements MPI simulations for benchmarking.
Copyright (C) 2018 SINTEF ICT
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import argparse
import numpy as np
import gc
import time
import json
import logging
import os
import sys
# MPI
from mpi4py import MPI
# CUDA
import pycuda.driver as cuda
# Simulator engine etc
from GPUSimulators.mpi import MPISimulator, MPIGrid
from GPUSimulators.common import run_simulation, get_git_hash, get_git_status
from GPUSimulators.gpu import KernelContext
from GPUSimulators.model import EE2DKP07Dimsplit
from GPUSimulators.helpers import initial_conditions as IC
parser = argparse.ArgumentParser(description='Strong and weak scaling experiments.')
parser.add_argument('-nx', type=int, default=128)
parser.add_argument('-ny', type=int, default=128)
parser.add_argument('--profile', action='store_true') # default: False
args = parser.parse_args()
if args.profile:
    profiling_data = {}
# profiling: total run time
t_total_start = time.time()
t_init_start = time.time()
# Get MPI COMM to use
comm = MPI.COMM_WORLD
####
# Initialize logging
####
log_level_console = 20  # logging.INFO
log_level_file = 10  # logging.DEBUG
log_filename = 'mpi_' + str(comm.rank) + '.log'
logger = logging.getLogger('GPUSimulators')
logger.setLevel(min(log_level_console, log_level_file))
ch = logging.StreamHandler()
ch.setLevel(log_level_console)
logger.addHandler(ch)
logger.info(f"Console logger using level {logging.getLevelName(log_level_console)}")
fh = logging.FileHandler(log_filename)
formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s: %(message)s')
fh.setFormatter(formatter)
fh.setLevel(log_level_file)
logger.addHandler(fh)
logger.info(f"File logger using level {logging.getLevelName(log_level_file)} to {log_filename}")
####
# Initialize MPI grid etc
####
logger.info("Creating MPI grid")
grid = MPIGrid(MPI.COMM_WORLD)
####
# Initialize CUDA
####
cuda.init(flags=0)
logger.info("Initializing CUDA")
local_rank = grid.get_local_rank()
num_cuda_devices = cuda.Device.count()
cuda_device = local_rank % num_cuda_devices
logger.info(f"Process {str(local_rank)} using CUDA device {str(cuda_device)}")
cuda_context = KernelContext(device=cuda_device, autotuning=False)
####
# Set initial conditions
####
# DEBUGGING - setting random seed
np.random.seed(42)
logger.info("Generating initial conditions")
nx = args.nx
ny = args.ny
dt = 0.000001
gamma = 1.4
# save_times = np.linspace(0, 0.000009, 2)
# save_times = np.linspace(0, 0.000099, 11)
# save_times = np.linspace(0, 0.000099, 2)
save_times = np.linspace(0, 0.0000999, 2)
outfile = "mpi_out_" + str(MPI.COMM_WORLD.rank) + ".nc"
save_var_names = ['rho', 'rho_u', 'rho_v', 'E']
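# Kelvin-Helmholtz instability test case, decomposed over the MPI grid.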
arguments = IC.gen_kelvin_helmholtz(nx, ny, gamma, grid=grid)
arguments['context'] = cuda_context
arguments['theta'] = 1.2
arguments['grid'] = grid
if args.profile:
    t_init_end = time.time()
    t_init = t_init_end - t_init_start
    profiling_data["t_init"] = t_init
####
# Run simulation
####
logger.info("Running simulation")
# Helper function to create MPI simulator
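# run_simulation() calls this factory to construct the simulator: the per-rank
# dimensional-split solver is wrapped in an MPISimulator, which ties it to the MPI grid.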
def genSim(grid, **kwargs):
    local_sim = EE2DKP07Dimsplit(**kwargs)
    sim = MPISimulator(local_sim, grid)
    return sim


outfile, sim_runner_profiling_data, sim_profiling_data = run_simulation(
    genSim, arguments, outfile, save_times, save_var_names, dt)
if args.profile:
    t_total_end = time.time()
    t_total = t_total_end - t_total_start
    profiling_data["t_total"] = t_total
    print(f"Total run time on rank {MPI.COMM_WORLD.rank} is {t_total} s")
# write profiling to JSON file
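# Only rank 0 writes the profiling file; it is named after the SLURM allocation when
# running under SLURM, and after the number of MPI processes and CUDA devices otherwise.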
if args.profile and MPI.COMM_WORLD.rank == 0:
    job_id = ""
    if "SLURM_JOB_ID" in os.environ:
        job_id = int(os.environ["SLURM_JOB_ID"])
        allocated_nodes = int(os.environ["SLURM_JOB_NUM_NODES"])
        allocated_gpus = int(os.environ["CUDA_VISIBLE_DEVICES"].count(",") + 1)
        profiling_file = ("MPI_jobid_" + str(job_id) + "_" + str(allocated_nodes)
                          + "_nodes_and_" + str(allocated_gpus) + "_GPUs_profiling.json")
        profiling_data["outfile"] = outfile
    else:
        profiling_file = ("MPI_" + str(MPI.COMM_WORLD.size) + "_procs_and_"
                          + str(num_cuda_devices) + "_GPUs_profiling.json")
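    # Convert the start/end timestamps recorded by the simulation runner and the
    # simulator into per-stage durations.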
    for stage in sim_runner_profiling_data["start"].keys():
        profiling_data[stage] = sim_runner_profiling_data["end"][stage] - sim_runner_profiling_data["start"][stage]
    for stage in sim_profiling_data["start"].keys():
        profiling_data[stage] = sim_profiling_data["end"][stage] - sim_profiling_data["start"][stage]

    profiling_data["nx"] = nx
    profiling_data["ny"] = ny
    profiling_data["dt"] = dt
    profiling_data["n_time_steps"] = sim_profiling_data["n_time_steps"]
    profiling_data["slurm_job_id"] = job_id
    profiling_data["n_cuda_devices"] = str(num_cuda_devices)
    profiling_data["n_processes"] = str(MPI.COMM_WORLD.size)
    profiling_data["git_hash"] = get_git_hash()
    profiling_data["git_status"] = get_git_status()

    with open(profiling_file, "w") as write_file:
        json.dump(profiling_data, write_file)
####
# Clean shutdown
####
sim = None
local_sim = None
cuda_context = None
arguments = None
logging.shutdown()
gc.collect()
####
# Print completion and exit
####
print("Completed!")
sys.exit(0)