Batch Processing
Large-scale batch processing and automation techniques.
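This page documents the batch processing example: a `BatchProcessor` class for Google Earth Engine that tiles a region into a processing grid, processes image collections and time series in batches, starts and monitors export tasks, and summarizes memory-optimization strategies.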
1"""
2Advanced Example 2: Batch Processing and Large-Scale Analysis
3=============================================================
4
5This example demonstrates:
6- Large-scale batch processing techniques
7- Parallel processing strategies
8- Memory-efficient workflows
9- Error handling for long-running operations
10- Progress monitoring and logging
11- Export optimization for large datasets
12
13Prerequisites:
14- Experience with Earth Engine collections
15- Understanding of parallel processing concepts
16- Knowledge of export operations
17- Familiarity with error handling
18"""
19
20import ee
21import time
22import json
23import logging
24from datetime import datetime, timedelta
25from concurrent.futures import ThreadPoolExecutor, as_completed
26import pandas as pd
27
class BatchProcessor:
    """Advanced batch processing system for Earth Engine operations.

    Bundles several large-scale Earth Engine patterns: tiling a region into a
    processing grid, paging through an image collection in fixed-size
    batches, temporal batching of a Landsat 8 time series, building
    per-tile classification exports, and starting/polling export tasks.
    """

    # States after which an export task's state is treated as final.
    _TERMINAL_STATES = ('COMPLETED', 'FAILED', 'CANCELLED')

    # Landsat Collection 2 Level-2 surface reflectance is stored as scaled
    # integers: reflectance = DN * 0.0000275 - 0.2. Spectral indices must be
    # computed on reflectance, because the additive offset does not cancel
    # out of a normalized difference.
    _SR_SCALE = 0.0000275
    _SR_OFFSET = -0.2

    def __init__(self, project_id, max_workers=5):
        """Initialize batch processor.

        Args:
            project_id: Cloud project passed to ``ee.Initialize``.
            max_workers: Stored on the instance; no method in this class
                currently reads it.
        """
        self.project_id = project_id
        self.max_workers = max_workers
        self.tasks = []
        self.results = {}

        # Setup logging. NOTE: basicConfig configures the root logger and is a
        # no-op when the root logger already has handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('batch_processing.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Initialize Earth Engine
        self.initialize_ee()

    def initialize_ee(self):
        """Initialize Earth Engine, logging and re-raising any failure."""
        try:
            ee.Initialize(project=self.project_id)
            self.logger.info("✓ Earth Engine initialized successfully")
        except Exception as e:
            self.logger.error(f"✗ Failed to initialize Earth Engine: {e}")
            raise

    @staticmethod
    def _scale_sr(image):
        """Return the ``SR_B*`` bands of a Landsat C2 L2 image as reflectance."""
        return (image.select('SR_B.')
                .multiply(BatchProcessor._SR_SCALE)
                .add(BatchProcessor._SR_OFFSET))

    def create_processing_grid(self, region, grid_size=1.0):
        """
        Create a grid of processing tiles over a large region.

        The grid starts at the lower-left corner of ``region.bounds()`` and
        has ceil(width / grid_size) columns and ceil(height / grid_size) rows
        (at least one of each); only cells intersecting ``region`` are kept.

        Args:
            region: ee.Geometry defining the area of interest
            grid_size: Size of each grid cell in degrees

        Returns:
            ee.FeatureCollection: Grid cells with ``tile_x``, ``tile_y``,
            ``tile_id`` and ``intersects_region`` properties.
        """
        self.logger.info(f"Creating processing grid with {grid_size}° cells")

        # Corner coordinates of the region's bounding box.
        coords = ee.List(region.bounds().coordinates().get(0))
        xs = coords.map(lambda item: ee.List(item).get(0))
        ys = coords.map(lambda item: ee.List(item).get(1))

        def cell_origins(lo, hi):
            """Lower coordinates of the cells needed to span [lo, hi] on one axis."""
            lo = ee.Number(lo)
            # A cell count is used instead of List.sequence(lo, hi, grid_size):
            # the latter includes ``hi`` itself when the extent is an exact
            # multiple of grid_size, adding cells that only touch the edge.
            n_cells = ee.Number(hi).subtract(lo).divide(grid_size).ceil().max(1)
            return ee.List.sequence(0, n_cells.subtract(1)).map(
                lambda i: lo.add(ee.Number(i).multiply(grid_size))
            )

        x_origins = cell_origins(xs.reduce(ee.Reducer.min()), xs.reduce(ee.Reducer.max()))
        y_origins = cell_origins(ys.reduce(ee.Reducer.min()), ys.reduce(ee.Reducer.max()))

        def column_cells(x):
            """Build every cell of the grid column whose lower x is ``x``."""
            x_coord = ee.Number(x)

            def make_cell(y):
                y_coord = ee.Number(y)
                cell = ee.Geometry.Rectangle([
                    x_coord, y_coord,
                    x_coord.add(grid_size), y_coord.add(grid_size)
                ])
                return ee.Feature(cell, {
                    'tile_x': x_coord,
                    'tile_y': y_coord,
                    'tile_id': x_coord.format('%.2f').cat('_').cat(y_coord.format('%.2f')),
                    'intersects_region': cell.intersects(region)
                })

            return y_origins.map(make_cell)

        grid_features = x_origins.map(column_cells).flatten()

        # Filter to only cells that intersect the region
        valid_grid = ee.FeatureCollection(grid_features).filter(
            ee.Filter.eq('intersects_region', True)
        )

        grid_size_info = valid_grid.size().getInfo()
        self.logger.info(f"Created grid with {grid_size_info} tiles")

        return valid_grid

    def batch_image_collection_processing(self, collection, process_func, batch_size=50):
        """
        Process large image collections in batches.

        Args:
            collection: ee.ImageCollection to process
            process_func: Function applied to each batch (an ee.ImageCollection).
                Its return value is stored as-is; if it returns a lazy ee
                object, ``processing_time`` only covers building the request
                and server-side errors surface when the result is evaluated.
            batch_size: Number of images per batch (must be >= 1)

        Returns:
            list: One dict per batch with ``batch_id``, ``result`` and
            ``processing_time``, plus ``images_processed`` on success or
            ``error`` on failure.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        total_images = collection.size().getInfo()
        self.logger.info(f"Processing {total_images} images in batches of {batch_size}")

        results = []
        num_batches = (total_images + batch_size - 1) // batch_size

        for offset in range(0, total_images, batch_size):
            batch_num = offset // batch_size + 1
            self.logger.info(f"Processing batch {batch_num}/{num_batches}")

            try:
                # ImageCollection.limit(max, property, ascending) has no offset
                # argument (its second parameter is a sort property), so page
                # through the collection with toList(count, offset) instead.
                batch = ee.ImageCollection(collection.toList(batch_size, offset))

                start_time = time.time()
                batch_result = process_func(batch)
                processing_time = time.time() - start_time

                results.append({
                    'batch_id': batch_num,
                    'result': batch_result,
                    'processing_time': processing_time,
                    'images_processed': min(batch_size, total_images - offset)
                })

                self.logger.info(f"Batch {batch_num} completed in {processing_time:.2f}s")

                # Small delay to avoid rate limiting
                time.sleep(1)

            except Exception as e:
                self.logger.error(f"Error processing batch {batch_num}: {e}")
                results.append({
                    'batch_id': batch_num,
                    'result': None,
                    'error': str(e),
                    'processing_time': 0
                })

        return results

    def parallel_export_tasks(self, export_configs):
        """
        Create and start multiple export tasks.

        Tasks are submitted one after another with a 2 s pause between
        submissions; once started they run concurrently on the server side.

        Args:
            export_configs: List of dicts with ``type`` ('image', 'table' or
                'video') and ``params`` (keyword arguments for the matching
                ``ee.batch.Export.<type>.toDrive`` call).

        Returns:
            dict: Task IDs mapped to ``task``, ``config``, ``started_at`` and
            ``status``. Configs that fail to start are logged and omitted.
        """
        self.logger.info(f"Creating {len(export_configs)} parallel export tasks")

        tasks = {}

        for i, config in enumerate(export_configs):
            try:
                if config['type'] == 'image':
                    task = ee.batch.Export.image.toDrive(**config['params'])
                elif config['type'] == 'table':
                    task = ee.batch.Export.table.toDrive(**config['params'])
                elif config['type'] == 'video':
                    task = ee.batch.Export.video.toDrive(**config['params'])
                else:
                    raise ValueError(f"Unknown export type: {config['type']}")

                task.start()

                tasks[task.id] = {
                    'task': task,
                    'config': config,
                    'started_at': datetime.now(),
                    'status': 'RUNNING'
                }

                self.logger.info(f"Started export task {i+1}: {task.id}")

                # Small delay between task starts
                time.sleep(2)

            except Exception as e:
                self.logger.error(f"Failed to create export task {i+1}: {e}")

        return tasks

    def monitor_task_progress(self, tasks, check_interval=60, timeout=None):
        """
        Poll tasks until every one reaches a terminal state.

        Args:
            tasks: Dictionary of task IDs to info dicts (as returned by
                ``parallel_export_tasks``); each needs ``task`` and ``status``.
            check_interval: How often to check status (seconds)
            timeout: Optional maximum wait in seconds; ``None`` (default)
                waits indefinitely. On timeout a warning is logged and the
                partially updated ``tasks`` is returned.

        Returns:
            dict: ``tasks``, updated in place with the final ``status`` and a
            ``completed_at`` timestamp for tasks that finished.
        """
        self.logger.info(f"Monitoring {len(tasks)} tasks")

        terminal = self._TERMINAL_STATES
        total_tasks = len(tasks)
        deadline = None if timeout is None else time.monotonic() + timeout

        def count_finished():
            # Recount from the dicts so tasks that were already terminal on
            # entry are included; a running counter never counted them, which
            # made the loop unable to exit.
            return sum(1 for info in tasks.values() if info['status'] in terminal)

        completed_tasks = count_finished()
        while completed_tasks < total_tasks:
            if deadline is not None and time.monotonic() >= deadline:
                self.logger.warning(
                    f"Stopped monitoring after {timeout}s: "
                    f"{total_tasks - completed_tasks} task(s) not finished"
                )
                return tasks

            self.logger.info(f"Checking task status... ({completed_tasks}/{total_tasks} completed)")

            for task_id, task_info in tasks.items():
                if task_info['status'] in terminal:
                    continue

                try:
                    status = task_info['task'].status()
                except Exception as e:
                    self.logger.error(f"Error checking task {task_id}: {e}")
                    continue

                state = status.get('state')
                if state not in terminal:
                    continue

                task_info['status'] = state
                task_info['completed_at'] = datetime.now()

                if state == 'COMPLETED':
                    self.logger.info(f"✓ Task completed: {task_id}")
                else:
                    error_msg = status.get('error_message', 'Unknown error')
                    self.logger.error(f"✗ Task failed: {task_id} - {error_msg}")

            completed_tasks = count_finished()
            if completed_tasks < total_tasks:
                pause = check_interval
                if deadline is not None:
                    # Do not sleep past the deadline.
                    pause = max(0, min(pause, deadline - time.monotonic()))
                time.sleep(pause)

        self.logger.info("All tasks completed")
        return tasks

    @staticmethod
    def _build_periods(start_date, end_date, time_step_days):
        """Split [start_date, end_date) into consecutive windows.

        Args:
            start_date: Start date string (YYYY-MM-DD)
            end_date: End date string (YYYY-MM-DD)
            time_step_days: Length of each window in days; the last window
                is truncated at ``end_date``.

        Returns:
            list: Dicts with ``start``/``end`` date strings and a 1-based
            ``period_id``. Empty when ``start_date >= end_date``.

        Raises:
            ValueError: If the step is not positive (the window would never
                advance and the loop would not terminate).
        """
        step = timedelta(days=time_step_days)
        if step <= timedelta(0):
            raise ValueError(f"time_step_days must be positive, got {time_step_days}")

        current = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')

        periods = []
        while current < end:
            period_end = min(current + step, end)
            periods.append({
                'start': current.strftime('%Y-%m-%d'),
                'end': period_end.strftime('%Y-%m-%d'),
                'period_id': len(periods) + 1
            })
            current = period_end
        return periods

    def time_series_batch_processing(self, region, start_date, end_date, time_step_days=16,
                                     max_cloud_cover=30):
        """
        Process a Landsat 8 NDVI time series in temporal batches.

        Each period is filtered with ``filterDate(start, end)``, whose end is
        exclusive, so ``end_date`` itself is not included.

        Args:
            region: Area of interest
            start_date: Start date string (YYYY-MM-DD)
            end_date: End date string (YYYY-MM-DD)
            time_step_days: Days per time step (must be positive)
            max_cloud_cover: Keep scenes with ``CLOUD_COVER`` below this value

        Returns:
            list: One dict per period that had imagery (``ndvi_mean``,
            ``ndvi_stddev``, ``image_count``...) or an ``error`` entry.
            Periods without images are skipped.

        Raises:
            ValueError: If ``time_step_days`` is not positive.
        """
        self.logger.info(f"Processing time series from {start_date} to {end_date}")

        periods = self._build_periods(start_date, end_date, time_step_days)
        self.logger.info(f"Created {len(periods)} time periods")

        results = []

        for period in periods:
            self.logger.info(f"Processing period {period['period_id']}: {period['start']} to {period['end']}")

            try:
                collection = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                              .filterDate(period['start'], period['end'])
                              .filterBounds(region)
                              .filter(ee.Filter.lt('CLOUD_COVER', max_cloud_cover)))

                count = collection.size().getInfo()
                if count == 0:
                    self.logger.warning(f"No images found for period {period['period_id']}")
                    continue

                # Median composite converted from scaled DNs to reflectance.
                reflectance = self._scale_sr(collection.median())
                ndvi = reflectance.normalizedDifference(['SR_B5', 'SR_B4']).rename('NDVI')

                # Single round trip for both statistics.
                stats = ndvi.reduceRegion(
                    reducer=ee.Reducer.mean().combine(
                        reducer2=ee.Reducer.stdDev(),
                        sharedInputs=True
                    ),
                    geometry=region,
                    scale=30,
                    maxPixels=1e9
                ).getInfo()

                results.append({
                    'period_id': period['period_id'],
                    'start_date': period['start'],
                    'end_date': period['end'],
                    'image_count': count,
                    'ndvi_mean': stats.get('NDVI_mean'),
                    'ndvi_stddev': stats.get('NDVI_stdDev')
                })
                self.logger.info(f"✓ Period {period['period_id']} processed successfully")

            except Exception as e:
                self.logger.error(f"✗ Error processing period {period['period_id']}: {e}")
                results.append({
                    'period_id': period['period_id'],
                    'start_date': period['start'],
                    'end_date': period['end'],
                    'error': str(e)
                })

        return results

    def large_scale_classification(self, training_data, region_grid, output_folder):
        """
        Build per-tile random-forest classification exports.

        Args:
            training_data: Training feature collection with a ``landcover``
                property
            region_grid: Grid of processing tiles (e.g. from
                ``create_processing_grid``), each with a ``tile_id`` property
            output_folder: Drive folder for the exported rasters

        Returns:
            list: Export configurations (``type``/``params`` dicts) suitable
            for ``parallel_export_tasks``; no task is started here.
        """
        self.logger.info("Starting large-scale classification")

        def train_classifier():
            """Train a 100-tree random forest on a 2023 Landsat 8 composite."""
            # Restrict to scenes touching the tiles or the training data; the
            # unfiltered collection would span the whole globe.
            composite = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                         .filterDate('2023-01-01', '2023-12-31')
                         .filterBounds(region_grid.merge(training_data))
                         .filter(ee.Filter.lt('CLOUD_COVER', 10))
                         .median())
            reflectance = self._scale_sr(composite)

            ndvi = reflectance.normalizedDifference(['SR_B5', 'SR_B4']).rename('NDVI')
            ndwi = reflectance.normalizedDifference(['SR_B3', 'SR_B5']).rename('NDWI')

            features = (reflectance
                        .select(['SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7'])
                        .addBands([ndvi, ndwi]))

            training = features.sampleRegions(
                collection=training_data,
                properties=['landcover'],
                scale=30
            )

            classifier = ee.Classifier.smileRandomForest(100).train(
                features=training,
                classProperty='landcover',
                inputProperties=features.bandNames()
            )

            return features, classifier

        features, classifier = train_classifier()

        grid_list = region_grid.getInfo()['features']
        self.logger.info(f"Processing {len(grid_list)} grid tiles")

        export_configs = []
        for tile_feature in grid_list:
            tile_geom = ee.Geometry(tile_feature['geometry'])
            tile_id = tile_feature['properties']['tile_id']

            classified = features.clip(tile_geom).classify(classifier).select(['classification'])

            export_configs.append({
                'type': 'image',
                'params': {
                    'image': classified,
                    'description': f'classification_tile_{tile_id}',
                    'folder': output_folder,
                    'region': tile_geom,
                    'scale': 30,
                    'crs': 'EPSG:4326',
                    'maxPixels': 1e13
                }
            })

        return export_configs

    def optimize_memory_usage(self):
        """Log memory-optimization strategies and return an example function.

        Returns:
            callable: ``optimized_processing_example(image, region)``, which
            returns an NDVI image for a Landsat C2 L2 image.
        """
        self.logger.info("Applying memory optimization strategies")

        strategies = {
            'reduce_precision': 'Use .float() instead of .double() for calculations',
            'select_bands': 'Only select necessary bands for processing',
            'clip_early': 'Clip images to study area as early as possible',
            'batch_processing': 'Process data in smaller chunks',
            'clear_cache': 'Periodically clear Earth Engine cache'
        }

        for strategy, description in strategies.items():
            self.logger.info(f"  {strategy}: {description}")

        scale, offset = self._SR_SCALE, self._SR_OFFSET

        def optimized_processing_example(image, region):
            """Example of optimized image processing."""
            # Clip early and select only needed bands
            clipped = image.clip(region).select(['SR_B4', 'SR_B5'])

            # Convert scaled DNs to reflectance, then keep float (not double)
            # precision for the remaining math.
            processed = clipped.multiply(scale).add(offset).float()

            # Calculate only necessary indices
            return processed.normalizedDifference(['SR_B5', 'SR_B4'])

        return optimized_processing_example
475
def main():
    """Run the batch-processing demo end to end against Earth Engine.

    Replace ``'your-project-id'`` with a real project; every section below
    issues Earth Engine server requests. Export configurations are built but
    not started.
    """

    # Initialize batch processor
    processor = BatchProcessor('your-project-id', max_workers=3)

    print("="*80)
    print("⚡ ADVANCED BATCH PROCESSING AND LARGE-SCALE ANALYSIS")
    print("="*80)

    # Define study region (bounding box around California)
    study_region = ee.Geometry.Rectangle([-124.5, 32.5, -114.0, 42.0])

    # Example 1: Grid-based processing
    print("\n1️⃣ Grid-Based Processing")
    print("-" * 30)

    processing_grid = processor.create_processing_grid(study_region, grid_size=2.0)
    # Fetch the tile count once; each .getInfo() is a blocking server round trip.
    num_tiles = processing_grid.size().getInfo()
    print(f"✓ Created processing grid with {num_tiles} tiles")

    # Example 2: Batch collection processing
    print("\n2️⃣ Batch Collection Processing")
    print("-" * 35)

    large_collection = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                        .filterDate('2023-01-01', '2023-12-31')
                        .filterBounds(study_region.centroid().buffer(100000))
                        .filter(ee.Filter.lt('CLOUD_COVER', 50)))

    def calculate_ndvi_stats(batch):
        """Return the mean NDVI of a batch's median composite (or None)."""
        composite = batch.median()
        # Landsat C2 L2 SR bands are scaled integers; convert to reflectance
        # (DN * 0.0000275 - 0.2) before computing the normalized difference.
        reflectance = composite.select(['SR_B5', 'SR_B4']).multiply(0.0000275).add(-0.2)
        ndvi = reflectance.normalizedDifference(['SR_B5', 'SR_B4'])

        stats = ndvi.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=study_region.centroid().buffer(50000),
            scale=30,
            # A 50 km buffer at 30 m covers ~8.7 million pixels; the former
            # limit of 1e6 would make the reduction fail once evaluated.
            maxPixels=1e9
        )

        # Evaluate here so failures are caught per batch and the recorded
        # processing time reflects real server work, not just graph building.
        return stats.get('nd').getInfo()

    batch_results = processor.batch_image_collection_processing(
        large_collection,
        calculate_ndvi_stats,
        batch_size=20
    )

    print(f"✓ Processed {len(batch_results)} batches")
    successful_batches = [r for r in batch_results if 'error' not in r]
    print(f"✓ {len(successful_batches)} batches completed successfully")

    # Example 3: Time series batch processing
    print("\n3️⃣ Time Series Batch Processing")
    print("-" * 35)

    small_region = study_region.centroid().buffer(10000)
    time_series_results = processor.time_series_batch_processing(
        small_region,
        '2023-01-01',
        '2023-12-31',
        time_step_days=30
    )

    print(f"✓ Processed {len(time_series_results)} time periods")

    # NDVI of exactly 0.0 is valid, so test for None rather than truthiness.
    valid_results = [
        r for r in time_series_results
        if 'error' not in r and r.get('ndvi_mean') is not None
    ]
    if valid_results:
        avg_ndvi = sum(r['ndvi_mean'] for r in valid_results) / len(valid_results)
        print(f"✓ Average NDVI across time series: {avg_ndvi:.3f}")

    # Example 4: Parallel export setup
    print("\n4️⃣ Parallel Export Configuration")
    print("-" * 35)

    sample_exports = []
    grid_sample = processing_grid.limit(3)  # Process first 3 tiles

    grid_features = grid_sample.getInfo()['features']

    for i, tile in enumerate(grid_features):
        tile_geom = ee.Geometry(tile['geometry'])

        sample_image = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                        .filterBounds(tile_geom)
                        .filterDate('2023-06-01', '2023-08-31')
                        .median()
                        .select(['SR_B4', 'SR_B3', 'SR_B2']))

        sample_exports.append({
            'type': 'image',
            'params': {
                'image': sample_image,
                'description': f'sample_export_tile_{i+1}',
                'folder': 'BatchProcessing_Exports',
                'region': tile_geom,
                'scale': 30,
                'maxPixels': 1e13
            }
        })

    print(f"✓ Created {len(sample_exports)} export configurations")
    print("  (Note: Exports not started in demo mode)")

    # Example 5: Memory optimization demonstration
    print("\n5️⃣ Memory Optimization")
    print("-" * 25)

    processor.optimize_memory_usage()
    print("✓ Memory optimization strategies implemented")

    # Example 6: Performance monitoring
    print("\n6️⃣ Performance Monitoring")
    print("-" * 28)

    if batch_results:
        total_processing_time = sum(r.get('processing_time', 0) for r in batch_results)
        total_images = sum(r.get('images_processed', 0) for r in batch_results)

        print(f"✓ Total processing time: {total_processing_time:.2f} seconds")
        print(f"✓ Total images processed: {total_images}")

        if total_images > 0:
            avg_time_per_image = total_processing_time / total_images
            print(f"✓ Average time per image: {avg_time_per_image:.3f} seconds")

    # Summary
    print("\n" + "="*80)
    print("📊 BATCH PROCESSING SUMMARY")
    print("="*80)

    print("\n🎯 Capabilities Demonstrated:")
    print("• Grid-based spatial processing")
    print("• Batch collection processing with error handling")
    print("• Time series analysis in temporal batches")
    print("• Parallel export task management")
    print("• Memory optimization strategies")
    print("• Performance monitoring and logging")

    print("\n📈 Processing Statistics:")
    print(f"• Processing grid: {num_tiles} tiles")
    print(f"• Collection batches: {len(batch_results)} processed")
    print(f"• Time series periods: {len(time_series_results)} analyzed")
    print(f"• Export tasks configured: {len(sample_exports)}")

    print("\n🏆 Best Practices Applied:")
    print("• Comprehensive error handling and logging")
    print("• Progress monitoring for long operations")
    print("• Memory-efficient processing strategies")
    print("• Scalable parallel processing architecture")
    print("• Resource optimization techniques")

    print("\n✅ Advanced Batch Processing Complete!")
637
if __name__ == '__main__':
    # Execute the demo only when run as a script, not when imported.
    main()