Source code for cedar.metadata.tracking

""" Classes and other core tools for tracking & image metadata
"""
from collections import Mapping, defaultdict
import datetime as dt
import json
import logging
from pathlib import Path

import numpy as np

from .core import get_task_metadata
from .. import validation
from ..utils import EE_STATES

logger = logging.getLogger(__name__)

_HERE = Path(__file__).resolve().parent
SCHEMA_TRACKING = _HERE.joinpath('schema_tracking.json')
SCHEMA_IMAGE = _HERE.joinpath('schema_image.json')


[docs]class TrackingMetadata(Mapping): """ CEDAR order tracking metadata Parameters ---------- metadata : dict Tracking metadata information as a dict schema : dict, optional Validate metadata against a specific schema """ def __init__(self, metadata, schema=None): self._metadata = metadata self.schema = schema or self._load_schema() self.validate()
[docs] @classmethod def from_file(cls, filename, schema=None): """ Create from a tracking metadata file """ with open(filename) as src: metadata = json.load(src) return cls(metadata, schema=schema)
[docs] @classmethod def from_json(cls, json_str, schema=None): """ Load tracking metadata from a JSON string """ metadata = json.loads(json_str) return cls(metadata, schema=schema)
# Mapping object requirements def __getitem__(self, key): return self._metadata[key] def __len__(self): return len(self._metadata) def __iter__(self): return iter(self._metadata) # Main sections -- make them available as attributes @property def program(self): return self['program'] @property def submission(self): return self['submission'] @property def tracking(self): return self['tracking'] @property def orders(self): return self['orders'] @property def metadata(self): return self['metadata'] @property def states(self): """ dict: Summary of EE task status for this order """ return summarize_states(self.orders) @property def progress(self): """ float: Percent of EE tasks completed in this order """ states = self.states n = sum(states.values()) n_completed = states.get(EE_STATES.COMPLETED, 0) return n_completed / n if n_completed else 0. @property def complete(self): """ bool: True if all orders have completed """ states = set(self.states) done_or_empty = [state in (EE_STATES.COMPLETED, EE_STATES.EMPTY) for state in states] return all(done_or_empty) @property def tasks(self): """ List[ee.batch.Task]: EarthEngine tasks associated with this order """ from ..utils import get_ee_tasks, load_ee ee_api = load_ee(False) # should initialize elsewhere tasks = get_ee_tasks() order_tasks = [ tasks.get(order['status'].get('id', 'UNKNOWN'), None) for order in self.orders ] return order_tasks
[docs] def update(self, skip_if_missing=True): """ Update the tracking metadata order info against EE task status Parameters ---------- skip_if_missing : bool, optional If True, logs and skips updating order information when the order task cannot be retrieved. Returns ------- self Returns a new instance of this TrackingMetadata with updated info """ updated = [] for task, order in zip(self.tasks, self.orders): id_ = order['status'].get('id', 'UNKNOWN') if task is None: msg = f'Order task id="{id_}" does not exist or has expired' if skip_if_missing: logger.debug(msg) updated.append(order) else: raise ValueError(msg) else: order_ = order.copy() task_info = get_task_metadata(task) order_.update(task_info) updated.append(order_) data = dict(self) data['orders'] = updated return self.__class__(data)
# schema
[docs] def validate(self): validation.validate_with_defaults(self._metadata, self.schema, resolve=SCHEMA_TRACKING.parent)
@staticmethod def _load_schema(filename=SCHEMA_TRACKING): with open(str(filename)) as src: return json.load(src) # repr, printing, etc info def __repr__(self): return repr_tracking(self) def _repr_html_(self): # TODO return "CEDAR Tracking Metadata"
# ----------------------------------------------------------------------------- # String repr
[docs]def repr_tracking(tracking_data, show_program=True, show_submission=True, show_tracking=True, show_states=True, show_runtimes=True, show_orders=None): """ Return string formatted information about CEDAR tracking metadata """ lines = [] if show_program: lines.extend(repr_metadata_program(tracking_data['program'])) if show_submission: lines.extend(repr_tracking_submission(tracking_data['submission'])) if show_tracking: lines.extend(repr_tracking_tracking(tracking_data['tracking'])) # Top level order information is always added lines.extend(repr_tracking_orders(tracking_data['orders'], show_states=show_states, show_runtimes=show_runtimes)) if show_orders is not None: # Handle 'True', which is shortcut to showing all order details if show_orders is True: show_orders = range(len(tracking_data['orders'])) # Otherwise expect sequence of order indexes (1st, 2nd, etc) for order_id in show_orders: order_md = tracking_data['orders'][order_id] order_repr = repr_metadata_order(order_md, f'Order #{order_id}') lines.extend(order_repr) return '\n'.join(lines)
[docs]def repr_metadata_program(info, header='Program Info:'): lines = _indent([ f'* {info["name"]}={info["version"]}', f'* earthengine-api={info["ee"]}' ], n=1) return [header] + lines
[docs]def repr_tracking_submission(info, header='Submission Info:'): tile_indices = [f'({i[0]}, {i[1]})' for i in info['tile_indices']] # Format subsection first so it can be added as a string period_info_str = ['* Period:'] + _indent([ f'* Start: {info["period_start"]}', f'* End: {info["period_end"]}', f'* Freq: {info["period_freq"]}' ], n=2) lines = _indent([ f'* Submitted on {info["submitted"]}', f'* Tile Grid: "{info["tile_grid"]["name"]}"', f'* Tile Indices : {", ".join(tile_indices)}', '\n'.join(period_info_str) ]) return [header] + lines
[docs]def repr_tracking_tracking(info, header='Tracking Info'): lines = _indent([ f'* Name: {info["name"]}', f'* Prefix: {info["prefix"]}', f'* Collections: {", ".join(info["collections"])}', f'* Image template: {info["name_template"]}', f'* Image prefix: {info["prefix_template"]}' ], n=1) return [header] + lines
[docs]def repr_tracking_orders(info, show_states=True, show_runtimes=True, header='Orders'): lines = [f'* Count: {len(info)}'] if show_states: states = summarize_states(info) lines_ = [f'- {state}: {n}' for state, n in states.items()] lines.append('* States:') lines.extend(_indent(lines_, n=1)) if show_runtimes: runtimes = summarize_runtimes(info) mean = _format_runtime(runtimes['mean']) std = _format_runtime(runtimes['std']) line = f'* Runtime: {mean} +/- {std} minutes' lines.append(line) return [header] + _indent(lines)
[docs]def repr_metadata_order(info, header='Order:'): summary = _summarize_order(info) s_runtime = _format_runtime(summary['runtime']) lines = _indent([ f'- Name: {summary["name"]}', f'- Prefix: {summary["prefix"]}', f'- ID: {summary["id"]}', f'- Task state: {summary["state"]}', f'- Runtime: {s_runtime} minutes', f'- Image pieces: {summary["n_images"]}', f'- Output URL: {summary["output_url"]}', ], n=1) return [header] + lines
# ----------------------------------------------------------------------------- # Calculations
[docs]def calculate_order_runtime(start_time, update_time, nan=np.nan): if start_time and update_time: return update_time - start_time elif start_time: now = dt.datetime.now().timestamp() return now - start_time else: return nan
[docs]def summarize_states(orders): """ Returns tallies of order task status """ order_states = defaultdict(lambda: 0) for order in orders: order_states[order['status'].get('state', EE_STATES.EMPTY)] += 1 return dict(order_states)
[docs]def summarize_runtimes(orders): """ Summarize runtimes of order tasks """ runtimes = [] for order in orders: start_ = order['status'].get('start_timestamp_ms', None) update = order['status'].get('update_timestamp_ms', None) runtime = calculate_order_runtime(start_, update) # nan if Nones runtimes.append(runtime) runtimes = np.asarray(runtimes) mask = np.isfinite(runtimes) any_ = mask.any() return { 'runtimes': runtimes, 'total': np.sum(runtimes[mask]) if any_ else np.nan, 'n': runtimes.size, 'mean': np.mean(runtimes[mask]) if any_ else np.nan, 'std': np.std(runtimes[mask]) if any_ else np.nan }
def _summarize_order(order): status = order['status'] start_ = status.get('start_timestamp_ms', None) update = status.get('update_timestamp_ms', None) output_url = status.get('output_url', []) summary = { 'name': order['name'], 'prefix': order['prefix'], 'id': status.get('id', 'UNKNOWN'), 'state': status.get('state', EE_STATES.EMPTY), 'runtime': calculate_order_runtime(start_, update, np.nan), 'n_images': len(output_url), 'output_url': list(set(output_url)) } return summary # Helpers def _indent(lines, n=1, length=4): return [' ' * n * length + line for line in lines] def _format_runtime(time_ms): if time_ms: return '{0:02.2f}'.format(time_ms / 60. / 1000.) else: return 'nan'