Files
blog-writer/bots/engine_loader.py
T
sinmb79 8a7a122bb3 feat: v3.0 엔진 추상화 + 소설 파이프라인 추가
[1순위] 엔진 추상화 리팩토링
- config/engine.json: 단일 설정 파일로 writing/tts/image/video/publishing 엔진 제어
- bots/engine_loader.py: EngineLoader 팩토리 클래스 (Claude/OpenClaw/Gemini Writer, gTTS/GoogleCloud/OpenAI/ElevenLabs TTS, DALL-E/External 이미지)

[2순위] VideoEngine 추상화
- bots/converters/video_engine.py: VideoEngine ABC + FFmpegSlidesEngine/SeedanceEngine/SoraEngine/RunwayEngine/VeoEngine 구현
- Seedance 2.0 API 연동 + 실패 시 ffmpeg_slides 자동 fallback

[3순위] 소설 연재 파이프라인
- bots/novel/novel_writer.py: AI 에피소드 자동 생성 (Claude/엔진 추상화)
- bots/novel/novel_blog_converter.py: 에피소드 → 장르별 테마 Blogger HTML
- bots/novel/novel_shorts_converter.py: key_scenes → TTS + Pillow + VideoEngine → MP4
- bots/novel/novel_manager.py: 전체 파이프라인 조율 + Telegram 명령 처리
- config/novels/shadow-protocol.json: 예시 소설 설정 (2040 서울 SF 스릴러)

[스케줄러] 소설 파이프라인 통합
- 매주 월/목 09:00 자동 실행 (job_novel_pipeline)
- Telegram 명령: /novel_list, /novel_gen, /novel_status

[기타 수정]
- collector_bot.py: 한국어 유니코드 감지 + RSS 신뢰도 override 버그 수정
- quality_rules.json: min_score 70→60
- scripts/get_token.py: YouTube OAuth scope 추가
- .env.example: SEEDANCE/ELEVENLABS/GEMINI/RUNWAY API 키 항목 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-26 09:33:04 +09:00

522 lines
20 KiB
Python

