Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

from collections import OrderedDict 

from functools import singledispatch 

import logging 

 

import numpy as np 

import xarray as xr 

 

from ..utils import list_like 

from .chunk import chunks_to_chunksizes 

 

logger = logging.getLogger(__name__) 

 

 

#: NumPy string types (str, bytes, unicode) 

_NP_STRING_TYPES = (np.str_, np.bytes_, np.unicode_, ) 

 

 

@singledispatch
def netcdf_encoding(data,
                    dtype=None,
                    chunks=None,
                    zlib=True,
                    complevel=4,
                    nodata=None,
                    **encoding_kwds):
    """ Return "good" NetCDF encoding information for some data

    The returned encoding is the default or "good" known standard for data
    used in ``stems``. Each default determined in this function is given
    as a keyword argument to allow overriding, and you can also pass additional
    encoding items via ``**encoding``. You may pass one override for all
    data, or overrides for each data variable (as a dict).

    For more information, see the NetCDF4 documentation for the
    ``createVariable`` [1].

    Parameters
    ----------
    data : xr.DataArray or xr.Dataset
        Define encoding for this data. If ``xr.Dataset``, map function across
        all ``xr.DataArray`` in ``data.data_vars``
    dtype : np.dtype, optional
        The data type used for the encoded data. Defaults to the input data
        type(s), but can be set to facilitate discretization based compression
        (typically alongside scale_factor and _FillValue)
    chunks : None, tuple or dict, optional
        Chunksizes used to encode NetCDF. If given as a `tuple`, chunks should
        be given for each dimension. Chunks for dimensions not specified when
        given as a `dict` will default to 1 chunk spanning the dimension.
        Passing ``False`` will not use chunks.
    zlib : bool, optional
        Use compression
    complevel : int, optional
        Compression level
    nodata : int, float, or sequence, optional
        NoDataValue(s). Specify one for each ``DataArray`` in ``data``
        if a :py:class:`xarray.Dataset`. Used for ``_FillValue``
    encoding_kwds : dict
        Additional encoding data to pass

    Returns
    -------
    dict
        Dict mapping band name (e.g., variable name) to relevant encoding
        information

    Raises
    ------
    TypeError
        If ``data`` is not a registered type (``xr.DataArray``/``xr.Dataset``)

    See Also
    --------
    xarray.Dataset.to_netcdf
        Encoding information designed to be passed to
        :py:meth:`xarray.Dataset.to_netcdf`.

    References
    ----------
    .. [1] http://unidata.github.io/netcdf4-python/#netCDF4.Dataset.createVariable
    """
    # Consistency fix: the docstring documents ``dtype`` and both registered
    # implementations accept it, but the base signature previously omitted it
    # (it was silently swallowed by ``**encoding_kwds``).
    raise TypeError(f'Unknown type for input ``data`` "{type(data)}"')

 

 

@netcdf_encoding.register(xr.DataArray)
def _netcdf_encoding_dataarray(data,
                               dtype=None,
                               chunks=None,
                               zlib=True,
                               complevel=4,
                               nodata=None,
                               **encoding_kwds):
    """NetCDF encoding defaults for a single :py:class:`xr.DataArray`

    Returns ``{variable_name: encoding_dict}`` suitable for
    :py:meth:`xarray.Dataset.to_netcdf`.
    """
    name = encoding_name(data)
    var_encoding = {}

    # Data type -- user override wins, then guard against types xarray
    # re-encodes itself (e.g. datetimes)
    dtype_info = guard_dtype(data, dtype or encoding_dtype(data))
    assert isinstance(dtype_info, dict)
    var_encoding.update(dtype_info)

    # Chunk sizes -- resolve, then clip/extend per dtype & dim sizes.
    # ``chunks=False`` explicitly disables chunked encoding.
    if chunks is not False:
        chunks = encoding_chunksizes(data, chunks=chunks)
        chunks = guard_chunksizes_str(data, guard_chunksizes(data, chunks))
        assert isinstance(chunks, tuple)
        assert all(isinstance(i, int) for i in chunks)
        if chunks:
            var_encoding['chunksizes'] = tuple(chunks)

    # NoDataValue -> NetCDF ``_FillValue``
    if nodata is not None:
        assert isinstance(nodata, (float, int, np.integer, np.floating))
        var_encoding['_FillValue'] = nodata

    # Compression settings
    var_encoding['complevel'] = complevel
    var_encoding['zlib'] = zlib

    # User-provided encoding items override/extend the defaults
    var_encoding.update(encoding_kwds)

    return {name: var_encoding}

 

 

@netcdf_encoding.register(xr.Dataset)
def _netcdf_encoding_dataset(data,
                             dtype=None,
                             chunks=None,
                             zlib=True,
                             complevel=4,
                             nodata=None,
                             **encoding_kwds):
    """NetCDF encoding defaults for every variable of an :py:class:`xr.Dataset`

    Each keyword may be a single value applied to all variables, or a dict
    keyed by variable name with per-variable overrides.
    """
    def _per_var(option, var):
        # Unpack a per-variable override when given as a dict
        return option[var] if _is_dict(option) else option

    encoding = OrderedDict()
    for var in data.data_vars:
        # Allow user to specify local (key exists) or global (KeyError) kwds
        kwds = encoding_kwds.get(var, encoding_kwds)

        var_encoding = _netcdf_encoding_dataarray(
            data[var],
            dtype=_per_var(dtype, var),
            # ``chunks`` may legitimately be a dim->size dict, so only
            # unpack when keyed by this variable's name
            chunks=chunks[var] if _has_key(chunks, var) else chunks,
            zlib=_per_var(zlib, var),
            complevel=_per_var(complevel, var),
            nodata=_per_var(nodata, var),
            **kwds
        )
        encoding[var] = var_encoding[var]

    return encoding

 

 

