This commit is contained in:
2025-08-25 20:33:21 -07:00
parent ec0e4a6efb
commit ff0511b3c5
2 changed files with 195 additions and 22 deletions

156
download-overpass.py Normal file
View File

@@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""
Download OSM data from Overpass API for a given county and save as GeoJSON.
Usage:
python download-overpass.py "Sumter County Florida" highways.geojson
python download-overpass.py "Lake County Florida" output/lake-addresses.geojson --type addresses
python download-overpass.py "Sumter County Florida" paths.geojson --type multimodal
TODO:
- Don't just download roads. Probably ignore relations also.
"""
import argparse
import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
def build_overpass_query(county_name, state_name, data_type="highways"):
    """Build an Overpass QL query for the given data type within a county.

    Args:
        county_name: OSM name of the county area (e.g. "Sumter County").
        state_name: OSM name of the enclosing state area (e.g. "Florida").
        data_type: One of "highways", "addresses", or "multimodal".

    Returns:
        The complete Overpass QL query string, ending with "out geom;".

    Raises:
        ValueError: If data_type is not one of the supported values.

    Note: county/state names are interpolated directly into the query, so
    names containing '"' would break it — acceptable for CLI use with
    trusted input, but worth hardening if this is ever exposed further.
    """
    base_query = f"""[out:json][timeout:60];
area["name"="{state_name}"]->.state;
area["name"="{county_name}"](area.searchArea)->.searchArea;"""
    # Keep the original area-chaining line exactly as before.
    base_query = f"""[out:json][timeout:60];
