Files
blog-writer/bots/engine_loader.py
JOUNGWOOK KWON 726c593e85 feat: Reddit 수집, 쇼츠 텔레그램 미리보기, 코너 9개 체계 정비
- Reddit 트렌딩 수집기 추가 (/reddit collect, /pick 명령어)
- 쇼츠 영상 텔레그램 미리보기 후 승인 기반 YouTube 업로드
- 코너 9개로 통합 (앱추천→제품리뷰, 재테크절약→재테크, TV로보는세상/건강정보 추가)
- RSS 피드 73개로 확대 (9개 코너 전체 커버)
- 블로그 중복 검토 알림 수정, 글 잘림 방지 (max_tokens 8192)
- 제품리뷰 다중 이미지 지원, 저품질 이미지 필터링 강화
- HookOptimizer LLM 연동, 인스타/X/틱톡 스케줄러 비활성화

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 13:56:20 +09:00

780 lines
31 KiB
Python

"""
엔진 로더 (bots/engine_loader.py)
역할: config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환
설계서: blog-engine-final-masterplan-v3.txt
사용:
loader = EngineLoader()
writer = loader.get_writer()
result = writer.write("AI 관련 기사 써줘")
tts = loader.get_tts()
tts.synthesize("안녕하세요", "/tmp/out.wav")
"""
import json
import logging
import os
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Optional
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent
CONFIG_PATH = BASE_DIR / 'config' / 'engine.json'
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.FileHandler(LOG_DIR / 'engine_loader.log', encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
logger.addHandler(handler)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
# ─── 기본 추상 클래스 ──────────────────────────────────
class BaseWriter(ABC):
@abstractmethod
def write(self, prompt: str, system: str = '') -> str:
"""글쓰기 요청. prompt에 대한 결과 문자열 반환."""
class BaseTTS(ABC):
@abstractmethod
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 1.05) -> bool:
"""TTS 합성. 성공 시 True 반환."""
class BaseImageGenerator(ABC):
@abstractmethod
def generate(self, prompt: str, output_path: str,
size: str = '1024x1792') -> bool:
"""이미지 생성. 성공 시 True 반환."""
# VideoEngine은 video_engine.py에 정의됨
# BaseVideoGenerator 타입 힌트 호환용
BaseVideoGenerator = object
# ─── Writer 구현체 ──────────────────────────────────────
class ClaudeWriter(BaseWriter):
"""Anthropic Claude API를 사용하는 글쓰기 엔진 (프록시 base_url 지원)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'ANTHROPIC_API_KEY'), '')
self.model = cfg.get('model', 'claude-sonnet-4-6')
self.max_tokens = cfg.get('max_tokens', 4096)
self.temperature = cfg.get('temperature', 0.7)
self.base_url = cfg.get('base_url', '') # 프록시 URL (옵션)
def write(self, prompt: str, system: str = '') -> str:
if not self.api_key:
logger.warning("ANTHROPIC_API_KEY 없음 — ClaudeWriter 비활성화")
return ''
try:
import anthropic
client_kwargs = {
'api_key': self.api_key,
'timeout': 120.0, # 2분 타임아웃
'max_retries': 0, # 401 등 에러 시 재시도 안 함 → 즉시 fallback
}
if self.base_url:
client_kwargs['base_url'] = self.base_url
client = anthropic.Anthropic(**client_kwargs)
kwargs: dict = {
'model': self.model,
'max_tokens': self.max_tokens,
'messages': [{'role': 'user', 'content': prompt}],
}
if system:
kwargs['system'] = system
message = client.messages.create(**kwargs)
if message.stop_reason == 'max_tokens':
logger.warning(f"ClaudeWriter: 응답이 max_tokens({self.max_tokens})에서 잘림 — 글이 불완전할 수 있음")
return message.content[0].text
except Exception as e:
logger.error(f"ClaudeWriter 오류: {e}")
return ''
class OpenClawWriter(BaseWriter):
"""OpenClaw CLI를 subprocess로 호출하는 글쓰기 엔진 (ChatGPT Pro OAuth)"""
# Windows에서 npm 글로벌 .cmd 스크립트 우선 사용
_CLI = 'openclaw.cmd' if os.name == 'nt' else 'openclaw'
def __init__(self, cfg: dict):
self.agent_name = cfg.get('agent_name', 'blog-writer')
self.timeout = cfg.get('timeout', 300)
def write(self, prompt: str, system: str = '') -> str:
try:
message = f"{system}\n\n{prompt}".strip() if system else prompt
cmd = [
self._CLI, 'agent',
'--agent', self.agent_name,
'--message', message,
'--json',
]
result = subprocess.run(
cmd,
capture_output=True,
timeout=self.timeout,
shell=False,
)
stderr_str = result.stderr.decode('utf-8', errors='replace').strip()
if result.returncode != 0:
logger.error(f"OpenClawWriter returncode={result.returncode} stderr={stderr_str[:300]}")
return ''
# stdout이 비어있는 경우 — openclaw가 stderr에만 출력하거나 인증 실패
if not result.stdout:
logger.error(f"OpenClawWriter stdout 비어있음 (returncode=0) stderr={stderr_str[:300]}")
return ''
stdout = result.stdout.decode('utf-8', errors='replace').strip()
if not stdout:
logger.error("OpenClawWriter stdout 디코딩 후 비어있음")
return ''
# 1) JSON 응답 시도 — JSON 블록 추출
json_candidate = stdout
if not stdout.startswith('{'):
import re
m = re.search(r'\{[\s\S]*\}', stdout)
json_candidate = m.group(0) if m else ''
if json_candidate:
try:
data = json.loads(json_candidate)
payloads = data.get('result', {}).get('payloads', [])
if payloads:
return payloads[0].get('text', '') or stdout
except json.JSONDecodeError:
pass
# 2) JSON 파싱 실패 또는 payloads 없음 → plain text 그대로 반환
logger.info(f"OpenClawWriter plain text 응답 ({len(stdout)}자)")
return stdout
except subprocess.TimeoutExpired:
logger.error(f"OpenClawWriter 타임아웃 ({self.timeout}초)")
return ''
except FileNotFoundError:
logger.warning("openclaw CLI 없음 — OpenClawWriter 비활성화")
return ''
except Exception as e:
logger.error(f"OpenClawWriter 오류: {e}")
return ''
class GeminiWriter(BaseWriter):
"""Google Gemini API를 사용하는 글쓰기 엔진"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'GEMINI_API_KEY'), '')
self.model = cfg.get('model', 'gemini-2.0-flash')
self.max_tokens = cfg.get('max_tokens', 4096)
self.temperature = cfg.get('temperature', 0.7)
def write(self, prompt: str, system: str = '') -> str:
if not self.api_key:
logger.warning("GEMINI_API_KEY 없음 — GeminiWriter 비활성화")
return ''
try:
import google.generativeai as genai # type: ignore
genai.configure(api_key=self.api_key)
model = genai.GenerativeModel(
model_name=self.model,
generation_config={
'max_output_tokens': self.max_tokens,
'temperature': self.temperature,
},
system_instruction=system if system else None,
)
response = model.generate_content(prompt)
return response.text
except ImportError:
logger.warning("google-generativeai 미설치 — GeminiWriter 비활성화")
return ''
except Exception as e:
logger.error(f"GeminiWriter 오류: {e}")
return ''
class GroqWriter(BaseWriter):
"""Groq API를 사용하는 글쓰기 엔진 (Gemini fallback용)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'GROQ_API_KEY'), '')
self.model = cfg.get('model', 'llama-3.3-70b-versatile')
self.max_tokens = cfg.get('max_tokens', 4096)
self.temperature = cfg.get('temperature', 0.7)
def write(self, prompt: str, system: str = '') -> str:
if not self.api_key:
logger.warning("GROQ_API_KEY 없음 — GroqWriter 비활성화")
return ''
try:
from groq import Groq
client = Groq(api_key=self.api_key)
messages = []
if system:
messages.append({'role': 'system', 'content': system})
messages.append({'role': 'user', 'content': prompt})
response = client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=self.max_tokens,
temperature=self.temperature,
)
return response.choices[0].message.content or ''
except ImportError:
logger.warning("groq 미설치 — GroqWriter 비활성화")
return ''
except Exception as e:
logger.error(f"GroqWriter 오류: {e}")
return ''
class ClaudeWebWriter(BaseWriter):
"""Playwright Chromium에 세션 쿠키를 주입해 claude.ai를 자동화하는 Writer
Chrome을 닫을 필요 없음 — Playwright 자체 Chromium을 별도로 실행.
필요 환경변수:
CLAUDE_WEB_COOKIE — __Secure-next-auth.session-token 값
"""
def __init__(self, cfg: dict):
self.cookie = os.getenv(cfg.get('cookie_env', 'CLAUDE_WEB_COOKIE'), '')
self.timeout_ms = cfg.get('timeout', 180) * 1000
def write(self, prompt: str, system: str = '') -> str:
# claude.ai는 Cloudflare Turnstile로 헤드리스 브라우저를 차단함
# 쿠키 세션 만료 여부와 무관하게 자동화 불가 → 비활성화
logger.warning("ClaudeWebWriter: Cloudflare 차단으로 비활성화 (수동 사용 권장)")
return ''
if not self.cookie:
logger.warning("CLAUDE_WEB_COOKIE 없음 — ClaudeWebWriter 비활성화")
return ''
try:
from playwright.sync_api import sync_playwright
except ImportError:
logger.warning("playwright 미설치 — ClaudeWebWriter 비활성화")
return ''
token = self.cookie.strip()
message = f"{system}\n\n{prompt}".strip() if system else prompt
try:
with sync_playwright() as pw:
browser = pw.chromium.launch(
headless=True,
args=['--disable-blink-features=AutomationControlled'],
)
ctx = browser.new_context(
user_agent=(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/131.0.0.0 Safari/537.36'
),
)
# 세션 쿠키 주입
ctx.add_cookies([{
'name': '__Secure-next-auth.session-token',
'value': token,
'domain': 'claude.ai',
'path': '/',
'secure': True,
'httpOnly': True,
}])
page = ctx.new_page()
try:
from playwright_stealth import stealth_sync
stealth_sync(page)
except ImportError:
pass
page.goto('https://claude.ai/new', wait_until='domcontentloaded', timeout=60000)
page.wait_for_timeout(3000)
# 입력창 대기
editor = page.locator('[contenteditable="true"]').first
editor.wait_for(timeout=30000)
editor.click()
page.keyboard.type(message, delay=30)
page.keyboard.press('Enter')
# 스트리밍 완료 대기 — 전송 버튼 재활성화
page.wait_for_selector(
'button[aria-label="Send message"]:not([disabled])',
timeout=self.timeout_ms,
)
# 응답 텍스트 추출
blocks = page.locator('.font-claude-message')
text = blocks.last.inner_text() if blocks.count() else ''
browser.close()
return text.strip()
except Exception as e:
logger.error(f"ClaudeWebWriter 오류: {e}")
return ''
class GeminiWebWriter(BaseWriter):
"""gemini.google.com 웹 세션 쿠키를 사용하는 비공식 Writer (gemini-webapi)
필요 환경변수:
GEMINI_WEB_1PSID — 브라우저 DevTools > Application > Cookies >
google.com 에서 __Secure-1PSID 값
GEMINI_WEB_1PSIDTS — 같은 위치에서 __Secure-1PSIDTS 값
"""
def __init__(self, cfg: dict):
self.psid = os.getenv(cfg.get('psid_env', 'GEMINI_WEB_1PSID'), '')
self.psidts = os.getenv(cfg.get('psidts_env', 'GEMINI_WEB_1PSIDTS'), '')
def write(self, prompt: str, system: str = '') -> str:
if not self.psid or not self.psidts:
logger.warning("GEMINI_WEB_1PSID / GEMINI_WEB_1PSIDTS 없음 — GeminiWebWriter 비활성화")
return ''
try:
import asyncio
from gemini_webapi import GeminiClient
async def _run():
client = GeminiClient(secure_1psid=self.psid, secure_1psidts=self.psidts)
await client.init(timeout=30, auto_close=False, close_delay=300)
message = f"{system}\n\n{prompt}".strip() if system else prompt
resp = await client.generate_content(message)
return resp.text
try:
loop = asyncio.get_event_loop()
if loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
future = pool.submit(asyncio.run, _run())
return future.result(timeout=120)
else:
return loop.run_until_complete(_run())
except RuntimeError:
return asyncio.run(_run())
except Exception as e:
logger.error(f"GeminiWebWriter 오류: {e}")
return ''
# ─── TTS 구현체 ─────────────────────────────────────────
class GoogleCloudTTS(BaseTTS):
"""Google Cloud TTS REST API (API Key 방식)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'GOOGLE_TTS_API_KEY'), '')
self.voice = cfg.get('voice', 'ko-KR-Wavenet-A')
self.default_speed = cfg.get('speaking_rate', 1.05)
self.pitch = cfg.get('pitch', 0)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("GOOGLE_TTS_API_KEY 없음 — GoogleCloudTTS 비활성화")
return False
import base64
try:
import requests as req
speaking_rate = speed if speed > 0 else self.default_speed
voice_name = self.voice if lang == 'ko' else 'en-US-Wavenet-D'
language_code = 'ko-KR' if lang == 'ko' else 'en-US'
url = (
f'https://texttospeech.googleapis.com/v1/text:synthesize'
f'?key={self.api_key}'
)
payload = {
'input': {'text': text},
'voice': {'languageCode': language_code, 'name': voice_name},
'audioConfig': {
'audioEncoding': 'LINEAR16',
'speakingRate': speaking_rate,
'pitch': self.pitch,
},
}
resp = req.post(url, json=payload, timeout=30)
resp.raise_for_status()
audio_b64 = resp.json().get('audioContent', '')
if audio_b64:
Path(output_path).write_bytes(base64.b64decode(audio_b64))
return True
except Exception as e:
logger.warning(f"GoogleCloudTTS 실패: {e}")
return False
class OpenAITTS(BaseTTS):
"""OpenAI TTS API (tts-1-hd)"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
self.model = cfg.get('model', 'tts-1-hd')
self.voice = cfg.get('voice', 'alloy')
self.default_speed = cfg.get('speed', 1.0)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("OPENAI_API_KEY 없음 — OpenAITTS 비활성화")
return False
try:
from openai import OpenAI
client = OpenAI(api_key=self.api_key)
speak_speed = speed if speed > 0 else self.default_speed
response = client.audio.speech.create(
model=self.model,
voice=self.voice,
input=text,
speed=speak_speed,
response_format='wav',
)
response.stream_to_file(output_path)
return Path(output_path).exists()
except ImportError:
logger.warning("openai 미설치 — OpenAITTS 비활성화")
return False
except Exception as e:
logger.error(f"OpenAITTS 실패: {e}")
return False
class ElevenLabsTTS(BaseTTS):
"""ElevenLabs REST API TTS"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'ELEVENLABS_API_KEY'), '')
self.model = cfg.get('model', 'eleven_multilingual_v2')
self.voice_id = cfg.get('voice_id', 'pNInz6obpgDQGcFmaJgB')
self.stability = cfg.get('stability', 0.5)
self.similarity_boost = cfg.get('similarity_boost', 0.75)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
if not self.api_key:
logger.warning("ELEVENLABS_API_KEY 없음 — ElevenLabsTTS 비활성화")
return False
try:
import requests as req
url = (
f'https://api.elevenlabs.io/v1/text-to-speech/'
f'{self.voice_id}'
)
headers = {
'xi-api-key': self.api_key,
'Content-Type': 'application/json',
'Accept': 'audio/mpeg',
}
payload = {
'text': text,
'model_id': self.model,
'voice_settings': {
'stability': self.stability,
'similarity_boost': self.similarity_boost,
},
}
resp = req.post(url, json=payload, headers=headers, timeout=60)
resp.raise_for_status()
# mp3 응답 → 파일 저장 (wav 확장자라도 mp3 데이터 저장 후 ffmpeg 변환)
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
Path(mp3_path).write_bytes(resp.content)
# mp3 → wav 변환 (ffmpeg 사용)
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
result = subprocess.run(
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
'-ar', '24000', output_path],
capture_output=True, timeout=60,
)
Path(mp3_path).unlink(missing_ok=True)
return Path(output_path).exists() and result.returncode == 0
except Exception as e:
logger.error(f"ElevenLabsTTS 실패: {e}")
return False
class GTTSEngine(BaseTTS):
"""gTTS 무료 TTS 엔진"""
def __init__(self, cfg: dict):
self.default_lang = cfg.get('lang', 'ko')
self.slow = cfg.get('slow', False)
def synthesize(self, text: str, output_path: str,
lang: str = 'ko', speed: float = 0.0) -> bool:
try:
from gtts import gTTS
use_lang = lang if lang else self.default_lang
mp3_path = str(output_path).replace('.wav', '_tmp.mp3')
tts = gTTS(text=text, lang=use_lang, slow=self.slow)
tts.save(mp3_path)
# mp3 → wav 변환 (ffmpeg 사용)
ffmpeg = os.getenv('FFMPEG_PATH', 'ffmpeg')
result = subprocess.run(
[ffmpeg, '-y', '-loglevel', 'error', '-i', mp3_path,
'-ar', '24000', output_path],
capture_output=True, timeout=60,
)
Path(mp3_path).unlink(missing_ok=True)
return Path(output_path).exists() and result.returncode == 0
except ImportError:
logger.warning("gTTS 미설치 — GTTSEngine 비활성화")
return False
except Exception as e:
logger.warning(f"GTTSEngine 실패: {e}")
return False
# ─── ImageGenerator 구현체 ─────────────────────────────
class DALLEGenerator(BaseImageGenerator):
"""OpenAI DALL-E 3 이미지 생성 엔진"""
def __init__(self, cfg: dict):
self.api_key = os.getenv(cfg.get('api_key_env', 'OPENAI_API_KEY'), '')
self.model = cfg.get('model', 'dall-e-3')
self.default_size = cfg.get('size', '1024x1792')
self.quality = cfg.get('quality', 'standard')
def generate(self, prompt: str, output_path: str,
size: str = '') -> bool:
if not self.api_key:
logger.warning("OPENAI_API_KEY 없음 — DALLEGenerator 비활성화")
return False
try:
from openai import OpenAI
import requests as req
import io
from PIL import Image
use_size = size if size else self.default_size
client = OpenAI(api_key=self.api_key)
full_prompt = prompt + ' No text, no letters, no numbers, no watermarks.'
response = client.images.generate(
model=self.model,
prompt=full_prompt,
size=use_size,
quality=self.quality,
n=1,
)
img_url = response.data[0].url
img_bytes = req.get(img_url, timeout=30).content
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
img.save(output_path)
logger.info(f"DALL-E 이미지 생성 완료: {output_path}")
return True
except ImportError as e:
logger.warning(f"DALLEGenerator 의존성 없음: {e}")
return False
except Exception as e:
logger.error(f"DALLEGenerator 실패: {e}")
return False
class ExternalGenerator(BaseImageGenerator):
"""수동 이미지 제공 (자동 생성 없음)"""
def __init__(self, cfg: dict):
pass
def generate(self, prompt: str, output_path: str,
size: str = '') -> bool:
logger.info(f"ExternalGenerator: 수동 이미지 필요 — 프롬프트: {prompt[:60]}")
return False
# ─── Fallback Writer ──────────────────────────────────────
class FallbackWriter(BaseWriter):
"""Primary writer 실패 시 fallback chain으로 자동 재시도"""
def __init__(self, primary_name: str, primary: BaseWriter,
fallbacks: list[tuple[str, BaseWriter]]):
self._primary_name = primary_name
self._primary = primary
self._fallbacks = fallbacks # [(provider_name, writer), ...]
def write(self, prompt: str, system: str = '') -> str:
# 1차: primary
try:
result = self._primary.write(prompt, system)
if result and result.strip():
return result
logger.warning(f"{self._primary_name} 빈 응답 — fallback 시도")
except Exception as e:
logger.warning(f"{self._primary_name} 실패: {e} — fallback 시도")
# 2차: fallback chain
for fb_name, fb_writer in self._fallbacks:
try:
logger.info(f"Fallback writer 시도: {fb_name}")
result = fb_writer.write(prompt, system)
if result and result.strip():
logger.info(f"Fallback 성공: {fb_name}")
return result
logger.warning(f"{fb_name} 빈 응답")
except Exception as e:
logger.warning(f"{fb_name} fallback 실패: {e}")
logger.error("모든 writer (primary + fallback) 실패")
return ''
# ─── EngineLoader ───────────────────────────────────────
class EngineLoader:
"""
config/engine.json을 읽어 현재 설정된 provider에 맞는 구현체를 반환하는
중앙 팩토리 클래스.
사용 예:
loader = EngineLoader()
writer = loader.get_writer()
text = writer.write("오늘의 AI 뉴스 정리해줘")
"""
_DEFAULT_CONFIG = {
'writing': {'provider': 'claude', 'options': {'claude': {}}},
'tts': {'provider': 'gtts', 'options': {'gtts': {}}},
'image_generation': {'provider': 'external', 'options': {'external': {}}},
'video_generation': {'provider': 'ffmpeg_slides', 'options': {'ffmpeg_slides': {}}},
'publishing': {},
'quality_gates': {'gate1_research_min_score': 60},
}
def __init__(self, config_path: Optional[Path] = None):
self._config_path = config_path or CONFIG_PATH
self._config = self._load_config()
def _load_config(self) -> dict:
if self._config_path.exists():
try:
return json.loads(self._config_path.read_text(encoding='utf-8'))
except Exception as e:
logger.error(f"engine.json 로드 실패: {e} — 기본값 사용")
else:
logger.warning(f"engine.json 없음 ({self._config_path}) — 기본값으로 gtts + ffmpeg_slides 사용")
return dict(self._DEFAULT_CONFIG)
def get_config(self, *keys) -> Any:
"""
engine.json 값 접근.
예: loader.get_config('writing', 'provider')
loader.get_config('quality_gates', 'gate1_research_min_score')
"""
val = self._config
for key in keys:
if isinstance(val, dict):
val = val.get(key)
else:
return None
return val
def update_provider(self, category: str, provider: str) -> None:
"""
런타임 provider 변경 (engine.json 파일은 수정하지 않음).
예: loader.update_provider('tts', 'openai')
"""
if category in self._config:
self._config[category]['provider'] = provider
logger.info(f"런타임 provider 변경: {category}{provider}")
else:
logger.warning(f"update_provider: 알 수 없는 카테고리 '{category}'")
def get_writer(self, with_fallback: bool = True) -> BaseWriter:
"""현재 설정된 writing provider에 맞는 BaseWriter 구현체 반환.
with_fallback=True이면 FallbackWriter로 감싸서 자동 재시도."""
writing_cfg = self._config.get('writing', {})
provider = writing_cfg.get('provider', 'claude')
options = writing_cfg.get('options', {}).get(provider, {})
fallback_chain = writing_cfg.get('fallback_chain', [])
writers = {
'claude': ClaudeWriter,
'openclaw': OpenClawWriter,
'gemini': GeminiWriter,
'groq': GroqWriter,
'claude_web': ClaudeWebWriter,
'gemini_web': GeminiWebWriter,
}
cls = writers.get(provider, ClaudeWriter)
logger.info(f"Writer 로드: {provider} ({cls.__name__})")
primary = cls(options)
if not with_fallback or not fallback_chain:
return primary
# fallback 체인 구성
fallback_writers = []
for fb_provider in fallback_chain:
if fb_provider == provider:
continue
fb_cls = writers.get(fb_provider)
fb_opts = writing_cfg.get('options', {}).get(fb_provider, {})
if fb_cls:
fallback_writers.append((fb_provider, fb_cls(fb_opts)))
if not fallback_writers:
return primary
return FallbackWriter(provider, primary, fallback_writers)
def get_tts(self) -> BaseTTS:
"""현재 설정된 tts provider에 맞는 BaseTTS 구현체 반환"""
tts_cfg = self._config.get('tts', {})
provider = tts_cfg.get('provider', 'gtts')
options = tts_cfg.get('options', {}).get(provider, {})
tts_engines = {
'google_cloud': GoogleCloudTTS,
'openai': OpenAITTS,
'elevenlabs': ElevenLabsTTS,
'gtts': GTTSEngine,
}
cls = tts_engines.get(provider, GTTSEngine)
logger.info(f"TTS 로드: {provider} ({cls.__name__})")
return cls(options)
def get_image_generator(self) -> BaseImageGenerator:
"""현재 설정된 image_generation provider에 맞는 구현체 반환"""
img_cfg = self._config.get('image_generation', {})
provider = img_cfg.get('provider', 'external')
options = img_cfg.get('options', {}).get(provider, {})
generators = {
'dalle': DALLEGenerator,
'external': ExternalGenerator,
}
cls = generators.get(provider, ExternalGenerator)
logger.info(f"ImageGenerator 로드: {provider} ({cls.__name__})")
return cls(options)
def get_video_generator(self):
"""현재 설정된 video_generation provider에 맞는 VideoEngine 구현체 반환"""
from bots.converters import video_engine
video_cfg = self._config.get('video_generation', {
'provider': 'ffmpeg_slides',
'options': {'ffmpeg_slides': {}},
})
engine = video_engine.get_engine(video_cfg)
logger.info(f"VideoGenerator 로드: {video_cfg.get('provider', 'ffmpeg_slides')}")
return engine
def get_publishers(self) -> list:
"""
활성화된 publishing 채널 목록 반환.
반환 형식: [{'name': str, 'enabled': bool, ...설정값}, ...]
"""
publishing_cfg = self._config.get('publishing', {})
result = []
for name, cfg in publishing_cfg.items():
if isinstance(cfg, dict):
result.append({'name': name, **cfg})
return result
def get_enabled_publishers(self) -> list:
"""enabled: true인 publishing 채널만 반환"""
return [p for p in self.get_publishers() if p.get('enabled', False)]