8a7a122bb3
[1순위] 엔진 추상화 리팩토링 - config/engine.json: 단일 설정 파일로 writing/tts/image/video/publishing 엔진 제어 - bots/engine_loader.py: EngineLoader 팩토리 클래스 (Claude/OpenClaw/Gemini Writer, gTTS/GoogleCloud/OpenAI/ElevenLabs TTS, DALL-E/External 이미지) [2순위] VideoEngine 추상화 - bots/converters/video_engine.py: VideoEngine ABC + FFmpegSlidesEngine/SeedanceEngine/SoraEngine/RunwayEngine/VeoEngine 구현 - Seedance 2.0 API 연동 + 실패 시 ffmpeg_slides 자동 fallback [3순위] 소설 연재 파이프라인 - bots/novel/novel_writer.py: AI 에피소드 자동 생성 (Claude/엔진 추상화) - bots/novel/novel_blog_converter.py: 에피소드 → 장르별 테마 Blogger HTML - bots/novel/novel_shorts_converter.py: key_scenes → TTS + Pillow + VideoEngine → MP4 - bots/novel/novel_manager.py: 전체 파이프라인 조율 + Telegram 명령 처리 - config/novels/shadow-protocol.json: 예시 소설 설정 (2040 서울 SF 스릴러) [스케줄러] 소설 파이프라인 통합 - 매주 월/목 09:00 자동 실행 (job_novel_pipeline) - Telegram 명령: /novel_list, /novel_gen, /novel_status [기타 수정] - collector_bot.py: 한국어 유니코드 감지 + RSS 신뢰도 override 버그 수정 - quality_rules.json: min_score 70→60 - scripts/get_token.py: YouTube OAuth scope 추가 - .env.example: SEEDANCE/ELEVENLABS/GEMINI/RUNWAY API 키 항목 추가 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
522 lines
20 KiB
Python
522 lines
20 KiB
Python
"""
|
|
엔진 로더 (bots/engine_loader.py)
|
|
역할: config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환
|
|
설계서: blog-engine-final-masterplan-v3.txt
|
|
|
|
사용:
|
|
loader = EngineLoader()
|
|
writer = loader.get_writer()
|
|
result = writer.write("AI 관련 기사 써줘")
|
|
tts = loader.get_tts()
|
|
tts.synthesize("안녕하세요", "/tmp/out.wav")
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
BASE_DIR = Path(__file__).parent.parent
|
|
CONFIG_PATH = BASE_DIR / 'config' / 'engine.json'
|
|
LOG_DIR = BASE_DIR / 'logs'
|
|
LOG_DIR.mkdir(exist_ok=True)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
if not logger.handlers:
|
|
handler = logging.FileHandler(LOG_DIR / 'engine_loader.log', encoding='utf-8')
|
|
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
|
|
logger.addHandler(handler)
|
|
logger.addHandler(logging.StreamHandler())
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
# ─── 기본 추상 클래스 ──────────────────────────────────
|
|
|
|
class BaseWriter(ABC):
|
|
@abstractmethod
|
|
def write(self, prompt: str, system: str = '') -> str:
|
|
"""글쓰기 요청. prompt에 대한 결과 문자열 반환."""
|
|
|
|
|
|
class BaseTTS(ABC):
|
|
@abstractmethod
|
|
def synthesize(self, text: str, output_path: str,
|
|
lang: str = 'ko', speed: float = 1.05) -> bool:
|
|
"""TTS 합성. 성공 시 True 반환."""
|
|
|
|
|
|
class BaseImageGenerator(ABC):
|
|
@abstractmethod
|
|
def generate(self, prompt: str, output_path: str,
|
|
size: str = '1024x1792') -> bool:
|
|
"""이미지 생성. 성공 시 True 반환."""
|
|
|
|
|
|
# VideoEngine은 video_engine.py에 정의됨
|
|
# BaseVideoGenerator 타입 힌트 호환용
|
|
BaseVideoGenerator = object
|
|
|
|
|
|
# ─── Writer 구현체 ──────────────────────────────────────
|
|
|
|
class ClaudeWriter(BaseWriter):
|
|
"""Anthropic Claude API를 사용하는 글쓰기 엔진"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'ANTHROPIC_API_KEY'), '')
|
|
self.model = cfg.get('model', 'claude-opus-4-5')
|
|
self.max_tokens = cfg.get('max_tokens', 4096)
|
|
self.temperature = cfg.get('temperature', 0.7)
|
|
|
|
def write(self, prompt: str, system: str = '') -> str:
|
|
if not self.api_key:
|
|
logger.warning("ANTHROPIC_API_KEY 없음 — ClaudeWriter 비활성화")
|
|
return ''
|
|
try:
|
|
import anthropic
|
|
client = anthropic.Anthropic(api_key=self.api_key)
|
|
kwargs: dict = {
|
|
'model': self.model,
|
|
'max_tokens': self.max_tokens,
|
|
'messages': [{'role': 'user', 'content': prompt}],
|
|
}
|
|
if system:
|
|
kwargs['system'] = system
|
|
message = client.messages.create(**kwargs)
|
|
return message.content[0].text
|
|
except Exception as e:
|
|
logger.error(f"ClaudeWriter 오류: {e}")
|
|
return ''
|
|
|
|
|
|
class OpenClawWriter(BaseWriter):
|
|
"""OpenClaw CLI를 subprocess로 호출하는 글쓰기 엔진"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.agent_name = cfg.get('agent_name', 'blog-writer')
|
|
self.timeout = cfg.get('timeout', 120)
|
|
|
|
def write(self, prompt: str, system: str = '') -> str:
|
|
try:
|
|
cmd = ['openclaw', 'run', self.agent_name, '--prompt', prompt]
|
|
if system:
|
|
cmd += ['--system', system]
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=self.timeout,
|
|
encoding='utf-8',
|
|
)
|
|
if result.returncode != 0:
|
|
logger.error(f"OpenClawWriter 오류: {result.stderr[:300]}")
|
|
return ''
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
logger.error(f"OpenClawWriter 타임아웃 ({self.timeout}초)")
|
|
return ''
|
|
except FileNotFoundError:
|
|
logger.warning("openclaw CLI 없음 — OpenClawWriter 비활성화")
|
|
return ''
|
|
except Exception as e:
|
|
logger.error(f"OpenClawWriter 오류: {e}")
|
|
return ''
|
|
|
|
|
|
class GeminiWriter(BaseWriter):
|
|
"""Google Gemini API를 사용하는 글쓰기 엔진"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'GEMINI_API_KEY'), '')
|
|
self.model = cfg.get('model', 'gemini-2.0-flash')
|
|
self.max_tokens = cfg.get('max_tokens', 4096)
|
|
self.temperature = cfg.get('temperature', 0.7)
|
|
|
|
def write(self, prompt: str, system: str = '') -> str:
|
|
if not self.api_key:
|
|
logger.warning("GEMINI_API_KEY 없음 — GeminiWriter 비활성화")
|
|
return ''
|
|
try:
|
|
import google.generativeai as genai # type: ignore
|
|
genai.configure(api_key=self.api_key)
|
|
model = genai.GenerativeModel(
|
|
model_name=self.model,
|
|
generation_config={
|
|
'max_output_tokens': self.max_tokens,
|
|
'temperature': self.temperature,
|
|
},
|
|
system_instruction=system if system else None,
|
|
)
|
|
response = model.generate_content(prompt)
|
|
return response.text
|
|
except ImportError:
|
|
logger.warning("google-generativeai 미설치 — GeminiWriter 비활성화")
|
|
return ''
|
|
except Exception as e:
|
|
logger.error(f"GeminiWriter 오류: {e}")
|
|
return ''
|
|
|
|
|
|
# ─── TTS 구현체 ─────────────────────────────────────────
|
|
|
|
class GoogleCloudTTS(BaseTTS):
|
|
"""Google Cloud TTS REST API (API Key 방식)"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'GOOGLE_TTS_API_KEY'), '')
|
|
self.voice = cfg.get('voice', 'ko-KR-Wavenet-A')
|
|
self.default_speed = cfg.get('speaking_rate', 1.05)
|
|
self.pitch = cfg.get('pitch', 0)
|
|
|
|
def synthesize(self, text: str, output_path: str,
|
|
lang: str = 'ko', speed: float = 0.0) -> bool:
|
|
if not self.api_key:
|
|
logger.warning("GOOGLE_TTS_API_KEY 없음 — GoogleCloudTTS 비활성화")
|
|
return False
|
|
import base64
|
|
try:
|
|
import requests as req
|
|
speaking_rate = speed if speed > 0 else self.default_speed
|
|
voice_name = self.voice if lang == 'ko' else 'en-US-Wavenet-D'
|
|
language_code = 'ko-KR' if lang == 'ko' else 'en-US'
|
|
url = (
|
|
f'https://texttospeech.googleapis.com/v1/text:synthesize'
|
|
f'?key={self.api_key}'
|
|
)
|
|
payload = {
|
|
'input': {'text': text},
|
|
'voice': {'languageCode': language_code, 'name': voice_name},
|
|
'audioConfig': {
|
|
'audioEncoding': 'LINEAR16',
|
|
'speakingRate': speaking_rate,
|
|
'pitch': self.pitch,
|
|
},
|
|
}
|
|
resp = req.post(url, json=payload, timeout=30)
|
|
resp.raise_for_status()
|
|
audio_b64 = resp.json().get('audioContent', '')
|
|
if audio_b64:
|
|
Path(output_path).write_bytes(base64.b64decode(audio_b64))
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"GoogleCloudTTS 실패: {e}")
|
|
return False
|
|
|
|
|
|
class OpenAITTS(BaseTTS):
|
|
"""OpenAI TTS API (tts-1-hd)"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
|
|
self.model = cfg.get('model', 'tts-1-hd')
|
|
self.voice = cfg.get('voice', 'alloy')
|
|
self.default_speed = cfg.get('speed', 1.0)
|
|
|
|
def synthesize(self, text: str, output_path: str,
|
|
lang: str = 'ko', speed: float = 0.0) -> bool:
|
|
if not self.api_key:
|
|
logger.warning("OPENAI_API_KEY 없음 — OpenAITTS 비활성화")
|
|
return False
|
|
try:
|
|
from openai import OpenAI
|
|
client = OpenAI(api_key=self.api_key)
|
|
speak_speed = speed if speed > 0 else self.default_speed
|
|
response = client.audio.speech.create(
|
|
model=self.model,
|
|
voice=self.voice,
|
|
input=text,
|
|
speed=speak_speed,
|
|
response_format='wav',
|
|
)
|
|
response.stream_to_file(output_path)
|
|
return Path(output_path).exists()
|
|
except ImportError:
|
|
logger.warning("openai 미설치 — OpenAITTS 비활성화")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"OpenAITTS 실패: {e}")
|
|
return False
|
|
|
|
|
|
class ElevenLabsTTS(BaseTTS):
|
|
"""ElevenLabs REST API TTS"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'ELEVENLABS_API_KEY'), '')
|
|
self.model = cfg.get('model', 'eleven_multilingual_v2')
|
|
self.voice_id = cfg.get('voice_id', 'pNInz6obpgDQGcFmaJgB')
|
|
self.stability = cfg.get('stability', 0.5)
|
|
self.similarity_boost = cfg.get('similarity_boost', 0.75)
|
|
|
|
def synthesize(self, text: str, output_path: str,
|
|
lang: str = 'ko', speed: float = 0.0) -> bool:
|
|
if not self.api_key:
|
|
logger.warning("ELEVENLABS_API_KEY 없음 — ElevenLabsTTS 비활성화")
|
|
return False
|
|
try:
|
|
import requests as req
|
|
url = (
|
|
f'https://api.elevenlabs.io/v1/text-to-speech/'
|
|
f'{self.voice_id}'
|
|
)
|
|
headers = {
|
|
'xi-api-key': self.api_key,
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'audio/mpeg',
|
|
}
|
|
payload = {
|
|
'text': text,
|
|
'model_id': self.model,
|
|
'voice_settings': {
|
|
'stability': self.stability,
|
|
'similarity_boost': self.similarity_boost,
|
|
},
|
|
}
|
|
resp = req.post(url, json=payload, headers=headers, timeout=60)
|
|
resp.raise_for_status()
|
|
# mp3 응답 → 파일 저장 (wav 확장자라도 mp3 데이터 저장 후 ffmpeg 변환)
|
|
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
|
|
Path(mp3_path).write_bytes(resp.content)
|
|
# mp3 → wav 변환 (ffmpeg 사용)
|
|
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
|
|
result = subprocess.run(
|
|
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
|
|
'-ar', '24000', output_path],
|
|
capture_output=True, timeout=60,
|
|
)
|
|
Path(mp3_path).unlink(missing_ok=True)
|
|
return Path(output_path).exists() and result.returncode == 0
|
|
except Exception as e:
|
|
logger.error(f"ElevenLabsTTS 실패: {e}")
|
|
return False
|
|
|
|
|
|
class GTTSEngine(BaseTTS):
|
|
"""gTTS 무료 TTS 엔진"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.default_lang = cfg.get('lang', 'ko')
|
|
self.slow = cfg.get('slow', False)
|
|
|
|
def synthesize(self, text: str, output_path: str,
|
|
lang: str = 'ko', speed: float = 0.0) -> bool:
|
|
try:
|
|
from gtts import gTTS
|
|
use_lang = lang if lang else self.default_lang
|
|
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
|
|
tts = gTTS(text=text, lang=use_lang, slow=self.slow)
|
|
tts.save(mp3_path)
|
|
# mp3 → wav 변환 (ffmpeg 사용)
|
|
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
|
|
result = subprocess.run(
|
|
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
|
|
'-ar', '24000', output_path],
|
|
capture_output=True, timeout=60,
|
|
)
|
|
Path(mp3_path).unlink(missing_ok=True)
|
|
return Path(output_path).exists() and result.returncode == 0
|
|
except ImportError:
|
|
logger.warning("gTTS 미설치 — GTTSEngine 비활성화")
|
|
return False
|
|
except Exception as e:
|
|
logger.warning(f"GTTSEngine 실패: {e}")
|
|
return False
|
|
|
|
|
|
# ─── ImageGenerator 구현체 ─────────────────────────────
|
|
|
|
class DALLEGenerator(BaseImageGenerator):
|
|
"""OpenAI DALL-E 3 이미지 생성 엔진"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
|
|
self.model = cfg.get('model', 'dall-e-3')
|
|
self.default_size = cfg.get('size', '1024x1792')
|
|
self.quality = cfg.get('quality', 'standard')
|
|
|
|
def generate(self, prompt: str, output_path: str,
|
|
size: str = '') -> bool:
|
|
if not self.api_key:
|
|
logger.warning("OPENAI_API_KEY 없음 — DALLEGenerator 비활성화")
|
|
return False
|
|
try:
|
|
from openai import OpenAI
|
|
import requests as req
|
|
import io
|
|
from PIL import Image
|
|
|
|
use_size = size if size else self.default_size
|
|
client = OpenAI(api_key=self.api_key)
|
|
full_prompt = prompt + ' No text, no letters, no numbers, no watermarks.'
|
|
response = client.images.generate(
|
|
model=self.model,
|
|
prompt=full_prompt,
|
|
size=use_size,
|
|
quality=self.quality,
|
|
n=1,
|
|
)
|
|
img_url = response.data[0].url
|
|
img_bytes = req.get(img_url, timeout=30).content
|
|
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
|
|
img.save(output_path)
|
|
logger.info(f"DALL-E 이미지 생성 완료: {output_path}")
|
|
return True
|
|
except ImportError as e:
|
|
logger.warning(f"DALLEGenerator 의존성 없음: {e}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"DALLEGenerator 실패: {e}")
|
|
return False
|
|
|
|
|
|
class ExternalGenerator(BaseImageGenerator):
|
|
"""수동 이미지 제공 (자동 생성 없음)"""
|
|
|
|
def __init__(self, cfg: dict):
|
|
pass
|
|
|
|
def generate(self, prompt: str, output_path: str,
|
|
size: str = '') -> bool:
|
|
logger.info(f"ExternalGenerator: 수동 이미지 필요 — 프롬프트: {prompt[:60]}")
|
|
return False
|
|
|
|
|
|
# ─── EngineLoader ───────────────────────────────────────
|
|
|
|
class EngineLoader:
|
|
"""
|
|
config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환하는
|
|
중앙 팩토리 클래스.
|
|
|
|
사용 예:
|
|
loader = EngineLoader()
|
|
writer = loader.get_writer()
|
|
text = writer.write("오늘의 AI 뉴스 정리해줘")
|
|
"""
|
|
|
|
_DEFAULT_CONFIG = {
|
|
'writing': {'provider': 'claude', 'options': {'claude': {}}},
|
|
'tts': {'provider': 'gtts', 'options': {'gtts': {}}},
|
|
'image_generation': {'provider': 'external', 'options': {'external': {}}},
|
|
'video_generation': {'provider': 'ffmpeg_slides', 'options': {'ffmpeg_slides': {}}},
|
|
'publishing': {},
|
|
'quality_gates': {'gate1_research_min_score': 60},
|
|
}
|
|
|
|
def __init__(self, config_path: Optional[Path] = None):
|
|
self._config_path = config_path or CONFIG_PATH
|
|
self._config = self._load_config()
|
|
|
|
def _load_config(self) -> dict:
|
|
if self._config_path.exists():
|
|
try:
|
|
return json.loads(self._config_path.read_text(encoding='utf-8'))
|
|
except Exception as e:
|
|
logger.error(f"engine.json 로드 실패: {e} — 기본값 사용")
|
|
else:
|
|
logger.warning(f"engine.json 없음 ({self._config_path}) — 기본값으로 gtts + ffmpeg_slides 사용")
|
|
return dict(self._DEFAULT_CONFIG)
|
|
|
|
def get_config(self, *keys) -> Any:
|
|
"""
|
|
engine.json 값 접근.
|
|
예: loader.get_config('writing', 'provider')
|
|
loader.get_config('quality_gates', 'gate1_research_min_score')
|
|
"""
|
|
val = self._config
|
|
for key in keys:
|
|
if isinstance(val, dict):
|
|
val = val.get(key)
|
|
else:
|
|
return None
|
|
return val
|
|
|
|
def update_provider(self, category: str, provider: str) -> None:
|
|
"""
|
|
런타임 provider 변경 (engine.json 파일은 수정하지 않음).
|
|
예: loader.update_provider('tts', 'openai')
|
|
"""
|
|
if category in self._config:
|
|
self._config[category]['provider'] = provider
|
|
logger.info(f"런타임 provider 변경: {category} → {provider}")
|
|
else:
|
|
logger.warning(f"update_provider: 알 수 없는 카테고리 '{category}'")
|
|
|
|
def get_writer(self) -> BaseWriter:
|
|
"""현재 설정된 writing provider에 맞는 BaseWriter 구현체 반환"""
|
|
writing_cfg = self._config.get('writing', {})
|
|
provider = writing_cfg.get('provider', 'claude')
|
|
options = writing_cfg.get('options', {}).get(provider, {})
|
|
|
|
writers = {
|
|
'claude': ClaudeWriter,
|
|
'openclaw': OpenClawWriter,
|
|
'gemini': GeminiWriter,
|
|
}
|
|
cls = writers.get(provider, ClaudeWriter)
|
|
logger.info(f"Writer 로드: {provider} ({cls.__name__})")
|
|
return cls(options)
|
|
|
|
def get_tts(self) -> BaseTTS:
|
|
"""현재 설정된 tts provider에 맞는 BaseTTS 구현체 반환"""
|
|
tts_cfg = self._config.get('tts', {})
|
|
provider = tts_cfg.get('provider', 'gtts')
|
|
options = tts_cfg.get('options', {}).get(provider, {})
|
|
|
|
tts_engines = {
|
|
'google_cloud': GoogleCloudTTS,
|
|
'openai': OpenAITTS,
|
|
'elevenlabs': ElevenLabsTTS,
|
|
'gtts': GTTSEngine,
|
|
}
|
|
cls = tts_engines.get(provider, GTTSEngine)
|
|
logger.info(f"TTS 로드: {provider} ({cls.__name__})")
|
|
return cls(options)
|
|
|
|
def get_image_generator(self) -> BaseImageGenerator:
|
|
"""현재 설정된 image_generation provider에 맞는 구현체 반환"""
|
|
img_cfg = self._config.get('image_generation', {})
|
|
provider = img_cfg.get('provider', 'external')
|
|
options = img_cfg.get('options', {}).get(provider, {})
|
|
|
|
generators = {
|
|
'dalle': DALLEGenerator,
|
|
'external': ExternalGenerator,
|
|
}
|
|
cls = generators.get(provider, ExternalGenerator)
|
|
logger.info(f"ImageGenerator 로드: {provider} ({cls.__name__})")
|
|
return cls(options)
|
|
|
|
def get_video_generator(self):
|
|
"""현재 설정된 video_generation provider에 맞는 VideoEngine 구현체 반환"""
|
|
from bots.converters import video_engine
|
|
video_cfg = self._config.get('video_generation', {
|
|
'provider': 'ffmpeg_slides',
|
|
'options': {'ffmpeg_slides': {}},
|
|
})
|
|
engine = video_engine.get_engine(video_cfg)
|
|
logger.info(f"VideoGenerator 로드: {video_cfg.get('provider', 'ffmpeg_slides')}")
|
|
return engine
|
|
|
|
def get_publishers(self) -> list:
|
|
"""
|
|
활성화된 publishing 채널 목록 반환.
|
|
반환 형식: [{'name': str, 'enabled': bool, ...설정값}, ...]
|
|
"""
|
|
publishing_cfg = self._config.get('publishing', {})
|
|
result = []
|
|
for name, cfg in publishing_cfg.items():
|
|
if isinstance(cfg, dict):
|
|
result.append({'name': name, **cfg})
|
|
return result
|
|
|
|
def get_enabled_publishers(self) -> list:
|
|
"""enabled: true인 publishing 채널만 반환"""
|
|
return [p for p in self.get_publishers() if p.get('enabled', False)]
|