Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

""" Metadata 

""" 

import contextlib 

import datetime as dt 

import json 

import logging 

from typing import List, Set 

 

import ee 

from stems.gis.grids import Tile 

 

from . import __version__ 

from . import defaults 

from .exceptions import EmptyCollectionError, EmptyOrderError 

from .metadata import (TrackingMetadata, 

get_order_metadata, 

get_program_metadata, 

get_task_metadata, 

get_tracking_metadata) 

 

from .sensors import CREATE_ARD_COLLECTION 

from .utils import EE_STATES 

 

logger = logging.getLogger(__name__) 

 

 

class Order(object): 

""" CEDAR order data 

 

Parameters 

---------- 

tracking_name : str 

Name of the order, which is also used as the name for tracking metadata 

tracking_prefix : str 

Directory or prefix location in storage for tracking data 

name_template : str 

String format template for pre-ARD image and metadata names 

prefix_template : str 

String format template for directory or prefix of pre-ARD 

""" 

def __init__(self, tracking_name, tracking_prefix, 

name_template=None, prefix_template=None): 

self.tracking_name = tracking_name 

self.tracking_prefix = tracking_prefix 

self.name_template = name_template or defaults.PREARD_NAME 

self.prefix_template = prefix_template or defaults.PREARD_PREFIX 

self._items = [] 

 

def __len__(self): 

return len(self._items) 

 

@property 

def collections(self) -> Set[str]: 

return set([item['collection'] for item in self._items]) 

 

@property 

def tiles(self) -> Set[Tile]: 

return set([item['tile'] for item in self._items]) 

 

def add(self, collection, tile, date_start, date_end, filters=None, 

error_if_empty=False): 

""" Add a "pre-ARD" image to the order 

 

Parameters 

========== 

collection : str 

Image collection 

tile : stems.gis.grids.Tile 

Tile to create 

date_start : datetime.datetime 

Starting time for image data 

date_end : datetime.datetime 

Ending time for image data 

filters : list, optional 

Earth Engine filters to apply to image collection before 

creating image 

error_if_empty : bool, optional 

If True, ``Order.add`` and other methods will raise an 

EmptyCollectionError if the image collection result has no images. The 

default behavior is to log, but skip, these empty results 

 

""" 

try: 

# Determine which function should be used for ARD generation 

func_create_ard = CREATE_ARD_COLLECTION[collection] 

logger.debug(f'Using function {func_create_ard}') 

except KeyError as ke: 

raise KeyError('Unknown or unsupported image collection ' 

f'"{collection}".') 

 

# Create image & metadata 

# Image is still "unbounded", but will be given crs, transform, 

# and size on export 

# ``image_metadata`` should have "bands", "nodata", "images" 

image, image_metadata = func_create_ard( 

collection, tile, date_start, date_end, filters=filters) 

 

# Create name/prefix from templates 

namespace = { 

'collection': collection.replace('/', '_'), 

'tile': tile, 

'date_start': date_start.date().isoformat(), 

'date_end': date_end.date().isoformat(), 

'now': dt.datetime.now().isoformat() 

} 

name = self.name_template.format(**namespace) 

prefix = self.prefix_template.format(**namespace) 

 

# Add in tile and order metadata 

if not image_metadata['images'] and error_if_empty: 

raise EmptyCollectionError( 

f'Found 0 images for "{collection}" between ' 

f'{date_start}-{date_end}' 

) 

 

self._items.append({ 

'collection': collection, 

'tile': tile, 

'name': name, 

'prefix': prefix, 

'image': image, 

'image_metadata': image_metadata, 

'date_start': date_start, 

'date_end': date_end, 

'filters': filters 

}) 

 

def submit(self, store, submission_info=None, 

save_empty_metadata=True, 

export_image_kwds=None): 

""" Submit "pre-ARD" for a collection and tile to be processed 

 

Parameters 

---------- 

store : cedar.stores.GDriveStore or cedar.stores.GCSStore 

Storage backend to use 

submission_info : dict, optional 

Information to include in tracking metadata about the submission 

save_empty_metadata : bool, optional 

If True, Pre-ARD image requests that have 0 results (e.g., because 

of spotty historical record) will store metadata, but will not 

start the task. If False, will not store this metadata 

export_image_kwds : dict, optional 

Additional keywords to pass onto ``store.store_image`` 

 

Returns 

------- 

tracking_id : str 

Tracking metadata object identitifer (an object ID for Google Drive 

or a remote path for GCS) 

""" 

