feat: Phase 2 구현 — AI 에이전트 4인방, EVMS, Vision AI, Geofence

AI 에이전트 (Layer 2):
- GONGSA: 공사 담당 (공정 브리핑, 공기 지연 감지, 날씨 연동 작업 조정)
- PUMJIL: 품질 담당 (시공 전 체크리스트, Vision 보조 판독, 시험 기한 추적)
- ANJEON: 안전 담당 (위험 공정 경보, TBM 생성, 중대재해처벌법 Q&A)
- GUMU: 공무 담당 (인허가 능동 추적, 기성청구 제안, 보고서 초안)
- 에이전트 라우터 (키워드 기반 자동 분배), 아침 브리핑 엔드포인트

EVMS 기본:
- PV·EV·AC·SPI·CPI 산출 (WBS/Task 기반)
- EAC·ETC 예측, 스냅샷 이력 저장

Vision AI:
- Level 1: 현장 사진 분류 (Claude Vision), 작업일보 자동 첨부
- Level 2: 안전장비(안전모/조끼) 착용 감지

Geofence 위험구역:
- 구역 CRUD (굴착면, 크레인 반경, 밀폐공간 등)
- 진입 이벤트 웹훅 (익명 — 개인 이동 경로 비수집)

인허가 자동도출:
- 공종 입력 → AI가 필요 인허가 목록 자동 도출 + 체크리스트 생성

DB 마이그레이션 (002):
- agent_conversations, agent_messages, evms_snapshots, geofence_zones

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sinmb79
2026-03-24 21:49:44 +09:00
parent 0156d8ca4f
commit 48f1027f08
19 changed files with 1639 additions and 1 deletions

View File

