diff --git a/.env.example b/.env.example
index b0396e3..91be9cf 100644
--- a/.env.example
+++ b/.env.example
@@ -75,3 +75,17 @@ TIKTOK_OPEN_ID=
# YouTube Data API v3 (기존 Google Cloud 프로젝트에서 API 추가 활성화)
# YouTube Studio > 채널 > 고급 설정에서 채널 ID 확인
YOUTUBE_CHANNEL_ID=
+
+# ─── v3 엔진 추상화 (선택) ────────────────────────────
+# Seedance 2.0 — AI 시네마틱 영상 생성 (소설 쇼츠 권장)
+# https://seedance2.ai/
+SEEDANCE_API_KEY=
+# ElevenLabs — 고품질 한국어 TTS
+# https://elevenlabs.io/
+ELEVENLABS_API_KEY=
+# Google Gemini — 글쓰기 대체 / Veo 영상
+# https://aistudio.google.com/
+GEMINI_API_KEY=
+# Runway Gen-3 — AI 영상 생성
+# https://runwayml.com/
+RUNWAY_API_KEY=
diff --git a/bots/collector_bot.py b/bots/collector_bot.py
index b5252ef..009c83f 100644
--- a/bots/collector_bot.py
+++ b/bots/collector_bot.py
@@ -97,10 +97,24 @@ def calc_freshness_score(published_at: datetime | None, max_score: int = 20) ->
def calc_korean_relevance(text: str, rules: dict) -> int:
"""한국 독자 관련성 점수"""
+ max_score = rules['scoring']['korean_relevance']['max']
keywords = rules['scoring']['korean_relevance']['keywords']
+
+ # 한국어 문자(가-힣) 비율 체크 — 한국어 콘텐츠 자체에 기본점수 부여
+ korean_chars = sum(1 for c in text if '\uac00' <= c <= '\ud7a3')
+ korean_ratio = korean_chars / max(len(text), 1)
+ if korean_ratio >= 0.15:
+ base = 15 # 한국어 텍스트면 기본 15점
+ elif korean_ratio >= 0.05:
+ base = 8
+ else:
+ base = 0
+
+ # 브랜드/지역 키워드 보너스
matched = sum(1 for kw in keywords if kw in text)
- score = min(matched * 6, rules['scoring']['korean_relevance']['max'])
- return score
+ bonus = min(matched * 5, max_score - base)
+
+ return min(base + bonus, max_score)
def calc_source_trust(source_url: str, rules: dict) -> tuple[int, str]:
@@ -215,9 +229,14 @@ def calculate_quality_score(item: dict, rules: dict) -> int:
kr_score = calc_korean_relevance(text, rules)
fresh_score = calc_freshness_score(pub_at)
- # search_demand: pytrends 연동 후 실제값 사용 (현재 기본값 10)
- search_score = item.get('search_demand_score', 10)
- trust_score, trust_level = calc_source_trust(source_url, rules)
+ # search_demand: pytrends 연동 후 실제값 사용 (RSS 기본값 12)
+ search_score = item.get('search_demand_score', 12)
+    # 신뢰도: _trust_score가 이미 설정된 경우 우선 사용
+ if '_trust_score' in item:
+ trust_score = item['_trust_score']
+ trust_level = item.get('source_trust_level', 'medium')
+ else:
+ trust_score, trust_level = calc_source_trust(source_url, rules)
mono_score = calc_monetization(text, rules)
item['korean_relevance_score'] = kr_score
diff --git a/bots/converters/video_engine.py b/bots/converters/video_engine.py
new file mode 100644
index 0000000..87623b4
--- /dev/null
+++ b/bots/converters/video_engine.py
@@ -0,0 +1,783 @@
+"""
+비디오 엔진 추상화 (bots/converters/video_engine.py)
+역할: engine.json video_generation 설정에 따라 적절한 영상 생성 엔진 인스턴스 반환
+설계서: blog-engine-final-masterplan-v3.txt
+
+지원 엔진:
+ - FFmpegSlidesEngine: 기존 shorts_converter.py 파이프라인 (슬라이드 + TTS + ffmpeg)
+ - SeedanceEngine: Seedance 2.0 API (AI 영상 생성)
+ - SoraEngine: OpenAI Sora (미지원 → ffmpeg_slides 폴백)
+ - RunwayEngine: Runway Gen-3 API
+ - VeoEngine: Google Veo 3.1 (미지원 → ffmpeg_slides 폴백)
+"""
+import json
+import logging
+import os
+import shutil
+import subprocess
+import tempfile
+from abc import ABC, abstractmethod
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+BASE_DIR = Path(__file__).parent.parent.parent
+LOG_DIR = BASE_DIR / 'logs'
+OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
+ASSETS_DIR = BASE_DIR / 'assets'
+BGM_PATH = ASSETS_DIR / 'bgm.mp3'
+
+LOG_DIR.mkdir(exist_ok=True)
+OUTPUT_DIR.mkdir(exist_ok=True)
+
+logger = logging.getLogger(__name__)
+if not logger.handlers:
+ handler = logging.FileHandler(LOG_DIR / 'video_engine.log', encoding='utf-8')
+ handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
+ logger.addHandler(handler)
+ logger.addHandler(logging.StreamHandler())
+ logger.setLevel(logging.INFO)
+
+
+# ─── 추상 기본 클래스 ──────────────────────────────────
+
+class VideoEngine(ABC):
+ @abstractmethod
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ """
+ scenes로 영상 생성.
+
+ scenes 형식:
+ [
+ {
+ "text": str, # 자막/TTS 텍스트
+ "type": str, # "intro"|"headline"|"point"|"data"|"outro"
+ "image_prompt": str, # DALL-E 배경 프롬프트 (선택)
+ "slide_path": str, # 슬라이드 PNG 경로 (있으면 사용)
+ "audio_path": str, # TTS WAV 경로 (있으면 사용)
+ }
+ ]
+
+ Returns: 생성된 MP4 파일 경로 (실패 시 빈 문자열)
+ """
+
+
+# ─── FFmpegSlidesEngine ────────────────────────────────
+
+class FFmpegSlidesEngine(VideoEngine):
+ """
+ 기존 shorts_converter.py의 ffmpeg 파이프라인을 재사용하는 엔진.
+ scenes에 slide_path + audio_path가 있으면 그대로 사용,
+ 없으면 빈 슬라이드와 gTTS로 생성 후 진행.
+ """
+
+ def __init__(self, cfg: dict):
+ self.cfg = cfg
+ self.ffmpeg_path = os.getenv('FFMPEG_PATH', 'ffmpeg')
+ self.ffprobe_path = os.getenv('FFPROBE_PATH', 'ffprobe')
+ self.resolution = cfg.get('resolution', '1080x1920')
+ self.fps = cfg.get('fps', 30)
+ self.transition = cfg.get('transition', 'fade')
+ self.trans_dur = cfg.get('transition_duration', 0.5)
+ self.bgm_volume = cfg.get('bgm_volume', 0.08)
+ self.burn_subs = cfg.get('burn_subtitles', True)
+
+ def _check_ffmpeg(self) -> bool:
+ try:
+ r = subprocess.run(
+ [self.ffmpeg_path, '-version'],
+ capture_output=True, timeout=5,
+ )
+ return r.returncode == 0
+ except Exception:
+ return False
+
+ def _run_ffmpeg(self, args: list, quiet: bool = True) -> bool:
+ cmd = [self.ffmpeg_path, '-y']
+ if quiet:
+ cmd += ['-loglevel', 'error']
+ cmd += args
+ result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
+ if result.returncode != 0:
+ logger.error(f"ffmpeg 오류: {result.stderr[-400:]}")
+ return result.returncode == 0
+
+ def _get_audio_duration(self, wav_path: str) -> float:
+ try:
+ result = subprocess.run(
+ [self.ffprobe_path, '-v', 'quiet', '-print_format', 'json',
+ '-show_format', wav_path],
+ capture_output=True, text=True, timeout=10,
+ )
+ data = json.loads(result.stdout)
+ return float(data['format']['duration'])
+ except Exception:
+ return 5.0
+
+ def _make_silent_wav(self, output_path: str, duration: float = 2.0) -> bool:
+ return self._run_ffmpeg([
+            '-f', 'lavfi', '-i', 'anullsrc=r=24000:cl=mono',
+ '-t', str(duration), output_path,
+ ])
+
+ def _make_blank_slide(self, output_path: str) -> bool:
+ """단색(어두운) 빈 슬라이드 PNG 생성"""
+ try:
+ from PIL import Image, ImageDraw
+ img = Image.new('RGB', (1080, 1920), (10, 10, 13))
+ draw = ImageDraw.Draw(img)
+ draw.rectangle([60, 950, 1020, 954], fill=(200, 168, 78))
+ img.save(output_path)
+ return True
+ except ImportError:
+ # Pillow 없으면 ffmpeg lavfi로 단색 이미지 생성
+ return self._run_ffmpeg([
+ '-f', 'lavfi', '-i', 'color=c=black:s=1080x1920:r=1',
+ '-frames:v', '1', output_path,
+ ])
+
+ def _tts_gtts(self, text: str, output_path: str) -> bool:
+ try:
+ from gtts import gTTS
+ mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
+ tts = gTTS(text=text, lang='ko', slow=False)
+ tts.save(mp3_path)
+ ok = self._run_ffmpeg(['-i', mp3_path, '-ar', '24000', output_path])
+ Path(mp3_path).unlink(missing_ok=True)
+ return ok and Path(output_path).exists()
+ except Exception as e:
+ logger.warning(f"gTTS 실패: {e}")
+ return False
+
+ def _make_clip(self, slide_png: str, audio_wav: str, output_mp4: str) -> float:
+ """슬라이드 PNG + 오디오 WAV → MP4 클립 (Ken Burns zoompan). 클립 길이(초) 반환."""
+ duration = self._get_audio_duration(audio_wav) + 0.3
+ ok = self._run_ffmpeg([
+ '-loop', '1', '-i', slide_png,
+ '-i', audio_wav,
+ '-c:v', 'libx264', '-tune', 'stillimage',
+ '-c:a', 'aac', '-b:a', '192k',
+ '-pix_fmt', 'yuv420p',
+ '-vf', (
+ 'scale=1080:1920,'
+ 'zoompan=z=\'min(zoom+0.0003,1.05)\':'
+ 'x=\'iw/2-(iw/zoom/2)\':'
+ 'y=\'ih/2-(ih/zoom/2)\':'
+ 'd=1:s=1080x1920:fps=30'
+ ),
+ '-shortest',
+ '-r', '30',
+ output_mp4,
+ ])
+ return duration if ok else 0.0
+
+ def _concat_clips_xfade(self, clips: list, output_mp4: str) -> bool:
+ """여러 클립을 xfade 전환으로 결합"""
+ if len(clips) == 1:
+ shutil.copy2(clips[0]['mp4'], output_mp4)
+ return True
+
+ n = len(clips)
+ inputs = []
+ for c in clips:
+ inputs += ['-i', c['mp4']]
+
+ filter_parts = []
+ prev_v = '[0:v]'
+ prev_a = '[0:a]'
+ for i in range(1, n):
+ offset = sum(c['duration'] for c in clips[:i]) - self.trans_dur * i
+ out_v = f'[f{i}v]' if i < n - 1 else '[video]'
+ out_a = f'[f{i}a]' if i < n - 1 else '[audio]'
+ filter_parts.append(
+ f'{prev_v}[{i}:v]xfade=transition={self.transition}:'
+ f'duration={self.trans_dur}:offset={offset:.3f}{out_v}'
+ )
+ filter_parts.append(
+ f'{prev_a}[{i}:a]acrossfade=d={self.trans_dur}{out_a}'
+ )
+ prev_v = out_v
+ prev_a = out_a
+
+ return self._run_ffmpeg(
+ inputs + [
+ '-filter_complex', '; '.join(filter_parts),
+ '-map', '[video]', '-map', '[audio]',
+ '-c:v', 'libx264', '-c:a', 'aac',
+ '-pix_fmt', 'yuv420p',
+ output_mp4,
+ ]
+ )
+
+ def _mix_bgm(self, video_mp4: str, output_mp4: str) -> bool:
+ if not BGM_PATH.exists():
+ logger.warning(f"BGM 파일 없음 ({BGM_PATH}) — BGM 없이 진행")
+ shutil.copy2(video_mp4, output_mp4)
+ return True
+ return self._run_ffmpeg([
+ '-i', video_mp4,
+ '-i', str(BGM_PATH),
+ '-filter_complex',
+ f'[1:a]volume={self.bgm_volume}[bgm];[0:a][bgm]amix=inputs=2:duration=first[a]',
+ '-map', '0:v', '-map', '[a]',
+ '-c:v', 'copy', '-c:a', 'aac',
+ '-shortest',
+ output_mp4,
+ ])
+
+ def _burn_subtitles(self, video_mp4: str, srt_path: str, output_mp4: str) -> bool:
+ font_name = 'NanumGothic'
+ fonts_dir = ASSETS_DIR / 'fonts'
+ for fname in ['NotoSansKR-Regular.ttf', 'malgun.ttf']:
+ fp = fonts_dir / fname
+ if not fp.exists():
+ fp = Path(f'C:/Windows/Fonts/{fname}')
+ if fp.exists():
+ font_name = fp.stem
+ break
+ style = (
+ f'FontName={font_name},'
+ 'FontSize=22,'
+ 'PrimaryColour=&H00FFFFFF,'
+ 'OutlineColour=&H80000000,'
+ 'BorderStyle=4,'
+ 'BackColour=&H80000000,'
+ 'Outline=0,Shadow=0,'
+ 'MarginV=120,'
+ 'Alignment=2,'
+ 'Bold=1'
+ )
+ srt_esc = str(srt_path).replace('\\', '/').replace(':', '\\:')
+ return self._run_ffmpeg([
+ '-i', video_mp4,
+ '-vf', f'subtitles={srt_esc}:force_style=\'{style}\'',
+ '-c:v', 'libx264', '-c:a', 'copy',
+ output_mp4,
+ ])
+
+ def _build_srt(self, scenes: list, clips: list) -> str:
+ lines = []
+ t = 0.0
+ for i, (scene, clip) in enumerate(zip(scenes, clips), 1):
+ text = scene.get('text', '')
+ if not text:
+ t += clip['duration'] - self.trans_dur
+ continue
+ end = t + clip['duration']
+ mid = len(text) // 2
+ if len(text) > 30:
+ space = text.rfind(' ', 0, mid)
+ if space > 0:
+ text = text[:space] + '\n' + text[space + 1:]
+ lines += [
+ str(i),
+ f'{self._sec_to_srt(t)} --> {self._sec_to_srt(end)}',
+ text,
+ '',
+ ]
+ t += clip['duration'] - self.trans_dur
+ return '\n'.join(lines)
+
+ @staticmethod
+ def _sec_to_srt(s: float) -> str:
+ h, rem = divmod(int(s), 3600)
+ m, sec = divmod(rem, 60)
+ ms = int((s - int(s)) * 1000)
+ return f'{h:02d}:{m:02d}:{sec:02d},{ms:03d}'
+
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ """
+ scenes 리스트로 쇼츠 MP4 생성.
+
+ kwargs:
+ article (dict): 원본 article 데이터 (슬라이드 합성에 사용)
+ tts_engine: BaseTTS 인스턴스 (없으면 GTTSEngine 사용)
+ """
+ if not self._check_ffmpeg():
+ logger.error("ffmpeg 없음. PATH 또는 FFMPEG_PATH 환경변수 확인")
+ return ''
+
+ if not scenes:
+ logger.warning("scenes 비어 있음 — 영상 생성 불가")
+ return ''
+
+ logger.info(f"FFmpegSlidesEngine 시작: {len(scenes)}개 씬 → {output_path}")
+
+ tts_engine = kwargs.get('tts_engine', None)
+
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_dir = Path(tmp)
+ clips = []
+
+ for idx, scene in enumerate(scenes):
+ scene_key = scene.get('type', f'scene{idx}')
+
+ # ── 슬라이드 준비 ──────────────────────
+ slide_path = scene.get('slide_path', '')
+ if not slide_path or not Path(slide_path).exists():
+ # shorts_converter의 슬라이드 합성 함수 재사용 시도
+ slide_path = str(tmp_dir / f'slide_{idx}.png')
+ article = kwargs.get('article', {})
+ composed = self._compose_scene_slide(
+ scene, idx, article, tmp_dir
+ )
+ if composed:
+ slide_path = composed
+ else:
+ self._make_blank_slide(slide_path)
+
+ # ── 오디오 준비 ────────────────────────
+ audio_path = scene.get('audio_path', '')
+ if not audio_path or not Path(audio_path).exists():
+ audio_path = str(tmp_dir / f'tts_{idx}.wav')
+ text = scene.get('text', '')
+ ok = False
+ if tts_engine and text:
+ try:
+ ok = tts_engine.synthesize(text, audio_path)
+ except Exception as e:
+ logger.warning(f"TTS 엔진 실패: {e}")
+ if not ok and text:
+ ok = self._tts_gtts(text, audio_path)
+ if not ok:
+ self._make_silent_wav(audio_path)
+
+ # ── 클립 생성 ──────────────────────────
+ clip_path = str(tmp_dir / f'clip_{idx}.mp4')
+ dur = self._make_clip(slide_path, audio_path, clip_path)
+ if dur > 0:
+ clips.append({'mp4': clip_path, 'duration': dur})
+ else:
+ logger.warning(f"씬 {idx} ({scene_key}) 클립 생성 실패 — 건너뜀")
+
+ if not clips:
+ logger.error("생성된 클립 없음")
+ return ''
+
+ # ── 클립 결합 ──────────────────────────────
+ merged = str(tmp_dir / 'merged.mp4')
+ if not self._concat_clips_xfade(clips, merged):
+ logger.error("클립 결합 실패")
+ return ''
+
+ # ── BGM 믹스 ───────────────────────────────
+ with_bgm = str(tmp_dir / 'with_bgm.mp4')
+ self._mix_bgm(merged, with_bgm)
+ source_for_srt = with_bgm if Path(with_bgm).exists() else merged
+
+ # ── 자막 burn-in ───────────────────────────
+ if self.burn_subs:
+ srt_content = self._build_srt(scenes, clips)
+ srt_path = str(tmp_dir / 'subtitles.srt')
+ Path(srt_path).write_text(srt_content, encoding='utf-8-sig')
+
+ Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+ if not self._burn_subtitles(source_for_srt, srt_path, output_path):
+ logger.warning("자막 burn-in 실패 — 자막 없는 버전으로 저장")
+ shutil.copy2(source_for_srt, output_path)
+ else:
+ Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+ shutil.copy2(source_for_srt, output_path)
+
+ if Path(output_path).exists():
+ logger.info(f"FFmpegSlidesEngine 완료: {output_path}")
+ return output_path
+ else:
+ logger.error(f"최종 파일 없음: {output_path}")
+ return ''
+
+ def _compose_scene_slide(self, scene: dict, idx: int,
+ article: dict, tmp_dir: Path) -> Optional[str]:
+ """
+ shorts_converter의 슬라이드 합성 함수를 재사용해 씬별 슬라이드 생성.
+ 임포트 실패 시 None 반환 (blank slide 폴백).
+ """
+ try:
+ from bots.converters.shorts_converter import (
+ compose_intro_slide,
+ compose_headline_slide,
+ compose_point_slide,
+ compose_outro_slide,
+ compose_data_slide,
+ _set_tmp_dir,
+ _load_template,
+ )
+ _set_tmp_dir(tmp_dir)
+ cfg = _load_template()
+ scene_type = scene.get('type', '')
+ out_path = str(tmp_dir / f'slide_{idx}.png')
+
+ if scene_type == 'intro':
+ return compose_intro_slide(cfg)
+ elif scene_type == 'headline':
+ return compose_headline_slide(article, cfg)
+ elif scene_type in ('point', 'point1', 'point2', 'point3'):
+ num = int(scene_type[-1]) if scene_type[-1].isdigit() else 1
+ return compose_point_slide(scene.get('text', ''), num, article, cfg)
+ elif scene_type == 'data':
+ return compose_data_slide(article, cfg)
+ elif scene_type == 'outro':
+ return compose_outro_slide(cfg)
+ else:
+ # 알 수 없는 타입 → 헤드라인 슬라이드로 대체
+ return compose_headline_slide(article, cfg)
+ except ImportError as e:
+ logger.warning(f"shorts_converter 임포트 실패: {e}")
+ return None
+ except Exception as e:
+ logger.warning(f"슬라이드 합성 실패 (씬 {idx}): {e}")
+ return None
+
+
+# ─── SeedanceEngine ────────────────────────────────────
+
+class SeedanceEngine(VideoEngine):
+ """
+ Seedance 2.0 API를 사용한 AI 영상 생성 엔진.
+ API 키 없거나 실패 시 FFmpegSlidesEngine으로 자동 폴백.
+ """
+
+ def __init__(self, cfg: dict):
+ self.api_url = cfg.get('api_url', 'https://api.seedance2.ai/v1/generate')
+ self.api_key = os.getenv(cfg.get('api_key_env', 'SEEDANCE_API_KEY'), '')
+ self.resolution = cfg.get('resolution', '1080x1920')
+ self.duration = cfg.get('duration', '10s')
+ self.audio = cfg.get('audio', True)
+ self._fallback_cfg = cfg
+
+ def _fallback(self, scenes: list, output_path: str, **kwargs) -> str:
+ logger.info("SeedanceEngine → FFmpegSlidesEngine 폴백")
+ return FFmpegSlidesEngine(self._fallback_cfg).generate(
+ scenes, output_path, **kwargs
+ )
+
+ def _download_file(self, url: str, dest: str, timeout: int = 120) -> bool:
+ try:
+ import requests as req
+ resp = req.get(url, timeout=timeout, stream=True)
+ resp.raise_for_status()
+ with open(dest, 'wb') as f:
+ for chunk in resp.iter_content(chunk_size=8192):
+ f.write(chunk)
+ return True
+ except Exception as e:
+ logger.error(f"파일 다운로드 실패 ({url}): {e}")
+ return False
+
+ def _concat_clips_ffmpeg(self, clip_paths: list, output_path: str) -> bool:
+ """ffmpeg concat demuxer로 클립 결합 (인트로 2초 + 씬 + 아웃트로 3초)"""
+ if not clip_paths:
+ return False
+ ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
+ with tempfile.TemporaryDirectory() as tmp:
+ list_file = str(Path(tmp) / 'clips.txt')
+ with open(list_file, 'w', encoding='utf-8') as f:
+ for p in clip_paths:
+ f.write(f"file '{p}'\n")
+ result = subprocess.run(
+ [ffmpeg, '-y', '-loglevel', 'error',
+ '-f', 'concat', '-safe', '0',
+ '-i', list_file,
+ '-c', 'copy', output_path],
+ capture_output=True, timeout=300,
+ )
+ return result.returncode == 0
+
+ def _generate_scene_clip(self, scene: dict, output_path: str) -> bool:
+ """단일 씬에 대해 Seedance API 호출 → 클립 다운로드"""
+ try:
+ import requests as req
+ prompt = scene.get('image_prompt') or scene.get('text', '')
+ if not prompt:
+ return False
+
+ payload = {
+ 'prompt': prompt,
+ 'resolution': self.resolution,
+ 'duration': self.duration,
+ 'audio': self.audio,
+ }
+ headers = {
+ 'Authorization': f'Bearer {self.api_key}',
+ 'Content-Type': 'application/json',
+ }
+ logger.info(f"Seedance API 호출: {prompt[:60]}...")
+ resp = req.post(self.api_url, json=payload, headers=headers, timeout=120)
+ resp.raise_for_status()
+
+ data = resp.json()
+ video_url = data.get('video_url') or data.get('url', '')
+ if not video_url:
+ logger.error(f"Seedance 응답에 video_url 없음: {data}")
+ return False
+
+ return self._download_file(video_url, output_path)
+ except Exception as e:
+ logger.error(f"Seedance API 오류: {e}")
+ return False
+
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ if not self.api_key:
+ logger.warning("SEEDANCE_API_KEY 없음 — FFmpegSlidesEngine으로 폴백")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if not scenes:
+ logger.warning("scenes 비어 있음")
+ return ''
+
+ logger.info(f"SeedanceEngine 시작: {len(scenes)}개 씬")
+
+ ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
+
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_dir = Path(tmp)
+ clip_paths = []
+
+ # 인트로 클립 (2초 단색)
+ intro_path = str(tmp_dir / 'intro.mp4')
+ subprocess.run(
+ [ffmpeg, '-y', '-loglevel', 'error',
+ '-f', 'lavfi', '-i', 'color=c=black:s=1080x1920:r=30',
+ '-t', '2', '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
+ intro_path],
+ capture_output=True, timeout=30,
+ )
+ if Path(intro_path).exists():
+ clip_paths.append(intro_path)
+
+ # 씬별 클립 생성
+ success_count = 0
+ for idx, scene in enumerate(scenes):
+ clip_path = str(tmp_dir / f'scene_{idx}.mp4')
+ if self._generate_scene_clip(scene, clip_path):
+ clip_paths.append(clip_path)
+ success_count += 1
+ else:
+ logger.warning(f"씬 {idx} Seedance 실패 — 폴백으로 전환")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if success_count == 0:
+ logger.warning("모든 씬 실패 — FFmpegSlidesEngine으로 폴백")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ # 아웃트로 클립 (3초 단색)
+ outro_path = str(tmp_dir / 'outro.mp4')
+ subprocess.run(
+ [ffmpeg, '-y', '-loglevel', 'error',
+ '-f', 'lavfi', '-i', 'color=c=black:s=1080x1920:r=30',
+ '-t', '3', '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
+ outro_path],
+ capture_output=True, timeout=30,
+ )
+ if Path(outro_path).exists():
+ clip_paths.append(outro_path)
+
+ # 클립 결합
+ Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+ if not self._concat_clips_ffmpeg(clip_paths, output_path):
+ logger.error("SeedanceEngine 클립 결합 실패")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if Path(output_path).exists():
+ logger.info(f"SeedanceEngine 완료: {output_path}")
+ return output_path
+ return self._fallback(scenes, output_path, **kwargs)
+
+
+# ─── SoraEngine ────────────────────────────────────────
+
+class SoraEngine(VideoEngine):
+ """
+ OpenAI Sora 영상 생성 엔진.
+ 현재 API 공개 접근 불가 — ffmpeg_slides로 폴백.
+ """
+
+ def __init__(self, cfg: dict):
+ self.cfg = cfg
+
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ logger.warning("Sora API 미지원. ffmpeg_slides로 폴백.")
+ return FFmpegSlidesEngine(self.cfg).generate(scenes, output_path, **kwargs)
+
+
+# ─── RunwayEngine ──────────────────────────────────────
+
+class RunwayEngine(VideoEngine):
+ """
+ Runway Gen-3 API를 사용한 AI 영상 생성 엔진.
+ API 키 없거나 실패 시 FFmpegSlidesEngine으로 자동 폴백.
+ """
+
+ def __init__(self, cfg: dict):
+ self.cfg = cfg
+ self.api_key = os.getenv(cfg.get('api_key_env', 'RUNWAY_API_KEY'), '')
+ self.api_url = cfg.get('api_url', 'https://api.runwayml.com/v1/image_to_video')
+ self.model = cfg.get('model', 'gen3a_turbo')
+ self.duration = cfg.get('duration', 10)
+ self.ratio = cfg.get('ratio', '768:1344')
+
+ def _fallback(self, scenes: list, output_path: str, **kwargs) -> str:
+ logger.info("RunwayEngine → FFmpegSlidesEngine 폴백")
+ return FFmpegSlidesEngine(self.cfg).generate(scenes, output_path, **kwargs)
+
+ def _generate_scene_clip(self, scene: dict, output_path: str) -> bool:
+ """단일 씬에 대해 Runway API 호출 → 클립 다운로드"""
+ try:
+ import requests as req
+ prompt = scene.get('image_prompt') or scene.get('text', '')
+ if not prompt:
+ return False
+
+ headers = {
+ 'Authorization': f'Bearer {self.api_key}',
+ 'Content-Type': 'application/json',
+ 'X-Runway-Version': '2024-11-06',
+ }
+ payload = {
+ 'model': self.model,
+ 'promptText': prompt,
+ 'duration': self.duration,
+ 'ratio': self.ratio,
+ }
+ logger.info(f"Runway API 호출: {prompt[:60]}...")
+ resp = req.post(self.api_url, json=payload, headers=headers, timeout=30)
+ resp.raise_for_status()
+
+ data = resp.json()
+ task_id = data.get('id', '')
+ if not task_id:
+ logger.error(f"Runway 태스크 ID 없음: {data}")
+ return False
+
+ # 폴링: 태스크 완료 대기
+ poll_url = f'https://api.runwayml.com/v1/tasks/{task_id}'
+ import time
+ for _ in range(60):
+ time.sleep(10)
+ poll = req.get(poll_url, headers=headers, timeout=30)
+ poll.raise_for_status()
+ status_data = poll.json()
+ status = status_data.get('status', '')
+ if status == 'SUCCEEDED':
+ video_url = (status_data.get('output') or [''])[0]
+ if not video_url:
+ logger.error("Runway 완료됐으나 video_url 없음")
+ return False
+ return self._download_file(video_url, output_path)
+ elif status in ('FAILED', 'CANCELLED'):
+ logger.error(f"Runway 태스크 실패: {status_data}")
+ return False
+ logger.error("Runway 태스크 타임아웃 (10분)")
+ return False
+ except Exception as e:
+ logger.error(f"Runway API 오류: {e}")
+ return False
+
+ def _download_file(self, url: str, dest: str, timeout: int = 120) -> bool:
+ try:
+ import requests as req
+ resp = req.get(url, timeout=timeout, stream=True)
+ resp.raise_for_status()
+ with open(dest, 'wb') as f:
+ for chunk in resp.iter_content(chunk_size=8192):
+ f.write(chunk)
+ return True
+ except Exception as e:
+ logger.error(f"파일 다운로드 실패 ({url}): {e}")
+ return False
+
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ if not self.api_key:
+ logger.warning("RUNWAY_API_KEY 없음 — FFmpegSlidesEngine으로 폴백")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if not scenes:
+ logger.warning("scenes 비어 있음")
+ return ''
+
+ logger.info(f"RunwayEngine 시작: {len(scenes)}개 씬")
+
+ ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
+
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_dir = Path(tmp)
+ clip_paths = []
+
+ for idx, scene in enumerate(scenes):
+ clip_path = str(tmp_dir / f'scene_{idx}.mp4')
+ if self._generate_scene_clip(scene, clip_path):
+ clip_paths.append(clip_path)
+ else:
+ logger.warning(f"씬 {idx} Runway 실패 — FFmpegSlidesEngine 폴백")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if not clip_paths:
+ return self._fallback(scenes, output_path, **kwargs)
+
+ # concat
+ list_file = str(tmp_dir / 'clips.txt')
+ with open(list_file, 'w', encoding='utf-8') as f:
+ for p in clip_paths:
+ f.write(f"file '{p}'\n")
+
+ Path(output_path).parent.mkdir(parents=True, exist_ok=True)
+ result = subprocess.run(
+ [ffmpeg, '-y', '-loglevel', 'error',
+ '-f', 'concat', '-safe', '0',
+ '-i', list_file, '-c', 'copy', output_path],
+ capture_output=True, timeout=300,
+ )
+ if result.returncode != 0:
+ logger.error("RunwayEngine 클립 결합 실패")
+ return self._fallback(scenes, output_path, **kwargs)
+
+ if Path(output_path).exists():
+ logger.info(f"RunwayEngine 완료: {output_path}")
+ return output_path
+ return self._fallback(scenes, output_path, **kwargs)
+
+
+# ─── VeoEngine ─────────────────────────────────────────
+
+class VeoEngine(VideoEngine):
+ """
+ Google Veo 3.1 영상 생성 엔진.
+ 현재 API 공개 접근 불가 — ffmpeg_slides로 폴백.
+ """
+
+ def __init__(self, cfg: dict):
+ self.cfg = cfg
+
+ def generate(self, scenes: list, output_path: str, **kwargs) -> str:
+ logger.warning("Veo API 미지원. ffmpeg_slides로 폴백.")
+ return FFmpegSlidesEngine(self.cfg).generate(scenes, output_path, **kwargs)
+
+
+# ─── 팩토리 함수 ───────────────────────────────────────
+
+def get_engine(video_cfg: dict) -> VideoEngine:
+ """
+ engine.json video_generation 설정에서 엔진 인스턴스 반환.
+
+ 사용:
+ cfg = {'provider': 'ffmpeg_slides', 'options': {...}}
+ engine = get_engine(cfg)
+ mp4 = engine.generate(scenes, '/path/to/output.mp4')
+ """
+ provider = video_cfg.get('provider', 'ffmpeg_slides')
+ opts = video_cfg.get('options', {}).get(provider, {})
+
+ engine_map = {
+ 'ffmpeg_slides': FFmpegSlidesEngine,
+ 'seedance': SeedanceEngine,
+ 'sora': SoraEngine,
+ 'runway': RunwayEngine,
+ 'veo': VeoEngine,
+ }
+ cls = engine_map.get(provider, FFmpegSlidesEngine)
+ logger.info(f"VideoEngine 선택: {provider} ({cls.__name__})")
+ return cls(opts)
diff --git a/bots/engine_loader.py b/bots/engine_loader.py
new file mode 100644
index 0000000..4343f04
--- /dev/null
+++ b/bots/engine_loader.py
@@ -0,0 +1,521 @@
+"""
+엔진 로더 (bots/engine_loader.py)
+역할: config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환
+설계서: blog-engine-final-masterplan-v3.txt
+
+사용:
+ loader = EngineLoader()
+ writer = loader.get_writer()
+ result = writer.write("AI 관련 기사 써줘")
+ tts = loader.get_tts()
+ tts.synthesize("안녕하세요", "/tmp/out.wav")
+"""
+import json
+import logging
+import os
+import subprocess
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+BASE_DIR = Path(__file__).parent.parent
+CONFIG_PATH = BASE_DIR / 'config' / 'engine.json'
+LOG_DIR = BASE_DIR / 'logs'
+LOG_DIR.mkdir(exist_ok=True)
+
+logger = logging.getLogger(__name__)
+if not logger.handlers:
+ handler = logging.FileHandler(LOG_DIR / 'engine_loader.log', encoding='utf-8')
+ handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
+ logger.addHandler(handler)
+ logger.addHandler(logging.StreamHandler())
+ logger.setLevel(logging.INFO)
+
+
+# ─── 기본 추상 클래스 ──────────────────────────────────
+
+class BaseWriter(ABC):
+ @abstractmethod
+ def write(self, prompt: str, system: str = '') -> str:
+ """글쓰기 요청. prompt에 대한 결과 문자열 반환."""
+
+
+class BaseTTS(ABC):
+ @abstractmethod
+ def synthesize(self, text: str, output_path: str,
+ lang: str = 'ko', speed: float = 1.05) -> bool:
+ """TTS 합성. 성공 시 True 반환."""
+
+
+class BaseImageGenerator(ABC):
+ @abstractmethod
+ def generate(self, prompt: str, output_path: str,
+ size: str = '1024x1792') -> bool:
+ """이미지 생성. 성공 시 True 반환."""
+
+
+# VideoEngine은 video_engine.py에 정의됨
+# BaseVideoGenerator 타입 힌트 호환용
+BaseVideoGenerator = object
+
+
+# ─── Writer 구현체 ──────────────────────────────────────
+
+class ClaudeWriter(BaseWriter):
+ """Anthropic Claude API를 사용하는 글쓰기 엔진"""
+
+ def __init__(self, cfg: dict):
+ self.api_key = os.getenv(cfg.get('api_key_env', 'ANTHROPIC_API_KEY'), '')
+ self.model = cfg.get('model', 'claude-opus-4-5')
+ self.max_tokens = cfg.get('max_tokens', 4096)
+ self.temperature = cfg.get('temperature', 0.7)
+
+ def write(self, prompt: str, system: str = '') -> str:
+ if not self.api_key:
+ logger.warning("ANTHROPIC_API_KEY 없음 — ClaudeWriter 비활성화")
+ return ''
+ try:
+ import anthropic
+ client = anthropic.Anthropic(api_key=self.api_key)
+ kwargs: dict = {
+ 'model': self.model,
+ 'max_tokens': self.max_tokens,
+ 'messages': [{'role': 'user', 'content': prompt}],
+ }
+ if system:
+ kwargs['system'] = system
+ message = client.messages.create(**kwargs)
+ return message.content[0].text
+ except Exception as e:
+ logger.error(f"ClaudeWriter 오류: {e}")
+ return ''
+
+
+class OpenClawWriter(BaseWriter):
+ """OpenClaw CLI를 subprocess로 호출하는 글쓰기 엔진"""
+
+ def __init__(self, cfg: dict):
+ self.agent_name = cfg.get('agent_name', 'blog-writer')
+ self.timeout = cfg.get('timeout', 120)
+
+ def write(self, prompt: str, system: str = '') -> str:
+ try:
+ cmd = ['openclaw', 'run', self.agent_name, '--prompt', prompt]
+ if system:
+ cmd += ['--system', system]
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=self.timeout,
+ encoding='utf-8',
+ )
+ if result.returncode != 0:
+ logger.error(f"OpenClawWriter 오류: {result.stderr[:300]}")
+ return ''
+ return result.stdout.strip()
+ except subprocess.TimeoutExpired:
+ logger.error(f"OpenClawWriter 타임아웃 ({self.timeout}초)")
+ return ''
+ except FileNotFoundError:
+ logger.warning("openclaw CLI 없음 — OpenClawWriter 비활성화")
+ return ''
+ except Exception as e:
+ logger.error(f"OpenClawWriter 오류: {e}")
+ return ''
+
+
+class GeminiWriter(BaseWriter):
+ """Google Gemini API를 사용하는 글쓰기 엔진"""
+
+ def __init__(self, cfg: dict):
+ self.api_key = os.getenv(cfg.get('api_key_env', 'GEMINI_API_KEY'), '')
+ self.model = cfg.get('model', 'gemini-2.0-flash')
+ self.max_tokens = cfg.get('max_tokens', 4096)
+ self.temperature = cfg.get('temperature', 0.7)
+
+ def write(self, prompt: str, system: str = '') -> str:
+ if not self.api_key:
+ logger.warning("GEMINI_API_KEY 없음 — GeminiWriter 비활성화")
+ return ''
+ try:
+ import google.generativeai as genai # type: ignore
+ genai.configure(api_key=self.api_key)
+ model = genai.GenerativeModel(
+ model_name=self.model,
+ generation_config={
+ 'max_output_tokens': self.max_tokens,
+ 'temperature': self.temperature,
+ },
+ system_instruction=system if system else None,
+ )
+ response = model.generate_content(prompt)
+ return response.text
+ except ImportError:
+ logger.warning("google-generativeai 미설치 — GeminiWriter 비활성화")
+ return ''
+ except Exception as e:
+ logger.error(f"GeminiWriter 오류: {e}")
+ return ''
+
+
+# ─── TTS 구현체 ─────────────────────────────────────────
+
+class GoogleCloudTTS(BaseTTS):
+ """Google Cloud TTS REST API (API Key 방식)"""
+
+ def __init__(self, cfg: dict):
+ self.api_key = os.getenv(cfg.get('api_key_env', 'GOOGLE_TTS_API_KEY'), '')
+ self.voice = cfg.get('voice', 'ko-KR-Wavenet-A')
+ self.default_speed = cfg.get('speaking_rate', 1.05)
+ self.pitch = cfg.get('pitch', 0)
+
+ def synthesize(self, text: str, output_path: str,
+ lang: str = 'ko', speed: float = 0.0) -> bool:
+ if not self.api_key:
+ logger.warning("GOOGLE_TTS_API_KEY 없음 — GoogleCloudTTS 비활성화")
+ return False
+ import base64
+ try:
+ import requests as req
+ speaking_rate = speed if speed > 0 else self.default_speed
+ voice_name = self.voice if lang == 'ko' else 'en-US-Wavenet-D'
+ language_code = 'ko-KR' if lang == 'ko' else 'en-US'
+ url = (
+ f'https://texttospeech.googleapis.com/v1/text:synthesize'
+ f'?key={self.api_key}'
+ )
+ payload = {
+ 'input': {'text': text},
+ 'voice': {'languageCode': language_code, 'name': voice_name},
+ 'audioConfig': {
+ 'audioEncoding': 'LINEAR16',
+ 'speakingRate': speaking_rate,
+ 'pitch': self.pitch,
+ },
+ }
+ resp = req.post(url, json=payload, timeout=30)
+ resp.raise_for_status()
+ audio_b64 = resp.json().get('audioContent', '')
+ if audio_b64:
+ Path(output_path).write_bytes(base64.b64decode(audio_b64))
+ return True
+ except Exception as e:
+ logger.warning(f"GoogleCloudTTS 실패: {e}")
+ return False
+
+
class OpenAITTS(BaseTTS):
    """Text-to-speech backed by OpenAI's speech endpoint (tts-1-hd family)."""

    def __init__(self, cfg: dict):
        # Env-var name is configurable so a dedicated key can be used for TTS.
        self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
        self.model = cfg.get('model', 'tts-1-hd')
        self.voice = cfg.get('voice', 'alloy')
        self.default_speed = cfg.get('speed', 1.0)

    def synthesize(self, text: str, output_path: str,
                   lang: str = 'ko', speed: float = 0.0) -> bool:
        """Write WAV audio for *text* to *output_path*; True on success."""
        if not self.api_key:
            logger.warning("OPENAI_API_KEY 없음 — OpenAITTS 비활성화")
            return False
        try:
            from openai import OpenAI
        except ImportError:
            logger.warning("openai 미설치 — OpenAITTS 비활성화")
            return False
        try:
            # speed <= 0 selects the configured default rate.
            effective_speed = self.default_speed if speed <= 0 else speed
            audio = OpenAI(api_key=self.api_key).audio.speech.create(
                model=self.model,
                voice=self.voice,
                input=text,
                speed=effective_speed,
                response_format='wav',
            )
            audio.stream_to_file(output_path)
            return Path(output_path).exists()
        except Exception as e:
            logger.error(f"OpenAITTS 실패: {e}")
            return False
