#! /usr/bin/env python
# Copyright (C) 2019 ETH Zurich,
# Institute for Particle Physics and Astrophysics
# Author: Dominik Zuercher, adapted by Silvan Fischbacher
# System imports
from __future__ import absolute_import, division, print_function, unicode_literals
import argparse
import shlex
import subprocess
# package imports
import sys
import yaml
from ekit import logger as logger_utils
from esub import esub
LOGGER = logger_utils.init_logger(__name__)
[docs]def starter_message():
print(" ")
print(" _____ ")
print("________________(_)____________ ")
print("_ _ \\__ __ \\_ /___ __ \\ _ \\ ")
print("/ __/_ /_/ / / __ /_/ / __/ ")
print("\\___/_ .___//_/ _ .___/\\___/ ")
print(" /_/ /_/ ")
print(" ")
[docs]def represents_int(s):
"""
Checks if string can be converted to integer
:param s: string to be checked
:return: True if s can be converted to an integer and False otherwise
"""
try:
int(s)
return True
except ValueError:
return False
[docs]def make_submit_command(base_cmd, name, deps, job_dict, parameters):
"""
Adds the dependencies to the base command given in the pipeline
Also tries to replace variables indicated by [] brackets.
:param base_cmd: The command string given in pipeline file
:param name: The job name
:param deps: The dependencies of the job
:param job_dict: The jobs dictionary
:param parameters: A dictionary holding global variables.
Function attempts to replace them in the command string
:return: Complete submission string
"""
dep_string = ""
for dep in deps:
if represents_int(dep):
job_ids = [dep]
elif dep in job_dict.keys():
job_ids = job_dict[dep]
else:
LOGGER.warning(
"Did not find a job named {}. \
Ignoring dependencies on it".format(dep)
)
continue
for job_id in job_ids:
dep_string += "ended({}) && ".format(int(job_id))
dep_string = dep_string[:-4]
dep_string = '"' + dep_string + '"'
cmd = (
base_cmd + " --job_name={}".format(name) + " --dependency={}".format(dep_string)
)
# Attempt variable replacement
while True:
rep = cmd.find("$")
if rep == -1:
break
start = cmd[rep:].index("[") + rep
stop = cmd[rep:].index("]") + rep
rep_key = cmd[start + 1 : stop]
cmd = cmd[:rep] + str(parameters[rep_key]) + cmd[stop + 1 :]
return cmd
[docs]def submit(cmd, assert_ids=True):
"""
Runs the command and receives its job index
which gets added to the dependency list.
:param cmd: Command string to submit
:param assert_ids: whether to check that job ids were successfully obtained
:return: The ids of the just submitted jobs
"""
stdout_lines = []
with subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
) as proc:
# read stdout
for line in proc.stdout:
stdout_lines.append(line)
if esub.TIMEOUT_MESSAGE in line:
LOGGER.warning(esub.TIMEOUT_MESSAGE)
# read stderr
for line in proc.stdout:
stdout_lines.append(line)
if esub.TIMEOUT_MESSAGE in line:
LOGGER.warning(esub.TIMEOUT_MESSAGE)
# read stderr
stderr = "\n".join(proc.stderr)
# raise an exception in case the process did not terminate successfully
if proc.returncode != 0:
raise RuntimeError(
'Running the command "{}" failed with\
exit code {}. Error: \n{}'.format(
cmd, proc.returncode, stderr
)
)
# get ids of newly submitted jobs
last_line = stdout_lines[-1]
ids = []
for string in last_line.split("Submitted job ids:")[-1].split(" "):
try:
int(string)
ids.append(string)
except ValueError:
pass
if assert_ids:
assert (
len(ids) > 0
), "Something went wrong, did not manage to get any job ids from stdout"
return ids
[docs]def get_parameters(step):
"""
Extracts and stores global variables from the\
parameter object in epipe file
:param step: epipe object instance
:return : Dictionary holding the global variables
"""
param_list = step["parameters"]
parameters = {}
for element in param_list:
key = list(element.keys())[0]
parameters[key] = element[key]
parameter_dict = {}
for par_key in parameters.keys():
parameter_dict[par_key] = parameters[par_key]
for key in parameter_dict.keys():
LOGGER.debug(f"Setting parameter {key} -> {parameter_dict[key]}")
return parameter_dict
[docs]def process_item(
step,
job_dict,
index=-1,
loop_entry=None,
loop_dependence=None,
parameters=None,
assert_ids=True,
):
"""
Processes one of the epipe items in the pipeline
:param step: The job item
:param job_dict: The previous job id dictionary
:param index: Loop index if within a loop
:param loop_dependence: The overall dependencies of the
loop structure if within a loop
:param parameters: A dictionary holding global variables. Function
attempts to replace them in the command string
:param assert_ids: whether to check that job ids were successfully obtained
:return: An updated dictionary containing the already
submitted jobs and the newly submitted job
"""
if parameters is None:
parameters = dict()
deps = []
in_loop = index >= 0
# direct dependencies of the item itself
if "dep" in step.keys():
step["dep"] = str(step["dep"])
dependencies = step["dep"].replace(" ", "").split(",")
for dep in dependencies:
deps.append(dep)
# if we are inside a loop, then format the
# loop-internal dependencies as necessary
if in_loop:
for ii, dep in enumerate(deps):
if not represents_int(dep):
deps[ii] = "{}__{}".format(dep, loop_entry)
# add the global dependencies of the loop
if loop_dependence is not None:
for lp in loop_dependence:
deps.append(lp)
# get the job name and the job command
job_name = step["name"]
base_cmd = step["cmd"].strip()
if in_loop:
job_name = "{}__{}".format(job_name, loop_entry)
base_cmd = format_loop_cmd(base_cmd, index, loop_entry)
# submit the job
LOGGER.debug(f"Propagating dependencies {deps} to job {job_name}")
LOGGER.info("Submitting job {}".format(job_name))
cmd = make_submit_command(base_cmd, job_name, deps, job_dict, parameters)
print(cmd)
new_ids = submit(cmd, assert_ids=assert_ids)
job_dict[job_name] = new_ids
[docs]def main(args=None):
"""
epipe main function. Accepts a epipe yaml
configuration file and runs the pipeline.
:param args: Command line arguments
"""
if args is None:
args = sys.argv[1:]
description = "This is epipe a tool to easily submit\
pipelines to a clusters queing system"
parser = argparse.ArgumentParser(description=description, add_help=True)
# parse all the submitter arguments
parser.add_argument(
"pipeline",
type=str,
action="store",
help="Path to the pipeline file. Format should be as "
"specified in the epipe example. Check "
"documentations",
)
parser.add_argument(
"--ignore_jobid_errors",
action="store_true",
help="Switches off checks checking "
"wheter ids of submitted "
"jobs were successfully obtained, useful for "
"testing",
)
parser.add_argument(
"--epipe_verbosity",
type=int,
default=3,
choices=(0, 1, 2, 3, 4),
help="epipe verbosity. From 0-4 with 4 being "
"the most verbose. Default is 3 (info).",
)
args = parser.parse_args(args)
logger_utils.set_logger_level(LOGGER, args.epipe_verbosity)
pipeline = args.pipeline
LOGGER.debug(f"Using pipeline in file {pipeline}")
assert_ids = not args.ignore_jobid_errors
# read pipeline file
with open(pipeline, "r") as f:
pipeline = yaml.load(f, yaml.FullLoader)
starter_message()
# parse all the jobs and their dependencies from the pipeline string
job_dict = {}
LOGGER.info("Starting submission")
parameters = None
for step in pipeline:
LOGGER.debug(f"Processing item {step}")
if "parameters" in step.keys():
LOGGER.debug("Found parameter object")
parameters = get_parameters(step)
continue
if "loop" in step.keys():
LOGGER.debug("Found loop object")
loop_entries = list(step["loop"])
if len(loop_entries) == 2:
if isinstance(loop_entries[0], int) & isinstance(loop_entries[1], int):
LOGGER.warning(
"Loop object has structure "
"[int1, int2] -> Intrepreting as a range!"
)
loop_entries = [x for x in range(loop_entries[0], loop_entries[1])]
LOGGER.info(f"Submitting loop indices {loop_entries}")
deps = []
# get the names of all jobs that have been
# submitted before the loop
submitted_jobs = set(job_dict.keys())
if "dep" in step.keys():
step["dep"] = str(step["dep"])
dependencies = step["dep"].replace(" ", "").split(",")
for dep in dependencies:
deps.append(dep)
for index, entry in enumerate(loop_entries):
for item in step["items"]:
process_item(
item,
job_dict,
index=index,
loop_entry=entry,
loop_dependence=deps,
parameters=parameters,
assert_ids=assert_ids,
)
# find the names of the newly submitted jobs
loop_jobs = set(job_dict.keys()) - submitted_jobs
# add the loop as a whole to the job dictionary
job_dict[step["name"]] = []
for loop_job in loop_jobs:
job_dict[step["name"]] += job_dict[loop_job]
else:
process_item(step, job_dict, parameters=parameters, assert_ids=assert_ids)
for key in job_dict.keys():
job_ids = " ".join(map(str, job_dict[key]))
LOGGER.info(f"Submitted job {key} with id(s): {job_ids}")
LOGGER.info("Submission finished")
if __name__ == "__main__":
main()