area["name"="{state_name}"]->.state;
area["name"="{county_name}"](area.state)->.searchArea;"""
    if data_type == "highways":
        # Drivable road classes; extend this list to pull in more types
        # (see module TODO about not downloading only roads).
        highway_values = [
            "motorway",
            "trunk",
            "primary",
            "secondary",
            "tertiary",
            "unclassified",
            "residential",
        ]
        clauses = [f'way["highway"="{v}"](area.searchArea);' for v in highway_values]
        # "~" is a regex match: catches motorway_link, primary_link, etc.
        clauses.append('way["highway"~"_link"](area.searchArea);')
        clauses.append('way["highway"="service"](area.searchArea);')
        selector = "(" + "".join(clauses) + ");"
    elif data_type == "addresses":
        # nwr = nodes, ways, and relations carrying a house number.
        selector = 'nwr["addr:housenumber"](area.searchArea);'
    elif data_type == "multimodal":
        selector = '(way["highway"="path"](area.searchArea);way["highway"="cycleway"](area.searchArea););'
    else:
        raise ValueError(f"Unknown data type: {data_type}")
    # "out geom;" makes Overpass inline node coordinates on each way.
    return base_query + selector + "out geom;"
def query_overpass(query):
    """POST a query to the public Overpass API and return the parsed JSON.

    Args:
        query: An Overpass QL query string (see build_overpass_query).

    Returns:
        The decoded JSON response as a dict.

    Exits the process with status 1 on any HTTP or network/parse error,
    printing diagnostics to stderr (this is a CLI helper, not a library).
    """
    url = "https://overpass-api.de/api/interpreter"
    data = urllib.parse.urlencode({"data": query}).encode("utf-8")
    try:
        # Client-side timeout so we never hang indefinitely; the query
        # itself already requests a 60s server-side timeout.
        with urllib.request.urlopen(url, data=data, timeout=120) as response:
            return json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
        try:
            error_body = e.read().decode("utf-8")
            print(f"Error response body: {error_body}", file=sys.stderr)
        # Narrow catch: was a bare `except:`, which would also swallow
        # KeyboardInterrupt/SystemExit.
        except Exception:
            print("Could not read error response body", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error querying Overpass API: {e}", file=sys.stderr)
        sys.exit(1)
def convert_to_geojson(overpass_data):
    """Convert an Overpass API JSON response into a GeoJSON FeatureCollection.

    Ways with inline geometry become LineString features; nodes become
    Point features. Any other element (e.g. relations, or ways returned
    without geometry) is silently skipped. OSM tags map to GeoJSON
    properties; untagged elements get an empty properties dict.
    """
    feature_list = []
    for element in overpass_data.get("elements", []):
        element_kind = element["type"]
        if element_kind == "way" and "geometry" in element:
            # Overpass inlines per-node coordinates on the way when the
            # query ends with "out geom;". GeoJSON order is [lon, lat].
            line_coords = [[point["lon"], point["lat"]] for point in element["geometry"]]
            geometry = {"type": "LineString", "coordinates": line_coords}
        elif element_kind == "node":
            geometry = {"type": "Point", "coordinates": [element["lon"], element["lat"]]}
        else:
            continue
        feature_list.append({
            "type": "Feature",
            "properties": element.get("tags", {}),
            "geometry": geometry,
        })
    return {"type": "FeatureCollection", "features": feature_list}
def main():
    """CLI entry point: download OSM data for a county and write GeoJSON.

    Positional args: county, state, output path; optional --type/-t selects
    highways (default), addresses, or multimodal data.
    """
    arg_parser = argparse.ArgumentParser(
        description="Download OSM data from Overpass API for a county"
    )
    arg_parser.add_argument(
        "county",
        help="County name (e.g., 'Lake County')"
    )
    arg_parser.add_argument(
        "state",
        help="State name (e.g., 'Florida')"
    )
    arg_parser.add_argument(
        "output",
        help="Output GeoJSON file path"
    )
    arg_parser.add_argument(
        "--type", "-t",
        choices=["highways", "addresses", "multimodal"],
        default="highways",
        help="Type of data to download (default: highways)"
    )
    args = arg_parser.parse_args()

    # Make sure the destination directory exists before we do any work.
    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)

    print(f"Downloading {args.type} for {args.county}, {args.state}...")

    # Build the query, run it against Overpass, and convert the response.
    query = build_overpass_query(args.county, args.state, args.type)
    print(f"Query: {query}")
    overpass_data = query_overpass(query)
    geojson = convert_to_geojson(overpass_data)

    # Persist the FeatureCollection as pretty-printed JSON.
    with open(destination, "w", encoding="utf-8") as f:
        json.dump(geojson, f, indent=2)

    print(f"Saved {len(geojson['features'])} {args.type} features to {args.output}")
if __name__ == "__main__":
    main()

View File

@@ -109,8 +109,8 @@ class RoadComparator:
"""Create a buffered union using chunked processing for memory efficiency."""
print("Creating optimized buffered union...")
# Process in chunks to manage memory
chunks = [gdf.iloc[i:i+self.chunk_size] for i in range(0, len(gdf), self.chunk_size)]
# Process in chunks to manage memory - extract geometries as lists
chunks = [gdf.iloc[i:i+self.chunk_size].geometry.tolist() for i in range(0, len(gdf), self.chunk_size)]
chunk_unions = []
# Use partial function for multiprocessing
@@ -144,17 +144,17 @@ class RoadComparator:
raise Exception("No valid geometries to create union")
@staticmethod
def _buffer_chunk(chunk_gdf: gpd.GeoDataFrame, tolerance: float) -> Any:
def _buffer_chunk(geometries: List, tolerance: float) -> Any:
"""Buffer geometries in a chunk and return their union."""
try:
# Buffer all geometries in the chunk
buffered = chunk_gdf.geometry.buffer(tolerance)
buffered = [geom.buffer(tolerance) for geom in geometries]
# Create union of buffered geometries
if len(buffered) == 1:
return buffered.iloc[0]
return buffered[0]
else:
return unary_union(buffered.tolist())
return unary_union(buffered)
except Exception as e:
print(f"Error in chunk processing: {str(e)}")
return None
@@ -174,9 +174,17 @@ class RoadComparator:
"""
print("Finding removed segments...")
# Split into chunks for parallel processing
chunks = [source_gdf.iloc[i:i+self.chunk_size]
for i in range(0, len(source_gdf), self.chunk_size)]
# Split into chunks for parallel processing - convert to serializable format
chunks = []
for i in range(0, len(source_gdf), self.chunk_size):
chunk_gdf = source_gdf.iloc[i:i+self.chunk_size]
chunk_data = []
for idx, row in chunk_gdf.iterrows():
chunk_data.append({
'geometry': row.geometry,
'properties': dict(row.drop('geometry'))
})
chunks.append(chunk_data)
all_removed = []
@@ -203,13 +211,14 @@ class RoadComparator:
return all_removed
@staticmethod
def _process_removed_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
def _process_removed_chunk(chunk_data: List[Dict], target_union: Any,
min_length_deg: float) -> List[Dict[str, Any]]:
"""Process a chunk of geometries to find removed segments."""
removed_segments = []
for idx, row in chunk_gdf.iterrows():
geom = row.geometry
for row_data in chunk_data:
geom = row_data['geometry']
properties = row_data['properties']
# Handle MultiLineString by processing each component
if isinstance(geom, MultiLineString):
@@ -238,12 +247,12 @@ class RoadComparator:
for uncovered_line in uncovered_lines:
if uncovered_line.length >= min_length_deg:
# Create properties dict with original metadata plus 'removed: true'
properties = dict(row.drop('geometry'))
properties['removed'] = True
result_properties = properties.copy()
result_properties['removed'] = True
removed_segments.append({
'geometry': uncovered_line,
**properties
**result_properties
})
except Exception as e:
@@ -259,9 +268,17 @@ class RoadComparator:
"""
print("Finding added roads...")
# Split into chunks for parallel processing
chunks = [source_gdf.iloc[i:i+self.chunk_size]
for i in range(0, len(source_gdf), self.chunk_size)]
# Split into chunks for parallel processing - convert to serializable format
chunks = []
for i in range(0, len(source_gdf), self.chunk_size):
chunk_gdf = source_gdf.iloc[i:i+self.chunk_size]
chunk_data = []
for idx, row in chunk_gdf.iterrows():
chunk_data.append({
'geometry': row.geometry,
'properties': dict(row.drop('geometry'))
})
chunks.append(chunk_data)
all_added = []
@@ -288,13 +305,14 @@ class RoadComparator:
return all_added
@staticmethod
def _process_added_chunk(chunk_gdf: gpd.GeoDataFrame, target_union: Any,
def _process_added_chunk(chunk_data: List[Dict], target_union: Any,
min_length_deg: float) -> List[Dict[str, Any]]:
"""Process a chunk of geometries to find added roads."""
added_roads = []
for idx, row in chunk_gdf.iterrows():
geom = row.geometry
for row_data in chunk_data:
geom = row_data['geometry']
original_properties = row_data['properties']
try:
# Check what portion of the road is not covered
@@ -319,7 +337,6 @@ class RoadComparator:
if uncovered_ratio > 0.1:
#uncovered_length >= min_length_deg and
# Include entire original road with all original metadata
original_properties = dict(row.drop('geometry'))
#
# For Sumter County Roads