Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

""" 

""" 

import collections 

import contextlib 

import errno 

import fnmatch 

import functools 

import importlib 

import logging 

import os 

from pathlib import Path 

import re 

import shutil 

 

import numpy as np 

 

logger = logging.getLogger(__name__) 

 

 

# ============================================================================ 

# STANDARD DATATYPE HELPERS 

def to_number(string): 

""" Convert string to most appropriate number 

""" 

try: 

n = int(string) 

except ValueError: 

n = float(string) 

return n 

 

 

def list_like(obj): 

""" Return True if ``obj`` is list-like 

 

List-like includes lists, tuples, and numpy.ndarrays, but not 

other sequences like str or Mapping. 

 

Parameters 

---------- 

obj : object 

An object 

 

Returns 

------- 

bool 

True if ``obj`` is a list (sequence but not str) 

""" 

return (hasattr(obj, '__iter__') 

and not isinstance(obj, collections.abc.Mapping) 

and not isinstance(obj, str)) 

 

 

def squeeze(l): 

""" Squeeze Sequences with 1 item into a scalar 

 

Parameters 

---------- 

l : Sequence 

List, tuple, etc 

 

Returns 

------- 

object 

Either original object if ``len(l) != 1`` else ``l[0]`` 

""" 

return l[0] if len(l) == 1 else l 

 

 

def concat_lists(l): 

""" Concatenate all list-like items in ``l`` 

 

Parameters 

---------- 

l : list or tuple 

Sequence (but not a str) 

 

Yields 

------ 

list 

Concatenated list items 

 

See Also 

-------- 

toolz.concat 

Similar function, but concatenates all iterables 

flatten_lists 

Recursive version of this function 

""" 

for ele in l: 

if list_like(ele): 

for i in ele: 

yield i 

else: 

yield ele 

 

 

def flatten_lists(l): 

""" Flatten all list-like items in ``l`` 

 

Parameters 

---------- 

l : list or tuple 

Sequence (but not a str) 

 

Yields 

------ 

list 

Flattened list items 

""" 

for ele in l: 

if list_like(ele): 

for i in flatten_lists(ele): 

yield i 

else: 

yield ele 

 

 

def update_nested(d, other): 

""" Update a potentially nested dict with another 

 

Parameters 

---------- 

d : dict 

Dictionary to update 

other : dict 

Other dict with replacement values 

 

Returns 

------- 

dict 

Updated dict 

""" 

d_ = d.copy() 

for k, v in other.items(): 

d_v = d.get(k, {}) 

if (isinstance(v, collections.abc.Mapping) and 

isinstance(d_v, collections.abc.Mapping)): 

d_[k] = update_nested(d.get(k, {}) or {}, v) 

else: 

d_[k] = v 

return d_ 

 

 

class FrozenKeyDict(collections.abc.MutableMapping): 

""" A dict that doesn't allow new keys 

""" 

 

def __init__(self, *args, **kwds): 

self._data = collections.OrderedDict(*args, **kwds) 

super(FrozenKeyDict, self).__init__() 

 

def __getitem__(self, key): 

return self._data[key] 

 

def __setitem__(self, key, value): 

if key not in self: 

raise KeyError('Cannot create new keys') 

self._data[key] = value 

 

def __delitem__(self, key): 

raise KeyError('Cannot delete keys') 

 

def __iter__(self): 

return iter(self._data) 

 

def __len__(self): 

return len(self._data) 

 

def __repr__(self): 

return '%s(%s)' % (self.__class__.__name__, repr(self._data)) 

 

def copy(self): 

return FrozenKeyDict(**self) 

 

 

# ============================================================================ 

# STANDARD LIBRARY HELPERS 

# ============================================================================ 

def cached_property(prop): 

""" Cache a class property (e.g., that requires a lookup) 

""" 

prop_name = f'_{prop.__name__}' 

 

@functools.wraps(prop) 

def wrapper(self): 

if not hasattr(self, prop_name): 

setattr(self, prop_name, prop(self)) 

return getattr(self, prop_name) 

 

return property(wrapper) 

 

 

def register_multi_singledispatch(func, types): 

""" Register multiple types for singledispatch 

 

Parameters 

---------- 

func : callable 

Function 

types : tuple 

Multiple types to register 

 

Returns 

------- 

func : callable 

Decorated function 

""" 

if not hasattr(func, 'registry'): 

raise TypeError("Function must be dispatchable (missing " 

"`func.registry` from wrapping with `singledispatch`)") 

 

def decorate(dispatch_func): 

for type_ in types: 

dispatch_func = func.register(type_, dispatch_func) 

return dispatch_func 

 

return decorate 

 

 

# ============================================================================ 

# MODULE HELPERS 

# ============================================================================ 

def find_subclasses(cls_): 

""" Find subclasses of an object 

 

Parameters 

---------- 

cls_ : class 

A Python class 

 

Returns 

------- 

set[class] 

Classes that inherit from ``cls_`` 

""" 

subcls = set() 

for sub in cls_.__subclasses__(): 

subsub = find_subclasses(sub) 

