from __future__ import annotations import csv from html import unescape from io import TextIOWrapper import json import re from typing import Any, Iterable from xml.etree import ElementTree as ET import zipfile import httpx from civilplan_mcp.config import get_settings from civilplan_mcp.models import ProjectDomain from civilplan_mcp.tools._base import wrap_response VWORLD_ADDRESS_URL = "https://api.vworld.kr/req/address" VWORLD_DATA_URL = "https://api.vworld.kr/req/data" VWORLD_WFS_URL = "https://api.vworld.kr/req/wfs" EUM_LAND_USE_URL = "https://www.eum.go.kr/eum/plan/info/getLandUseInfo" EUM_LAND_USE_GY_AJAX_URL = "https://www.eum.go.kr/web/ar/lu/luLandDetUseGYAjax.jsp" CADASTRAL_LAYER = "LP_PA_CBND_BUBUN" LAND_USE_LAYER = "LT_C_LHBLPN" LAND_PRICE_DIR = "land_prices" _LAND_USE_ALIASES = { "use_district": ( "usedistrict", "usedistrictnm", "usedistrictname", "usedistrictnm1", "useDistrict", "zonenm", "zonename", "landuse", ), "district_2": ( "district2", "usedistrict2", "usedistrict2nm", "usedistrict2name", "usezone", "usezonename", ), "bcr": ("bcr", "bcrrate", "buildingcoverage"), "far": ("far", "farrate", "floorarearatio"), "height_limit_m": ("height", "heightlimit", "heightlimitm"), } _LAND_PRICE_KEY_ALIASES = {"pnu", "필지고유번호", "법정동코드"} _LAND_PRICE_VALUE_ALIASES = {"공시지가", "landprice", "지가"} def extract_address_result(payload: dict[str, Any]) -> dict[str, Any]: response = payload.get("response", {}) if response.get("status") != "OK": error = response.get("error", {}) raise ValueError(error.get("text", "address lookup failed")) result = response.get("result", {}) point = result.get("point", {}) or {} items = result.get("items", []) or [] first = items[0] if items else {} parcel = first.get("address", {}).get("parcel") pnu = first.get("id") or first.get("pnu") return { "pnu": pnu, "parcel_address": parcel, "x": float(point["x"]) if point.get("x") else None, "y": float(point["y"]) if point.get("y") else None, } def extract_feature_properties(payload: dict[str, Any]) -> dict[str, Any]: if payload.get("type") == "FeatureCollection": features = payload.get("features", []) if not features: return {} return features[0].get("properties", {}) or {} response = payload.get("response", {}) if response.get("status") != "OK": error = response.get("error", {}) raise ValueError(error.get("text", "feature lookup failed")) features = response.get("result", {}).get("featureCollection", {}).get("features", []) if not features: return {} return features[0].get("properties", {}) or {} def build_land_use_bbox_params(x: float, y: float, api_key: str, buffer_deg: float = 0.0005) -> dict[str, Any]: return { "SERVICE": "WFS", "VERSION": "2.0.0", "REQUEST": "GetFeature", "TYPENAME": "lt_c_lhblpn", "BBOX": f"{x-buffer_deg:.4f},{y-buffer_deg:.4f},{x+buffer_deg:.4f},{y+buffer_deg:.4f},EPSG:4326", "SRSNAME": "EPSG:4326", "KEY": api_key, "OUTPUTFORMAT": "application/json", } def _normalize_header(value: str) -> str: return re.sub(r"[^0-9a-z가-힣]+", "", value.lower()) def _open_tabular_text(binary_handle, suffix: str) -> Iterable[dict[str, Any]]: for encoding in ("utf-8-sig", "cp949", "euc-kr"): try: wrapper = TextIOWrapper(binary_handle, encoding=encoding, newline="") sample = wrapper.read(2048) wrapper.seek(0) dialect = csv.excel_tab if suffix == ".tsv" else csv.Sniffer().sniff(sample or "a,b\n1,2\n") reader = csv.DictReader(wrapper, dialect=dialect) for row in reader: yield row return except UnicodeDecodeError: binary_handle.seek(0) continue except csv.Error: binary_handle.seek(0) wrapper = TextIOWrapper(binary_handle, encoding=encoding, newline="") reader = csv.DictReader(wrapper) for row in reader: yield row return def _iter_land_price_rows(directory) -> Iterable[tuple[str, dict[str, Any]]]: for path in sorted(directory.glob("*")): suffix = path.suffix.lower() if suffix in {".csv", ".tsv"}: with path.open("rb") as handle: for row in _open_tabular_text(handle, suffix): yield path.name, row continue if suffix != ".zip": continue with zipfile.ZipFile(path) as archive: for member_name in archive.namelist(): member_suffix = member_name.lower().rsplit(".", 1)[-1] if "." in member_name else "" if member_suffix not in {"csv", "tsv"}: continue with archive.open(member_name) as handle: for row in _open_tabular_text(handle, f".{member_suffix}"): yield f"{path.name}:{member_name}", row def _read_land_price_from_files(pnu: str | None) -> dict[str, Any] | None: if not pnu: return None settings = get_settings() directory = settings.data_dir / LAND_PRICE_DIR if not directory.exists(): return None key_aliases = {_normalize_header(alias) for alias in _LAND_PRICE_KEY_ALIASES} value_aliases = {_normalize_header(alias) for alias in _LAND_PRICE_VALUE_ALIASES} for source_name, row in _iter_land_price_rows(directory): normalized = { _normalize_header(str(key)): value for key, value in row.items() if key } row_pnu = next((value for key, value in normalized.items() if key in key_aliases), None) if str(row_pnu).strip() != pnu: continue price = next((value for key, value in normalized.items() if key in value_aliases), None) if not price: continue return { "individual_m2_won": int(float(str(price).replace(",", ""))), "source": source_name, } return None def _fetch_address_to_pnu(address: str, api_key: str) -> dict[str, Any]: params = { "service": "address", "request": "getcoord", "version": "2.0", "crs": "epsg:4326", "refine": "true", "simple": "false", "format": "json", "type": "PARCEL", "address": address, "key": api_key, } response = httpx.get(VWORLD_ADDRESS_URL, params=params, timeout=20) response.raise_for_status() return extract_address_result(response.json()) def _fetch_vworld_properties(layer: str, pnu: str, api_key: str) -> dict[str, Any]: params = { "service": "data", "version": "2.0", "request": "GetFeature", "format": "json", "errorformat": "json", "data": layer, "attrFilter": f"pnu:=:{pnu}", "geometry": "false", "size": 1, "page": 1, "key": api_key, } response = httpx.get(VWORLD_DATA_URL, params=params, timeout=20) response.raise_for_status() return extract_feature_properties(response.json()) def _flatten_scalar_values(value: Any, bucket: dict[str, str]) -> None: if isinstance(value, dict): for key, nested in value.items(): _flatten_scalar_values(nested, bucket) if not isinstance(nested, (dict, list)) and nested not in (None, ""): bucket[str(key).lower()] = str(nested) return if isinstance(value, list): for nested in value: _flatten_scalar_values(nested, bucket) def _flatten_xml_values(text: str) -> dict[str, str]: root = ET.fromstring(text) bucket: dict[str, str] = {} for element in root.iter(): tag = element.tag.split("}")[-1].lower() value = (element.text or "").strip() if value: bucket[tag] = value return bucket def _pick_first(flattened: dict[str, str], aliases: tuple[str, ...]) -> str | None: for alias in aliases: if alias.lower() in flattened: return flattened[alias.lower()] return None def _strip_tags(text: str) -> str: cleaned = re.sub(r"<[^>]+>", " ", text) return re.sub(r"\s+", " ", unescape(cleaned)).strip() def extract_land_use_html_properties(html_text: str) -> dict[str, Any]: rows = re.findall(r"