Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

""" STEMS CLI components 

""" 

import datetime as dt 

import logging 

import re 

 

import click 

 

from stems.executor import setup_backend, setup_executor 

 

logger = logging.getLogger(__name__) 

 

 

# ============================================================================= 

# CONSTANTS 

_RE_NUMBER = re.compile(r'[-+]?\d*\.\d+|\d+') 

_TYPE_FILE = click.Path(exists=True, readable=True, 

dir_okay=False, resolve_path=True) 

_KEY_DATE_FORMAT = 'date_format' 

 

 

# ============================================================================ 

# CALLBACKS 

def cb_dict(ctx, param, value): 

""" Call back for dict style arguments (e.g., KEY=VALUE) 

""" 

d = {} 

for val in value or []: 

if '=' not in val: 

raise click.BadParameter( 

f'Must specify "{param}" as KEY=VALUE ({value} given)' 

) 

else: 

k, v = val.split('=', 1) 

d[k] = v 

return d 

 

 

def cb_time(ctx, param, value): 

""" Callback for parsing to a ``datetime`` with ``opt_date_format`` 

""" 

if value is None: 

return value 

 

_format = ctx.params[_KEY_DATE_FORMAT] 

try: 

time = dt.datetime.strptime(value, _format) 

except KeyError: 

raise click.ClickException( 

f'Need to use `--{_KEY_DATE_FORMAT}` when using `cb_time`.') 

except Exception as e: 

raise click.BadParameter( 

f'Cannot parse "{value}" to date with format "{_format}"') 

else: 

return time 

 

 

def cb_bounds(ctx, param, value): 

""" Callback to create a BoundingBox 

""" 

if value is None: 

return value 

 

try: 

bbox = _RE_NUMBER.findall(value) 

if len(bbox) != 4: 

raise ValueError( 

f'Did not parse 4 numbers from input "{value}" (got "{bbox}")') 

bbox = [float(i) for i in bbox] 

except Exception as e: 

raise click.BadParameter( 

f'Cannot parse "{value}" to a BoundingBox: {e}') 

else: 

from rasterio.coords import BoundingBox 

return BoundingBox(*bbox) 

 

 

def _param_name_job_id_total(param): 

if isinstance(param, click.Option): 

return '"--job_id"', '"--job_total"' 

else: 

return '"job_id"'.upper(), '"job_total"'.upper() 

 

 

def _cb_job_id(ctx, param, value): 

s_job_id, s_job_total = _param_name_job_id_total(param) 

if value is None: 

return 1 

elif value <= 0: 

raise click.BadParameter(f'{s_job_id} must be at least 1 (>=1)') 

else: 

total = ctx.params.get('job_total', 1) 

if value > total: 

raise click.BadParameter(f'{s_job_id} is larger than {s_job_total} ' 

f'({value} > {total})') 

else: 

return value 

 

 

def _cb_job_total(ctx, param, value): 

s_job_id, s_job_total = _param_name_job_id_total(param) 

if value <= 0: 

raise click.BadParameter(f'{s_job_total} must be at least 1') 

else: 

return value 

 

 

# ============================================================================ 

# ARGS 

arg_config_file = click.argument('config', nargs=1, type=_TYPE_FILE) 

 

arg_job_id = click.argument('job_id', nargs=1, callback=_cb_job_id, 

type=click.INT) 

arg_job_total = click.argument('job_total', nargs=1, callback=_cb_job_total, 

type=click.INT) 

 

 

# ============================================================================ 

# OPTIONS 

 

# Job ID / Total as options 

opt_job_id = click.option('--job_id', type=click.INT, callback=_cb_job_id, 

help='Job ID (out of ``--job_total`` workers)') 

opt_job_total = click.option('--job_total', type=click.INT, is_eager=True, 

callback=_cb_job_total, 

default=1, show_default=True, 

help='Total number of jobs running this script') 

 

 

opt_verbose = click.option('--verbose', '-v', count=True, help='Be verbose') 

opt_quiet = click.option('--quiet', '-q', count=True, help='Be quiet') 

 

opt_bounds = click.option( 

'--bounds', default=None, callback=cb_bounds, 

help='BoundingBox : left, bottom, right, top' 

) 

opt_date_format = click.option( 

'--%s' % _KEY_DATE_FORMAT, default='%Y-%m-%d', 

show_default=True, is_eager=True, 

help='Format string for dates' 

) 

opt_nodata = click.option( 

'--nodata', '--ndv', 'nodata', 

default=-9999, type=float, 

show_default=True, help='NoDataValue' 

) 

 

# ============================================================================= 

# Dask Distributed Executor / Client 

EXECUTORS = ('sync', 'threads', 'processes', 'distributed', ) 

 

 

def cb_executor(ctx, param, value): 

""" Callback for returning a Distributed client 

 

TODO 

---- 

* Can we find if there's no subcommand, and just skip? 

""" 

executor, workers_or_ip = value 

 

# Handle requests for threads/processes 

client = None 

if executor in ('threads', 'processes', 'sync', ): 

setup_backend(executor, workers_or_ip) 

elif executor == 'distributed': 

try: 

workers = int(workers_or_ip) 

except ValueError: 

address = workers_or_ip 

workers = None 

else: 

address = None 

client = setup_executor(address=address, n_workers=workers) 

 

if client: 

def close_scheduler(): 

if client is not None: 

try: 

client.close(timeout=10) 

except Exception as e: 

logger.debug(f'Exception closing executor client: {e}') 

else: 

logger.debug('Closed executor client') 

else: 

logger.debug('No client to close') 

ctx.call_on_close(close_scheduler) 

 

return client 

 

 

opt_executor = click.option( 

'--executor', type=(click.Choice(EXECUTORS), str), 

default=('sync', None), show_default=True, 

callback=cb_executor, 

help=( 

'Configure parallel processing options for Dask locally ("sync", ' 

'"threads", or "processes") or using Distributed ("distributed"). ' 

'Must provide either worker count or scheduler address (ip:port).' 

) 

)