""" Helper utilities for using Google Cloud Storage
"""
import fnmatch
import json
import logging
import os
from pathlib import Path
import re
import socket
from google.cloud import storage
from google.oauth2 import service_account
import ee
from stems.utils import renamed_upon_completion
logger = logging.getLogger(__name__)
# Blob names that do not end in '/' are files (not "directory" markers)
_RE_FILE = re.compile(r'.*(?<!\/)$')
METADATA_ENCODING = 'utf-8'
SCOPES = [
'https://www.googleapis.com/auth/devstorage.read_write'
]
ENVVAR_CREDENTIALS = 'GOOGLE_APPLICATION_CREDENTIALS'
ENVVAR_PROJECT = 'GOOGLE_CLOUD_PROJECT'
def build_gcs_client(credentials=None, project=None):
""" Return a Google Cloud Store API service client
Parameters
----------
credentials : str, optional
File name of Google Cloud Store credentials (typically from a service
account)
project : str, optional
Google Cloud Platform project to use
Returns
-------
google.cloud.storage.Client
Client for the Google Cloud Storage client library
Notes
-----
    You might consider setting the environment variable
``GOOGLE_APPLICATION_CREDENTIALS`` with the path to your service account
credentials file [1]_.
References
----------
.. [1] https://cloud.google.com/storage/docs/reference/libraries#setting_up_authentication
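    Examples
    --------
    A minimal usage sketch; the key file and project name are hypothetical
    placeholders:

    >>> client = build_gcs_client('service-account.json',
    ...                           project='my-project')  # doctest: +SKIP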
"""
    if credentials is None and ENVVAR_CREDENTIALS in os.environ:
        credentials = os.environ[ENVVAR_CREDENTIALS]
        logger.debug('Got GCS credentials from environment variable')
    if project is None and ENVVAR_PROJECT in os.environ:
        project = os.environ[ENVVAR_PROJECT]
        logger.debug('Got GCS project from environment variable')
    # Only load from file if we actually have a filename; otherwise defer
    # to the client library's default credential discovery
    if credentials is not None:
        credentials = service_account.Credentials.from_service_account_file(
            credentials, scopes=SCOPES)
    client = storage.Client(project=project, credentials=credentials)
return client
class GCSStore(object):
""" Store GEE "pre-ARD" images and metadata on Google Cloud Storage
Parameters
----------
client : google.cloud.storage.client.Client
GCS client
bucket : google.cloud.storage.bucket.Bucket
GCS bucket
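    Examples
    --------
    A sketch of typical construction; the bucket name is a hypothetical
    placeholder:

    >>> store = GCSStore.from_credentials('my-bucket')  # doctest: +SKIP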
"""
def __init__(self, client, bucket):
assert isinstance(client, storage.Client)
assert isinstance(bucket, storage.Bucket)
self.client = client
self.bucket = bucket
    @classmethod
def from_credentials(cls, bucket_name, credentials=None, project=None):
""" Load Google Cloud Storage credentials and create store
"""
client = build_gcs_client(credentials=credentials, project=project)
bucket = client.get_bucket(bucket_name)
return cls(client, bucket)
    def list(self, path=None, pattern=None):
""" List stored images or metadata
Parameters
----------
path : str, optional
Prefix path to search within
        pattern : str, optional
            Filename glob pattern (e.g., ``*.tif``)
Returns
-------
list[str]
Names of stored data
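        Examples
        --------
        A sketch listing stored GeoTIFFs; the prefix is a hypothetical
        placeholder:

        >>> store.list(path='LANDSAT_ARD', pattern='*.tif')  # doctest: +SKIP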
"""
blobs = list_blobs(self.bucket, prefix=path, pattern=pattern)
return [blob.name for blob in blobs]
    def store_image(self, image, name, path=None, **export_image_kwds):
""" Create ee.batch.Task to create and store "pre-ARD"
Parameters
----------
image : ee.Image
Earth Engine image to compute & store
name : str
Name of file/object to store
path : str, optional
Parent directory for file/object stored
export_image_kwds : dict, optional
Additional keyword arguments to pass onto
:py:meth:`ee.batch.Export.image.toCloudStorage` (hint: ``scale`` & ``crs``)
Returns
-------
        ee.batch.Task
Earth Engine Task
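        Examples
        --------
        A sketch with a hypothetical image; ``scale`` and ``crs`` are
        forwarded to the export task:

        >>> task = store.store_image(image, 'example_image',
        ...                          path='LANDSAT_ARD',
        ...                          scale=30, crs='EPSG:32619')  # doctest: +SKIP
        >>> task.start()  # doctest: +SKIP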
"""
        # Make parent "directory" ("directories" are faked on GCS)
        fullname, basename, path_ = _combine_name_path(name, path)
        mkdir_p(self.bucket, path_)
# Create compute/store export task
# Canonicalized:
# bucket -> outputBucket
# fileNamePrefix, path -> outputPrefix
task = ee.batch.Export.image.toCloudStorage(
image,
bucket=self.bucket.name,
description=basename,
fileNamePrefix=fullname,
**export_image_kwds
)
return task
    def _retrieve_extension(self, dest, name, ext, path=None, overwrite=True):
        if not name.endswith(ext):
            name += f'*{ext}'
        blobs = list_blobs(self.bucket, prefix=path, pattern=name)
        logger.debug(f'Found {len(blobs)} blobs matching image name/prefix')
        dest = Path(dest)
        for i, blob in enumerate(blobs, 1):
            blob_name = blob.name.split('/')[-1]
            msg = f'{i}/{len(blobs)} - "{blob_name}"'
            dest_ = dest.joinpath(blob_name)
            if not dest_.exists() or overwrite:
                logger.debug(f'Downloading {msg}')
                dest_ = download_blob(blob, dest_)
            else:
                logger.debug(f'Already downloaded {msg}')
            yield dest_
    def retrieve_image(self, dest, name, path=None, overwrite=True):
""" Retrieve (pieces of) an image from the GCS
Parameters
----------
dest : str
Destination folder to save image(s)
name : str
Name of stored file/object
        path : str, optional
            Parent directory for file/object stored on GCS
        overwrite : bool, optional
            Overwrite previously downloaded data
        Yields
        ------
        pathlib.Path
            Filename(s) corresponding to retrieved data
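        Examples
        --------
        A sketch retrieving the pieces of a hypothetical image into a
        local folder:

        >>> files = store.retrieve_image('ard/', 'example_image',
        ...                              path='LANDSAT_ARD')  # doctest: +SKIP
        >>> list(files)  # doctest: +SKIP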
"""
        return self._retrieve_extension(dest, name, 'tif', path=path,
                                        overwrite=overwrite)
    def remove(self, name, path=None):
""" Remove a file from GCS
Parameters
----------
name : str
Name of stored file/object
path : str, optional
Parent directory for file/object stored
Returns
-------
str
Name of file removed
"""
fullname, _, _ = _combine_name_path(name, path)
return delete_blob(self.bucket, fullname)
def upload_json(bucket, data, path, check=False, encoding=METADATA_ENCODING):
""" Upload data as JSON to GCS
Parameters
----------
bucket : google.cloud.storage.bucket.Bucket
GCS bucket
data : str or dict
JSON, either already dumped to str or as a dict
path : str
Destination path on GCS for data
    check : bool, optional
        Check if the blob already exists before uploading, logging when the
        upload will overwrite ("update") rather than create it
encoding : str, optional
Metadata encoding
Returns
-------
google.cloud.storage.blob.Blob
JSON as GCS blob
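    Examples
    --------
    A sketch uploading tracking metadata; the bucket and destination path
    are hypothetical:

    >>> blob = upload_json(bucket, {'images': []},
    ...                    'metadata/example.json')  # doctest: +SKIP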
"""
    blob = bucket.blob(path)
    if check and blob.exists():
        logger.debug(f'Overwriting existing blob "{path}"')
    # Dump dict to JSON str and/or encode if needed
    if isinstance(data, dict):
        data = json.dumps(data, indent=2)
if not isinstance(data, bytes):
data = data.encode(encoding)
blob.upload_from_string(data, content_type='application/json')
return blob
def read_json(blob, encoding=METADATA_ENCODING):
""" Read a blob of JSON string data into a dict
Parameters
----------
blob : google.cloud.storage.blob.Blob
Blob to read, decode, and load
encoding : str, optional
Metadata encoding
Returns
-------
dict
Blob read, decoded, and loaded as a dict
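    Examples
    --------
    Round-trip data previously stored with :py:func:`upload_json`:

    >>> metadata = read_json(blob)  # doctest: +SKIP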
"""
data = blob.download_as_string().decode(encoding)
return json.loads(data)
def mkdir_p(bucket, path):
""" Create a "directory" on GCS
Parameters
----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
path : str
Path to folder
Returns
-------
google.cloud.storage.blob.Blob
GCS blob for directory created
Notes
-----
Directories don't really exist on GCS but we can fake it [1]_.
References
----------
.. [1] https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
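    Examples
    --------
    A sketch creating nested "directory" markers; the path is a
    hypothetical placeholder:

    >>> blob = mkdir_p(bucket, 'LANDSAT_ARD/h001v001')  # doctest: +SKIP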
"""
    paths = path.rstrip('/').split('/')
    blob_dir = None
    for i in range(len(paths)):
        path_ = _format_dirpath('/'.join(paths[:i + 1]))
        if not exists(bucket, path_):
            logger.debug(f'Creating "directory" on GCS "{path_}"')
            blob_dir = mkdir(bucket, path_)
        else:
            logger.debug(f'Path "{path_}" already exists...')
            blob_dir = bucket.blob(path_)
    return blob_dir
def mkdir(bucket, path):
    """ Create a single "directory" marker blob on GCS
    Parameters
    ----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
path : str
Path to folder
Returns
-------
google.cloud.storage.blob.Blob
GCS blob for directory created
"""
path_ = _format_dirpath(path)
blob = bucket.blob(path_)
content_type = 'application/x-www-form-urlencoded;charset=UTF-8'
blob.upload_from_string('', content_type=content_type)
    return blob
def exists(bucket, path):
""" Check if file/folder exists
Parameters
----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
path : str
Path to file/folder
Returns
-------
    bool
        Whether the file/folder exists
"""
blob = storage.Blob(path, bucket)
return blob.exists()
def delete_blob(bucket, path):
""" Delete a GCS blob
Parameters
----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
path : str
Path to file/folder
Returns
-------
    str
Name of deleted blob
"""
blob = bucket.blob(path)
blob.delete()
return blob.name
def list_dirs(bucket, prefix=None):
    """ Return names of "directories" within a prefix on GCS
    Parameters
    ----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
    prefix : str, optional
        List contents within this folder
    Returns
    -------
    Sequence[str]
        Names of directories inside ``prefix``
"""
prefix = _format_dirpath(prefix) if prefix else None
blobs = bucket.list_blobs(prefix=prefix, delimiter='/')
prefixes = set()
for page in blobs.pages:
prefixes.update(page.prefixes)
return list(prefixes)
def list_blobs(bucket, prefix=None, pattern=None):
    """ Return file (non-directory) blobs within a prefix on GCS
    Parameters
    ----------
    bucket : google.cloud.storage.bucket.Bucket
        GCS bucket
    prefix : str, optional
        List contents within this folder
    pattern : str, optional
        Filter search by glob pattern (e.g., ``*.json``)
    Returns
    -------
    list[google.cloud.storage.blob.Blob]
        List of blobs for files inside ``prefix``
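    Examples
    --------
    A sketch listing JSON metadata under a hypothetical prefix:

    >>> blobs = list_blobs(bucket, prefix='LANDSAT_ARD',
    ...                    pattern='*.json')  # doctest: +SKIP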
"""
if prefix:
prefix = _format_dirpath(prefix)
blobs = bucket.list_blobs(prefix=prefix)
blobs = [blob for blob in blobs if _RE_FILE.match(blob.name)]
if pattern:
re_pattern = re.compile(fnmatch.translate(pattern))
blobs = [blob for blob in blobs
if re_pattern.match(_blob_basename(blob, prefix))]
return blobs
def download_blob(blob, dest):
""" Download a blob to a destination directory
Parameters
----------
blob : google.cloud.storage.blob.Blob
GCS blob to download
    dest : str or pathlib.Path
        Local filename to download the blob to
Returns
-------
pathlib.Path
Filename written to
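    Examples
    --------
    A sketch downloading one blob to a local file; the filename is a
    hypothetical placeholder:

    >>> filename = download_blob(blob, 'ard/image-001.tif')  # doctest: +SKIP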
"""
dest = Path(dest)
if not dest.exists():
dest.parent.mkdir(parents=True, exist_ok=True)
suffix = f'.tmp.{socket.gethostname()}.{os.getpid()}'
with renamed_upon_completion(dest, suffix=suffix) as tmp:
blob.download_to_filename(tmp)
return dest
def _format_dirpath(path):
    # GCS "directories" are just prefixes that end with a trailing slash
    return path if path.endswith('/') else path + '/'
def _blob_basename(blob, prefix):
    # Strip the search prefix off a blob's name, if present
    if prefix and blob.name.startswith(prefix):
        return blob.name[len(prefix):]
    else:
        return blob.name
def _combine_name_path(name, prefix=None):
# Entire filename
if prefix is None:
parts = name.split('/')
prefix_ = '/'.join(parts[:-1])
basename = parts[-1]
return name, basename, _format_dirpath(prefix_)
# Filename + prefix
else:
assert '/' not in name # should just be a file
prefix_ = _format_dirpath(prefix)
return prefix_ + name, name, prefix_