+
+
class ElevenLabsTTS(BaseTTS):
    """ElevenLabs text-to-speech via the REST API.

    The API returns MP3; the result is transcoded to 24 kHz WAV with ffmpeg
    so downstream tooling receives the same format as the other engines.
    """

    def __init__(self, cfg: dict):
        self.api_key = os.getenv(cfg.get('api_key_env', 'ELEVENLABS_API_KEY'), '')
        self.model = cfg.get('model', 'eleven_multilingual_v2')
        self.voice_id = cfg.get('voice_id', 'pNInz6obpgDQGcFmaJgB')
        self.stability = cfg.get('stability', 0.5)
        self.similarity_boost = cfg.get('similarity_boost', 0.75)

    def synthesize(self, text: str, output_path: str,
                   lang: str = 'ko', speed: float = 0.0) -> bool:
        """Synthesize *text* into a WAV file at *output_path*.

        `lang` and `speed` are accepted for interface compatibility but are
        not sent to the API: the multilingual model auto-detects language and
        this endpoint exposes no rate parameter.
        Returns True only when the WAV exists and ffmpeg exited cleanly.
        """
        if not self.api_key:
            logger.warning("ELEVENLABS_API_KEY 없음 — ElevenLabsTTS 비활성화")
            return False
        try:
            import requests as req
            url = (
                f'https://api.elevenlabs.io/v1/text-to-speech/'
                f'{self.voice_id}'
            )
            headers = {
                'xi-api-key': self.api_key,
                'Content-Type': 'application/json',
                'Accept': 'audio/mpeg',
            }
            payload = {
                'text': text,
                'model_id': self.model,
                'voice_settings': {
                    'stability': self.stability,
                    'similarity_boost': self.similarity_boost,
                },
            }
            resp = req.post(url, json=payload, headers=headers, timeout=60)
            resp.raise_for_status()
            # Stage the MP3 next to the target file. Derive the temp name
            # from the path stem instead of str.replace('.wav', ...): the
            # old replace was a no-op for non-.wav outputs, making ffmpeg's
            # input and output the same file.
            out = Path(output_path)
            mp3_path = str(out.with_name(out.stem + '_tmp.mp3'))
            Path(mp3_path).write_bytes(resp.content)
            # MP3 -> 24 kHz WAV (ffmpeg binary overridable via FFMPEG_PATH).
            ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
            result = subprocess.run(
                [ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
                 '-ar', '24000', output_path],
                capture_output=True, timeout=60,
            )
            Path(mp3_path).unlink(missing_ok=True)
            return Path(output_path).exists() and result.returncode == 0
        except Exception as e:
            logger.error(f"ElevenLabsTTS 실패: {e}")
            return False
