258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Shapefile to GeoJSON Converter for Address Data
|
|
Converts ESRI:102659 CRS shapefile to EPSG:4326 GeoJSON with OSM-style address tags
|
|
"""
|
|
|
|
import geopandas as gpd
|
|
import json
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import importlib
|
|
# The helper module's name contains a hyphen, which is not a valid identifier
# for a plain ``import`` statement, so it is loaded by its string name instead.
qgis_functions = importlib.import_module("qgis-functions")
# Bind the two helpers as module-level names; process_address_fields calls
# them to build the addr:street and addr:city values.
title = qgis_functions.title
getstreetfromaddress = qgis_functions.getstreetfromaddress
|
|
|
|
def convert_crs(gdf, source_crs='ESRI:102659', target_crs='EPSG:4326'):
    """
    Reproject a GeoDataFrame into the target coordinate reference system.

    When the frame carries no CRS at all, it is first tagged in place with
    ``source_crs`` and a warning is printed.

    Args:
        gdf: GeoDataFrame to reproject
        source_crs: CRS assumed when the frame has none (default: ESRI:102659)
        target_crs: CRS the result should be in (default: EPSG:4326)

    Returns:
        The reprojected GeoDataFrame, or the input frame itself when its CRS
        already equals ``target_crs``
    """
    crs_missing = gdf.crs is None
    if crs_missing:
        print(f"Warning: No CRS detected, assuming {source_crs}")
        gdf.crs = source_crs

    needs_reprojection = gdf.crs != target_crs
    if not needs_reprojection:
        # Already in the requested CRS: hand back the very same object.
        return gdf

    print(f"Converting from {gdf.crs} to {target_crs}")
    return gdf.to_crs(target_crs)
|
|
|
|
def process_address_fields(gdf):
    """
    Process and map address fields according to OSM address schema

    Each mapping below is applied only when its source column is present:

        ADD_NUM   -> addr:housenumber  (nullable integer)
        UNIT      -> addr:unit         (blank markers become None)
        SADD      -> addr:street       via title(getstreetfromaddress(...))
        POST_COMM -> addr:city         via title(...)
        POST_CODE -> addr:postcode     (nullable integer)

    addr:state is always set to 'FL'.

    Args:
        gdf: GeoDataFrame with address data (left unmodified)

    Returns:
        Copy of gdf with the addr:* columns added
    """
    # Imported here because the module only binds ``pd`` inside its
    # ``__main__`` guard; without this the function raises NameError when the
    # module is imported rather than run as a script.
    import pandas as pd

    def _to_nullable_int(column):
        # Values that cannot be parsed as numbers are coerced to missing, then
        # rounded so float-typed inputs (e.g. 12.0) become whole Int64 values.
        numeric = pd.to_numeric(column, errors='coerce')
        return numeric.round().astype('Int64')

    processed_gdf = gdf.copy()
    columns = processed_gdf.columns

    # New OSM tag columns, collected first and attached in one pass below
    address_mapping = {}

    # ADD_NUM -> addr:housenumber (as integer)
    if 'ADD_NUM' in columns:
        address_mapping['addr:housenumber'] = _to_nullable_int(processed_gdf['ADD_NUM'])

    # UNIT -> addr:unit
    if 'UNIT' in columns:
        # Checked value by value instead of Series.replace(list, None): on
        # pandas releases before 1.4 that call forward-fills the previous row's
        # value rather than writing None.
        blank_markers = ('nan', 'None', '')
        address_mapping['addr:unit'] = [
            None if pd.isna(unit) or unit in blank_markers else unit
            for unit in processed_gdf['UNIT']
        ]

    # SADD -> addr:street via title(getstreetfromaddress("SADD"))
    if 'SADD' in columns:
        # The two trailing None arguments are passed exactly as before; their
        # meaning is defined by the qgis-functions helper (TODO confirm).
        address_mapping['addr:street'] = [
            title(getstreetfromaddress(str(sadd_value), None, None))
            if pd.notna(sadd_value) else None
            for sadd_value in processed_gdf['SADD']
        ]

    # POST_COMM -> addr:city via title("POST_COMM")
    if 'POST_COMM' in columns:
        address_mapping['addr:city'] = [
            title(str(post_comm)) if pd.notna(post_comm) else None
            for post_comm in processed_gdf['POST_COMM']
        ]

    # POST_CODE -> addr:postcode (as integer)
    if 'POST_CODE' in columns:
        address_mapping['addr:postcode'] = _to_nullable_int(processed_gdf['POST_CODE'])

    # Manually add addr:state = 'FL' (scalar, broadcast to every row)
    address_mapping['addr:state'] = 'FL'

    # Add the new address columns to the GeoDataFrame
    for key, value in address_mapping.items():
        processed_gdf[key] = value

    return processed_gdf
|
|
|
|
def clean_output_data(gdf, keep_original_fields=False):
    """
    Clean the output data, optionally keeping original fields

    Columns come back in a fixed order: the OSM address fields, then (when
    requested) the original shapefile fields, then geometry. Any of those
    columns that ``gdf`` does not have is skipped.

    Args:
        gdf: GeoDataFrame to clean
        keep_original_fields: Whether to keep original shapefile fields

    Returns:
        Cleaned GeoDataFrame (a column subset of ``gdf``)
    """
    # OSM address fields that are always kept
    osm_fields = [
        'addr:housenumber', 'addr:unit', 'addr:street',
        'addr:city', 'addr:postcode', 'addr:state'
    ]
    # Source fields the OSM tags were derived from
    original_fields = ['ADD_NUM', 'UNIT', 'SADD', 'POST_COMM', 'POST_CODE']

    # The groups never overlap, so plain concatenation is sufficient. Going
    # through set() here made the column order differ from run to run.
    fields_to_keep = list(osm_fields)
    if keep_original_fields:
        fields_to_keep.extend(original_fields)
    fields_to_keep.append('geometry')

    # Filter to only existing columns
    existing_fields = [field for field in fields_to_keep if field in gdf.columns]

    return gdf[existing_fields]
|
|
|
|
def convert_shapefile_to_geojson(
    input_shapefile,
    output_geojson,
    keep_original_fields=False,
    source_crs='ESRI:102659',
    target_crs='EPSG:4326'
):
    """
    Main conversion function

    Reads the shapefile, reprojects it, derives the OSM address columns,
    trims the columns, drops rows whose geometry is missing, and writes the
    result as GeoJSON. Progress is printed to stdout along the way.

    Args:
        input_shapefile: Path to input shapefile
        output_geojson: Path to output GeoJSON file
        keep_original_fields: Whether to keep original shapefile fields
        source_crs: Source coordinate reference system
        target_crs: Target coordinate reference system

    Raises:
        SystemExit: with status 1 when any step raises an Exception; the
            error message is written to stderr first
    """
    try:
        # Read shapefile
        print(f"Reading shapefile: {input_shapefile}")
        gdf = gpd.read_file(input_shapefile)
        print(f"Loaded {len(gdf)} features")

        # Display original columns
        print(f"Original columns: {list(gdf.columns)}")

        # Convert CRS if needed
        gdf = convert_crs(gdf, source_crs, target_crs)

        # Process address fields
        print("Processing address fields...")
        gdf = process_address_fields(gdf)

        # Clean output data
        gdf = clean_output_data(gdf, keep_original_fields)

        # Remove rows with no valid geometry
        gdf = gdf[gdf.geometry.notna()]

        print(f"Final columns: {list(gdf.columns)}")
        print(f"Final feature count: {len(gdf)}")

        # Write to GeoJSON
        print(f"Writing GeoJSON: {output_geojson}")
        gdf.to_file(output_geojson, driver='GeoJSON')

        print("Conversion completed successfully!")

        # Display sample of processed data
        if len(gdf) > 0:
            print("\nSample of processed data:")
            sample_cols = [col for col in gdf.columns if col.startswith('addr:')]
            if sample_cols:
                print(gdf[sample_cols].head())

    except Exception as e:
        # Top-level boundary for the CLI: report on stderr so the failure is
        # not mixed into the progress output on stdout, then exit non-zero.
        print(f"Error during conversion: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
def main():
    """
    Command line entry point.

    Parses the arguments, exits with status 1 when the input shapefile does
    not exist, creates the output file's parent directory if needed, and then
    runs the conversion.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Convert shapefile to GeoJSON with OSM address tags'
    )

    # Positional arguments: (name, help text)
    positional_specs = (
        ('input_shapefile', 'Path to input shapefile'),
        ('output_geojson', 'Path to output GeoJSON file'),
    )
    for arg_name, help_text in positional_specs:
        parser.add_argument(arg_name, help=help_text)

    parser.add_argument(
        '--keep-original',
        action='store_true',
        help='Keep original shapefile fields in addition to OSM fields'
    )

    # CRS options: (flag, default, help text)
    crs_specs = (
        ('--source-crs', 'ESRI:102659',
         'Source coordinate reference system (default: ESRI:102659)'),
        ('--target-crs', 'EPSG:4326',
         'Target coordinate reference system (default: EPSG:4326)'),
    )
    for flag, default_crs, help_text in crs_specs:
        parser.add_argument(flag, default=default_crs, help=help_text)

    args = parser.parse_args()

    # Refuse to continue without an input file
    input_missing = not os.path.exists(args.input_shapefile)
    if input_missing:
        print(f"Error: Input shapefile '{args.input_shapefile}' not found")
        sys.exit(1)

    # Make sure the directory that will hold the output exists
    Path(args.output_geojson).parent.mkdir(parents=True, exist_ok=True)

    # Run conversion
    convert_shapefile_to_geojson(
        input_shapefile=args.input_shapefile,
        output_geojson=args.output_geojson,
        keep_original_fields=args.keep_original,
        source_crs=args.source_crs,
        target_crs=args.target_crs,
    )
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    # Binds ``pd`` as a module-level name for script runs.
    # NOTE(review): this import executes only when the file is run directly,
    # not when the module is imported from elsewhere.
    import pandas as pd

    main()