All the example scripts used here are included in the sections below for illustration. Note that the --something argument is not known to esub; it is simply passed through to your executable.
Running 10 tasks in the active shell serially. Also runs the watchdog and merge functions as well as rerun-missing. The --something argument gets passed through to the executable:
$ esub exec_example.py --tasks='0 > 10' --function=all --mode=run --something=10
Running 21 tasks in the active shell in parallel using 10 of the cores on the local machine (each core running 2 tasks except for one running 3):
$ esub exec_example.py --tasks='0 > 21' --main_n_cores_per_job=10 --function=main --mode=run-tasks --something=10
Running 5 MPI jobs serially in your active shell. Each job has 10 cores available:
$ esub exec_example.py --tasks='0 > 5' --main_n_cores_per_job=10 --function=main --mode=run-mpi --something=10
Submitting 10 tasks to the LSF system split over 5 jobs (each job executes 2 tasks). Each job runs on 1 core. Also runs the watchdog, merge and rerun-missing functions. Each job has a maximum runtime of 10h and allocates 20GB of RAM per core:
$ esub exec_example.py --tasks='0 > 10' --n_jobs=5 --main_n_cores_per_job=1 --function=all --mode=jobarray --something=10 --main_time=10 --main_memory=20000
Splitting 20 tasks over 3 jobs. Launches a jobarray with 3 jobs, each having 5 cores available:
$ esub exec_example.py --tasks='0 > 20' --n_jobs=3 --main_n_cores_per_job=5 --function=main --mode=jobarray --something=10
Submitting 4 individual MPI jobs using 5 cores each:
$ esub exec_example.py --tasks='0 > 4' --n_jobs=4 --main_n_cores_per_job=5 --function=main --mode=mpi --something=10
A more complex example: runs 21 tasks split over 5 jobs (jobarray) with each job having 10 cores, 5GB of RAM and a maximum runtime of 5h. Additionally, runs a watchdog on 10 cores alongside the main jobs, with 10GB of RAM. After the main functions and the watchdog finish, the rerun-missing function is triggered and potentially reruns failed jobs. When rerun-missing is done, the merge function is launched. The merge function is an MPI job with 100 cores and 4GB of RAM:
$ esub exec_example.py --tasks='0 > 21' --n_jobs=5 --main_n_cores_per_job=10 --function=all --mode=jobarray --something=10 --main_memory=5000 --main_time=5 --watchdog_memory=10000 --watchdog_n_cores=10 --mpi_merge --merge_n_cores=100 --merge_memory=4000
Submitting a whole pipeline with an arbitrary number of jobs, dependencies and loops to the system:
$ epipe pipeline_example.yaml
Below is an example of an executable script that can be used by esub. Please check the Usage section for an explanation of the different elements.
# Copyright (C) 2019 ETH Zurich, Institute for Particle Physics and Astrophysics
# Author: Dominik Zuercher
# exec_example.py
# This is an example of an executable that can be run by esub
import argparse
import os
import time
import numpy as np
# the file should include at least the main function
# each function takes a list of integer task indices as well as the remaining command-line
# arguments (args), which can be used to pass arguments to the function directly from the
# command line
def get_tasks(args):
    # The get_tasks function can be used to calculate the task indices that should be run.
    # This function has to return either a list of integers indicating the index numbers that
    # should be run, or a tuple indicating the first and last index to run (the last one is exclusive).
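    # A sketch of the tuple form: the last index is exclusive, so returning
    # (0, 10) would run the indices 0 through 9.
    # return (0, 10)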
return [1, 2, 3]
def resources(args):
    # The resources function can be used to tell esub which resources to allocate (only used if the
    # job is submitted to a queuing system). It gets overwritten by command-line arguments.
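    # Note: judging from the submission examples above, the memory and scratch values
    # appear to be given in MB and the time values in hours (e.g. --main_memory=20000
    # corresponds to 20GB of RAM and --main_time=10 to a maximum runtime of 10h).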
return dict(
main_memory=1000,
main_time=5,
main_scratch=25000,
watchdog_memory=15000,
watchdog_time=24,
merge_memory=6000,
merge_time=8,
)
def check_missing(indices, args):
    # The check_missing function. By default esub will rerun all jobs that crashed, but in some
    # cases one might want to check if the jobs produced the desired output and rerun them
    # if the output is corrupted. The check_missing function provides this functionality.
random_seed, output_directory = setup(args)
corrupted = []
for index in indices:
nums = np.load("{}/randoms_{}.npy".format(output_directory, index))
if nums.size != 3:
corrupted.append(index)
return corrupted
def setup(args):
    # The setup function gets executed first, before esub starts (useful to create directories, for example)
description = "This is an example script that can be used by esub"
parser = argparse.ArgumentParser(description=description, add_help=True)
parser.add_argument(
"--random_seed", type=int, action="store", default=30, help="Some random seed"
)
parser.add_argument(
"--output_directory",
type=str,
action="store",
default=".",
help="Where to write output files to",
)
args = parser.parse_args(args)
return args.random_seed, args.output_directory
def main(indices, args):
random_seed, output_directory = setup(args)
np.random.seed(random_seed)
for index in indices:
# put here what you wish to do for the task with index 'index'
        # if the mode is mpi or run-mpi, index is always 0 and this is just an MPI pool
print("This is main index {}".format(index))
nums = np.zeros(0)
while nums.size < 3:
num = np.random.randint(1000000)
print("Generated random number: {}".format(num))
nums = np.append(nums, num)
time.sleep(0.5)
np.save("{}/randoms_{}.npy".format(output_directory, index), nums)
print("Saved output to file {}/randoms_{}.npy".format(output_directory, index))
        # IMPORTANT: in order for rerun-missing to work properly, you have to yield each index in the index loop of the main function!
yield index
def watchdog(indices, args):
    # The watchdog runs alongside the main function on a single thread, executing all indices.
    # It is meant, for example, to collect files on the fly.
random_seed, output_directory = setup(args)
total_nums = np.zeros(0)
for index in indices:
# put here what you wish to do for the task with index 'index'
print("This is watchdog index {}".format(index))
print(
"Waiting to collect file {}/randoms_{}.npy...".format(
output_directory, index
)
)
while not os.path.isfile("{}/randoms_{}.npy".format(output_directory, index)):
time.sleep(5)
nums = np.load("{}/randoms_{}.npy".format(output_directory, index))
total_nums = np.append(total_nums, nums)
np.save("{}/all_randoms.npy".format(output_directory), total_nums)
print("Saved output to file {}/all_randoms.npy".format(output_directory))
def merge(indices, args):
    # The merge function runs after the main function on a single thread, executing all indices. It is meant to finalize output files.
random_seed, output_directory = setup(args)
# in this example we completely ignore the indices and just run a single job on the file made by the watchdog.
print("This is merge")
nums = np.load("{}/all_randoms.npy".format(output_directory))
print("The mean is {}".format(np.mean(nums)))
print("The standard deviation is {}".format(np.std(nums)))
Below is an example of a pipeline file that can be used by epipe. Please check the Usage section for an explanation of the different elements.
# Copyright (C) 2019 ETH Zurich, Institute for Particle Physics and Astrophysics
# Created on May 12
# Author: Dominik Zuercher
# pipeline_example.yaml
# This is an example of a pipeline that can be run by epipe
# In the parameters section one can define global variables. When submitting the jobs, epipe replaces them in the commands (see below)
- parameters:
- executable: exec_example.py
- source_file: source_file_example.sh
# This is a job instance without dependencies (starts directly).
- name: job1
cmd: esub $[executable] --tasks='0 > 5' --n_jobs=3 --mode=jobarray --function=all --source_file=$[source_file]
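# With the parameters defined above, this command expands to:
# esub exec_example.py --tasks='0 > 5' --n_jobs=3 --mode=jobarray --function=all --source_file=source_file_example.sh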
# This job only starts after job1 has finished (if dependencies are not found they are just ignored).
- name: job2
cmd: esub $[executable] --tasks='0 > 5' --n_jobs=3 --mode=jobarray --function=main --source_file=$[source_file]
dep: job1
# This is a loop. All the jobs in the loop will be run 3 times and the loop index can be passed to the jobs.
- name: jobloop1
loop: [0, 3]
dep: job1, job2
items:
- name: loopjob1
cmd: esub $[executable] --tasks='0 > 5' --n_jobs=3 --mode=jobarray --function=all --source_file=$[source_file]
# This job is an example of how the loop index can be used to format the submitted command. The commands that will
# actually be submitted are:
# 1) esub exec_example.py --tasks='0 > 0' --n_jobs=3 --mode=jobarray --function=all --source_file=source_file_example.sh
# 2) esub exec_example.py --tasks='0 > 1' --n_jobs=3 --mode=jobarray --function=all --source_file=source_file_example.sh
# 3) esub exec_example.py --tasks='0 > 2' --n_jobs=3 --mode=jobarray --function=all --source_file=source_file_example.sh
- name: loopjob2
cmd: esub $[executable] --tasks='0 > {}' --n_jobs=3 --mode=jobarray --function=all --source_file=$[source_file]
dep: loopjob1
This is an example of a source file (a simple shell script) that can be used by esub to set up the environment for the task that one wants to run.
# Copyright (C) 2019 ETH Zurich, Institute for Particle Physics and Astrophysics
# Author: Dominik Zuercher
# source_file_example.sh
# This is an example of a shell script that can be used by esub to
# declare variables and activate a virtual Python environment, for example
source ~/venv_3.6.1/bin/activate
export DIRECTORY=$PWD