"""
엔진 로더 (bots/engine_loader.py)
역할: config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환
설계서: blog-engine-final-masterplan-v3.txt
사용:
loader = EngineLoader()
writer = loader.get_writer()
result = writer.write("AI 관련 기사 써줘")
tts = loader.get_tts()
tts.synthesize("안녕하세요", "/tmp/out.wav")
"""
import json
import logging
import os
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Optional
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent
CONFIG_PATH = BASE_DIR / 'config' / 'engine.json'
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.FileHandler(LOG_DIR / 'engine_loader.log', encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
logger.addHandler(handler)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# ─── 기본 추상 클래스 ──────────────────────────────────
class BaseWriter(ABC):
@abstractmethod
def write(self, prompt: str, system: str = '') -> str:
"""글쓰기 요청. prompt에 대한 결과 문자열 반환."""
class BaseTTS(ABC):
@abstractmethod
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 1.05) -> bool:
"""TTS 합성. 성공 시 True 반환."""
class BaseImageGenerator(ABC):
@abstractmethod
def generate(self, prompt: str, output_path: str,
size: str = '1024x1792') -> bool:
"""이미지 생성. 성공 시 True 반환."""
# VideoEngine은 video_engine.py에 정의됨
# BaseVideoGenerator 타입 힌트 호환용
BaseVideoGenerator = object
# ─── Writer 구현체 ──────────────────────────────────────
class ClaudeWriter(BaseWriter):
"""Anthropic Claude API를 사용하는 글쓰기 엔진"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'ANTHROPIC_API_KEY'), '')
self.model = cfg.get('model', 'claude-opus-4-5')
self.max_tokens = cfg.get('max_tokens', 4096)
self.temperature = cfg.get('temperature', 0.7)
def write(self, prompt: str, system: str = '') -> str:
if not self.api_key:
logger.warning("ANTHROPIC_API_KEY 없음 — ClaudeWriter 비활성화")
return ''
try:
import anthropic
client = anthropic.Anthropic(api_key=self.api_key)
kwargs: dict = {
'model': self.model,
'max_tokens': self.max_tokens,
'messages': [{'role': 'user', 'content': prompt}],
}
if system:
kwargs['system'] = system
message = client.messages.create(**kwargs)
return message.content[0].text
except Exception as e:
logger.error(f"ClaudeWriter 오류: {e}")
return ''
class OpenClawWriter(BaseWriter):
"""OpenClaw CLI를 subprocess로 호출하는 글쓰기 엔진"""
def __init__(self, cfg: dict):
self.agent_name = cfg.get('agent_name', 'blog-writer')
self.timeout = cfg.get('timeout', 120)
def write(self, prompt: str, system: str = '') -> str:
try:
cmd = ['openclaw', 'run', self.agent_name, '--prompt', prompt]
if system:
cmd += ['--system', system]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout,
encoding='utf-8',
)
if result.returncode != 0:
logger.error(f"OpenClawWriter 오류: {result.stderr[:300]}")
return ''
return result.stdout.strip()
except subprocess.TimeoutExpired:
logger.error(f"OpenClawWriter 타임아웃 ({self.timeout}초)")
return ''
except FileNotFoundError:
logger.warning("openclaw CLI 없음 — OpenClawWriter 비활성화")
return ''
except Exception as e:
logger.error(f"OpenClawWriter 오류: {e}")
return ''
class GeminiWriter(BaseWriter):
"""Google Gemini API를 사용하는 글쓰기 엔진"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'GEMINI_API_KEY'), '')
self.model = cfg.get('model', 'gemini-2.0-flash')
self.max_tokens = cfg.get('max_tokens', 4096)
self.temperature = cfg.get('temperature', 0.7)
def write(self, prompt: str, system: str = '') -> str:
if not self.api_key:
logger.warning("GEMINI_API_KEY 없음 — GeminiWriter 비활성화")
return ''
try:
import google.generativeai as genai # type: ignore
genai.configure(api_key=self.api_key)
model = genai.GenerativeModel(
model_name=self.model,
generation_config={
'max_output_tokens': self.max_tokens,
'temperature': self.temperature,
},
system_instruction=system if system else None,
)
response = model.generate_content(prompt)
return response.text
except ImportError:
logger.warning("google-generativeai 미설치 — GeminiWriter 비활성화")
return ''
except Exception as e:
logger.error(f"GeminiWriter 오류: {e}")
return ''
# ─── TTS 구현체 ─────────────────────────────────────────
class GoogleCloudTTS(BaseTTS):
"""Google Cloud TTS REST API (API Key 방식)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'GOOGLE_TTS_API_KEY'), '')
self.voice = cfg.get('voice', 'ko-KR-Wavenet-A')
self.default_speed = cfg.get('speaking_rate', 1.05)
self.pitch = cfg.get('pitch', 0)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("GOOGLE_TTS_API_KEY 없음 — GoogleCloudTTS 비활성화")
return False
import base64
try:
import requests as req
speaking_rate = speed if speed > 0 else self.default_speed
voice_name = self.voice if lang == 'ko' else 'en-US-Wavenet-D'
language_code = 'ko-KR' if lang == 'ko' else 'en-US'
url = (
f'https://texttospeech.googleapis.com/v1/text:synthesize'
f'?key={self.api_key}'
)
payload = {
'input': {'text': text},
'voice': {'languageCode': language_code, 'name': voice_name},
'audioConfig': {
'audioEncoding': 'LINEAR16',
'speakingRate': speaking_rate,
'pitch': self.pitch,
},
}
resp = req.post(url, json=payload, timeout=30)
resp.raise_for_status()
audio_b64 = resp.json().get('audioContent', '')
if audio_b64:
Path(output_path).write_bytes(base64.b64decode(audio_b64))
return True
except Exception as e:
logger.warning(f"GoogleCloudTTS 실패: {e}")
return False
class OpenAITTS(BaseTTS):
"""OpenAI TTS API (tts-1-hd)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
self.model = cfg.get('model', 'tts-1-hd')
self.voice = cfg.get('voice', 'alloy')
self.default_speed = cfg.get('speed', 1.0)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("OPENAI_API_KEY 없음 — OpenAITTS 비활성화")
return False
try:
from openai import OpenAI
client = OpenAI(api_key=self.api_key)
speak_speed = speed if speed > 0 else self.default_speed
response = client.audio.speech.create(
model=self.model,
voice=self.voice,
input=text,
speed=speak_speed,
response_format='wav',
)
response.stream_to_file(output_path)
return Path(output_path).exists()
except ImportError:
logger.warning("openai 미설치 — OpenAITTS 비활성화")
return False
except Exception as e:
logger.error(f"OpenAITTS 실패: {e}")
return False
class ElevenLabsTTS(BaseTTS):
"""ElevenLabs REST API TTS"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'ELEVENLABS_API_KEY'), '')
self.model = cfg.get('model', 'eleven_multilingual_v2')
self.voice_id = cfg.get('voice_id', 'pNInz6obpgDQGcFmaJgB')
self.stability = cfg.get('stability', 0.5)
self.similarity_boost = cfg.get('similarity_boost', 0.75)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("ELEVENLABS_API_KEY 없음 — ElevenLabsTTS 비활성화")
return False
try:
import requests as req
url = (
f'https://api.elevenlabs.io/v1/text-to-speech/'
f'{self.voice_id}'
)
headers = {
'xi-api-key': self.api_key,
'Content-Type': 'application/json',
'Accept': 'audio/mpeg',
}
payload = {
'text': text,
'model_id': self.model,
'voice_settings': {
'stability': self.stability,
'similarity_boost': self.similarity_boost,
},
}
resp = req.post(url, json=payload, headers=headers, timeout=60)
resp.raise_for_status()
# mp3 응답 → 파일 저장 (wav 확장자라도 mp3 데이터 저장 후 ffmpeg 변환)
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
Path(mp3_path).write_bytes(resp.content)
# mp3 → wav 변환 (ffmpeg 사용)
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
result = subprocess.run(
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
'-ar', '24000', output_path],
capture_output=True, timeout=60,
)
Path(mp3_path).unlink(missing_ok=True)
return Path(output_path).exists() and result.returncode == 0
except Exception as e:
logger.error(f"ElevenLabsTTS 실패: {e}")
return False
class GTTSEngine(BaseTTS):
"""gTTS 무료 TTS 엔진"""
def __init__(self, cfg: dict):
self.default_lang = cfg.get('lang', 'ko')
self.slow = cfg.get('slow', False)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
try:
from gtts import gTTS
use_lang = lang if lang else self.default_lang
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
tts = gTTS(text=text, lang=use_lang, slow=self.slow)
tts.save(mp3_path)
# mp3 → wav 변환 (ffmpeg 사용)
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
result = subprocess.run(
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
'-ar', '24000', output_path],
capture_output=True, timeout=60,
)
Path(mp3_path).unlink(missing_ok=True)
return Path(output_path).exists() and result.returncode == 0
except ImportError:
logger.warning("gTTS 미설치 — GTTSEngine 비활성화")
return False
except Exception as e:
logger.warning(f"GTTSEngine 실패: {e}")
return False
# ─── ImageGenerator 구현체 ─────────────────────────────
class DALLEGenerator(BaseImageGenerator):
"""OpenAI DALL-E 3 이미지 생성 엔진"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
self.model = cfg.get('model', 'dall-e-3')
self.default_size = cfg.get('size', '1024x1792')
self.quality = cfg.get('quality', 'standard')
def generate(self, prompt: str, output_path: str,
size: str = '') -> bool:
if not self.api_key:
logger.warning("OPENAI_API_KEY 없음 — DALLEGenerator 비활성화")
return False
try:
from openai import OpenAI
import requests as req
import io
from PIL import Image
use_size = size if size else self.default_size
client = OpenAI(api_key=self.api_key)
full_prompt = prompt + ' No text, no letters, no numbers, no watermarks.'
response = client.images.generate(
model=self.model,
prompt=full_prompt,
size=use_size,
quality=self.quality,
n=1,
)
img_url = response.data[0].url
img_bytes = req.get(img_url, timeout=30).content
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
img.save(output_path)
logger.info(f"DALL-E 이미지 생성 완료: {output_path}")
return True
except ImportError as e:
logger.warning(f"DALLEGenerator 의존성 없음: {e}")
return False
except Exception as e:
logger.error(f"DALLEGenerator 실패: {e}")
return False
class ExternalGenerator(BaseImageGenerator):
"""수동 이미지 제공 (자동 생성 없음)"""
def __init__(self, cfg: dict):
pass
def generate(self, prompt: str, output_path: str,
size: str = '') -> bool:
logger.info(f"ExternalGenerator: 수동 이미지 필요 — 프롬프트: {prompt[:60]}")
return False
# ─── EngineLoader ───────────────────────────────────────
class EngineLoader:
"""
config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환하는
중앙 팩토리 클래스.
사용 예:
loader = EngineLoader()
writer = loader.get_writer()
text = writer.write("오늘의 AI 뉴스 정리해줘")
"""
_DEFAULT_CONFIG = {
'writing': {'provider': 'claude', 'options': {'claude': {}}},
'tts': {'provider': 'gtts', 'options': {'gtts': {}}},
'image_generation': {'provider': 'external', 'options': {'external': {}}},
'video_generation': {'provider': 'ffmpeg_slides', 'options': {'ffmpeg_slides': {}}},
'publishing': {},
'quality_gates': {'gate1_research_min_score': 60},
}
def __init__(self, config_path: Optional[Path] = None):
self._config_path = config_path or CONFIG_PATH
self._config = self._load_config()
def _load_config(self) -> dict:
if self._config_path.exists():
try:
return json.loads(self._config_path.read_text(encoding='utf-8'))
except Exception as e:
logger.error(f"engine.json 로드 실패: {e} — 기본값 사용")
else:
logger.warning(f"engine.json 없음 ({self._config_path}) — 기본값으로 gtts + ffmpeg_slides 사용")
return dict(self._DEFAULT_CONFIG)
def get_config(self, *keys) -> Any:
"""
engine.json 값 접근.
예: loader.get_config('writing', 'provider')
loader.get_config('quality_gates', 'gate1_research_min_score')
"""
val = self._config
for key in keys:
if isinstance(val, dict):
val = val.get(key)
else:
return None
return val
def update_provider(self, category: str, provider: str) -> None:
"""
런타임 provider 변경 (engine.json 파일은 수정하지 않음).
예: loader.update_provider('tts', 'openai')
"""
if category in self._config:
self._config[category]['provider'] = provider
logger.info(f"런타임 provider 변경: {category}{provider}")
else:
logger.warning(f"update_provider: 알 수 없는 카테고리 '{category}'")
def get_writer(self) -> BaseWriter:
"""현재 설정된 writing provider에 맞는 BaseWriter 구현체 반환"""
writing_cfg = self._config.get('writing', {})
provider = writing_cfg.get('provider', 'claude')
options = writing_cfg.get('options', {}).get(provider, {})
writers = {
'claude': ClaudeWriter,
'openclaw': OpenClawWriter,
'gemini': GeminiWriter,
}
cls = writers.get(provider, ClaudeWriter)
logger.info(f"Writer 로드: {provider} ({cls.__name__})")
return cls(options)
def get_tts(self) -> BaseTTS:
"""현재 설정된 tts provider에 맞는 BaseTTS 구현체 반환"""
tts_cfg = self._config.get('tts', {})
provider = tts_cfg.get('provider', 'gtts')
options = tts_cfg.get('options', {}).get(provider, {})
tts_engines = {
'google_cloud': GoogleCloudTTS,
'openai': OpenAITTS,
'elevenlabs': ElevenLabsTTS,
'gtts': GTTSEngine,
}
cls = tts_engines.get(provider, GTTSEngine)
logger.info(f"TTS 로드: {provider} ({cls.__name__})")
return cls(options)
def get_image_generator(self) -> BaseImageGenerator:
"""현재 설정된 image_generation provider에 맞는 구현체 반환"""
img_cfg = self._config.get('image_generation', {})
provider = img_cfg.get('provider', 'external')
options = img_cfg.get('options', {}).get(provider, {})
generators = {
'dalle': DALLEGenerator,
'external': ExternalGenerator,
}
cls = generators.get(provider, ExternalGenerator)
logger.info(f"ImageGenerator 로드: {provider} ({cls.__name__})")
return cls(options)
def get_video_generator(self):
"""현재 설정된 video_generation provider에 맞는 VideoEngine 구현체 반환"""
from bots.converters import video_engine
video_cfg = self._config.get('video_generation', {
'provider': 'ffmpeg_slides',
'options': {'ffmpeg_slides': {}},
})
engine = video_engine.get_engine(video_cfg)
logger.info(f"VideoGenerator 로드: {video_cfg.get('provider', 'ffmpeg_slides')}")
return engine
def get_publishers(self) -> list:
"""
활성화된 publishing 채널 목록 반환.
반환 형식: [{'name': str, 'enabled': bool, ...설정값}, ...]
"""
publishing_cfg = self._config.get('publishing', {})
result = []
for name, cfg in publishing_cfg.items():
if isinstance(cfg, dict):
result.append({'name': name, **cfg})
return result
def get_enabled_publishers(self) -> list:
"""enabled: true인 publishing 채널만 반환"""
return [p for p in self.get_publishers() if p.get('enabled', False)]