Batch Processing

Large-scale batch processing and automation techniques.

This page documents the batch processing example.

"""
Advanced Example 2: Batch Processing and Large-Scale Analysis
=============================================================

This example demonstrates:
- Large-scale batch processing techniques
- Parallel processing strategies
- Memory-efficient workflows
- Error handling for long-running operations
- Progress monitoring and logging
- Export optimization for large datasets

Prerequisites:
- Experience with Earth Engine collections
- Understanding of parallel processing concepts
- Knowledge of export operations
- Familiarity with error handling
"""
 19
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import ee
import pandas as pd
 27
 28class BatchProcessor:
 29    """Advanced batch processing system for Earth Engine operations."""
 30    
 31    def __init__(self, project_id, max_workers=5):
 32        """Initialize batch processor."""
 33        self.project_id = project_id
 34        self.max_workers = max_workers
 35        self.tasks = []
 36        self.results = {}
 37        
 38        # Setup logging
 39        logging.basicConfig(
 40            level=logging.INFO,
 41            format='%(asctime)s - %(levelname)s - %(message)s',
 42            handlers=[
 43                logging.FileHandler('batch_processing.log'),
 44                logging.StreamHandler()
 45            ]
 46        )
 47        self.logger = logging.getLogger(__name__)
 48        
 49        # Initialize Earth Engine
 50        self.initialize_ee()
 51    
 52    def initialize_ee(self):
 53        """Initialize Earth Engine with error handling."""
 54        try:
 55            ee.Initialize(project=self.project_id)
 56            self.logger.info("✓ Earth Engine initialized successfully")
 57        except Exception as e:
 58            self.logger.error(f"✗ Failed to initialize Earth Engine: {e}")
 59            raise
 60    
 61    def create_processing_grid(self, region, grid_size=1.0):
 62        """
 63        Create a grid of processing tiles over a large region.
 64        
 65        Args:
 66            region: ee.Geometry defining the area of interest
 67            grid_size: Size of each grid cell in degrees
 68        
 69        Returns:
 70            ee.FeatureCollection: Grid cells for processing
 71        """
 72        self.logger.info(f"Creating processing grid with {grid_size}° cells")
 73        
 74        # Get region bounds
 75        bounds = region.bounds()
 76        coords = ee.List(bounds.coordinates().get(0))
 77        
 78        # Extract coordinates
 79        xs = coords.map(lambda item: ee.List(item).get(0))
 80        ys = coords.map(lambda item: ee.List(item).get(1))
 81        
 82        min_x = xs.reduce(ee.Reducer.min())
 83        max_x = xs.reduce(ee.Reducer.max())
 84        min_y = ys.reduce(ee.Reducer.min())
 85        max_y = ys.reduce(ee.Reducer.max())
 86        
 87        # Create grid
 88        def create_grid_cell(x):
 89            def create_cell_row(y):
 90                x_coord = ee.Number(x)
 91                y_coord = ee.Number(y)
 92                
 93                cell = ee.Geometry.Rectangle([
 94                    x_coord, y_coord,
 95                    x_coord.add(grid_size), y_coord.add(grid_size)
 96                ])
 97                
 98                # Only include cells that intersect the region
 99                intersects = cell.intersects(region)
