Add readme info, claude scripts, gitignore

2025-10-15 18:40:02 -07:00
parent ff0511b3c5
commit e9e284ee66
4 changed files with 694 additions and 1 deletion

.gitignore (3 changes)

@@ -1,2 +1,5 @@
__pycache__
desktop.ini
*.geojson
osm_cache/
.claude

README.md

@@ -2,6 +2,9 @@
See [https://wiki.openstreetmap.org/wiki/The_Villages_Road_and_Address_Import](https://wiki.openstreetmap.org/wiki/The_Villages_Road_and_Address_Import)
See compare-addresses.py for an automated way of running the complete address diff toolchain in one step.
- TODO: fails to split out units
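For example, to run the whole pipeline for Lake County in one step (the same invocation shown in the script's usage docstring):
```
python compare-addresses.py "Lake" "Florida" --local-zip "original data/Lake/Addresspoints 2025-06.zip"
```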
## Data
- Lake County Streets and Address Points: https://c.lakecountyfl.gov/ftp/GIS/GisDownloads/Shapefiles/
@@ -120,3 +123,22 @@ source:url=https://gitlab.com/zyphlar/the-villages-import
* Review imported data in Achavi or Osmcha to ensure it looks proper.
## Useful queries:
```
[timeout:60];
area["name"="Florida"]->.state;
area["name"="Lake County"](area.state)->.searchArea;nwr["addr:housenumber"](area.searchArea);
(._;>;);
out meta;
```
```
[timeout:60];
area["name"="Florida"]->.state;
area["name"="Lake County"](area.state)->.searchArea;way["highway"](area.searchArea);
(._;>;);
out meta;
```

compare-addresses.py (new file, 627 lines)

@@ -0,0 +1,627 @@
#!/usr/bin/env python3
"""
Address Data Comparison Tool for US Counties

Compares local government address data (from a ZIP/shapefile) with OpenStreetMap address data.
Downloads OSM data via the Overpass API, converts local data to GeoJSON, and performs a
comprehensive comparison to identify new, existing, and removed addresses.

Usage:
    python compare-addresses.py "Lake" "Florida" --local-zip "original data/Lake/Addresspoints 2025-06.zip"
    python compare-addresses.py "Sumter" "Florida" --local-zip "original data/Sumter/Address9_13_2024.zip"
"""
import argparse
import json
import os
import sys
import warnings
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional
import urllib.error
import urllib.parse
import urllib.request

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely.strtree import STRtree
from shapely.ops import nearest_points

# Import local modules (the hyphenated filename requires importlib)
import importlib
qgis_functions = importlib.import_module("qgis-functions")

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')
class AddressComparator:
    def __init__(self, tolerance_meters: float = 50.0, cache_dir: str = "osm_cache"):
        """
        Initialize the address comparator.

        Args:
            tolerance_meters: Distance tolerance for considering addresses as matching
            cache_dir: Directory to cache OSM data
        """
        self.tolerance_meters = tolerance_meters
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        # Convert meters to degrees (approximate)
        # 1 degree latitude ≈ 111,000 meters
        self.tolerance_deg = tolerance_meters / 111000.0
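        # Note: the 111 km/degree figure is exact only for latitude; one degree of
        # longitude spans roughly 111,000 m * cos(latitude) (about 97 km near 29 N,
        # where these counties sit), so buffers and distances derived from this
        # factor are approximations. For the default 50 m tolerance the error is
        # small relative to typical address-point spacing.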
    def download_osm_addresses(self, county: str, state: str, output_file: str = None) -> str:
        """Download address data from OpenStreetMap via the Overpass API."""
        if output_file is None:
            timestamp = datetime.now().strftime("%Y%m%d")
            output_file = self.cache_dir / f"osm_addresses_{county.lower()}_{timestamp}.geojson"
        else:
            output_file = Path(output_file)

        # Check if a cached file exists and is recent (less than 7 days old)
        if output_file.exists():
            file_age = datetime.now().timestamp() - output_file.stat().st_mtime
            if file_age < 7 * 24 * 3600:  # 7 days in seconds
                print(f"Using cached OSM data: {output_file}")
                return str(output_file)

        print(f"Downloading OSM addresses for {county} County, {state}...")

        # Build the Overpass query for addresses (leading whitespace is harmless to Overpass)
        query = f"""[out:json][timeout:180];
            area["name"="{state}"]->.state;
            area["name"="{county} County"](area.state)->.searchArea;
            nwr["addr:housenumber"](area.searchArea);
            out geom;"""
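        # For example, ("Lake", "Florida") expands the query above to:
        #   [out:json][timeout:180];
        #   area["name"="Florida"]->.state;
        #   area["name"="Lake County"](area.state)->.searchArea;
        #   nwr["addr:housenumber"](area.searchArea);
        #   out geom;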
        # Query the Overpass API
        osm_data = self._query_overpass(query)

        # Convert to GeoJSON
        geojson = self._convert_osm_to_geojson(osm_data)

        # Save to file
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(geojson, f, indent=2)

        print(f"Downloaded {len(geojson['features'])} OSM addresses to {output_file}")
        return str(output_file)
    def _query_overpass(self, query: str) -> Dict[str, Any]:
        """Send a query to the Overpass API and return the JSON response."""
        url = "https://overpass-api.de/api/interpreter"
        data = urllib.parse.urlencode({"data": query}).encode("utf-8")
        try:
            with urllib.request.urlopen(url, data=data, timeout=300) as response:
                return json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
            try:
                error_body = e.read().decode("utf-8")
                print(f"Error response: {error_body}", file=sys.stderr)
            except Exception:
                pass
            sys.exit(1)
        except Exception as e:
            print(f"Error querying Overpass API: {e}", file=sys.stderr)
            sys.exit(1)
    def _convert_osm_to_geojson(self, overpass_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an Overpass API response to GeoJSON format."""
        features = []
        for element in overpass_data.get("elements", []):
            properties = element.get("tags", {})
            # Extract coordinates based on element type
            if element["type"] == "node":
                coordinates = [element["lon"], element["lat"]]
                geometry = {"type": "Point", "coordinates": coordinates}
            elif element["type"] == "way" and "geometry" in element:
                # For ways, collapse to a representative point: the mean of the
                # way's vertices, a cheap approximation of the centroid
                coords = [[coord["lon"], coord["lat"]] for coord in element["geometry"]]
                if len(coords) > 0:
                    lon = sum(coord[0] for coord in coords) / len(coords)
                    lat = sum(coord[1] for coord in coords) / len(coords)
                    coordinates = [lon, lat]
                    geometry = {"type": "Point", "coordinates": coordinates}
                else:
                    continue
            else:
                continue  # Skip relations and ways without geometry
            feature = {
                "type": "Feature",
                "properties": properties,
                "geometry": geometry
            }
            features.append(feature)
        return {
            "type": "FeatureCollection",
            "features": features
        }
    def load_local_addresses(self, zip_path: str, output_geojson: str = None) -> str:
        """Load and convert local address data from a ZIP file."""
        zip_path = Path(zip_path)
        if output_geojson is None:
            output_geojson = zip_path.parent / f"{zip_path.stem}_converted.geojson"
        else:
            output_geojson = Path(output_geojson)

        # Check if a conversion already exists and is newer than the ZIP
        if (output_geojson.exists() and
                zip_path.exists() and
                output_geojson.stat().st_mtime > zip_path.stat().st_mtime):
            print(f"Using existing converted data: {output_geojson}")
            return str(output_geojson)

        print(f"Converting local address data from {zip_path}...")

        # Extract the ZIP and find the shapefile inside
        temp_dir = zip_path.parent / "temp_extract"
        temp_dir.mkdir(exist_ok=True)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Find the shapefile
            shp_files = list(temp_dir.glob("*.shp"))
            if not shp_files:
                raise FileNotFoundError("No shapefile (.shp) found in ZIP")
            shp_file = shp_files[0]

            # Load and process the shapefile
            gdf = gpd.read_file(shp_file)

            # Convert CRS to WGS84 if needed
            if gdf.crs and gdf.crs != 'EPSG:4326':
                print(f"Converting from {gdf.crs} to EPSG:4326")
                gdf = gdf.to_crs('EPSG:4326')

            # Process address fields using existing logic from sumter-address-convert.py
            gdf = self._process_address_fields(gdf)

            # Filter to only point geometries and valid addresses
            gdf = gdf[gdf.geometry.type == 'Point'].copy()
            gdf = gdf[gdf['addr:housenumber'].notna()].copy()

            # Clean output data - keep only OSM address fields
            osm_fields = [
                'addr:housenumber', 'addr:unit', 'addr:street',
                'addr:city', 'addr:postcode', 'addr:state'
            ]
            existing_fields = [field for field in osm_fields if field in gdf.columns]
            gdf = gdf[existing_fields + ['geometry']]

            # Save to GeoJSON
            output_geojson.parent.mkdir(parents=True, exist_ok=True)
            gdf.to_file(output_geojson, driver='GeoJSON')
            print(f"Converted {len(gdf)} addresses to {output_geojson}")
        finally:
            # Clean up the temp directory
            import shutil
            if temp_dir.exists():
                shutil.rmtree(temp_dir)
        return str(output_geojson)
    def _process_address_fields(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """Process address fields according to the OSM schema (handles multiple county formats)."""
        processed_gdf = gdf.copy()
        address_mapping = {}

        # House number - try multiple field names
        house_number_fields = ['ADD_NUM', 'AddressNum', 'ADDRESS_NUM', 'HOUSE_NUM']
        for field in house_number_fields:
            if field in processed_gdf.columns:
                add_num_series = processed_gdf[field].copy()
                add_num_series = pd.to_numeric(add_num_series, errors='coerce')
                address_mapping['addr:housenumber'] = add_num_series.round().astype('Int64')
                break

        # Unit number - try multiple field names
        unit_fields = ['UNIT', 'UnitNumber', 'UNIT_NUM', 'APT']
        for field in unit_fields:
            if field in processed_gdf.columns:
                unit_series = processed_gdf[field].copy()
                unit_series = unit_series.replace(['nan', 'None', '', None], None)
                unit_series = unit_series.where(unit_series.notna(), None)
                address_mapping['addr:unit'] = unit_series
                break

        # Street name - try multiple approaches
        if 'SADD' in processed_gdf.columns:
            # Sumter County format - full address in the SADD field
            street_names = []
            for sadd_value in processed_gdf['SADD']:
                if pd.notna(sadd_value):
                    street_from_addr = qgis_functions.getstreetfromaddress(str(sadd_value), None, None)
                    street_titled = qgis_functions.title(street_from_addr)
                    street_names.append(street_titled)
                else:
                    street_names.append(None)
            address_mapping['addr:street'] = street_names
        elif 'FullAddres' in processed_gdf.columns:
            # Lake County format - full address in the FullAddres field
            street_names = []
            for full_addr in processed_gdf['FullAddres']:
                if pd.notna(full_addr):
                    street_from_addr = qgis_functions.getstreetfromaddress(str(full_addr), None, None)
                    street_titled = qgis_functions.title(street_from_addr)
                    street_names.append(street_titled)
                else:
                    street_names.append(None)
            address_mapping['addr:street'] = street_names
        elif 'BaseStreet' in processed_gdf.columns:
            # Lake County alternative - combine street components
            street_names = []
            for idx, row in processed_gdf.iterrows():
                street_parts = []
                # Prefix direction
                if 'PrefixDire' in row and pd.notna(row['PrefixDire']):
                    street_parts.append(str(row['PrefixDire']).strip())
                # Prefix type
                if 'PrefixType' in row and pd.notna(row['PrefixType']):
                    street_parts.append(str(row['PrefixType']).strip())
                # Base street name
                if pd.notna(row['BaseStreet']):
                    street_parts.append(str(row['BaseStreet']).strip())
                # Suffix type
                if 'SuffixType' in row and pd.notna(row['SuffixType']):
                    street_parts.append(str(row['SuffixType']).strip())
                if street_parts:
                    street_name = ' '.join(street_parts)
                    street_titled = qgis_functions.title(street_name)
                    street_names.append(street_titled)
                else:
                    street_names.append(None)
            address_mapping['addr:street'] = street_names

        # City - try multiple field names
        city_fields = ['POST_COMM', 'PostalCity', 'CITY', 'Jurisdicti']
        for field in city_fields:
            if field in processed_gdf.columns:
                city_names = []
                for city_value in processed_gdf[field]:
                    if pd.notna(city_value):
                        city_titled = qgis_functions.title(str(city_value))
                        city_names.append(city_titled)
                    else:
                        city_names.append(None)
                address_mapping['addr:city'] = city_names
                break

        # Postal code - try multiple field names
        postcode_fields = ['POST_CODE', 'ZipCode', 'ZIP', 'POSTAL_CODE']
        for field in postcode_fields:
            if field in processed_gdf.columns:
                post_code_series = processed_gdf[field].copy()
                post_code_series = pd.to_numeric(post_code_series, errors='coerce')
                address_mapping['addr:postcode'] = post_code_series.round().astype('Int64')
                break

        # Manually add addr:state (hardcoded: this tool currently targets Florida counties)
        address_mapping['addr:state'] = 'FL'

        # Add the new address columns to the GeoDataFrame
        for key, value in address_mapping.items():
            processed_gdf[key] = value
        return processed_gdf
    def compare_addresses(self, local_file: str, osm_file: str) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """
        Compare local and OSM address data.

        Returns:
            Tuple of (new_addresses, existing_addresses, removed_addresses)
        """
        print(f"Comparing addresses: {local_file} vs {osm_file}")

        # Load data
        local_gdf = gpd.read_file(local_file)
        osm_gdf = gpd.read_file(osm_file)
        print(f"Loaded local addresses: {len(local_gdf)}")
        print(f"Loaded OSM addresses: {len(osm_gdf)}")

        # Apply sampling if requested (sample_size and max_osm are set by main())
        if hasattr(self, 'sample_size') and self.sample_size:
            if len(local_gdf) > self.sample_size:
                local_gdf = local_gdf.sample(n=self.sample_size, random_state=42).reset_index(drop=True)
                print(f"Sampled local addresses to: {len(local_gdf)}")
        if hasattr(self, 'max_osm') and self.max_osm:
            if len(osm_gdf) > self.max_osm:
                osm_gdf = osm_gdf.sample(n=self.max_osm, random_state=42).reset_index(drop=True)
                print(f"Sampled OSM addresses to: {len(osm_gdf)}")
        print(f"Processing local addresses: {len(local_gdf)}")
        print(f"Processing OSM addresses: {len(osm_gdf)}")

        # Ensure both are in the same CRS
        if local_gdf.crs != osm_gdf.crs:
            osm_gdf = osm_gdf.to_crs(local_gdf.crs)

        # Create spatial indexes (Shapely 2.x: STRtree.query returns positional
        # indices into the list the tree was built from; local_index is currently unused)
        local_index = STRtree(local_gdf.geometry.tolist())
        osm_index = STRtree(osm_gdf.geometry.tolist())

        existing_addresses = []
        new_addresses = []

        # For each local address, find the closest OSM address with the same house number
        for idx, local_row in local_gdf.iterrows():
            local_point = local_row.geometry
            # Query nearby OSM addresses
            nearby_indices = osm_index.query(local_point.buffer(self.tolerance_deg))
            best_match = None
            min_distance = float('inf')
            for osm_idx in nearby_indices:
                osm_row = osm_gdf.iloc[osm_idx]
                osm_point = osm_row.geometry
                distance = local_point.distance(osm_point)
                # Compare house numbers as strings to sidestep type differences
                local_house_num = str(local_row.get('addr:housenumber', ''))
                osm_house_num = str(osm_row.get('addr:housenumber', ''))
                # Only consider as a potential match if the house numbers agree
                if local_house_num == osm_house_num and distance < min_distance:
                    min_distance = distance
                    best_match = osm_idx

            # Convert the degree distance to meters for the tolerance check
            distance_meters = min_distance * 111000.0
            if best_match is not None and distance_meters <= self.tolerance_meters:
                # Found a match - this is an existing address
                local_props = dict(local_row.drop('geometry'))
                osm_props = dict(osm_gdf.iloc[best_match].drop('geometry'))
                existing_addresses.append({
                    'geometry': local_point,
                    'local_data': local_props,
                    'osm_data': osm_props,
                    'distance_meters': distance_meters
                })
            else:
                # No match found - this is a new address to add to OSM
                local_props = dict(local_row.drop('geometry'))
                local_props['status'] = 'new'
                new_addresses.append({
                    'geometry': local_point,
                    **local_props
                })

        # Find OSM addresses without a local match (potentially removed).
        # A second matching pass records which OSM indices were matched. Note that
        # this pass stops at the first qualifying OSM address rather than the
        # closest one, so its matches can differ slightly from the pass above.
        removed_addresses = []
        matched_osm_indices = set()
        for idx, local_row in local_gdf.iterrows():
            local_point = local_row.geometry
            nearby_indices = osm_index.query(local_point.buffer(self.tolerance_deg))
            for osm_idx in nearby_indices:
                osm_row = osm_gdf.iloc[osm_idx]
                osm_point = osm_row.geometry
                distance_meters = local_point.distance(osm_point) * 111000.0
                # Check if house numbers match (handle type differences)
                local_house_num = str(local_row.get('addr:housenumber', ''))
                osm_house_num = str(osm_row.get('addr:housenumber', ''))
                if (distance_meters <= self.tolerance_meters and
                        local_house_num == osm_house_num):
                    matched_osm_indices.add(osm_idx)
                    break  # Only match to the first OSM address found

        # Any OSM address that was never matched is potentially removed
        for idx, osm_row in osm_gdf.iterrows():
            if idx not in matched_osm_indices:
                osm_props = dict(osm_row.drop('geometry'))
                osm_props['status'] = 'removed'
                removed_addresses.append({
                    'geometry': osm_row.geometry,
                    **osm_props
                })

        return new_addresses, existing_addresses, removed_addresses
    def save_results(self, new_addresses: List[Dict], existing_addresses: List[Dict],
                     removed_addresses: List[Dict], output_dir: str):
        """Save comparison results to separate GeoJSON files."""
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Save new addresses (to add to OSM)
        if new_addresses:
            new_gdf = gpd.GeoDataFrame(new_addresses)
            new_file = output_dir / f"addresses_to_add_{timestamp}.geojson"
            new_gdf.to_file(new_file, driver='GeoJSON')
            print(f"Saved {len(new_addresses)} new addresses to {new_file}")

        # Save removed addresses (missing from local data)
        if removed_addresses:
            removed_gdf = gpd.GeoDataFrame(removed_addresses)
            removed_file = output_dir / f"addresses_potentially_removed_{timestamp}.geojson"
            removed_gdf.to_file(removed_file, driver='GeoJSON')
            print(f"Saved {len(removed_addresses)} potentially removed addresses to {removed_file}")

        # Save existing addresses for reference
        if existing_addresses:
            # Create a simplified format for existing addresses
            existing_simple = []
            for addr in existing_addresses:
                existing_simple.append({
                    'geometry': addr['geometry'],
                    'distance_meters': addr['distance_meters'],
                    'status': 'existing'
                })
            existing_gdf = gpd.GeoDataFrame(existing_simple)
            existing_file = output_dir / f"addresses_existing_{timestamp}.geojson"
            existing_gdf.to_file(existing_file, driver='GeoJSON')
            print(f"Saved {len(existing_addresses)} existing addresses to {existing_file}")
    def print_summary(self, new_addresses: List[Dict], existing_addresses: List[Dict],
                      removed_addresses: List[Dict]):
        """Print a summary of the comparison results."""
        print("\n" + "=" * 60)
        print("ADDRESS COMPARISON SUMMARY")
        print("=" * 60)
        print("\nTOTAL ADDRESSES ANALYZED:")
        print(f" • New addresses (to add to OSM): {len(new_addresses)}")
        print(f" • Existing addresses (matched): {len(existing_addresses)}")
        print(f" • Potentially removed addresses: {len(removed_addresses)}")

        if existing_addresses:
            distances = [addr['distance_meters'] for addr in existing_addresses]
            avg_distance = sum(distances) / len(distances)
            print("\nMATCHING STATISTICS:")
            print(f" • Average distance of matches: {avg_distance:.1f} meters")
            print(f" • Max distance of matches: {max(distances):.1f} meters")

        if new_addresses:
            print("\nNEW ADDRESSES TO ADD:")
            print(" These addresses exist in local data but not in OSM")
            # Group by street name
            streets = {}
            for addr in new_addresses[:10]:  # Show first 10
                street = addr.get('addr:street', 'Unknown Street')
                if street not in streets:
                    streets[street] = 0
                streets[street] += 1
            for street, count in sorted(streets.items()):
                print(f"{street}: {count} address(es)")
            if len(new_addresses) > 10:
                print(f" • ... and {len(new_addresses) - 10} more")

        if removed_addresses:
            print("\nPOTENTIALLY REMOVED ADDRESSES:")
            print(" These addresses exist in OSM but not in local data")
            print(" (May indicate addresses that were removed or demolished)")
def main():
    parser = argparse.ArgumentParser(
        description="Compare local government address data with OpenStreetMap addresses",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python compare-addresses.py "Lake" "Florida" --local-zip "original data/Lake/Addresspoints 2025-06.zip"
  python compare-addresses.py "Sumter" "Florida" --local-zip "original data/Sumter/Address9_13_2024.zip" --tolerance 30
  python compare-addresses.py "Orange" "Florida" --local-zip "addresses.zip" --output-dir "results/orange"
"""
    )
    parser.add_argument('county', help='County name (e.g., "Lake", "Sumter")')
    parser.add_argument('state', help='State name (e.g., "Florida")')
    parser.add_argument('--local-zip', required=True, help='Path to local address data ZIP file')
    parser.add_argument('--tolerance', '-t', type=float, default=50.0,
                        help='Distance tolerance in meters for matching addresses (default: 50)')
    parser.add_argument('--output-dir', '-o', help='Output directory for results (default: processed data/[County])')
    parser.add_argument('--cache-dir', default='osm_cache',
                        help='Directory to cache OSM downloads (default: osm_cache)')
    parser.add_argument('--force-download', action='store_true',
                        help='Force re-download of OSM data (ignore cache)')
    parser.add_argument('--sample', '-s', type=int,
                        help='Process only a sample of N addresses for testing')
    parser.add_argument('--max-osm', type=int, default=50000,
                        help='Maximum number of OSM addresses to process (default: 50000)')
    args = parser.parse_args()

    # Validate the input file
    local_zip = Path(args.local_zip)
    if not local_zip.exists():
        print(f"Error: Local ZIP file {args.local_zip} does not exist")
        return 1

    # Set the output directory
    if args.output_dir:
        output_dir = Path(args.output_dir)
    else:
        output_dir = Path("processed data") / args.county

    try:
        # Create the comparator
        comparator = AddressComparator(
            tolerance_meters=args.tolerance,
            cache_dir=args.cache_dir
        )

        # Set sampling parameters
        if args.sample:
            comparator.sample_size = args.sample
        if args.max_osm:
            comparator.max_osm = args.max_osm

        # Download/load OSM data
        if args.force_download:
            # Remove any existing cache for this county
            for cache_file in Path(args.cache_dir).glob(f"osm_addresses_{args.county.lower()}_*.geojson"):
                cache_file.unlink()
        osm_file = comparator.download_osm_addresses(args.county, args.state)

        # Convert local data
        local_file = comparator.load_local_addresses(args.local_zip)

        # Perform the comparison
        new_addresses, existing_addresses, removed_addresses = comparator.compare_addresses(
            local_file, osm_file
        )

        # Save results
        comparator.save_results(new_addresses, existing_addresses, removed_addresses, output_dir)

        # Print summary
        comparator.print_summary(new_addresses, existing_addresses, removed_addresses)
        return 0
    except Exception as e:
        print(f"Error: {str(e)}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())

run-lake-addresses.py (new file, 41 lines)

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
Simple wrapper script for comparing Lake County addresses
"""
import subprocess
import sys
from pathlib import Path


def main():
    # Run from the directory containing this script (passed as cwd below)
    script_dir = Path(__file__).parent

    # Define the command
    cmd = [
        sys.executable,
        "compare-addresses.py",
        "Lake",
        "Florida",
        "--local-zip", "original data/Lake/Addresspoints 2025-06.zip",
        "--tolerance", "50",
        "--output-dir", "processed data/Lake"
    ]

    print("Running Lake County address comparison...")
    print("Command:", " ".join(cmd))
    print()

    # Run the command
    result = subprocess.run(cmd, cwd=script_dir)
    if result.returncode == 0:
        print("\nAddress comparison completed successfully!")
        print("Results saved in: processed data/Lake/")
    else:
        print(f"\nError: Script failed with return code {result.returncode}")
    return result.returncode


if __name__ == "__main__":
    sys.exit(main())