diff --git a/threaded.py b/threaded.py
new file mode 100644
index 0000000..50b5dfd
--- /dev/null
+++ b/threaded.py
@@ -0,0 +1,510 @@
+#!/usr/bin/env python3
+"""
+GeoJSON Road Comparison Script - Optimized Version
+
+Compares two GeoJSON files containing road data and identifies:
+1. Roads in file1 that don't have corresponding coverage in file2 (removed roads)
+2. Roads in file2 that don't have corresponding coverage in file1 (added roads)
+
+Only reports differences that are significant (above minimum length threshold).
+Optimized for performance with parallel processing and spatial indexing.
+"""
+
+import argparse
+import gc
+import multiprocessing as mp
+import sys
+import warnings
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from functools import partial
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+import geopandas as gpd
+import pandas as pd
+from shapely.geometry import LineString, MultiLineString
+from shapely.ops import unary_union
+from shapely.strtree import STRtree
+from shapely.validation import make_valid
+
+# Suppress warnings for cleaner output
+warnings.filterwarnings('ignore')
+
+
+class RoadComparator:
+    def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0,
+                 n_jobs: Optional[int] = None, chunk_size: int = 1000):
+        """
+        Initialize the road comparator.
+
+        Args:
+            tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet)
+            min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet)
+            n_jobs: Number of parallel processes to use (default: CPU count - 1)
+            chunk_size: Number of geometries to process per chunk (default: 1000)
+        """
+        self.tolerance_feet = tolerance_feet
+        self.min_gap_length_feet = min_gap_length_feet
+        self.n_jobs = n_jobs or max(1, mp.cpu_count() - 1)
+        self.chunk_size = chunk_size
+
+        # Convert feet to degrees (approximate conversion for the continental US)
+        # 1 degree latitude ā‰ˆ 364,000 feet
+        # 1 degree longitude ā‰ˆ 288,000 feet (at 40° latitude)
+        self.tolerance_deg = tolerance_feet / 364000.0
+        self.min_gap_length_deg = min_gap_length_feet / 364000.0
+
+        print(f"Using {self.n_jobs} parallel processes with chunk size {self.chunk_size}")
+
+    def load_geojson(self, filepath: str) -> gpd.GeoDataFrame:
+        """Load and validate a GeoJSON file with optimizations."""
+        try:
+            # Use the pyogrio engine for faster loading of large files
+            gdf = gpd.read_file(filepath, engine='pyogrio')
+
+            # Keep only LineString and MultiLineString geometries
+            line_types = ['LineString', 'MultiLineString']
+            gdf = gdf[gdf.geometry.geom_type.isin(line_types)].copy()
+
+            if len(gdf) == 0:
+                raise ValueError(f"No line geometries found in {filepath}")
+
+            # Reset index for efficient processing
+            gdf = gdf.reset_index(drop=True)
+
+            # Repair invalid geometries. Note: buffer(0) would collapse line
+            # geometries to empty polygons, so make_valid is used instead.
+            invalid_mask = ~gdf.geometry.is_valid
+            if invalid_mask.any():
+                print(f"Fixing {invalid_mask.sum()} invalid geometries...")
+                gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(make_valid)
+
+            print(f"Loaded {len(gdf)} road features from {filepath}")
+            return gdf
+
+        except Exception as e:
+            raise Exception(f"Error loading {filepath}: {str(e)}")
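+
+    # Illustrative use of the loader (a sketch, not executed by the script;
+    # the file name is a placeholder):
+    #
+    #   comp = RoadComparator(tolerance_feet=50.0)  # tolerance_deg ā‰ˆ 50 / 364000 ā‰ˆ 0.000137
+    #   roads = comp.load_geojson("roads_2023.geojson")
+    #   print(roads.geometry.geom_type.value_counts())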
+
+    def create_buffered_union_optimized(self, gdf: gpd.GeoDataFrame) -> Any:
+        """Create a buffered union using chunked processing for memory efficiency."""
+        print("Creating optimized buffered union...")
+
+        # Process in chunks to manage memory
+        chunks = [gdf.iloc[i:i + self.chunk_size] for i in range(0, len(gdf), self.chunk_size)]
+        chunk_unions = []
+
+        # Use a partial function for multiprocessing
+        buffer_func = partial(self._buffer_chunk, tolerance=self.tolerance_deg)
+
+        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
+            # Submit all chunk processing jobs
+            future_to_chunk = {executor.submit(buffer_func, chunk): i
+                               for i, chunk in enumerate(chunks)}
+
+            # Collect results as they complete
+            for future in as_completed(future_to_chunk):
+                chunk_idx = future_to_chunk[future]
+                try:
+                    chunk_union = future.result()
+                    if chunk_union is not None and not chunk_union.is_empty:
+                        chunk_unions.append(chunk_union)
+                    print(f"Processed chunk {chunk_idx + 1}/{len(chunks)}")
+                except Exception as e:
+                    print(f"Error processing chunk {chunk_idx}: {str(e)}")
+
+        # Union all chunk results
+        print("Combining chunk unions...")
+        if chunk_unions:
+            final_union = unary_union(chunk_unions)
+            # Force garbage collection
+            del chunk_unions
+            gc.collect()
+            return final_union
+        else:
+            raise Exception("No valid geometries to create union")
+
+    @staticmethod
+    def _buffer_chunk(chunk_gdf: gpd.GeoDataFrame, tolerance: float) -> Any:
+        """Buffer geometries in a chunk and return their union."""
+        try:
+            # Buffer all geometries in the chunk
+            buffered = chunk_gdf.geometry.buffer(tolerance)
+
+            # Create the union of the buffered geometries
+            if len(buffered) == 1:
+                return buffered.iloc[0]
+            else:
+                return unary_union(buffered.tolist())
+        except Exception as e:
+            print(f"Error in chunk processing: {str(e)}")
+            return None
+
+    def create_spatial_index(self, gdf: gpd.GeoDataFrame) -> STRtree:
+        """Create a spatial index for fast intersection queries.
+
+        Note: this helper is not currently wired into compare_roads(); it is
+        kept for callers that need bounding-box or nearest-neighbour queries.
+        """
+        print("Creating spatial index...")
+        # Create an STRtree for fast spatial queries
+        geometries = gdf.geometry.tolist()
+        return STRtree(geometries)
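+
+    # The coverage test used throughout reduces to two Shapely set operations:
+    # buffer the target network, then subtract the buffered union from each
+    # source line. A toy illustration (comments only, not executed):
+    #
+    #   from shapely.geometry import LineString
+    #   a = LineString([(0, 0), (1, 0)])
+    #   b = LineString([(0, 0), (0.5, 0)])
+    #   gap = a.difference(b.buffer(0.01))  # the uncovered tail of `a`
+    #   assert gap.length > 0.4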
+ """ + print("Finding removed segments...") + + # Split into chunks for parallel processing + chunks = [source_gdf.iloc[i:i+self.chunk_size] + for i in range(0, len(source_gdf), self.chunk_size)] + + all_removed = [] + + # Use partial function for multiprocessing + process_func = partial(self._process_removed_chunk, + target_union=target_union, + min_length_deg=self.min_gap_length_deg) + + with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: + # Submit all chunk processing jobs + future_to_chunk = {executor.submit(process_func, chunk): i + for i, chunk in enumerate(chunks)} + + # Collect results as they complete + for future in as_completed(future_to_chunk): + chunk_idx = future_to_chunk[future] + try: + chunk_removed = future.result() + all_removed.extend(chunk_removed) + print(f"Processed removed chunk {chunk_idx + 1}/{len(chunks)}") + except Exception as e: + print(f"Error processing removed chunk {chunk_idx}: {str(e)}") + + return all_removed + + @staticmethod + def _process_removed_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any, + min_length_deg: float) -> List[Dict[str, Any]]: + """Process a chunk of geometries to find removed segments.""" + removed_segments = [] + + for idx, row in chunk_gdf.iterrows(): + geom = row.geometry + + # Handle MultiLineString by processing each component + if isinstance(geom, MultiLineString): + lines = list(geom.geoms) + else: + lines = [geom] + + for line in lines: + try: + # Find parts of the line that don't intersect with target_union + uncovered = line.difference(target_union) + + if uncovered.is_empty: + continue + + # Handle different geometry types returned by difference + uncovered_lines = [] + if hasattr(uncovered, 'geoms'): + for geom_part in uncovered.geoms: + if isinstance(geom_part, LineString): + uncovered_lines.append(geom_part) + elif isinstance(uncovered, LineString): + uncovered_lines.append(uncovered) + + # Check each uncovered line segment + for uncovered_line in uncovered_lines: + if uncovered_line.length >= min_length_deg: + # Create properties dict with original metadata plus 'removed: true' + properties = dict(row.drop('geometry')) + properties['removed'] = True + + removed_segments.append({ + 'geometry': uncovered_line, + **properties + }) + + except Exception as e: + continue # Skip problematic geometries + + return removed_segments + + def find_added_roads_optimized(self, source_gdf: gpd.GeoDataFrame, + target_union: Any) -> List[Dict[str, Any]]: + """ + Find entire roads in source_gdf that don't significantly overlap with target_union. + Optimized with parallel processing. 
+ """ + print("Finding added roads...") + + # Split into chunks for parallel processing + chunks = [source_gdf.iloc[i:i+self.chunk_size] + for i in range(0, len(source_gdf), self.chunk_size)] + + all_added = [] + + # Use partial function for multiprocessing + process_func = partial(self._process_added_chunk, + target_union=target_union, + min_length_deg=self.min_gap_length_deg) + + with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: + # Submit all chunk processing jobs + future_to_chunk = {executor.submit(process_func, chunk): i + for i, chunk in enumerate(chunks)} + + # Collect results as they complete + for future in as_completed(future_to_chunk): + chunk_idx = future_to_chunk[future] + try: + chunk_added = future.result() + all_added.extend(chunk_added) + print(f"Processed added chunk {chunk_idx + 1}/{len(chunks)}") + except Exception as e: + print(f"Error processing added chunk {chunk_idx}: {str(e)}") + + return all_added + + @staticmethod + def _process_added_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any, + min_length_deg: float) -> List[Dict[str, Any]]: + """Process a chunk of geometries to find added roads.""" + added_roads = [] + + for idx, row in chunk_gdf.iterrows(): + geom = row.geometry + + try: + # Check what portion of the road is not covered + uncovered = geom.difference(target_union) + + if not uncovered.is_empty: + # Calculate what percentage of the original road is uncovered + uncovered_length = 0 + if hasattr(uncovered, 'geoms'): + for geom_part in uncovered.geoms: + if isinstance(geom_part, LineString): + uncovered_length += geom_part.length + elif isinstance(uncovered, LineString): + uncovered_length = uncovered.length + + original_length = geom.length + uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0 + + # Include the entire road if: + # 1. The uncovered portion is above minimum threshold, AND + # 2. More than 50% of the road is uncovered + if uncovered_length >= min_length_deg and uncovered_ratio > 0.5: + # Include entire original road with all original metadata + properties = dict(row.drop('geometry')) + + added_roads.append({ + 'geometry': geom, + **properties + }) + + except Exception as e: + continue # Skip problematic geometries + + return added_roads + + def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]: + """ + Compare two GeoJSON files and find significant differences. + Optimized version with parallel processing. + + Returns: + Tuple of (removed_roads, added_roads) + """ + print(f"Comparing {file1_path} and {file2_path}") + print(f"Tolerance: {self.tolerance_feet} feet") + print(f"Minimum significant length: {self.min_gap_length_feet} feet") + print(f"Parallel processing: {self.n_jobs} workers") + print("-" * 50) + + # Load both files + gdf1 = self.load_geojson(file1_path) + gdf2 = self.load_geojson(file2_path) + + # Ensure both are in the same CRS + if gdf1.crs != gdf2.crs: + print(f"Warning: CRS mismatch. 
+
+    def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]:
+        """
+        Compare two GeoJSON files and find significant differences.
+        Optimized version with parallel processing.
+
+        Returns:
+            Tuple of (removed_roads, added_roads)
+        """
+        print(f"Comparing {file1_path} and {file2_path}")
+        print(f"Tolerance: {self.tolerance_feet} feet")
+        print(f"Minimum significant length: {self.min_gap_length_feet} feet")
+        print(f"Parallel processing: {self.n_jobs} workers")
+        print("-" * 50)
+
+        # Load both files
+        gdf1 = self.load_geojson(file1_path)
+        gdf2 = self.load_geojson(file2_path)
+
+        # Ensure both are in the same CRS
+        if gdf1.crs != gdf2.crs:
+            print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}")
+            gdf2 = gdf2.to_crs(gdf1.crs)
+
+        print("Creating optimized spatial unions...")
+
+        # Create buffered unions using the optimized method
+        union1 = self.create_buffered_union_optimized(gdf1)
+        union2 = self.create_buffered_union_optimized(gdf2)
+
+        print("Finding removed and added roads with parallel processing...")
+
+        # Find roads using the optimized parallel methods
+        removed_roads = self.find_removed_segments_optimized(gdf1, union2)
+        added_roads = self.find_added_roads_optimized(gdf2, union1)
+
+        # Clean up memory
+        del gdf1, gdf2, union1, union2
+        gc.collect()
+
+        return removed_roads, added_roads
+
+    def save_results(self, removed: List[Dict], added: List[Dict], output_path: str):
+        """Save results to a GeoJSON file."""
+        all_results = removed + added
+
+        if not all_results:
+            print("No significant differences found!")
+            return
+
+        # Create the GeoDataFrame with an explicit geometry column
+        print("Saving results...")
+        results_gdf = gpd.GeoDataFrame(all_results, geometry='geometry')
+
+        # Save to file with the pyogrio engine
+        results_gdf.to_file(output_path, driver='GeoJSON', engine='pyogrio')
+        print(f"Results saved to: {output_path}")
+
+    def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str):
+        """Print a summary of the comparison results."""
+        print("\n" + "=" * 60)
+        print("COMPARISON SUMMARY")
+        print("=" * 60)
+
+        print(f"\nFile 1: {file1_name}")
+        print(f"File 2: {file2_name}")
+        print(f"Tolerance: {self.tolerance_feet} feet")
+        print(f"Minimum significant length: {self.min_gap_length_feet} feet")
+
+        if removed:
+            print(f"\nšŸ”“ REMOVED ROADS ({len(removed)} segments):")
+            print("These road segments exist in File 1 but are missing or incomplete in File 2:")
+
+            # Calculate the total length of removed segments
+            total_removed_length = 0
+            removed_by_road = {}
+
+            for segment in removed:
+                geom = segment['geometry']
+                length_feet = geom.length * 364000.0  # Approximate degrees-to-feet conversion
+                total_removed_length += length_feet
+
+                # Get the road name from the first matching name field
+                road_name = "Unknown"
+                name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
+                for field in name_fields:
+                    if field in segment and pd.notna(segment[field]):
+                        road_name = str(segment[field])
+                        break
+
+                if road_name not in removed_by_road:
+                    removed_by_road[road_name] = []
+                removed_by_road[road_name].append(length_feet)
+
+            print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length / 5280:.2f} miles)")
+
+            for road, lengths in sorted(removed_by_road.items()):
+                road_total = sum(lengths)
+                print(f"  • {road}: {len(lengths)} segment(s), {road_total:,.1f} feet")
+
+        if added:
+            print(f"\nšŸ”µ ADDED ROADS ({len(added)} roads):")
+            print("These roads exist in File 2 but are missing or incomplete in File 1:")
+
+            # Calculate the total length of added roads
+            total_added_length = 0
+            added_by_road = {}
+
+            for road in added:
+                geom = road['geometry']
+                length_feet = geom.length * 364000.0  # Approximate degrees-to-feet conversion
+                total_added_length += length_feet
+
+                # Get the road name from the first matching name field
+                road_name = "Unknown"
+                name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
+                for field in name_fields:
+                    if field in road and pd.notna(road[field]):
+                        road_name = str(road[field])
+                        break
+
+                if road_name not in added_by_road:
+                    added_by_road[road_name] = 0
+                added_by_road[road_name] += length_feet
+
+            print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length / 5280:.2f} miles)")
+
+            for road, length in sorted(added_by_road.items()):
+                print(f"  • {road}: {length:,.1f} feet")
+
+        if not removed and not added:
+            print("\nāœ… No significant differences found!")
+            print("The road networks have good coverage overlap within the specified tolerance.")
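+
+
+# Programmatic use of the class, bypassing the CLI below (a sketch; the file
+# names are placeholders):
+#
+#   comparator = RoadComparator(tolerance_feet=50.0, min_gap_length_feet=100.0)
+#   removed, added = comparator.compare_roads("old_roads.geojson", "new_roads.geojson")
+#   comparator.print_summary(removed, added, "old_roads.geojson", "new_roads.geojson")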
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Compare two GeoJSON files containing roads and find significant gaps or extras (Optimized)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  python compare_roads.py roads1.geojson roads2.geojson
+  python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200
+  python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson
+  python compare_roads.py roads1.geojson roads2.geojson --jobs 8 --chunk-size 2000
+        """
+    )
+
+    parser.add_argument('file1', help='First GeoJSON file')
+    parser.add_argument('file2', help='Second GeoJSON file')
+    parser.add_argument('--tolerance', '-t', type=float, default=50.0,
+                        help='Distance tolerance in feet for considering roads as overlapping (default: 50)')
+    parser.add_argument('--min-length', '-m', type=float, default=100.0,
+                        help='Minimum length in feet for gaps/extras to be considered significant (default: 100)')
+    parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)')
+    parser.add_argument('--jobs', '-j', type=int, default=None,
+                        help='Number of parallel processes (default: CPU count - 1)')
+    parser.add_argument('--chunk-size', '-c', type=int, default=1000,
+                        help='Number of geometries to process per chunk (default: 1000)')
+
+    args = parser.parse_args()
+
+    # Validate input files
+    if not Path(args.file1).exists():
+        print(f"Error: File {args.file1} does not exist")
+        return 1
+
+    if not Path(args.file2).exists():
+        print(f"Error: File {args.file2} does not exist")
+        return 1
+
+    try:
+        # Create the comparator and run the comparison
+        comparator = RoadComparator(
+            tolerance_feet=args.tolerance,
+            min_gap_length_feet=args.min_length,
+            n_jobs=args.jobs,
+            chunk_size=args.chunk_size
+        )
+
+        removed, added = comparator.compare_roads(args.file1, args.file2)
+
+        # Print the summary
+        comparator.print_summary(removed, added, args.file1, args.file2)
+
+        # Save results if an output file was specified
+        if args.output:
+            comparator.save_results(removed, added, args.output)
+        elif removed or added:
+            # Auto-generate an output filename if differences were found
+            output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson"
+            comparator.save_results(removed, added, output_file)
+
+        return 0
+
+    except Exception as e:
+        print(f"Error: {str(e)}")
+        return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
\ No newline at end of file