"""Command-line methods for the DRS subsystem."""
from ast import arg
import os
import sys
import json
import argparse
import axiom.utilities as au
from axiom.config import load_config
import axiom.drs.payload as adp
import axiom.drs.utilities as adu
from tqdm import tqdm
from pathlib import Path
import shutil
import datetime
[docs]def split_args(values):
"""Split an argument that is comma-separated.
Args:
values (str): Values
Returns:
list : List of split arguments.
"""
return values.split(',')
[docs]def get_parser(config=None, parent=None):
"""Parse arguments for command line utiltities.
Args:
config (dict) : Configuration dictionary.
parent (obj) : Parent parser object (for integration into the main axiom CLI)
Returns:
argparse.Namespace : Arguments object.
"""
# Load the model, project and domain
VALID_MODELS = load_config('models').keys()
VALID_PROJECTS = load_config('projects').keys()
VALID_DOMAINS = load_config('domains').keys()
# Build a parser
if parent is None:
parser = argparse.ArgumentParser()
# ...or add one to the top-level CLI
else:
parser = parent.add_parser('drs')
parser.description = "DRS utility"
# Paths
parser.add_argument('--input_files', required=True, type=str, help='Input filepaths', nargs=argparse.ONE_OR_MORE)
parser.add_argument('--output_directory', required=True, type=str, help='Output base directory (DRS structure built from here)')
parser.add_argument('-o', '--overwrite', default=False, help='Overwrite existing output', action='store_true')
# Temporal
parser.add_argument('-s', '--start_year', required=True, type=int, help='Start year')
parser.add_argument('-e', '--end_year', required=True, type=int, help='End year')
# Resolution and output frequency
parser.add_argument('-r', '--input_resolution', type=float, help='Input resolution in km, leave blank to auto-detect from path.')
parser.add_argument(
'-f', '--output_frequency', required=True, type=str, metavar='output_frequency',
help='Output frequency, Examples include "12min", "1M" (1 month) etc. see https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases.'
)
# Metadata
parser.add_argument('-p', '--project', required=True, type=str, choices=VALID_PROJECTS)
parser.add_argument('-m', '--model', required=True, type=str, choices=VALID_MODELS)
# Domains, we can process multiple at once
parser.add_argument(
'-d', '--domain',
required=True, type=str,
choices=VALID_DOMAINS,
help='Domain to process'
)
# Override the variables defined in the drs.json file
parser.add_argument(
'-v', '--variable',
type=str,
required=True,
help='Variable to process.'
)
return parser
[docs]def get_parser_consume(config=None, parent=None):
"""A parser that can consume JSON payload files.
Args:
config ([type], optional): [description]. Defaults to None.
parent ([type], optional): [description]. Defaults to None.
"""
if parent is None:
parser = argparse.ArgumentParser()
else:
parser = parent.add_parser('drs_consume')
# Input filepaths
parser.add_argument('input_filepaths', type=str, help='Input json filepaths.', nargs=argparse.ONE_OR_MORE)
return parser
[docs]def drs_launch(path, jobscript, log_dir, batches=None, dry_run=True, interactive=False, unlock=False, **launch_context):
"""Method to launch a series of qsubs for DRS processing.
Args:
path (str): Globbable path of payload files.
jobscript (str): Path to the job script.
log_dir (str): Path to which to save the log files.
batches (int): Number of batches to split variables into (for parallel processing).
dry_run (bool): Print out the commands rather than executing.
interactive (bool): Dump the interactive flag into the qsub command when dumping.
unlock (bool): Unlock locked payloads prior to submission (for rerunning walltime overruns)
**launch_context: Additional arguments that will be interpolated as launch context.
"""
# List the payloads in the input_directory
payloads = au.auto_glob(path)
if not dry_run:
os.makedirs(log_dir, exist_ok=True)
for payload in payloads:
# Load the payload to get the project, where we can get the variables and work out what the batch_size will be
if batches:
payload_obj = json.load(open(payload, 'r'))
project = payload_obj['project']
project_config = load_config('projects')[project]
variables = project_config['variables_2d'] + list(project_config['variables_3d'].keys())
variables_batches = au.batch_split(variables, n_batches=batches)
_batches = range(1, len(variables_batches) + 1)
# Single batch, which makes the next section simpler
else:
_batches = [1]
# Unlock the file if requested
if au.is_locked(payload) and unlock == True:
print(f'Unlocking {payload} for resubmission')
au.unlock(payload)
# Skip if not
elif au.is_locked(payload):
print(f'{payload} is locked.')
continue
# Convert the path to the jobscript to an absolute path for reproducibility
jobscript = os.path.abspath(jobscript)
payload = os.path.abspath(payload)
log_dir = os.path.abspath(log_dir)
for batch_id in _batches:
job_name = os.path.basename(payload)
batch_str = str(batch_id).zfill(3)
job_name = f'{job_name}_{batch_str}'
# Assemble the command from configuration
config = load_config('drs')
directives = config['launch']['directives']
# Add interactive flag when dry running
if dry_run and interactive:
directives.append('-I')
# Override walltime
if 'walltime' in launch_context.keys() and launch_context['walltime'] is not None:
walltime = launch_context['walltime']
directives.append(f'-l walltime={walltime}')
qsub_vars = dict(
AXIOM_PAYLOAD=payload,
AXIOM_LOG_DIR=log_dir,
AXIOM_BATCH=batch_id
)
# Assemble the launch context for this job
_launch_context = dict(
qsub_vars=adu.assemble_qsub_vars(**qsub_vars),
job_name=job_name,
log_dir=log_dir,
batch_str=batch_str
)
# Add this to the user-supplied launch context
launch_context.update(_launch_context)
# Assemble the qsub command
cmd = adu.assemble_qsub_command(
jobscript=jobscript,
directives=directives,
**launch_context
)
# Dry run, just echo the outputs
if dry_run:
print(cmd)
# Real run, submit the jobs.
else:
qsub = au.shell(cmd)
if qsub.returncode == 0:
print(qsub.stdout.decode('utf-8'))
[docs]def get_parser_launch(parent=None):
"""A parser that can launch DRS processing.
Args:
parent ([type], optional): [description]. Defaults to None.
"""
if parent is None:
parser = argparse.ArgumentParser()
else:
parser = parent.add_parser('drs_launch')
parser.description = 'Submit drs_consume tasks via qsub.'
# Input filepaths
parser.add_argument('path', type=str, help='Globbable path to payload files (use quotes)')
parser.add_argument('jobscript', type=str, help='Path to the jobscript for submission.')
parser.add_argument('log_dir', type=str, help='Directory to which to write logs.')
parser.add_argument('-d', '--dry_run', action='store_true', default=False, help='Print commands without executing.')
parser.add_argument('-i', '--interactive', action='store_true', default=False, help='Dump the interactive flag into the qsub command when dry-running.')
parser.add_argument('--walltime', type=str, help='Override walltime in job script.')
parser.add_argument('--unlock', help='Unlock locked payloads prior to submission', action='store_true', default=False)
parser.set_defaults(func=drs_launch)
return parser
[docs]def generate_payloads(payload_dst, input_files, output_dir, start_year, end_year, project, model, domain, variables=None, schema=None, output_frequencies='1H,6H,1D,1M', num_batches=1, extra=None):
# Unpack the extra arguments
_extra = dict()
for kv in extra:
k, v = kv.split(',')
_extra[k] = v
payloads = adp.generate_payloads(
input_files=input_files,
output_directory=output_dir,
start_year=start_year, end_year=end_year,
project=project, model=model, domain=domain,
variables=variables, schema=schema,
output_frequencies=output_frequencies,
num_batches=num_batches,
**_extra
)
if len(payloads) == 0:
raise Exception('No payloads generated!!!')
# Create the output directory
os.makedirs(payload_dst, exist_ok=True)
# Generate the payloads at the destination
print('Writing payloads...')
for payload in tqdm(payloads):
filepath = os.path.join(
payload_dst,
payload.get_filename()
)
payload.to_json(filepath)
print(f'Payloads available at {payload_dst}')
[docs]def get_parser_generate_payloads(parent=None):
"""A parser to generate payloads.
Args:
parent (object, optional): Parent parser object. Defaults to None.
"""
parser = argparse.ArgumentParser() if parent is None else parent.add_parser('drs_gen_payloads')
parser.description = 'Generate a series of payload files.'
parser.add_argument('payload_dst', type=str, help='Where to write payload files.')
parser.add_argument('input_files', type=str, help='Globbable path to input files, use quotes.')
parser.add_argument('output_dir', type=str, help='Output directory (DRS written from here).')
parser.add_argument('start_year', type=int, help='Start year.')
parser.add_argument('end_year', type=int, help='End year.')
parser.add_argument('project', type=str, help='Project key from projects.json.')
parser.add_argument('model', type=str, help='Model key from models.json.')
parser.add_argument('domain', type=str, help='Domain key from domains.json.')
parser.add_argument('--variables', type=split_args, help='Comma-separated list of variables to process.')
parser.add_argument('--schema', type=str, help='Schema to read variables from in lieu of variables.')
parser.add_argument('--output_frequencies', type=split_args, help='Comma-separated list of output frequencies. Defaults to "1H,6H,1D,1M"', default='1H,6H,1D,1M')
parser.add_argument('-e', '--extra', type=str, nargs=argparse.ZERO_OR_MORE, help='Extra metadata to add, "key,value".')
parser.set_defaults(func=generate_payloads)
return parser
[docs]def rerun_failures(input_dir):
"""Method to rerun payloads based on the .failed output files.
Args:
input_dir (str) : Path to the input directory containing both the failed files and the original payloads.
"""
# Find all of the failed files in the provided directory
failed_filepaths = au.auto_glob(f'{input_dir}/*failed')
for failed_filepath in failed_filepaths:
# Open the failed filepath, read out the variables
raw = open(failed_filepath).read().splitlines()
failed_variables = [_raw.split(',')[0] for _raw in raw]
# Filter out duplicates
failed_variables = list(set(failed_variables))
# Open the payload
payload_filename = os.path.basename(failed_filepath).split('_')[0]
payload_filepath = os.path.join(
input_dir,
payload_filename
)
# Open them
payload = adp.Payload.from_json(payload_filepath)
# Replace the variables listed with that of those included in the failed file
payload.variables = failed_variables
# Save them to a rerun directory
output_dir = os.path.join(input_dir, 'rerun')
os.makedirs(output_dir, exist_ok=True)
output_filepath = os.path.join(
output_dir,
payload_filename
)
print(output_filepath)
payload.to_json(output_filepath)
[docs]def get_parser_rerun_failures(parent=None):
"""Get a parser for rerunning payloads.
Args:
parent (object, optional): Parent parser. Defaults to None.
"""
parser = argparse.ArgumentParser() if parent is None else parent.add_parser('drs_rerun_failures')
parser.description = 'Generate rerun payloads from the .failed files in the input directory'
parser.add_argument('input_dir', type=str, help='Path to .failed files and their payloads.')
parser.set_defaults(func=rerun_failures)
return parser
[docs]def get_parser_generate_user_config(parent=None):
"""Get a parser for generating a set of user config files from the installation directory.
Args:
parent (object, optional): Parent parser. Defaults to None.
"""
parser = argparse.ArgumentParser() if parent is None else parent.add_parser('drs_gen_user_config')
parser.description = 'Copy installation configuration to the user space (backing up anything already there).'
parser.set_defaults(func=adu.generate_user_config)
return parser