Source code for esub.utils

# Copyright (C) 2019 ETH Zurich,
# Institute for Particle Physics and Astrophysics
# Author: Dominik Zuercher, adapted by Silvan Fischbacher

import datetime
import math
import multiprocessing
import os
import shlex
import shutil
import subprocess
import sys
import time
from functools import partial

import numpy as np
import portalocker
from ekit import logger as logger_utils

LOGGER = logger_utils.init_logger(__name__)
TIMEOUT_MESSAGE = (
    "Maximum number of pending jobs reached, will sleep for 30 minutes and retry"
)


def decimal_hours_to_str(dec_hours):
    """
    Transforms decimal hours into the hh:mm format

    :param dec_hours: decimal hours, float or int
    :return: string in the format hh:mm
    """
    full_hours = math.floor(dec_hours)
    minutes = math.ceil((dec_hours - full_hours) * 60)
    if minutes == 60:
        full_hours += 1
        minutes = 0
    if minutes < 10:
        time_str = "{}:0{}".format(full_hours, minutes)
    else:
        time_str = "{}:{}".format(full_hours, minutes)
    return time_str
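
# Illustrative example (not part of the original module): minutes are
# zero-padded and 60 minutes roll over into a full hour.
#   >>> decimal_hours_to_str(4.25)
#   '4:15'
#   >>> decimal_hours_to_str(1.0)
#   '1:00'
#   >>> decimal_hours_to_str(0.999)   # 59.94 minutes is rounded up to 60
#   '1:00'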

def make_resource_string(
    function,
    main_memory,
    main_time,
    main_scratch,
    main_n_cores,
    main_gpu,
    main_gpu_memory,
    watchdog_memory,
    watchdog_time,
    watchdog_scratch,
    watchdog_n_cores,
    watchdog_gpu,
    watchdog_gpu_memory,
    merge_memory,
    merge_time,
    merge_scratch,
    merge_n_cores,
    merge_gpu,
    merge_gpu_memory,
    system,
    verbosity=3,
):
    """
    Creates the part of the submission string which handles the allocation
    of resources

    :param function: The name of the function defined in the executable that
                     will be submitted
    :param main_memory: Memory per core to allocate for the main job
    :param main_time: The wall time requested for the main job
    :param main_scratch: Scratch per core to allocate for the main job
    :param main_n_cores: Number of cores to allocate for the main job
    :param main_gpu: Number of GPUs to allocate for the main job
    :param main_gpu_memory: Memory per GPU to allocate for the main job
    :param watchdog_memory: Memory per core to allocate for the watchdog job
    :param watchdog_time: The wall time requested for the watchdog job
    :param watchdog_scratch: Scratch to allocate for the watchdog job
    :param watchdog_n_cores: Number of cores to allocate for the watchdog job
    :param watchdog_gpu: Number of GPUs to allocate for the watchdog job
    :param watchdog_gpu_memory: Memory per GPU to allocate for the watchdog job
    :param merge_memory: Memory per core to allocate for the merge job
    :param merge_time: The wall time requested for the merge job
    :param merge_scratch: Scratch to allocate for the merge job
    :param merge_n_cores: Number of cores to allocate for the merge job
    :param merge_gpu: Number of GPUs to allocate for the merge job
    :param merge_gpu_memory: Memory per GPU to allocate for the merge job
    :param system: The type of the queuing system of the cluster
    :param verbosity: Verbosity level (0 - 4).
    :return: A string that is part of the submission string and the GPU string
             that needs to be added to srun
    """
    logger_utils.set_logger_level(LOGGER, verbosity)

    gpu_cmd = ""
    if function == "main":
        mem = main_memory
        time = main_time
        scratch = main_scratch
        n_cores = main_n_cores
        gpu = main_gpu
        gpu_memory = main_gpu_memory
    elif function == "watchdog":
        mem = watchdog_memory
        time = watchdog_time
        scratch = watchdog_scratch
        n_cores = watchdog_n_cores
        gpu = watchdog_gpu
        gpu_memory = watchdog_gpu_memory
    elif function == "merge":
        mem = merge_memory
        time = merge_time
        scratch = merge_scratch
        n_cores = merge_n_cores
        gpu = merge_gpu
        gpu_memory = merge_gpu_memory
    elif function == "rerun_missing":
        mem = main_memory
        time = main_time
        scratch = main_scratch
        n_cores = main_n_cores
        gpu = main_gpu
        gpu_memory = main_gpu_memory
    elif function == "merge_log_files":
        mem = main_memory
        time = 4
        scratch = main_scratch
        n_cores = 1
        gpu = 0
        gpu_memory = 0

    if system == "bsub":
        resource_string = (
            "-W {} -R rusage[mem={}] "
            "-R rusage[scratch={}] "
            "-n {}".format(decimal_hours_to_str(time), mem, scratch, n_cores)
        )
    elif system == "slurm":
        resource_string = (
            "#SBATCH --time={}:00 \n"
            "#SBATCH --mem-per-cpu={} \n"
            "#SBATCH --tmp={} \n"
            "#SBATCH --cpus-per-task={} \n".format(
                decimal_hours_to_str(time), int(mem), int(scratch), n_cores
            )
        )
        if gpu > 0:
            resource_string += "#SBATCH --gpus={} \n".format(gpu)
            gpumem_in_gb = gpu_memory / 1024
            resource_string += f"#SBATCH --gres=gpumem:{gpumem_in_gb:.0f}g \n"
            # slurm needs the gpus and gres argument to be passed to srun and sbatch
            gpu_cmd = f"--gpus={gpu} --gres=gpumem:{gpumem_in_gb:.0f}g "
        resource_string += "\n"
    elif system == "daint":
        resource_string = "#SBATCH --time={}:00 \n#SBATCH --mem={} \n \n".format(
            decimal_hours_to_str(time), mem
        )
        # TODO: local scratch for slurm
        if scratch > 0:
            LOGGER.warning(
                "Not Implemented Warning: Automatic local scratch "
                "allocation not supported for DAINT system. Ignoring."
            )

    LOGGER.debug(f"Built resource string as {resource_string}")
    return resource_string, gpu_cmd
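
# Illustrative example (values are placeholders, not from the original source):
# resources for the "main" function on a bsub system; the watchdog/merge
# values are required positionally but not used for "main".
#   >>> make_resource_string(
#   ...     "main",
#   ...     1000, 4, 2000, 4, 0, 0,    # main: mem, time, scratch, cores, gpu, gpu mem
#   ...     1000, 1, 0, 1, 0, 0,       # watchdog
#   ...     1000, 1, 0, 1, 0, 0,       # merge
#   ...     "bsub",
#   ... )
#   ('-W 4:00 -R rusage[mem=1000] -R rusage[scratch=2000] -n 4', '')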

def get_log_filenames(log_dir, job_name, function, system="bsub"):
    """
    Builds the filenames of the stdout and stderr log files for a
    given job name and a given function to run.

    :param log_dir: directory where the logs are stored
    :param job_name: Name of the job that will write to the log files
    :param function: Function that will be executed
    :param system: The type of the queuing system of the cluster
    :return: filenames for stdout and stderr logs
    """
    job_name_ext = job_name + "_" + function
    if system == "slurm" and (function == "main" or function == "main_r"):
        stdout_log = os.path.join(log_dir, "{}_index%a.o".format(job_name_ext))
        stderr_log = os.path.join(log_dir, "{}_index%a.e".format(job_name_ext))
    else:
        stdout_log = os.path.join(log_dir, "{}.o".format(job_name_ext))
        stderr_log = os.path.join(log_dir, "{}.e".format(job_name_ext))
    return stdout_log, stderr_log
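
# Illustrative example (not part of the original module): on slurm the
# main-job logs carry the array-index placeholder %a, on bsub they do not.
#   >>> get_log_filenames("logs", "myjob", "main", system="slurm")
#   ('logs/myjob_main_index%a.o', 'logs/myjob_main_index%a.e')
#   >>> get_log_filenames("logs", "myjob", "merge", system="bsub")
#   ('logs/myjob_merge.o', 'logs/myjob_merge.e')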

def get_source_cmd(source_file, verbosity=3):
    """
    Builds the command to source a given file if the file exists,
    otherwise returns an empty string.

    :param source_file: path to the (possibly non-existing) source file,
                        can be relative and can contain "~"
    :param verbosity: Verbosity level (0 - 4).
    :return: command to source the file if it exists or empty string
    """
    logger_utils.set_logger_level(LOGGER, verbosity)

    source_file_abs = os.path.abspath(os.path.expanduser(source_file))
    if os.path.isfile(source_file_abs):
        source_cmd = "source {}; ".format(source_file_abs)
        LOGGER.debug(f"Running source script at {source_file_abs}")
    else:
        LOGGER.warning("Source file {} not found, skipping".format(source_file))
        source_cmd = ""
    return source_cmd
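
# Illustrative example (the file name is a placeholder): an existing file
# yields "source <absolute path>; ", a missing file yields an empty string
# instead of an error.
#   >>> get_source_cmd("does_not_exist.sh")
#   ''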

def get_dependency_string(function, jobids, ext_dependencies, system, verbosity=3):
    """
    Constructs the dependency string which handles which other jobs
    this job is dependent on.

    :param function: The type of function to submit
    :param jobids: Dictionary of the jobids for each job already submitted
    :param ext_dependencies: If external dependencies are given they get
                             added to the dependency string
                             (this happens if epipe is used)
    :param system: The type of the queuing system of the cluster
    :param verbosity: Verbosity level (0 - 4).
    :return: A string which is used as a substring for the submission string
             and handles the dependencies of the job
    """
    logger_utils.set_logger_level(LOGGER, verbosity)

    dep_string = ""
    # no dependencies for main
    if function == "main":
        if ext_dependencies != "":
            dep_string = '-w "' + ext_dependencies + '"'
        else:
            dep_string = ""
            return dep_string
        if system == "slurm":
            dep_string = dep_string.replace("ended(", "afterany:")
            dep_string = dep_string.replace("started(", "after:")
            dep_string = dep_string.replace(") && ", ",")
            dep_string = dep_string.replace(")", "")
            dep_string = dep_string.replace('-w "', '--dependency="')
        return dep_string
    # watchdog starts along with main
    elif function == "watchdog":
        if "main" in jobids.keys():
            for id in jobids["main"]:
                dep_string += "{}({}) && ".format("started", id)
        else:
            LOGGER.warning(
                "Function {} has not been submitted -> Skipping "
                "in dependencies for {}".format("main", function)
            )
    # rerun missing starts after main
    elif function == "rerun_missing":
        if "main" in jobids.keys():
            for id in jobids["main"]:
                dep_string += "{}({}) && ".format("ended", id)
        else:
            LOGGER.warning(
                "Function {} has not been submitted -> Skipping "
                "in dependencies for {}".format("main", function)
            )
    # merge_log_files starts after main or rerun_missing
    elif function == "merge_log_files":
        if "rerun_missing" in jobids.keys():
            for id in jobids["rerun_missing"]:
                dep_string += "{}({}) && ".format("ended", id)
        elif "main" in jobids.keys():
            for id in jobids["main"]:
                dep_string += "{}({}) && ".format("ended", id)
    # merge starts after all the others
    elif function == "merge":
        if "main" in jobids.keys():
            for id in jobids["main"]:
                dep_string += "{}({}) && ".format("ended", id)
        else:
            LOGGER.warning(
                "Function {} has not been submitted -> Skipping "
                "in dependencies for {}".format("main", function)
            )
        if "watchdog" in jobids.keys():
            for id in jobids["watchdog"]:
                dep_string += "{}({}) && ".format("ended", id)
        else:
            LOGGER.warning(
                "Function {} has not been submitted -> Skipping "
                "in dependencies for {}".format("watchdog", function)
            )
        if "rerun_missing" in jobids.keys():
            for id in jobids["rerun_missing"]:
                dep_string += "{}({}) && ".format("ended", id)
        else:
            LOGGER.warning(
                "Function {} has not been submitted -> Skipping "
                "in dependencies for {}".format("rerun_missing", function)
            )
    else:
        raise ValueError("Dependencies for function" " {} not defined".format(function))

    # remove trailing &&
    if len(dep_string) > 0:
        dep_string = dep_string[:-4]
    if ext_dependencies != "":
        dep_string = dep_string + " && " + ext_dependencies
    # remove leading &&
    if dep_string[:4] == " && ":
        dep_string = dep_string[4:]
    dep_string = '-w "' + dep_string + '"'

    if system == "slurm":
        dep_string = dep_string.replace("ended(", "afterany:")
        dep_string = dep_string.replace("started(", "after:")
        dep_string = dep_string.replace(") && ", ",")
        dep_string = dep_string.replace(")", "")
        dep_string = dep_string.replace('-w "', '--dependency="')
    if len(dep_string) > 0:
        LOGGER.debug(f"Built dependency string as {dep_string}")
    return dep_string

def make_cmd_string(
    function,
    source_file,
    n_jobs,
    tasks,
    mode,
    job_name,
    function_args,
    exe,
    main_memory,
    main_time,
    main_scratch,
    main_n_cores,
    main_gpu,
    main_gpu_memory,
    rerun_missing_memory,
    rerun_missing_time,
    rerun_missing_scratch,
    rerun_missing_n_cores,
    watchdog_time,
    watchdog_memory,
    watchdog_scratch,
    watchdog_n_cores,
    watchdog_gpu,
    watchdog_gpu_memory,
    merge_memory,
    merge_time,
    merge_scratch,
    merge_n_cores,
    merge_gpu,
    merge_gpu_memory,
    log_dir,
    dependency,
    system,
    main_name="main",
    batchsize=100000,
    max_njobs=-1,
    add_args="",
    add_bsub="",
    discard_output=False,
    verbosity=3,
    main_mode="jobarray",
    # main_n_cores=1,
    nodes=1,
    MPI_tasks_per_core=1,
    MPI_tasks_per_node=1,
    OpenMP_threads_per_task=1,
):
    """
    Creates the submission string which gets submitted to the queuing system

    :param function: The name of the function defined in the executable that
                     will be submitted
    :param source_file: A file which gets executed before running the actual
                        function(s)
    :param n_jobs: The number of jobs that will be requested for the job
    :param tasks: The task string, which will get parsed into the job indices
    :param mode: The mode in which the job will be run (MPI-job or as a
                 jobarray)
    :param job_name: The name of the job
    :param function_args: The remaining arguments that will be forwarded to
                          the executable
    :param exe: The path of the executable
    :param main_memory: Memory per core to allocate for the main job
    :param main_time: The wall time requested for the main job
    :param main_scratch: Scratch per core to allocate for the main job
    :param main_n_cores: Number of cores to allocate for the main job
    :param main_gpu: Number of GPUs to allocate for the main job
    :param main_gpu_memory: Memory per GPU to allocate for the main job
    :param rerun_missing_memory: Memory per core to allocate for the rerun job
    :param rerun_missing_time: The wall time requested for the rerun job
    :param rerun_missing_scratch: Scratch per core to allocate for the rerun job
    :param rerun_missing_n_cores: Number of cores to allocate for the rerun job
    :param watchdog_memory: Memory per core to allocate for the watchdog job
    :param watchdog_time: The wall time requested for the watchdog job
    :param watchdog_scratch: Scratch to allocate for the watchdog job
    :param watchdog_n_cores: Number of cores to allocate for the watchdog job
    :param watchdog_gpu: Number of GPUs to allocate for the watchdog job
    :param watchdog_gpu_memory: Memory per GPU to allocate for the watchdog job
    :param merge_memory: Memory per core to allocate for the merge job
    :param merge_time: The wall time requested for the merge job
    :param merge_scratch: Scratch to allocate for the merge job
    :param merge_n_cores: Number of cores to allocate for the merge job
    :param merge_gpu: Number of GPUs to allocate for the merge job
    :param merge_gpu_memory: Memory per GPU to allocate for the merge job
    :param log_dir: The path to the log directory
    :param dependency: The dependency string
    :param system: The type of the queuing system of the cluster
    :param main_name: name of the main function
    :param batchsize: If not zero the jobarray gets divided into batches.
    :param max_njobs: Maximum number of jobs allowed to run at the same time.
    :param add_args: Additional cluster-specific arguments
    :param add_bsub: Additional bsub arguments to pass
    :param discard_output: If True writes stdout/stderr to /dev/null
    :param verbosity: Verbosity level (0 - 4).
    :return: The submission string that will get submitted to the cluster
    """
    logger_utils.set_logger_level(LOGGER, verbosity)

    # rerun_missing should use the same resources as main, but can be overwritten
    # by the rerun_ arguments
    if (function == "rerun_missing") | (function == "main_rerun"):
        if rerun_missing_memory is not None:
            main_memory = rerun_missing_memory
        if rerun_missing_time is not None:
            main_time = rerun_missing_time
        if rerun_missing_scratch is not None:
            main_scratch = rerun_missing_scratch
        if rerun_missing_n_cores is not None:
            main_n_cores = rerun_missing_n_cores

    if function == "main_rerun":
        function = "main"
        if system == "slurm":
            # reruns should use different log files to avoid overwriting
            log_function = "main_r"
        else:
            log_function = "main"
    else:
        log_function = function

    # allocate computing resources
    resource_string, gpu_cmd = make_resource_string(
        function,
        main_memory,
        main_time,
        main_scratch,
        main_n_cores,
        main_gpu,
        main_gpu_memory,
        watchdog_memory,
        watchdog_time,
        watchdog_scratch,
        watchdog_n_cores,
        watchdog_gpu,
        watchdog_gpu_memory,
        merge_memory,
        merge_time,
        merge_scratch,
        merge_n_cores,
        merge_gpu,
        merge_gpu_memory,
        system,
        verbosity,
    )

    # get the job name for the submission system and the log files
    job_name_ext = job_name + "_" + function
    stdout_log, stderr_log = get_log_filenames(log_dir, job_name, log_function, system)

    # construct the string of arguments passed to the executable
    args_string = ""
    for arg in function_args:
        args_string += arg + " "

    # make submission string
    source_cmd = get_source_cmd(source_file, verbosity)

    if mode == "mpi":
        run_cmd = "mpirun python"
    elif mode == "jobarray":
        run_cmd = "python"
    else:
        raise ValueError(f"Run mode {mode} is not known")

    extra_args_string = (
        "--source_file={} --main_memory={} --main_time={} "
        "--main_scratch={} --function={} "
        "--executable={} --n_jobs={} "
        "--log_dir={} --system={} "
        "--main_name={} --batchsize={} --max_njobs={} "
        "--main_n_cores={} --main_gpu={} --main_gpu_memory={} "
        '--esub_verbosity={} --main_mode={} --mode={} {}"'.format(
            source_file,
            main_memory,
            main_time,
            main_scratch,
            function,
            exe,
            n_jobs,
            log_dir,
            system,
            main_name,
            batchsize,
            max_njobs,
            main_n_cores,
            main_gpu,
            main_gpu_memory,
            verbosity,
            main_mode,
            mode,
            args_string,
        )
    )

    if (function == "main") & (max_njobs > 0):
        max_string = "%{}".format(max_njobs)
    else:
        max_string = ""

    if system == "bsub":
        if n_jobs <= batchsize:
            cmd_string = (
                "bsub -o {} -e {} -J {}[1-{}]{} "
                "{} {} {}"
                ' "{} {} -m esub.submit --job_name={} '
                "--tasks='{}' {}".format(
                    stdout_log,
                    stderr_log,
                    job_name_ext,
                    n_jobs,
                    max_string,
                    resource_string,
                    add_bsub,
                    dependency,
                    source_cmd,
                    run_cmd,
                    job_name,
                    tasks,
                    extra_args_string,
                )
            )
        else:
            LOGGER.warning(
                "You have requested a jobarray with more "
                f"than {batchsize} cores"
                ". Euler cannot handle this. I break down this job into "
                "multiple subarrays and submit them one by one. "
                "Note that this feature currently breaks the rerun "
                "missing capability. Also note that"
                " this process needs to keep running...."
            )
            n_batches = math.ceil(n_jobs / batchsize)
            cmd_string = []
            for rank in range(n_batches):
                if rank < (n_batches - 1):
                    jobs = batchsize
                else:
                    jobs = n_jobs % batchsize
                    if jobs == 0:
                        jobs = batchsize
                first_task = get_indices_splitted(tasks, n_jobs, rank * batchsize)
                first_task = first_task[0]
                last_task = get_indices_splitted(
                    tasks, n_jobs, rank * batchsize + jobs - 1
                )
                last_task = last_task[-1]
                tasks_ = f"{first_task} > {last_task + 1}"
                jobname_ = f"{job_name}_{rank}"
                stdout_log_ = stdout_log[:-2] + f"_{rank}.o"
                stderr_log_ = stdout_log[:-2] + f"_{rank}.e"
                cs = (
                    "bsub -o {} -e {} -J {}[1-{}]{} "
                    '{} {} "{} '
                    "{} -m esub.submit --job_name={} --tasks='{}' {}".format(
                        stdout_log_,
                        stderr_log_,
                        job_name_ext,
                        jobs,
                        max_string,
                        resource_string,
                        dependency,
                        source_cmd,
                        run_cmd,
                        jobname_,
                        tasks_,
                        extra_args_string,
                    )
                )
                cmd_string.append(cs)
        if discard_output:
            if isinstance(cmd_string, list):
                for i in range(len(cmd_string)):
                    cmd_string[i] = cmd_string[i] + " --discard_output &> /dev/null"
            else:
                cmd_string += " --discard_output &> /dev/null"

    elif system == "slurm":
        # split add_args
        if len(add_args) > 0:
            add_args = add_args.split(",")
        else:
            add_args = []

        cmd_string = "sbatch {} submit_{}.slurm".format(dependency, job_name_ext)

        # write submission file
        with open(f"submit_{job_name_ext}.slurm", "w+") as f:
            f.write("#! /bin/bash \n#\n")
            if discard_output:
                f.write("#SBATCH --output=/dev/null \n")
                f.write("#SBATCH --error=/dev/null \n")
            else:
                f.write("#SBATCH --output={} \n".format(stdout_log))
                f.write("#SBATCH --error={} \n".format(stderr_log))
            f.write("#SBATCH --job-name={} \n".format(job_name_ext))
            for arg in add_args:
                f.write("#SBATCH {} \n".format(arg))
            f.write("#SBATCH --array=1-{}{} \n".format(n_jobs, max_string))
            f.write(resource_string)
            f.write(
                "srun {}bash; {} {} -m esub.submit --job_name={} "
                "--tasks='{}' {}".format(
                    gpu_cmd,
                    source_cmd,
                    run_cmd,
                    job_name,
                    tasks,
                    extra_args_string[:-1],
                )
            )

    elif system == "daint":
        # split add_args
        if len(add_args) > 0:
            add_args = add_args.split(",")
        else:
            add_args = []

        cmd_string = "sbatch {} submit_{}.slurm".format(dependency, job_name_ext)

        # write submission file
        with open(f"submit_{job_name_ext}.slurm", "w+") as f:
            f.write("#! /bin/bash \n#\n")
            if discard_output:
                f.write("#SBATCH --output=/dev/null \n")
                f.write("#SBATCH --error=/dev/null \n")
            else:
                f.write("#SBATCH --output={} \n".format(stdout_log))
                f.write("#SBATCH --error={} \n".format(stderr_log))
            f.write("#SBATCH --job-name={} \n".format(job_name_ext))
            f.write("#SBATCH --constraint=gpu \n")
            f.write("#SBATCH --nodes={} \n".format(nodes))
            f.write("#SBATCH --ntasks-per-core={} \n".format(MPI_tasks_per_core))
            f.write("#SBATCH --ntasks-per-node={} \n".format(MPI_tasks_per_node))
            f.write("#SBATCH --cpus-per-task={} \n".format(OpenMP_threads_per_task))
            for arg in add_args:
                f.write("#SBATCH {} \n".format(arg))
            f.write(resource_string)
            if len(source_cmd) > 0:
                f.write("srun {} \n".format(source_cmd))
            f.write(
                "srun python -m esub.submit --job_name={} "
                "--tasks='{}' {}".format(job_name, tasks, extra_args_string[:-1])
            )

    LOGGER.debug(f"Built total command string as {cmd_string}")
    return cmd_string

def submit_job(
    tasks,
    mode,
    exe,
    log_dir,
    function_args,
    function="main",
    source_file="",
    n_jobs=1,
    job_name="job",
    main_memory=100,
    main_time=1,
    main_scratch=1000,
    main_n_cores=1,
    main_gpu=0,
    main_gpu_memory=1000,
    rerun_missing_memory=None,
    rerun_missing_time=None,
    rerun_missing_scratch=None,
    rerun_missing_n_cores=None,
    watchdog_memory=100,
    watchdog_time=1,
    watchdog_scratch=1000,
    watchdog_n_cores=1,
    watchdog_gpu=0,
    watchdog_gpu_memory=1000,
    merge_memory=100,
    merge_time=1,
    merge_scratch=1000,
    merge_n_cores=1,
    merge_gpu=0,
    merge_gpu_memory=1000,
    dependency="",
    system="bsub",
    main_name="main",
    test=False,
    batchsize=100000,
    max_njobs=100000,
    add_args="",
    add_bsub="",
    discard_output=False,
    verbosity=3,
    main_mode="jobarray",
    keep_submit_files=False,
    nodes=1,
    MPI_tasks_per_core=1,
    MPI_tasks_per_node=1,
    OpenMP_threads_per_task=1,
):
    """
    Based on arguments gets the submission string and submits it
    to the cluster

    :param tasks: The task string, which will get parsed into the job indices
    :param mode: The mode in which the job will be run (MPI-job or as a
                 jobarray)
    :param exe: The path of the executable
    :param log_dir: The path to the log directory
    :param function_args: The remaining arguments that will be forwarded to
                          the executable
    :param function: The name of the function defined in the executable that
                     will be submitted
    :param source_file: A file which gets executed before running the actual
                        function(s)
    :param n_jobs: The number of jobs that will be requested for the job
    :param job_name: The name of the job
    :param main_memory: Memory per core to allocate for the main job
    :param main_time: The wall time requested for the main job
    :param main_scratch: Scratch per core to allocate for the main job
    :param main_n_cores: Number of cores to allocate for the main job
    :param main_gpu: Number of GPUs to allocate for the main job
    :param main_gpu_memory: Memory per GPU to allocate for the main job
    :param rerun_missing_memory: Memory per core to allocate for the rerun
                                 job, if None the main_memory will be used
    :param rerun_missing_time: The wall time requested for the rerun job,
                               if None the main_time will be used
    :param rerun_missing_scratch: Scratch per core to allocate for the rerun
                                  job, if None the main_scratch will be used
    :param rerun_missing_n_cores: Number of cores to allocate for the rerun
                                  job, if None the main_n_cores will be used
    :param watchdog_memory: Memory per core to allocate for the watchdog job
    :param watchdog_time: The wall time requested for the watchdog job
    :param watchdog_scratch: Scratch to allocate for the watchdog job
    :param watchdog_n_cores: Number of cores to allocate for the watchdog job
    :param watchdog_gpu: Number of GPUs to allocate for the watchdog job
    :param watchdog_gpu_memory: Memory per GPU to allocate for the watchdog job
    :param merge_memory: Memory per core to allocate for the merge job
    :param merge_time: The wall time requested for the merge job
    :param merge_scratch: Scratch to allocate for the merge job
    :param merge_n_cores: Number of cores to allocate for the merge job
    :param merge_gpu: Number of GPUs to allocate for the merge job
    :param merge_gpu_memory: Memory per GPU to allocate for the merge job
    :param dependency: The jobids of the jobs on which this job depends
    :param system: The type of the queuing system of the cluster
    :param main_name: name of the main function
    :param test: If True no submission but just printing submission string to
                 log
    :param batchsize: If number of cores requested is > batchsize, break up
                      jobarrays into jobarrays of size batchsize
    :param max_njobs: Maximum number of jobs allowed to run at the same time
    :param add_args: Additional cluster-specific arguments
    :param add_bsub: Additional bsub arguments to pass
    :param discard_output: If True writes stdout/stderr to /dev/null
    :param verbosity: Verbosity level (0 - 4).
    :param keep_submit_files: If True store SLURM submission files
    :return: The jobid of the submitted job
    """
    logger_utils.set_logger_level(LOGGER, verbosity)

    # assess if number of tasks is valid
    n_tasks = len(get_indices_splitted(tasks, 1, 0))
    if (n_jobs > n_tasks) & (("mpi" not in mode) & (mode != "tasks")):
        raise Exception(
            "You tried to request more jobs than you have tasks. "
            "I assume this is a mistake. Aborting..."
        )

    # get submission string
    cmd_string = make_cmd_string(
        function,
        source_file,
        n_jobs,
        tasks,
        mode,
        job_name,
        function_args,
        exe,
        main_memory,
        main_time,
        main_scratch,
        main_n_cores,
        main_gpu,
        main_gpu_memory,
        rerun_missing_memory,
        rerun_missing_time,
        rerun_missing_scratch,
        rerun_missing_n_cores,
        watchdog_time,
        watchdog_memory,
        watchdog_scratch,
        watchdog_n_cores,
        watchdog_gpu,
        watchdog_gpu_memory,
        merge_memory,
        merge_time,
        merge_scratch,
        merge_n_cores,
        merge_gpu,
        merge_gpu_memory,
        log_dir,
        dependency,
        system,
        main_name,
        batchsize,
        max_njobs,
        add_bsub=add_bsub,
        add_args=add_args,
        discard_output=discard_output,
        verbosity=verbosity,
        main_mode=main_mode,
        # main_n_cores=main_n_cores,
        nodes=nodes,
        MPI_tasks_per_core=MPI_tasks_per_core,
        MPI_tasks_per_node=MPI_tasks_per_node,
        OpenMP_threads_per_task=OpenMP_threads_per_task,
    )

    LOGGER.debug(cmd_string)

    if test:
        path_log = get_path_log(log_dir, job_name)
        write_to_log(path_log, cmd_string)
        return []

    # message the system sends if the
    # maximum number of pending jobs is reached
    msg_limit_reached = "Pending job threshold reached."
    pipe_limit_reached = "stderr"

    if isinstance(cmd_string, str):
        cmd_string = [cmd_string]

    jobids = []
    for cs in cmd_string:
        LOGGER.info("Submitting command:")
        LOGGER.info(cs)

        # submit
        while True:
            output = dict(stdout=[], stderr=[])

            with subprocess.Popen(
                shlex.split(cs),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=1,
                universal_newlines=True,
            ) as proc:
                # check for limit concerning maximum number of pending jobs
                if system == "bsub":
                    for line in getattr(proc, pipe_limit_reached):
                        pending_limit_reached = msg_limit_reached in line
                        if pending_limit_reached:
                            break
                        else:
                            output[pipe_limit_reached].append(line)

                    # if the limit has been reached, kill process and sleep
                    if pending_limit_reached:
                        proc.kill()
                        LOGGER.warning(TIMEOUT_MESSAGE)
                        time.sleep(60 * 30)
                        continue

                # read rest of the output
                for line in proc.stdout:
                    output["stdout"].append(line)
                for line in proc.stderr:
                    output["stderr"].append(line)

                break

        # check if process terminated successfully
        if proc.returncode != 0:
            raise RuntimeError(
                'Running the command "{}" failed with '
                "exit code {}. Error: \n{}".format(
                    cmd_string, proc.returncode, "\n".join(output["stderr"])
                )
            )

        # get id of submitted job
        if system == "bsub":
            jobid = output["stdout"][-1].split("<")[1]
            jobid = jobid.split(">")[0]
        elif system == "slurm":
            jobid = output["stdout"][-1].split("job ")[-1]
            if not keep_submit_files:
                for cs in cmd_string:
                    os.remove(f"{cs.split(' ')[-1]}")
        elif system == "daint":
            jobid = output["stdout"][-1].split("job ")[-1]
            if not keep_submit_files:
                for cs in cmd_string:
                    os.remove(f"{cs.split(' ')[-1]}")
        jobids.append(int(jobid))

    LOGGER.info("Submitted job and got jobid(s): {}".format(jobid))

    return jobids

def robust_remove(path):
    """
    Remove a file or directory if it exists and recreate the path as an
    empty file

    :param path: path to possibly non-existing file or directory
    """
    if os.path.isfile(path):
        os.remove(path)
    elif os.path.isdir(path):
        shutil.rmtree(path)

    # recreate
    open(path, "a").close()

def get_path_log(log_dir, job_name):
    """
    Construct the path of the esub log file

    :param log_dir: directory where log files are stored
    :param job_name: name of the job that will be logged
    :return: path of the log file
    """
    path_log = os.path.join(log_dir, job_name + ".log")
    return path_log

def get_path_finished_indices(log_dir, job_name):
    """
    Construct the path of the file containing the finished indices

    :param log_dir: directory where log files are stored
    :param job_name: name of the job for which the indices will be stored
    :return: path of the file for the finished indices
    """
    path_finished = os.path.join(log_dir, job_name + "_done.dat")
    return path_finished

def import_executable(exe):
    """
    Imports the functions defined in the executable file.

    :param exe: path of the executable
    :return: executable imported as python module
    """
    sys.path.insert(0, os.path.dirname(exe))
    to_import = os.path.basename(exe).replace(".py", "")
    try:
        executable = __import__(to_import)
    except ImportError:
        raise ImportError(f"Failed to import your executable {exe}")
    return executable
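
# Illustrative example (the path is a placeholder): import a user executable
# so that its main/watchdog/merge functions become attributes of the returned
# module.
#   >>> exe = import_executable("my_project/exe.py")
#   >>> hasattr(exe, "main")
#   True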

def save_write(path, str_to_write, mode="a"):
    """
    Write a string to a file, with the file being locked in the meantime.

    :param path: path of file
    :param str_to_write: string to be written
    :param mode: mode in which file is opened
    """
    with portalocker.Lock(path, mode=mode, timeout=math.inf) as f:
        # write
        f.write(str_to_write)
        # flush and sync to filesystem
        f.flush()
        os.fsync(f.fileno())

def write_index(index, finished_file):
    """
    Writes the index number on a new line of the file containing the
    finished indices

    :param index: A job index
    :param finished_file: The file to which the jobs will write that they
                          are done
    """
    save_write(finished_file, "{}\n".format(index))

def check_indices(
    indices,
    finished_file,
    exe,
    function_args,
    check_indices_file=True,
    verbosity=3,
):
    """
    Checks which of the indices are missing in the file containing the
    finished indices

    :param indices: Job indices that should be checked
    :param finished_file: The file from which the jobs will be read
    :param exe: The imported executable module
    :param function_args: Arguments passed on to the executable's
                          check_missing function (if it exists)
    :param check_indices_file: If True adds indices from index file otherwise
                               only use check_missing function
    :param verbosity: Verbosity level (0 - 4).
    :return: Returns the indices that are missing
    """
    if check_indices_file:
        LOGGER.debug("Checking missing file for missing indices...")
        # wait for the indices file to be written
        if os.path.exists(finished_file):
            # first get the indices missing in the log file (crashed jobs)
            done = []
            with open(finished_file, "r") as f:
                for line in f:
                    # Ignore empty lines
                    if line != "\n":
                        done.append(int(line.replace("\n", "")))
            failed = list(set(indices) - set(done))
            LOGGER.debug(f"Found failed indices: {failed}")
        else:
            LOGGER.warning(
                "Did not find File {} -> None of the main functions "
                "recorded its indices. "
                "Not rerunning any jobs".format(finished_file)
            )
            failed = []
    else:
        failed = []

    # if provided use check_missing function
    # (finished jobs but created corrupted output)
    if hasattr(exe, "check_missing"):
        LOGGER.info("Found check_missing function in executable. Running...")
        corrupted = getattr(exe, "check_missing")(indices, function_args)
        LOGGER.debug(f"Found corrupted indices: {corrupted}")
    else:
        corrupted = []

    missing = failed + corrupted
    missing = np.unique(np.asarray(missing))
    LOGGER.debug(f"Found failed/corrupted indices: {missing}")
    return missing

def write_to_log(path, line, mode="a"):
    """
    Write a line to an esub log file

    :param path: path of the log file
    :param line: line (string) to write
    :param mode: mode in which the log file will be opened
    """
    extended_line = "{} {}\n".format(datetime.datetime.now(), line)
    save_write(path, extended_line, mode=mode)

def cd_local_scratch(verbosity=3):
    """
    Change the current working directory to the local scratch if set.

    :param verbosity: Verbosity level (0 - 4).
    """
    if "ESUB_LOCAL_SCRATCH" in os.environ:
        if os.path.isdir(os.environ["ESUB_LOCAL_SCRATCH"]):
            submit_dir = os.getcwd()
            os.chdir(os.environ["ESUB_LOCAL_SCRATCH"])
            os.environ["SUBMIT_DIR"] = submit_dir

            LOGGER.warning(
                "Changed current working directory to {} and "
                "set $SUBMIT_DIR to {}".format(os.getcwd(), os.environ["SUBMIT_DIR"])
            )
        else:
            LOGGER.error(
                "$ESUB_LOCAL_SCRATCH is set to non-existing "
                "directory {}, skipping...".format(os.environ["ESUB_LOCAL_SCRATCH"])
            )
    else:
        LOGGER.debug(
            "Environment variable ESUB_LOCAL_SCRATCH not set. "
            "Not changing working directory."
        )

def run_local_mpi_job(
    exe, n_cores, function_args, logger, main_name="main", verbosity=3
):
    """
    This function runs an MPI job locally

    :param exe: Path to executable
    :param n_cores: Number of cores
    :param function_args: A list of arguments to be passed to the executable
    :param logger: logger instance for logging
    :param main_name: Name of main function in executable
    :param verbosity: Verbosity level (0 - 4).
    """
    # construct the string of arguments passed to the executable
    args_string = ""
    for arg in function_args:
        args_string += arg + " "

    # make command string
    cmd_string = (
        "mpirun -np {} python -m esub.submit"
        " --executable={} --tasks='0' --main_name={} "
        "--esub_verbosity={} {}".format(
            n_cores, exe, main_name, verbosity, args_string
        )
    )
    for line in execute_local_mpi_job(cmd_string):
        line = line.strip()
        if len(line) > 0:
            logger.info(line)

def get_indices(tasks):
    """
    Parses the jobids from the tasks string.

    :param tasks: The task string, which will get parsed into the job indices
    :return: A list of the jobids that should be executed
    """
    # parsing a list of indices from the tasks argument
    if ">" in tasks:
        tasks = tasks.split(">")
        start = tasks[0].replace(" ", "")
        stop = tasks[1].replace(" ", "")
        indices = list(range(int(start), int(stop)))
    elif "," in tasks:
        indices = tasks.split(",")
        indices = list(map(int, indices))
    elif os.path.exists(tasks):
        with open(tasks, "r") as f:
            content = f.readline()
        indices = get_indices(content)
    else:
        try:
            indices = [int(tasks)]
        except ValueError:
            raise ValueError("Tasks argument is not in the correct format!")
    return indices
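
# Illustrative example (not part of the original module): the accepted task
# string formats, assuming no file with these names exists.
#   >>> get_indices("0 > 5")     # range, end-exclusive
#   [0, 1, 2, 3, 4]
#   >>> get_indices("1,3,5")     # explicit list
#   [1, 3, 5]
#   >>> get_indices("7")         # single index
#   [7]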

def get_indices_splitted(tasks, n_jobs, rank):
    """
    Parses the jobids from the tasks string.
    Performs load-balance splitting of the jobs and returns the indices
    corresponding to rank. This is only used for job array submission.

    :param tasks: The task string, which will get parsed into the job indices
    :param n_jobs: The number of cores that will be requested for the job
    :param rank: The rank of the core
    :return: A list of the jobids that should be executed by the core with
             number rank
    """
    # Parse
    indices = get_indices(tasks)

    # Load-balanced splitter
    steps = len(indices)
    size = n_jobs
    chunky = int(steps / size)
    rest = steps - chunky * size
    mini = chunky * rank
    maxi = chunky * (rank + 1)
    if rank >= (size - 1) - rest:
        maxi += 2 + rank - size + rest
        mini += rank - size + 1 + rest
    mini = int(mini)
    maxi = int(maxi)
    return indices[mini:maxi]
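
# Illustrative example (not part of the original module): 10 tasks
# load-balanced over 3 jobs; the leftover tasks go to the higher ranks.
#   >>> [get_indices_splitted("0 > 10", 3, rank) for rank in range(3)]
#   [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]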

def function_wrapper(indices, args, func):
    """
    Wrapper that collects the values yielded by a generator function into
    a list.

    :param indices: job indices passed to the function
    :param args: remaining arguments passed to the function
    :param func: the generator (or function) to run
    :return: list of the values yielded by func
    """
    inds = []
    for ii in func(indices, args):
        inds.append(ii)
    return inds

def run_local_tasks(exe, n_jobs, function_args, tasks, function):
    """
    Executes an MPI job locally, running each split index list on one core.

    :param exe: The executable from where the main function is imported.
    :param n_jobs: The number of cores to allocate.
    :param function_args: The arguments that will get passed to the main
                          function.
    :param tasks: The indices to run on.
    :param function: The function name to run
    :return: List of the finished indices
    """
    LOGGER.warning(
        "NotImplementedWarning: Using run-tasks creates a multiprocessing "
        "worker pool with just one thread per job. "
        "The n_core arguments are ignored."
    )

    # get executable
    func = getattr(exe, function)

    # Fix function arguments for all walkers
    run_func = partial(function_wrapper, args=function_args, func=func)

    # get split indices
    nums = []
    for rank in range(n_jobs):
        nums.append(get_indices_splitted(tasks, n_jobs, rank))

    # Set up multiprocessing pool
    pool = multiprocessing.Pool(processes=n_jobs)
    if int(multiprocessing.cpu_count()) < n_jobs:
        raise Exception(
            "Number of CPUs available is smaller "
            "than requested number of CPUs"
        )

    # run and retrieve the finished indices
    out = pool.map(run_func, nums)
    out = [item for sublist in out for item in sublist]
    return out

def execute_local_mpi_job(cmd_string):
    """
    Execution of local MPI job

    :param cmd_string: The command string to run
    """
    popen = subprocess.Popen(
        shlex.split(cmd_string),
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    for stdout_line in iter(popen.stdout.readline, ""):
        yield stdout_line
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd_string)