road-conflator/main.py
2025-06-26 15:57:15 -07:00

309 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
GeoJSON Road Comparison Script
Compares two GeoJSON files containing road data and identifies:
1. Roads in file1 that don't have corresponding coverage in file2 (gaps in file2)
2. Roads in file2 that don't have corresponding coverage in file1 (extra in file2)
Only reports differences that are significant (above minimum length threshold).
"""
import json
import argparse
from pathlib import Path
from typing import List, Dict, Any, Tuple
import geopandas as gpd
from shapely.geometry import LineString, MultiLineString, Point
from shapely.ops import unary_union
import pandas as pd
import warnings
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
class RoadComparator:
def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0):
"""
Initialize the road comparator.
Args:
tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet)
min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet)
"""
self.tolerance_feet = tolerance_feet
self.min_gap_length_feet = min_gap_length_feet
# Convert feet to degrees (approximate conversion for continental US)
# 1 degree latitude ≈ 364,000 feet
# 1 degree longitude ≈ 288,000 feet (at 40° latitude)
self.tolerance_deg = tolerance_feet / 364000.0
self.min_gap_length_deg = min_gap_length_feet / 364000.0
def load_geojson(self, filepath: str) -> gpd.GeoDataFrame:
"""Load and validate GeoJSON file."""
try:
gdf = gpd.read_file(filepath)
# Filter only LineString and MultiLineString geometries
line_types = ['LineString', 'MultiLineString']
gdf = gdf[gdf.geometry.type.isin(line_types)]
if len(gdf) == 0:
raise ValueError(f"No line geometries found in {filepath}")
print(f"Loaded {len(gdf)} road features from {filepath}")
return gdf
except Exception as e:
raise Exception(f"Error loading {filepath}: {str(e)}")
def create_buffer_union(self, gdf: gpd.GeoDataFrame) -> Any:
"""Create a buffered union of all geometries in the GeoDataFrame."""
# Buffer all geometries
buffered_geoms = gdf.geometry.buffer(self.tolerance_deg)
# Create union of all buffered geometries
union_geom = unary_union(buffered_geoms.tolist())
return union_geom
def find_uncovered_segments(self, source_gdf: gpd.GeoDataFrame,
target_union: Any,
label: str) -> List[Dict[str, Any]]:
"""
Find segments in source_gdf that are not covered by target_union.
Args:
source_gdf: GeoDataFrame with roads to check for coverage
target_union: Buffered union geometry from the other dataset
label: Label for the type of difference (e.g., "gap_in_file2", "extra_in_file2")
Returns:
List of uncovered segments with metadata
"""
uncovered_segments = []
for idx, row in source_gdf.iterrows():
geom = row.geometry
# Handle MultiLineString by processing each component
if isinstance(geom, MultiLineString):
lines = list(geom.geoms)
else:
lines = [geom]
for line in lines:
# Find parts of the line that don't intersect with target_union
try:
uncovered = line.difference(target_union)
# Skip if no uncovered area
if uncovered.is_empty:
continue
# Handle different geometry types returned by difference
uncovered_lines = []
if hasattr(uncovered, 'geoms'):
# MultiLineString or GeometryCollection
for geom_part in uncovered.geoms:
if isinstance(geom_part, LineString):
uncovered_lines.append(geom_part)
elif isinstance(uncovered, LineString):
uncovered_lines.append(uncovered)
# Check each uncovered line segment
for uncovered_line in uncovered_lines:
# Calculate length in degrees, then convert to feet
length_deg = uncovered_line.length
length_feet = length_deg * 364000.0 # Approximate conversion
# Only include if above minimum threshold
if length_feet >= self.min_gap_length_feet:
# Get road name/identifier if available
road_name = "Unknown"
name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
for field in name_fields:
if field in row and pd.notna(row[field]):
road_name = str(row[field])
break
uncovered_segments.append({
'type': label,
'road_name': road_name,
'length_feet': round(length_feet, 1),
'geometry': uncovered_line,
'original_index': idx,
'original_properties': dict(row.drop('geometry'))
})
except Exception as e:
print(f"Warning: Error processing geometry at index {idx}: {str(e)}")
continue
return uncovered_segments
def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]:
"""
Compare two GeoJSON files and find significant differences.
Returns:
Tuple of (gaps_in_file2, extras_in_file2)
"""
print(f"Comparing {file1_path} and {file2_path}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
print("-" * 50)
# Load both files
gdf1 = self.load_geojson(file1_path)
gdf2 = self.load_geojson(file2_path)
# Ensure both are in the same CRS
if gdf1.crs != gdf2.crs:
print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}")
gdf2 = gdf2.to_crs(gdf1.crs)
print("Creating spatial unions...")
# Create buffered unions
union1 = self.create_buffer_union(gdf1)
union2 = self.create_buffer_union(gdf2)
print("Finding gaps and extras...")
# Find roads in file1 not covered by file2 (gaps in file2)
gaps_in_file2 = self.find_uncovered_segments(gdf1, union2, "gap_in_file2")
# Find roads in file2 not covered by file1 (extras in file2)
extras_in_file2 = self.find_uncovered_segments(gdf2, union1, "extra_in_file2")
return gaps_in_file2, extras_in_file2
def save_results(self, gaps: List[Dict], extras: List[Dict], output_path: str):
"""Save results to GeoJSON file."""
all_results = gaps + extras
if not all_results:
print("No significant differences found!")
return
# Create GeoDataFrame
results_gdf = gpd.GeoDataFrame(all_results)
# Save to file
results_gdf.to_file(output_path, driver='GeoJSON')
print(f"Results saved to: {output_path}")
def print_summary(self, gaps: List[Dict], extras: List[Dict], file1_name: str, file2_name: str):
"""Print a summary of the comparison results."""
print("\n" + "="*60)
print("COMPARISON SUMMARY")
print("="*60)
print(f"\nFile 1: {file1_name}")
print(f"File 2: {file2_name}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
if gaps:
print(f"\n🔴 GAPS IN FILE 2 ({len(gaps)} segments):")
print("These road segments exist in File 1 but are missing or incomplete in File 2:")
total_gap_length = sum(g['length_feet'] for g in gaps)
print(f"Total gap length: {total_gap_length:,.1f} feet ({total_gap_length/5280:.2f} miles)")
# Group by road name
gap_by_road = {}
for gap in gaps:
road = gap['road_name']
if road not in gap_by_road:
gap_by_road[road] = []
gap_by_road[road].append(gap)
for road, road_gaps in sorted(gap_by_road.items()):
road_total = sum(g['length_feet'] for g in road_gaps)
print(f"{road}: {len(road_gaps)} segment(s), {road_total:,.1f} feet")
if extras:
print(f"\n🔵 EXTRAS IN FILE 2 ({len(extras)} segments):")
print("These road segments exist in File 2 but are missing or incomplete in File 1:")
total_extra_length = sum(e['length_feet'] for e in extras)
print(f"Total extra length: {total_extra_length:,.1f} feet ({total_extra_length/5280:.2f} miles)")
# Group by road name
extra_by_road = {}
for extra in extras:
road = extra['road_name']
if road not in extra_by_road:
extra_by_road[road] = []
extra_by_road[road].append(extra)
for road, road_extras in sorted(extra_by_road.items()):
road_total = sum(e['length_feet'] for e in road_extras)
print(f"{road}: {len(road_extras)} segment(s), {road_total:,.1f} feet")
if not gaps and not extras:
print("\n✅ No significant differences found!")
print("The road networks have good coverage overlap within the specified tolerance.")
def main():
parser = argparse.ArgumentParser(
description="Compare two GeoJSON files containing roads and find significant gaps or extras",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python compare_roads.py roads1.geojson roads2.geojson
python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200
python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson
"""
)
parser.add_argument('file1', help='First GeoJSON file')
parser.add_argument('file2', help='Second GeoJSON file')
parser.add_argument('--tolerance', '-t', type=float, default=50.0,
help='Distance tolerance in feet for considering roads as overlapping (default: 50)')
parser.add_argument('--min-length', '-m', type=float, default=100.0,
help='Minimum length in feet for gaps/extras to be considered significant (default: 100)')
parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)')
args = parser.parse_args()
# Validate input files
if not Path(args.file1).exists():
print(f"Error: File {args.file1} does not exist")
return 1
if not Path(args.file2).exists():
print(f"Error: File {args.file2} does not exist")
return 1
try:
# Create comparator and run comparison
comparator = RoadComparator(
tolerance_feet=args.tolerance,
min_gap_length_feet=args.min_length
)
gaps, extras = comparator.compare_roads(args.file1, args.file2)
# Print summary
comparator.print_summary(gaps, extras, args.file1, args.file2)
# Save results if output file specified
if args.output:
comparator.save_results(gaps, extras, args.output)
elif gaps or extras:
# Auto-generate output filename if differences found
output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson"
comparator.save_results(gaps, extras, output_file)
return 0
except Exception as e:
print(f"Error: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())