""" Handle chunks/chunksize related logic

Chunks vs chunksizes::

    "Chunks" refers to a collection of chunk sizes organized by dimension

        * e.g., ``{'time': (3, 3, 3, 1, )}``
        * For :py:class:`dask.array.Array` and :py:class:`xarray.DataArray`,
          ``.chunks`` is a tuple
        * :py:class:`xarray.Dataset` ``.chunks`` is a mapping

    "Chunksizes" refers to a scalar size (an integer) organized by dimension

        * e.g., ``{'time': 3}``
        * ``chunksizes`` is used in encoding for NetCDF4 xarray backend

"""
from collections import Counter, OrderedDict, defaultdict
from functools import singledispatch
import logging
from pathlib import Path
import warnings

import xarray as xr

from ..utils import register_multi_singledispatch
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# Read chunks from files
def read_chunks(filename, variables=None):
    """ Look up the chunks stored for each variable in a file, if possible

    Every known reader is tried in turn and the result of the first one
    that succeeds is returned. A reader that fails is logged at debug
    level (with traceback) and the next one is tried.

    Parameters
    ----------
    filename : str
        Read chunks from this file
    variables : Sequence
        Subset of variables to retrieve ``chunking`` for

    Returns
    -------
    Mapping[str, Mapping[str, int]]
        Mapping of variable names to chunks. Chunks are stored
        mapping dimension name to chunksize (e.g., ``{'x': 250}``)

    Raises
    ------
    ValueError
        Raised if no chunks can be determined (unknown file format, etc.)
    """
    readers = (read_chunks_netcdf4, )
    for reader in readers:
        try:
            return reader(filename, variables=variables)
        except Exception:
            # Deliberately broad: any failure just means "try the next one"
            logger.debug(f'Could not determine chunks for "{filename}" '
                         f'using "{reader.__name__}"', exc_info=True)
    # Nothing was able to read the file
    raise ValueError(f'Could not determine chunks for "{filename}"')
def read_chunks_netcdf4(filename, variables=None):
    """ Look up the chunks stored for each variable of a NetCDF file

    Parameters
    ----------
    filename : str
        Filename of NetCDF file
    variables : Sequence
        Subset of variables to retrieve `chunking` for. When empty or not
        given, all variables in the file are used

    Returns
    -------
    Mapping[str, Mapping[str, int]]
        Mapping of variable names to chunks. Chunks are stored
        mapping dimension name to chunksize (e.g., ``{'x': 250}``). The
        value is ``None`` for a variable whose ``chunking()`` result is
        not a list.
    """
    # Imported lazily so the module still works when the library is missing
    # (e.g., with a minimal install of xarray)
    from netCDF4 import Dataset

    logger.debug(f'Opening "{filename}" as a `netCDF4.Dataset` to read '
                 'saved chunksizes')

    var_chunks = OrderedDict()
    with Dataset(filename, mode='r') as nc:
        names = variables or nc.variables.keys()
        for name in names:
            nc_var = nc.variables[name]
            dim_names = nc_var.dimensions
            sizes = nc_var.chunking()

            # Anything other than a list of sizes carries no chunk info
            if not isinstance(sizes, list):
                var_chunks[name] = None
                continue

            # Pair dimensions with their sizes, leaving out dimensions
            # whose name begins with "string"
            per_dim = OrderedDict()
            for dim_name, size in zip(dim_names, sizes):
                if dim_name.startswith('string'):
                    continue
                per_dim[dim_name] = size
            var_chunks[name] = per_dim

    return var_chunks
