feat: v3.1 대시보드 추가 (React + FastAPI)

Media Engine Control Panel — 6탭 웹 대시보드

[백엔드] FastAPI (dashboard/backend/)
- server.py: 포트 8080, CORS, React SPA 서빙
- api_overview.py: KPI 카드 + 파이프라인 상태 + 활동 로그
- api_content.py: 칸반 보드 + 승인/거부 + 수동 트리거
- api_analytics.py: 방문자 추이 + 플랫폼/코너별 성과
- api_novels.py: 소설 목록/생성/에피소드 관리
- api_settings.py: engine.json CRUD
- api_connections.py: AI 서비스 연결 관리 + 키 저장
- api_tools.py: 기능별 AI 도구 선택
- api_cost.py: 구독 현황 + API 사용량 추적
- api_logs.py: 시스템 로그 필터/검색

[프론트엔드] React + Vite + Tailwind + Recharts (dashboard/frontend/)
- Overview: KPI 카드 + 파이프라인 + 코너별 바차트 + 활동 로그
- Content: 4열 칸반 보드 + 상세 모달 + 승인/거부
- Analytics: LineChart 방문자 추이 + 플랫폼별 성과
- Novel: 소설 목록 + 에피소드 테이블 + 새 소설 생성 폼
- Settings: 5개 서브탭 (AI연결/도구선택/배포채널/품질/비용관리)
- Logs: 필터/검색 시스템 로그 뷰어

[디자인] CNN 다크+골드 테마
- 배경 #0a0a0d + 액센트 #c8a84e
- 모바일 반응형 (Tailscale 외부 접속 대응)