+
+
class GTTSEngine(BaseTTS):
    """Free TTS engine using gTTS (Google Translate voices).

    gTTS emits MP3, which is transcoded to 24 kHz WAV via ffmpeg so callers
    always receive the same audio format as the paid engines.
    """

    def __init__(self, cfg: dict):
        self.default_lang = cfg.get('lang', 'ko')
        self.slow = cfg.get('slow', False)

    def synthesize(self, text: str, output_path: str,
                   lang: str = 'ko', speed: float = 0.0) -> bool:
        """Synthesize *text* to a WAV file; True only on full success.

        `speed` is accepted for interface compatibility only — gTTS supports
        just the boolean `slow` flag configured at construction time.
        """
        try:
            from gtts import gTTS
            use_lang = lang if lang else self.default_lang
            # Derive the temp MP3 name from the path stem instead of
            # str.replace('.wav', ...): the old replace was a no-op for
            # non-.wav outputs, making ffmpeg's input and output collide.
            out = Path(output_path)
            mp3_path = str(out.with_name(out.stem + '_tmp.mp3'))
            tts = gTTS(text=text, lang=use_lang, slow=self.slow)
            tts.save(mp3_path)
            # MP3 -> WAV conversion (ffmpeg binary overridable via env).
            ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
            result = subprocess.run(
                [ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
                 '-ar', '24000', output_path],
                capture_output=True, timeout=60,
            )
            Path(mp3_path).unlink(missing_ok=True)
            return Path(output_path).exists() and result.returncode == 0
        except ImportError:
            logger.warning("gTTS 미설치 — GTTSEngine 비활성화")
            return False
        except Exception as e:
            logger.warning(f"GTTSEngine 실패: {e}")
            return False