def read_chunks_rasterio(riods):
    """ Returns chunks for rasterio dataset formatted for xarray

    Parameters
    ----------
    riods : str, pathlib.Path, or rasterio.DatasetReader
        Rasterio dataset or path to dataset

    Returns
    -------
    dict
        Chunks as expected by xarray (e.g., ``{'x': 50, 'y': 50}``)

    Warns
    -----
    UserWarning
        Issued if the block shapes differ between bands. The block shape
        of the first band is used in that case.
    """
    if isinstance(riods, (str, Path, )):
        # Keep this import inside in case user doesn't have library
        # (e.g., with a minimal install of xarray)
        import rasterio

        # Open it for ourselves; the ``with`` closes it again when done.
        # A separate name avoids rebinding the ``riods`` argument.
        with rasterio.open(str(riods), 'r') as src:
            return read_chunks_rasterio(src)

    block_shapes = riods.block_shapes
    if len(set(block_shapes)) != 1:
        warnings.warn('Block shapes inconsistent across bands. '
                      'Using block shapes from first band')

    # First element of a block shape is used for ``y``, second for ``x``
    first = block_shapes[0]
    return dict(y=first[0], x=first[1])
# ----------------------------------------------------------------------------
# Chunk heuristics
def best_chunksizes(chunks, tiebreaker=max):
    """Decide which chunksize to use for each dimension from variables

    For every dimension, the chunksize used by the largest number of
    variables is picked. Variables without chunk information (``None``
    or empty) are ignored.

    Parameters
    ----------
    chunks : Mapping[str, Mapping[str, int]]
        Mapping of variable names to variable chunksizes
    tiebreaker : callable, optional
        Controls what chunksize should be used for a dimension in the event
        of a tie. For example, if 3 variables had a chunksize of 250 and
        another 3 had a chunksize of 500, the guess is determined by
        ``callable([250, 500])``. By default, prefer the larger chunksize
        (i.e., :py:func:`max`)

    Returns
    -------
    dict
        Chunksize per dimension

    Examples
    --------
    >>> chunks = {
    ...     'blu': {'x': 5, 'y': 5},
    ...     'grn': {'x': 5, 'y': 10},
    ...     'red': {'x': 5, 'y': 5},
    ...     'ordinal': None
    ... }
    >>> best_chunksizes(chunks)
    {'x': 5, 'y': 5}
    """
    # Collect all chunksizes as {dim: [chunksizes, ...]}
    dim_chunks = defaultdict(list)
    for var_chunks in chunks.values():
        if not var_chunks:
            continue
        # Guard if chunks/chunksizes: normalize to one size per dimension
        dims = list(var_chunks.keys())
        chunksizes = chunks_to_chunksizes(var_chunks)
        for dim, chunksize in zip(dims, chunksizes):
            dim_chunks[dim].append(chunksize)

    guess = {}
    for dim, chunksizes in dim_chunks.items():
        # Use most frequently used chunksize
        counter = Counter(chunksizes)
        max_n = max(counter.values())
        max_val = tuple(k for k, v in counter.items() if v == max_n)

        # If multiple, prefer biggest value (by default)
        if len(max_val) > 1:
            logger.debug(
                'Multiple common chunksizes found. Breaking tie using '
                f'`{tiebreaker}`'
            )
            pick = tiebreaker(max_val)
        else:
            pick = max_val[0]

        logger.debug(f'Guessing value "{pick}" for dim "{dim}"')
        guess[dim] = pick

    return guess
def auto_determine_chunks(filename):
    """ Guess the most suitable chunksizes for a file

    Parameters
    ----------
    filename : str
        File to read

    Returns
    -------
    dict or None
        Best guess for chunksizes to use for each dimension, or ``None``
        if the chunks of the file could not be read
    """
    try:
        per_variable = read_chunks(str(filename))
    except ValueError:
        # ``read_chunks`` signals "no reader worked" with a ValueError
        logger.debug('"auto" chunk determination failed')
        return None
    return best_chunksizes(per_variable)
