#!/usr/bin/env python3 """ Address Data Comparison Tool for US Counties Compares local government address data (from ZIP/shapefile) with OpenStreetMap address data. Downloads OSM data via Overpass API, converts local data to GeoJSON, and performs comprehensive comparison to identify new, existing, and removed addresses. Usage: python compare-addresses.py "Lake" "Florida" --local-zip "original data/Lake/Addresspoints 2025-06.zip" python compare-addresses.py "Sumter" "Florida" --local-zip "original data/Sumter/Address9_13_2024.zip" """ import argparse import json import os import sys import zipfile from datetime import datetime from pathlib import Path from typing import Dict, List, Tuple, Any, Optional import urllib.error import urllib.parse import urllib.request import geopandas as gpd import pandas as pd from shapely.geometry import Point from shapely.strtree import STRtree from shapely.ops import nearest_points import warnings # Import local modules import importlib qgis_functions = importlib.import_module("qgis-functions") # Suppress warnings for cleaner output warnings.filterwarnings('ignore') class AddressComparator: def __init__(self, tolerance_meters: float = 50.0, cache_dir: str = "osm_cache"): """ Initialize the address comparator. Args: tolerance_meters: Distance tolerance for considering addresses as matching cache_dir: Directory to cache OSM data """ self.tolerance_meters = tolerance_meters self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) # Convert meters to degrees (approximate) # 1 degree latitude ≈ 111,000 meters self.tolerance_deg = tolerance_meters / 111000.0 def download_osm_addresses(self, county: str, state: str, output_file: str = None) -> str: """Download address data from OpenStreetMap via Overpass API.""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d") output_file = self.cache_dir / f"osm_addresses_{county.lower()}_{timestamp}.geojson" else: output_file = Path(output_file) # Check if cached file exists and is recent (less than 7 days old) if output_file.exists(): file_age = datetime.now().timestamp() - output_file.stat().st_mtime if file_age < 7 * 24 * 3600: # 7 days in seconds print(f"Using cached OSM data: {output_file}") return str(output_file) print(f"Downloading OSM addresses for {county} County, {state}...") # Build Overpass query for addresses query = f"""[out:json][timeout:180]; area["name"="{state}"]->.state; area["name"="{county} County"](area.state)->.searchArea; nwr["addr:housenumber"](area.searchArea); out geom;""" # Query Overpass API osm_data = self._query_overpass(query) # Convert to GeoJSON geojson = self._convert_osm_to_geojson(osm_data) # Save to file output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: json.dump(geojson, f, indent=2) print(f"Downloaded {len(geojson['features'])} OSM addresses to {output_file}") return str(output_file) def _query_overpass(self, query: str) -> Dict[str, Any]: """Send query to Overpass API and return JSON response.""" url = "https://overpass-api.de/api/interpreter" data = urllib.parse.urlencode({"data": query}).encode("utf-8") try: with urllib.request.urlopen(url, data=data, timeout=300) as response: return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as e: print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr) try: error_body = e.read().decode("utf-8") print(f"Error response: {error_body}", file=sys.stderr) except: pass sys.exit(1) except Exception as e: print(f"Error querying Overpass API: {e}", file=sys.stderr) sys.exit(1) def _convert_osm_to_geojson(self, overpass_data: Dict[str, Any]) -> Dict[str, Any]: """Convert Overpass API response to GeoJSON format.""" features = [] for element in overpass_data.get("elements", []): properties = element.get("tags", {}) # Extract coordinates based on element type if element["type"] == "node": coordinates = [element["lon"], element["lat"]] geometry = {"type": "Point", "coordinates": coordinates} elif element["type"] == "way" and "geometry" in element: # For ways, use the centroid coords = [[coord["lon"], coord["lat"]] for coord in element["geometry"]] if len(coords) > 0: # Calculate centroid lon = sum(coord[0] for coord in coords) / len(coords) lat = sum(coord[1] for coord in coords) / len(coords) coordinates = [lon, lat] geometry = {"type": "Point", "coordinates": coordinates} else: continue else: continue # Skip relations and ways without geometry feature = { "type": "Feature", "properties": properties, "geometry": geometry } features.append(feature) return { "type": "FeatureCollection", "features": features } def load_local_addresses(self, zip_path: str, output_geojson: str = None) -> str: """Load and convert local address data from ZIP file.""" zip_path = Path(zip_path) if output_geojson is None: output_geojson = zip_path.parent / f"{zip_path.stem}_converted.geojson" else: output_geojson = Path(output_geojson) # Check if conversion already exists and is newer than the ZIP if (output_geojson.exists() and zip_path.exists() and output_geojson.stat().st_mtime > zip_path.stat().st_mtime): print(f"Using existing converted data: {output_geojson}") return str(output_geojson) print(f"Converting local address data from {zip_path}...") # Extract and find shapefile in ZIP temp_dir = zip_path.parent / "temp_extract" temp_dir.mkdir(exist_ok=True) try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) # Find the shapefile shp_files = list(temp_dir.glob("*.shp")) if not shp_files: raise FileNotFoundError("No shapefile (.shp) found in ZIP") shp_file = shp_files[0] # Load and process shapefile gdf = gpd.read_file(shp_file) # Convert CRS to WGS84 if needed if gdf.crs and gdf.crs != 'EPSG:4326': print(f"Converting from {gdf.crs} to EPSG:4326") gdf = gdf.to_crs('EPSG:4326') # Process address fields using existing logic from sumter-address-convert.py gdf = self._process_address_fields(gdf) # Filter to only point geometries and valid addresses gdf = gdf[gdf.geometry.type == 'Point'].copy() gdf = gdf[gdf['addr:housenumber'].notna()].copy() # Clean output data - keep only OSM address fields osm_fields = [ 'addr:housenumber', 'addr:unit', 'addr:street', 'addr:city', 'addr:postcode', 'addr:state' ] existing_fields = [field for field in osm_fields if field in gdf.columns] gdf = gdf[existing_fields + ['geometry']] # Save to GeoJSON output_geojson.parent.mkdir(parents=True, exist_ok=True) gdf.to_file(output_geojson, driver='GeoJSON') print(f"Converted {len(gdf)} addresses to {output_geojson}") finally: # Clean up temp directory import shutil if temp_dir.exists(): shutil.rmtree(temp_dir) return str(output_geojson) def _process_address_fields(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Process address fields according to OSM schema (handles multiple formats).""" processed_gdf = gdf.copy() address_mapping = {} # House number - try multiple field names house_number_fields = ['ADD_NUM', 'AddressNum', 'ADDRESS_NUM', 'HOUSE_NUM'] for field in house_number_fields: if field in processed_gdf.columns: add_num_series = processed_gdf[field].copy() add_num_series = pd.to_numeric(add_num_series, errors='coerce') address_mapping['addr:housenumber'] = add_num_series.round().astype('Int64') break # Unit number - try multiple field names unit_fields = ['UNIT', 'UnitNumber', 'UNIT_NUM', 'APT'] for field in unit_fields: if field in processed_gdf.columns: unit_series = processed_gdf[field].copy() unit_series = unit_series.replace(['nan', 'None', '', None], None) unit_series = unit_series.where(unit_series.notna(), None) address_mapping['addr:unit'] = unit_series break # Street name - try multiple approaches if 'SADD' in processed_gdf.columns: # Sumter County format - full address in SADD field street_names = [] for sadd_value in processed_gdf['SADD']: if pd.notna(sadd_value): street_from_addr = qgis_functions.getstreetfromaddress(str(sadd_value), None, None) street_titled = qgis_functions.title(street_from_addr) street_names.append(street_titled) else: street_names.append(None) address_mapping['addr:street'] = street_names elif 'FullAddres' in processed_gdf.columns: # Lake County format - full address in FullAddres field street_names = [] for full_addr in processed_gdf['FullAddres']: if pd.notna(full_addr): street_from_addr = qgis_functions.getstreetfromaddress(str(full_addr), None, None) street_titled = qgis_functions.title(street_from_addr) street_names.append(street_titled) else: street_names.append(None) address_mapping['addr:street'] = street_names elif 'BaseStreet' in processed_gdf.columns: # Lake County alternative - combine street components street_names = [] for idx, row in processed_gdf.iterrows(): street_parts = [] # Prefix direction if 'PrefixDire' in row and pd.notna(row['PrefixDire']): street_parts.append(str(row['PrefixDire']).strip()) # Prefix type if 'PrefixType' in row and pd.notna(row['PrefixType']): street_parts.append(str(row['PrefixType']).strip()) # Base street name if pd.notna(row['BaseStreet']): street_parts.append(str(row['BaseStreet']).strip()) # Suffix type if 'SuffixType' in row and pd.notna(row['SuffixType']): street_parts.append(str(row['SuffixType']).strip()) if street_parts: street_name = ' '.join(street_parts) street_titled = qgis_functions.title(street_name) street_names.append(street_titled) else: street_names.append(None) address_mapping['addr:street'] = street_names # City - try multiple field names city_fields = ['POST_COMM', 'PostalCity', 'CITY', 'Jurisdicti'] for field in city_fields: if field in processed_gdf.columns: city_names = [] for city_value in processed_gdf[field]: if pd.notna(city_value): city_titled = qgis_functions.title(str(city_value)) city_names.append(city_titled) else: city_names.append(None) address_mapping['addr:city'] = city_names break # Postal code - try multiple field names postcode_fields = ['POST_CODE', 'ZipCode', 'ZIP', 'POSTAL_CODE'] for field in postcode_fields: if field in processed_gdf.columns: post_code_series = processed_gdf[field].copy() post_code_series = pd.to_numeric(post_code_series, errors='coerce') address_mapping['addr:postcode'] = post_code_series.round().astype('Int64') break # Manually add addr:state address_mapping['addr:state'] = 'FL' # Add the new address columns to the GeoDataFrame for key, value in address_mapping.items(): processed_gdf[key] = value return processed_gdf def compare_addresses(self, local_file: str, osm_file: str) -> Tuple[List[Dict], List[Dict], List[Dict]]: """ Compare local and OSM address data. Returns: Tuple of (new_addresses, existing_addresses, removed_addresses) """ print(f"Comparing addresses: {local_file} vs {osm_file}") # Load data local_gdf = gpd.read_file(local_file) osm_gdf = gpd.read_file(osm_file) print(f"Loaded local addresses: {len(local_gdf)}") print(f"Loaded OSM addresses: {len(osm_gdf)}") # Apply sampling if requested if hasattr(self, 'sample_size') and self.sample_size: if len(local_gdf) > self.sample_size: local_gdf = local_gdf.sample(n=self.sample_size, random_state=42).reset_index(drop=True) print(f"Sampled local addresses to: {len(local_gdf)}") if hasattr(self, 'max_osm') and self.max_osm: if len(osm_gdf) > self.max_osm: osm_gdf = osm_gdf.sample(n=self.max_osm, random_state=42).reset_index(drop=True) print(f"Sampled OSM addresses to: {len(osm_gdf)}") print(f"Processing local addresses: {len(local_gdf)}") print(f"Processing OSM addresses: {len(osm_gdf)}") # Ensure both are in the same CRS if local_gdf.crs != osm_gdf.crs: osm_gdf = osm_gdf.to_crs(local_gdf.crs) # Create spatial indexes local_index = STRtree(local_gdf.geometry.tolist()) osm_index = STRtree(osm_gdf.geometry.tolist()) # Find matches existing_addresses = [] new_addresses = [] # For each local address, find closest OSM address for idx, local_row in local_gdf.iterrows(): local_point = local_row.geometry # Query nearby OSM addresses nearby_indices = osm_index.query(local_point.buffer(self.tolerance_deg)) best_match = None min_distance = float('inf') for osm_idx in nearby_indices: osm_row = osm_gdf.iloc[osm_idx] osm_point = osm_row.geometry distance = local_point.distance(osm_point) # Additional verification: check if house numbers match (handle type differences) local_house_num = str(local_row.get('addr:housenumber', '')) osm_house_num = str(osm_row.get('addr:housenumber', '')) # Only consider as potential match if house numbers match if local_house_num == osm_house_num and distance < min_distance: min_distance = distance best_match = osm_idx # Convert distance to meters for comparison distance_meters = min_distance * 111000.0 if best_match is not None and distance_meters <= self.tolerance_meters: # Found a match - this is an existing address local_props = dict(local_row.drop('geometry')) osm_props = dict(osm_gdf.iloc[best_match].drop('geometry')) existing_addresses.append({ 'geometry': local_point, 'local_data': local_props, 'osm_data': osm_props, 'distance_meters': distance_meters }) else: # No match found - this is a new address to add to OSM local_props = dict(local_row.drop('geometry')) local_props['status'] = 'new' new_addresses.append({ 'geometry': local_point, **local_props }) # Find OSM addresses that don't have local matches (potentially removed) removed_addresses = [] # Track which OSM addresses were matched during the first pass # by storing their index during the matching process above matched_osm_indices = set() # Re-do the matching to track OSM indices for idx, local_row in local_gdf.iterrows(): local_point = local_row.geometry nearby_indices = osm_index.query(local_point.buffer(self.tolerance_deg)) for osm_idx in nearby_indices: osm_row = osm_gdf.iloc[osm_idx] osm_point = osm_row.geometry distance_meters = local_point.distance(osm_point) * 111000.0 # Check if house numbers match (handle type differences) local_house_num = str(local_row.get('addr:housenumber', '')) osm_house_num = str(osm_row.get('addr:housenumber', '')) if (distance_meters <= self.tolerance_meters and local_house_num == osm_house_num): matched_osm_indices.add(osm_idx) break # Only match to first OSM address found # Find unmatched OSM addresses for idx, osm_row in osm_gdf.iterrows(): if idx not in matched_osm_indices: osm_props = dict(osm_row.drop('geometry')) osm_props['status'] = 'removed' removed_addresses.append({ 'geometry': osm_row.geometry, **osm_props }) return new_addresses, existing_addresses, removed_addresses def save_results(self, new_addresses: List[Dict], existing_addresses: List[Dict], removed_addresses: List[Dict], output_dir: str): """Save comparison results to separate GeoJSON files.""" output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Save new addresses (to add to OSM) if new_addresses: new_gdf = gpd.GeoDataFrame(new_addresses) new_file = output_dir / f"addresses_to_add_{timestamp}.geojson" new_gdf.to_file(new_file, driver='GeoJSON') print(f"Saved {len(new_addresses)} new addresses to {new_file}") # Save removed addresses (missing from local data) if removed_addresses: removed_gdf = gpd.GeoDataFrame(removed_addresses) removed_file = output_dir / f"addresses_potentially_removed_{timestamp}.geojson" removed_gdf.to_file(removed_file, driver='GeoJSON') print(f"Saved {len(removed_addresses)} potentially removed addresses to {removed_file}") # Save existing addresses for reference if existing_addresses: # Create simplified format for existing addresses existing_simple = [] for addr in existing_addresses: existing_simple.append({ 'geometry': addr['geometry'], 'distance_meters': addr['distance_meters'], 'status': 'existing' }) existing_gdf = gpd.GeoDataFrame(existing_simple) existing_file = output_dir / f"addresses_existing_{timestamp}.geojson" existing_gdf.to_file(existing_file, driver='GeoJSON') print(f"Saved {len(existing_addresses)} existing addresses to {existing_file}") def print_summary(self, new_addresses: List[Dict], existing_addresses: List[Dict], removed_addresses: List[Dict]): """Print a summary of the comparison results.""" print("\n" + "="*60) print("ADDRESS COMPARISON SUMMARY") print("="*60) print(f"\nTOTAL ADDRESSES ANALYZED:") print(f" • New addresses (to add to OSM): {len(new_addresses)}") print(f" • Existing addresses (matched): {len(existing_addresses)}") print(f" • Potentially removed addresses: {len(removed_addresses)}") if existing_addresses: distances = [addr['distance_meters'] for addr in existing_addresses] avg_distance = sum(distances) / len(distances) print(f"\nMATCHING STATISTICS:") print(f" • Average distance of matches: {avg_distance:.1f} meters") print(f" • Max distance of matches: {max(distances):.1f} meters") if new_addresses: print(f"\nNEW ADDRESSES TO ADD:") print(f" These addresses exist in local data but not in OSM") # Group by street name streets = {} for addr in new_addresses[:10]: # Show first 10 street = addr.get('addr:street', 'Unknown Street') if street not in streets: streets[street] = 0 streets[street] += 1 for street, count in sorted(streets.items()): print(f" • {street}: {count} address(es)") if len(new_addresses) > 10: print(f" • ... and {len(new_addresses) - 10} more") if removed_addresses: print(f"\nPOTENTIALLY REMOVED ADDRESSES:") print(f" These addresses exist in OSM but not in local data") print(f" (May indicate addresses that were removed or demolished)") def main(): parser = argparse.ArgumentParser( description="Compare local government address data with OpenStreetMap addresses", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python compare-addresses.py "Lake" "Florida" --local-zip "original data/Lake/Addresspoints 2025-06.zip" python compare-addresses.py "Sumter" "Florida" --local-zip "original data/Sumter/Address9_13_2024.zip" --tolerance 30 python compare-addresses.py "Orange" "Florida" --local-zip "addresses.zip" --output-dir "results/orange" """ ) parser.add_argument('county', help='County name (e.g., "Lake", "Sumter")') parser.add_argument('state', help='State name (e.g., "Florida")') parser.add_argument('--local-zip', required=True, help='Path to local address data ZIP file') parser.add_argument('--tolerance', '-t', type=float, default=50.0, help='Distance tolerance in meters for matching addresses (default: 50)') parser.add_argument('--output-dir', '-o', help='Output directory for results (default: processed data/[County])') parser.add_argument('--cache-dir', default='osm_cache', help='Directory to cache OSM downloads (default: osm_cache)') parser.add_argument('--force-download', action='store_true', help='Force re-download of OSM data (ignore cache)') parser.add_argument('--sample', '-s', type=int, help='Process only a sample of N addresses for testing') parser.add_argument('--max-osm', type=int, default=50000, help='Maximum number of OSM addresses to process (default: 50000)') args = parser.parse_args() # Validate input file local_zip = Path(args.local_zip) if not local_zip.exists(): print(f"Error: Local ZIP file {args.local_zip} does not exist") return 1 # Set output directory if args.output_dir: output_dir = Path(args.output_dir) else: output_dir = Path("processed data") / args.county try: # Create comparator comparator = AddressComparator( tolerance_meters=args.tolerance, cache_dir=args.cache_dir ) # Set sampling parameters if args.sample: comparator.sample_size = args.sample if args.max_osm: comparator.max_osm = args.max_osm # Download/load OSM data if args.force_download: # Remove existing cache for this county for cache_file in Path(args.cache_dir).glob(f"osm_addresses_{args.county.lower()}_*.geojson"): cache_file.unlink() osm_file = comparator.download_osm_addresses(args.county, args.state) # Convert local data local_file = comparator.load_local_addresses(args.local_zip) # Perform comparison new_addresses, existing_addresses, removed_addresses = comparator.compare_addresses( local_file, osm_file ) # Save results comparator.save_results(new_addresses, existing_addresses, removed_addresses, output_dir) # Print summary comparator.print_summary(new_addresses, existing_addresses, removed_addresses) return 0 except Exception as e: print(f"Error: {str(e)}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())