road-conflator/main.py

379 lines
16 KiB
Python

#!/usr/bin/env python3
"""
GeoJSON Road Comparison Script
Compares two GeoJSON files containing road data and identifies:
1. Roads in file1 that don't have corresponding coverage in file2 (gaps in file2)
2. Roads in file2 that don't have corresponding coverage in file1 (extra in file2)
Only reports differences that are significant (above minimum length threshold).
"""
import json
import argparse
from pathlib import Path
from typing import List, Dict, Any, Tuple
import geopandas as gpd
from shapely.geometry import LineString, MultiLineString, Point
from shapely.ops import unary_union
import pandas as pd
import warnings
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
class RoadComparator:
def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0):
"""
Initialize the road comparator.
Args:
tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet)
min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet)
"""
self.tolerance_feet = tolerance_feet
self.min_gap_length_feet = min_gap_length_feet
# Convert feet to degrees (approximate conversion for continental US)
# 1 degree latitude ≈ 364,000 feet
# 1 degree longitude ≈ 288,000 feet (at 40° latitude)
self.tolerance_deg = tolerance_feet / 364000.0
self.min_gap_length_deg = min_gap_length_feet / 364000.0
def load_geojson(self, filepath: str) -> gpd.GeoDataFrame:
"""Load and validate GeoJSON file."""
try:
gdf = gpd.read_file(filepath)
# Filter only LineString and MultiLineString geometries
line_types = ['LineString', 'MultiLineString']
gdf = gdf[gdf.geometry.type.isin(line_types)]
if len(gdf) == 0:
raise ValueError(f"No line geometries found in {filepath}")
print(f"Loaded {len(gdf)} road features from {filepath}")
return gdf
except Exception as e:
raise Exception(f"Error loading {filepath}: {str(e)}")
def create_buffer_union(self, gdf: gpd.GeoDataFrame) -> Any:
"""Create a buffered union of all geometries in the GeoDataFrame."""
# Buffer all geometries
buffered_geoms = gdf.geometry.buffer(self.tolerance_deg)
# Create union of all buffered geometries
union_geom = unary_union(buffered_geoms.tolist())
return union_geom
def find_removed_segments(self, source_gdf: gpd.GeoDataFrame,
target_union: Any) -> List[Dict[str, Any]]:
"""
Find segments in source_gdf that are not covered by target_union (removed roads).
Args:
source_gdf: GeoDataFrame with roads to check for coverage
target_union: Buffered union geometry from the other dataset
Returns:
List of removed segments with original metadata plus 'removed: true'
"""
removed_segments = []
for idx, row in source_gdf.iterrows():
geom = row.geometry
# Handle MultiLineString by processing each component
if isinstance(geom, MultiLineString):
lines = list(geom.geoms)
else:
lines = [geom]
for line in lines:
# Find parts of the line that don't intersect with target_union
try:
uncovered = line.difference(target_union)
# Skip if no uncovered area
if uncovered.is_empty:
continue
# Handle different geometry types returned by difference
uncovered_lines = []
if hasattr(uncovered, 'geoms'):
# MultiLineString or GeometryCollection
for geom_part in uncovered.geoms:
if isinstance(geom_part, LineString):
uncovered_lines.append(geom_part)
elif isinstance(uncovered, LineString):
uncovered_lines.append(uncovered)
# Check each uncovered line segment
for uncovered_line in uncovered_lines:
# Calculate length in degrees, then convert to feet
length_deg = uncovered_line.length
length_feet = length_deg * 364000.0 # Approximate conversion
# Only include if above minimum threshold
if length_feet >= self.min_gap_length_feet:
# Create properties dict with original metadata plus 'removed: true'
properties = dict(row.drop('geometry'))
properties['removed'] = True
removed_segments.append({
'geometry': uncovered_line,
**properties
})
except Exception as e:
print(f"Warning: Error processing geometry at index {idx}: {str(e)}")
continue
return removed_segments
def find_added_roads(self, source_gdf: gpd.GeoDataFrame,
target_union: Any) -> List[Dict[str, Any]]:
"""
Find entire roads in source_gdf that don't significantly overlap with target_union (added roads).
Args:
source_gdf: GeoDataFrame with roads to check for coverage
target_union: Buffered union geometry from the other dataset
Returns:
List of entire added roads with original metadata unchanged
"""
added_roads = []
for idx, row in source_gdf.iterrows():
geom = row.geometry
try:
# Check what portion of the road is not covered
uncovered = geom.difference(target_union)
if not uncovered.is_empty:
# Calculate what percentage of the original road is uncovered
uncovered_length = 0
if hasattr(uncovered, 'geoms'):
for geom_part in uncovered.geoms:
if isinstance(geom_part, LineString):
uncovered_length += geom_part.length
elif isinstance(uncovered, LineString):
uncovered_length = uncovered.length
original_length = geom.length
uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0
# Convert to feet for minimum length check
uncovered_length_feet = uncovered_length * 364000.0
# Include the entire road if:
# 1. The uncovered portion is above minimum threshold, AND
# 2. More than 50% of the road is uncovered (indicating it's genuinely new/added)
if uncovered_length_feet >= self.min_gap_length_feet and uncovered_ratio > 0.5:
# Include entire original road with all original metadata
properties = dict(row.drop('geometry'))
added_roads.append({
'geometry': geom, # Use original geometry, not just the uncovered part
**properties
})
except Exception as e:
print(f"Warning: Error processing geometry at index {idx}: {str(e)}")
continue
return added_roads
def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]:
"""
Compare two GeoJSON files and find significant differences.
Returns:
Tuple of (removed_roads, added_roads)
"""
print(f"Comparing {file1_path} and {file2_path}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
print("-" * 50)
# Load both files
gdf1 = self.load_geojson(file1_path)
gdf2 = self.load_geojson(file2_path)
# Ensure both are in the same CRS
if gdf1.crs != gdf2.crs:
print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}")
gdf2 = gdf2.to_crs(gdf1.crs)
print("Creating spatial unions...")
# Create buffered unions
union1 = self.create_buffer_union(gdf1)
union2 = self.create_buffer_union(gdf2)
print("Finding removed and added roads...")
# Find roads in file1 not covered by file2 (removed roads)
removed_roads = self.find_removed_segments(gdf1, union2)
# Find roads in file2 not covered by file1 (added roads)
added_roads = self.find_added_roads(gdf2, union1)
return removed_roads, added_roads
def save_results(self, removed: List[Dict], added: List[Dict], output_path: str):
"""Save results to GeoJSON file."""
all_results = removed + added
if not all_results:
print("No significant differences found!")
return
# Create GeoDataFrame
results_gdf = gpd.GeoDataFrame(all_results)
# Save to file
results_gdf.to_file(output_path, driver='GeoJSON')
print(f"Results saved to: {output_path}")
def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str):
"""Print a summary of the comparison results."""
print("\n" + "="*60)
print("COMPARISON SUMMARY")
print("="*60)
print(f"\nFile 1: {file1_name}")
print(f"File 2: {file2_name}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
if removed:
print(f"\n🔴 REMOVED ROADS ({len(removed)} segments):")
print("These road segments exist in File 1 but are missing or incomplete in File 2:")
# Calculate total length of removed segments
total_removed_length = 0
removed_by_road = {}
for segment in removed:
geom = segment['geometry']
length_feet = geom.length * 364000.0 # Convert to feet
total_removed_length += length_feet
# Get road name
road_name = "Unknown"
name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
for field in name_fields:
if field in segment and pd.notna(segment[field]):
road_name = str(segment[field])
break
if road_name not in removed_by_road:
removed_by_road[road_name] = []
removed_by_road[road_name].append(length_feet)
print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length/5280:.2f} miles)")
for road, lengths in sorted(removed_by_road.items()):
road_total = sum(lengths)
print(f"{road}: {len(lengths)} segment(s), {road_total:,.1f} feet")
if added:
print(f"\n🔵 ADDED ROADS ({len(added)} roads):")
print("These roads exist in File 2 but are missing or incomplete in File 1:")
# Calculate total length of added roads
total_added_length = 0
added_by_road = {}
for road in added:
geom = road['geometry']
length_feet = geom.length * 364000.0 # Convert to feet
total_added_length += length_feet
# Get road name
road_name = "Unknown"
name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
for field in name_fields:
if field in road and pd.notna(road[field]):
road_name = str(road[field])
break
if road_name not in added_by_road:
added_by_road[road_name] = 0
added_by_road[road_name] += length_feet
print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length/5280:.2f} miles)")
for road, length in sorted(added_by_road.items()):
print(f"{road}: {length:,.1f} feet")
if not removed and not added:
print("\n✅ No significant differences found!")
print("The road networks have good coverage overlap within the specified tolerance.")
def main():
parser = argparse.ArgumentParser(
description="Compare two GeoJSON files containing roads and find significant gaps or extras",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python compare_roads.py roads1.geojson roads2.geojson
python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200
python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson
"""
)
parser.add_argument('file1', help='First GeoJSON file')
parser.add_argument('file2', help='Second GeoJSON file')
parser.add_argument('--tolerance', '-t', type=float, default=50.0,
help='Distance tolerance in feet for considering roads as overlapping (default: 50)')
parser.add_argument('--min-length', '-m', type=float, default=100.0,
help='Minimum length in feet for gaps/extras to be considered significant (default: 100)')
parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)')
args = parser.parse_args()
# Validate input files
if not Path(args.file1).exists():
print(f"Error: File {args.file1} does not exist")
return 1
if not Path(args.file2).exists():
print(f"Error: File {args.file2} does not exist")
return 1
try:
# Create comparator and run comparison
comparator = RoadComparator(
tolerance_feet=args.tolerance,
min_gap_length_feet=args.min_length
)
removed, added = comparator.compare_roads(args.file1, args.file2)
# Print summary
comparator.print_summary(removed, added, args.file1, args.file2)
# Save results if output file specified
if args.output:
comparator.save_results(removed, added, args.output)
elif removed or added:
# Auto-generate output filename if differences found
output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson"
comparator.save_results(removed, added, output_file)
return 0
except Exception as e:
print(f"Error: {str(e)}")
return 1
if __name__ == "__main__":
exit(main())