# ----------------------------------------------------------------------------
# Chunk format handling
@singledispatch
def get_chunksizes(xarr):
    """ Look up the chunk size used along each dimension of `xarr`

    This is the generic fallback of a single-dispatch function. It is only
    reached for input types without a registered implementation, and
    always raises.

    Parameters
    ----------
    xarr : xr.DataArray or xr.Dataset
        Chunked data

    Returns
    -------
    dict
        Dimensions (keys) and chunk sizes (values)

    Raises
    ------
    TypeError
        Raised if input is not a Dataset or DataArray
    """
    message = ('Input `xarr` must be an xarray Dataset or DataArray, '
               f'not "{type(xarr)}"')
    raise TypeError(message)
@get_chunksizes.register(xr.DataArray)
def _get_chunksizes_dataarray(xarr):
    # DataArray implementation: ``.chunks`` is indexed by dimension position,
    # so pair it up with ``.dims`` and keep the first chunk of each dimension
    if not xarr.chunks:
        return {}
    sizes = OrderedDict()
    for dim, dim_chunks in zip(xarr.dims, xarr.chunks):
        sizes[dim] = dim_chunks[0]
    return sizes
@get_chunksizes.register(xr.Dataset)
def _get_chunksizes_dataset(xarr):
    # Dataset implementation: ``.chunks`` is iterated as a mapping of
    # dimension name to chunks; keep the first chunk of each dimension
    if not xarr.chunks:
        return {}
    sizes = OrderedDict()
    for dim, dim_chunks in xarr.chunks.items():
        sizes[dim] = dim_chunks[0]
    return sizes
@singledispatch
def chunks_to_chunksizes(data, dims=None):
    """ Turn an object into chunksizes (i.e., as used in encoding)

    This is the generic fallback of a single-dispatch function. It is only
    reached for input types without a registered implementation, and
    always raises.

    Parameters
    ----------
    data : xarray.DataArray, dict, or xarray.Dataset
        Input data containing chunk information
    dims : Sequence[str], optional
        Optionally, provide the order in which dimension chunksizes
        should be returned. Useful when asking for chunksizes from
        not-necessarily-ordered data (dicts and Datasets)

    Returns
    -------
    tuple
        Chunk sizes for each dimension. Returns an empty tuple if there
        are no chunks.

    Raises
    ------
    TypeError
        Raised if the type of ``data`` is not supported
    """
    message = f'Unknown type for input ``data`` "{type(data)}"'
    raise TypeError(message)
# Mapping types that are registered with the dict-based implementation of
# ``chunks_to_chunksizes``
_DICTS = (dict, xr.core.utils.FrozenOrderedDict)
@register_multi_singledispatch(chunks_to_chunksizes, _DICTS)
def _chunks_to_chunksizes_dict(data, dims=None):
    # Mapping implementation: values may be chunksizes (a single integer) or
    # chunks (a sequence of sizes, of which the first one is used)
    ordered_dims = dims or data.keys()
    sizes = []
    for dim in ordered_dims:
        entry = data[dim]
        if isinstance(entry, int):
            sizes.append(entry)
        else:
            sizes.append(entry[0])
    return tuple(sizes)
@chunks_to_chunksizes.register(xr.Dataset)
def _chunks_to_chunksizes_dataset(data, dims=None):
    # Dataset implementation: hand the ``.chunks`` mapping to the dict-based
    # implementation, or return an empty tuple when there are no chunks
    if data.chunks:
        return _chunks_to_chunksizes_dict(data.chunks, dims=dims)
    return ()
@chunks_to_chunksizes.register(xr.DataArray)
def _chunks_to_chunksizes_dataarray(data, dims=None):
    # DataArray implementation: ``.chunks`` is indexed by dimension position
    if not data.chunks:
        return ()
    # Visit the dimensions in the requested order, or in the array's own
    # order when no ``dims`` were given
    positions = (
        [data.dims.index(name) for name in dims]
        if dims else range(len(data.chunks))
    )
    # The first chunk of each dimension is reported as its chunksize
    return tuple(data.chunks[pos][0] for pos in positions)