# ---------------------------------------------------------------------------- 

# Encoding components for DataArray(s) 

def encoding_name(xarr):
    """ Return the name of the variable to provide encoding for

    Either returns the name of the DataArray, or the name that XArray will
    assign it when writing to disk
    (:py:data:`xarray.backends.api.DATAARRAY_VARIABLE`).

    Parameters
    ----------
    xarr : xarray.DataArray
        Provide the name of this DataArray used for encoding

    Returns
    -------
    str
        Encoding variable name
    """
    if xarr.name:
        return xarr.name
    # Unnamed DataArrays get xarray's default on-disk variable name
    return xr.backends.api.DATAARRAY_VARIABLE

 

 

def encoding_dtype(xarr):
    """Get dtype encoding info

    Parameters
    ----------
    xarr : xarray.DataArray or np.ndarray
        DataArray to consider

    Returns
    -------
    dict[str, np.dtype]
        Datatype information for encoding (e.g., ``{'dtype': np.float32}``)
    """
    return dict(dtype=xarr.dtype)

 

 

def encoding_chunksizes(xarr, chunks=None):
    """ Find/resolve chunksize for a DataArray

    Parameters
    ----------
    xarr : xarray.DataArray
        DataArray to consider
    chunks : tuple[int] or Mapping[str, int], optional
        Chunks per dimension. If None, chunks are taken from ``xarr``
        itself; a mapping is resolved against ``xarr.dims``; a sequence
        is passed through as-is.

    Returns
    -------
    tuple[int]
        Chunksizes per dimension
    """
    if chunks is None:
        # Grab chunks from DataArray
        chunksize = chunks_to_chunksizes(xarr)
    elif isinstance(chunks, dict):
        # Dimensions not in the mapping default to one chunk spanning the
        # whole dimension. Use ``.sizes`` rather than ``len(coords[dim])``
        # so dimensions without coordinate variables also resolve.
        chunksize = tuple(chunks.get(dim, xarr.sizes[dim])
                          for dim in xarr.dims)
    else:
        # Bug fix: a tuple/list of per-dimension sizes (documented as a
        # valid input) previously fell through both branches and raised
        # UnboundLocalError on return
        chunksize = tuple(chunks)
    return chunksize

 

 

# ---------------------------------------------------------------------------- 

# Encoding checks, safeguards, and fixes 

def guard_chunksizes(xarr, chunksizes):
    """ Guard chunksize to be <= dimension sizes

    Parameters
    ----------
    xarr : xarray.DataArray
        DataArray to consider
    chunksizes : tuple[int]
        Chunks per dimension

    Returns
    -------
    tuple[int]
        Guarded chunksizes
    """
    assert isinstance(xarr, xr.DataArray) and list_like(chunksizes)
    assert len(chunksizes) == xarr.ndim or not chunksizes

    def _clip(csize, dim, size):
        # Clamp oversized chunks to the dimension length (warn), and expand
        # the ``-1`` shortcut (whole dimension) which NetCDF encoding does
        # not understand
        if csize > size:
            logger.warning(f'Chunk size for dim {dim} is larger than dim '
                           f'size ({size}). Resetting ({csize}->{size})')
            return size
        if csize == -1:
            return size
        return csize

    return tuple(_clip(c, d, s)
                 for c, d, s in zip(chunksizes, xarr.dims, xarr.shape))

 

 

def guard_chunksizes_str(xarr, chunksizes):
    """Guard chunk sizes for str datatypes

    Chunks for ``str`` need to include string length dimension since
    python-netcdf represents, for example, 1d char array as 2d array

    Parameters
    ----------
    xarr : xarray.DataArray
        DataArray to consider
    chunksizes : tuple[int]
        Chunk sizes per dimension

    Returns
    -------
    tuple[int]
        Guarded chunk sizes
    """
    is_string = xarr.dtype.type in _NP_STRING_TYPES
    if is_string and chunksizes is not None:
        if len(chunksizes) == xarr.ndim:
            logger.debug('Adding chunk size to `str` dtype variable '
                         f'"{xarr.name}" corresponding to the string length')
            # netCDF stores fixed-width strings with an extra char dimension
            return chunksizes + (xarr.dtype.itemsize, )
        return chunksizes
    if xarr.dtype.kind == 'O':
        # Object dtype has no fixed item size -- drop chunking entirely
        logger.debug('Removing chunks for `dtype=object` variable '
                     f'"{xarr.name}"')
        return ()
    return chunksizes

 

 

def guard_dtype(xarr, dtype_):
    """Guard dtype encoding for datetime datatypes

    Parameters
    ----------
    xarr : xarray.DataArray or np.ndarray
        DataArray to consider
    dtype_ : dict[str, np.dtype]
        Datatype information for encoding (e.g., ``{'dtype': np.float32}``)

    Returns
    -------
    dict[str, np.dtype]
        Datatype information for encoding (e.g., ``{'dtype': np.float32}``),
        if valid. Otherwise returns empty dict
    """
    # xarray picks its own encoding dtype for datetime64 ('M' kind) data,
    # so return nothing for those
    return {} if xarr.dtype.kind == 'M' else dtype_

 

 

def _is_dict(x): 

return isinstance(x, dict) 

 

 

def _has_key(d, k): 

return _is_dict(d) and k in d