feat: Phase 3 구현 — 완전 자동화, 준공도서, Vision L3, 발주처 포털

EVMS 완전 자동화:
- 공기 지연 AI 예측 (SPI 기반 준공일 예측)
- 기성청구 가능 금액 자동 산출
- 매일 자정 EVMS 스냅샷 자동 생성 (APScheduler)
- 매일 07:00 GONGSA 아침 브리핑 자동 생성

준공도서 패키지:
- 준공 요약 + 품질시험 목록 + 검측 이력 + 인허가 현황 → ZIP 번들
- 준공 준비 체크리스트 API
- 4종 HTML 템플릿 (WeasyPrint PDF 출력)

Vision AI Level 3:
- 설계 도면 vs 현장 사진 비교 보조 판독 (Claude Vision)
- 철근 배근, 거푸집 치수 1차 분석

설계도서 파싱:
- PDF 이미지/텍스트에서 공종·수량·규격 자동 추출
- Pandoc HWP 출력 지원

발주처 전용 포털:
- 토큰 기반 읽기 전용 API
- 공사 현황 대시보드, 공정률 추이 차트

에이전트 협업 고도화:
- 협업 시나리오 (concrete_pour, excavation, weekly_report)
- GONGSA→PUMJIL→ANJEON 순차 처리

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sinmb79
2026-03-24 22:02:29 +09:00
parent 48f1027f08
commit 5a044a3882
17 changed files with 1350 additions and 8 deletions
@@ -0,0 +1,113 @@
"""
에이전트 협업 고도화 — Phase 3
복수 에이전트가 순차적으로 하나의 시나리오를 처리합니다.
예시: 콘크리트 타설 당일 시나리오
07:00 GONGSA → 공정 브리핑 + 날씨 체크
07:05 PUMJIL → 타설 전 품질 체크리스트
07:10 ANJEON → TBM 자료 + 안전 체크
"""
from datetime import date
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.agent import AgentConversation, AgentMessage, AgentType
from .gongsa import gongsa_agent
from .pumjil import pumjil_agent
from .anjeon import anjeon_agent
from .gumu import gumu_agent
from .base import BaseAgent
SCENARIO_AGENTS = {
"concrete_pour": [
(AgentType.GONGSA, "콘크리트 타설 예정입니다. 날씨와 공정 현황을 브리핑해주세요."),
(AgentType.PUMJIL, "오늘 콘크리트 타설 전 품질 체크리스트를 발송해주세요."),
(AgentType.ANJEON, "콘크리트 타설 작업 TBM 자료를 작성해주세요."),
],
"excavation": [
(AgentType.GONGSA, "굴착 작업 예정입니다. 공정 현황을 브리핑해주세요."),
(AgentType.ANJEON, "굴착 작업 안전 사전 경보와 TBM 자료를 작성해주세요."),
(AgentType.PUMJIL, "굴착 작업 품질 관리 체크리스트를 발송해주세요."),
],
"weekly_report": [
(AgentType.GONGSA, "이번 주 공정 현황을 요약해주세요."),
(AgentType.PUMJIL, "이번 주 품질시험 및 검측 현황을 요약해주세요."),
(AgentType.GUMU, "이번 주 행정 처리 현황과 다음 주 예정 사항을 정리해주세요."),
],
}
async def run_scenario(
db: AsyncSession,
project_id,
user_id,
scenario: str,
) -> list[dict]:
"""
협업 시나리오 실행.
반환: [{"agent": "gongsa", "content": "...", "message_id": "..."}, ...]
"""
import uuid
pid = uuid.UUID(str(project_id))
steps = SCENARIO_AGENTS.get(scenario)
if not steps:
raise ValueError(f"알 수 없는 시나리오: {scenario}. 가능한 값: {list(SCENARIO_AGENTS.keys())}")
results = []
agent_map: dict[AgentType, BaseAgent] = {
AgentType.GONGSA: gongsa_agent,
AgentType.PUMJIL: pumjil_agent,
AgentType.ANJEON: anjeon_agent,
AgentType.GUMU: gumu_agent,
}
# 이전 에이전트 응답을 컨텍스트에 누적
prev_responses: list[str] = []
for agent_type, prompt_base in steps:
agent = agent_map[agent_type]
context = await agent.build_context(db, str(pid))
# 이전 에이전트 응답을 프롬프트에 추가
prompt = prompt_base
if prev_responses:
prompt += "\n\n이전 에이전트 응답 요약:\n" + "\n---\n".join(prev_responses[-2:])
# 대화 세션 생성
conv = AgentConversation(
project_id=pid,
user_id=uuid.UUID(str(user_id)),
agent_type=agent_type,
title=f"{scenario} 협업 시나리오 ({date.today()})",
)
db.add(conv)
await db.flush()
reply = await agent.chat(
messages=[{"role": "user", "content": prompt}],
context=context,
)
msg = AgentMessage(
conversation_id=conv.id,
role="assistant",
content=reply,
is_proactive=True,
metadata={"scenario": scenario, "step": agent_type.value},
)
db.add(msg)
await db.flush()
prev_responses.append(f"[{agent_type.value.upper()}] {reply[:300]}")
results.append({
"agent": agent_type.value,
"agent_name": agent.name_ko,
"content": reply,
"conversation_id": str(conv.id),
"message_id": str(msg.id),
})
await db.commit()
return results
+142
View File
@@ -0,0 +1,142 @@
"""
준공도서 패키지 자동화 서비스
작업일보, 품질시험, 검측이력, 사진대장, 인허가 현황을 종합하여
준공도서 PDF 번들을 생성합니다.
"""
import io
import zipfile
from datetime import date
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.project import Project
from app.models.daily_report import DailyReport
from app.models.quality import QualityTest, QualityResult
from app.models.inspection import InspectionRequest, InspectionStatus
from app.models.permit import PermitItem, PermitStatus
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
_jinja = Environment(loader=FileSystemLoader(str(TEMPLATES_DIR)), autoescape=True)
def _html_to_pdf(html: str) -> bytes:
try:
from weasyprint import HTML
return HTML(string=html).write_pdf()
except ImportError:
raise RuntimeError("WeasyPrint가 설치되지 않았습니다. `pip install weasyprint`")
async def build_completion_package(
db: AsyncSession,
project_id,
) -> tuple[bytes, str]:
"""
준공도서 ZIP 패키지 생성.
반환: (zip_bytes, filename)
"""
import uuid
pid = uuid.UUID(str(project_id))
today = date.today()
# 프로젝트 정보
proj_r = await db.execute(select(Project).where(Project.id == pid))
project = proj_r.scalar_one_or_none()
if not project:
raise ValueError("프로젝트를 찾을 수 없습니다")
# 데이터 수집
dr_r = await db.execute(
select(DailyReport).where(DailyReport.project_id == pid).order_by(DailyReport.report_date)
)
daily_reports = dr_r.scalars().all()
qt_r = await db.execute(
select(QualityTest).where(QualityTest.project_id == pid).order_by(QualityTest.test_date)
)
quality_tests = qt_r.scalars().all()
insp_r = await db.execute(
select(InspectionRequest).where(InspectionRequest.project_id == pid).order_by(InspectionRequest.requested_date)
)
inspections = insp_r.scalars().all()
permit_r = await db.execute(
select(PermitItem).where(PermitItem.project_id == pid).order_by(PermitItem.sort_order)
)
permits = permit_r.scalars().all()
# 통계
total_dr = len(daily_reports)
total_qt = len(quality_tests)
pass_qt = sum(1 for q in quality_tests if q.result == QualityResult.PASS)
total_insp = len(inspections)
completed_insp = sum(1 for i in inspections if i.status == InspectionStatus.COMPLETED)
total_permits = len(permits)
approved_permits = sum(1 for p in permits if p.status == PermitStatus.APPROVED)
# 1. 준공 요약 PDF
summary_html = _jinja.get_template("completion_summary.html").render(
project=project,
today=today,
total_dr=total_dr,
total_qt=total_qt,
pass_qt=pass_qt,
pass_rate=round(pass_qt / total_qt * 100, 1) if total_qt else 0,
total_insp=total_insp,
completed_insp=completed_insp,
total_permits=total_permits,
approved_permits=approved_permits,
daily_reports=daily_reports[:5], # 최근 5개 미리보기
quality_tests=quality_tests[-10:], # 마지막 10개 미리보기
)
summary_pdf = _html_to_pdf(summary_html)
# 2. 품질시험 목록 PDF
qt_html = _jinja.get_template("quality_list.html").render(
project=project,
quality_tests=quality_tests,
today=today,
)
qt_pdf = _html_to_pdf(qt_html)
# 3. 검측 이력 PDF
insp_html = _jinja.get_template("inspection_list.html").render(
project=project,
inspections=inspections,
today=today,
)
insp_pdf = _html_to_pdf(insp_html)
# 4. 인허가 현황 PDF
permit_html = _jinja.get_template("permit_list.html").render(
project=project,
permits=permits,
today=today,
)
permit_pdf = _html_to_pdf(permit_html)
# ZIP 번들 생성
zip_buffer = io.BytesIO()
project_code = getattr(project, "code", str(pid)[:8])
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
zf.writestr(f"01_준공요약_{project_code}.pdf", summary_pdf)
zf.writestr(f"02_품질시험목록_{project_code}.pdf", qt_pdf)
zf.writestr(f"03_검측이력_{project_code}.pdf", insp_pdf)
zf.writestr(f"04_인허가현황_{project_code}.pdf", permit_pdf)
zf.writestr("README.txt", (
f"준공도서 패키지\n"
f"프로젝트: {project.name}\n"
f"생성일: {today}\n\n"
f"포함 문서:\n"
f" 01_준공요약 — 전체 공사 실적 요약\n"
f" 02_품질시험목록 — 전체 품질시험 기록 ({total_qt}건)\n"
f" 03_검측이력 — 검측 요청·완료 이력 ({total_insp}건)\n"
f" 04_인허가현황 — 인허가 취득 현황 ({total_permits}건)\n"
).encode("utf-8"))
zip_buffer.seek(0)
filename = f"completion_{project_code}_{today}.zip"
return zip_buffer.read(), filename
+133
View File
@@ -0,0 +1,133 @@
"""
설계도서 파싱 서비스 (Phase 3)
PDF 설계도서에서 공종·수량·규격을 AI로 자동 추출합니다.
HWP 출력은 Pandoc을 통해 변환합니다.
"""
import base64
import json
import re
import subprocess
import tempfile
import os
from pathlib import Path
from anthropic import AsyncAnthropic
from app.config import settings
_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
_PARSE_SYSTEM = """당신은 건설 설계도서 분석 전문가입니다.
제공된 설계도서 내용에서 다음 정보를 추출하세요:
1. 공종 목록 (work_types): 주요 공종과 세부 공종
2. 수량 목록 (quantities): 각 공종별 수량과 단위
3. 규격 (specifications): 재료 규격, 강도, 등급
4. 특기 사항 (notes): 시공 시 주의사항, 특수 조건
JSON 형식으로만 반환하세요."""
async def parse_design_document_text(text: str) -> dict:
"""
설계도서 텍스트에서 공종/수량/규격 추출 (Claude API).
RAG 시드 스크립트로 읽은 텍스트를 직접 전달하는 용도.
"""
prompt = f"""다음 설계도서 내용을 분석해주세요:
{text[:8000]}
JSON 형식으로만 반환하세요:
{{
"work_types": ["터파기", "철근콘크리트", ...],
"quantities": [
{{"work_type": "터파기", "quantity": 500, "unit": ""}},
...
],
"specifications": [
{{"item": "콘크리트", "spec": "fck=24MPa", "notes": ""}},
...
],
"notes": ["주의사항1", ...]
}}"""
response = await _client.messages.create(
model=settings.CLAUDE_MODEL,
max_tokens=2048,
system=_PARSE_SYSTEM,
messages=[{"role": "user", "content": prompt}],
)
raw = response.content[0].text.strip()
match = re.search(r"\{.*\}", raw, re.DOTALL)
if match:
return json.loads(match.group())
return {"error": "파싱 실패", "raw": raw[:500]}
async def parse_design_document_image(image_data: bytes, media_type: str = "image/jpeg") -> dict:
"""
설계 도면 이미지에서 공종/수량/규격 추출 (Claude Vision).
도면 스캔 이미지나 PDF 페이지를 직접 분석합니다.
"""
image_b64 = base64.standard_b64encode(image_data).decode()
response = await _client.messages.create(
model=settings.CLAUDE_MODEL,
max_tokens=2048,
system=_PARSE_SYSTEM,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": media_type, "data": image_b64}},
{"type": "text", "text": "이 설계 도면/도서에서 공종, 수량, 규격을 추출해주세요. JSON으로만 반환하세요."},
],
}],
)
raw = response.content[0].text.strip()
match = re.search(r"\{.*\}", raw, re.DOTALL)
if match:
return json.loads(match.group())
return {"error": "파싱 실패", "raw": raw[:500]}
def convert_to_hwp(html_content: str, output_path: str | None = None) -> bytes | str:
"""
HTML → HWP 변환 (Pandoc 필요).
output_path 미지정 시 바이트 반환.
사전 요구사항: pandoc 설치 필요
설치: https://pandoc.org/installing.html
"""
try:
with tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w", encoding="utf-8") as f:
f.write(html_content)
html_path = f.name
if output_path:
out = output_path
else:
out = html_path.replace(".html", ".hwp")
result = subprocess.run(
["pandoc", html_path, "-o", out, "--from=html"],
capture_output=True, text=True, timeout=30,
)
os.unlink(html_path)
if result.returncode != 0:
raise RuntimeError(f"Pandoc 변환 실패: {result.stderr}")
if output_path:
return output_path
else:
with open(out, "rb") as f:
data = f.read()
os.unlink(out)
return data
except FileNotFoundError:
raise RuntimeError(
"Pandoc이 설치되지 않았습니다.\n"
"설치: https://pandoc.org/installing.html\n"
"Windows: winget install JohnMacFarlane.Pandoc"
)
+59 -3
View File
@@ -1,8 +1,8 @@
"""
EVMS (Earned Value Management System) 계산 서비스
PV, EV, AC, SPI, CPI 산출
EVMS (Earned Value Management System) — Phase 3 완전 자동화
PV, EV, AC, SPI, CPI 산출 + 공정 지연 예측 AI + 기성청구 자동 알림
"""
from datetime import date
from datetime import date, timedelta
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -10,6 +10,62 @@ from app.models.task import Task
from app.models.project import Project
async def predict_delay(
db: AsyncSession,
project_id,
spi: float,
planned_end: date | None,
snapshot_date: date,
) -> dict:
"""
SPI 기반 공기 지연 예측.
spi < 1 이면 지연, 남은 기간을 SPI로 나눠 예상 준공일 계산.
"""
if not planned_end or spi is None or spi <= 0:
return {"delay_days": None, "predicted_end": None, "status": "예측 불가"}
remaining_days = (planned_end - snapshot_date).days
if remaining_days <= 0:
return {"delay_days": 0, "predicted_end": str(planned_end), "status": "준공 예정일 경과"}
predicted_remaining = remaining_days / spi
predicted_end = snapshot_date + timedelta(days=int(predicted_remaining))
delay_days = (predicted_end - planned_end).days
if delay_days > 0:
status = f"{delay_days}일 지연 예상"
elif delay_days < -3:
status = f"{abs(delay_days)}일 조기 준공 예상"
else:
status = "정상 진행"
return {
"delay_days": delay_days,
"predicted_end": str(predicted_end),
"status": status,
}
async def compute_progress_claim(
total_budget: float,
actual_progress: float,
already_claimed_pct: float = 0.0,
) -> dict:
"""
기성청구 가능 금액 산출.
기성청구 가능 금액 = 총예산 × (실제 공정률 - 기청구 공정률)
"""
claimable_pct = max(0.0, actual_progress - already_claimed_pct)
claimable_amount = total_budget * (claimable_pct / 100)
return {
"actual_progress": actual_progress,
"already_claimed_pct": already_claimed_pct,
"claimable_pct": round(claimable_pct, 1),
"claimable_amount": round(claimable_amount, 0),
"claimable_amount_formatted": f"{claimable_amount:,.0f}",
}
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
+106 -2
View File
@@ -8,6 +8,7 @@ from datetime import date, datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select
from app.core.database import AsyncSessionLocal
@@ -115,6 +116,89 @@ async def _collect_weather_for_all_projects():
logger.info("날씨 수집 완료")
async def _daily_evms_snapshot():
"""매일 자정 활성 프로젝트 EVMS 스냅샷 자동 저장"""
from app.services.evms_service import compute_evms
from app.models.evms import EVMSSnapshot
today = date.today()
async with AsyncSessionLocal() as db:
result = await db.execute(select(Project).where(Project.status == "active"))
projects = result.scalars().all()
for project in projects:
try:
data = await compute_evms(db, project.id, today)
snap = EVMSSnapshot(
project_id=project.id,
snapshot_date=today,
total_budget=data["total_budget"],
planned_progress=data["planned_progress"],
actual_progress=data["actual_progress"],
pv=data["pv"], ev=data["ev"], ac=data["ac"],
spi=data["spi"], cpi=data["cpi"],
eac=data["eac"], etc=data["etc"],
detail_json={"tasks": data["detail"]},
)
db.add(snap)
except Exception as e:
logger.error(f"EVMS 스냅샷 실패 [{project.id}]: {e}")
await db.commit()
logger.info(f"EVMS 일일 스냅샷 완료: {len(projects)}개 프로젝트")
async def _morning_agent_briefings():
"""매일 오전 7시 GONGSA 아침 브리핑 자동 생성"""
from app.models.agent import AgentConversation, AgentMessage, AgentType
from app.models.user import User
from app.services.agents.gongsa import gongsa_agent
async with AsyncSessionLocal() as db:
result = await db.execute(select(Project).where(Project.status == "active"))
projects = result.scalars().all()
for project in projects:
try:
# 프로젝트 관리자 조회 (첫 번째 admin)
user_r = await db.execute(
select(User).where(User.role == "site_manager").limit(1)
)
user = user_r.scalar_one_or_none()
if not user:
continue
context = await gongsa_agent.build_context(db, str(project.id))
prompt = (
f"오늘({context.get('today', str(date.today()))}) 아침 공정 브리핑을 작성해주세요. "
"날씨, 오늘 예정 공종, 주의사항을 포함해주세요."
)
reply = await gongsa_agent.chat(
messages=[{"role": "user", "content": prompt}],
context=context,
)
conv = AgentConversation(
project_id=project.id,
user_id=user.id,
agent_type=AgentType.GONGSA,
title=f"{date.today()} 아침 브리핑 (자동)",
)
db.add(conv)
await db.flush()
msg = AgentMessage(
conversation_id=conv.id,
role="assistant",
content=reply,
is_proactive=True,
)
db.add(msg)
except Exception as e:
logger.error(f"아침 브리핑 실패 [{project.id}]: {e}")
await db.commit()
logger.info("아침 브리핑 자동 생성 완료")
def start_scheduler():
"""FastAPI 시작 시 스케줄러를 초기화하고 시작합니다."""
global _scheduler
@@ -127,11 +211,31 @@ def start_scheduler():
id="weather_collect",
name="날씨 데이터 자동 수집",
replace_existing=True,
misfire_grace_time=300, # 5분 내 누락 허용
misfire_grace_time=300,
)
# 매일 자정 EVMS 스냅샷
_scheduler.add_job(
_daily_evms_snapshot,
trigger=CronTrigger(hour=0, minute=5),
id="evms_daily",
name="EVMS 일일 스냅샷",
replace_existing=True,
misfire_grace_time=600,
)
# 매일 오전 7시 아침 브리핑
_scheduler.add_job(
_morning_agent_briefings,
trigger=CronTrigger(hour=7, minute=0),
id="morning_briefing",
name="GONGSA 아침 브리핑 자동 생성",
replace_existing=True,
misfire_grace_time=600,
)
_scheduler.start()
logger.info("APScheduler 시작: 날씨 수집 3시간 주기")
logger.info("APScheduler 시작: 날씨(3h), EVMS 스냅샷(00:05), 아침 브리핑(07:00)")
return _scheduler
+57
View File
@@ -114,6 +114,63 @@ async def classify_photo(
return result
async def compare_with_drawing(
field_photo: bytes,
drawing_image: bytes,
comparison_type: str = "rebar",
field_media_type: str = "image/jpeg",
drawing_media_type: str = "image/jpeg",
) -> dict:
"""
Vision AI Level 3 — 설계 도면 vs 현장 사진 비교 (고난도 보조 판독)
comparison_type: rebar(철근배근), formwork(거푸집), general(일반)
최종 합격/불합격 판정은 현장 책임자가 합니다.
"""
field_b64 = base64.standard_b64encode(field_photo).decode()
drawing_b64 = base64.standard_b64encode(drawing_image).decode()
type_prompts = {
"rebar": "철근 배근 간격·직경·이음 방법을 도면과 비교해주세요.",
"formwork": "거푸집 치수·형태·지지 방법을 도면과 비교해주세요.",
"general": "현장 시공 상태를 도면과 전반적으로 비교해주세요.",
}
specific = type_prompts.get(comparison_type, type_prompts["general"])
prompt = f"""왼쪽은 설계 도면, 오른쪽은 현장 사진입니다.
{specific}
다음 JSON 형식으로만 응답하세요:
{{
"comparison_type": "{comparison_type}",
"conformances": ["도면과 일치하는 항목"],
"discrepancies": ["불일치 또는 확인 필요 항목"],
"risk_level": "저/중/고",
"recommendation": "현장 책임자에게 전달할 권고사항",
"confidence": 0.7,
"disclaimer": "이 결과는 AI 1차 보조 판독이며 최종 판정은 현장 책임자가 합니다."
}}"""
response = await _client.messages.create(
model=settings.CLAUDE_MODEL,
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": drawing_media_type, "data": drawing_b64}},
{"type": "image", "source": {"type": "base64", "media_type": field_media_type, "data": field_b64}},
{"type": "text", "text": prompt},
],
}],
)
raw = response.content[0].text.strip()
import json, re
json_match = re.search(r"\{.*\}", raw, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "분석 실패", "raw": raw[:200]}
async def analyze_safety(
image_data: bytes,
media_type: str = "image/jpeg",