subcls.add(sub) 

subcls.update(subsub) 

return subcls 

 

 

def import_object(string): 

""" Import a Python module/class/function from its string 

 

Parameters 

---------- 

string : str 

A Python object path 

 

Returns 

------- 

object 

The imported object (class/module/function) 

""" 

parts = string.split('.') 

module, item = '.'.join(parts[:-1]), parts[-1] 

try: 

mod = importlib.import_module(module) 

return getattr(mod, item) 

except (ImportError, AttributeError) as e: 

logger.exception(f'Could not import and find object "{string}"') 

raise 

 

 

# ============================================================================ 

# NUMPY HELPERS 

# ============================================================================ 

def np_promote_all_types(*dtypes): 

""" Return the largest NumPy datatype required to hold all types 

 

Parameters 

---------- 

dtypes : iterable 

NumPy datatypes to promote 

 

Returns 

------- 

np.dtype 

Smallest NumPy datatype required to store all input datatypes 

 

See Also 

-------- 

np.promote_types 

""" 

dtype = dtypes[0] 

if not all([dt == dtype for dt in dtypes[1:]]): 

logger.debug('Promoting memory allocation to largest datatype of ' 

'source bands') 

for _dtype in dtypes[1:]: 

dtype = np.promote_types(dtype, _dtype) 

return dtype 

 

 

def dtype_info(arr): 

"""Return integer or float dtype info 

 

Parameters 

---------- 

arr : np.ndarray 

Array 

 

Returns 

------- 

numpy.core.getlimits.finfo or numpy.core.getlimits.iinfo 

NumPy type information 

 

Raises 

------ 

TypeError 

Raised if ``arr`` is not a int or float type 

""" 

if arr.dtype.kind == 'i': 

return np.iinfo(arr.dtype) 

elif arr.dtype.kind == 'f': 

return np.finfo(arr.dtype) 

else: 

raise TypeError('Only valid for NumPy int or float ' 

f'(got "{arr.dtype.kind}")') 

 

 

# ============================================================================ 

# FILE HELPERS 

# ============================================================================ 

def find(location, pattern, regex=False): 

""" Return a sorted list of files matching pattern 

 

Parameters 

---------- 

location : str or pathlib.Path 

Directory location to search 

pattern : str 

Search pattern for files 

regex : bool 

True if ``pattern`` is a regular expression 

 

Returns 

-------- 

list 

List of file paths for files found 

 

""" 

if not regex: 

pattern = fnmatch.translate(pattern) 

regex = re.compile(pattern) 

 

files = [] 

for root, dirnames, filenames in os.walk(str(location)): 

for filename in filenames: 

if regex.search(filename): 

files.append(os.path.join(root, filename)) 

 

return sorted(files) 

 

 

def relative_to(one, two): 

""" Return the relative path of a file compared to another 

 

Parameters 

---------- 

one : str or Path 

File to return relative path for 

two : str or Path 

File ``one`` will be relative to 

 

Returns 

------- 

Path 

Relative path of ``one`` 

""" 

one, two = Path(one), Path(two).absolute() 

 

# Ensure could be a file (is_file or doesn't exist yet) 

assert one.is_file() or not one.exists() 

assert two.is_file() or not two.exists() 

 

root = os.path.abspath(os.sep) 

for parent in one.absolute().parents: 

if parent in two.parents: 

root = parent 

break 

 

fwd = one.absolute().relative_to(root) 

bwd = ('..', ) * (len(two.relative_to(root).parents) - 1) 

 

return Path('.').joinpath(*bwd).joinpath(fwd) 

 

 

@contextlib.contextmanager 

def renamed_upon_completion(destination, tmpdir=None, prefix='', suffix=None): 

""" Help save/write to file and move upon completion 

 

Parameters 

---------- 

destination : str or Path 

The final intended location of the file 

tmpdir : str, optional 

By default, this function will yield a temporary filename 

in the same directory as ``destination``, but you may specify 

another location using ``tmpdir``. 

prefix : str, optional 

Characters to prefix the temporary file with 

suffix : str, optional 

Characters to add at the end of the temporary filename. By default 

appends ".tmp." and the process ID 

 

Yields 

------ 

str 

A temporary filename to use during writing/saving/etc 

""" 

destination = Path(destination) 

assert not destination.is_dir() 

 

if tmpdir is None: 

tmpdir = destination.parent 

else: 

tmpdir = Path(tmpdir) 

assert tmpdir.is_dir() 

 

if suffix is None: 

suffix = f'.tmp.{os.getpid()}' 

 

tmpfile = tmpdir.joinpath(f'{prefix}{destination.name}{suffix}') 

 

# Yield tmpfile name for user to use for saving/etc 

logger.debug(f'Providing "{tmpfile}" as write location') 

yield str(tmpfile) 

 

# We're back -- rename/move the tmpfile to ``destination`` 

logger.debug(f'Renaming/moving file {tmpfile}->{destination}') 

# `shutil.move` supports move, or copy if on different device/partition 

shutil.move(str(tmpfile), str(destination))