#!/usr/bin/env python3
"""
GeoJSON Road Comparison Script - Optimized Version

Compares two GeoJSON files containing road data and identifies:
1. Roads in file1 that don't have corresponding coverage in file2 (removed roads)
2. Roads in file2 that don't have corresponding coverage in file1 (added roads)

Only reports differences that are significant (above a minimum length threshold).
Optimized for performance with parallel processing and spatial indexing.
"""

import argparse
import gc
import multiprocessing as mp
import sys
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Tuple

import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString, MultiLineString
from shapely.ops import unary_union
from shapely.strtree import STRtree
from shapely.validation import make_valid

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
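
# Note: file I/O below uses engine='pyogrio', so the optional pyogrio package
# is assumed to be installed alongside geopandas, shapely, and pandas.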


class RoadComparator:
    def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0,
                 n_jobs: int = None, chunk_size: int = 1000):
        """
        Initialize the road comparator.

        Args:
            tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet)
            min_gap_length_feet: Minimum length of a gap/extra to be considered significant (default: 100 feet)
            n_jobs: Number of parallel processes to use (default: CPU count - 1)
            chunk_size: Number of geometries to process per chunk (default: 1000)
        """
        self.tolerance_feet = tolerance_feet
        self.min_gap_length_feet = min_gap_length_feet
        self.n_jobs = n_jobs or max(1, mp.cpu_count() - 1)
        self.chunk_size = chunk_size

        # Convert feet to degrees (rough approximation for the continental US):
        # 1 degree of latitude ≈ 364,000 feet; 1 degree of longitude ≈ 288,000 feet
        # at 40° latitude. The latitude factor is applied to both axes, so
        # east-west tolerances come out slightly tighter than requested.
        self.tolerance_deg = tolerance_feet / 364000.0
        self.min_gap_length_deg = min_gap_length_feet / 364000.0
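
        # Worked example with the defaults: 50 ft / 364,000 ft-per-degree
        # ≈ 1.37e-4 degrees of buffer, and 100 ft ≈ 2.75e-4 degrees as the
        # minimum significant segment length.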

        print(f"Using {self.n_jobs} parallel processes with chunk size {self.chunk_size}")

    def load_geojson(self, filepath: str) -> gpd.GeoDataFrame:
        """Load and validate a GeoJSON file, keeping only line geometries."""
        try:
            # Use the pyogrio engine for faster loading of large files
            gdf = gpd.read_file(filepath, engine='pyogrio')

            # Keep only LineString and MultiLineString geometries
            line_types = ['LineString', 'MultiLineString']
            gdf = gdf[gdf.geometry.type.isin(line_types)].copy()

            if len(gdf) == 0:
                raise ValueError(f"No line geometries found in {filepath}")

            # Reset index for efficient processing
            gdf = gdf.reset_index(drop=True)

            # Ensure geometries are valid and repair simple issues.
            # make_valid() keeps line geometries intact, whereas buffer(0)
            # would collapse invalid lines into empty polygons.
            invalid_mask = ~gdf.geometry.is_valid
            if invalid_mask.any():
                print(f"Fixing {invalid_mask.sum()} invalid geometries...")
                gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].apply(make_valid)

            print(f"Loaded {len(gdf)} road features from {filepath}")
            return gdf

        except Exception as e:
            raise RuntimeError(f"Error loading {filepath}: {e}") from e

    def create_buffered_union_optimized(self, gdf: gpd.GeoDataFrame) -> Any:
        """Create a buffered union using chunked processing for memory efficiency."""
        print("Creating optimized buffered union...")

        # Process in chunks to manage memory
        chunks = [gdf.iloc[i:i+self.chunk_size] for i in range(0, len(gdf), self.chunk_size)]
        chunk_unions = []

        # Use a partial function for multiprocessing
        buffer_func = partial(self._buffer_chunk, tolerance=self.tolerance_deg)

        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
            # Submit all chunk processing jobs
            future_to_chunk = {executor.submit(buffer_func, chunk): i
                               for i, chunk in enumerate(chunks)}

            # Collect results as they complete
            for future in as_completed(future_to_chunk):
                chunk_idx = future_to_chunk[future]
                try:
                    chunk_union = future.result()
                    if chunk_union is not None and not chunk_union.is_empty:
                        chunk_unions.append(chunk_union)
                    print(f"Processed chunk {chunk_idx + 1}/{len(chunks)}")
                except Exception as e:
                    print(f"Error processing chunk {chunk_idx}: {str(e)}")

        # Union all chunk results
        print("Combining chunk unions...")
        if chunk_unions:
            final_union = unary_union(chunk_unions)
            # Force garbage collection
            del chunk_unions
            gc.collect()
            return final_union
        else:
            raise RuntimeError("No valid geometries to create union")
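
    # A simpler single-process alternative (sketch, for validating results on
    # small inputs; assumes the approximate degree-based tolerance above):
    #   union = gdf.geometry.buffer(self.tolerance_deg).unary_union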

    @staticmethod
    def _buffer_chunk(chunk_gdf: gpd.GeoDataFrame, tolerance: float) -> Any:
        """Buffer geometries in a chunk and return their union."""
        try:
            # Buffer all geometries in the chunk
            buffered = chunk_gdf.geometry.buffer(tolerance)

            # Create the union of the buffered geometries
            if len(buffered) == 1:
                return buffered.iloc[0]
            else:
                return unary_union(buffered.tolist())
        except Exception as e:
            print(f"Error in chunk processing: {str(e)}")
            return None

    def create_spatial_index(self, gdf: gpd.GeoDataFrame) -> STRtree:
        """Create a spatial index for fast intersection queries.

        Note: this helper is not called by compare_roads(); it is kept as a
        utility for ad-hoc spatial queries against a loaded GeoDataFrame.
        """
        print("Creating spatial index...")
        # Build an STRtree over all geometries for fast spatial queries
        geometries = gdf.geometry.tolist()
        return STRtree(geometries)
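
    # Query sketch (assumes shapely 2.x, where STRtree.query returns integer
    # positions into the input list; shapely 1.x returned geometries instead):
    #   tree = comparator.create_spatial_index(gdf)
    #   nearby_idx = tree.query(road.buffer(comparator.tolerance_deg))
    #   candidates = gdf.iloc[nearby_idx]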

    def find_removed_segments_optimized(self, source_gdf: gpd.GeoDataFrame,
                                        target_union: Any) -> List[Dict[str, Any]]:
        """
        Find segments in source_gdf that are not covered by target_union (removed roads).
        Optimized with parallel processing.
        """
        print("Finding removed segments...")

        # Split into chunks for parallel processing
        chunks = [source_gdf.iloc[i:i+self.chunk_size]
                  for i in range(0, len(source_gdf), self.chunk_size)]

        all_removed = []

        # Use a partial function for multiprocessing
        process_func = partial(self._process_removed_chunk,
                               target_union=target_union,
                               min_length_deg=self.min_gap_length_deg)

        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
            # Submit all chunk processing jobs
            future_to_chunk = {executor.submit(process_func, chunk): i
                               for i, chunk in enumerate(chunks)}

            # Collect results as they complete
            for future in as_completed(future_to_chunk):
                chunk_idx = future_to_chunk[future]
                try:
                    chunk_removed = future.result()
                    all_removed.extend(chunk_removed)
                    print(f"Processed removed chunk {chunk_idx + 1}/{len(chunks)}")
                except Exception as e:
                    print(f"Error processing removed chunk {chunk_idx}: {str(e)}")

        return all_removed

    @staticmethod
    def _process_removed_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
                               min_length_deg: float) -> List[Dict[str, Any]]:
        """Process a chunk of geometries to find removed segments."""
        removed_segments = []

        for _, row in chunk_gdf.iterrows():
            geom = row.geometry

            # Handle MultiLineString by processing each component
            if isinstance(geom, MultiLineString):
                lines = list(geom.geoms)
            else:
                lines = [geom]

            for line in lines:
                try:
                    # Find the parts of the line that fall outside target_union
                    uncovered = line.difference(target_union)

                    if uncovered.is_empty:
                        continue

                    # difference() may return a LineString, MultiLineString,
                    # or GeometryCollection; collect only the line parts
                    uncovered_lines = []
                    if hasattr(uncovered, 'geoms'):
                        for geom_part in uncovered.geoms:
                            if isinstance(geom_part, LineString):
                                uncovered_lines.append(geom_part)
                    elif isinstance(uncovered, LineString):
                        uncovered_lines.append(uncovered)

                    # Keep each uncovered segment that is long enough
                    for uncovered_line in uncovered_lines:
                        if uncovered_line.length >= min_length_deg:
                            # Carry over the original metadata plus 'removed': True
                            properties = dict(row.drop('geometry'))
                            properties['removed'] = True

                            removed_segments.append({
                                'geometry': uncovered_line,
                                **properties
                            })

                except Exception:
                    continue  # Skip problematic geometries

        return removed_segments
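
    # Sketch of the difference() behavior relied on above:
    #   from shapely.geometry import LineString, box
    #   LineString([(0, 0), (10, 0)]).difference(box(2, -1, 4, 1))
    # returns a MultiLineString with parts (0,0)-(2,0) and (4,0)-(10,0).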

    def find_added_roads_optimized(self, source_gdf: gpd.GeoDataFrame,
                                   target_union: Any) -> List[Dict[str, Any]]:
        """
        Find entire roads in source_gdf that don't significantly overlap with target_union.
        Optimized with parallel processing.
        """
        print("Finding added roads...")

        # Split into chunks for parallel processing
        chunks = [source_gdf.iloc[i:i+self.chunk_size]
                  for i in range(0, len(source_gdf), self.chunk_size)]

        all_added = []

        # Use a partial function for multiprocessing
        process_func = partial(self._process_added_chunk,
                               target_union=target_union,
                               min_length_deg=self.min_gap_length_deg)

        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
            # Submit all chunk processing jobs
            future_to_chunk = {executor.submit(process_func, chunk): i
                               for i, chunk in enumerate(chunks)}

            # Collect results as they complete
            for future in as_completed(future_to_chunk):
                chunk_idx = future_to_chunk[future]
                try:
                    chunk_added = future.result()
                    all_added.extend(chunk_added)
                    print(f"Processed added chunk {chunk_idx + 1}/{len(chunks)}")
                except Exception as e:
                    print(f"Error processing added chunk {chunk_idx}: {str(e)}")

        return all_added

    @staticmethod
    def _process_added_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
                             min_length_deg: float) -> List[Dict[str, Any]]:
        """Process a chunk of geometries to find added roads."""
        added_roads = []

        for _, row in chunk_gdf.iterrows():
            geom = row.geometry

            try:
                # Check what portion of the road is not covered
                uncovered = geom.difference(target_union)

                if not uncovered.is_empty:
                    # Calculate what fraction of the original road is uncovered
                    uncovered_length = 0
                    if hasattr(uncovered, 'geoms'):
                        for geom_part in uncovered.geoms:
                            if isinstance(geom_part, LineString):
                                uncovered_length += geom_part.length
                    elif isinstance(uncovered, LineString):
                        uncovered_length = uncovered.length

                    original_length = geom.length
                    uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0
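
                    # Worked example: a road 0.0030 deg long with 0.0020 deg
                    # uncovered (≈728 ft) has a ratio of ~0.67, so both tests
                    # below pass and the road is reported.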

                    # Include the entire road if:
                    # 1. the uncovered portion exceeds the minimum length threshold, AND
                    # 2. more than 50% of the road is uncovered
                    if uncovered_length >= min_length_deg and uncovered_ratio > 0.5:
                        # Include the entire original road with all original metadata
                        properties = dict(row.drop('geometry'))

                        added_roads.append({
                            'geometry': geom,
                            **properties
                        })

            except Exception:
                continue  # Skip problematic geometries

        return added_roads

    def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]:
        """
        Compare two GeoJSON files and find significant differences.
        Optimized version with parallel processing.

        Returns:
            Tuple of (removed_roads, added_roads)
        """
        print(f"Comparing {file1_path} and {file2_path}")
        print(f"Tolerance: {self.tolerance_feet} feet")
        print(f"Minimum significant length: {self.min_gap_length_feet} feet")
        print(f"Parallel processing: {self.n_jobs} workers")
        print("-" * 50)

        # Load both files
        gdf1 = self.load_geojson(file1_path)
        gdf2 = self.load_geojson(file2_path)

        # Ensure both are in the same CRS
        if gdf1.crs != gdf2.crs:
            print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}")
            gdf2 = gdf2.to_crs(gdf1.crs)

        print("Creating optimized spatial unions...")

        # Create buffered unions using the optimized method
        union1 = self.create_buffered_union_optimized(gdf1)
        union2 = self.create_buffered_union_optimized(gdf2)

        print("Finding removed and added roads with parallel processing...")

        # Find roads using the optimized parallel methods
        removed_roads = self.find_removed_segments_optimized(gdf1, union2)
        added_roads = self.find_added_roads_optimized(gdf2, union1)

        # Clean up memory
        del gdf1, gdf2, union1, union2
        gc.collect()

        return removed_roads, added_roads
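
    # Programmatic usage sketch (file paths are placeholders):
    #   comparator = RoadComparator(tolerance_feet=50.0, min_gap_length_feet=100.0)
    #   removed, added = comparator.compare_roads('old.geojson', 'new.geojson')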

    def save_results(self, removed: List[Dict], added: List[Dict], output_path: str):
        """Save results to a GeoJSON file."""
        all_results = removed + added

        if not all_results:
            print("No significant differences found!")
            return

        # Create the GeoDataFrame efficiently
        print("Saving results...")
        results_gdf = gpd.GeoDataFrame(all_results)

        # Save to file with the fast pyogrio engine
        results_gdf.to_file(output_path, driver='GeoJSON', engine='pyogrio')
        print(f"Results saved to: {output_path}")

    def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str):
        """Print a summary of the comparison results."""
        print("\n" + "="*60)
        print("COMPARISON SUMMARY")
        print("="*60)

        print(f"\nFile 1: {file1_name}")
        print(f"File 2: {file2_name}")
        print(f"Tolerance: {self.tolerance_feet} feet")
        print(f"Minimum significant length: {self.min_gap_length_feet} feet")

        if removed:
            print(f"\n🔴 REMOVED ROADS ({len(removed)} segments):")
            print("These road segments exist in File 1 but are missing or incomplete in File 2:")

            # Calculate the total length of removed segments
            total_removed_length = 0
            removed_by_road = {}

            for segment in removed:
                geom = segment['geometry']
                length_feet = geom.length * 364000.0  # approximate degrees-to-feet conversion
                total_removed_length += length_feet

                # Get the road name from the first matching name field
                road_name = "Unknown"
                name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
                for field in name_fields:
                    if field in segment and pd.notna(segment[field]):
                        road_name = str(segment[field])
                        break

                if road_name not in removed_by_road:
                    removed_by_road[road_name] = []
                removed_by_road[road_name].append(length_feet)

            print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length/5280:.2f} miles)")

            for road, lengths in sorted(removed_by_road.items()):
                road_total = sum(lengths)
                print(f" • {road}: {len(lengths)} segment(s), {road_total:,.1f} feet")

        if added:
            print(f"\n🔵 ADDED ROADS ({len(added)} roads):")
            print("These roads exist in File 2 but are missing or incomplete in File 1:")

            # Calculate the total length of added roads
            total_added_length = 0
            added_by_road = {}

            for road in added:
                geom = road['geometry']
                length_feet = geom.length * 364000.0  # approximate degrees-to-feet conversion
                total_added_length += length_feet

                # Get the road name from the first matching name field
                road_name = "Unknown"
                name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
                for field in name_fields:
                    if field in road and pd.notna(road[field]):
                        road_name = str(road[field])
                        break

                if road_name not in added_by_road:
                    added_by_road[road_name] = 0
                added_by_road[road_name] += length_feet

            print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length/5280:.2f} miles)")

            for road, length in sorted(added_by_road.items()):
                print(f" • {road}: {length:,.1f} feet")

        if not removed and not added:
            print("\n✅ No significant differences found!")
            print("The road networks have good coverage overlap within the specified tolerance.")


def main():
    parser = argparse.ArgumentParser(
        description="Compare two GeoJSON files containing roads and find significant gaps or extras (Optimized)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python compare_roads.py roads1.geojson roads2.geojson
  python compare_roads.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200
  python compare_roads.py roads1.geojson roads2.geojson --output differences.geojson
  python compare_roads.py roads1.geojson roads2.geojson --jobs 8 --chunk-size 2000
        """
    )

    parser.add_argument('file1', help='First GeoJSON file')
    parser.add_argument('file2', help='Second GeoJSON file')
    parser.add_argument('--tolerance', '-t', type=float, default=50.0,
                        help='Distance tolerance in feet for considering roads as overlapping (default: 50)')
    parser.add_argument('--min-length', '-m', type=float, default=100.0,
                        help='Minimum length in feet for gaps/extras to be considered significant (default: 100)')
    parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)')
    parser.add_argument('--jobs', '-j', type=int, default=None,
                        help='Number of parallel processes (default: CPU count - 1)')
    parser.add_argument('--chunk-size', '-c', type=int, default=1000,
                        help='Number of geometries to process per chunk (default: 1000)')

    args = parser.parse_args()

    # Validate input files
    if not Path(args.file1).exists():
        print(f"Error: File {args.file1} does not exist")
        return 1

    if not Path(args.file2).exists():
        print(f"Error: File {args.file2} does not exist")
        return 1

    try:
        # Create the comparator and run the comparison
        comparator = RoadComparator(
            tolerance_feet=args.tolerance,
            min_gap_length_feet=args.min_length,
            n_jobs=args.jobs,
            chunk_size=args.chunk_size
        )

        removed, added = comparator.compare_roads(args.file1, args.file2)

        # Print the summary
        comparator.print_summary(removed, added, args.file1, args.file2)

        # Save results: use the given output path, or auto-generate one if
        # differences were found
        if args.output:
            comparator.save_results(removed, added, args.output)
        elif removed or added:
            output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson"
            comparator.save_results(removed, added, output_file)

        return 0

    except Exception as e:
        print(f"Error: {str(e)}")
        return 1


if __name__ == "__main__":
    sys.exit(main())