Files
2026-04-03 09:08:08 +09:00

459 lines
16 KiB
Python

from __future__ import annotations
import csv
from html import unescape
from io import TextIOWrapper
import json
import re
from typing import Any, Iterable
from xml.etree import ElementTree as ET
import zipfile
import httpx
from civilplan_mcp.config import get_settings
from civilplan_mcp.models import ProjectDomain
from civilplan_mcp.tools._base import wrap_response
VWORLD_ADDRESS_URL = "https://api.vworld.kr/req/address"
VWORLD_DATA_URL = "https://api.vworld.kr/req/data"
VWORLD_WFS_URL = "https://api.vworld.kr/req/wfs"
EUM_LAND_USE_URL = "https://www.eum.go.kr/eum/plan/info/getLandUseInfo"
EUM_LAND_USE_GY_AJAX_URL = "https://www.eum.go.kr/web/ar/lu/luLandDetUseGYAjax.jsp"
CADASTRAL_LAYER = "LP_PA_CBND_BUBUN"
LAND_USE_LAYER = "LT_C_LHBLPN"
LAND_PRICE_DIR = "land_prices"
_LAND_USE_ALIASES = {
"use_district": (
"usedistrict",
"usedistrictnm",
"usedistrictname",
"usedistrictnm1",
"useDistrict",
"zonenm",
"zonename",
"landuse",
),
"district_2": (
"district2",
"usedistrict2",
"usedistrict2nm",
"usedistrict2name",
"usezone",
"usezonename",
),
"bcr": ("bcr", "bcrrate", "buildingcoverage"),
"far": ("far", "farrate", "floorarearatio"),
"height_limit_m": ("height", "heightlimit", "heightlimitm"),
}
_LAND_PRICE_KEY_ALIASES = {"pnu", "필지고유번호", "법정동코드"}
_LAND_PRICE_VALUE_ALIASES = {"공시지가", "landprice", "지가"}
def extract_address_result(payload: dict[str, Any]) -> dict[str, Any]:
response = payload.get("response", {})
if response.get("status") != "OK":
error = response.get("error", {})
raise ValueError(error.get("text", "address lookup failed"))
result = response.get("result", {})
point = result.get("point", {}) or {}
items = result.get("items", []) or []
first = items[0] if items else {}
parcel = first.get("address", {}).get("parcel")
pnu = first.get("id") or first.get("pnu")
return {
"pnu": pnu,
"parcel_address": parcel,
"x": float(point["x"]) if point.get("x") else None,
"y": float(point["y"]) if point.get("y") else None,
}
def extract_feature_properties(payload: dict[str, Any]) -> dict[str, Any]:
if payload.get("type") == "FeatureCollection":
features = payload.get("features", [])
if not features:
return {}
return features[0].get("properties", {}) or {}
response = payload.get("response", {})
if response.get("status") != "OK":
error = response.get("error", {})
raise ValueError(error.get("text", "feature lookup failed"))
features = response.get("result", {}).get("featureCollection", {}).get("features", [])
if not features:
return {}
return features[0].get("properties", {}) or {}
def build_land_use_bbox_params(x: float, y: float, api_key: str, buffer_deg: float = 0.0005) -> dict[str, Any]:
return {
"SERVICE": "WFS",
"VERSION": "2.0.0",
"REQUEST": "GetFeature",
"TYPENAME": "lt_c_lhblpn",
"BBOX": f"{x-buffer_deg:.4f},{y-buffer_deg:.4f},{x+buffer_deg:.4f},{y+buffer_deg:.4f},EPSG:4326",
"SRSNAME": "EPSG:4326",
"KEY": api_key,
"OUTPUTFORMAT": "application/json",
}
def _normalize_header(value: str) -> str:
return re.sub(r"[^0-9a-z가-힣]+", "", value.lower())
def _open_tabular_text(binary_handle, suffix: str) -> Iterable[dict[str, Any]]:
for encoding in ("utf-8-sig", "cp949", "euc-kr"):
try:
wrapper = TextIOWrapper(binary_handle, encoding=encoding, newline="")
sample = wrapper.read(2048)
wrapper.seek(0)
dialect = csv.excel_tab if suffix == ".tsv" else csv.Sniffer().sniff(sample or "a,b\n1,2\n")
reader = csv.DictReader(wrapper, dialect=dialect)
for row in reader:
yield row
return
except UnicodeDecodeError:
binary_handle.seek(0)
continue
except csv.Error:
binary_handle.seek(0)
wrapper = TextIOWrapper(binary_handle, encoding=encoding, newline="")
reader = csv.DictReader(wrapper)
for row in reader:
yield row
return
def _iter_land_price_rows(directory) -> Iterable[tuple[str, dict[str, Any]]]:
for path in sorted(directory.glob("*")):
suffix = path.suffix.lower()
if suffix in {".csv", ".tsv"}:
with path.open("rb") as handle:
for row in _open_tabular_text(handle, suffix):
yield path.name, row
continue
if suffix != ".zip":
continue
with zipfile.ZipFile(path) as archive:
for member_name in archive.namelist():
member_suffix = member_name.lower().rsplit(".", 1)[-1] if "." in member_name else ""
if member_suffix not in {"csv", "tsv"}:
continue
with archive.open(member_name) as handle:
for row in _open_tabular_text(handle, f".{member_suffix}"):
yield f"{path.name}:{member_name}", row
def _read_land_price_from_files(pnu: str | None) -> dict[str, Any] | None:
if not pnu:
return None
settings = get_settings()
directory = settings.data_dir / LAND_PRICE_DIR
if not directory.exists():
return None
key_aliases = {_normalize_header(alias) for alias in _LAND_PRICE_KEY_ALIASES}
value_aliases = {_normalize_header(alias) for alias in _LAND_PRICE_VALUE_ALIASES}
for source_name, row in _iter_land_price_rows(directory):
normalized = {
_normalize_header(str(key)): value for key, value in row.items() if key
}
row_pnu = next((value for key, value in normalized.items() if key in key_aliases), None)
if str(row_pnu).strip() != pnu:
continue
price = next((value for key, value in normalized.items() if key in value_aliases), None)
if not price:
continue
return {
"individual_m2_won": int(float(str(price).replace(",", ""))),
"source": source_name,
}
return None
def _fetch_address_to_pnu(address: str, api_key: str) -> dict[str, Any]:
params = {
"service": "address",
"request": "getcoord",
"version": "2.0",
"crs": "epsg:4326",
"refine": "true",
"simple": "false",
"format": "json",
"type": "PARCEL",
"address": address,
"key": api_key,
}
response = httpx.get(VWORLD_ADDRESS_URL, params=params, timeout=20)
response.raise_for_status()
return extract_address_result(response.json())
def _fetch_vworld_properties(layer: str, pnu: str, api_key: str) -> dict[str, Any]:
params = {
"service": "data",
"version": "2.0",
"request": "GetFeature",
"format": "json",
"errorformat": "json",
"data": layer,
"attrFilter": f"pnu:=:{pnu}",
"geometry": "false",
"size": 1,
"page": 1,
"key": api_key,
}
response = httpx.get(VWORLD_DATA_URL, params=params, timeout=20)
response.raise_for_status()
return extract_feature_properties(response.json())
def _flatten_scalar_values(value: Any, bucket: dict[str, str]) -> None:
if isinstance(value, dict):
for key, nested in value.items():
_flatten_scalar_values(nested, bucket)
if not isinstance(nested, (dict, list)) and nested not in (None, ""):
bucket[str(key).lower()] = str(nested)
return
if isinstance(value, list):
for nested in value:
_flatten_scalar_values(nested, bucket)
def _flatten_xml_values(text: str) -> dict[str, str]:
root = ET.fromstring(text)
bucket: dict[str, str] = {}
for element in root.iter():
tag = element.tag.split("}")[-1].lower()
value = (element.text or "").strip()
if value:
bucket[tag] = value
return bucket
def _pick_first(flattened: dict[str, str], aliases: tuple[str, ...]) -> str | None:
for alias in aliases:
if alias.lower() in flattened:
return flattened[alias.lower()]
return None
def _strip_tags(text: str) -> str:
cleaned = re.sub(r"<[^>]+>", " ", text)
return re.sub(r"\s+", " ", unescape(cleaned)).strip()
def extract_land_use_html_properties(html_text: str) -> dict[str, Any]:
rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html_text, flags=re.IGNORECASE | re.DOTALL)
data_rows: list[list[str]] = []
for row_html in rows:
columns = re.findall(r"<t[dh][^>]*>(.*?)</t[dh]>", row_html, flags=re.IGNORECASE | re.DOTALL)
texts = [_strip_tags(column) for column in columns]
if len(texts) != 3:
continue
if not texts[0] or "조회된 데이터가 없습니다" in texts[0]:
continue
if not re.search(r"\d", texts[1]) or not re.search(r"\d", texts[2]):
continue
data_rows.append(texts)
result: dict[str, Any] = {}
if data_rows:
result["usedistrictnm"] = data_rows[0][0]
result["bcr"] = re.search(r"\d+(?:\.\d+)?", data_rows[0][1]).group(0) if re.search(r"\d+(?:\.\d+)?", data_rows[0][1]) else None
result["far"] = re.search(r"\d+(?:\.\d+)?", data_rows[0][2]).group(0) if re.search(r"\d+(?:\.\d+)?", data_rows[0][2]) else None
if len(data_rows) > 1:
result["usedistrict2nm"] = data_rows[1][0]
popup_g = re.search(r'id="PopupG_pop[^"]*"[^>]*>(.*?)</td>', html_text, flags=re.IGNORECASE | re.DOTALL)
popup_y = re.search(r'id="PopupY_pop[^"]*"[^>]*>(.*?)</td>', html_text, flags=re.IGNORECASE | re.DOTALL)
if "bcr" not in result and popup_g:
match = re.search(r"\d+(?:\.\d+)?", _strip_tags(popup_g.group(1)))
if match:
result["bcr"] = match.group(0)
if "far" not in result and popup_y:
match = re.search(r"\d+(?:\.\d+)?", _strip_tags(popup_y.group(1)))
if match:
result["far"] = match.group(0)
return result
def extract_land_use_properties(response: httpx.Response) -> dict[str, Any]:
flattened: dict[str, str] = {}
body = response.text.strip()
content_type = (response.headers.get("content-type") or "").lower()
if not body:
return {}
if "json" in content_type:
_flatten_scalar_values(response.json(), flattened)
else:
try:
_flatten_scalar_values(response.json(), flattened)
except (json.JSONDecodeError, ValueError):
if body.startswith("<") and ("<html" in body.lower() or "<div" in body.lower()):
return extract_land_use_html_properties(body)
if body.startswith("<"):
flattened = _flatten_xml_values(body)
else:
return {}
return {
"usedistrictnm": _pick_first(flattened, _LAND_USE_ALIASES["use_district"]),
"usedistrict2nm": _pick_first(flattened, _LAND_USE_ALIASES["district_2"]),
"bcr": _pick_first(flattened, _LAND_USE_ALIASES["bcr"]),
"far": _pick_first(flattened, _LAND_USE_ALIASES["far"]),
"height": _pick_first(flattened, _LAND_USE_ALIASES["height_limit_m"]),
}
def _has_land_use_values(payload: dict[str, Any]) -> bool:
return any(payload.get(key) for key in ("usedistrictnm", "usedistrict2nm", "bcr", "far", "height"))
def _fetch_land_use_properties_by_pnu(pnu: str, api_key: str) -> dict[str, Any]:
errors: list[str] = []
if api_key:
try:
response = httpx.get(
EUM_LAND_USE_URL,
params={"pnu": pnu, "serviceKey": api_key, "format": "json", "_type": "json"},
headers={"Accept": "application/json, application/xml;q=0.9, text/xml;q=0.8"},
timeout=20,
)
response.raise_for_status()
parsed = extract_land_use_properties(response)
if _has_land_use_values(parsed):
return parsed
errors.append("official EUM REST returned no usable zoning fields")
except Exception as exc:
errors.append(f"official EUM REST failed: {exc}")
try:
response = httpx.get(
EUM_LAND_USE_GY_AJAX_URL,
params={"pnu": pnu, "sggcd": pnu[:5], "carGbn": "GY", "ucodes": ""},
timeout=20,
)
response.raise_for_status()
parsed = extract_land_use_properties(response)
if _has_land_use_values(parsed):
return parsed
errors.append("EUM HTML fallback returned no usable zoning fields")
except Exception as exc:
errors.append(f"EUM HTML fallback failed: {exc}")
raise ValueError("; ".join(errors))
def _to_number(value: Any) -> float | None:
if value in (None, ""):
return None
match = re.search(r"-?\d+(?:\.\d+)?", str(value).replace(",", ""))
if not match:
return None
return float(match.group(0))
def query_land_info(*, address: str | None, pnu: str | None) -> dict:
settings = get_settings()
if not settings.vworld_api_key:
return wrap_response(
{
"status": "disabled",
"message": "VWORLD_API_KEY is missing. Land info lookup is disabled but the server remains available.",
"required_keys": ["VWORLD_API_KEY"],
"address": address,
"pnu": pnu,
},
ProjectDomain.복합,
)
try:
resolved = {"pnu": pnu, "parcel_address": address, "x": None, "y": None}
if address and not pnu:
resolved = _fetch_address_to_pnu(address, settings.vworld_api_key)
pnu = resolved["pnu"]
cadastral = _fetch_vworld_properties(CADASTRAL_LAYER, pnu, settings.vworld_api_key) if pnu else {}
land_use: dict[str, Any] = {}
warnings: list[str] = []
if pnu:
try:
land_use = _fetch_land_use_properties_by_pnu(pnu, settings.data_go_kr_api_key)
except Exception as exc:
warnings.append(
f"Land use zoning could not be fetched from EUM. {exc}. Verify the parcel on eum.go.kr before filing."
)
land_price = _read_land_price_from_files(pnu)
if land_price is None:
warnings.append("Individual land price CSV has not been loaded into data/land_prices yet.")
if not land_use:
warnings.append("Land use zoning data is currently unavailable.")
area = _to_number(cadastral.get("area") or cadastral.get("a2")) or 0
bcr = _to_number(land_use.get("bcr"))
far = _to_number(land_use.get("far"))
return wrap_response(
{
"status": "success",
"address": resolved.get("parcel_address") or address,
"pnu": pnu,
"land": {
"area_m2": area or None,
"jibmok": cadastral.get("jimok") or cadastral.get("jibmok"),
"ownership": cadastral.get("ownership"),
},
"zoning": {
"use_district": land_use.get("usedistrictnm"),
"district_2": land_use.get("usedistrict2nm"),
"bcr_pct": int(bcr) if bcr is not None else None,
"far_pct": int(far) if far is not None else None,
"height_limit_m": _to_number(land_use.get("height")),
},
"land_price": {
"individual_m2_won": land_price["individual_m2_won"] if land_price else None,
"total_won": round(land_price["individual_m2_won"] * area) if land_price and area else None,
"base_year": None,
"source": land_price["source"] if land_price else None,
},
"buildable": {
"max_floor_area_m2": round(area * (far / 100), 2) if area and far else None,
"max_building_coverage_m2": round(area * (bcr / 100), 2) if area and bcr else None,
},
"coordinates": {"x": resolved.get("x"), "y": resolved.get("y")},
"warnings": warnings,
},
ProjectDomain.복합,
)
except Exception as exc:
return wrap_response(
{
"status": "error",
"message": str(exc),
"address": address,
"pnu": pnu,
"note": "Check the VWorld key, address input, or local land price files.",
},
ProjectDomain.복합,
)