#!/usr/bin/env python3 """ GeoJSON Road Comparison Script Compares two GeoJSON files containing road data and identifies: 1. Roads in file1 that don't have corresponding coverage in file2 (gaps in file2) 2. Roads in file2 that don't have corresponding coverage in file1 (extra in file2) Only reports differences that are significant (above minimum length threshold). """ import json import argparse from pathlib import Path from typing import List, Dict, Any, Tuple import geopandas as gpd from shapely.geometry import LineString, MultiLineString, Point from shapely.ops import unary_union import pandas as pd import warnings # Suppress warnings for cleaner output warnings.filterwarnings('ignore') class RoadComparator: def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0): """ Initialize the road comparator. Args: tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet) min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet) """ self.tolerance_feet = tolerance_feet self.min_gap_length_feet = min_gap_length_feet # Convert feet to degrees (approximate conversion for continental US) # 1 degree latitude ā‰ˆ 364,000 feet # 1 degree longitude ā‰ˆ 288,000 feet (at 40° latitude) self.tolerance_deg = tolerance_feet / 364000.0 self.min_gap_length_deg = min_gap_length_feet / 364000.0 def load_geojson(self, filepath: str) -> gpd.GeoDataFrame: """Load and validate GeoJSON file.""" try: gdf = gpd.read_file(filepath) # Filter only LineString and MultiLineString geometries line_types = ['LineString', 'MultiLineString'] gdf = gdf[gdf.geometry.type.isin(line_types)] if len(gdf) == 0: raise ValueError(f"No line geometries found in {filepath}") print(f"Loaded {len(gdf)} road features from {filepath}") return gdf except Exception as e: raise Exception(f"Error loading {filepath}: {str(e)}") def create_buffer_union(self, gdf: gpd.GeoDataFrame) -> Any: """Create a buffered union of all geometries in the GeoDataFrame.""" # Buffer all geometries buffered_geoms = gdf.geometry.buffer(self.tolerance_deg) # Create union of all buffered geometries union_geom = unary_union(buffered_geoms.tolist()) return union_geom def find_removed_segments(self, source_gdf: gpd.GeoDataFrame, target_union: Any) -> List[Dict[str, Any]]: """ Find segments in source_gdf that are not covered by target_union (removed roads). Args: source_gdf: GeoDataFrame with roads to check for coverage target_union: Buffered union geometry from the other dataset Returns: List of removed segments with original metadata plus 'removed: true' """ removed_segments = [] for idx, row in source_gdf.iterrows(): geom = row.geometry # Handle MultiLineString by processing each component if isinstance(geom, MultiLineString): lines = list(geom.geoms) else: lines = [geom] for line in lines: # Find parts of the line that don't intersect with target_union try: uncovered = line.difference(target_union) # Skip if no uncovered area if uncovered.is_empty: continue # Handle different geometry types returned by difference uncovered_lines = [] if hasattr(uncovered, 'geoms'): # MultiLineString or GeometryCollection for geom_part in uncovered.geoms: if isinstance(geom_part, LineString): uncovered_lines.append(geom_part) elif isinstance(uncovered, LineString): uncovered_lines.append(uncovered) # Check each uncovered line segment for uncovered_line in uncovered_lines: # Calculate length in degrees, then convert to feet length_deg = uncovered_line.length length_feet = length_deg * 364000.0 # Approximate conversion # Only include if above minimum threshold if length_feet >= self.min_gap_length_feet: # Create properties dict with original metadata plus 'removed: true' properties = dict(row.drop('geometry')) properties['removed'] = True removed_segments.append({ 'geometry': uncovered_line, **properties }) except Exception as e: print(f"Warning: Error processing geometry at index {idx}: {str(e)}") continue return removed_segments def find_added_roads(self, source_gdf: gpd.GeoDataFrame, target_union: Any) -> List[Dict[str, Any]]: """ Find entire roads in source_gdf that don't significantly overlap with target_union (added roads). Args: source_gdf: GeoDataFrame with roads to check for coverage target_union: Buffered union geometry from the other dataset Returns: List of entire added roads with original metadata unchanged """ added_roads = [] for idx, row in source_gdf.iterrows(): geom = row.geometry try: # Check what portion of the road is not covered uncovered = geom.difference(target_union) if not uncovered.is_empty: # Calculate what percentage of the original road is uncovered uncovered_length = 0 if hasattr(uncovered, 'geoms'): for geom_part in uncovered.geoms: if isinstance(geom_part, LineString): uncovered_length += geom_part.length elif isinstance(uncovered, LineString): uncovered_length = uncovered.length original_length = geom.length uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0 # Convert to feet for minimum length check uncovered_length_feet = uncovered_length * 364000.0 # Include the entire road if: # 1. The uncovered portion is above minimum threshold, AND # 2. More than 50% of the road is uncovered (indicating it's genuinely new/added) if uncovered_length_feet >= self.min_gap_length_feet and uncovered_ratio > 0.5: # Include entire original road with all original metadata properties = dict(row.drop('geometry')) added_roads.append({ 'geometry': geom, # Use original geometry, not just the uncovered part **properties }) except Exception as e: print(f"Warning: Error processing geometry at index {idx}: {str(e)}") continue return added_roads def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]: """ Compare two GeoJSON files and find significant differences. Returns: Tuple of (removed_roads, added_roads) """ print(f"Comparing {file1_path} and {file2_path}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") print("-" * 50) # Load both files gdf1 = self.load_geojson(file1_path) gdf2 = self.load_geojson(file2_path) # Ensure both are in the same CRS if gdf1.crs != gdf2.crs: print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}") gdf2 = gdf2.to_crs(gdf1.crs) print("Creating spatial unions...") # Create buffered unions union1 = self.create_buffer_union(gdf1) union2 = self.create_buffer_union(gdf2) print("Finding removed and added roads...") # Find roads in file1 not covered by file2 (removed roads) removed_roads = self.find_removed_segments(gdf1, union2) # Find roads in file2 not covered by file1 (added roads) added_roads = self.find_added_roads(gdf2, union1) return removed_roads, added_roads def save_results(self, removed: List[Dict], added: List[Dict], output_path: str): """Save results to GeoJSON file.""" all_results = removed + added if not all_results: print("No significant differences found!") return # Create GeoDataFrame results_gdf = gpd.GeoDataFrame(all_results) # Save to file results_gdf.to_file(output_path, driver='GeoJSON') print(f"Results saved to: {output_path}") def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str): """Print a summary of the comparison results.""" print("\n" + "="*60) print("COMPARISON SUMMARY") print("="*60) print(f"\nFile 1: {file1_name}") print(f"File 2: {file2_name}") print(f"Tolerance: {self.tolerance_feet} feet") print(f"Minimum significant length: {self.min_gap_length_feet} feet") if removed: print(f"\nšŸ”“ REMOVED ROADS ({len(removed)} segments):") print("These road segments exist in File 1 but are missing or incomplete in File 2:") # Calculate total length of removed segments total_removed_length = 0 removed_by_road = {} for segment in removed: geom = segment['geometry'] length_feet = geom.length * 364000.0 # Convert to feet total_removed_length += length_feet # Get road name road_name = "Unknown" name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME'] for field in name_fields: if field in segment and pd.notna(segment[field]): road_name = str(segment[field]) break if road_name not in removed_by_road: removed_by_road[road_name] = [] removed_by_road[road_name].append(length_feet) print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length/5280:.2f} miles)") for road, lengths in sorted(removed_by_road.items()): road_total = sum(lengths) print(f" • {road}: {len(lengths)} segment(s), {road_total:,.1f} feet") if added: print(f"\nšŸ”µ ADDED ROADS ({len(added)} roads):") print("These roads exist in File 2 but are missing or incomplete in File 1:") # Calculate total length of added roads total_added_length = 0 added_by_road = {} for road in added: geom = road['geometry'] length_feet = geom.length * 364000.0 # Convert to feet total_added_length += length_feet # Get road name road_name = "Unknown" name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME'] for field in name_fields: if field in road and pd.notna(road[field]): road_name = str(road[field]) break if road_name not in added_by_road: added_by_road[road_name] = 0 added_by_road[road_name] += length_feet print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length/5280:.2f} miles)") for road, length in sorted(added_by_road.items()): print(f" • {road}: {length:,.1f} feet") if not removed and not added: print("\nāœ… No significant differences found!") print("The road networks have good coverage overlap within the specified tolerance.") def main(): parser = argparse.ArgumentParser( description="Compare two GeoJSON files containing roads and find significant gaps or extras", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python compare_roads.py roads1.geojson roads2.geojson python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200 python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson """ ) parser.add_argument('file1', help='First GeoJSON file') parser.add_argument('file2', help='Second GeoJSON file') parser.add_argument('--tolerance', '-t', type=float, default=50.0, help='Distance tolerance in feet for considering roads as overlapping (default: 50)') parser.add_argument('--min-length', '-m', type=float, default=100.0, help='Minimum length in feet for gaps/extras to be considered significant (default: 100)') parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)') args = parser.parse_args() # Validate input files if not Path(args.file1).exists(): print(f"Error: File {args.file1} does not exist") return 1 if not Path(args.file2).exists(): print(f"Error: File {args.file2} does not exist") return 1 try: # Create comparator and run comparison comparator = RoadComparator( tolerance_feet=args.tolerance, min_gap_length_feet=args.min_length ) removed, added = comparator.compare_roads(args.file1, args.file2) # Print summary comparator.print_summary(removed, added, args.file1, args.file2) # Save results if output file specified if args.output: comparator.save_results(removed, added, args.output) elif removed or added: # Auto-generate output filename if differences found output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson" comparator.save_results(removed, added, output_file) return 0 except Exception as e: print(f"Error: {str(e)}") return 1 if __name__ == "__main__": exit(main())