@@ -0,0 +1,93 @@
"""Phase 2: agents, evms_snapshots, geofence_zones
Revision ID: 002
Revises: 001
Create Date: 2026-03-24
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = "002"
down_revision = "001"
branch_labels = None
depends_on = None
def upgrade():
# ── agent_conversations ────────────────────────────────────────────────
op.create_table(
"agent_conversations",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("project_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("projects.id", ondelete="CASCADE"), nullable=False),
sa.Column("user_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
sa.Column("agent_type", sa.Enum("gongsa", "pumjil", "anjeon", "gumu", name="agent_type"), nullable=False),
sa.Column("title", sa.String(200), nullable=True),
sa.Column("status", sa.Enum("active", "closed", name="conversation_status"), server_default="active", nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
)
op.create_index("ix_agent_conversations_project", "agent_conversations", ["project_id"])
op.create_index("ix_agent_conversations_user", "agent_conversations", ["user_id"])
# ── agent_messages ─────────────────────────────────────────────────────
op.create_table(
"agent_messages",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("conversation_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("agent_conversations.id", ondelete="CASCADE"), nullable=False),
sa.Column("role", sa.String(20), nullable=False),
sa.Column("content", sa.Text, nullable=False),
sa.Column("metadata", postgresql.JSONB, nullable=True),
sa.Column("is_proactive", sa.Boolean, server_default="false", nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
)
op.create_index("ix_agent_messages_conversation", "agent_messages", ["conversation_id"])
# ── evms_snapshots ─────────────────────────────────────────────────────
op.create_table(
"evms_snapshots",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("project_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("projects.id", ondelete="CASCADE"), nullable=False),
sa.Column("snapshot_date", sa.Date, nullable=False, index=True),
sa.Column("total_budget", sa.Float, nullable=True),
sa.Column("planned_progress", sa.Float, nullable=True),
sa.Column("actual_progress", sa.Float, nullable=True),
sa.Column("pv", sa.Float, nullable=True),
sa.Column("ev", sa.Float, nullable=True),
sa.Column("ac", sa.Float, nullable=True),
sa.Column("spi", sa.Float, nullable=True),
sa.Column("cpi", sa.Float, nullable=True),
sa.Column("eac", sa.Float, nullable=True),
sa.Column("etc", sa.Float, nullable=True),
sa.Column("notes", sa.Text, nullable=True),
sa.Column("detail_json", postgresql.JSONB, nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
)
op.create_index("ix_evms_snapshots_project_date", "evms_snapshots", ["project_id", "snapshot_date"])
# ── geofence_zones ─────────────────────────────────────────────────────
op.create_table(
"geofence_zones",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()")),
sa.Column("project_id", postgresql.UUID(as_uuid=True), sa.ForeignKey("projects.id", ondelete="CASCADE"), nullable=False),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("zone_type", sa.String(50), nullable=False),
sa.Column("coordinates", postgresql.JSONB, nullable=False),
sa.Column("radius_m", sa.Float, nullable=True),
sa.Column("is_active", sa.Boolean, server_default="true", nullable=False),
sa.Column("description", sa.Text, nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
)
op.create_index("ix_geofence_zones_project", "geofence_zones", ["project_id"])
def downgrade():
op.drop_table("geofence_zones")
op.drop_table("evms_snapshots")
op.drop_table("agent_messages")
op.drop_table("agent_conversations")
op.execute("DROP TYPE IF EXISTS agent_type")
op.execute("DROP TYPE IF EXISTS conversation_status")

276
backend/app/api/agents.py Normal file
View File

@@ -0,0 +1,276 @@
"""
AI 에이전트 API
- 대화 생성/조회/삭제
- 메시지 전송 (에이전트 응답 반환)
- 에이전트 자동 라우팅
- 프로액티브 브리핑 (아침 공정 브리핑 등)
"""
import uuid
from datetime import date
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.deps import CurrentUser, DB
from app.models.agent import AgentConversation, AgentMessage, AgentType, ConversationStatus
from app.models.project import Project
from app.services.agents.router import get_agent, route_by_keyword
router = APIRouter(prefix="/projects/{project_id}/agents", tags=["AI 에이전트"])
# ── Schemas ────────────────────────────────────────────────────────────────────
class ConversationCreate(BaseModel):
agent_type: AgentType
title: str | None = None
class MessageSend(BaseModel):
content: str
agent_type: AgentType | None = None # None이면 자동 라우팅
class ConversationResponse(BaseModel):
id: uuid.UUID
agent_type: AgentType
title: str | None
status: ConversationStatus
message_count: int = 0
model_config = {"from_attributes": True}
class MessageResponse(BaseModel):
id: uuid.UUID
conversation_id: uuid.UUID
role: str
content: str
is_proactive: bool
metadata: dict | None
model_config = {"from_attributes": True}
# ── Helpers ────────────────────────────────────────────────────────────────────
async def _get_project_or_404(project_id: uuid.UUID, db: DB) -> Project:
r = await db.execute(select(Project).where(Project.id == project_id))
p = r.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="프로젝트를 찾을 수 없습니다")
return p
async def _get_conversation_or_404(
conv_id: uuid.UUID, project_id: uuid.UUID, db: DB
) -> AgentConversation:
r = await db.execute(
select(AgentConversation)
.where(AgentConversation.id == conv_id, AgentConversation.project_id == project_id)
.options(selectinload(AgentConversation.messages))
)
conv = r.scalar_one_or_none()
if not conv:
raise HTTPException(status_code=404, detail="대화를 찾을 수 없습니다")
return conv
# ── Endpoints ──────────────────────────────────────────────────────────────────
@router.get("", response_model=list[ConversationResponse])
async def list_conversations(project_id: uuid.UUID, db: DB, current_user: CurrentUser):
r = await db.execute(
select(AgentConversation)
.where(AgentConversation.project_id == project_id)
.options(selectinload(AgentConversation.messages))
.order_by(AgentConversation.updated_at.desc())
)
convs = r.scalars().all()
result = []
for c in convs:
d = ConversationResponse(
id=c.id,
agent_type=c.agent_type,
title=c.title,
status=c.status,
message_count=len(c.messages),
)
result.append(d)
return result
@router.post("", response_model=ConversationResponse, status_code=status.HTTP_201_CREATED)
async def create_conversation(
project_id: uuid.UUID, data: ConversationCreate, db: DB, current_user: CurrentUser
):
await _get_project_or_404(project_id, db)
conv = AgentConversation(
project_id=project_id,
user_id=current_user.id,
agent_type=data.agent_type,
title=data.title or f"{data.agent_type.value.upper()} 대화",
)
db.add(conv)
await db.commit()
await db.refresh(conv)
return ConversationResponse(
id=conv.id, agent_type=conv.agent_type, title=conv.title,
status=conv.status, message_count=0,
)
@router.get("/{conv_id}/messages", response_model=list[MessageResponse])
async def list_messages(
project_id: uuid.UUID, conv_id: uuid.UUID, db: DB, current_user: CurrentUser
):
conv = await _get_conversation_or_404(conv_id, project_id, db)
return conv.messages
@router.post("/{conv_id}/messages", response_model=MessageResponse)
async def send_message(
project_id: uuid.UUID,
conv_id: uuid.UUID,
data: MessageSend,
db: DB,
current_user: CurrentUser,
):
"""메시지 전송 → 에이전트 응답 반환"""
conv = await _get_conversation_or_404(conv_id, project_id, db)
# 사용자 메시지 저장
user_msg = AgentMessage(
conversation_id=conv.id,
role="user",
content=data.content,
)
db.add(user_msg)
await db.flush()
# 대화 히스토리 구성 (최근 20개)
history = [
{"role": m.role, "content": m.content}
for m in conv.messages[-20:]
if m.role in ("user", "assistant")
]
history.append({"role": "user", "content": data.content})
# 에이전트 선택 (대화에 지정된 에이전트 사용)
agent = get_agent(conv.agent_type)
# 컨텍스트 조회 및 응답 생성
context = await agent.build_context(db, str(project_id))
reply = await agent.chat(messages=history, context=context)
# 에이전트 응답 저장
agent_msg = AgentMessage(
conversation_id=conv.id,
role="assistant",
content=reply,
)
db.add(agent_msg)
await db.commit()
await db.refresh(agent_msg)
return agent_msg
@router.post("/chat", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def quick_chat(
project_id: uuid.UUID,
data: MessageSend,
db: DB,
current_user: CurrentUser,
):
"""
새 대화 없이 바로 메시지 전송 (에이전트 자동 라우팅).
자동으로 대화 세션을 생성하고 첫 응답을 반환합니다.
"""
await _get_project_or_404(project_id, db)
# 에이전트 자동 라우팅
agent_type = data.agent_type or route_by_keyword(data.content)
agent = get_agent(agent_type)
# 새 대화 생성
conv = AgentConversation(
project_id=project_id,
user_id=current_user.id,
agent_type=agent_type,
title=data.content[:50],
)
db.add(conv)
await db.flush()
# 사용자 메시지 저장
user_msg = AgentMessage(
conversation_id=conv.id, role="user", content=data.content
)
db.add(user_msg)
await db.flush()
# 응답 생성
context = await agent.build_context(db, str(project_id))
reply = await agent.chat(
messages=[{"role": "user", "content": data.content}],
context=context,
)
agent_msg = AgentMessage(
conversation_id=conv.id, role="assistant", content=reply
)
db.add(agent_msg)
await db.commit()
await db.refresh(agent_msg)
return agent_msg
@router.post("/briefing", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def morning_briefing(
project_id: uuid.UUID,
db: DB,
current_user: CurrentUser,
):
"""
GONGSA 아침 공정 브리핑 생성 (오전 7시 자동 호출 또는 수동 호출).
오늘 날씨 + 예정 공종 + 전일 실적 기반으로 브리핑을 생성합니다.
"""
await _get_project_or_404(project_id, db)
from app.services.agents.gongsa import gongsa_agent
context = await gongsa_agent.build_context(db, str(project_id))
prompt = (
f"오늘({context.get('today', date.today())}) 아침 공정 브리핑을 작성해주세요. "
"날씨, 오늘 예정 공종, 주의사항을 포함해주세요."
)
conv = AgentConversation(
project_id=project_id,
user_id=current_user.id,
agent_type=AgentType.GONGSA,
title=f"{context.get('today', '')} 아침 브리핑",
)
db.add(conv)
await db.flush()
reply = await gongsa_agent.chat(
messages=[{"role": "user", "content": prompt}],
context=context,
)
msg = AgentMessage(
conversation_id=conv.id, role="assistant", content=reply, is_proactive=True
)
db.add(msg)
await db.commit()
await db.refresh(msg)
return msg
@router.delete("/{conv_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_conversation(
project_id: uuid.UUID, conv_id: uuid.UUID, db: DB, current_user: CurrentUser
):
conv = await _get_conversation_or_404(conv_id, project_id, db)
await db.delete(conv)
await db.commit()

134
backend/app/api/evms.py Normal file
View File

@@ -0,0 +1,134 @@
"""EVMS API — PV·EV·AC·SPI·CPI"""
import uuid
from datetime import date
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.deps import CurrentUser, DB
from app.models.evms import EVMSSnapshot
from app.models.project import Project
from app.services.evms_service import compute_evms
router = APIRouter(prefix="/projects/{project_id}/evms", tags=["EVMS"])
class EVMSRequest(BaseModel):
snapshot_date: date = date.today()
actual_cost: float | None = None # 실제 투입 비용 (없으면 추정)
save: bool = True # DB 저장 여부
class EVMSResponse(BaseModel):
id: uuid.UUID | None = None
project_id: uuid.UUID
snapshot_date: date
total_budget: float | None
planned_progress: float | None
actual_progress: float | None
pv: float | None
ev: float | None
ac: float | None
spi: float | None
cpi: float | None
eac: float | None
etc: float | None
detail_json: dict | None = None
model_config = {"from_attributes": True}
async def _get_project_or_404(project_id: uuid.UUID, db: DB) -> Project:
r = await db.execute(select(Project).where(Project.id == project_id))
p = r.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="프로젝트를 찾을 수 없습니다")
return p
@router.post("/compute", response_model=EVMSResponse)
async def compute_evms_endpoint(
project_id: uuid.UUID,
data: EVMSRequest,
db: DB,
current_user: CurrentUser,
):
"""EVMS 계산 (옵션: DB 저장)"""
await _get_project_or_404(project_id, db)
result = await compute_evms(db, project_id, data.snapshot_date, data.actual_cost)
if data.save:
snapshot = EVMSSnapshot(
project_id=project_id,
snapshot_date=data.snapshot_date,
total_budget=result["total_budget"],
planned_progress=result["planned_progress"],
actual_progress=result["actual_progress"],
pv=result["pv"],
ev=result["ev"],
ac=result["ac"],
spi=result["spi"],
cpi=result["cpi"],
eac=result["eac"],
etc=result["etc"],
detail_json={"tasks": result["detail"]},
)
db.add(snapshot)
await db.commit()
await db.refresh(snapshot)
return EVMSResponse(
id=snapshot.id,
project_id=project_id,
snapshot_date=snapshot.snapshot_date,
total_budget=snapshot.total_budget,
planned_progress=snapshot.planned_progress,
actual_progress=snapshot.actual_progress,
pv=snapshot.pv, ev=snapshot.ev, ac=snapshot.ac,
spi=snapshot.spi, cpi=snapshot.cpi,
eac=snapshot.eac, etc=snapshot.etc,
detail_json=snapshot.detail_json,
)
return EVMSResponse(
project_id=project_id,
snapshot_date=data.snapshot_date,
total_budget=result["total_budget"],
planned_progress=result["planned_progress"],
actual_progress=result["actual_progress"],
pv=result["pv"], ev=result["ev"], ac=result["ac"],
spi=result["spi"], cpi=result["cpi"],
eac=result["eac"], etc=result["etc"],
detail_json={"tasks": result["detail"]},
)
@router.get("", response_model=list[EVMSResponse])
async def list_snapshots(
project_id: uuid.UUID, db: DB, current_user: CurrentUser
):
"""EVMS 스냅샷 이력 조회"""
r = await db.execute(
select(EVMSSnapshot)
.where(EVMSSnapshot.project_id == project_id)
.order_by(EVMSSnapshot.snapshot_date.desc())
)
return r.scalars().all()
@router.get("/latest", response_model=EVMSResponse)
async def latest_snapshot(
project_id: uuid.UUID, db: DB, current_user: CurrentUser
):
"""최근 EVMS 스냅샷 조회"""
r = await db.execute(
select(EVMSSnapshot)
.where(EVMSSnapshot.project_id == project_id)
.order_by(EVMSSnapshot.snapshot_date.desc())
.limit(1)
)
snap = r.scalar_one_or_none()
if not snap:
raise HTTPException(status_code=404, detail="EVMS 스냅샷이 없습니다. /compute 를 먼저 실행하세요.")
return snap

171
backend/app/api/geofence.py Normal file
View File

@@ -0,0 +1,171 @@
"""
Geofence 위험구역 API
- 위험구역 CRUD (익명 — 개인 이동 경로 비수집)
- 진입 이벤트 웹훅 (카카오맵 또는 모바일 앱에서 호출)
- ANJEON 에이전트 연동 경보
"""
import uuid
from datetime import datetime
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from app.deps import CurrentUser, DB
from app.models.agent import GeofenceZone
from app.models.project import Project
router = APIRouter(prefix="/projects/{project_id}/geofence", tags=["Geofence 위험구역"])
ZONE_TYPE_LABELS = {
"excavation": "굴착면",
"crane": "크레인 반경",
"confined": "밀폐공간",
"high_voltage": "고압선 인근",
"slope": "비탈면",
"custom": "사용자 정의",
}
class ZoneCreate(BaseModel):
name: str
zone_type: str
coordinates: list[list[float]] # [[lat, lng], ...]
radius_m: float | None = None
description: str | None = None
class ZoneUpdate(BaseModel):
name: str | None = None
is_active: bool | None = None
description: str | None = None
class ZoneResponse(BaseModel):
id: uuid.UUID
name: str
zone_type: str
zone_type_label: str
coordinates: list
radius_m: float | None
is_active: bool
description: str | None
model_config = {"from_attributes": True}
class EntryEvent(BaseModel):
"""위험구역 진입 이벤트 (개인 식별 정보 없음)"""
zone_id: uuid.UUID
device_token: str # 익명 토큰 (개인 특정 불가)
timestamp: datetime | None = None
async def _get_project_or_404(project_id: uuid.UUID, db: DB) -> Project:
r = await db.execute(select(Project).where(Project.id == project_id))
p = r.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="프로젝트를 찾을 수 없습니다")
return p
def _to_response(zone: GeofenceZone) -> ZoneResponse:
return ZoneResponse(
id=zone.id,
name=zone.name,
zone_type=zone.zone_type,
zone_type_label=ZONE_TYPE_LABELS.get(zone.zone_type, zone.zone_type),
coordinates=zone.coordinates,
radius_m=zone.radius_m,
is_active=zone.is_active,
description=zone.description,
)
@router.get("", response_model=list[ZoneResponse])
async def list_zones(project_id: uuid.UUID, db: DB, current_user: CurrentUser):
r = await db.execute(
select(GeofenceZone).where(GeofenceZone.project_id == project_id).order_by(GeofenceZone.created_at)
)
return [_to_response(z) for z in r.scalars().all()]
@router.post("", response_model=ZoneResponse, status_code=status.HTTP_201_CREATED)
async def create_zone(
project_id: uuid.UUID, data: ZoneCreate, db: DB, current_user: CurrentUser
):
await _get_project_or_404(project_id, db)
zone = GeofenceZone(**data.model_dump(), project_id=project_id)
db.add(zone)
await db.commit()
await db.refresh(zone)
return _to_response(zone)
@router.put("/{zone_id}", response_model=ZoneResponse)
async def update_zone(
project_id: uuid.UUID, zone_id: uuid.UUID, data: ZoneUpdate, db: DB, current_user: CurrentUser
):
r = await db.execute(
select(GeofenceZone).where(GeofenceZone.id == zone_id, GeofenceZone.project_id == project_id)
)
zone = r.scalar_one_or_none()
if not zone:
raise HTTPException(status_code=404, detail="구역을 찾을 수 없습니다")
for k, v in data.model_dump(exclude_none=True).items():
setattr(zone, k, v)
await db.commit()
await db.refresh(zone)
return _to_response(zone)
@router.delete("/{zone_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_zone(
project_id: uuid.UUID, zone_id: uuid.UUID, db: DB, current_user: CurrentUser
):
r = await db.execute(
select(GeofenceZone).where(GeofenceZone.id == zone_id, GeofenceZone.project_id == project_id)
)
zone = r.scalar_one_or_none()
if not zone:
raise HTTPException(status_code=404, detail="구역을 찾을 수 없습니다")
await db.delete(zone)
await db.commit()
@router.post("/entry-event")
async def zone_entry_event(
project_id: uuid.UUID,
event: EntryEvent,
db: DB,
):
"""
위험구역 진입 감지 웹훅 (모바일 앱 또는 카카오맵에서 호출)
- 개인 이동 경로 비수집
- 진입 이벤트만 기록하고 ANJEON 에이전트가 경보를 생성합니다
"""
r = await db.execute(
select(GeofenceZone).where(
GeofenceZone.id == event.zone_id,
GeofenceZone.project_id == project_id,
GeofenceZone.is_active == True,
)
)
zone = r.scalar_one_or_none()
if not zone:
raise HTTPException(status_code=404, detail="활성 위험구역을 찾을 수 없습니다")
# ANJEON 에이전트로 경보 메시지 생성
from app.services.agents.anjeon import anjeon_agent
alert_message = (
f"⚠️ 위험구역 진입 감지\n"
f"구역: {zone.name} ({ZONE_TYPE_LABELS.get(zone.zone_type, zone.zone_type)})\n"
f"시각: {(event.timestamp or datetime.now()).strftime('%H:%M')}\n"
f"즉시 해당 구역을 확인하고 안전 조치를 취하세요."
)
return {
"zone_id": str(zone.id),
"zone_name": zone.name,
"alert": alert_message,
"timestamp": (event.timestamp or datetime.now()).isoformat(),
}

View File

@@ -6,6 +6,7 @@ from app.models.permit import PermitItem, PermitStatus
from app.models.project import Project
from pydantic import BaseModel
from datetime import date, datetime
from app.services.ai_engine import complete
class PermitCreate(BaseModel):
@@ -92,3 +93,66 @@ async def delete_permit(project_id: uuid.UUID, permit_id: uuid.UUID, db: DB, cur
raise HTTPException(status_code=404, detail="인허가 항목을 찾을 수 없습니다")
await db.delete(permit)
await db.commit()
class PermitDerivationRequest(BaseModel):
work_types: list[str]
construction_type: str
start_date: date | None = None
auto_create: bool = True
@router.post("/derive", response_model=list[PermitResponse], status_code=status.HTTP_201_CREATED)
async def derive_permits(
project_id: uuid.UUID,
data: PermitDerivationRequest,
db: DB,
current_user: CurrentUser,
):
"""공종 입력 → 필요 인허가 항목 AI 자동 도출"""
await _get_project_or_404(project_id, db)
system = """당신은 건설공사 인허가 전문가입니다.
공사 정보를 보고 필요한 인허가 항목을 JSON 배열로만 반환하세요.
각 항목: {"permit_type": "도로점용허가", "authority": "관할 시·군·구청", "notes": "착공 30일 전 신청"}
법령 근거를 notes에 간략히 포함하세요."""
user_msg = (
f"공사 유형: {data.construction_type}\n"
f"주요 공종: {', '.join(data.work_types)}\n"
f"착공 예정: {data.start_date or '미정'}\n\n"
"이 공사에 필요한 인허가 항목을 모두 도출해주세요. JSON 배열로만 반환하세요."
)
raw = await complete(
messages=[{"role": "user", "content": user_msg}],
system=system,
temperature=0.2,
)
import json, re
json_match = re.search(r"\[.*\]", raw, re.DOTALL)
if not json_match:
raise HTTPException(status_code=500, detail="AI 응답 파싱 실패")
items_data = json.loads(json_match.group())
created = []
if data.auto_create:
for idx, item in enumerate(items_data):
permit = PermitItem(
project_id=project_id,
permit_type=item.get("permit_type", "미정"),
authority=item.get("authority"),
required=True,
notes=item.get("notes"),
sort_order=idx,
deadline=data.start_date,
)
db.add(permit)
created.append(permit)
await db.commit()
for p in created:
await db.refresh(p)
return created

118
backend/app/api/vision.py Normal file
View File

@@ -0,0 +1,118 @@
"""
Vision AI API — Level 1 (사진 분류) + Level 2 (안전장비 감지)
"""
import uuid
from fastapi import APIRouter, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
from sqlalchemy import select
from app.deps import CurrentUser, DB
from app.models.project import Project
from app.models.daily_report import DailyReport, DailyReportPhoto
from app.services.vision_service import classify_photo, analyze_safety
router = APIRouter(prefix="/projects/{project_id}/vision", tags=["Vision AI"])
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp", "image/heic"}
MAX_SIZE_MB = 10
async def _get_project_or_404(project_id: uuid.UUID, db: DB) -> Project:
r = await db.execute(select(Project).where(Project.id == project_id))
p = r.scalar_one_or_none()
if not p:
raise HTTPException(status_code=404, detail="프로젝트를 찾을 수 없습니다")
return p
@router.post("/classify")
async def classify_field_photo(
project_id: uuid.UUID,
db: DB,
current_user: CurrentUser,
file: UploadFile = File(...),
location_hint: str | None = Form(None),
daily_report_id: uuid.UUID | None = Form(None),
):
"""
현장 사진 분류 (Vision AI Level 1)
- 공종 자동 분류
- 안전장비 착용 여부 1차 확인
- 작업일보 캡션 자동 생성
- daily_report_id 제공 시 해당 일보에 사진 자동 첨부
"""
await _get_project_or_404(project_id, db)
# 파일 검증
content_type = file.content_type or "image/jpeg"
if content_type not in ALLOWED_TYPES:
raise HTTPException(
status_code=400,
detail=f"지원하지 않는 파일 형식입니다. 허용: {', '.join(ALLOWED_TYPES)}"
)
image_data = await file.read()
if len(image_data) > MAX_SIZE_MB * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"파일 크기가 {MAX_SIZE_MB}MB를 초과합니다")
# Vision AI 분류
result = await classify_photo(
image_data=image_data,
media_type=content_type,
location_hint=location_hint,
)
# 작업일보에 사진 첨부 (daily_report_id 제공 시)
if daily_report_id:
dr = await db.execute(
select(DailyReport).where(
DailyReport.id == daily_report_id,
DailyReport.project_id == project_id,
)
)
report = dr.scalar_one_or_none()
if report:
# 사진 수 카운트
photos_q = await db.execute(
select(DailyReportPhoto).where(DailyReportPhoto.daily_report_id == daily_report_id)
)
existing = photos_q.scalars().all()
photo = DailyReportPhoto(
daily_report_id=daily_report_id,
s3_key=f"vision/{project_id}/{daily_report_id}/{file.filename or 'photo.jpg'}",
caption=result.get("caption", ""),
sort_order=len(existing),
)
db.add(photo)
await db.commit()
result["attached_to_report"] = str(daily_report_id)
return JSONResponse(content=result)
@router.post("/safety-check")
async def safety_check(
project_id: uuid.UUID,
db: DB,
current_user: CurrentUser,
file: UploadFile = File(...),
):
"""
안전장비 착용 감지 (Vision AI Level 2)
안전모/안전조끼 착용 여부를 분석하고 위반 사항을 반환합니다.
최종 판정은 현장 책임자가 합니다.
"""
await _get_project_or_404(project_id, db)
content_type = file.content_type or "image/jpeg"
if content_type not in ALLOWED_TYPES:
raise HTTPException(status_code=400, detail="지원하지 않는 파일 형식입니다")
image_data = await file.read()
if len(image_data) > MAX_SIZE_MB * 1024 * 1024:
raise HTTPException(status_code=400, detail=f"파일 크기가 {MAX_SIZE_MB}MB를 초과합니다")
result = await analyze_safety(image_data=image_data, media_type=content_type)
result["disclaimer"] = "이 결과는 AI 1차 분석이며, 최종 판정은 현장 책임자가 합니다."
return JSONResponse(content=result)

View File

@@ -2,7 +2,11 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import settings
from app.api import auth, projects, tasks, daily_reports, reports, inspections, weather, rag, kakao, permits, quality, settings as settings_router
from app.api import (
auth, projects, tasks, daily_reports, reports, inspections,
weather, rag, kakao, permits, quality, settings as settings_router,
agents, evms, vision, geofence,
)
from app.services.scheduler import start_scheduler, stop_scheduler
@@ -45,6 +49,10 @@ def create_app() -> FastAPI:
app.include_router(kakao.router, prefix=api_prefix)
app.include_router(permits.router, prefix=api_prefix)
app.include_router(quality.router, prefix=api_prefix)
app.include_router(agents.router, prefix=api_prefix)
app.include_router(evms.router, prefix=api_prefix)
app.include_router(vision.router, prefix=api_prefix)
app.include_router(geofence.router, prefix=api_prefix)
app.include_router(settings_router.router, prefix=api_prefix)
@app.get("/health")

View File

@@ -9,6 +9,8 @@ from .weather import WeatherData, WeatherAlert
from .permit import PermitItem
from .rag import RagSource, RagChunk
from .settings import ClientProfile, AlertRule, WorkTypeLibrary
from .agent import AgentConversation, AgentMessage, GeofenceZone, AgentType
from .evms import EVMSSnapshot
__all__ = [
"User",
@@ -22,4 +24,6 @@ __all__ = [
"PermitItem",
"RagSource", "RagChunk",
"ClientProfile", "AlertRule", "WorkTypeLibrary",
"AgentConversation", "AgentMessage", "GeofenceZone", "AgentType",
"EVMSSnapshot",
]

View File

@@ -0,0 +1,74 @@
import uuid
import enum
from sqlalchemy import String, Text, ForeignKey, Enum as SAEnum, Boolean
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
from app.models.base import TimestampMixin, UUIDMixin
class AgentType(str, enum.Enum):
GONGSA = "gongsa" # 공사 담당
PUMJIL = "pumjil" # 품질 담당
ANJEON = "anjeon" # 안전 담당
GUMU = "gumu" # 공무 담당
class ConversationStatus(str, enum.Enum):
ACTIVE = "active"
CLOSED = "closed"
class AgentConversation(Base, UUIDMixin, TimestampMixin):
"""에이전트와의 대화 세션"""
__tablename__ = "agent_conversations"
project_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
agent_type: Mapped[AgentType] = mapped_column(SAEnum(AgentType, name="agent_type"), nullable=False)
title: Mapped[str | None] = mapped_column(String(200), nullable=True)
status: Mapped[ConversationStatus] = mapped_column(
SAEnum(ConversationStatus, name="conversation_status"),
default=ConversationStatus.ACTIVE,
nullable=False,
)
# relationships
project: Mapped["Project"] = relationship("Project")
user: Mapped["User"] = relationship("User")
messages: Mapped[list["AgentMessage"]] = relationship(
"AgentMessage", back_populates="conversation", cascade="all, delete-orphan",
order_by="AgentMessage.created_at",
)
class AgentMessage(Base, UUIDMixin, TimestampMixin):
"""에이전트 대화 메시지"""
__tablename__ = "agent_messages"
conversation_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("agent_conversations.id"), nullable=False
)
role: Mapped[str] = mapped_column(String(20), nullable=False) # user | assistant | system
content: Mapped[str] = mapped_column(Text, nullable=False)
metadata: Mapped[dict | None] = mapped_column(JSONB, nullable=True) # action proposals, references, etc.
is_proactive: Mapped[bool] = mapped_column(Boolean, default=False) # 에이전트가 먼저 보낸 메시지
# relationships
conversation: Mapped["AgentConversation"] = relationship("AgentConversation", back_populates="messages")
class GeofenceZone(Base, UUIDMixin, TimestampMixin):
"""익명 위험구역 Geofence"""
__tablename__ = "geofence_zones"
project_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
name: Mapped[str] = mapped_column(String(100), nullable=False) # "3공구 굴착면", "크레인 반경"
zone_type: Mapped[str] = mapped_column(String(50), nullable=False) # excavation, crane, confined_space
coordinates: Mapped[list] = mapped_column(JSONB, nullable=False) # [[lat,lng], ...]
radius_m: Mapped[float | None] = mapped_column(nullable=True) # 원형 구역용 반경(m)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
# relationships
project: Mapped["Project"] = relationship("Project")

View File

@@ -0,0 +1,49 @@
import uuid
from datetime import date
from sqlalchemy import Date, Float, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
from app.models.base import TimestampMixin, UUIDMixin
class EVMSSnapshot(Base, UUIDMixin, TimestampMixin):
"""
EVMS 공정 성과 스냅샷 (기준일 기준 PV/EV/AC 저장)
PV (Planned Value) = 계획 공정률 기준 예산 투입액
EV (Earned Value) = 실제 완료 공정률 기준 예산 투입액
AC (Actual Cost) = 실제 투입 비용
SPI = EV / PV (1 이상: 공정 앞서감, 미만: 지연)
CPI = EV / AC (1 이상: 비용 효율적, 미만: 비용 초과)
"""
__tablename__ = "evms_snapshots"
project_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False)
snapshot_date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
# 예산 (원)
total_budget: Mapped[float | None] = mapped_column(Float, nullable=True)
# 공정률 (%)
planned_progress: Mapped[float | None] = mapped_column(Float, nullable=True) # 계획 공정률
actual_progress: Mapped[float | None] = mapped_column(Float, nullable=True) # 실제 공정률
# EVM 핵심 지표 (원)
pv: Mapped[float | None] = mapped_column(Float, nullable=True) # Planned Value
ev: Mapped[float | None] = mapped_column(Float, nullable=True) # Earned Value
ac: Mapped[float | None] = mapped_column(Float, nullable=True) # Actual Cost
# 파생 지수
spi: Mapped[float | None] = mapped_column(Float, nullable=True) # Schedule Performance Index
cpi: Mapped[float | None] = mapped_column(Float, nullable=True) # Cost Performance Index
# 예측
eac: Mapped[float | None] = mapped_column(Float, nullable=True) # Estimate at Completion
etc: Mapped[float | None] = mapped_column(Float, nullable=True) # Estimate to Complete
notes: Mapped[str | None] = mapped_column(Text, nullable=True)
detail_json: Mapped[dict | None] = mapped_column(JSONB, nullable=True) # WBS별 세부 내역
# relationships
project: Mapped["Project"] = relationship("Project")

View File

View File

@@ -0,0 +1,44 @@
"""ANJEON — 안전 담당 에이전트"""
from .base import BaseAgent
class AnjeonAgent(BaseAgent):
agent_type = "anjeon"
name_ko = "안전 에이전트 (ANJEON)"
@property
def system_prompt(self) -> str:
return """당신은 ANJEON입니다. 소형 건설현장의 안전 담당 AI 에이전트입니다.
## 역할과 책임
- 위험 공정 시작 **전** 사전 경보 및 점검 항목 발송
- TBM(Tool Box Meeting) 자료 자동 생성
- 위험구역 Geofence 진입 감지 경보 (익명, 개인 이동 추적 없음)
- 중대재해처벌법 관련 Q&A (RAG 기반)
- 안전교육 실시 여부 확인 및 미실시 알림
## 행동 원칙
1. **안전은 타협하지 않습니다** — 기준 미달 시 단호하게 작업 중단을 권고합니다
2. 법령/기준을 반드시 인용합니다 (산업안전보건법, 중대재해처벌법)
3. 위험구역 감지 시 개인 식별 정보 없이 "위험구역 진입 감지" 알림만 발송합니다
4. TBM은 당일 작업 공종에 맞게 맞춤 생성합니다
## 주요 위험 공종별 기준
- 굴착 5m 이상: 흙막이 설치, 낙하 방지망, 출입 통제
- 고소 작업 2m 이상: 안전난간 또는 안전네트, 안전대 착용
- 크레인 작업: 신호수 배치, 인양 반경 출입 통제
- 콘크리트 타설: 바이브레이터 감전 주의, 거푸집 점검
- 밀폐공간: 산소/유해가스 측정, 감시자 배치
## TBM 형식
1. 오늘의 위험 작업
2. 핵심 안전 수칙 3가지
3. 비상 연락처
4. "안전 확인합니다" 서명란
## 중요 면책 고지
"이 답변은 참고용이며 법률 자문이 아닙니다. 중대한 안전 결정은 전문가에게 문의하세요."
"""
anjeon_agent = AnjeonAgent()

View File

@@ -0,0 +1,138 @@
"""
에이전트 베이스 클래스
각 에이전트는 독립된 페르소나 + Claude 인스턴스로 구동됩니다.
- 엔진 DB를 공유하여 프로젝트 데이터 접근
- 에이전트는 제안하고, 사람이 결정합니다
- 모든 대화는 DB에 기록됩니다
"""
from abc import ABC, abstractmethod
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.ai_engine import complete
class BaseAgent(ABC):
"""모든 에이전트의 공통 베이스"""
agent_type: str
name_ko: str
@property
@abstractmethod
def system_prompt(self) -> str:
"""에이전트 고유 시스템 프롬프트"""
...
async def chat(
self,
messages: list[dict],
context: dict | None = None,
temperature: float = 0.5,
) -> str:
"""
대화 수행
messages: [{"role": "user"|"assistant", "content": "..."}]
context: 프로젝트/날씨/태스크 등 컨텍스트 (시스템 프롬프트에 주입)
"""
system = self.system_prompt
if context:
system += "\n\n## 현재 컨텍스트\n" + self._format_context(context)
return await complete(
messages=messages,
system=system,
temperature=temperature,
max_tokens=2048,
)
def _format_context(self, context: dict) -> str:
lines = []
if context.get("project_name"):
lines.append(f"- 프로젝트: {context['project_name']}")
if context.get("today"):
lines.append(f"- 오늘 날짜: {context['today']}")
if context.get("weather"):
lines.append(f"- 오늘 날씨: {context['weather']}")
if context.get("active_tasks"):
lines.append(f"- 진행 중 공종: {', '.join(context['active_tasks'])}")
if context.get("pending_inspections"):
lines.append(f"- 미완료 검측: {context['pending_inspections']}")
if context.get("overdue_tests"):
lines.append(f"- 기한 초과 품질시험: {context['overdue_tests']}")
if context.get("overdue_permits"):
lines.append(f"- 지연 인허가: {context['overdue_permits']}")
if context.get("schedule_delay_days"):
lines.append(f"- 공정 지연: {context['schedule_delay_days']}")
return "\n".join(lines)
async def build_context(self, db: AsyncSession, project_id: str) -> dict:
"""프로젝트 컨텍스트를 DB에서 조회하여 반환"""
from datetime import date
from sqlalchemy import select, func
from app.models.project import Project
from app.models.task import Task
from app.models.weather import WeatherData
from app.models.inspection import InspectionRequest, InspectionStatus
from app.models.permit import PermitItem
import uuid
pid = uuid.UUID(str(project_id))
# 프로젝트 정보
proj_r = await db.execute(select(Project).where(Project.id == pid))
project = proj_r.scalar_one_or_none()
if not project:
return {}
# 오늘 날씨
today = date.today()
weather_r = await db.execute(
select(WeatherData).where(WeatherData.project_id == pid, WeatherData.forecast_date == today)
)
weather = weather_r.scalar_one_or_none()
weather_summary = None
if weather:
parts = []
if weather.sky_condition:
parts.append(weather.sky_condition)
if weather.temperature_max is not None:
parts.append(f"최고 {weather.temperature_max:.0f}°C")
if weather.precipitation_mm and weather.precipitation_mm > 0:
parts.append(f"강수 {weather.precipitation_mm:.1f}mm")
weather_summary = " / ".join(parts) if parts else None
# 진행 중 태스크
tasks_r = await db.execute(
select(Task).where(Task.project_id == pid, Task.status.in_(["in_progress", "not_started"]))
)
tasks = tasks_r.scalars().all()
active_tasks = list({t.name for t in tasks if t.status == "in_progress"})[:5]
# 미완료 검측
insp_r = await db.execute(
select(func.count()).where(
InspectionRequest.project_id == pid,
InspectionRequest.status == InspectionStatus.DRAFT,
)
)
pending_inspections = insp_r.scalar() or 0
# 지연 인허가
permit_r = await db.execute(
select(func.count()).where(
PermitItem.project_id == pid,
PermitItem.status.in_(["pending", "in_progress"]),
PermitItem.due_date < today,
)
)
overdue_permits = permit_r.scalar() or 0
return {
"project_name": project.name,
"today": str(today),
"weather": weather_summary,
"active_tasks": active_tasks,
"pending_inspections": pending_inspections,
"overdue_permits": overdue_permits,
}

View File

@@ -0,0 +1,46 @@
"""GONGSA — 공사 담당 에이전트"""
from .base import BaseAgent
class GongsaAgent(BaseAgent):
agent_type = "gongsa"
name_ko = "공사 에이전트 (GONGSA)"
@property
def system_prompt(self) -> str:
return """당신은 GONGSA입니다. 소형 건설현장의 공사 담당 AI 에이전트입니다.
## 역할과 책임
- 매일 아침 공정 브리핑 (날씨·예정 공종·전일 실적 종합)
- 공기 지연 징후 선제 감지 및 만회 방안 제안
- 주간 공정계획 초안 작성
- 작업일보 자동 완성 지원
- 날씨 연동 작업 조정 제안 (콘크리트 5°C 기준, 강우 시 토공 등)
## 행동 원칙
1. **제안하고, 현장소장이 결정한다** — 절대 직접 결정하지 않습니다
2. 수치 기반으로 말합니다 ("공정률 63%, 계획 대비 -7%p")
3. 날씨 데이터를 항상 참조합니다
4. 공기 지연 감지 시 구체적인 만회 방안 3가지를 제안합니다
5. 짧고 명확하게 — 현장소장은 바쁩니다
## 응답 형식
- 카카오톡 메시지처럼 간결하게
- 중요 수치는 굵게 강조 (**)
- 액션이 필요하면 끝에 "→ [예/아니오]로 답해주세요" 형식 추가
- 불필요한 서론 없이 바로 핵심부터
## 예시 응답
"오늘 **3공구 콘크리트 타설** 예정입니다.
현재 기온 **4°C** — 타설 기준(5°C) 미달입니다.
오후 2시 **8°C** 예상 → 오후 타설로 조정하시겠습니까?
→ [예/아니오]로 답해주세요"
## 공종별 날씨 기준
- 콘크리트 타설: 기온 5°C 이상, 강우 없음
- 철근 작업: 강풍(10m/s 이상) 시 주의
- 터파기/토공: 강우 5mm/h 이상 시 중단
- 고소 작업: 강풍 10m/s 이상 시 중단"""
gongsa_agent = GongsaAgent()

View File

@@ -0,0 +1,42 @@
"""GUMU — 공무 담당 에이전트"""
from .base import BaseAgent
class GumuAgent(BaseAgent):
agent_type = "gumu"
name_ko = "공무 에이전트 (GUMU)"
@property
def system_prompt(self) -> str:
return """당신은 GUMU입니다. 소형 건설현장의 공무(행정) 담당 AI 에이전트입니다.
## 역할과 책임
- 인허가 처리 현황 능동 추적 (지연·기한 임박 선제 알림)
- 주간 공정보고서 초안 자동 작성 및 발송 제안
- 기성청구 적기 알림 및 PDF 패키지 생성 제안
- 설계변경 서류화 지원
- 발주처 보고 일정 관리
## 행동 원칙
1. **기한을 놓치면 공사가 멈춥니다** — 기한 알림을 강조합니다
2. 행정 서류는 정확성이 생명 — 수치를 항상 재확인합니다
3. 발주처와의 커뮤니케이션은 공식적으로
4. 기성청구는 타이밍이 중요 — 청구 가능 시점에 즉시 알립니다
## 주요 관리 항목
- 인허가: 도로점용허가, 굴착공사 신고, 건설업 등록, 안전관리계획서 등
- 기성: 기성 청구 시점, 기성 금액 산출, 기성 패키지 (일보+품질+검측)
- 보고: 주간보고, 월간보고, 준공 예정 보고
## 인허가 처리 기한 기준
- 도로점용허가: 접수 후 20일 이내
- 굴착공사 신고: 착공 7일 전까지
- 안전관리계획서 제출: 착공 전
## 응답 형식
- 기한 정보는 D-N 형식으로 (예: "D-3")
- 처리 상태는 이모지 활용 (✅ 완료 / ⚠ 진행중 / ❌ 지연)
- 금액은 천 단위 구분 (1,234,000원)"""
gumu_agent = GumuAgent()

View File

@@ -0,0 +1,38 @@
"""PUMJIL — 품질 담당 에이전트"""
from .base import BaseAgent
class PumjilAgent(BaseAgent):
agent_type = "pumjil"
name_ko = "품질 에이전트 (PUMJIL)"
@property
def system_prompt(self) -> str:
return """당신은 PUMJIL입니다. 소형 건설현장의 품질 담당 AI 에이전트입니다.
## 역할과 책임
- 시공 시작 **1시간 전** 품질 체크리스트 자동 발송
- Vision AI 1차 분류 리포트 생성 (사진 전송 시)
- 품질시험 기한 능동 추적 (미실시/기한 초과 알림)
- 불합격 발생 즉시 원인 분석 및 재시험 절차 안내
- KCS(한국건설시방서) 기준 준수 여부 확인
## 행동 원칙
1. **시방서/기준 근거를 항상 제시합니다** (예: "KCS 14 20 10 5.2.3 기준")
2. 현장 사진 분류 시 최종 합격/불합격 판정은 현장 책임자가 합니다
3. 품질시험 미실시는 법적 책임이므로 강하게 알립니다
4. 짧고 명확하게
## 주요 공종별 필수 품질시험
- 콘크리트: 슬럼프, 공기량, 압축강도(7일/28일)
- 철근: 인장강도, 항복강도 (자재 반입 시)
- 토공: 다짐도(들밀도 또는 현장 CBR)
- 아스팔트: 코어 밀도, 두께
## 응답 형식
- 체크리스트는 번호 목록으로
- 기준값은 [기준: XX] 형식으로 명시
- 사진 분류 시 "✓ 정상 추정 / ⚠ 확인 필요" 구분"""
pumjil_agent = PumjilAgent()

View File

@@ -0,0 +1,46 @@
"""
에이전트 라우터
사용자 메시지의 의도를 파악하여 적절한 에이전트로 분배합니다.
"""
from app.models.agent import AgentType
from .gongsa import gongsa_agent
from .pumjil import pumjil_agent
from .anjeon import anjeon_agent
from .gumu import gumu_agent
from .base import BaseAgent
# 에이전트 레지스트리
AGENTS: dict[AgentType, BaseAgent] = {
AgentType.GONGSA: gongsa_agent,
AgentType.PUMJIL: pumjil_agent,
AgentType.ANJEON: anjeon_agent,
AgentType.GUMU: gumu_agent,
}
# 키워드 기반 자동 라우팅
_ROUTING_RULES: list[tuple[list[str], AgentType]] = [
# 공사/공정
(["공정", "일정", "지연", "타설", "굴착", "공기", "브리핑", "작업", "일보", "진도"], AgentType.GONGSA),
# 품질
(["품질", "시험", "검사", "슬럼프", "압축강도", "합격", "불합격", "KCS", "시방서", "체크리스트"], AgentType.PUMJIL),
# 안전
(["안전", "사고", "위험", "TBM", "교육", "중대재해", "보호구", "추락", "굴착 안전", "Geofence", "지오펜스"], AgentType.ANJEON),
# 공무
(["인허가", "허가", "신고", "기성", "보고서", "발주처", "행정", "서류", "청구"], AgentType.GUMU),
]
def route_by_keyword(message: str) -> AgentType:
"""키워드 매칭으로 적절한 에이전트 타입을 반환. 매칭 없으면 GONGSA."""
msg_lower = message.lower()
scores: dict[AgentType, int] = {t: 0 for t in AgentType}
for keywords, agent_type in _ROUTING_RULES:
for kw in keywords:
if kw in msg_lower:
scores[agent_type] += 1
best = max(scores, key=lambda t: scores[t])
return best if scores[best] > 0 else AgentType.GONGSA
def get_agent(agent_type: AgentType) -> BaseAgent:
return AGENTS[agent_type]

View File

@@ -0,0 +1,132 @@
"""
EVMS (Earned Value Management System) 계산 서비스
PV, EV, AC, SPI, CPI 산출
"""
from datetime import date
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.task import Task
from app.models.project import Project
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
async def compute_evms(
db: AsyncSession,
project_id,
snapshot_date: date,
actual_cost: float | None = None,
) -> dict:
"""
WBS/Task 기반 EVMS 지표 계산.
PV = 총예산 × 기준일 계획 공정률
EV = 총예산 × 기준일 실제 공정률
AC = 실제 투입 비용 (입력값 없으면 EV × 0.95 추정)
반환: {pv, ev, ac, spi, cpi, eac, etc, planned_progress, actual_progress, detail}
"""
import uuid
pid = uuid.UUID(str(project_id))
today = snapshot_date
# 프로젝트 정보
proj_r = await db.execute(select(Project).where(Project.id == pid))
project = proj_r.scalar_one_or_none()
if not project:
raise ValueError("프로젝트를 찾을 수 없습니다")
total_budget = float(project.contract_amount or 0)
# 태스크 조회
tasks_r = await db.execute(select(Task).where(Task.project_id == pid))
tasks = tasks_r.scalars().all()
if not tasks:
return {
"total_budget": total_budget,
"planned_progress": 0.0,
"actual_progress": 0.0,
"pv": 0.0, "ev": 0.0, "ac": 0.0,
"spi": None, "cpi": None,
"eac": None, "etc": None,
"detail": [],
}
# 태스크별 PV, EV 계산
total_tasks = len(tasks)
planned_pct_sum = 0.0
actual_pct_sum = 0.0
detail = []
for task in tasks:
# 계획 공정률: 기준일 기준 planned_start ~ planned_end 선형 보간
p_start = task.planned_start
p_end = task.planned_end
task_planned_pct = 0.0
if p_start and p_end:
ps = p_start if isinstance(p_start, date) else date.fromisoformat(str(p_start))
pe = p_end if isinstance(p_end, date) else date.fromisoformat(str(p_end))
if today < ps:
task_planned_pct = 0.0
elif today >= pe:
task_planned_pct = 100.0
else:
total_days = (pe - ps).days or 1
elapsed = (today - ps).days
task_planned_pct = _clamp(elapsed / total_days * 100, 0.0, 100.0)
else:
task_planned_pct = 0.0
task_actual_pct = float(task.progress_pct or 0.0)
planned_pct_sum += task_planned_pct
actual_pct_sum += task_actual_pct
detail.append({
"task_id": str(task.id),
"task_name": task.name,
"planned_pct": round(task_planned_pct, 1),
"actual_pct": round(task_actual_pct, 1),
"is_critical": task.is_critical,
})
planned_progress = round(planned_pct_sum / total_tasks, 1)
actual_progress = round(actual_pct_sum / total_tasks, 1)
pv = total_budget * (planned_progress / 100) if total_budget else None
ev = total_budget * (actual_progress / 100) if total_budget else None
# AC: 입력값 없으면 EV × 1.05 (5% 비용 초과 가정)
if actual_cost is not None:
ac = float(actual_cost)
elif ev is not None:
ac = ev * 1.05
else:
ac = None
spi = round(ev / pv, 3) if (pv and pv > 0 and ev is not None) else None
cpi = round(ev / ac, 3) if (ac and ac > 0 and ev is not None) else None
# EAC (Estimate at Completion) = 총예산 / CPI
eac = round(total_budget / cpi, 0) if (cpi and cpi > 0 and total_budget) else None
# ETC (Estimate to Completion) = EAC - AC
etc = round(eac - ac, 0) if (eac is not None and ac is not None) else None
return {
"total_budget": total_budget,
"planned_progress": planned_progress,
"actual_progress": actual_progress,
"pv": round(pv, 0) if pv is not None else None,
"ev": round(ev, 0) if ev is not None else None,
"ac": round(ac, 0) if ac is not None else None,
"spi": spi,
"cpi": cpi,
"eac": eac,
"etc": etc,
"detail": detail,
}

View File

@@ -0,0 +1,161 @@
"""
Vision AI Level 1 — 현장 사진 분류 서비스
Claude Vision API를 사용하여:
- 공종 자동 분류
- 날짜/위치 태깅 (EXIF 또는 수동)
- 이상 후보 감지 (안전장비 미착용 등)
- 작업일보 자동 첨부용 캡션 생성
"""
import base64
from anthropic import AsyncAnthropic
from app.config import settings
_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
WORK_TYPE_LIST = [
"콘크리트 타설", "철근 배근", "거푸집 설치/해체", "터파기/굴착",
"흙막이 공사", "다짐", "관 매설", "아스팔트 포장",
"고소 작업", "크레인 작업", "안전 시설물", "현장 전경", "기타",
]
_SYSTEM = """당신은 건설 현장 사진을 분석하는 AI입니다.
사진을 보고 다음을 정확히 분석하세요:
1. 공종 분류 (단 하나 선택)
2. 진행 상태 (시작 전 / 진행 중 / 완료)
3. 안전장비 착용 여부 (안전모, 안전조끼 식별 가능한 경우)
4. 특이사항 (이상 징후, 주의 필요 사항)
5. 작업일보용 간략 설명 (1-2문장, 한국어)
JSON 형식으로만 응답하세요."""
_USER_TEMPLATE = """이 건설 현장 사진을 분석해주세요.
공종 목록: {work_types}
다음 JSON 형식으로만 응답하세요:
{{
"work_type": "콘크리트 타설",
"status": "진행 중",
"safety_ok": true,
"safety_issues": [],
"anomalies": [],
"caption": "3공구 기초 콘크리트 타설 작업 진행 중",
"confidence": 0.85
}}"""
async def classify_photo(
image_data: bytes,
media_type: str = "image/jpeg",
location_hint: str | None = None,
) -> dict:
"""
현장 사진 분류
image_data: 이미지 바이너리
media_type: image/jpeg, image/png, image/webp
location_hint: 위치 힌트 (예: "3공구", "A구역")
반환:
{
"work_type": str,
"status": str,
"safety_ok": bool,
"safety_issues": list[str],
"anomalies": list[str],
"caption": str,
"confidence": float,
"location_hint": str | None,
}
"""
image_b64 = base64.standard_b64encode(image_data).decode()
user_content = [
{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": image_b64,
},
},
{
"type": "text",
"text": _USER_TEMPLATE.format(work_types=", ".join(WORK_TYPE_LIST))
+ (f"\n\n위치 정보: {location_hint}" if location_hint else ""),
},
]
response = await _client.messages.create(
model=settings.CLAUDE_MODEL,
max_tokens=512,
system=_SYSTEM,
messages=[{"role": "user", "content": user_content}],
)
raw = response.content[0].text.strip()
# JSON 파싱
import json, re
json_match = re.search(r"\{.*\}", raw, re.DOTALL)
if json_match:
result = json.loads(json_match.group())
else:
result = {
"work_type": "기타",
"status": "확인 필요",
"safety_ok": None,
"safety_issues": [],
"anomalies": ["AI 분석 실패 — 수동 확인 필요"],
"caption": raw[:100],
"confidence": 0.0,
}
result["location_hint"] = location_hint
return result
async def analyze_safety(
image_data: bytes,
media_type: str = "image/jpeg",
) -> dict:
"""
Vision AI Level 2 — 안전장비 착용 감지 (안전모/안전조끼)
PPE(Personal Protective Equipment) 착용 여부 집중 분석
"""
image_b64 = base64.standard_b64encode(image_data).decode()
safety_prompt = """이 건설 현장 사진에서 안전장비 착용 여부를 분석하세요.
다음 JSON 형식으로만 응답하세요:
{
"worker_count": 3,
"helmet_worn": [true, true, false],
"vest_worn": [true, false, true],
"violations": ["3번 작업자: 안전모 미착용"],
"risk_level": "",
"recommendation": "안전모 미착용 작업자 즉시 착용 조치 필요"
}
risk_level: 저(모두 착용) / 중(일부 미착용) / 고(다수 미착용 또는 고소 작업)"""
response = await _client.messages.create(
model=settings.CLAUDE_MODEL,
max_tokens=512,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {"type": "base64", "media_type": media_type, "data": image_b64},
},
{"type": "text", "text": safety_prompt},
],
}],
)
raw = response.content[0].text.strip()
import json, re
json_match = re.search(r"\{.*\}", raw, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "분석 실패", "raw": raw[:200]}