Threaded version of script

Will Bradley 2025-06-28 19:09:21 -07:00
parent a9914dd7c0
commit 01ce4eec5c

threaded.py (new file, 510 lines)
@@ -0,0 +1,510 @@
#!/usr/bin/env python3
"""
GeoJSON Road Comparison Script - Optimized Version
Compares two GeoJSON files containing road data and identifies:
1. Roads in file1 that don't have corresponding coverage in file2 (removed roads)
2. Roads in file2 that don't have corresponding coverage in file1 (added roads)
Only reports differences that are significant (above minimum length threshold).
Optimized for performance with parallel processing and spatial indexing.
"""
import argparse
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString, MultiLineString
from shapely.ops import unary_union
from shapely.strtree import STRtree
import warnings
import multiprocessing as mp
from functools import partial
from concurrent.futures import ProcessPoolExecutor, as_completed
import gc
# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
class RoadComparator:
    def __init__(self, tolerance_feet: float = 50.0, min_gap_length_feet: float = 100.0,
                 n_jobs: Optional[int] = None, chunk_size: int = 1000):
"""
Initialize the road comparator.
Args:
tolerance_feet: Distance tolerance for considering roads as overlapping (default: 50 feet)
min_gap_length_feet: Minimum length of gap/extra to be considered significant (default: 100 feet)
n_jobs: Number of parallel processes to use (default: CPU count - 1)
chunk_size: Number of geometries to process per chunk (default: 1000)
"""
self.tolerance_feet = tolerance_feet
self.min_gap_length_feet = min_gap_length_feet
self.n_jobs = n_jobs or max(1, mp.cpu_count() - 1)
self.chunk_size = chunk_size
        # Convert feet to degrees (approximate conversion for the continental US)
        # 1 degree latitude ≈ 364,000 feet; 1 degree longitude ≈ 288,000 feet (at 40° latitude).
        # The latitude factor is used as a single rough conversion for both axes.
        self.tolerance_deg = tolerance_feet / 364000.0
        self.min_gap_length_deg = min_gap_length_feet / 364000.0
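        # Example with the defaults: 50 / 364,000 ≈ 1.37e-4 degrees tolerance,
        # 100 / 364,000 ≈ 2.75e-4 degrees minimum significant length.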
print(f"Using {self.n_jobs} parallel processes with chunk size {self.chunk_size}")
def load_geojson(self, filepath: str) -> gpd.GeoDataFrame:
"""Load and validate GeoJSON file with optimizations."""
try:
            # Use the pyogrio engine for faster loading of large files
            gdf = gpd.read_file(filepath, engine='pyogrio')
# Filter only LineString and MultiLineString geometries
line_types = ['LineString', 'MultiLineString']
gdf = gdf[gdf.geometry.type.isin(line_types)].copy()
if len(gdf) == 0:
raise ValueError(f"No line geometries found in {filepath}")
# Reset index for efficient processing
gdf = gdf.reset_index(drop=True)
# Ensure geometry is valid and fix simple issues
invalid_mask = ~gdf.geometry.is_valid
if invalid_mask.any():
print(f"Fixing {invalid_mask.sum()} invalid geometries...")
gdf.loc[invalid_mask, 'geometry'] = gdf.loc[invalid_mask, 'geometry'].buffer(0)
print(f"Loaded {len(gdf)} road features from {filepath}")
return gdf
        except Exception as e:
            raise RuntimeError(f"Error loading {filepath}: {e}") from e
def create_buffered_union_optimized(self, gdf: gpd.GeoDataFrame) -> Any:
"""Create a buffered union using chunked processing for memory efficiency."""
print("Creating optimized buffered union...")
# Process in chunks to manage memory
chunks = [gdf.iloc[i:i+self.chunk_size] for i in range(0, len(gdf), self.chunk_size)]
chunk_unions = []
# Use partial function for multiprocessing
buffer_func = partial(self._buffer_chunk, tolerance=self.tolerance_deg)
with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
# Submit all chunk processing jobs
future_to_chunk = {executor.submit(buffer_func, chunk): i
for i, chunk in enumerate(chunks)}
# Collect results as they complete
for future in as_completed(future_to_chunk):
chunk_idx = future_to_chunk[future]
try:
chunk_union = future.result()
if chunk_union and not chunk_union.is_empty:
chunk_unions.append(chunk_union)
print(f"Processed chunk {chunk_idx + 1}/{len(chunks)}")
except Exception as e:
print(f"Error processing chunk {chunk_idx}: {str(e)}")
# Union all chunk results
print("Combining chunk unions...")
if chunk_unions:
final_union = unary_union(chunk_unions)
# Force garbage collection
del chunk_unions
gc.collect()
return final_union
        else:
            raise RuntimeError("No valid geometries to create union")
@staticmethod
def _buffer_chunk(chunk_gdf: gpd.GeoDataFrame, tolerance: float) -> Any:
"""Buffer geometries in a chunk and return their union."""
try:
# Buffer all geometries in the chunk
buffered = chunk_gdf.geometry.buffer(tolerance)
# Create union of buffered geometries
if len(buffered) == 1:
return buffered.iloc[0]
else:
return unary_union(buffered.tolist())
except Exception as e:
print(f"Error in chunk processing: {str(e)}")
return None
    def create_spatial_index(self, gdf: gpd.GeoDataFrame) -> STRtree:
        """Create a spatial index for fast intersection queries.

        Note: this helper is not invoked by compare_roads(); it is kept for
        callers that want to run their own spatial queries against the data.
        """
        print("Creating spatial index...")
        # Build an STRtree over the geometries for fast spatial queries
        geometries = gdf.geometry.tolist()
        return STRtree(geometries)
def find_removed_segments_optimized(self, source_gdf: gpd.GeoDataFrame,
target_union: Any) -> List[Dict[str, Any]]:
"""
Find segments in source_gdf that are not covered by target_union (removed roads).
Optimized with parallel processing.
"""
print("Finding removed segments...")
# Split into chunks for parallel processing
chunks = [source_gdf.iloc[i:i+self.chunk_size]
for i in range(0, len(source_gdf), self.chunk_size)]
all_removed = []
# Use partial function for multiprocessing
process_func = partial(self._process_removed_chunk,
target_union=target_union,
min_length_deg=self.min_gap_length_deg)
with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
# Submit all chunk processing jobs
future_to_chunk = {executor.submit(process_func, chunk): i
for i, chunk in enumerate(chunks)}
# Collect results as they complete
for future in as_completed(future_to_chunk):
chunk_idx = future_to_chunk[future]
try:
chunk_removed = future.result()
all_removed.extend(chunk_removed)
print(f"Processed removed chunk {chunk_idx + 1}/{len(chunks)}")
except Exception as e:
print(f"Error processing removed chunk {chunk_idx}: {str(e)}")
return all_removed
@staticmethod
def _process_removed_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
min_length_deg: float) -> List[Dict[str, Any]]:
"""Process a chunk of geometries to find removed segments."""
removed_segments = []
for idx, row in chunk_gdf.iterrows():
geom = row.geometry
# Handle MultiLineString by processing each component
if isinstance(geom, MultiLineString):
lines = list(geom.geoms)
else:
lines = [geom]
for line in lines:
try:
# Find parts of the line that don't intersect with target_union
uncovered = line.difference(target_union)
if uncovered.is_empty:
continue
# Handle different geometry types returned by difference
uncovered_lines = []
if hasattr(uncovered, 'geoms'):
for geom_part in uncovered.geoms:
if isinstance(geom_part, LineString):
uncovered_lines.append(geom_part)
elif isinstance(uncovered, LineString):
uncovered_lines.append(uncovered)
# Check each uncovered line segment
for uncovered_line in uncovered_lines:
if uncovered_line.length >= min_length_deg:
# Create properties dict with original metadata plus 'removed: true'
properties = dict(row.drop('geometry'))
properties['removed'] = True
removed_segments.append({
'geometry': uncovered_line,
**properties
})
                except Exception:
                    continue  # Skip problematic geometries
return removed_segments
def find_added_roads_optimized(self, source_gdf: gpd.GeoDataFrame,
target_union: Any) -> List[Dict[str, Any]]:
"""
Find entire roads in source_gdf that don't significantly overlap with target_union.
Optimized with parallel processing.
"""
print("Finding added roads...")
# Split into chunks for parallel processing
chunks = [source_gdf.iloc[i:i+self.chunk_size]
for i in range(0, len(source_gdf), self.chunk_size)]
all_added = []
# Use partial function for multiprocessing
process_func = partial(self._process_added_chunk,
target_union=target_union,
min_length_deg=self.min_gap_length_deg)
with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
# Submit all chunk processing jobs
future_to_chunk = {executor.submit(process_func, chunk): i
for i, chunk in enumerate(chunks)}
# Collect results as they complete
for future in as_completed(future_to_chunk):
chunk_idx = future_to_chunk[future]
try:
chunk_added = future.result()
all_added.extend(chunk_added)
print(f"Processed added chunk {chunk_idx + 1}/{len(chunks)}")
except Exception as e:
print(f"Error processing added chunk {chunk_idx}: {str(e)}")
return all_added
@staticmethod
def _process_added_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
min_length_deg: float) -> List[Dict[str, Any]]:
"""Process a chunk of geometries to find added roads."""
added_roads = []
for idx, row in chunk_gdf.iterrows():
geom = row.geometry
try:
# Check what portion of the road is not covered
uncovered = geom.difference(target_union)
if not uncovered.is_empty:
# Calculate what percentage of the original road is uncovered
uncovered_length = 0
if hasattr(uncovered, 'geoms'):
for geom_part in uncovered.geoms:
if isinstance(geom_part, LineString):
uncovered_length += geom_part.length
elif isinstance(uncovered, LineString):
uncovered_length = uncovered.length
original_length = geom.length
uncovered_ratio = uncovered_length / original_length if original_length > 0 else 0
# Include the entire road if:
# 1. The uncovered portion is above minimum threshold, AND
# 2. More than 50% of the road is uncovered
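                    # Example with the defaults: a 1,000 ft road with 600 ft
                    # uncovered gives uncovered_ratio = 0.6 (> 0.5), and 600 ft
                    # exceeds the 100 ft minimum, so the whole road is reported.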
if uncovered_length >= min_length_deg and uncovered_ratio > 0.5:
# Include entire original road with all original metadata
properties = dict(row.drop('geometry'))
added_roads.append({
'geometry': geom,
**properties
})
            except Exception:
                continue  # Skip problematic geometries
return added_roads
def compare_roads(self, file1_path: str, file2_path: str) -> Tuple[List[Dict], List[Dict]]:
"""
Compare two GeoJSON files and find significant differences.
Optimized version with parallel processing.
Returns:
Tuple of (removed_roads, added_roads)
"""
print(f"Comparing {file1_path} and {file2_path}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
print(f"Parallel processing: {self.n_jobs} workers")
print("-" * 50)
# Load both files
gdf1 = self.load_geojson(file1_path)
gdf2 = self.load_geojson(file2_path)
# Ensure both are in the same CRS
if gdf1.crs != gdf2.crs:
print(f"Warning: CRS mismatch. Converting {file2_path} to match {file1_path}")
gdf2 = gdf2.to_crs(gdf1.crs)
print("Creating optimized spatial unions...")
# Create buffered unions using optimized method
union1 = self.create_buffered_union_optimized(gdf1)
union2 = self.create_buffered_union_optimized(gdf2)
print("Finding removed and added roads with parallel processing...")
# Find roads using optimized parallel methods
removed_roads = self.find_removed_segments_optimized(gdf1, union2)
added_roads = self.find_added_roads_optimized(gdf2, union1)
# Clean up memory
del gdf1, gdf2, union1, union2
gc.collect()
return removed_roads, added_roads
def save_results(self, removed: List[Dict], added: List[Dict], output_path: str):
"""Save results to GeoJSON file."""
all_results = removed + added
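        # Note: removed segments carry a 'removed': True property (set in
        # _process_removed_chunk); added roads keep only their original
        # properties, with no equivalent flag.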
if not all_results:
print("No significant differences found!")
return
        # Build the GeoDataFrame, explicitly naming the geometry column so
        # the active geometry is set regardless of geopandas version
        print("Saving results...")
        results_gdf = gpd.GeoDataFrame(all_results, geometry='geometry')
# Save to file with optimization
results_gdf.to_file(output_path, driver='GeoJSON', engine='pyogrio')
print(f"Results saved to: {output_path}")
def print_summary(self, removed: List[Dict], added: List[Dict], file1_name: str, file2_name: str):
"""Print a summary of the comparison results."""
print("\n" + "="*60)
print("COMPARISON SUMMARY")
print("="*60)
print(f"\nFile 1: {file1_name}")
print(f"File 2: {file2_name}")
print(f"Tolerance: {self.tolerance_feet} feet")
print(f"Minimum significant length: {self.min_gap_length_feet} feet")
if removed:
print(f"\n🔴 REMOVED ROADS ({len(removed)} segments):")
print("These road segments exist in File 1 but are missing or incomplete in File 2:")
# Calculate total length of removed segments
total_removed_length = 0
removed_by_road = {}
for segment in removed:
geom = segment['geometry']
                length_feet = geom.length * 364000.0  # Approximate degrees-to-feet conversion (see __init__)
total_removed_length += length_feet
# Get road name
road_name = "Unknown"
name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
for field in name_fields:
if field in segment and pd.notna(segment[field]):
road_name = str(segment[field])
break
if road_name not in removed_by_road:
removed_by_road[road_name] = []
removed_by_road[road_name].append(length_feet)
print(f"Total removed length: {total_removed_length:,.1f} feet ({total_removed_length/5280:.2f} miles)")
for road, lengths in sorted(removed_by_road.items()):
road_total = sum(lengths)
print(f"{road}: {len(lengths)} segment(s), {road_total:,.1f} feet")
if added:
print(f"\n🔵 ADDED ROADS ({len(added)} roads):")
print("These roads exist in File 2 but are missing or incomplete in File 1:")
# Calculate total length of added roads
total_added_length = 0
added_by_road = {}
for road in added:
geom = road['geometry']
                length_feet = geom.length * 364000.0  # Approximate degrees-to-feet conversion (see __init__)
total_added_length += length_feet
# Get road name
road_name = "Unknown"
name_fields = ['name', 'NAME', 'road_name', 'street_name', 'FULLNAME']
for field in name_fields:
if field in road and pd.notna(road[field]):
road_name = str(road[field])
break
if road_name not in added_by_road:
added_by_road[road_name] = 0
added_by_road[road_name] += length_feet
print(f"Total added length: {total_added_length:,.1f} feet ({total_added_length/5280:.2f} miles)")
for road, length in sorted(added_by_road.items()):
print(f"{road}: {length:,.1f} feet")
if not removed and not added:
print("\n✅ No significant differences found!")
print("The road networks have good coverage overlap within the specified tolerance.")
def main():
parser = argparse.ArgumentParser(
description="Compare two GeoJSON files containing roads and find significant gaps or extras (Optimized)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
  python threaded.py roads1.geojson roads2.geojson
  python threaded.py roads1.geojson roads2.geojson --tolerance 100 --min-length 200
  python threaded.py roads1.geojson roads2.geojson --output differences.geojson
  python threaded.py roads1.geojson roads2.geojson --jobs 8 --chunk-size 2000
"""
)
parser.add_argument('file1', help='First GeoJSON file')
parser.add_argument('file2', help='Second GeoJSON file')
parser.add_argument('--tolerance', '-t', type=float, default=50.0,
help='Distance tolerance in feet for considering roads as overlapping (default: 50)')
parser.add_argument('--min-length', '-m', type=float, default=100.0,
help='Minimum length in feet for gaps/extras to be considered significant (default: 100)')
parser.add_argument('--output', '-o', help='Output GeoJSON file for results (optional)')
parser.add_argument('--jobs', '-j', type=int, default=None,
help='Number of parallel processes (default: CPU count - 1)')
parser.add_argument('--chunk-size', '-c', type=int, default=1000,
help='Number of geometries to process per chunk (default: 1000)')
args = parser.parse_args()
# Validate input files
if not Path(args.file1).exists():
print(f"Error: File {args.file1} does not exist")
return 1
if not Path(args.file2).exists():
print(f"Error: File {args.file2} does not exist")
return 1
try:
# Create comparator and run comparison
comparator = RoadComparator(
tolerance_feet=args.tolerance,
min_gap_length_feet=args.min_length,
n_jobs=args.jobs,
chunk_size=args.chunk_size
)
removed, added = comparator.compare_roads(args.file1, args.file2)
# Print summary
comparator.print_summary(removed, added, args.file1, args.file2)
# Save results if output file specified
if args.output:
comparator.save_results(removed, added, args.output)
elif removed or added:
# Auto-generate output filename if differences found
output_file = f"road_differences_{Path(args.file1).stem}_vs_{Path(args.file2).stem}.geojson"
comparator.save_results(removed, added, output_file)
return 0
except Exception as e:
print(f"Error: {str(e)}")
return 1
if __name__ == "__main__":
    sys.exit(main())
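
# Example programmatic use (a minimal sketch; the GeoJSON file names are
# hypothetical placeholders):
#
#   comparator = RoadComparator(tolerance_feet=50.0, min_gap_length_feet=100.0)
#   removed, added = comparator.compare_roads("old_roads.geojson", "new_roads.geojson")
#   comparator.print_summary(removed, added, "old_roads.geojson", "new_roads.geojson")
#   comparator.save_results(removed, added, "differences.geojson")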