+
+
+# ─── ImageGenerator 구현체 ─────────────────────────────
+
class DALLEGenerator(BaseImageGenerator):
    """Image generation through OpenAI's DALL-E 3 endpoint."""

    def __init__(self, cfg: dict):
        self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
        self.model = cfg.get('model', 'dall-e-3')
        self.default_size = cfg.get('size', '1024x1792')
        self.quality = cfg.get('quality', 'standard')

    def generate(self, prompt: str, output_path: str,
                 size: str = '') -> bool:
        """Generate one image for *prompt* and save it as RGB to *output_path*."""
        if not self.api_key:
            logger.warning("OPENAI_API_KEY 없음 — DALLEGenerator 비활성화")
            return False
        try:
            from openai import OpenAI
            import requests as req
            import io
            from PIL import Image

            target_size = size if size else self.default_size
            client = OpenAI(api_key=self.api_key)
            # Suffix steers DALL-E away from rendering text or watermarks.
            augmented = prompt + ' No text, no letters, no numbers, no watermarks.'
            result = client.images.generate(
                model=self.model,
                prompt=augmented,
                size=target_size,
                quality=self.quality,
                n=1,
            )
            # DALL-E returns a short-lived URL; download and normalize to RGB.
            raw = req.get(result.data[0].url, timeout=30).content
            Image.open(io.BytesIO(raw)).convert('RGB').save(output_path)
            logger.info(f"DALL-E 이미지 생성 완료: {output_path}")
            return True
        except ImportError as e:
            logger.warning(f"DALLEGenerator 의존성 없음: {e}")
            return False
        except Exception as e:
            logger.error(f"DALLEGenerator 실패: {e}")
            return False
