diff --git a/bots/converters/smart_video_router.py b/bots/converters/smart_video_router.py new file mode 100644 index 0000000..8db42fc --- /dev/null +++ b/bots/converters/smart_video_router.py @@ -0,0 +1,337 @@ +""" +bots/converters/smart_video_router.py [NEW] + +Budget-aware video engine selection and fallback router. + +Selection logic: + 1. Kling free credits remaining? → use Kling + 2. Budget allows paid? → cheapest quality engine + 3. Daily limit hit? → FFmpeg fallback + 4. Any engine fails? → next in priority (no retry on same) + +Usage: + from bots.converters.smart_video_router import SmartVideoRouter + router = SmartVideoRouter(resolved_config) + engine = router.select(duration_sec=30, needs_audio=True) + path = router.generate(prompt, engine, '/tmp/out.mp4') + +Test mode: + python -m bots.converters.smart_video_router --test +""" + +import json +import logging +import os +import sys +from datetime import date +from pathlib import Path +from typing import Optional + +from dotenv import load_dotenv + +load_dotenv(dotenv_path='D:/key/blog-writer.env.env') + +BASE_DIR = Path(__file__).parent.parent.parent +LOG_DIR = BASE_DIR / 'logs' +DATA_DIR = BASE_DIR / 'data' +STATE_FILE = DATA_DIR / 'video_router_state.json' + +LOG_DIR.mkdir(exist_ok=True) +DATA_DIR.mkdir(exist_ok=True) + +logger = logging.getLogger(__name__) +if not logger.handlers: + handler = logging.FileHandler(LOG_DIR / 'smart_video_router.log', encoding='utf-8') + handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) + logger.addHandler(handler) + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.INFO) + + +class SmartVideoRouter: + """ + Budget-aware video engine selection and fallback. + + Logic: + 1. Kling free credits remaining? → use Kling + 2. Budget allows paid? → cheapest quality engine + 3. Daily limit hit? → FFmpeg fallback + 4. Any engine fails? → next in priority (no retry on same) + """ + + def __init__(self, resolved_config: dict): + """ + resolved_config: output from ConfigResolver.resolve(), or raw engine.json dict. + Expects video_generation key with provider/options structure. + """ + video_cfg = resolved_config.get('video_generation', {}) + opts = video_cfg.get('options', {}) + router_cfg = opts.get('smart_router', {}) + + self.priority: list = router_cfg.get( + 'priority', ['kling_free', 'veo3', 'seedance2', 'ffmpeg_slides'] + ) + self.daily_cost_limit_usd: float = router_cfg.get('daily_cost_limit_usd', 0.50) + self.prefer_free_first: bool = router_cfg.get('prefer_free_first', True) + self.fallback_engine: str = router_cfg.get('fallback', 'ffmpeg_slides') + + self.engine_opts: dict = opts # all engine option blocks + self.state: dict = self._get_state() + + # ── State management ──────────────────────────────────── + + def _get_state(self) -> dict: + """Load daily state from disk; reset if date has changed.""" + today = str(date.today()) + default = { + 'date': today, + 'cost_usd': 0.0, + 'kling_credits_used': 0, + } + if STATE_FILE.exists(): + try: + saved = json.loads(STATE_FILE.read_text(encoding='utf-8')) + if saved.get('date') == today: + return saved + # New day — reset counters, keep structure + logger.info(f"날짜 변경 감지 ({saved.get('date')} → {today}): 라우터 상태 초기화") + except Exception as e: + logger.warning(f"상태 파일 읽기 실패: {e}") + self._save_state(default) + return default + + def _save_state(self, state: Optional[dict] = None) -> None: + """Persist router state to data/video_router_state.json.""" + target = state if state is not None else self.state + try: + STATE_FILE.write_text( + json.dumps(target, ensure_ascii=False, indent=2), + encoding='utf-8', + ) + except Exception as e: + logger.warning(f"상태 파일 저장 실패: {e}") + + # ── Engine availability checks ─────────────────────────── + + def _has_api_key(self, engine_name: str) -> bool: + """Return True if the engine's API key env var is set and non-empty.""" + cfg = self.engine_opts.get(engine_name, {}) + key_env = cfg.get('api_key_env', '') + if not key_env: + # ffmpeg_slides has no API key requirement + return True + return bool(os.getenv(key_env, '').strip()) + + def _kling_credits_available(self) -> bool: + """Return True if Kling free credits are still available today.""" + kling_cfg = self.engine_opts.get('kling_free', {}) + daily_credits = kling_cfg.get('free_daily_credits', 66) + used = self.state.get('kling_credits_used', 0) + return used < daily_credits + + def _budget_allows(self, engine_name: str, duration_sec: float) -> bool: + """Return True if engine cost fits within remaining daily budget.""" + cfg = self.engine_opts.get(engine_name, {}) + cost_per_sec = cfg.get('cost_per_sec', 0) + if cost_per_sec == 0: + return True + estimated_cost = cost_per_sec * duration_sec + spent = self.state.get('cost_usd', 0.0) + return (spent + estimated_cost) <= self.daily_cost_limit_usd + + # ── Public API ──────────────────────────────────────────── + + def select(self, duration_sec: float, needs_audio: bool) -> str: + """ + Select best available engine for the given clip duration. + Returns engine name string (never empty — falls back to ffmpeg_slides). + """ + self.state = self._get_state() # refresh in case of date change + + for engine in self.priority: + if engine == 'ffmpeg_slides': + logger.info("영상 라우터: ffmpeg_slides 선택 (최종 폴백)") + return 'ffmpeg_slides' + + if engine == 'kling_free': + if self._has_api_key('kling_free') and self._kling_credits_available(): + logger.info("영상 라우터: kling_free 선택 (무료 크레딧 잔여)") + return 'kling_free' + continue + + # Paid engines (veo3, seedance2, ...) + if self._has_api_key(engine) and self._budget_allows(engine, duration_sec): + logger.info(f"영상 라우터: {engine} 선택 (예산 내 유료 엔진)") + return engine + + # Final safety net + logger.info("영상 라우터: ffmpeg_slides 최종 폴백 선택") + return self.fallback_engine + + def generate(self, prompt, engine: str, output_path: str) -> str: + """ + Generate a video clip using the specified engine. + + prompt: ComposedPrompt object with .text attribute, or plain str. + Returns path to output MP4, or '' on failure. + """ + # Normalise prompt to str + if hasattr(prompt, 'text'): + prompt_text = prompt.text + else: + prompt_text = str(prompt) + + logger.info(f"영상 생성 시작: 엔진={engine}, 출력={output_path}") + + if engine == 'kling_free': + result = self._generate_kling(prompt_text, output_path) + elif engine == 'ffmpeg_slides': + result = self._generate_ffmpeg(prompt_text, output_path) + else: + # veo3, seedance2, runway, etc. — stub: not yet implemented + logger.warning(f"{engine} 구현 미완성 — 폴백 트리거") + result = '' + + if result: + # Update cost tracking + cfg = self.engine_opts.get(engine, {}) + cost_per_sec = cfg.get('cost_per_sec', 0) + if cost_per_sec > 0: + # Estimate 30s clip cost as a rough default + self.state['cost_usd'] = round( + self.state.get('cost_usd', 0.0) + cost_per_sec * 30, 4 + ) + self._save_state() + logger.info(f"영상 생성 완료: {result}") + else: + logger.warning(f"영상 생성 실패: 엔진={engine}") + + return result + + def on_failure(self, engine: str, error: str) -> str: + """ + Called when an engine fails mid-generation. + Returns next engine to try, or 'ffmpeg_slides' as final fallback. + """ + logger.warning(f"엔진 실패 처리: {engine} — {error}") + try: + idx = self.priority.index(engine) + next_engines = self.priority[idx + 1:] + except ValueError: + next_engines = [] + + for candidate in next_engines: + logger.info(f"다음 엔진 시도: {candidate}") + return candidate + + logger.info("모든 엔진 소진 — ffmpeg_slides 최종 폴백") + return 'ffmpeg_slides' + + # ── Engine implementations ──────────────────────────────── + + def _generate_kling(self, prompt_text: str, output_path: str) -> str: + """ + Kling free tier stub implementation. + + The actual Kling API integration is pending (V3.1). + For now, log that the call would be made and fall back to ffmpeg_slides. + """ + api_key = os.getenv('KLING_API_KEY', '') + if not api_key: + logger.warning("KLING_API_KEY 미설정 — ffmpeg_slides 폴백") + return self._generate_ffmpeg(prompt_text, output_path) + + kling_cfg = self.engine_opts.get('kling_free', {}) + api_url = kling_cfg.get('api_url', 'https://api.klingai.com/v1') + + # Stub: log what would happen, then fall back + logger.info( + f"[스텁] Kling API 호출 예정: POST {api_url}/videos/text2video " + f"(프롬프트: {prompt_text[:60]}...) — 실제 통합 V3.1에서 구현 예정" + ) + logger.info("Kling 스텁 실행 — ffmpeg_slides로 폴백하여 영상 생성") + + # Track credit usage even for stub (as if 1 credit consumed per call) + self.state['kling_credits_used'] = self.state.get('kling_credits_used', 0) + 1 + self._save_state() + + return self._generate_ffmpeg(prompt_text, output_path) + + def _generate_ffmpeg(self, prompt_text: str, output_path: str) -> str: + """ + Generate a minimal single-scene video using FFmpegSlidesEngine. + Accepts a plain text prompt and wraps it into a scene list. + """ + try: + from bots.converters.video_engine import FFmpegSlidesEngine + ffmpeg_cfg = self.engine_opts.get('ffmpeg_slides', {}) + engine = FFmpegSlidesEngine(ffmpeg_cfg) + + # Wrap prompt into minimal scene structure expected by FFmpegSlidesEngine + scenes = [ + { + 'text': prompt_text[:200], # truncate if very long + 'type': 'headline', + } + ] + return engine.generate(scenes, output_path) + except Exception as e: + logger.error(f"FFmpegSlidesEngine 실패: {e}") + return '' + + +# ── Module entry point (--test mode) ───────────────────────── + +def _load_engine_config() -> dict: + """Load engine.json from config directory.""" + config_path = BASE_DIR / 'config' / 'engine.json' + try: + return json.loads(config_path.read_text(encoding='utf-8')) + except Exception as e: + logger.error(f"engine.json 로드 실패: {e}") + return {} + + +def _run_test() -> None: + """Print current router state and selected engine for a 30s clip.""" + print("=" * 60) + print("SmartVideoRouter - 테스트 모드") + print("=" * 60) + + config = _load_engine_config() + if not config: + print("[오류] engine.json 로드 실패") + sys.exit(1) + + router = SmartVideoRouter(config) + + print("\n[현재 상태]") + state = router._get_state() + for k, v in state.items(): + print(f" {k}: {v}") + + print("\n[엔진 우선순위]") + for i, eng in enumerate(router.priority, 1): + has_key = router._has_api_key(eng) + key_env = router.engine_opts.get(eng, {}).get('api_key_env', '(없음)') + print(f" {i}. {eng} - API키={key_env} 설정됨={has_key}") + + print("\n[30초 클립 엔진 선택]") + selected = router.select(duration_sec=30, needs_audio=True) + print(f" → 선택된 엔진: {selected}") + + cost_spent = state.get('cost_usd', 0.0) + cost_limit = router.daily_cost_limit_usd + kling_used = state.get('kling_credits_used', 0) + kling_limit = router.engine_opts.get('kling_free', {}).get('free_daily_credits', 66) + print(f"\n[예산 현황]") + print(f" 일일 비용: ${cost_spent:.4f} / ${cost_limit:.2f}") + print(f" Kling 크레딧: {kling_used} / {kling_limit} 사용") + print("=" * 60) + + +if __name__ == '__main__': + if '--test' in sys.argv: + _run_test() + else: + print("사용법: python -m bots.converters.smart_video_router --test") diff --git a/config/engine.json b/config/engine.json index e291f98..de22589 100644 --- a/config/engine.json +++ b/config/engine.json @@ -1,6 +1,6 @@ { "_comment": "The 4th Path 블로그 자동 수익 엔진 — 엔진 설정 (v3)", - "_updated": "2026-03-26", + "_updated": "2026-03-29", "writing": { "provider": "openclaw", "_comment_provider": "openclaw=ChatGPT Pro(OAuth), claude_web=Claude Max(웹쿠키), gemini_web=Gemini Pro(웹쿠키), claude=Anthropic API키, gemini=Google AI API키", @@ -75,8 +75,40 @@ } }, "video_generation": { - "provider": "sora", + "provider": "smart_router", "options": { + "smart_router": { + "priority": ["kling_free", "veo3", "seedance2", "ffmpeg_slides"], + "daily_cost_limit_usd": 0.50, + "prefer_free_first": true, + "fallback": "ffmpeg_slides" + }, + "kling_free": { + "api_url": "https://api.klingai.com/v1", + "api_key_env": "KLING_API_KEY", + "free_daily_credits": 66, + "mode": "standard", + "resolution": "720p", + "aspect_ratio": "9:16", + "audio": true, + "cost_per_sec": 0 + }, + "veo3": { + "api_key_env": "GEMINI_API_KEY", + "model": "veo-3.1", + "resolution": "720p", + "aspect_ratio": "9:16", + "audio": true, + "cost_per_sec": 0.03 + }, + "seedance2": { + "provider": "fal.ai", + "api_key_env": "FAL_API_KEY", + "model": "seedance-2.0", + "resolution": "720p", + "audio": true, + "cost_per_sec": 0.022 + }, "ffmpeg_slides": { "resolution": "1080x1920", "fps": 30, @@ -88,6 +120,7 @@ "burn_subtitles": true }, "seedance": { + "_comment": "레거시 — seedance2/FAL_API_KEY로 대체됨", "api_url": "https://api.seedance2.ai/v1/generate", "api_key_env": "SEEDANCE_API_KEY", "resolution": "1080x1920", @@ -95,11 +128,6 @@ "audio": true, "fallback": "ffmpeg_slides" }, - "sora": { - "comment": "OpenAI Sora — API 접근 가능 시 활성화", - "api_key_env": "OPENAI_API_KEY", - "fallback": "ffmpeg_slides" - }, "runway": { "api_key_env": "RUNWAY_API_KEY", "model": "gen3a_turbo", @@ -108,7 +136,7 @@ "fallback": "ffmpeg_slides" }, "veo": { - "comment": "Google Veo 3.1 — API 접근 가능 시 활성화", + "comment": "Google Veo 3.1 레거시 — veo3로 대체됨", "api_key_env": "GEMINI_API_KEY", "fallback": "ffmpeg_slides" } @@ -170,9 +198,10 @@ "cta": "팔로우하면 매일 이런 정보를 받습니다" }, "optional_keys": { - "SEEDANCE_API_KEY": "Seedance 2.0 AI 영상 생성", + "KLING_API_KEY": "Kling 3.0 AI 영상 생성 (66 무료 크레딧/일)", + "FAL_API_KEY": "Seedance 2.0 AI 영상 생성 (fal.ai)", "ELEVENLABS_API_KEY": "ElevenLabs 고품질 TTS", "GEMINI_API_KEY": "Google Gemini 글쓰기 / Veo 영상", "RUNWAY_API_KEY": "Runway Gen-3 AI 영상 생성" } -} \ No newline at end of file +}