Source code for esub.scripts.check_logs
# Copyright (C) 2019 ETH Zurich,
# Institute for Particle Physics and Astrophysics
# Author: Joerg Herbel
import argparse
import os
from ekit import logger as logger_utils
LOGGER = logger_utils.init_logger(__name__)
[docs]def main(dirpath_logs):
# Get all log files in directory
filenames_logs = filter(
lambda fn: not fn.startswith(".") and os.path.splitext(fn)[1] == ".log",
os.listdir(dirpath_logs),
)
filenames_logs = sorted(filenames_logs)
# Check
unfinished_logs = []
for filename in filenames_logs:
# read log file
with open(os.path.join(dirpath_logs, filename), mode="r") as f:
log = f.read()
# check which functions were submitted
main_run = "Job id main" in log
rerun_missing = "Job id rerun_missing" in log
merge_run = "Job id merge" in log
watchdog_run = "Job id watchdog" in log
# check if main job only and MPI mode
if main_run and not (rerun_missing | merge_run | watchdog_run):
if "Finished running main" in log:
continue
# check jobarray case
expected_content = []
if main_run or rerun_missing:
expected_content.append("All indices finished")
if merge_run:
expected_content.append("Finished running merge")
if watchdog_run:
expected_content.append("Finished running watchdog")
for exp_cont in expected_content:
if exp_cont not in log:
unfinished_logs.append(filename)
break
# Report
if len(unfinished_logs) == 0:
LOGGER.info("No unfinished jobs found")
else:
LOGGER.info(
"Logs containing unfinished jobs:\n{}".format("\n".join(unfinished_logs))
)
if __name__ == "__main__":
description = (
"Send a command to all jobs logged in " "esub logfiles in a given directory"
)
parser = argparse.ArgumentParser(description=description, add_help=True)
parser.add_argument(
"--dirpath_logs",
type=str,
default="esub_logs",
help="Directory containing esub logs",
)
args = parser.parse_args()
main(args.dirpath_logs)