+
+
class ExternalGenerator(BaseImageGenerator):
    """Placeholder generator: images are supplied manually, never created."""

    def __init__(self, cfg: dict):
        # No configuration needed — this engine never generates anything.
        pass

    def generate(self, prompt: str, output_path: str,
                 size: str = '') -> bool:
        # Always signals "not generated" after logging what would be needed.
        logger.info(f"ExternalGenerator: 수동 이미지 필요 — 프롬프트: {prompt[:60]}")
        return False
+
+
+# ─── EngineLoader ───────────────────────────────────────
+
class EngineLoader:
    """
    Central factory that reads config/engine.json and returns the concrete
    implementation for each currently configured provider.

    Usage:
        loader = EngineLoader()
        writer = loader.get_writer()
        text = writer.write("오늘의 AI 뉴스 정리해줘")
    """

    # Fallback configuration used when engine.json is missing or unreadable.
    _DEFAULT_CONFIG = {
        'writing': {'provider': 'claude', 'options': {'claude': {}}},
        'tts': {'provider': 'gtts', 'options': {'gtts': {}}},
        'image_generation': {'provider': 'external', 'options': {'external': {}}},
        'video_generation': {'provider': 'ffmpeg_slides', 'options': {'ffmpeg_slides': {}}},
        'publishing': {},
        'quality_gates': {'gate1_research_min_score': 60},
    }

    def __init__(self, config_path: Optional[Path] = None):
        self._config_path = config_path or CONFIG_PATH
        self._config = self._load_config()

    def _load_config(self) -> dict:
        """Load engine.json; on any failure return a private copy of defaults."""
        if self._config_path.exists():
            try:
                return json.loads(self._config_path.read_text(encoding='utf-8'))
            except Exception as e:
                logger.error(f"engine.json 로드 실패: {e} — 기본값 사용")
        else:
            logger.warning(f"engine.json 없음 ({self._config_path}) — 기본값으로 gtts + ffmpeg_slides 사용")
        # Deep-copy via a json round-trip: the previous shallow dict() copy
        # shared the nested dicts with _DEFAULT_CONFIG, so update_provider()
        # on one instance silently mutated the class-level defaults seen by
        # every later instance.
        return json.loads(json.dumps(self._DEFAULT_CONFIG))

    def get_config(self, *keys) -> Any:
        """
        Access a nested engine.json value; returns None for any missing key.
        e.g. loader.get_config('writing', 'provider')
             loader.get_config('quality_gates', 'gate1_research_min_score')
        """
        val = self._config
        for key in keys:
            if isinstance(val, dict):
                val = val.get(key)
            else:
                return None
        return val

    def update_provider(self, category: str, provider: str) -> None:
        """
        Change a provider at runtime (the engine.json file is not modified).
        e.g. loader.update_provider('tts', 'openai')
        """
        if category in self._config:
            self._config[category]['provider'] = provider
            logger.info(f"런타임 provider 변경: {category} → {provider}")
        else:
            logger.warning(f"update_provider: 알 수 없는 카테고리 '{category}'")

    def get_writer(self) -> BaseWriter:
        """Return the BaseWriter implementation for the configured provider."""
        writing_cfg = self._config.get('writing', {})
        provider = writing_cfg.get('provider', 'claude')
        options = writing_cfg.get('options', {}).get(provider, {})

        writers = {
            'claude': ClaudeWriter,
            'openclaw': OpenClawWriter,
            'gemini': GeminiWriter,
        }
        # Unknown providers fall back to ClaudeWriter.
        cls = writers.get(provider, ClaudeWriter)
        logger.info(f"Writer 로드: {provider} ({cls.__name__})")
        return cls(options)

    def get_tts(self) -> BaseTTS:
        """Return the BaseTTS implementation for the configured provider."""
        tts_cfg = self._config.get('tts', {})
        provider = tts_cfg.get('provider', 'gtts')
        options = tts_cfg.get('options', {}).get(provider, {})

        tts_engines = {
            'google_cloud': GoogleCloudTTS,
            'openai': OpenAITTS,
            'elevenlabs': ElevenLabsTTS,
            'gtts': GTTSEngine,
        }
        # Unknown providers fall back to the free gTTS engine.
        cls = tts_engines.get(provider, GTTSEngine)
        logger.info(f"TTS 로드: {provider} ({cls.__name__})")
        return cls(options)

    def get_image_generator(self) -> BaseImageGenerator:
        """Return the image-generation implementation for the configured provider."""
        img_cfg = self._config.get('image_generation', {})
        provider = img_cfg.get('provider', 'external')
        options = img_cfg.get('options', {}).get(provider, {})

        generators = {
            'dalle': DALLEGenerator,
            'external': ExternalGenerator,
        }
        cls = generators.get(provider, ExternalGenerator)
        logger.info(f"ImageGenerator 로드: {provider} ({cls.__name__})")
        return cls(options)

    def get_video_generator(self):
        """Return the VideoEngine implementation for the configured provider."""
        # Imported lazily so video support is optional at runtime.
        from bots.converters import video_engine
        video_cfg = self._config.get('video_generation', {
            'provider': 'ffmpeg_slides',
            'options': {'ffmpeg_slides': {}},
        })
        engine = video_engine.get_engine(video_cfg)
        logger.info(f"VideoGenerator 로드: {video_cfg.get('provider', 'ffmpeg_slides')}")
        return engine

    def get_publishers(self) -> list:
        """
        Return the configured publishing channels.
        Shape: [{'name': str, 'enabled': bool, ...settings}, ...]
        """
        publishing_cfg = self._config.get('publishing', {})
        result = []
        for name, cfg in publishing_cfg.items():
            if isinstance(cfg, dict):
                result.append({'name': name, **cfg})
        return result

    def get_enabled_publishers(self) -> list:
        """Return only the publishing channels with enabled: true."""
        return [p for p in self.get_publishers() if p.get('enabled', False)]
