#!/usr/bin/env python3 """ GeoJSON Road Comparison Script Compares two GeoJSON files containing road data and identifies: 1. Roads in file1 that don't have corresponding coverage in file2 (gaps in file2) 2. Roads in file2 that don't have corresponding coverage in file1 (extra in file2) Only reports differences that are significant (above minimum length threshold). """ import json import argparse from pathlib import Path from typing import List, Dict, Any, Tuple import geopandas as gpd from shapely.geometry import LineString, MultiLineString, Point from shapely.ops import unary_union import pandas as pd import warnings # Suppress warnings for cleaner output warnings.filterwarnings('ignore') class RoadComparator: def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0): """ Initialize the road comparator. Args: tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet) min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet) """ self.tolerance_feet = tolerance_feet self.min_gap_length_feet = min_gap_length_feet # Convert feet to degrees (approximate conversion for continental US) # 1 degree latitude ā‰ˆ 364,000 feet # 1 degree longitude ā‰ˆ 288,000 feet (at 40° latitude) self.tolerance_deg = tolerance_feet / 364000.0 self.min_gap_length_deg = min_gap_length_feet / 364000.0 def load_geojson(self, filepath: str) -> gpd.GeoDataFrame: """Load and validate GeoJSON file.""" try: gdf = gpd.read_file(filepath) # Filter only LineString and MultiLineString geometries line_types = ['LineString', 'MultiLineString'] gdf = gdf[gdf.geometry.type.isin(line_types)] if len(gdf) == 0: raise ValueError(f"No line geometries found in {filepath}") print(f"Loaded {len(gdf)} road features from {filepath}") return gdf except Exception as e: raise Exception(f"Error loading {filepath}: {str(e)}") def create_buffer_union(self, gdf: gpd.GeoDataFrame) -> Any: """Create a buffered union of all geometries in the GeoDataFrame.""" # Buffer all geometries buffered_geoms = gdf.geometry.buffer(self.tolerance_deg) # Create union of all buffered geometries union_geom = unary_union(buffered_geoms.tolist()) return union_geom def find_uncovered_segments(self, source_gdf: gpd.GeoDataFrame, target_union: Any, label: str) -> List[Dict[str, Any]]: """ Find segments in source_gdf that are not covered by target_union. Args: source_gdf: GeoDataFrame with roads to check for coverage target_union: Buffered union geometry from the other dataset label: Label for the type of difference (e.g., "gap_in_file2", "extra_in_file2") Returns: List of uncovered segments with metadata """ uncovered_segments = [] for idx, row in source_gdf.iterrows(): geom = row.geometry # Handle MultiLineString by processing each component if isinstance(geom, MultiLineString): lines = list(geom.geoms) else: lines = [geom] for line in lines: # Find parts of the line that don't intersect with target_union try: uncovered = line.difference(target_union) # Skip if no uncovered area if uncovered.is_empty: continue # Handle different geometry types returned by difference uncovered_lines = [] if hasattr(uncovered, 'geoms'): # MultiLineString or GeometryCollection for geom_part in uncovered.geoms: if isinstance(geom_part, LineString): uncovered_lines.append(geom_part) elif isinstance(uncovered, LineString): uncovered_lines.append(uncovered) # Check each uncovered line segment for uncovered_line in uncovered_lines: # Calculate length in degrees, then convert to feet length_deg = uncovered_line.length length_feet = length_deg * 364000.0 # Approximate conversion # Only include if above minimum threshold if length_feet >= self.min_gap_length_feet: # Get road name/identifier if available road_name = "Unknown" name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME'] for field in name_fields: if field in row and pd.notna(row[field]): road_name = str(row[field]) break uncovered_segments.append({ 'type': label, 'road_name': road_name, 'length_feet': round(length_feet, 1), 'geometry': uncovered_line, 'original_index': idx, 'original_properties': dict(row.drop('geometry')) }) except Exception as e: print(f"Warning: Error processing geometry at index {idx}: {str(e)}") continue return uncovered_segments def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]: """ Compare two GeoJSON files and find significant differences. Returns: Tuple of (gaps_in_file2, extras_in_file2) """ print(f"Comparing {file1_path} and {file2_path}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") print("-" * 50) # Load both files gdf1 = self.load_geojson(file1_path) gdf2 = self.load_geojson(file2_path) # Ensure both are in the same CRS if gdf1.crs != gdf2.crs: print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}") gdf2 = gdf2.to_crs(gdf1.crs) print("Creating spatial unions...") # Create buffered unions union1 = self.create_buffer_union(gdf1) union2 = self.create_buffer_union(gdf2) print("Finding gaps and extras...") # Find roads in file1 not covered by file2 (gaps in file2) gaps_in_file2 = self.find_uncovered_segments(gdf1, union2, "gap_in_file2") # Find roads in file2 not covered by file1 (extras in file2) extras_in_file2 = self.find_uncovered_segments(gdf2, union1, "extra_in_file2") return gaps_in_file2, extras_in_file2 def save_results(self, gaps: List[Dict], extras: List[Dict], output_path: str): """Save results to GeoJSON file.""" all_results = gaps + extras if not all_results: print("No significant differences found!") return # Create GeoDataFrame results_gdf = gpd.GeoDataFrame(all_results) # Save to file results_gdf.to_file(output_path, driver='GeoJSON') print(f"Results saved to: {output_path}") def print_summary(self, gaps: List[Dict], extras: List[Dict], file1_name: str, file2_name: str): """Print a summary of the comparison results.""" print("\n" + "="*60) print("COMPARISON SUMMARY") print("="*60) print(f"\nFile 1: {file1_name}") print(f"File 2: {file2_name}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") if gaps: print(f"\nšŸ”“ GAPS IN FILE 2 ({len(gaps)} segments):") print("These road segments exist in File 1 but are missing or incomplete in File 2:") total_gap_length = sum(g['length_feet'] for g in gaps) print(f"Total gap length: {total_gap_length:,.1f} feet ({total_gap_length/5280:.2f} miles)") # Group by road name gap_by_road = {} for gap in gaps: road = gap['road_name'] if road not in gap_by_road: gap_by_road[road] = [] gap_by_road[road].append(gap) for road, road_gaps in sorted(gap_by_road.items()): road_total = sum(g['length_feet'] for g in road_gaps) print(f" • {road}: {len(road_gaps)} segment(s), {road_total:,.1f} feet") if extras: print(f"\nšŸ”µ EXTRAS IN FILE 2 ({len(extras)} segments):") print("These road segments exist in File 2 but are missing or incomplete in File 1:") total_extra_length = sum(e['length_feet'] for e in extras) print(f"Total extra length: {total_extra_length:,.1f} feet ({total_extra_length/5280:.2f} miles)") # Group by road name extra_by_road = {} for extra in extras: road = extra['road_name'] if road not in extra_by_road: extra_by_road[road] = [] extra_by_road[road].append(extra) for road, road_extras in sorted(extra_by_road.items()): road_total = sum(e['length_feet'] for e in road_extras) print(f" • {road}: {len(road_extras)} segment(s), {road_total:,.1f} feet") if not gaps and not extras: print("\nāœ… No significant differences found!") print("The road networks have good coverage overlap within the specified tolerance.") def main(): parser = argparse.ArgumentParser( description="Compare two GeoJSON files containing roads and find significant gaps or extras", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python compare_roads.py roads1.geojson roads2.geojson python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200 python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson """ ) parser.add_argument('file1', help='First GeoJSON file') parser.add_argument('file2', help='Second GeoJSON file') parser.add_argument('--tolerance', '-t', type=float, default=50.0, help='Distance tolerance in feet for considering roads as overlapping (default: 50)') parser.add_argument('--min-length', '-m', type=float, default=100.0, help='Minimum length in feet for gaps/extras to be considered significant (default: 100)') parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)') args = parser.parse_args() # Validate input files if not Path(args.file1).exists(): print(f"Error: File {args.file1} does not exist") return 1 if not Path(args.file2).exists(): print(f"Error: File {args.file2} does not exist") return 1 try: # Create comparator and run comparison comparator = RoadComparator( tolerance_feet=args.tolerance, min_gap_length_feet=args.min_length ) gaps, extras = comparator.compare_roads(args.file1, args.file2) # Print summary comparator.print_summary(gaps, extras, args.file1, args.file2) # Save results if output file specified if args.output: comparator.save_results(gaps, extras, args.output) return 0 except Exception as e: print(f"Error: {str(e)}") return 1 if __name__ == "__main__": exit(main())