if not self._items: 

raise EmptyOrderError( 

'No items in order to submit (see ``Order.add``)' 

) 

 

# Check to make sure names are unique 

self._validate_names() 

 

# Submit items to order 

to_submit = [] 

for item in self._items: 

# Don't save empty results if not okay with 

empty = not item['image_metadata']['images'] 

if empty and not save_empty_metadata: 

continue 

 

# Create metadata about order and submit 

order_metadata = get_order_metadata( 

item['collection'], 

item['date_start'], item['date_end'], 

item['filters'] 

) 

task, item_metadata, item_metadata_id = create_preard_task( 

item['image'], item['image_metadata'], 

item['name'], item['prefix'], 

item['tile'], 

store, 

order_info=order_metadata, 

export_image_kwds=export_image_kwds 

) 

to_submit.append((task, item_metadata, item_metadata_id)) 

 

# Create submission metadata 

# TODO: Use cedar.metadata.TrackingMetadata 

data = { 

'program': get_program_metadata(), 

'submission': submission_info or {}, 

'tracking': get_tracking_metadata(self.tracking_name, 

self.tracking_prefix, 

self.name_template, 

self.prefix_template, 

self.collections, 

self.tiles), 

'orders': [sub[1]['task'] for sub in to_submit], 

'metadata': [sub[2] for sub in to_submit] 

} 

self.tracking_metadata = TrackingMetadata(data) 

 

# Start tasks and save tracking metadata 

for task, task_metadata, task_metadata_id in to_submit: 

# Don't try to start tasks that will error (because 0 images) 

if task_metadata['image']['images']: 

task.start() 

else: 

logger.debug('Not starting task because it exports 0 images') 

 

self.tracking_id = store.store_metadata(dict(self.tracking_metadata), 

self.tracking_name, 

self.tracking_prefix) 

return self.tracking_id 

 

def _validate_names(self): 

""" Raise ValueError if there is a name collision 

""" 

names = [item['name'] for item in self._items] 

if len(set(names)) != len(names): 

raise ValueError( 

'Pre-ARD items in this order do not have unique names and ' 

'overwrite each other. Please make sure ' 

'``self.name_template`` has enough information (collection, ' 

'date_start, date_end, tile, etc) to generate unique names.') 

 

 

def create_preard_task(image, image_metadata, name, prefix, tile, store, 

order_info=None, export_image_kwds=None): 

""" Submit an EE pre-ARD processing task and store task info 

""" 

# TODO: create model for PreARDMetadata and use it 

# Prepare  

export_image_kwds = export_image_kwds or {} 

export_image_kwds.update(tile_export_image_kwds(tile)) 

 

empty = len(image_metadata['images']) == 0 

# Don't actually create / submit task if order is empty 

if empty: 

task = None 

task_metadata = { 

'name': name, 

'prefix': prefix, 

'status': {'state': EE_STATES.EMPTY} 

} 

else: 

# Create EE task 

task = store.store_image(image, name, prefix, **export_image_kwds) 

task_metadata = get_task_metadata(task) 

 

# Finish creating metadata for task 

metadata = { 

'program': get_program_metadata(), 

'order': order_info or {}, 

'tile': tile.to_dict(), 

'store': { 

'service': store.__class__.__name__, 

'export_image_kwds': export_image_kwds 

}, 

'task': task_metadata, 

'image': image_metadata, 

} 

metadata_id = store.store_metadata(metadata, name, prefix) 

 

return task, metadata, metadata_id 

 

 

def tile_export_image_kwds(tile): 

""" Returns keywords to tile exported ``ee.Image`` by reprojecting/clipping 

 

Parameters 

---------- 

tile : stems.gis.grids.Tile 

A tile 

 

Returns 

------- 

dict[str, str] 

Keywords "crs", "crs_transform", and "dimensions" 

""" 

kwds = { 

'crs': tile.crs.wkt, 

'crs_transform': json.dumps(tile.transform[:6]), 

'dimensions': f'{tile.width}x{tile.height}', 

} 

return kwds