diff --git a/bots/novel/__init__.py b/bots/novel/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bots/novel/novel_blog_converter.py b/bots/novel/novel_blog_converter.py
new file mode 100644
index 0000000..89aef64
--- /dev/null
+++ b/bots/novel/novel_blog_converter.py
@@ -0,0 +1,341 @@
+"""
+novel_blog_converter.py
+소설 연재 파이프라인 — 에피소드 → Blogger-ready HTML 변환 모듈
+역할: 에피소드 dict + 소설 설정 → 장르별 테마 HTML 생성
+출력: data/novels/{novel_id}/episodes/ep{N:03d}_blog.html
+"""
+import json
+import logging
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
# Repository root (three levels up: bots/novel/ -> project root).
BASE_DIR = Path(__file__).parent.parent.parent
sys.path.insert(0, str(BASE_DIR / 'bots'))

logger = logging.getLogger(__name__)

# Public base URL of the blog; used for canonical links and JSON-LD.
BLOG_BASE_URL = 'https://the4thpath.com'
+
+# ─── 장르별 컬러 테마 ─────────────────────────────────────────────────────────
+
# Per-genre color palettes applied to the generated blog HTML.
# Keys are matched as substrings of the lowercased genre string (see
# _get_theme); 'default' is the fallback palette.
GENRE_THEMES = {
    'sci-fi': {
        'bg': '#0a0f1e',          # page background
        'accent': '#00bcd4',      # primary accent (links, headings)
        'accent_dim': '#007c8c',  # muted accent (borders, secondary links)
        'card_bg': '#0e1628',     # card/panel background
        'text': '#cfe8ef',        # body text
        'meta': '#6fa8bc',        # metadata / captions
        'nav_bg': '#0c1220',      # navigation bar background
    },
    'thriller': {
        'bg': '#0a0a0d',
        'accent': '#bf3a3a',
        'accent_dim': '#8a2222',
        'card_bg': '#141418',
        'text': '#e8e0e0',
        'meta': '#a08080',
        'nav_bg': '#111115',
    },
    'fantasy': {
        'bg': '#0f0a1e',
        'accent': '#c8a84e',
        'accent_dim': '#8a7030',
        'card_bg': '#180f2e',
        'text': '#e8e0f0',
        'meta': '#9a8ab0',
        'nav_bg': '#130c22',
    },
    # Romance is the only light theme.
    'romance': {
        'bg': '#ffffff',
        'accent': '#d85a30',
        'accent_dim': '#a04020',
        'card_bg': '#fff5f0',
        'text': '#2a1a14',
        'meta': '#8a5a4a',
        'nav_bg': '#fff0ea',
    },
    'default': {
        'bg': '#0a0a0d',
        'accent': '#c8a84e',
        'accent_dim': '#8a7030',
        'card_bg': '#141418',
        'text': '#e8e0d0',
        'meta': '#a09070',
        'nav_bg': '#111115',
    },
}
+
+
def _get_theme(genre: str) -> dict:
    """Resolve a color theme from a genre string (substring match, else default)."""
    lowered = genre.lower()
    # First theme key contained in the genre string wins (dict order).
    matched = next((key for key in GENRE_THEMES if key in lowered), 'default')
    return GENRE_THEMES[matched]
