Coverage for cedar/metadata/tracking.py : 41%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Classes and other core tools for tracking & image metadata """
""" CEDAR order tracking metadata
Parameters ---------- metadata : dict Tracking metadata information as a dict schema : dict, optional Validate metadata against a specific schema """
""" Create from a tracking metadata file """
""" Load tracking metadata from a JSON string """ metadata = json.loads(json_str) return cls(metadata, schema=schema)
# Mapping object requirements
# Main sections -- make them available as attributes def program(self):
def submission(self):
def tracking(self):
def orders(self):
def metadata(self):
def states(self):
    """ dict: Tallies of EarthEngine task states across this order's tasks """
    orders = self.orders
    return summarize_states(orders)
def progress(self):
    """ float: Fraction (0 to 1, not a percentage) of EE tasks completed

    Computed as the COMPLETED tally divided by the sum of all state
    tallies in ``self.states``. Returns 0.0 when there are no tallies.
    """
    states = self.states
    n_total = sum(states.values())
    n_completed = states.get(EE_STATES.COMPLETED, 0)
    # Guard on the total so the divide-by-zero protection is explicit;
    # a zero completed count still yields 0.0 through the division.
    if not n_total:
        return 0.
    return n_completed / n_total
def complete(self):
    """ bool: True if every task state for this order is COMPLETED or EMPTY

    Also True when ``self.states`` has no entries.
    """
    finished = (EE_STATES.COMPLETED, EE_STATES.EMPTY)
    return all(state in finished for state in self.states)
def tasks(self):
    """ List[ee.batch.Task]: EarthEngine tasks associated with this order

    One entry per item in ``self.orders``, in the same order. Each entry is
    looked up by ``order['status']['id']`` (or 'UNKNOWN' if absent) in the
    mapping returned by ``get_ee_tasks()``. The entry is ``None`` when that
    id is not found.
    """
    from ..utils import get_ee_tasks, load_ee
    # Return value was never used; the call is kept for its side effect.
    # TODO: per the original note, this initialization should live elsewhere.
    load_ee(False)
    ee_tasks = get_ee_tasks()
    return [
        ee_tasks.get(order['status'].get('id', 'UNKNOWN'))
        for order in self.orders
    ]
""" Update the tracking metadata order info against EE task status
Parameters ---------- skip_if_missing : bool, optional If True, logs and skips updating order information when the order task cannot be retrieved.
Returns ------- self Returns a new instance of this TrackingMetadata with updated info """ updated = [] for task, order in zip(self.tasks, self.orders): id_ = order['status'].get('id', 'UNKNOWN') if task is None: msg = f'Order task id="{id_}" does not exist or has expired' if skip_if_missing: logger.debug(msg) updated.append(order) else: raise ValueError(msg) else: order_ = order.copy() task_info = get_task_metadata(task) order_.update(task_info) updated.append(order_)
data = dict(self) data['orders'] = updated
return self.__class__(data)
# schema resolve=SCHEMA_TRACKING.parent)
# repr, printing, etc info return repr_tracking(self)
# TODO return "CEDAR Tracking Metadata"
# ----------------------------------------------------------------------------- # String repr show_program=True, show_submission=True, show_tracking=True, show_states=True, show_runtimes=True, show_orders=None): """ Return string formatted information about CEDAR tracking metadata """ lines = []
if show_program: lines.extend(repr_metadata_program(tracking_data['program'])) if show_submission: lines.extend(repr_tracking_submission(tracking_data['submission'])) if show_tracking: lines.extend(repr_tracking_tracking(tracking_data['tracking']))
# Top level order information is always added lines.extend(repr_tracking_orders(tracking_data['orders'], show_states=show_states, show_runtimes=show_runtimes))
if show_orders is not None: # Handle 'True', which is shortcut to showing all order details if show_orders is True: show_orders = range(len(tracking_data['orders']))
# Otherwise expect sequence of order indexes (1st, 2nd, etc) for order_id in show_orders: order_md = tracking_data['orders'][order_id] order_repr = repr_metadata_order(order_md, f'Order #{order_id}') lines.extend(order_repr)
return '\n'.join(lines)
lines = _indent([ f'* {info["name"]}={info["version"]}', f'* earthengine-api={info["ee"]}' ], n=1) return [header] + lines
tile_indices = [f'({i[0]}, {i[1]})' for i in info['tile_indices']]
# Format subsection first so it can be added as a string period_info_str = ['* Period:'] + _indent([ f'* Start: {info["period_start"]}', f'* End: {info["period_end"]}', f'* Freq: {info["period_freq"]}' ], n=2)
lines = _indent([ f'* Submitted on {info["submitted"]}', f'* Tile Grid: "{info["tile_grid"]["name"]}"', f'* Tile Indices : {", ".join(tile_indices)}', '\n'.join(period_info_str) ]) return [header] + lines
lines = _indent([ f'* Name: {info["name"]}', f'* Prefix: {info["prefix"]}', f'* Collections: {", ".join(info["collections"])}', f'* Image template: {info["name_template"]}', f'* Image prefix: {info["prefix_template"]}' ], n=1) return [header] + lines
header='Orders'): lines = [f'* Count: {len(info)}'] if show_states: states = summarize_states(info) lines_ = [f'- {state}: {n}' for state, n in states.items()] lines.append('* States:') lines.extend(_indent(lines_, n=1))
if show_runtimes: runtimes = summarize_runtimes(info) mean = _format_runtime(runtimes['mean']) std = _format_runtime(runtimes['std']) line = f'* Runtime: {mean} +/- {std} minutes' lines.append(line)
return [header] + _indent(lines)
summary = _summarize_order(info) s_runtime = _format_runtime(summary['runtime']) lines = _indent([ f'- Name: {summary["name"]}', f'- Prefix: {summary["prefix"]}', f'- ID: {summary["id"]}', f'- Task state: {summary["state"]}', f'- Runtime: {s_runtime} minutes', f'- Image pieces: {summary["n_images"]}', f'- Output URL: {summary["output_url"]}', ], n=1) return [header] + lines
# ----------------------------------------------------------------------------- # Calculations if start_time and update_time: return update_time - start_time elif start_time: now = dt.datetime.now().timestamp() return now - start_time else: return nan
""" Returns tallies of order task status """
""" Summarize runtimes of order tasks """ runtimes = [] for order in orders: start_ = order['status'].get('start_timestamp_ms', None) update = order['status'].get('update_timestamp_ms', None) runtime = calculate_order_runtime(start_, update) # nan if Nones runtimes.append(runtime)
runtimes = np.asarray(runtimes) mask = np.isfinite(runtimes) any_ = mask.any()
return { 'runtimes': runtimes, 'total': np.sum(runtimes[mask]) if any_ else np.nan, 'n': runtimes.size, 'mean': np.mean(runtimes[mask]) if any_ else np.nan, 'std': np.std(runtimes[mask]) if any_ else np.nan }
status = order['status'] start_ = status.get('start_timestamp_ms', None) update = status.get('update_timestamp_ms', None) output_url = status.get('output_url', []) summary = { 'name': order['name'], 'prefix': order['prefix'], 'id': status.get('id', 'UNKNOWN'), 'state': status.get('state', EE_STATES.EMPTY), 'runtime': calculate_order_runtime(start_, update, np.nan), 'n_images': len(output_url), 'output_url': list(set(output_url)) } return summary
# Helpers return [' ' * n * length + line for line in lines]
if time_ms: return '{0:02.2f}'.format(time_ms / 60. / 1000.) else: return 'nan' |