100                
101                return ee.Feature(cell, {
102                    'tile_x': x_coord,
103                    'tile_y': y_coord,
104                    'tile_id': x_coord.format('%.2f').cat('_').cat(y_coord.format('%.2f')),
105                    'intersects_region': intersects
106                }).set('intersects_region', intersects)
107            
108            x_range = ee.List.sequence(min_y, max_y, grid_size)
109            return x_range.map(create_cell_row)
110        
111        x_range = ee.List.sequence(min_x, max_x, grid_size)
112        grid_features = x_range.map(create_grid_cell).flatten()
113        
114        # Filter to only cells that intersect the region
115        valid_grid = ee.FeatureCollection(grid_features).filter(
116            ee.Filter.eq('intersects_region', True)
117        )
118        
119        grid_size_info = valid_grid.size().getInfo()
120        self.logger.info(f"Created grid with {grid_size_info} tiles")
121        
122        return valid_grid
123    
124    def batch_image_collection_processing(self, collection, process_func, batch_size=50):
125        """
126        Process large image collections in batches.
127        
128        Args:
129            collection: ee.ImageCollection to process
130            process_func: Function to apply to each batch
131            batch_size: Number of images per batch
132        
133        Returns:
134            list: Results from each batch
135        """
136        total_images = collection.size().getInfo()
137        self.logger.info(f"Processing {total_images} images in batches of {batch_size}")
138        
139        results = []
140        num_batches = (total_images + batch_size - 1) // batch_size
141        
142        for i in range(0, total_images, batch_size):
143            batch_num = i // batch_size + 1
144            self.logger.info(f"Processing batch {batch_num}/{num_batches}")
145            
146            try:
147                # Get batch
148                batch = collection.limit(batch_size, i)
149                
150                # Apply processing function
151                start_time = time.time()
152                batch_result = process_func(batch)
153                processing_time = time.time() - start_time
154                
155                results.append({
156                    'batch_id': batch_num,
157                    'result': batch_result,
158                    'processing_time': processing_time,
159                    'images_processed': min(batch_size, total_images - i)
160                })
161                
162                self.logger.info(f"Batch {batch_num} completed in {processing_time:.2f}s")
163                
164                # Small delay to avoid rate limiting
165                time.sleep(1)
166                
167            except Exception as e:
168                self.logger.error(f"Error processing batch {batch_num}: {e}")
169                results.append({
170                    'batch_id': batch_num,
171                    'result': None,
172                    'error': str(e),
173                    'processing_time': 0
174                })
175        
176        return results
177    
178    def parallel_export_tasks(self, export_configs):
179        """
180        Create and manage multiple export tasks in parallel.
181        
182        Args:
183            export_configs: List of export configuration dictionaries
184        
185        Returns:
186            dict: Task IDs and their configurations
187        """
188        self.logger.info(f"Creating {len(export_configs)} parallel export tasks")
189        
190        tasks = {}
191        
192        for i, config in enumerate(export_configs):
193            try:
194                # Create export task based on type
195                if config['type'] == 'image':
196                    task = ee.batch.Export.image.toDrive(**config['params'])
197                elif config['type'] == 'table':
198                    task = ee.batch.Export.table.toDrive(**config['params'])
199                elif config['type'] == 'video':
200                    task = ee.batch.Export.video.toDrive(**config['params'])
201                else:
202                    raise ValueError(f"Unknown export type: {config['type']}")
203                
204                # Start task
205                task.start()
206                
207                tasks[task.id] = {
208                    'task': task,
209                    'config': config,
210                    'started_at': datetime.now(),
211                    'status': 'RUNNING'
212                }
213                
214                self.logger.info(f"Started export task {i+1}: {task.id}")
215                
216                # Small delay between task starts
217                time.sleep(2)
218                
219            except Exception as e:
220                self.logger.error(f"Failed to create export task {i+1}: {e}")
221        
222        return tasks
223    
224    def monitor_task_progress(self, tasks, check_interval=60):
225        """
226        Monitor progress of running tasks.
227        
228        Args:
229            tasks: Dictionary of task IDs and configurations
230            check_interval: How often to check status (seconds)
231        
232        Returns:
233            dict: Final status of all tasks
234        """
235        self.logger.info(f"Monitoring {len(tasks)} tasks")
236        
237        completed_tasks = 0
238        total_tasks = len(tasks)
239        
240        while completed_tasks < total_tasks:
241            self.logger.info(f"Checking task status... ({completed_tasks}/{total_tasks} completed)")
242            
243            for task_id, task_info in tasks.items():
244                if task_info['status'] in ['COMPLETED', 'FAILED', 'CANCELLED']:
245                    continue
246                
247                try:
248                    # Check task status
249                    status = task_info['task'].status()
250                    
251                    if status['state'] in ['COMPLETED', 'FAILED', 'CANCELLED']:
252                        task_info['status'] = status['state']
253                        task_info['completed_at'] = datetime.now()
254                        
255                        if status['state'] == 'COMPLETED':
256                            completed_tasks += 1
257                            self.logger.info(f"✓ Task completed: {task_id}")
258                        else:
259                            completed_tasks += 1
260                            error_msg = status.get('error_message', 'Unknown error')
261                            self.logger.error(f"✗ Task failed: {task_id} - {error_msg}")
262                
263                except Exception as e:
264                    self.logger.error(f"Error checking task {task_id}: {e}")
265            
266            if completed_tasks < total_tasks:
267                time.sleep(check_interval)
268        
269        self.logger.info("All tasks completed")
270        return tasks
271    
272    def time_series_batch_processing(self, region, start_date, end_date, time_step_days=16):
273        """
274        Process time series data in temporal batches.
275        
276        Args:
277            region: Area of interest
278            start_date: Start date string
279            end_date: End date string
280            time_step_days: Days per time step
281        
282        Returns:
283            list: Time series results
284        """
285        self.logger.info(f"Processing time series from {start_date} to {end_date}")
286        
287        # Create time periods
288        start = datetime.strptime(start_date, '%Y-%m-%d')
289        end = datetime.strptime(end_date, '%Y-%m-%d')
290        
291        periods = []
292        current_date = start
293        
294        while current_date < end:
295            period_end = min(current_date + timedelta(days=time_step_days), end)
296            periods.append({
297                'start': current_date.strftime('%Y-%m-%d'),
298                'end': period_end.strftime('%Y-%m-%d'),
299                'period_id': len(periods) + 1
300            })
301            current_date = period_end
302        
303        self.logger.info(f"Created {len(periods)} time periods")
304        
305        # Process each period
306        results = []
307        
308        for period in periods:
309            self.logger.info(f"Processing period {period['period_id']}: {period['start']} to {period['end']}")
310            
311            try:
312                # Load collection for this period
313                collection = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
314                             .filterDate(period['start'], period['end'])
315                             .filterBounds(region)
316                             .filter(ee.Filter.lt('CLOUD_COVER', 30)))
317                
318                # Check if any images available
319                count = collection.size().getInfo()
320                if count == 0:
321                    self.logger.warning(f"No images found for period {period['period_id']}")
322                    continue
323                
324                # Create composite
325                composite = collection.median()
326                
327                # Calculate NDVI
328                ndvi = composite.normalizedDifference(['SR_B5', 'SR_B4']).rename('NDVI')
329                
330                # Calculate statistics
331                stats = ndvi.reduceRegion(
332                    reducer=ee.Reducer.mean().combine(
333                        reducer2=ee.Reducer.stdDev(),
334                        sharedInputs=True
335                    ),
336                    geometry=region,
337                    scale=30,
338                    maxPixels=1e9
339                )
340                
341                result = {
342                    'period_id': period['period_id'],
343                    'start_date': period['start'],
344                    'end_date': period['end'],
345                    'image_count': count,
346                    'ndvi_mean': stats.getInfo().get('NDVI_mean'),
347                    'ndvi_stddev': stats.getInfo().get('NDVI_stdDev')
348                }
349                
350                results.append(result)
351                self.logger.info(f"✓ Period {period['period_id']} processed successfully")
352                
353            except Exception as e:
354                self.logger.error(f"✗ Error processing period {period['period_id']}: {e}")
355                results.append({
356                    'period_id': period['period_id'],
357                    'start_date': period['start'],
358                    'end_date': period['end'],
359                    'error': str(e)
360                })
361        
362        return results
363    
364    def large_scale_classification(self, training_data, region_grid, output_folder):
365        """
366        Perform classification across a large region using grid-based processing.
367        
368        Args:
369            training_data: Training feature collection
370            region_grid: Grid of processing tiles
371            output_folder: Output folder for results
372        
373        Returns:
374            list: Export task configurations
375        """
376        self.logger.info("Starting large-scale classification")
377        
378        # Prepare classifier
379        def train_classifier():
380            """Train random forest classifier."""
381            # Load recent Landsat collection
382            collection = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
383                         .filterDate('2023-01-01', '2023-12-31')
384                         .filter(ee.Filter.lt('CLOUD_COVER', 10))
385                         .median())
386            
387            # Calculate spectral indices
388            ndvi = collection.normalizedDifference(['SR_B5', 'SR_B4']).rename('NDVI')
389            ndwi = collection.normalizedDifference(['SR_B3', 'SR_B5']).rename('NDWI')
390            
391            # Create feature image
392            features = collection.select(['SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7']).addBands([ndvi, ndwi])
393            
394            # Sample training data
395            training = features.sampleRegions(
396                collection=training_data,
397                properties=['landcover'],
398                scale=30
399            )
400            
401            # Train classifier
402            classifier = ee.Classifier.smileRandomForest(100).train(
403                features=training,
404                classProperty='landcover',
405                inputProperties=features.bandNames()
406            )
407            
408            return features, classifier
409        
410        features, classifier = train_classifier()
411        
412        # Get grid tiles
413        grid_list = region_grid.getInfo()['features']
414        self.logger.info(f"Processing {len(grid_list)} grid tiles")
415        
416        # Create export configurations for each tile
417        export_configs = []
418        
419        for i, tile_feature in enumerate(grid_list):
420            tile_geom = ee.Geometry(tile_feature['geometry'])
421            tile_id = tile_feature['properties']['tile_id']
422            
423            # Classify tile
424            tile_features = features.clip(tile_geom)
425            classified = tile_features.classify(classifier).select(['classification'])
426            
427            # Create export configuration
428            export_config = {
429                'type': 'image',
430                'params': {
431                    'image': classified,
432                    'description': f'classification_tile_{tile_id}',
433                    'folder': output_folder,
434                    'region': tile_geom,
435                    'scale': 30,
436                    'crs': 'EPSG:4326',
437                    'maxPixels': 1e13
438                }
439            }
440            
441            export_configs.append(export_config)
442        
443        return export_configs
444    
445    def optimize_memory_usage(self):
446        """Implement memory optimization strategies."""
447        self.logger.info("Applying memory optimization strategies")
448        
449        strategies = {
450            'reduce_precision': 'Use .float() instead of .double() for calculations',
451            'select_bands': 'Only select necessary bands for processing',
452            'clip_early': 'Clip images to study area as early as possible',
453            'batch_processing': 'Process data in smaller chunks',
454            'clear_cache': 'Periodically clear Earth Engine cache'
455        }
456        
457        for strategy, description in strategies.items():
458            self.logger.info(f"  {strategy}: {description}")
459        
460        # Example optimization
461        def optimized_processing_example(image, region):
462            """Example of optimized image processing."""
463            # Clip early and select only needed bands
464            clipped = image.clip(region).select(['SR_B4', 'SR_B5'])
465            
466            # Use float precision
467            processed = clipped.float()
468            
469            # Calculate only necessary indices
470            ndvi = processed.normalizedDifference(['SR_B5', 'SR_B4'])
471            
472            return ndvi
473        
474        return optimized_processing_example
475
def main():
    """Main function demonstrating batch processing capabilities.

    Runs six demo steps against a California bounding box: grid creation,
    batched NDVI statistics, an NDVI time series, export configuration
    (nothing is started), memory optimization hints and a timing summary.
    Everything is printed to stdout; nothing is returned.
    """

    # Initialize batch processor
    processor = BatchProcessor('your-project-id', max_workers=3)

    print("="*80)
    print("⚡ ADVANCED BATCH PROCESSING AND LARGE-SCALE ANALYSIS")
    print("="*80)

    # Define study region (California)
    study_region = ee.Geometry.Rectangle([-124.5, 32.5, -114.0, 42.0])

    # Example 1: Grid-based processing
    print("\n1️⃣ Grid-Based Processing")
    print("-" * 30)

    processing_grid = processor.create_processing_grid(study_region, grid_size=2.0)
    # Fetch the tile count once; it is printed again in the summary.
    tile_count = processing_grid.size().getInfo()
    print(f"✓ Created processing grid with {tile_count} tiles")

    # Example 2: Batch collection processing
    print("\n2️⃣ Batch Collection Processing")
    print("-" * 35)

    # Load large collection
    large_collection = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                        .filterDate('2023-01-01', '2023-12-31')
                        .filterBounds(study_region.centroid().buffer(100000))
                        .filter(ee.Filter.lt('CLOUD_COVER', 50)))

    def calculate_ndvi_stats(batch):
        """Calculate NDVI statistics for a batch of images."""
        composite = batch.median()
        ndvi = composite.normalizedDifference(['SR_B5', 'SR_B4'])

        stats = ndvi.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=study_region.centroid().buffer(50000),
            scale=30,
            maxPixels=1e6,
            # The 50 km buffer holds more than maxPixels pixels at 30 m, so
            # let Earth Engine coarsen the scale instead of failing.
            bestEffort=True
        )

        # Force evaluation here so the batch timing covers the real
        # computation; normalizedDifference names its output band 'nd'.
        return stats.getInfo().get('nd')

    batch_results = processor.batch_image_collection_processing(
        large_collection,
        calculate_ndvi_stats,
        batch_size=20
    )

    print(f"✓ Processed {len(batch_results)} batches")
    successful_batches = [r for r in batch_results if 'error' not in r]
    print(f"✓ {len(successful_batches)} batches completed successfully")

    # Example 3: Time series batch processing
    print("\n3️⃣ Time Series Batch Processing")
    print("-" * 35)

    small_region = study_region.centroid().buffer(10000)
    time_series_results = processor.time_series_batch_processing(
        small_region,
        '2023-01-01',
        '2023-12-31',
        time_step_days=30
    )

    print(f"✓ Processed {len(time_series_results)} time periods")

    # Create summary statistics.  Compare against None explicitly: a mean
    # NDVI of exactly 0.0 is a valid value and must not be dropped.
    valid_results = [
        r for r in time_series_results
        if 'error' not in r and r.get('ndvi_mean') is not None
    ]
    if valid_results:
        avg_ndvi = sum(r['ndvi_mean'] for r in valid_results) / len(valid_results)
        print(f"✓ Average NDVI across time series: {avg_ndvi:.3f}")

    # Example 4: Parallel export setup
    print("\n4️⃣ Parallel Export Configuration")
    print("-" * 35)

    # Create sample export configurations
    sample_exports = []
    grid_sample = processing_grid.limit(3)  # Process first 3 tiles

    grid_features = grid_sample.getInfo()['features']

    for i, tile in enumerate(grid_features):
        tile_geom = ee.Geometry(tile['geometry'])

        # Create sample image for export
        sample_image = (ee.ImageCollection('LANDSAT/LC08/C02/T1_L2')
                        .filterBounds(tile_geom)
                        .filterDate('2023-06-01', '2023-08-31')
                        .median()
                        .select(['SR_B4', 'SR_B3', 'SR_B2']))

        sample_exports.append({
            'type': 'image',
            'params': {
                'image': sample_image,
                'description': f'sample_export_tile_{i+1}',
                'folder': 'BatchProcessing_Exports',
                'region': tile_geom,
                'scale': 30,
                'maxPixels': 1e13
            }
        })

    print(f"✓ Created {len(sample_exports)} export configurations")
    print("  (Note: Exports not started in demo mode)")

    # Example 5: Memory optimization demonstration
    print("\n5️⃣ Memory Optimization")
    print("-" * 25)

    # Called for its logging; the returned example function is not used here.
    processor.optimize_memory_usage()
    print("✓ Memory optimization strategies implemented")

    # Example 6: Performance monitoring
    print("\n6️⃣ Performance Monitoring")
    print("-" * 28)

    # Calculate processing statistics
    if batch_results:
        total_processing_time = sum(r.get('processing_time', 0) for r in batch_results)
        total_images = sum(r.get('images_processed', 0) for r in batch_results)

        print(f"✓ Total processing time: {total_processing_time:.2f} seconds")
        print(f"✓ Total images processed: {total_images}")

        if total_images > 0:
            avg_time_per_image = total_processing_time / total_images
            print(f"✓ Average time per image: {avg_time_per_image:.3f} seconds")

    # Summary
    print("\n" + "="*80)
    print("📊 BATCH PROCESSING SUMMARY")
    print("="*80)

    print("\n🎯 Capabilities Demonstrated:")
    print("• Grid-based spatial processing")
    print("• Batch collection processing with error handling")
    print("• Time series analysis in temporal batches")
    print("• Parallel export task management")
    print("• Memory optimization strategies")
    print("• Performance monitoring and logging")

    print("\n📈 Processing Statistics:")
    print(f"• Processing grid: {tile_count} tiles")
    print(f"• Collection batches: {len(batch_results)} processed")
    print(f"• Time series periods: {len(time_series_results)} analyzed")
    print(f"• Export tasks configured: {len(sample_exports)}")

    print("\n🏆 Best Practices Applied:")
    print("• Comprehensive error handling and logging")
    print("• Progress monitoring for long operations")
    print("• Memory-efficient processing strategies")
    print("• Scalable parallel processing architecture")
    print("• Resource optimization techniques")

    print("\n✅ Advanced Batch Processing Complete!")


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()