+
+
def _build_json_ld(episode: dict, novel_config: dict, post_url: str = '') -> str:
    """Build a Schema.org Article JSON-LD snippet for one episode.

    Args:
        episode: episode dict; reads 'episode_num', 'title', 'hook'.
        novel_config: series config; reads 'title_ko', 'genre'.
        post_url: canonical URL of the post (falls back to BLOG_BASE_URL).

    Returns:
        An HTML '<script type="application/ld+json">…</script>' string.
    """
    schema = {
        '@context': 'https://schema.org',
        '@type': 'Article',
        'headline': f"{novel_config.get('title_ko', '')} {episode.get('episode_num', 0)}화 — {episode.get('title', '')}",
        'description': episode.get('hook', ''),
        'datePublished': datetime.now(timezone.utc).isoformat(),
        'dateModified': datetime.now(timezone.utc).isoformat(),
        'author': {
            '@type': 'Person',
            'name': 'The 4th Path'
        },
        'publisher': {
            '@type': 'Organization',
            'name': 'The 4th Path',
            'logo': {
                '@type': 'ImageObject',
                'url': f'{BLOG_BASE_URL}/logo.png'
            }
        },
        'mainEntityOfPage': {
            '@type': 'WebPage',
            '@id': post_url or BLOG_BASE_URL
        },
        'genre': novel_config.get('genre', ''),
        'isPartOf': {
            '@type': 'CreativeWorkSeries',
            'name': novel_config.get('title_ko', ''),
            'position': episode.get('episode_num', 0)
        }
    }
    # NOTE(review): the original return was lost to extraction garbling (it
    # collapsed to an empty string literal). Reconstructed as the standard
    # JSON-LD <script> wrapper — confirm against the repository history.
    return (
        '<script type="application/ld+json">'
        + json.dumps(schema, ensure_ascii=False)
        + '</script>'
    )
