Coverage for cedar/stores/gdrive.py : 25%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Helper utilities for using Google Drive """
# TODO: rehash what these should be and document
""" Return Google Drive API service, either by credentials or file
Parameters ---------- credentials : google.oauth2.credentials.Credentials User credentials, including access token, for using the application.
Returns ------- googleapiclient.discovery.Resource GDrive v3 API resource """ service = build('drive', 'v3', credentials=credentials) return service
no_browser=True): """ Get OAuth2 Credentials for Google Drive
Parameters ---------- client_secrets_file : str or Path Filename of "client_secrets.[...].json" file credentials_file : str or Path Filename of user credentials to load, or to save to for future use. If not provided, will use default location. no_browser : bool, optional Disables opening a web browser in favor of prompting user to authenticate using a terminal prompt.
Returns ------- credentials : google.oauth2.credentials.Credentials User credentials, including access token, for using the application. credentials_file : str File storing user credentials """ # Locate authentication files, checking default places client_secrets_file_, credentials_file_ = find_credentials( client_secrets_file, credentials_file)
creds = None if credentials_file_ and os.path.exists(credentials_file_): logger.debug('Trying to load previous credentials file...') creds = load_credentials(credentials_file_)
if not creds or not creds.valid: # Try refreshing if creds and not creds.expired and creds.refresh_token: logger.debug('Trying to refresh credentials token...') creds.refresh(Request()) else: # Load from file if not client_secrets_file_: raise ValueError('Missing `client_secrets` or path to a ' 'valid `client_secrets_file` file needed ' 'to authenticate user')
logger.debug('Opening local web server to authenticate..') flow = InstalledAppFlow.from_client_secrets_file( client_secrets_file_, SCOPES)
flow_kwds = { # Enable offline access so we can refresh access token without # reprompting user for permission. 'access_type': 'offline' } if no_browser: creds = flow.run_console(**flow_kwds) else: creds = flow.run_local_server(**flow_kwds)
# Save for next time logger.debug(f'Saving Google API credentials to {credentials_file_}') save_credentials(creds, credentials_file_)
return creds, credentials_file_
""" Store GEE "pre-ARD" images and metadata on Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google Drive API service """ assert isinstance(service, Resource) self.service = service
""" Create and/or load credentials and create the store
Parameters ---------- client_secrets_file : str or Path Filename of "client_secrets.[...].json" file credentials_file : str or Path Filename of user credentials to load, or to save to for future use. If not provided, will use default location. """ creds, creds_file = get_credentials( client_secrets_file=client_secrets_file, credentials_file=credentials_file) gdrive = build_gdrive_service(credentials=creds) return cls(gdrive)
""" List stored images or metadata
Parameters ---------- path : str, optional Prefix path to search within pattern : str, optional Filename pattern
Returns ------- list[str] Names of stored data """ parent_id = _path_to_parent_id(self.service, path) info = list_objects(self.service, parent_id=parent_id, name=pattern) return [i['name'] for i in info]
""" Store JSON metadata
Parameters ---------- metadata : dict Metadata, to be saved as JSON name : str Name of file/object to store path : str, optional Parent directory for file/object stored
Returns ------- str ID of file uploaded """ if not name.endswith('.json'): name += '.json'
if path is not None: parent_id = mkdir(self.service, path, check=True)
meta_id = upload_json(self.service, metadata, name, path=path, check=True) return meta_id
""" Create ee.batch.Task to create and store "pre-ARD"
Parameters ---------- image : ee.Image Earth Engine image to compute & store name : str Name of file/object to store path : str, optional Parent directory for file/object stored export_image_kwds : dict, optional Additional keyword arguments to pass onto :py:meth:`ee.batch.Export.image.toCloudStorage` (hint: ``scale`` & ``crs``)
Returns ------- ee.Task Earth Engine Task """ # TODO: warn / fail if `path` looks nested... # Make parent directory -- currently GEE interprets this directory as # a single, non-nested directory. So, "PREARD/LT05" and "PREARD/LE07" # won't be separate directories under "PREARD/", but two directories # stored in the root of your drive. # As such, we use mkdir here (for now?) if path is not None: parent_id = mkdir(self.service, path, check=True)
# Canonicalized: # folder -> driveFolder # fileNamePrefix, path -> driveFileNamePrefix task = ee.batch.Export.image.toDrive( image, description=name, fileNamePrefix=name, driveFolder=path, **export_image_kwds ) return task
# Find parent ID if provided & list objects parent_id = _path_to_parent_id(self.service, path) query = list(list_objects(self.service, parent_id=parent_id, name=name))
# Can't pattern search on GDrive, so limit by extension query_ = [q for q in query if q['name'].endswith(ext)]
for i, result in enumerate(query_): id_, result_name = result['id'], result['name'] msg = f'{i}/{len(query_)} - "{result_name}"'
dest_ = Path(dest).joinpath(result_name) if not dest_.exists() or overwrite: logger.debug(f'Downloading {msg}') dst = download_file_id(self.service, id_, dest_) else: logger.debug(f'Already downloaded {msg}')
yield dest_
""" Retrieve (pieces of) an image from the Google Drive
Parameters ---------- dest : str Destination folder to save image(s) name : str Name of stored file/object path : str, optional Parent directory for file/object stored on Google Drive
Yields ------ Sequence[str] Filename(s) corresponding to retrieved data """ return self._retrieve_extension(dest, name, '.tif', path=path, overwrite=overwrite)
""" Retrieve image metadata from the GCS
Parameters ---------- dest : str Destination folder to save metadata name : str Name of stored file/object path : str, optional Parent directory for file/object stored
Yields ------ pathlib.Path Filename corresponding to retrieved data """ return self._retrieve_extension(dest, name, '.json', path=path, overwrite=overwrite)
""" Read and parse JSON metadata into a dict
Parameters ---------- name : str Filename of metadata to read path : str, optional Parent directory for file/object stored
Returns ------- dict JSON metadata """ parent_id = _path_to_parent_id(self.service, path) return read_json(self.service, name, parent_id=parent_id)
""" Remove a file from Google Drive
Parameters ---------- name : str Name of stored file/object path : str, optional Parent directory for file/object stored
Returns ------- str Name of file removed """ parent_id = _path_to_parent_id(self.service, path) return delete(self.service, name, parent_id=parent_id)
""" Returns private "appProperties" to include when creating files """ return { 'application': __name__.split('.')[0], 'version': __version__ }
""" Build a query component for ``appProperties`` """ q = ('appProperties has {key="application" and value="%s"}' % get_appProperties()['application']) return q
encoding=METADATA_ENCODING): """ Upload JSON data to Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 data : str or dict JSON, either already dumped to str or as a dict name : str Filename for data path : str, optional Parent directory to put file. If ``None``, stores in root of Google Drive check : bool, optional Check to see if the file already exists first. If so, will overwrite (or "update" instead of "create") encoding : str, optional Metadata encoding
Returns ------- str ID of file uploaded """ # Dump to JSON if needed if isinstance(data, dict): data = json.dumps(data, indent=2) if not isinstance(data, bytes): data = data.encode(encoding)
# Find folder ID for parent directory parent_id = _path_to_parent_id(service, path)
# Prepare metadata body & data body = { 'name': name, 'parents': [parent_id], 'mimeType': 'text/plain', 'appProperties': get_appProperties() } content = io.BytesIO(data) media = MediaIoBaseUpload(content, 'text/plain', resumable=True)
# Check to see if file already exists... if check: file_id = exists(service, name, parent_id=parent_id) else: file_id = ''
# Update or create if check and file_id: req = service.files().update(fileId=file_id, media_body=media) else: req = service.files().create(body=body, media_body=media, fields='id') meta = req.execute()
return meta['id']
def download_file_id(service, file_id, dest): """ Download a file to a destination directory using its ID
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 file_id : str ID of file on Google Drive dest : str or pathlib.Path Destination filename
Returns ------- pathlib.Path Filename written to
Raises ------ FileExistsError Raised if file exists in destination but not allowed to overwrite, ValueError Raised if the file given does not exist in Google Drive """ # Create destination if needed dest = Path(dest) if not dest.parent.exists(): dest.parent.mkdir(parents=True, exist_ok=True)
# Download... request = service.files().get_media(fileId=file_id)
suffix = f'.tmp.{socket.gethostname()}{os.getpid()}' with renamed_upon_completion(dest, suffix=suffix) as tmp: with open(str(tmp), 'wb') as dst: downloader = MediaIoBaseDownload(dst, request) done = False # TODO: would be nice to have _some_ logging indicator of dl speed while done is False: status, done = downloader.next_chunk()
return dest
""" Download a file to a destination directory
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 dest : str Name of destination parent_id : str, optional Parent ID of folder containing file (to narrow search)
Returns ------- pathlib.Path Filename written to
Raises ------ FileExistsError Raised if file exists in destination but not allowed to overwrite, ValueError Raised if the file given does not exist in Google Drive """ # Find the file name_id = exists(service, name, parent_id=parent_id) if not name_id: raise ValueError(f'File "{name}" not found on Google Drive')
return download_file_id(service, name_id, dest)
def _read_json_id(service, file_id): request = service.files().get_media(fileId=file_id) with io.BytesIO() as fh: downloader = MediaIoBaseDownload(fh, request) done = False while done is False: status, done = downloader.next_chunk() fh.seek(0) data = fh.read() return json.loads(data, encoding=METADATA_ENCODING)
""" Reads and returns a JSON file from Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 name : str Name of file/folder parent_id : str, optional Parent ID of folder containing file (to narrow search)
Returns ------- dict JSON data as a dict """ name_id = exists(service, name, parent_id=parent_id) if not name_id: raise ValueError(f'File "{name}" not found on Google Drive') return _read_json_id(service, name_id)
""" Make a directory, recursively
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 dest : str Directory to create
Returns ------- str Google Drive ID for directory created (or already existing) """ paths = dest.split('/', 1) if len(paths) == 1: # root name_id = exists(service, paths[0], parent_id=parent_id) if not name_id: return mkdir(service, paths[0], parent_id=parent_id) else: # already exists return name_id else: name, paths_ = paths[0], paths[1:] name_id = exists(service, name, parent_id=parent_id, directory=True) if not name_id: name_id = mkdir(service, name, parent_id=parent_id) dest_ = '/'.join(paths_) return mkdir_p(service, dest_, parent_id=name_id)
""" Make a directory on GDrive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 name : str Directory to create parent_id : str, optional ID of parent directory check : bool, optional Check if directory exists before creating. If exists, will not create a new directory and instead return the ID of this directory
Returns ------- str Google Drive ID for directory created (or already existing) """ if check: name_id = exists(service, name, parent_id=parent_id, directory=True) if name_id: logger.debug('Not creating new directory; already exists') return name_id
logger.debug(f'Creating directory {name}') meta = {'name': name, 'mimeType': MIME_TYPE_DIRECTORY} if parent_id is not None: meta['parents'] = [parent_id]
dir_ = service.files().create(body=meta, fields='id').execute() return dir_['id']
# TODO: maybe also make this have a timeout for memoized keys def inner(*args, **kwds): # don't use "service" as key key = (args[1], kwds.get('parent_id', None), kwds.get('directory', False), kwds.get('trashed', False), ) if key in cache: return cache[key] else: ans = func(*args, **kwds) # only cache "hits" (id != '') if ans: cache[key] = ans return ans
appProperties=defaults.GDRIVE_USE_APPPROPERTIES): """ Check if file/folder exists
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 name : str Name of file/folder parent_id : str, optional Parent ID of folder containing file (to narrow search) directory : bool, optional True if file needs to also be a directory trashed : bool, optional Search in the trash? appProperties : bool Search for application-specific files using ``appProperties``
Returns ------- str Returns object ID if exists, otherwise empty string """ q = [] if directory: q.append(f'mimeType = "{MIME_TYPE_DIRECTORY}"') if appProperties: q.append(query_appProperties())
results = list(list_objects(service, parent_id=parent_id, name=name, q=q, appProperties=appProperties)) n_results = len(results)
if n_results == 1: return results[0]['id'] elif n_results > 1: match_name = [r['name'] == name for r in results] if any(match_name): idx = match_name.index(True) return results[idx]['id'] else: id_ = results[0]['id'] logger.debug(f'Returning first of {n_results} matches -- id={id_}') return id_ else: return ''
# TODO: memoize appProperties=defaults.GDRIVE_USE_APPPROPERTIES): """ List files/folders on Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 parent_id : str, optional Parent ID of folder to list (to narrow search) name : str, optional Name to search for (don't include asterisks) q : str or Sequence[str], optional Additional search query parameters appProperties : bool Search for application-specific files using ``appProperties``
Yields ------ list[dict] Info about objects stored on Google Drive (keys=('id', 'name', )) """ # Check for passed query if isinstance(q, str): query = [q] elif isinstance(q, (list, tuple)): query = list(q) else: query = []
# Build up query query.append("trashed = false") if parent_id: query.append(f"'{parent_id}' in parents") if name: query.append(f"name contains '{name}'") if appProperties: query.append(query_appProperties())
# Combine query_ = ' and '.join(query) logger.debug(f'Searching for query: "{query_}"')
# Execute in pages page_token = None while True: resp = service.files().list(q=query_, spaces='drive', orderBy=_ORDER_BY, fields='nextPageToken, files(id, name)', pageToken=page_token).execute() for file_ in resp.get('files', []): yield file_ page_token = resp.get('nextPageToken', None) if page_token is None: break
""" List folders on Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 parent_id : str, optional Parent ID of folder to list (to narrow search) name : str, optional Name to search for (don't include asterisks)
Yields ------ list[dict] Info about objects stored on Google Drive (keys=('id', 'name', )) """ q = [f'mimeType = "{MIME_TYPE_DIRECTORY}"'] return list_objects(service, parent_id=parent_id, name=name, q=q)
appProperties=defaults.GDRIVE_USE_APPPROPERTIES): """ Delete a file/folder on Google Drive
Parameters ---------- service : googleapiclient.discovery.Resource Google API resource for GDrive v3 name : str Name of file/folder parent_id : str, optional Parent ID of folder containing file (to narrow search) appProperties : bool Search for application-specific files using ``appProperties``
Returns ------- str ID of deleted file/folder """ name_id = exists(service, name, parent_id=parent_id) resp = service.files().delete(fileId=name_id).execute() return name_id
if path is None: return None else: # Never search using appProperties # (e.g., in case GEE created the export folder) parent_id = exists(service, path, directory=True, appProperties=False) if not parent_id: raise FileNotFoundError( f'Cannot find prefix path provided "{path}" on Google Drive') return parent_id
# ============================================================================= # OAuth helpers 'client_id', 'client_secret', 'scopes']
assert isinstance(d, dict) token = d['token'] kwds = {k: v for k, v in d.items() if k != 'token'} creds = Credentials(token, **kwds) return creds
creds_ = { k: getattr(creds, k, None) for k in _OAUTH2_CREDS } return creds_
""" Load Google OAuth2 credentials from file
Returns ------- google.oauth2.credentials.Credentials """ with open(filename) as src: data = json.load(src) creds = _dict_to_creds(data) return creds
""" Save Google OAuth2 credentials
Parameters ---------- credentials : google.oauth2.credentials.Credentials Google OAuth2 credentials filename : str or Path Path to save credentials """ creds_ = _creds_to_dict(credentials)
filename = Path(filename) filename.parent.mkdir(exist_ok=True, parents=True)
suffix = f'.tmp.{socket.gethostname()}{os.getpid()}' with renamed_upon_completion(filename, suffix=suffix) as tmp: with open(str(tmp), 'w') as dst: json.dump(creds_, dst, indent=2, sort_keys=False) utils.set_file_urw(tmp)
""" Locate GDrive client secrets & credentials files """ def _check(fname, exists=True): dname, bname = os.path.dirname(fname), os.path.basename(fname) dirs_ = defaults.CEDAR_ROOT_CONFIG.copy() if dname: dirs_.insert(0, dname) paths = [os.path.join(d, bname) for d in dirs_] return utils.get_file(*paths, exists=exists)
client_secrets_file_ = _check(client_secrets_file or _CLIENT_SECRETS, True) credentials_file_ = _check(credentials_file or _USER_CREDS, False)
return client_secrets_file_, credentials_file_ |