Source code for cedar.tracker

""" Tracker to submit and download GEE pre-ARD tasks
"""
from collections import defaultdict
import datetime as dt
import itertools
import logging
import os
from pathlib import Path
import string

import ee
import pandas as pd

from stems.gis.grids import TileGrid, Tile

from . import defaults, ordering, utils
from .exceptions import EmptyCollectionError
from .metadata import TrackingMetadata, get_submission_info

logger = logging.getLogger(__name__)

_STR_FORMATTER = string.Formatter()


[docs]class Tracker(object): """ CEDAR "pre-ARD" order tracker Parameters ---------- tile_grid : stems.gis.grids.TileGrid Tile Grid to use for ARD store : cedar.stores.Store A Store that can be used to store images & metadata name_template : str, optional Template for "pre-ARD" image and metadata name prefix_template : str, optional Template for "pre-ARD" image and metadata prefix tracking_template : str, optional Template for order tracking file name tracking_prefix : str, optional Order tracking file prefix folder filters : Dict[str, Sequence[dict or ee.Filter]] Earth Engine filters to apply, organized by image collection name. Values should either be ``ee.Filter`` objects or dictionaries that describe the filter (see :py:func:`cedar.utils.serialize_filter`) """ def __init__(self, tile_grid, store, name_template=defaults.PREARD_NAME, prefix_template=defaults.PREARD_PREFIX, tracking_template=defaults.PREARD_TRACKING, tracking_prefix=defaults.PREARD_TRACKING_PREFIX, filters=None, export_image_kwds=None): assert isinstance(tile_grid, TileGrid) self.tile_grid = tile_grid self.store = store self.name_template = name_template self.prefix_template = prefix_template self.tracking_template = tracking_template self.tracking_prefix = tracking_prefix self._filters = filters or defaultdict(list) self.export_image_kwds = export_image_kwds or {} @property def filters(self): """ list[ee.Filter]: Earth Engine filters to apply """ # Convert from dict as needed from .utils import create_filters return { image_collection: create_filters(filters) for image_collection, filters in self._filters.items() }
[docs] def submit(self, collections, tile_indices, period_start, period_end, period_freq=None, save_empty_metadata=True, error_if_empty=False): """ Submit and track GEE pre-ARD tasks Parameters ---------- collections: str or Sequence[str] GEE image collection name(s) tile_indices : Sequence[(int, int)] Tuple(s) of rows/columns in TileGrid to process period_start : dt.datetime Starting period date period_end : dt.datetime Ending period date period_freq : str, optional If provided, ``period_start``, ``period_end``, and ``period_freq`` are interpeted as the range for :py:func:`pandas.date_range` and one or more Tasks will be submitted save_empty_metadata : bool, optional If True, Pre-ARD image requests that have 0 results (e.g., because of spotty historical record) will store metadata, but will not start the task. If False, will not store this metadata error_if_empty : bool, optional If True, raise an EmptyCollectionError if the image collection result has no images. The default behavior is to log and skip empty search results Returns ------- str Task tracking information name str Task tracking information identifier (an ID, path, etc) """ # TODO: add callback (e.g., for progressbar) # TODO: eventually allow start/end to be None (use limits of data) if isinstance(collections, str): collections = (collections, ) assert len(tile_indices) >= 1 if isinstance(tile_indices[0], int): tile_indices = [tile_indices] # Get tiles tiles = [self.tile_grid[index] for index in tile_indices] # Split up period into 1 or more sub-periods if freq is given periods = _parse_date_freq(period_start, period_end, period_freq) logger.debug(f'Creating {len(periods)} ARD slice(s) for date range') # Create tracking name s_tile_indices = [f'h{h:03d}v{v:03d}' for v, h in tile_indices] namespace = { 'collections': collections, 'tiles': tiles, 'tile_indices': '_'.join(s_tile_indices), 'period_start': period_start.isoformat(), 'period_end': period_end.isoformat(), 'period_freq': period_freq, 'now': dt.datetime.now().isoformat() } tracking_name = self.tracking_template.format(**namespace) # Create submission info submission_info = get_submission_info(self.tile_grid, collections, tile_indices, period_start, period_end, period_freq) # Determine parameters for each submission iter_submit = list(itertools.product(collections, tiles, periods)) logger.debug(f'Creating order named "{tracking_name}"') order = ordering.Order( tracking_name, self.tracking_prefix, name_template=self.name_template, prefix_template=self.prefix_template ) # Loop over product of collections, tiles, and dates for collection, tile, (date_start, date_end) in iter_submit: logger.debug( f'Adding "{collection}" - ' f'"h{tile.horizontal:03d}v{tile.vertical:03d} - ' f'{date_start} to {date_end}' ) order.add( collection, tile, date_start, date_end, filters=self.filters.get(collection, []), error_if_empty=error_if_empty ) logger.debug('Submitting order') tracking_id = order.submit( self.store, submission_info=submission_info, save_empty_metadata=save_empty_metadata, export_image_kwds=self.export_image_kwds ) logger.debug(f'Submitted order with name="{tracking_name}" ' f'stored at ID="{tracking_id}"') return tracking_name, tracking_id
[docs] def list(self, pattern=None): """ Return a list of all tracking metadata Parameters ---------- pattern : str, optional Search pattern for tracking info. Specify to subset to specific tracking info (e.g., from some date). If ``None`` provided, looks for tracking information matching :py:attr:`~Tracker.tracking_template` Returns ------- list[str] Name of stored tracking information """ if pattern is None: d = defaultdict(lambda: '*') pattern = self.tracking_template.format_map(d).split('*')[0] return self.store.list(path=self.tracking_prefix, pattern=pattern)
[docs] def read(self, name): """ Returns stored tracking information as dict Parameters ---------- name : str Name of tracking metadata (e.g., taken from running :func:`~Tracker.list_tracking`) Returns ------- dict JSON tracking info data as a dict """ data = self.store.read_metadata(name, path=self.tracking_prefix) return TrackingMetadata(data)
[docs] def update(self, name): """ Refresh and reupload tracking information by checking with the GEE Parameters ---------- name : str Name of tracking metadata (e.g., taken from running :func:`~Tracker.list_tracking`) Returns ------- dict JSON tracking info data as a dict """ tracking_info = self.read(name) updated = tracking_info.update() name_ = self.store.store_metadata(dict(updated), name) return updated
[docs] def download(self, tracking_info, dest, overwrite=True, callback=None): """ Download "pre-ARD" and metadata to a directory Parameters ---------- tracking_info : dict JSON tracking info data as a dict dest : str or pathlib.Path Destination download directory overwrite : bool, optional Overwrite existing downloaded data callback : callable Callback function to execute after each file is downloaded. Should take arguments "item" and "n_steps". Use this for progress bars or other download status reporting Returns ------- tuple[str, list[str]] Key value pairs mapping GEE task IDs to the filenames of downloaded data. Wrap it in a ``dict`` to make it not lazy """ logger.debug(f'Downloading for {len(tracking_info["orders"])} tasks') iter_download = download_tracked(tracking_info, self.store, dest, overwrite=overwrite) downloaded = defaultdict(list) for task_id, n_images, meta, images in iter_download: # Download, report (if callback), and store filenames logger.debug(f'Downloading output for task "{task_id}" ' f'({n_images or "unknown"} images)') for meta_ in meta: if callback: # Metadata doesn't count as a "step" callback(item=task_id, n_steps=0) downloaded[task_id].append(meta_) # We might not know how many images will be downloaded steps_image = 1 / n_images if n_images else 0 for image in images: downloaded[task_id].append(image) if callback: callback(item=image.stem, n_steps=steps_image) # Update for all downloaded images at once if we didn't know # a priori if steps_image == 0 and callback: item = os.path.commonprefix( [p.name for p in downloaded[task_id]]) callback(item=item + '...', n_steps=1) return downloaded
[docs] def clean(self, tracking_info, tracking_name=None, callback=None): """ Clean "pre-ARD" imagery, metadata, and tracking metadata off GCS Parameters ---------- tracking_info : dict JSON tracking info data as a dict tracking_name : str Name of tracking info file (will be deleted if provided) callback : callable Callback function to execute after each file is deleted. Should take arguments "item" and "n_steps". Use this for progress bars or other download status reporting Returns ------- dict[str, list[str]] Mapping of GEE Task ID to filename(s) cleaned """ iter_clean = clean_tracked(tracking_info, self.store) cleaned = defaultdict(list) for task_id, n_images, names in iter_clean: for name in names: if callback: callback(item=task_id, n_steps=1 / n_images) cleaned[task_id].append(name) if tracking_name: self.store.remove(tracking_name, self.tracking_prefix) return cleaned
[docs]def download_tracked(tracking_info, store, dest, overwrite=False): """ Download stored "pre-ARD" and metadata described by tracking info Parameters ---------- tracking_info : dict Tracking information store : cedar.stores.Store cedar store class dest : str or pathlib.Path Destination download directory overwrite : bool, optional Overwrite previously downoaded data, or not Yields ------ id : str Task ID n_images : int Number of images to download if known, otherwise ``None`` metadata : generator Generator that downloads metadata and yields filenames images : generator Generator that downloads imagery and yields filenames """ dest_ = Path(str(dest)) if not dest_.exists(): dest_.mkdir(exist_ok=True, parents=True) else: assert dest_.is_dir() orders = tracking_info['orders'] for order in orders: # Get info about order id_ = order['status'].get('id', None) name, prefix = order['name'], order['prefix'] n_images = len(order['status'].get('output_url', [])) or None # Retrieve image and metadata dst_meta = store.retrieve_metadata(dest, name, prefix, overwrite=overwrite) dst_imgs = store.retrieve_image(dest, name, prefix, overwrite=overwrite) yield (id_, n_images, dst_meta, dst_imgs, )
[docs]def clean_tracked(tracking_info, store): """ Delete stored "pre-ARD" and metadata described by tracking info Parameters ---------- tracking_info : dict Tracking information store : cedar.stores.gcs.GCSStore or cedar.stores.gdrive.GDriveStore cedar store class Yields ------ id : str Order GEE Task ID names : generator Generator that deletes files and returns their names """ orders = tracking_info['orders'] for order in orders: id_ = order['status'].get('id', None) name, prefix = order['name'], order['prefix'] logger.debug(f'Deleting image and metadata for id={id_}, ' f'name="{name}", prefix="{prefix}"') # Retrieve image and metadata names = store.list(path=prefix, pattern=name) yield (id_, len(names), (store.remove(name) for name in names))
def _parse_date_freq(start, end, freq=None): import pandas as pd # hiding because it can be expensive to import start_ = pd.to_datetime(start).to_pydatetime() end_ = pd.to_datetime(end).to_pydatetime() if freq is None: return list(zip([start], [end])) else: # Add offset to make inclusive of end date from pandas.tseries.frequencies import to_offset offset = to_offset(freq) times = pd.date_range(start, end + offset, freq=freq).to_pydatetime() return list(zip(times[:-1], times[1:]))