#!/usr/bin/env python3 """ GeoJSON Road Comparison Script - Optimized Version Compares two GeoJSON files containing road data and identifies: 1. Roads in file1 that don't have corresponding coverage in file2 (removed roads) 2. Roads in file2 that don't have corresponding coverage in file1 (added roads) Only reports differences that are significant (above minimum length threshold). Optimized for performance with parallel processing and spatial indexing. TODO: - put properties properly on removed roads, so they're visible in JOSM - handle polygons properly (on previous geojson step?) for circular roads """ import json import argparse from pathlib import Path from typing import List, Dict, Any, Tuple import geopandas as gpd from shapely.geometry import LineString, MultiLineString, Point from shapely.ops import unary_union from shapely.strtree import STRtree import pandas as pd import warnings import multiprocessing as mp from functools import partial import numpy as np from concurrent.futures import ProcessPoolExecutor, as_completed import gc import importlib qgisfunctions = importlib.import_module("qgis-functions") # Suppress warnings for cleaner output warnings.filterwarnings('ignore') import re def titlecase(s): return re.sub( r"[A-Za-z]+('[A-Za-z]+)?", lambda word: word.group(0).capitalize(), s) class RoadComparator: def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0, n_jobs: int = None, chunk_size: int = 1000): """ Initialize the road comparator. Args: tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet) min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet) n_jobs: Number of parallel processes to use (default: CPU count - 1) chunk_size: Number of geometries to process per chunk (default: 1000) """ self.tolerance_feet = tolerance_feet self.min_gap_length_feet = min_gap_length_feet self.n_jobs = n_jobs or max(1, mp.cpu_count() - 1) self.chunk_size = chunk_size # Convert feet to degrees (approximate conversion for continental US) # 1 degree latitude ā‰ˆ 364,000 feet # 1 degree longitude ā‰ˆ 288,000 feet (at 40° latitude) self.tolerance_deg = tolerance_feet / 364000.0 self.min_gap_length_deg = min_gap_length_feet / 364000.0 print(f"Using {self.n_jobs} parallel processes with chunk size {self.chunk_size}") def load_geojson(self, filepath: str) -> gpd.GeoDataFrame: """Load and validate GeoJSON file with optimizations.""" try: # Use pyogr engine for faster loading of large files gdf = gpd.read_file(filepath, engine='pyogrio') # Filter only LineString and MultiLineString geometries line_types = ['LineString', 'MultiLineString'] gdf = gdf[gdf.geometry.type.isin(line_types)].copy() if len(gdf) == 0: raise ValueError(f"No line geometries found in {filepath}") # Reset index for efficient processing gdf = gdf.reset_index(drop=True) # Ensure geometry is valid and fix simple issues invalid_mask = ~gdf.geometry.is_valid if invalid_mask.any(): print(f"Fixing {invalid_mask.sum()} invalid geometries...") gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].buffer(0) print(f"Loaded {len(gdf)} road features from {filepath}") return gdf except Exception as e: raise Exception(f"Error loading {filepath}: {str(e)}") def create_buffered_union_optimized(self, gdf: gpd.GeoDataFrame) -> Any: """Create a buffered union using chunked processing for memory efficiency.""" print("Creating optimized buffered union...") # Process in chunks to manage memory chunks = [gdf.iloc[i:i+self.chunk_size] for i in range(0, len(gdf), self.chunk_size)] chunk_unions = [] # Use partial function for multiprocessing buffer_func = partial(self._buffer_chunk, tolerance=self.tolerance_deg) with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: # Submit all chunk processing jobs future_to_chunk = {executor.submit(buffer_func, chunk): i for i, chunk in enumerate(chunks)} # Collect results as they complete for future in as_completed(future_to_chunk): chunk_idx = future_to_chunk[future] try: chunk_union = future.result() if chunk_union and not chunk_union.is_empty: chunk_unions.append(chunk_union) print(f"Processed chunk {chunk_idx + 1}/{len(chunks)}") except Exception as e: print(f"Error processing chunk {chunk_idx}: {str(e)}") # Union all chunk results print("Combining chunk unions...") if chunk_unions: final_union = unary_union(chunk_unions) # Force garbage collection del chunk_unions gc.collect() return final_union else: raise Exception("No valid geometries to create union") @staticmethod def _buffer_chunk(chunk_gdf: gpd.GeoDataFrame, tolerance: float) -> Any: """Buffer geometries in a chunk and return their union.""" try: # Buffer all geometries in the chunk buffered = chunk_gdf.geometry.buffer(tolerance) # Create union of buffered geometries if len(buffered) == 1: return buffered.iloc[0] else: return unary_union(buffered.tolist()) except Exception as e: print(f"Error in chunk processing: {str(e)}") return None def create_spatial_index(self, gdf: gpd.GeoDataFrame) -> STRtree: """Create spatial index for fast intersection queries.""" print("Creating spatial index...") # Create STRtree for fast spatial queries geometries = gdf.geometry.tolist() return STRtree(geometries) def find_removed_segments_optimized(self, source_gdf: gpd.GeoDataFrame, target_union: Any) -> List[Dict[str, Any]]: """ Find segments in source_gdf that are not covered by target_union (removed roads). Optimized with parallel processing. """ print("Finding removed segments...") # Split into chunks for parallel processing chunks = [source_gdf.iloc[i:i+self.chunk_size] for i in range(0, len(source_gdf), self.chunk_size)] all_removed = [] # Use partial function for multiprocessing process_func = partial(self._process_removed_chunk, target_union=target_union, min_length_deg=self.min_gap_length_deg) with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: # Submit all chunk processing jobs future_to_chunk = {executor.submit(process_func, chunk): i for i, chunk in enumerate(chunks)} # Collect results as they complete for future in as_completed(future_to_chunk): chunk_idx = future_to_chunk[future] try: chunk_removed = future.result() all_removed.extend(chunk_removed) print(f"Processed removed chunk {chunk_idx + 1}/{len(chunks)}") except Exception as e: print(f"Error processing removed chunk {chunk_idx}: {str(e)}") return all_removed @staticmethod def _process_removed_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any, min_length_deg: float) -> List[Dict[str, Any]]: """Process a chunk of geometries to find removed segments.""" removed_segments = [] for idx, row in chunk_gdf.iterrows(): geom = row.geometry # Handle MultiLineString by processing each component if isinstance(geom, MultiLineString): lines = list(geom.geoms) else: lines = [geom] for line in lines: try: # Find parts of the line that don't intersect with target_union uncovered = line.difference(target_union) if uncovered.is_empty: continue # Handle different geometry types returned by difference uncovered_lines = [] if hasattr(uncovered, 'geoms'): for geom_part in uncovered.geoms: if isinstance(geom_part, LineString): uncovered_lines.append(geom_part) elif isinstance(uncovered, LineString): uncovered_lines.append(uncovered) # Check each uncovered line segment for uncovered_line in uncovered_lines: if uncovered_line.length >= min_length_deg: # Create properties dict with original metadata plus 'removed: true' properties = dict(row.drop('geometry')) properties['removed'] = True removed_segments.append({ 'geometry': uncovered_line, **properties }) except Exception as e: continue # Skip problematic geometries return removed_segments def find_added_roads_optimized(self, source_gdf: gpd.GeoDataFrame, target_union: Any) -> List[Dict[str, Any]]: """ Find entire roads in source_gdf that don't significantly overlap with target_union. Optimized with parallel processing. """ print("Finding added roads...") # Split into chunks for parallel processing chunks = [source_gdf.iloc[i:i+self.chunk_size] for i in range(0, len(source_gdf), self.chunk_size)] all_added = [] # Use partial function for multiprocessing process_func = partial(self._process_added_chunk, target_union=target_union, min_length_deg=self.min_gap_length_deg) with ProcessPoolExecutor(max_workers=self.n_jobs) as executor: # Submit all chunk processing jobs future_to_chunk = {executor.submit(process_func, chunk): i for i, chunk in enumerate(chunks)} # Collect results as they complete for future in as_completed(future_to_chunk): chunk_idx = future_to_chunk[future] try: chunk_added = future.result() all_added.extend(chunk_added) print(f"Processed added chunk {chunk_idx + 1}/{len(chunks)}") except Exception as e: print(f"Error processing added chunk {chunk_idx}: {str(e)}") return all_added @staticmethod def _process_added_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any, min_length_deg: float) -> List[Dict[str, Any]]: """Process a chunk of geometries to find added roads.""" added_roads = [] for idx, row in chunk_gdf.iterrows(): geom = row.geometry try: # Check what portion of the road is not covered uncovered = geom.difference(target_union) if not uncovered.is_empty: # Calculate what percentage of the original road is uncovered uncovered_length = 0 if hasattr(uncovered, 'geoms'): for geom_part in uncovered.geoms: if isinstance(geom_part, LineString): uncovered_length += geom_part.length elif isinstance(uncovered, LineString): uncovered_length = uncovered.length original_length = geom.length uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0 # Include the entire road if: # 1. The uncovered portion is above minimum threshold, AND # 2. More than 50% of the road is uncovered if uncovered_ratio > 0.5: #uncovered_length >= min_length_deg and # Include entire original road with all original metadata original_properties = dict(row.drop('geometry')) # # For Sumter County Roads # properties = { 'surface': 'asphalt' } for key, value in original_properties.items(): if key == 'NAME': properties['name'] = titlecase(qgisfunctions.formatstreet(value,None,None)) if value is not None else None elif key == 'SpeedLimit': properties['maxspeed'] = f"{value} mph" if value is not None else None elif key == 'RoadClass': if value is None: properties['highway'] = 'residential' elif value.startswith('PRIMARY'): properties['highway'] = 'trunk' elif value.startswith('MAJOR'): properties['highway'] = 'primary' #elif value.startswith('MINOR'): else: properties['highway'] = 'residential' # else: # # Keep other properties as-is, or transform them as needed # properties[key] = value added_roads.append({ 'geometry': geom, **properties }) except Exception as e: print(e) continue # Skip problematic geometries return added_roads def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]: """ Compare two GeoJSON files and find significant differences. Optimized version with parallel processing. Returns: Tuple of (removed_roads, added_roads) """ print(f"Comparing {file1_path} and {file2_path}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") print(f"Parallel processing: {self.n_jobs} workers") print("-" * 50) # Load both files gdf1 = self.load_geojson(file1_path) gdf2 = self.load_geojson(file2_path) # Ensure both are in the same CRS if gdf1.crs != gdf2.crs: print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}") gdf2 = gdf2.to_crs(gdf1.crs) print("Creating optimized spatial unions...") # Create buffered unions using optimized method union1 = self.create_buffered_union_optimized(gdf1) union2 = self.create_buffered_union_optimized(gdf2) print("Finding removed and added roads with parallel processing...") # Find roads using optimized parallel methods removed_roads = self.find_removed_segments_optimized(gdf1, union2) added_roads = self.find_added_roads_optimized(gdf2, union1) # Clean up memory del gdf1, gdf2, union1, union2 gc.collect() return removed_roads, added_roads def save_results(self, removed: List[Dict], added: List[Dict], output_path: str): """Save results to GeoJSON file.""" all_results = removed + added if not all_results: print("No significant differences found!") return # Create GeoDataFrame efficiently print("Saving results...") results_gdf = gpd.GeoDataFrame(all_results) # Save to file with optimization results_gdf.to_file(output_path, driver='GeoJSON', engine='pyogrio') print(f"Results saved to: {output_path}") def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str): """Print a summary of the comparison results.""" print("\n" + "="*60) print("COMPARISON SUMMARY") print("="*60) print(f"\nFile 1: {file1_name}") print(f"File 2: {file2_name}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") if removed: print(f"\nšŸ”“ REMOVED ROADS ({len(removed)} segments):") print("These road segments exist in File 1 but are missing or incomplete in File 2:") # Calculate total length of removed segments total_removed_length = 0 removed_by_road = {} for segment in removed: geom = segment['geometry'] length_feet = geom.length * 364000.0 # Convert to feet total_removed_length += length_feet # Get road name road_name = "Unknown" name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME'] for field in name_fields: if field in segment and pd.notna(segment[field]): road_name = str(segment[field]) break if road_name not in removed_by_road: removed_by_road[road_name] = [] removed_by_road[road_name].append(length_feet) print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length/5280:.2f} miles)") for road, lengths in sorted(removed_by_road.items()): road_total = sum(lengths) print(f" • {road}: {len(lengths)} segment(s), {road_total:,.1f} feet") if added: print(f"\nšŸ”µ ADDED ROADS ({len(added)} roads):") print("These roads exist in File 2 but are missing or incomplete in File 1:") # Calculate total length of added roads total_added_length = 0 added_by_road = {} for road in added: geom = road['geometry'] length_feet = geom.length * 364000.0 # Convert to feet total_added_length += length_feet # Get road name road_name = "Unknown" name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME'] for field in name_fields: if field in road and pd.notna(road[field]): road_name = str(road[field]) break if road_name not in added_by_road: added_by_road[road_name] = 0 added_by_road[road_name] += length_feet print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length/5280:.2f} miles)") for road, length in sorted(added_by_road.items()): print(f" • {road}: {length:,.1f} feet") if not removed and not added: print("\nāœ… No significant differences found!") print("The road networks have good coverage overlap within the specified tolerance.") def main(): parser = argparse.ArgumentParser( description="Compare two GeoJSON files containing roads and find significant gaps or extras (Optimized)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python compare_roads.py roads1.geojson roads2.geojson python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200 python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson python compare_roads.py roads1.geojson roads2.geojson --jobs 8 --chunk-size 2000 """ ) parser.add_argument('file1', help='First GeoJSON file') parser.add_argument('file2', help='Second GeoJSON file') parser.add_argument('--tolerance', '-t', type=float, default=50.0, help='Distance tolerance in feet for considering roads as overlapping (default: 50)') parser.add_argument('--min-length', '-m', type=float, default=100.0, help='Minimum length in feet for gaps/extras to be considered significant (default: 100)') parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)') parser.add_argument('--jobs', '-j', type=int, default=None, help='Number of parallel processes (default: CPU count - 1)') parser.add_argument('--chunk-size', '-c', type=int, default=1000, help='Number of geometries to process per chunk (default: 1000)') args = parser.parse_args() # Validate input files if not Path(args.file1).exists(): print(f"Error: File {args.file1} does not exist") return 1 if not Path(args.file2).exists(): print(f"Error: File {args.file2} does not exist") return 1 try: # Create comparator and run comparison comparator = RoadComparator( tolerance_feet=args.tolerance, min_gap_length_feet=args.min_length, n_jobs=args.jobs, chunk_size=args.chunk_size ) removed, added = comparator.compare_roads(args.file1, args.file2) # Print summary comparator.print_summary(removed, added, args.file1, args.file2) # Save results if output file specified if args.output: comparator.save_results(removed, added, args.output) elif removed or added: # Auto-generate output filename if differences found output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson" comparator.save_results(removed, added, output_file) return 0 except Exception as e: print(f"Error: {str(e)}") return 1 if __name__ == "__main__": exit(main())