+
+
+def _body_to_html(body_text: str) -> str:
+ """소설 본문 텍스트 → HTML 단락 변환 (빈 줄 기준 분리)"""
+ paragraphs = []
+ for para in body_text.split('\n\n'):
+ para = para.strip()
+ if not para:
+ continue
+ # 대화문 들여쓰기 처리
+ lines = para.split('\n')
+ html_lines = []
+ for line in lines:
+ line = line.strip()
+ if not line:
+ continue
+ # HTML 특수문자 이스케이프
+ line = (line.replace('&', '&')
+ .replace('<', '<')
+ .replace('>', '>'))
+ # 대화문 (따옴표 시작) 스타일 적용
+ if line.startswith('"') or line.startswith('"') or line.startswith('"'):
+ html_lines.append(
+ f'{line}'
+ )
+ else:
+ html_lines.append(line)
+ paragraphs.append('
\n'.join(html_lines))
+
+ return '\n'.join(
+ f'
{p}
' + for p in paragraphs if p + ) + + +def convert( + episode: dict, + novel_config: dict, + prev_url: str = '', + next_url: str = '', + save_file: bool = True +) -> str: + """ + 에피소드 + 소설 설정 → Blogger-ready HTML. + data/novels/{novel_id}/episodes/ep{N:03d}_blog.html 저장. + 반환: HTML 문자열 + """ + novel_id = novel_config.get('novel_id', episode.get('novel_id', 'unknown')) + ep_num = episode.get('episode_num', 0) + title = episode.get('title', f'에피소드 {ep_num}') + body_text = episode.get('body', '') + hook = episode.get('hook', '') + genre = novel_config.get('genre', '') + title_ko = novel_config.get('title_ko', '') + + logger.info(f"[{novel_id}] 에피소드 {ep_num} 블로그 변환 시작") + + theme = _get_theme(genre) + bg = theme['bg'] + accent = theme['accent'] + accent_dim = theme['accent_dim'] + card_bg = theme['card_bg'] + text_color = theme['text'] + meta_color = theme['meta'] + nav_bg = theme['nav_bg'] + + # 다음 에피소드 예정일 (publish_schedule 파싱 — 간단 처리) + next_date_str = '다음 회 예고' + try: + schedule = novel_config.get('publish_schedule', '') + # "매주 월/목 09:00" 형식에서 요일 추출 + if schedule: + next_date_str = schedule.replace('매주 ', '').replace('09:00', '').strip() + except Exception: + pass + + # 본문 HTML + body_html = _body_to_html(body_text) + + # JSON-LD + post_url = '' + json_ld = _build_json_ld(episode, novel_config, post_url) + + # 이전/다음 네비게이션 + prev_link = ( + f'← {ep_num - 1}화' + if prev_url and ep_num > 1 + else f'첫 번째 에피소드' + ) + next_link = ( + f'{ep_num + 1}화 →' + if next_url + else f'다음 회 업데이트 예정' + ) + + # 전체 HTML 조립 + html = f"""{json_ld} + + ++ {title_ko} +
+ + ++ 다음 에피소드 예고 · {next_date_str} +
++ {hook if hook else '다음 회를 기대해 주세요.'} +
++ {title_ko} 정보 +
++ 장르: {genre} · 목표 {novel_config.get('episode_count_target', 20)}화 완결 +
++ 연재 일정: {novel_config.get('publish_schedule', '')} · The 4th Path +
+