[실행]
- dashboard/start.bat 더블클릭 → http://localhost:8080

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sinmb79
2026-03-26 13:17:53 +09:00
parent 8a7a122bb3
commit 213f57b52d
35 changed files with 3971 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
# dashboard/backend package
+131
View File
@@ -0,0 +1,131 @@
"""
dashboard/backend/api_analytics.py
Analytics 탭 API — 방문자 통계, KPI, 코너별 성과
"""
import json
from datetime import date, timedelta
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Query
BASE_DIR = Path(__file__).parent.parent.parent
DATA_DIR = BASE_DIR / "data"
ANALYTICS_DIR = DATA_DIR / "analytics"
router = APIRouter()
def _load_all_analytics() -> list:
"""analytics/*.json 전체 로드"""
records = []
if not ANALYTICS_DIR.exists():
return records
for f in sorted(ANALYTICS_DIR.glob("*.json")):
try:
data = json.loads(f.read_text(encoding="utf-8"))
if isinstance(data, list):
records.extend(data)
elif isinstance(data, dict):
records.append(data)
except Exception:
pass
return records
def _aggregate_kpi(records: list) -> dict:
total_visitors = sum(r.get("visitors", 0) for r in records)
total_pageviews = sum(r.get("pageviews", 0) for r in records)
avg_duration = 0
avg_ctr = 0.0
durations = [r.get("avg_duration_sec", 0) for r in records if r.get("avg_duration_sec")]
if durations:
avg_duration = int(sum(durations) / len(durations))
ctrs = [r.get("ctr", 0.0) for r in records if r.get("ctr")]
if ctrs:
avg_ctr = round(sum(ctrs) / len(ctrs), 2)
return {
"visitors": total_visitors,
"pageviews": total_pageviews,
"avg_duration_sec": avg_duration,
"ctr": avg_ctr,
}
def _aggregate_corners(records: list) -> list:
corner_map: dict = {}
for r in records:
corner = r.get("corner", "기타")
if corner not in corner_map:
corner_map[corner] = {"visitors": 0, "pageviews": 0, "posts": 0}
corner_map[corner]["visitors"] += r.get("visitors", 0)
corner_map[corner]["pageviews"] += r.get("pageviews", 0)
corner_map[corner]["posts"] += r.get("post_count", 1)
result = []
for name, data in corner_map.items():
result.append({"corner": name, **data})
result.sort(key=lambda x: x["visitors"], reverse=True)
return result
def _top_posts(records: list, limit: int = 5) -> list:
posts = []
for r in records:
if "title" in r and "visitors" in r:
posts.append({
"title": r["title"],
"visitors": r["visitors"],
"corner": r.get("corner", ""),
"published_at": r.get("date", ""),
})
posts.sort(key=lambda x: x["visitors"], reverse=True)
return posts[:limit]
def _platform_performance(records: list) -> list:
platform_map: dict = {}
for r in records:
platform = r.get("platform", "blogger")
if platform not in platform_map:
platform_map[platform] = {"visitors": 0, "posts": 0}
platform_map[platform]["visitors"] += r.get("visitors", 0)
platform_map[platform]["posts"] += 1
return [{"platform": k, **v} for k, v in platform_map.items()]
@router.get("/analytics")
async def get_analytics():
records = _load_all_analytics()
return {
"kpi": _aggregate_kpi(records),
"corners": _aggregate_corners(records),
"top_posts": _top_posts(records),
"platforms": _platform_performance(records),
"total_records": len(records),
}
@router.get("/analytics/chart")
async def get_analytics_chart(days: int = Query(default=7, ge=1, le=365)):
"""days일간 방문자 시계열 데이터"""
records = _load_all_analytics()
today = date.today()
date_range = [(today - timedelta(days=i)).isoformat() for i in range(days - 1, -1, -1)]
# 날짜별 집계
daily: dict = {d: {"date": d, "visitors": 0, "pageviews": 0} for d in date_range}
for r in records:
d = r.get("date", "")[:10]
if d in daily:
daily[d]["visitors"] += r.get("visitors", 0)
daily[d]["pageviews"] += r.get("pageviews", 0)
return {"chart": list(daily.values()), "days": days}
+207
View File
@@ -0,0 +1,207 @@
"""
dashboard/backend/api_connections.py
Settings > Connections 탭 API — AI 서비스 연결 상태 확인/테스트
"""
import json
import os
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "engine.json"
ENV_PATH = BASE_DIR / ".env"
router = APIRouter()
AI_SERVICES = [
{
"id": "claude",
"name": "Claude (Anthropic)",
"env_key": "ANTHROPIC_API_KEY",
"category": "writing",
"description": "글쓰기 엔진 — claude-opus-4-5",
},
{
"id": "gemini",
"name": "Google Gemini",
"env_key": "GEMINI_API_KEY",
"category": "writing",
"description": "글쓰기 엔진 — gemini-2.0-flash",
},
{
"id": "openai",
"name": "OpenAI (GPT + DALL-E + TTS)",
"env_key": "OPENAI_API_KEY",
"category": "multi",
"description": "이미지(DALL-E 3) + TTS(tts-1-hd)",
},
{
"id": "elevenlabs",
"name": "ElevenLabs TTS",
"env_key": "ELEVENLABS_API_KEY",
"category": "tts",
"description": "고품질 한국어 TTS",
},
{
"id": "google_tts",
"name": "Google Cloud TTS",
"env_key": "GOOGLE_TTS_API_KEY",
"category": "tts",
"description": "Google Wavenet TTS",
},
{
"id": "seedance",
"name": "Seedance AI Video",
"env_key": "SEEDANCE_API_KEY",
"category": "video",
"description": "AI 영상 생성 — Seedance 2.0",
},
{
"id": "runway",
"name": "Runway Gen-3",
"env_key": "RUNWAY_API_KEY",
"category": "video",
"description": "AI 영상 생성 — Gen-3 Turbo",
},
]
class ApiKeyUpdate(BaseModel):
api_key: str
def _mask_key(key: str) -> str:
if not key:
return ""
if len(key) <= 8:
return "****"
return key[:4] + "****" + key[-4:]
def _get_connections():
connections = []
for svc in AI_SERVICES:
key = os.getenv(svc["env_key"], "")
connections.append({
**svc,
"connected": bool(key),
"key_masked": _mask_key(key),
})
return connections
@router.get("/connections")
async def get_connections():
return {"connections": _get_connections()}
@router.post("/connections/{service_id}/test")
async def test_connection(service_id: str):
"""서비스 연결 테스트"""
svc = next((s for s in AI_SERVICES if s["id"] == service_id), None)
if not svc:
raise HTTPException(status_code=404, detail="서비스를 찾을 수 없습니다.")
api_key = os.getenv(svc["env_key"], "")
if not api_key:
return {"success": False, "message": "API 키가 설정되지 않았습니다."}
# 간단한 연결 테스트
try:
if service_id == "claude":
import anthropic
client = anthropic.Anthropic(api_key=api_key)
# 모델 목록으로 연결 테스트
client.messages.create(
model="claude-haiku-4-5",
max_tokens=10,
messages=[{"role": "user", "content": "ping"}],
)
return {"success": True, "message": "Claude 연결 성공"}
elif service_id == "openai":
from openai import OpenAI
client = OpenAI(api_key=api_key)
client.models.list()
return {"success": True, "message": "OpenAI 연결 성공"}
elif service_id == "gemini":
import google.generativeai as genai
genai.configure(api_key=api_key)
model = genai.GenerativeModel("gemini-2.0-flash")
model.generate_content("ping", generation_config={"max_output_tokens": 5})
return {"success": True, "message": "Gemini 연결 성공"}
elif service_id in ("elevenlabs", "seedance", "runway", "google_tts"):
import requests
test_urls = {
"elevenlabs": "https://api.elevenlabs.io/v1/models",
"google_tts": f"https://texttospeech.googleapis.com/v1/voices?key={api_key}",
"seedance": "https://api.seedance2.ai/v1/models",
"runway": "https://api.runwayml.com/v1/organization",
}
headers_map = {
"elevenlabs": {"xi-api-key": api_key},
"runway": {"Authorization": f"Bearer {api_key}"},
}
url = test_urls.get(service_id, "")
headers = headers_map.get(service_id, {})
if url:
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code < 400:
return {"success": True, "message": f"{svc['name']} 연결 성공"}
else:
return {"success": False, "message": f"HTTP {resp.status_code}"}
return {"success": True, "message": "키 존재 확인됨 (심층 테스트 미지원)"}
except ImportError as e:
return {"success": False, "message": f"라이브러리 미설치: {e}"}
except Exception as e:
return {"success": False, "message": str(e)[:200]}
@router.put("/connections/{service_id}")
async def update_api_key(service_id: str, req: ApiKeyUpdate):
"""API 키를 .env 파일에 저장"""
svc = next((s for s in AI_SERVICES if s["id"] == service_id), None)
if not svc:
raise HTTPException(status_code=404, detail="서비스를 찾을 수 없습니다.")
env_key = svc["env_key"]
api_key = req.api_key.strip()
try:
# .env 파일 읽기
if ENV_PATH.exists():
lines = ENV_PATH.read_text(encoding="utf-8").splitlines()
else:
lines = []
# 기존 키 교체 또는 추가
updated = False
new_lines = []
for line in lines:
if line.startswith(f"{env_key}=") or line.startswith(f"{env_key} ="):
new_lines.append(f"{env_key}={api_key}")
updated = True
else:
new_lines.append(line)
if not updated:
new_lines.append(f"{env_key}={api_key}")
ENV_PATH.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
# 현재 프로세스 환경 변수도 업데이트
os.environ[env_key] = api_key
return {"success": True, "message": f"{env_key} 키 저장 완료"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"키 저장 실패: {e}")
+173
View File
@@ -0,0 +1,173 @@
"""
dashboard/backend/api_content.py
Content 탭 API — 칸반 보드, 승인/거부, 수동 트리거
"""
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
BASE_DIR = Path(__file__).parent.parent.parent
DATA_DIR = BASE_DIR / "data"
router = APIRouter()
class WriteRequest(BaseModel):
topic: str = ""
def _read_folder_cards(folder: Path, status: str) -> list:
"""폴더에서 JSON 파일을 읽어 칸반 카드 목록 반환"""
cards = []
if not folder.exists():
return cards
for f in sorted(folder.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True):
try:
data = json.loads(f.read_text(encoding="utf-8"))
cards.append({
"id": f.stem,
"file": str(f),
"title": data.get("title", f.stem),
"corner": data.get("corner", ""),
"source": data.get("source", ""),
"quality_score": data.get("quality_score", data.get("score", 0)),
"created_at": data.get("created_at", data.get("collected_at", "")),
"status": status,
"summary": data.get("summary", data.get("body", "")[:200] if data.get("body") else ""),
})
except Exception:
pass
return cards
@router.get("/content")
async def get_content():
"""칸반 4열 데이터 반환"""
queue = _read_folder_cards(DATA_DIR / "topics", "queue")
queue += _read_folder_cards(DATA_DIR / "collected", "queue")
writing = _read_folder_cards(DATA_DIR / "drafts", "writing")
review = _read_folder_cards(DATA_DIR / "pending_review", "review")
published = _read_folder_cards(DATA_DIR / "published", "published")
return {
"columns": {
"queue": {"label": "글감큐", "cards": queue},
"writing": {"label": "작성중", "cards": writing},
"review": {"label": "검수대기", "cards": review},
"published": {"label": "발행완료", "cards": published[:20]}, # 최근 20개만
}
}
@router.post("/content/{item_id}/approve")
async def approve_content(item_id: str):
"""검수 승인 — pending_review → published로 이동"""
src = DATA_DIR / "pending_review" / f"{item_id}.json"
if not src.exists():
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다.")
try:
data = json.loads(src.read_text(encoding="utf-8"))
data["approved_at"] = datetime.now().isoformat()
data["status"] = "approved"
dst = DATA_DIR / "published" / f"{item_id}.json"
dst.parent.mkdir(parents=True, exist_ok=True)
dst.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
src.unlink(missing_ok=True)
return {"success": True, "message": f"{item_id} 승인 완료"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/content/{item_id}/reject")
async def reject_content(item_id: str):
"""검수 거부 — pending_review → discarded로 이동"""
src = DATA_DIR / "pending_review" / f"{item_id}.json"
if not src.exists():
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다.")
try:
data = json.loads(src.read_text(encoding="utf-8"))
data["rejected_at"] = datetime.now().isoformat()
data["status"] = "rejected"
dst = DATA_DIR / "discarded" / f"{item_id}.json"
dst.parent.mkdir(parents=True, exist_ok=True)
dst.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
src.unlink(missing_ok=True)
return {"success": True, "message": f"{item_id} 거부 완료"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/manual-write")
async def manual_write(req: WriteRequest):
"""collector_bot + writer_bot 수동 트리거"""
python = sys.executable
bots_dir = BASE_DIR / "bots"
results = []
# collector_bot 실행
collector = bots_dir / "collector_bot.py"
if collector.exists():
try:
result = subprocess.run(
[python, str(collector)],
capture_output=True,
text=True,
timeout=120,
cwd=str(BASE_DIR),
encoding="utf-8",
)
results.append({
"step": "collector",
"success": result.returncode == 0,
"output": result.stdout[-500:] if result.stdout else "",
"error": result.stderr[-300:] if result.stderr else "",
})
except subprocess.TimeoutExpired:
results.append({"step": "collector", "success": False, "error": "타임아웃"})
except Exception as e:
results.append({"step": "collector", "success": False, "error": str(e)})
else:
results.append({"step": "collector", "success": False, "error": "파일 없음"})
# writer_bot 실행
writer = bots_dir / "writer_bot.py"
if writer.exists():
try:
result = subprocess.run(
[python, str(writer)],
capture_output=True,
text=True,
timeout=300,
cwd=str(BASE_DIR),
encoding="utf-8",
)
results.append({
"step": "writer",
"success": result.returncode == 0,
"output": result.stdout[-500:] if result.stdout else "",
"error": result.stderr[-300:] if result.stderr else "",
})
except subprocess.TimeoutExpired:
results.append({"step": "writer", "success": False, "error": "타임아웃"})
except Exception as e:
results.append({"step": "writer", "success": False, "error": str(e)})
else:
results.append({"step": "writer", "success": False, "error": "파일 없음"})
return {"results": results}
+133
View File
@@ -0,0 +1,133 @@
"""
dashboard/backend/api_cost.py
Settings > 비용관리 탭 API — 구독 정보, API 사용량
"""
import json
import re
from datetime import date, datetime
from pathlib import Path
from fastapi import APIRouter
BASE_DIR = Path(__file__).parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "engine.json"
LOGS_DIR = BASE_DIR / "logs"
router = APIRouter()
SUBSCRIPTION_PLANS = [
{
"id": "claude_pro",
"name": "Claude Pro",
"provider": "Anthropic",
"monthly_cost_usd": 20.0,
"env_key": "ANTHROPIC_API_KEY",
"renewal_day": 1, # 매월 1일 갱신
},
{
"id": "openai_plus",
"name": "OpenAI API",
"provider": "OpenAI",
"monthly_cost_usd": 0.0, # 종량제
"env_key": "OPENAI_API_KEY",
"renewal_day": None,
},
{
"id": "gemini_api",
"name": "Google Gemini API",
"provider": "Google",
"monthly_cost_usd": 0.0, # 무료 티어 + 종량제
"env_key": "GEMINI_API_KEY",
"renewal_day": None,
},
{
"id": "elevenlabs",
"name": "ElevenLabs Starter",
"provider": "ElevenLabs",
"monthly_cost_usd": 5.0,
"env_key": "ELEVENLABS_API_KEY",
"renewal_day": 1,
},
]
def _days_until_renewal(renewal_day):
if renewal_day is None:
return None
today = date.today()
next_renewal = date(today.year, today.month, renewal_day)
if next_renewal <= today:
# 다음 달
if today.month == 12:
next_renewal = date(today.year + 1, 1, renewal_day)
else:
next_renewal = date(today.year, today.month + 1, renewal_day)
return (next_renewal - today).days
def _parse_api_usage() -> list:
"""logs/*.log에서 API 사용량 파싱"""
usage_map: dict = {}
patterns = {
"claude": re.compile(r"claude.*?(\d+)\s*토큰|tokens[:\s]+(\d+)", re.IGNORECASE),
"openai": re.compile(r"openai.*?(\d+)\s*토큰|gpt.*?tokens[:\s]+(\d+)", re.IGNORECASE),
"gemini": re.compile(r"gemini.*?(\d+)\s*토큰", re.IGNORECASE),
}
if not LOGS_DIR.exists():
return []
for log_file in LOGS_DIR.glob("*.log"):
try:
content = log_file.read_text(encoding="utf-8", errors="ignore")
for provider, pattern in patterns.items():
matches = pattern.findall(content)
tokens = sum(int(m[0] or m[1] or 0) for m in matches if any(m))
if tokens:
usage_map[provider] = usage_map.get(provider, 0) + tokens
except Exception:
pass
result = []
for provider, tokens in usage_map.items():
result.append({
"provider": provider,
"tokens": tokens,
"estimated_cost_usd": round(tokens / 1_000_000 * 3.0, 4), # 근사치
})
return result
@router.get("/cost/subscriptions")
async def get_subscriptions():
"""구독 정보 + 만료일 계산"""
import os
from dotenv import load_dotenv
load_dotenv()
subscriptions = []
for plan in SUBSCRIPTION_PLANS:
key_set = bool(os.getenv(plan["env_key"], ""))
days_left = _days_until_renewal(plan.get("renewal_day"))
subscriptions.append({
"id": plan["id"],
"name": plan["name"],
"provider": plan["provider"],
"monthly_cost_usd": plan["monthly_cost_usd"],
"active": key_set,
"renewal_day": plan.get("renewal_day"),
"days_until_renewal": days_left,
"alert": days_left is not None and days_left <= 5,
})
total_monthly = sum(p["monthly_cost_usd"] for p in subscriptions if p["active"])
return {
"subscriptions": subscriptions,
"total_monthly_usd": total_monthly,
}
@router.get("/cost/usage")
async def get_usage():
"""logs에서 API 사용량 파싱"""
return {"usage": _parse_api_usage()}
+109
View File
@@ -0,0 +1,109 @@
"""
dashboard/backend/api_logs.py
Logs 탭 API — 시스템 로그 파싱, 필터/검색
"""
import re
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Query
BASE_DIR = Path(__file__).parent.parent.parent
LOGS_DIR = BASE_DIR / "logs"
router = APIRouter()
LOG_MODULES = {
"": "전체",
"scheduler": "스케줄러",
"collector": "수집",
"writer": "글쓰기",
"converter": "변환",
"publisher": "발행",
"analytics": "분석",
"novel": "소설",
"engine_loader": "엔진",
"error": "에러만",
}
LOG_PATTERN = re.compile(
r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})[,\.]?\d*\s+"
r"\[?(\w+)\]?\s+(.*)"
)
def _parse_log_line(line: str, module: str) -> dict | None:
m = LOG_PATTERN.match(line.strip())
if not m:
return None
return {
"time": m.group(1),
"level": m.group(2).upper(),
"module": module,
"message": m.group(3)[:300],
}
def _read_logs(
filter_module: str = "",
search: str = "",
limit: int = 200,
) -> list:
logs = []
if not LOGS_DIR.exists():
return logs
# 로그 파일 목록 (최근 수정 순)
log_files = sorted(LOGS_DIR.glob("*.log"), key=lambda f: f.stat().st_mtime, reverse=True)
error_only = filter_module == "error"
for log_file in log_files:
module_name = log_file.stem # e.g. "scheduler", "collector"
# 모듈 필터
if filter_module and not error_only and module_name != filter_module:
continue
try:
lines = log_file.read_text(encoding="utf-8", errors="ignore").splitlines()
for line in reversed(lines):
if not line.strip():
continue
entry = _parse_log_line(line, module_name)
if entry is None:
continue
# 에러만 필터
if error_only and entry["level"] not in ("ERROR", "CRITICAL", "WARNING"):
continue
# 검색 필터
if search and search.lower() not in entry["message"].lower():
continue
logs.append(entry)
if len(logs) >= limit:
break
except Exception:
pass
if len(logs) >= limit:
break
return logs[:limit]
@router.get("/logs")
async def get_logs(
filter: str = Query(default="", description="모듈 필터 (scheduler/collector/writer/converter/publisher/error)"),
search: str = Query(default="", description="메시지 검색"),
limit: int = Query(default=200, ge=1, le=1000),
):
logs = _read_logs(filter_module=filter, search=search, limit=limit)
return {
"logs": logs,
"total": len(logs),
"modules": LOG_MODULES,
}
+170
View File
@@ -0,0 +1,170 @@
"""
dashboard/backend/api_novels.py
Novel 탭 API — 소설 목록, 새 소설 생성, 에피소드 생성
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
BASE_DIR = Path(__file__).parent.parent.parent
NOVELS_CONFIG_DIR = BASE_DIR / "config" / "novels"
NOVELS_DATA_DIR = BASE_DIR / "data" / "novels"
router = APIRouter()
class NewNovelRequest(BaseModel):
novel_id: str
title: str
title_ko: str
genre: str
setting: str
characters: str
base_story: str
publish_schedule: str = "매주 월/목 09:00"
episode_count_target: int = 50
@router.get("/novels")
async def get_novels():
"""config/novels/*.json 읽어 반환"""
NOVELS_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
novels = []
for path in sorted(NOVELS_CONFIG_DIR.glob("*.json")):
try:
data = json.loads(path.read_text(encoding="utf-8"))
# 에피소드 수 계산
ep_dir = NOVELS_DATA_DIR / data.get("novel_id", path.stem) / "episodes"
ep_files = list(ep_dir.glob("ep*.json")) if ep_dir.exists() else []
ep_files = [
f for f in ep_files
if "_summary" not in f.name and "_blog" not in f.name
]
data["episode_files"] = len(ep_files)
# 에피소드 목록 로드
episodes = []
for ef in sorted(ep_files, key=lambda x: x.name)[-10:]: # 최근 10개
try:
ep_data = json.loads(ef.read_text(encoding="utf-8"))
episodes.append({
"episode_num": ep_data.get("episode_num", 0),
"title": ep_data.get("title", ""),
"generated_at": ep_data.get("generated_at", "")[:10],
"word_count": ep_data.get("word_count", 0),
})
except Exception:
pass
data["episodes"] = episodes
# 진행률
target = data.get("episode_count_target", 0)
current = data.get("current_episode", len(ep_files))
data["progress"] = round(current / target * 100) if target else 0
novels.append(data)
except Exception:
pass
return {"novels": novels}
@router.post("/novels")
async def create_novel(req: NewNovelRequest):
"""새 소설 config 생성"""
NOVELS_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
config_path = NOVELS_CONFIG_DIR / f"{req.novel_id}.json"
if config_path.exists():
raise HTTPException(status_code=409, detail="이미 존재하는 소설 ID입니다.")
novel_config = {
"novel_id": req.novel_id,
"title": req.title,
"title_ko": req.title_ko,
"genre": req.genre,
"setting": req.setting,
"characters": req.characters,
"base_story": req.base_story,
"publish_schedule": req.publish_schedule,
"episode_count_target": req.episode_count_target,
"current_episode": 0,
"status": "active",
"created_at": datetime.now().isoformat(),
"episode_log": [],
}
config_path.write_text(
json.dumps(novel_config, ensure_ascii=False, indent=2),
encoding="utf-8"
)
# 데이터 디렉터리 생성
novel_data_dir = NOVELS_DATA_DIR / req.novel_id
for sub in ["episodes", "shorts", "images"]:
(novel_data_dir / sub).mkdir(parents=True, exist_ok=True)
return {"success": True, "novel_id": req.novel_id, "message": f"소설 '{req.title_ko}' 생성 완료"}
@router.post("/novels/{novel_id}/generate")
async def generate_episode(novel_id: str):
"""다음 에피소드 생성 — NovelManager.run_episode_pipeline() 호출"""
config_path = NOVELS_CONFIG_DIR / f"{novel_id}.json"
if not config_path.exists():
raise HTTPException(status_code=404, detail="소설을 찾을 수 없습니다.")
try:
sys.path.insert(0, str(BASE_DIR / "bots"))
sys.path.insert(0, str(BASE_DIR / "bots" / "novel"))
from bots.novel.novel_manager import NovelManager
manager = NovelManager()
ok = manager.run_episode_pipeline(novel_id, telegram_notify=False)
if ok:
status = manager.get_novel_status(novel_id)
return {
"success": True,
"episode_num": status.get("current_episode", 0),
"message": f"에피소드 생성 완료",
}
else:
raise HTTPException(status_code=500, detail="에피소드 생성 실패 — 로그 확인")
except ImportError as e:
raise HTTPException(status_code=500, detail=f"모듈 로드 실패: {e}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/novels/{novel_id}/episodes")
async def get_episodes(novel_id: str):
"""소설 에피소드 전체 목록"""
ep_dir = NOVELS_DATA_DIR / novel_id / "episodes"
if not ep_dir.exists():
return {"episodes": []}
episodes = []
for ef in sorted(ep_dir.glob("ep*.json"), key=lambda x: x.name):
if "_summary" in ef.name or "_blog" in ef.name:
continue
try:
ep_data = json.loads(ef.read_text(encoding="utf-8"))
episodes.append({
"episode_num": ep_data.get("episode_num", 0),
"title": ep_data.get("title", ""),
"generated_at": ep_data.get("generated_at", "")[:10],
"word_count": ep_data.get("word_count", 0),
"published": ep_data.get("published", False),
})
except Exception:
pass
return {"episodes": episodes}
+218
View File
@@ -0,0 +1,218 @@
"""
dashboard/backend/api_overview.py
Overview 탭 API — KPI, 파이프라인 상태, 활동 로그
"""
import json
import re
from datetime import datetime, date
from pathlib import Path
from typing import List
from fastapi import APIRouter
BASE_DIR = Path(__file__).parent.parent.parent
DATA_DIR = BASE_DIR / "data"
LOGS_DIR = BASE_DIR / "logs"
CONFIG_DIR = BASE_DIR / "config"
router = APIRouter()
CORNER_LABELS = {
"easy_world": "쉬운세상",
"hidden_gem": "숨은보물",
"vibe": "바이브",
"fact_check": "팩트체크",
"deep_dive": "딥다이브",
"novel": "연재소설",
}
def _count_published_files() -> dict:
"""published 폴더에서 오늘/이번주/총 발행 수 카운트"""
published_dir = DATA_DIR / "published"
if not published_dir.exists():
return {"today": 0, "this_week": 0, "total": 0, "corners": {}}
today = date.today()
week_start = today.toordinal() - today.weekday()
today_count = 0
week_count = 0
total_count = 0
corner_counts: dict = {}
for f in published_dir.glob("*.json"):
total_count += 1
try:
data = json.loads(f.read_text(encoding="utf-8"))
published_at_str = data.get("published_at", "")
corner = data.get("corner", "기타")
corner_counts[corner] = corner_counts.get(corner, 0) + 1
if published_at_str:
try:
pub_date = datetime.fromisoformat(
published_at_str[:19]
).date()
if pub_date == today:
today_count += 1
if pub_date.toordinal() >= week_start:
week_count += 1
except Exception:
pass
except Exception:
pass
return {
"today": today_count,
"this_week": week_count,
"total": total_count,
"corners": corner_counts,
}
def _get_revenue() -> dict:
"""analytics 폴더에서 수익 데이터 읽기"""
analytics_dir = DATA_DIR / "analytics"
if not analytics_dir.exists():
return {"amount": 0.0, "currency": "USD", "status": "대기중"}
latest = None
for f in sorted(analytics_dir.glob("*.json"), reverse=True):
try:
data = json.loads(f.read_text(encoding="utf-8"))
if "revenue" in data:
latest = data["revenue"]
break
except Exception:
pass
if latest is None:
return {"amount": 0.0, "currency": "USD", "status": "대기중"}
return latest
def _parse_pipeline_status() -> List[dict]:
"""scheduler.log에서 파이프라인 단계별 상태 파싱"""
steps = [
{"id": "collector", "name": "수집", "status": "waiting", "done_at": ""},
{"id": "writer", "name": "글쓰기", "status": "waiting", "done_at": ""},
{"id": "converter", "name": "변환", "status": "waiting", "done_at": ""},
{"id": "publisher", "name": "발행", "status": "waiting", "done_at": ""},
{"id": "uploader", "name": "유튜브 업로드", "status": "waiting", "done_at": ""},
{"id": "analytics", "name": "분석", "status": "waiting", "done_at": ""},
]
log_file = LOGS_DIR / "scheduler.log"
if not log_file.exists():
return steps
try:
lines = log_file.read_text(encoding="utf-8", errors="ignore").splitlines()
today_str = date.today().strftime("%Y-%m-%d")
for line in lines:
if today_str not in line:
continue
low = line.lower()
for step in steps:
sid = step["id"]
if sid in low:
# 타임스탬프 파싱
m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", line)
ts = m.group(1)[11:16] if m else ""
if "완료" in line or "done" in low or "success" in low or "finish" in low:
step["status"] = "done"
step["done_at"] = ts
elif "시작" in line or "start" in low or "running" in low:
step["status"] = "running"
step["done_at"] = ts
elif "오류" in line or "error" in low or "fail" in low:
step["status"] = "error"
step["done_at"] = ts
except Exception:
pass
return steps
def _get_activity_logs() -> List[dict]:
"""logs/*.log에서 최근 20개 활동 로그 파싱"""
logs = []
log_files = sorted(LOGS_DIR.glob("*.log"), key=lambda f: f.stat().st_mtime, reverse=True)
for log_file in log_files[:5]: # 최근 5개 파일만
try:
lines = log_file.read_text(encoding="utf-8", errors="ignore").splitlines()
for line in reversed(lines):
if not line.strip():
continue
m = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),?\d*\s+\[(\w+)\]\s+(.*)", line)
if m:
logs.append({
"time": m.group(1)[11:16],
"date": m.group(1)[:10],
"level": m.group(2),
"module": log_file.stem,
"message": m.group(3)[:120],
})
if len(logs) >= 20:
break
except Exception:
pass
if len(logs) >= 20:
break
return logs[:20]
def _get_corner_ratio(corner_counts: dict) -> List[dict]:
"""코너별 발행 비율 계산"""
total = sum(corner_counts.values()) or 1
result = []
for key, label in CORNER_LABELS.items():
count = corner_counts.get(key, corner_counts.get(label, 0))
result.append({
"name": label,
"count": count,
"ratio": round(count / total * 100),
})
# 정의되지 않은 코너 추가
known = set(CORNER_LABELS.keys()) | set(CORNER_LABELS.values())
for k, v in corner_counts.items():
if k not in known:
result.append({
"name": k,
"count": v,
"ratio": round(v / total * 100),
})
result.sort(key=lambda x: x["count"], reverse=True)
return result
@router.get("/overview")
async def get_overview():
counts = _count_published_files()
revenue = _get_revenue()
return {
"kpi": {
"today": counts["today"],
"this_week": counts["this_week"],
"total": counts["total"],
"revenue": revenue,
},
"corner_ratio": _get_corner_ratio(counts["corners"]),
}
@router.get("/pipeline")
async def get_pipeline():
return {"steps": _parse_pipeline_status()}
@router.get("/activity")
async def get_activity():
return {"logs": _get_activity_logs()}
+43
View File
@@ -0,0 +1,43 @@
"""
dashboard/backend/api_settings.py
Settings 탭 API — engine.json 읽기/쓰기
"""
import json
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
BASE_DIR = Path(__file__).parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "engine.json"
router = APIRouter()
class SettingsUpdate(BaseModel):
data: dict
@router.get("/settings")
async def get_settings():
"""config/engine.json 반환"""
if not CONFIG_PATH.exists():
return {}
try:
return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
except Exception as e:
raise HTTPException(status_code=500, detail=f"설정 파일 읽기 실패: {e}")
@router.put("/settings")
async def update_settings(req: SettingsUpdate):
"""config/engine.json 저장"""
try:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(
json.dumps(req.data, ensure_ascii=False, indent=2),
encoding="utf-8"
)
return {"success": True, "message": "설정 저장 완료"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"설정 저장 실패: {e}")
+113
View File
@@ -0,0 +1,113 @@
"""
dashboard/backend/api_tools.py
Settings > 생성도구 선택 탭 API
"""
import json
from pathlib import Path
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
BASE_DIR = Path(__file__).parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "engine.json"
router = APIRouter()
TOOL_CATEGORIES = {
"writing": {
"label": "글쓰기",
"options": ["claude", "gemini", "openclaw"],
"option_labels": {
"claude": "Claude (Anthropic)",
"gemini": "Google Gemini",
"openclaw": "OpenClaw AI",
},
},
"image_generation": {
"label": "이미지 생성",
"options": ["dalle", "external"],
"option_labels": {
"dalle": "DALL-E 3 (OpenAI)",
"external": "수동 제공",
},
},
"tts": {
"label": "TTS (음성합성)",
"options": ["google_cloud", "openai", "elevenlabs", "gtts"],
"option_labels": {
"google_cloud": "Google Cloud TTS",
"openai": "OpenAI TTS (tts-1-hd)",
"elevenlabs": "ElevenLabs",
"gtts": "gTTS (무료)",
},
},
"video_generation": {
"label": "영상 생성",
"options": ["ffmpeg_slides", "seedance", "runway", "sora", "veo"],
"option_labels": {
"ffmpeg_slides": "FFmpeg 슬라이드 (로컬)",
"seedance": "Seedance 2.0",
"runway": "Runway Gen-3",
"sora": "OpenAI Sora",
"veo": "Google Veo",
},
},
}
class ToolUpdate(BaseModel):
tools: dict # {"writing": "claude", "tts": "gtts", ...}
def _load_config() -> dict:
if not CONFIG_PATH.exists():
return {}
try:
return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
@router.get("/tools")
async def get_tools():
"""현재 선택된 도구 + 선택 가능 목록 반환"""
config = _load_config()
result = {}
for category, meta in TOOL_CATEGORIES.items():
current = config.get(category, {}).get("provider", meta["options"][0])
result[category] = {
"label": meta["label"],
"current": current,
"options": [
{
"value": opt,
"label": meta["option_labels"].get(opt, opt),
}
for opt in meta["options"]
],
}
return {"tools": result}
@router.put("/tools")
async def update_tools(req: ToolUpdate):
"""engine.json 도구 섹션 업데이트"""
config = _load_config()
for category, provider in req.tools.items():
if category in TOOL_CATEGORIES:
if category not in config:
config[category] = {}
config[category]["provider"] = provider
try:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(
json.dumps(config, ensure_ascii=False, indent=2),
encoding="utf-8"
)
return {"success": True, "message": "도구 설정 저장 완료"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"저장 실패: {e}")
+78
View File
@@ -0,0 +1,78 @@
"""
dashboard/backend/server.py
미디어 엔진 컨트롤 패널 — FastAPI 메인 서버
실행: uvicorn dashboard.backend.server:app --port 8080
또는: python -m uvicorn dashboard.backend.server:app --port 8080 --reload
"""
import os
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from dashboard.backend import (
api_overview,
api_content,
api_analytics,
api_novels,
api_settings,
api_connections,
api_tools,
api_cost,
api_logs,
)
app = FastAPI(title="The 4th Path — Control Panel", version="1.0.0")
# ── CORS ──────────────────────────────────────────────────────────────────────
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
"http://localhost:8080",
"http://127.0.0.1:5173",
"http://127.0.0.1:8080",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ── API 라우터 등록 ────────────────────────────────────────────────────────────
app.include_router(api_overview.router, prefix="/api")
app.include_router(api_content.router, prefix="/api")
app.include_router(api_analytics.router, prefix="/api")
app.include_router(api_novels.router, prefix="/api")
app.include_router(api_settings.router, prefix="/api")
app.include_router(api_connections.router, prefix="/api")
app.include_router(api_tools.router, prefix="/api")
app.include_router(api_cost.router, prefix="/api")
app.include_router(api_logs.router, prefix="/api")
@app.get("/api/health")
async def health():
return {"status": "ok", "service": "The 4th Path Control Panel"}
# ── 정적 파일 서빙 (프론트엔드 빌드 결과) — API 라우터보다 나중에 등록 ──────────
FRONTEND_DIST = Path(__file__).parent.parent / "frontend" / "dist"
if FRONTEND_DIST.exists():
assets_dir = FRONTEND_DIST / "assets"
if assets_dir.exists():
app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")
@app.get("/", include_in_schema=False)
@app.get("/{full_path:path}", include_in_schema=False)
async def serve_spa(full_path: str = ""):
# API 경로는 위 라우터가 처리 — 여기는 SPA 라우팅용
if full_path.startswith("api/"):
from fastapi.responses import JSONResponse
return JSONResponse({"detail": "Not Found"}, status_code=404)
index = FRONTEND_DIST / "index.html"
if index.exists():
return FileResponse(str(index))
return {"status": "frontend not built — run: npm run build"}