Source code for axiom.drs.payload

"""Class to represent a JSON payload."""
import enum
import json
import axiom.schemas as axs
import axiom.utilities as au


[docs]class Payload: """Class to represent a Payload object. Args: input_files (str): Globbable path to input files. output_directory (str): Output directory. start_year (int) : Start year. end_year (int): End year. output_frequency (str): Output frequency. project (str): Project key from projects.json. model (str): Model key from models.json. domain (str): Domain key from domains.json. variables (list, optional): List of variables to process. Defaults to []. **kwargs : Additional key/value pairs added as a metadata. Attributes: input_files (str): Globbable path to input files. output_directory (str): Destination output directory (DRS built from here). project (str): Project key from projects.json. model (str): Model key from models.json. domain (str): Domain key from domains.json. start_year (int): Start year. end_year (int): End year. variables (list): List of variable names to process. output_frequency (str): Output frequency/resolution of data. extra (dict): Additional metadata key/value pairs. """ def __init__(self, input_files, output_directory, start_year, end_year, output_frequency, project, model, domain, variables=[], batch=1, **kwargs): self.input_files = input_files self.output_directory = output_directory self.project = project self.model = model self.domain = domain self.start_year = start_year self.end_year = end_year self.variables = variables self.output_frequency = output_frequency self.batch = batch self.extra = kwargs
[docs] def get_filename(self): """Generate a filename for this payload. Returns: str: Filename Examples: >>> filename = payload.get_filename() """ bbb = str(self.batch).zfill(3) return f'payload.{self.start_year}.{self.output_frequency}.{bbb}.json'
[docs] def to_dict(self): """Convert the Payload object to a dictionary. Returns: dict: Dictionary representation of the Payload. Examples: >>> payload.to_dict() """ _dict = self.__dict__.copy() extra = _dict.pop('extra') _dict.update(extra) return _dict
[docs] def from_dict(d): """Create a Payload from a dictionary. Args: d (dict): Dictionary. Returns: axiom.drs.Payload: Payload object. Examples: >>> payload = Payload.from_dict(d) """ return Payload(**d)
[docs] def to_json(self, filepath=None): """Serialize the Payload to the filepath (save as JSON). Args: filepath (str, optional): Filepath to write. Defaults to None (Writes default filename to current directory). Examples: >>> payload.to_json('payload.json') """ if filepath is None: filepath = self.get_filename() _dict = self.to_dict() with open(filepath, 'w') as f: f.write(json.dumps(_dict, indent=4))
[docs] def from_json(filepath): """Instantiate a payload from a json filepath. Args: filepath (str): Path. Returns: axiom.drs.Payload: Payload. Examples: >>> payload = Payload.from_json('payload.json') """ d = json.load(open(filepath, 'r')) return Payload.from_dict(d)
[docs]def generate_payloads(input_files, output_directory, start_year, end_year, project, model, domain, variables=None, schema=None, output_frequencies=['1H', '6H', '1D', '1M'], num_batches=1, **extra): """Generate payload files. Args: input_files (str): Globbable path to input files. output_directory (str): Destination filepath, DRS will be built from here. start_year (int): Start year. end_year (int): End year. project (str): Project key from projects.json. model (str): Model key from models.json. domain (str): Domain key from domains.json. variables (list(str), optional): List of variables to project. Defaults to None (read all from schema). schema (str): Schema name or filepath. output_frequencies (list(str), optional): List of output frequencies. Defaults to ['1H', '6H', '1D', '1M']. num_batches (int, optional): Number of batches to split processing into. Defaults to 1. **extra : Key/value pairs added as additional metadata. Returns: list : A list of payload objects. """ payloads = list() # Load the variables from the schema if not provided if variables is None: schema = axs.load_schema(schema) variables = list(schema['variables'].keys()) # Batch if required (could be a single batch) batches = au.batch_split(variables, num_batches) for year in range(start_year, end_year+1): for output_frequency in output_frequencies: for batch_ix, batch in enumerate(batches): # Generate a payload payload = Payload( input_files=input_files, output_directory=output_directory, project=project, model=model, domain=domain, start_year=year, end_year=year, variables=batch, output_frequency=output_frequency, batch=batch_ix, **extra ) payloads.append(payload) return payloads