feat: v3.2 — YouTube Shorts 봇 + 수동 어시스트 + 보안 개선

주요 추가 기능:
- bots/shorts/ 서브모듈 7개: tts_engine, script_extractor, asset_resolver,
  stock_fetcher, caption_renderer, video_assembler, youtube_uploader
- bots/shorts_bot.py: 6단계 Shorts 파이프라인 오케스트레이터
  (auto/semi_auto 두 가지 생산 모드, CLI 지원)
- bots/writer_bot.py: 독립 실행형 AI 글쓰기 봇 (대시보드 연동)
- bots/assist_bot.py: URL 기반 수동 어시스트 파이프라인
- config/shorts_config.json: Shorts 전체 설정
- templates/shorts/extract_prompt.txt: LLM 스크립트 추출 프롬프트
- scheduler.py에 shorts 잡(10:35/16:00) + /shorts Telegram 명령 추가

보안 개선:
- .env 파일 외부 경로 참조로 변경 (load_dotenv dotenv_path, 24개 파일)
- .gitignore에 민감 파일/내부 문서/런타임 데이터 항목 추가

문서:
- README.md 전면 재작성 (상세 한글 설명, 설치/설정/사용법 포함)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sinmb79
2026-03-28 17:51:02 +09:00
parent 392c2e13f1
commit 9b44a07a44
39 changed files with 3455 additions and 641 deletions
+1
View File
@@ -0,0 +1 @@
# bots/shorts/ — YouTube Shorts production submodules
+328
View File
@@ -0,0 +1,328 @@
"""
bots/shorts/asset_resolver.py
역할: 각 파이프라인 단계에서 사용할 에셋 소스를 결정하고
resolution_manifest.json을 생성.
Semi-auto 우선순위:
input/{scripts,images,videos,audio}/{article_id}* 파일 체크
→ 있으면 user_provided, 없으면 auto
캐릭터 결정:
article.corner → shorts_config corner_character_map → character type
→ character assets 경로 결정
출력:
resolution_manifest.json (메모리 dict로 반환, 필요시 저장)
"""
import json
import logging
import re
import shutil
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
EXPRESSION_MOOD_MAP = {
'dramatic': ['surprised', 'thinking', 'determined'],
'upbeat': ['curious', 'explaining', 'smiling'],
'mysterious': ['curious', 'thinking', 'smiling'],
'calm': ['explaining','thinking', 'smiling'],
}
SEGMENT_EXPRESSION = {
'hook': 0, # index into mood expression list
'body': 1,
'closer': 2,
}
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _normalize_id(article_id: str) -> str:
"""slug/article_id → 파일명 접두사 비교용 문자열."""
return re.sub(r'[^a-z0-9_\-]', '', article_id.lower())
# ─── Input 폴더 스캔 ─────────────────────────────────────────
def _scan_input(article_id: str, cfg: dict) -> dict:
"""
input/ 폴더에서 article_id와 매칭되는 사용자 제공 파일 탐색.
Returns: {script: path|None, images: [path], videos: [path], audio: path|None}
"""
input_dirs = cfg.get('input_dirs', {})
norm_id = _normalize_id(article_id)
found = {'script': None, 'images': [], 'videos': [], 'audio': None}
# scripts
scripts_dir = BASE_DIR / input_dirs.get('scripts', 'input/scripts/')
if scripts_dir.exists():
for f in scripts_dir.glob('*.json'):
if _normalize_id(f.stem).startswith(norm_id) or f.stem == article_id:
found['script'] = str(f)
break
if not found['script']:
# FIFO 소비: 가장 오래된 파일
files = sorted(scripts_dir.glob('*.json'))
if files:
found['script'] = str(files[0])
# images
images_dir = BASE_DIR / input_dirs.get('images', 'input/images/')
if images_dir.exists():
matched = [f for f in sorted(images_dir.glob('*.png'))
if _normalize_id(f.stem).startswith(norm_id)]
matched += [f for f in sorted(images_dir.glob('*.jpg'))
if _normalize_id(f.stem).startswith(norm_id)]
if matched:
found['images'] = [str(f) for f in matched]
else:
# FIFO: 매칭 없으면 순서대로 소비
all_imgs = sorted((images_dir.glob('*.png'))) + sorted(images_dir.glob('*.jpg'))
if all_imgs:
found['images'] = [str(f) for f in all_imgs[:5]]
# videos
videos_dir = BASE_DIR / input_dirs.get('videos', 'input/videos/')
if videos_dir.exists():
matched = [f for f in sorted(videos_dir.glob('*.mp4'))
if _normalize_id(f.stem).startswith(norm_id)]
if matched:
found['videos'] = [str(f) for f in matched]
else:
all_vids = sorted(videos_dir.glob('*.mp4'))
if all_vids:
found['videos'] = [str(f) for f in all_vids[:5]]
# audio
audio_dir = BASE_DIR / input_dirs.get('audio', 'input/audio/')
if audio_dir.exists():
for ext in ('*.wav', '*.mp3'):
for f in sorted(audio_dir.glob(ext)):
if _normalize_id(f.stem).startswith(norm_id) or f.stem == article_id:
found['audio'] = str(f)
break
if found['audio']:
break
if not found['audio']:
# FIFO
for ext in ('*.wav', '*.mp3'):
files = sorted(audio_dir.glob(ext))
if files:
found['audio'] = str(files[0])
break
return found
def _move_to_processed(paths: list[str]) -> None:
"""처리 완료 파일을 input/_processed/ 로 이동."""
if not paths:
return
processed = BASE_DIR / 'input' / '_processed'
processed.mkdir(parents=True, exist_ok=True)
for p in paths:
src = Path(p)
if src.exists():
dst = processed / src.name
try:
shutil.move(str(src), str(dst))
logger.debug(f'처리 완료 이동: {src.name} → input/_processed/')
except Exception as e:
logger.warning(f'파일 이동 실패 ({src.name}): {e}')
# ─── 캐릭터 결정 ──────────────────────────────────────────────
def _resolve_character(article: dict, cfg: dict) -> dict:
"""
article.corner → character type → assets 경로.
Returns: {type, name, display_name, default_pose, poses_dir, expressions_dir, backgrounds_dir, ...}
"""
corner = article.get('corner', '')
corner_map = cfg.get('assets', {}).get('corner_character_map', {})
char_type = corner_map.get(corner, 'tech_blog')
characters = cfg.get('assets', {}).get('characters', {})
char_cfg = characters.get(char_type, characters.get('tech_blog', {}))
return {
'type': char_type,
'name': char_cfg.get('name', 'bao'),
'display_name': char_cfg.get('display_name', '바오'),
'default_pose': str(BASE_DIR / char_cfg.get('default_pose', '')),
'poses_dir': str(BASE_DIR / char_cfg.get('poses_dir', '')),
'expressions_dir': str(BASE_DIR / char_cfg.get('expressions_dir', '')),
'backgrounds_dir': str(BASE_DIR / char_cfg.get('backgrounds_dir', '')),
'scarves_dir': str(BASE_DIR / char_cfg.get('scarves_dir', '')) if 'scarves_dir' in char_cfg else None,
}
def _pick_pose(char_info: dict, mood: str) -> str:
"""mood 기반 포즈 선택 (poses_dir 내 파일)."""
poses_dir = Path(char_info['poses_dir'])
if not poses_dir.exists():
return char_info['default_pose']
pose_files = sorted(poses_dir.glob('*.png'))
if not pose_files:
return char_info['default_pose']
mood_pose_map = {
'dramatic': 'pose_explaining',
'upbeat': 'pose_waving',
'mysterious': 'pose_thinking',
'calm': 'pose_sitting',
}
preferred = mood_pose_map.get(mood, '')
for pf in pose_files:
if preferred and preferred in pf.stem:
return str(pf)
return str(pose_files[0])
def _pick_expressions(char_info: dict, mood: str) -> list[str]:
"""훅/본문/클로저 각각 표정 파일 경로 선택."""
expr_dir = Path(char_info['expressions_dir'])
if not expr_dir.exists():
return [char_info['default_pose']] * 3
expr_files = {f.stem: str(f) for f in expr_dir.glob('*.png')}
if not expr_files:
return [char_info['default_pose']] * 3
mood_exprs = EXPRESSION_MOOD_MAP.get(mood, ['curious', 'explaining', 'smiling'])
result = []
for expr_name in mood_exprs:
# 완전 일치 또는 접두사 일치
match = next((v for k, v in expr_files.items() if expr_name in k), None)
if not match:
match = list(expr_files.values())[0]
result.append(match)
return result
def _pick_background(char_info: dict) -> str:
"""캐릭터 타입에 맞는 배경 파일 선택 (첫 번째 파일)."""
bg_dir = Path(char_info['backgrounds_dir'])
if not bg_dir.exists():
return ''
bg_files = sorted(bg_dir.glob('*.png')) + sorted(bg_dir.glob('*.jpg'))
return str(bg_files[0]) if bg_files else ''
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def resolve(
article: dict,
script: Optional[dict] = None,
cfg: Optional[dict] = None,
commit_processed: bool = False,
) -> dict:
"""
에셋 소스 결정 → resolution manifest 생성.
Args:
article: article dict (slug, corner 등)
script: 이미 추출된 스크립트 (mood 결정용)
cfg: shorts_config.json dict
commit_processed: True이면 사용된 input/ 파일을 _processed/로 이동
Returns:
manifest dict:
{
script_source: "auto" | "user_provided",
visual_source: "auto" | "user_provided" | "mixed",
audio_source: "auto" | "user_provided",
character: {type, name, display_name, default_pose, poses_dir, ...},
pose: "path/to/pose.png",
expressions: ["path/to/expr1.png", ...], # [hook, body, closer]
background: "path/to/bg.png",
user_script_path: str | None,
user_clips: [str, ...], # mp4 경로
user_images: [str, ...], # png/jpg 경로
user_audio: str | None,
}
"""
if cfg is None:
cfg = _load_config()
article_id = article.get('slug', article.get('article_id', 'unknown'))
mood = (script or {}).get('mood', 'upbeat')
production_mode = cfg.get('production_mode', 'auto')
manifest = {
'article_id': article_id,
'production_mode': production_mode,
'script_source': 'auto',
'visual_source': 'auto',
'audio_source': 'auto',
'user_script_path': None,
'user_clips': [],
'user_images': [],
'user_audio': None,
}
# Semi-auto: input/ 폴더 스캔
if production_mode == 'semi_auto':
found = _scan_input(article_id, cfg)
if found['script']:
manifest['script_source'] = 'user_provided'
manifest['user_script_path'] = found['script']
if found['videos']:
manifest['visual_source'] = 'user_provided'
manifest['user_clips'] = found['videos']
elif found['images']:
manifest['visual_source'] = 'user_provided'
manifest['user_images'] = found['images']
if manifest['user_clips'] and manifest['user_images']:
manifest['visual_source'] = 'mixed'
if found['audio']:
manifest['audio_source'] = 'user_provided'
manifest['user_audio'] = found['audio']
logger.info(
f'에셋 결정 (semi_auto): '
f'script={manifest["script_source"]}, '
f'visual={manifest["visual_source"]}, '
f'audio={manifest["audio_source"]}'
)
else:
logger.info('에셋 결정 (auto): 모든 에셋 자동 생성')
# 캐릭터 결정
char_info = _resolve_character(article, cfg)
pose = _pick_pose(char_info, mood)
expressions = _pick_expressions(char_info, mood)
background = _pick_background(char_info)
manifest['character'] = char_info
manifest['pose'] = pose
manifest['expressions'] = expressions
manifest['background'] = background
# 처리된 input/ 파일 이동
if commit_processed and production_mode == 'semi_auto':
to_move = []
if manifest['user_script_path']:
to_move.append(manifest['user_script_path'])
to_move.extend(manifest['user_clips'])
to_move.extend(manifest['user_images'])
if manifest['user_audio']:
to_move.append(manifest['user_audio'])
_move_to_processed(to_move)
return manifest
+261
View File
@@ -0,0 +1,261 @@
"""
bots/shorts/caption_renderer.py
역할: 단어별 타임스탬프 → ASS 자막 파일 생성 (단어별 하이라이트)
스타일:
- 기본: 흰색 볼드, 검정 아웃라인 3px
- 하이라이트: 노란색 (#FFD700) — 현재 발음 중인 단어
- 훅 텍스트: 중앙 상단, 72px, 1.5초 표시
- 본문 자막: 하단 200px, 48px, 최대 2줄
출력:
data/shorts/captions/{timestamp}.ass
"""
import json
import logging
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
# ─── 색상 변환 ────────────────────────────────────────────────
def _hex_to_ass(hex_color: str, alpha: int = 0) -> str:
"""
HTML hex (#RRGGBB) → ASS 색상 &HAABBGGRR 변환.
ASS는 BGR 순서이며 alpha는 00(불투명)~FF(투명).
"""
c = hex_color.lstrip('#')
r, g, b = c[0:2], c[2:4], c[4:6]
return f'&H{alpha:02X}{b}{g}{r}'
# ─── 시간 포맷 ────────────────────────────────────────────────
def _sec_to_ass_time(seconds: float) -> str:
"""초(float) → ASS 시간 포맷 H:MM:SS.cc."""
cs = int(round(seconds * 100))
h = cs // 360000
cs %= 360000
m = cs // 6000
cs %= 6000
s = cs // 100
cs %= 100
return f'{h}:{m:02d}:{s:02d}.{cs:02d}'
# ─── ASS 헤더 ────────────────────────────────────────────────
def _ass_header(cfg: dict) -> str:
cap_cfg = cfg.get('caption', {})
font_ko = cap_cfg.get('font_ko', 'Pretendard')
font_size = cap_cfg.get('font_size', 48)
hook_size = cap_cfg.get('hook_font_size', 72)
default_color = _hex_to_ass(cap_cfg.get('default_color', '#FFFFFF'))
highlight_color = _hex_to_ass(cap_cfg.get('highlight_color', '#FFD700'))
outline_color = _hex_to_ass(cap_cfg.get('outline_color', '#000000'))
outline_w = cap_cfg.get('outline_width', 3)
margin_v = cap_cfg.get('position_from_bottom', 200)
return f"""[Script Info]
ScriptType: v4.00+
PlayResX: 1080
PlayResY: 1920
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_ko},{font_size},{default_color},{default_color},{outline_color},&H80000000,-1,0,0,0,100,100,0,0,1,{outline_w},1,2,20,20,{margin_v},1
Style: Highlight,{font_ko},{font_size},{highlight_color},{highlight_color},{outline_color},&H80000000,-1,0,0,0,100,100,0,0,1,{outline_w},1,2,20,20,{margin_v},1
Style: Hook,{font_ko},{hook_size},{default_color},{default_color},{outline_color},&H80000000,-1,0,0,0,100,100,0,0,1,{outline_w+1},2,5,20,20,100,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
# ─── 단어 → 자막 라인 분할 ────────────────────────────────────
def _split_into_lines(words: list[dict], max_chars: int = 18) -> list[list[dict]]:
"""
단어 리스트 → 라인 리스트 (최대 max_chars 자).
반환: [[{word, start, end}, ...], ...]
"""
lines = []
cur_line: list[dict] = []
cur_len = 0
for w in words:
word_text = w['word']
if cur_line and cur_len + len(word_text) + 1 > max_chars:
lines.append(cur_line)
cur_line = [w]
cur_len = len(word_text)
else:
cur_line.append(w)
cur_len += len(word_text) + (1 if cur_line else 0)
if cur_line:
lines.append(cur_line)
return lines
def _line_start_end(line: list[dict]) -> tuple[float, float]:
return line[0]['start'], line[-1]['end']
# ─── ASS 이벤트 생성 ─────────────────────────────────────────
def _word_highlight_event(
line: list[dict],
highlight_color_hex: str,
default_color_hex: str,
outline_color_hex: str,
outline_w: int,
) -> str:
"""
한 라인의 모든 단어에 대해 단어별 하이라이트 오버라이드 태그 생성.
각 단어 재생 시간 동안 해당 단어만 highlight_color로 표시.
ASS override tag: {\\c&Hxxxxxx&} 로 색상 변경.
반환: 단어별 ASS 이벤트 문자열 목록
"""
hi_ass = _hex_to_ass(highlight_color_hex)
df_ass = _hex_to_ass(default_color_hex)
events = []
for i, w in enumerate(line):
start_t = w['start']
end_t = w['end']
# 전체 라인 텍스트: 현재 단어만 하이라이트
parts = []
for j, other in enumerate(line):
if j == i:
parts.append(f'{{\\c{hi_ass}}}{other["word"]}{{\\c{df_ass}}}')
else:
parts.append(other['word'])
text = ' '.join(parts)
event = (
f'Dialogue: 0,{_sec_to_ass_time(start_t)},{_sec_to_ass_time(end_t)},'
f'Default,,0,0,0,,{text}'
)
events.append(event)
return '\n'.join(events)
def _hook_event(hook_text: str, duration: float = 1.5) -> str:
"""훅 텍스트 — 중앙 상단, 72px, 1.5초 표시."""
return (
f'Dialogue: 1,{_sec_to_ass_time(0.0)},{_sec_to_ass_time(duration)},'
f'Hook,,0,0,0,,{hook_text}'
)
# ─── 균등 분할 타임스탬프 폴백 ───────────────────────────────
def _build_uniform_timestamps(script: dict, total_duration: float) -> list[dict]:
"""
Whisper 타임스탬프 없을 때 텍스트를 균등 시간으로 분할.
"""
parts = [script.get('hook', '')]
parts.extend(script.get('body', []))
parts.append(script.get('closer', ''))
text = ' '.join(p for p in parts if p)
words = text.split()
if not words:
return []
dur_per_word = total_duration / len(words)
return [
{
'word': w,
'start': round(i * dur_per_word, 3),
'end': round((i + 1) * dur_per_word, 3),
}
for i, w in enumerate(words)
]
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def render_captions(
script: dict,
timestamps: list[dict],
output_dir: Path,
timestamp: str,
wav_duration: float = 0.0,
cfg: Optional[dict] = None,
) -> Path:
"""
스크립트 + 단어별 타임스탬프 → ASS 자막 파일 생성.
Args:
script: {hook, body, closer, ...}
timestamps: [{word, start, end}, ...] — 비어있으면 균등 분할
output_dir: data/shorts/captions/
timestamp: 파일명 prefix
wav_duration: TTS 오디오 총 길이 (균등 분할 폴백용)
cfg: shorts_config.json dict
Returns:
ass_path
"""
if cfg is None:
cfg = _load_config()
output_dir.mkdir(parents=True, exist_ok=True)
ass_path = output_dir / f'{timestamp}.ass'
cap_cfg = cfg.get('caption', {})
max_chars = cap_cfg.get('max_chars_per_line_ko', 18)
highlight_color = cap_cfg.get('highlight_color', '#FFD700')
default_color = cap_cfg.get('default_color', '#FFFFFF')
outline_color = cap_cfg.get('outline_color', '#000000')
outline_w = cap_cfg.get('outline_width', 3)
# 타임스탬프 없으면 균등 분할
if not timestamps:
logger.warning('단어별 타임스탬프 없음 — 균등 분할 사용 (캡션 품질 저하)')
if wav_duration <= 0:
wav_duration = 20.0
timestamps = _build_uniform_timestamps(script, wav_duration)
# ASS 헤더
header = _ass_header(cfg)
events = []
# 훅 이벤트 (첫 1.5초 중앙 표시)
hook_text = script.get('hook', '')
if hook_text and timestamps:
hook_end = min(1.5, timestamps[0]['start'] + 1.5) if timestamps else 1.5
events.append(_hook_event(hook_text, hook_end))
# 단어별 하이라이트 이벤트
lines = _split_into_lines(timestamps, max_chars)
for line in lines:
if not line:
continue
line_event = _word_highlight_event(
line, highlight_color, default_color, outline_color, outline_w
)
events.append(line_event)
ass_content = header + '\n'.join(events) + '\n'
ass_path.write_text(ass_content, encoding='utf-8-sig') # BOM for Windows compatibility
logger.info(f'ASS 자막 생성: {ass_path.name} ({len(timestamps)}단어, {len(lines)}라인)')
return ass_path
+305
View File
@@ -0,0 +1,305 @@
"""
bots/shorts/script_extractor.py
역할: 블로그 포스트 dict → 쇼츠용 스크립트 JSON 생성
LLM 우선순위:
1. OpenClaw (로컬, EngineLoader 경유)
2. Claude API (ANTHROPIC_API_KEY)
폴백: 제목+KEY_POINTS 기반 규칙 기반 추출
출력:
data/shorts/scripts/{timestamp}.json
{hook, body, closer, keywords, mood, originality_check, article_id}
"""
import json
import logging
import re
import sys
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
PROMPT_TEMPLATE_PATH = BASE_DIR / 'templates' / 'shorts' / 'extract_prompt.txt'
# ─── 유틸 ────────────────────────────────────────────────────
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _build_post_text(article: dict) -> str:
"""article dict → LLM에 전달할 블로그 본문 텍스트."""
title = article.get('title', '')
key_points = article.get('key_points', article.get('KEY_POINTS', []))
body_html = article.get('body', article.get('BODY', ''))
# HTML 태그 제거 (간단한 정규식)
body_plain = re.sub(r'<[^>]+>', ' ', body_html)
body_plain = re.sub(r'\s+', ' ', body_plain).strip()
# 너무 길면 잘라냄 (LLM 토큰 절약)
if len(body_plain) > 1500:
body_plain = body_plain[:1500] + '...'
lines = [f'제목: {title}']
if key_points:
if isinstance(key_points, list):
lines.append('핵심 포인트:')
lines.extend(f'- {p}' for p in key_points)
else:
lines.append(f'핵심 포인트: {key_points}')
if body_plain:
lines.append(f'본문: {body_plain}')
return '\n'.join(lines)
def _load_prompt_template() -> str:
if PROMPT_TEMPLATE_PATH.exists():
return PROMPT_TEMPLATE_PATH.read_text(encoding='utf-8')
# 인라인 폴백
return (
'You are a YouTube Shorts script writer for a Korean tech blog.\n'
'Given the blog post below, extract a 15-20 second Shorts script.\n\n'
'OUTPUT FORMAT (JSON only):\n'
'{{"hook":"...","body":["..."],"closer":"...","keywords":["..."],'
'"mood":"...","originality_check":"..."}}\n\n'
'BLOG POST:\n---\n{blog_post_content}\n---'
)
def _parse_json_response(raw: str) -> Optional[dict]:
"""LLM 응답에서 JSON 추출 (마크다운 코드블록 포함 대응)."""
# ```json ... ``` 블록 제거
raw = re.sub(r'```json\s*', '', raw)
raw = re.sub(r'```\s*', '', raw)
raw = raw.strip()
# JSON 부분만 추출
match = re.search(r'\{[\s\S]+\}', raw)
if not match:
return None
try:
return json.loads(match.group())
except json.JSONDecodeError:
return None
def _validate_script(script: dict) -> bool:
"""필수 필드 존재 + 최소 품질 검사."""
required = ['hook', 'body', 'closer', 'keywords', 'mood']
if not all(k in script for k in required):
return False
if not script.get('hook'):
return False
if not isinstance(script.get('body'), list) or len(script['body']) == 0:
return False
# originality_check 없으면 경고만
if not script.get('originality_check'):
logger.warning('originality_check 필드 없음 — 스크립트 고유성 검증 불가')
return True
def _check_template_similarity(new_script: dict, scripts_dir: Path) -> bool:
"""
직전 10개 스크립트와 본문 단어 중복률 체크.
60% 초과 → True (유사도 과다, 거부 권고)
"""
new_words = set(' '.join(new_script.get('body', [])).split())
if not new_words:
return False
history_files = sorted(scripts_dir.glob('*.json'), reverse=True)[:10]
for hf in history_files:
try:
old = json.loads(hf.read_text(encoding='utf-8'))
old_words = set(' '.join(old.get('body', [])).split())
if not old_words:
continue
overlap = len(new_words & old_words) / len(new_words)
if overlap > 0.6:
logger.warning(f'스크립트 유사도 과다 ({overlap:.0%}): {hf.name}')
return True
except Exception:
continue
return False
# ─── LLM 호출 ────────────────────────────────────────────────
def _extract_via_engine(post_text: str, cfg: dict) -> Optional[dict]:
"""EngineLoader (OpenClaw/Claude API)로 스크립트 추출."""
sys.path.insert(0, str(BASE_DIR / 'bots'))
try:
from engine_loader import EngineLoader
except ImportError:
return None
template = _load_prompt_template()
prompt = template.replace('{blog_post_content}', post_text)
system = (
'You are a YouTube Shorts script extraction assistant. '
'Output only valid JSON, no explanation.'
)
try:
writer = EngineLoader(cfg_override={'writing': cfg.get('script', {}).get('llm_provider', 'openclaw')}).get_writer()
raw = writer.write(prompt, system=system).strip()
return _parse_json_response(raw)
except Exception as e:
logger.warning(f'EngineLoader 스크립트 추출 실패: {e}')
return None
def _extract_via_claude_api(post_text: str) -> Optional[dict]:
"""Anthropic API 직접 호출 (폴백)."""
import os
api_key = os.environ.get('ANTHROPIC_API_KEY', '')
if not api_key:
return None
try:
import anthropic
client = anthropic.Anthropic(api_key=api_key)
template = _load_prompt_template()
prompt = template.replace('{blog_post_content}', post_text)
msg = client.messages.create(
model='claude-haiku-4-5-20251001',
max_tokens=512,
messages=[{'role': 'user', 'content': prompt}],
)
raw = msg.content[0].text
return _parse_json_response(raw)
except Exception as e:
logger.warning(f'Claude API 스크립트 추출 실패: {e}')
return None
def _extract_rule_based(article: dict) -> dict:
"""
LLM 없을 때 규칙 기반 스크립트 추출 (최소 품질 보장).
제목 → hook, KEY_POINTS → body, CTA → closer.
"""
title = article.get('title', '제목 없음')
key_points = article.get('key_points', article.get('KEY_POINTS', []))
corner = article.get('corner', '')
if isinstance(key_points, str):
key_points = [kp.strip('- ').strip() for kp in key_points.split('\n') if kp.strip()]
# hook: 제목을 의문문으로 변환
hook = title
if not hook.endswith('?'):
hook = f'{title[:20]}... 알고 계셨나요?'
# body: KEY_POINTS 앞 3개
body = [p.strip('- ').strip() for p in key_points[:3]] if key_points else [title]
# closer: 코너별 CTA
cta_map = {
'쉬운세상': '블로그에서 더 자세히 확인해보세요.',
'숨은보물': '이 꿀팁, 주변에 공유해보세요.',
'웹소설': '전편 블로그에서 읽어보세요.',
}
closer = cta_map.get(corner, '구독하고 다음 편도 기대해주세요.')
# keywords: 제목 명사 추출 (간단)
keywords = [w for w in re.findall(r'[가-힣A-Za-z]{2,}', title)][:5]
if not keywords:
keywords = ['technology', 'korea', 'blog']
return {
'hook': hook,
'body': body,
'closer': closer,
'keywords': keywords,
'mood': 'upbeat',
'originality_check': f'{title}에 대한 핵심 포인트 요약',
}
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def extract_script(
article: dict,
output_dir: Path,
timestamp: str,
cfg: Optional[dict] = None,
manifest: Optional[dict] = None,
) -> dict:
"""
블로그 포스트 → 쇼츠 스크립트 생성 + 저장.
Args:
article: article dict (title, body, key_points, corner 등)
output_dir: data/shorts/scripts/
timestamp: 파일명 prefix
cfg: shorts_config.json dict
manifest: asset_resolver 결과 (script_source 확인용)
Returns:
script dict {hook, body, closer, keywords, mood, originality_check, article_id}
"""
if cfg is None:
cfg = _load_config()
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f'{timestamp}.json'
article_id = article.get('slug', timestamp)
# 1. Semi-auto: input/scripts/ 에 사용자 제공 스크립트 있으면 로드
if manifest and manifest.get('script_source') == 'user_provided':
user_script_path = manifest.get('user_script_path')
if user_script_path and Path(user_script_path).exists():
script = json.loads(Path(user_script_path).read_text(encoding='utf-8'))
script['article_id'] = article_id
logger.info(f'사용자 제공 스크립트 사용: {user_script_path}')
output_path.write_text(json.dumps(script, ensure_ascii=False, indent=2), encoding='utf-8')
return script
# 2. LLM 추출
post_text = _build_post_text(article)
script = None
# OpenClaw/EngineLoader 시도
script = _extract_via_engine(post_text, cfg)
# Claude API 폴백
if not script or not _validate_script(script):
logger.info('Claude API 스크립트 추출 시도...')
script = _extract_via_claude_api(post_text)
# 규칙 기반 폴백
if not script or not _validate_script(script):
logger.warning('LLM 스크립트 추출 실패 — 규칙 기반 폴백 사용')
script = _extract_rule_based(article)
if not _validate_script(script):
raise RuntimeError('스크립트 검증 실패 — 필수 필드 누락')
# 유사도 검사
if _check_template_similarity(script, output_dir):
logger.warning('스크립트 유사도 과다 — 재추출 시도')
# 한 번 더 시도 (다른 엔진)
retry = _extract_via_claude_api(post_text)
if retry and _validate_script(retry) and not _check_template_similarity(retry, output_dir):
script = retry
script['article_id'] = article_id
output_path.write_text(json.dumps(script, ensure_ascii=False, indent=2), encoding='utf-8')
logger.info(f'스크립트 저장: {output_path.name}')
logger.debug(f'hook: {script.get("hook")} | mood: {script.get("mood")}')
return script
def load_script(script_path: Path) -> dict:
"""저장된 스크립트 JSON 로드."""
return json.loads(script_path.read_text(encoding='utf-8'))
+402
View File
@@ -0,0 +1,402 @@
"""
bots/shorts/stock_fetcher.py
역할: 스크립트 keywords → 스톡 영상 클립 다운로드 (Pexels → Pixabay → 이미지 폴백)
캐릭터 오버레이:
manifest.character_overlay.enabled = true 이면
캐릭터 PNG를 각 클립 우하단에 FFmpeg overlay로 합성.
출력:
data/shorts/clips/{timestamp}/clip_N.mp4
"""
import json
import logging
import os
import subprocess
import tempfile
import urllib.request
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
PEXELS_VIDEO_URL = 'https://api.pexels.com/videos/search'
PIXABAY_VIDEO_URL = 'https://pixabay.com/api/videos/'
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _get_ffmpeg() -> str:
ffmpeg_env = os.environ.get('FFMPEG_PATH', '')
if ffmpeg_env and Path(ffmpeg_env).exists():
return ffmpeg_env
return 'ffmpeg'
# ─── Pexels ──────────────────────────────────────────────────
def _search_pexels(keyword: str, api_key: str, prefer_vertical: bool = True) -> list[dict]:
"""Pexels Video API 검색 → [{url, width, height, duration}, ...] 반환."""
import urllib.parse
import urllib.request
params = urllib.parse.urlencode({
'query': keyword,
'orientation': 'portrait' if prefer_vertical else 'landscape',
'size': 'medium',
'per_page': 10,
})
req = urllib.request.Request(
f'{PEXELS_VIDEO_URL}?{params}',
headers={'Authorization': api_key},
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read())
results = []
for v in data.get('videos', []):
# 최적 파일 선택 (HD 이하, portrait 우선)
best = None
for vf in v.get('video_files', []):
if vf.get('quality') in ('hd', 'sd') and vf.get('link', '').endswith('.mp4'):
if best is None or (prefer_vertical and vf.get('height', 0) > vf.get('width', 0)):
best = vf
if best:
results.append({
'url': best['link'],
'width': best.get('width', 0),
'height': best.get('height', 0),
'duration': v.get('duration', 5),
})
return results
except Exception as e:
logger.warning(f'Pexels 검색 실패 ({keyword}): {e}')
return []
# ─── Pixabay ─────────────────────────────────────────────────
def _search_pixabay(keyword: str, api_key: str, prefer_vertical: bool = True) -> list[dict]:
"""Pixabay Video API 검색 → [{url, width, height, duration}, ...] 반환."""
import urllib.parse
params = urllib.parse.urlencode({
'key': api_key,
'q': keyword,
'video_type': 'film',
'per_page': 10,
})
req = urllib.request.Request(f'{PIXABAY_VIDEO_URL}?{params}')
try:
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read())
results = []
for hit in data.get('hits', []):
videos = hit.get('videos', {})
# medium 우선
for quality in ('medium', 'large', 'small', 'tiny'):
vf = videos.get(quality)
if vf and vf.get('url', '').endswith('.mp4'):
results.append({
'url': vf['url'],
'width': vf.get('width', 0),
'height': vf.get('height', 0),
'duration': hit.get('duration', 5),
})
break
return results
except Exception as e:
logger.warning(f'Pixabay 검색 실패 ({keyword}): {e}')
return []
# ─── 다운로드 ─────────────────────────────────────────────────
def _download_clip(url: str, dest: Path) -> bool:
"""URL → dest 파일 다운로드. 성공 시 True."""
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=60) as resp:
data = resp.read()
dest.write_bytes(data)
logger.debug(f'클립 다운로드: {dest.name} ({len(data)//1024}KB)')
return True
except Exception as e:
logger.warning(f'클립 다운로드 실패 ({url[:60]}): {e}')
return False
# ─── FFmpeg 전처리 ────────────────────────────────────────────
def _prepare_clip(input_path: Path, output_path: Path, duration: float = 6.0) -> bool:
"""
클립을 1080×1920 세로 포맷으로 변환 + 길이 트리밍.
가로 클립은 center-crop, 세로 클립은 scale.
"""
ffmpeg = _get_ffmpeg()
cmd = [
ffmpeg, '-y',
'-i', str(input_path),
'-t', str(duration),
'-vf', (
'scale=1080:1920:force_original_aspect_ratio=increase,'
'crop=1080:1920'
),
'-r', '30',
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an', # 스톡 클립 오디오 제거
str(output_path),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=120)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'클립 전처리 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
def _kenburns_image(image_path: Path, output_path: Path, duration: float = 6.0) -> bool:
"""정지 이미지 → Ken Burns 효과 MP4."""
ffmpeg = _get_ffmpeg()
frames = int(duration * 30)
cmd = [
ffmpeg, '-y',
'-loop', '1',
'-i', str(image_path),
'-vf', (
f'scale=1200:2134,'
f'zoompan=z=\'min(zoom+0.0008,1.1)\':'
f'd={frames}:'
f'x=\'iw/2-(iw/zoom/2)\':'
f'y=\'ih/2-(ih/zoom/2)\':'
f's=1080x1920'
),
'-t', str(duration),
'-r', '30',
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an',
str(output_path),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=120)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'Ken Burns 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
# ─── 캐릭터 오버레이 ──────────────────────────────────────────
def _overlay_character(
clip_path: Path,
output_path: Path,
char_png: str,
char_cfg: dict,
) -> bool:
"""
클립 우하단에 캐릭터 PNG 오버레이.
char_cfg: {scale_width, margin_right, margin_bottom}
"""
if not char_png or not Path(char_png).exists():
return False
ffmpeg = _get_ffmpeg()
scale_w = char_cfg.get('scale_width', 300)
mr = char_cfg.get('margin_right', 40)
mb = char_cfg.get('margin_bottom', 250)
# overlay 위치: 오른쪽 끝 - margin
overlay_x = f'W-{scale_w}-{mr}'
overlay_y = f'H-{scale_w * 2}-{mb}' # 대략적인 높이 추정
cmd = [
ffmpeg, '-y',
'-i', str(clip_path),
'-i', char_png,
'-filter_complex', (
f'[1:v]scale={scale_w}:-1[char];'
f'[0:v][char]overlay={overlay_x}:{overlay_y}'
),
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an',
str(output_path),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=120)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'캐릭터 오버레이 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def fetch_clips(
script: dict,
manifest: dict,
output_dir: Path,
timestamp: str,
cfg: Optional[dict] = None,
) -> list[Path]:
"""
스크립트 keywords → 클립 목록 (1080×1920, 준비 완료).
Args:
script: {keywords, mood, ...}
manifest: asset_resolver 결과
output_dir: data/shorts/clips/
timestamp: 파일명 prefix
cfg: shorts_config.json dict
Returns:
[clip_path, ...] — 최소 2개, 최대 5개
"""
if cfg is None:
cfg = _load_config()
clips_dir = output_dir / timestamp
clips_dir.mkdir(parents=True, exist_ok=True)
vis_cfg = cfg.get('visuals', {})
min_clips = vis_cfg.get('min_clips', 3)
max_clips = vis_cfg.get('max_clips', 5)
prefer_vertical = vis_cfg.get('prefer_vertical', True)
pexels_key = os.environ.get(vis_cfg.get('pexels_api_key_env', 'PEXELS_API_KEY'), '')
pixabay_key = os.environ.get(vis_cfg.get('pixabay_api_key_env', 'PIXABAY_API_KEY'), '')
char_overlay_cfg = cfg.get('assets', {}).get('character_overlay', {})
overlay_enabled = char_overlay_cfg.get('enabled', True)
# 표정 순서: hook/body/closer → 각 세그먼트에 할당
expressions = manifest.get('expressions', [])
char_pose = manifest.get('pose', manifest.get('character', {}).get('default_pose', ''))
result_clips: list[Path] = []
# 1. 사용자 제공 비디오 클립
for i, user_clip in enumerate(manifest.get('user_clips', [])[:max_clips]):
out = clips_dir / f'clip_{i+1:02d}.mp4'
if _prepare_clip(Path(user_clip), out):
result_clips.append(out)
# 2. 사용자 제공 이미지 → Ken Burns
for i, user_img in enumerate(manifest.get('user_images', [])[:max_clips]):
if len(result_clips) >= max_clips:
break
out = clips_dir / f'clip_img_{i+1:02d}.mp4'
if _kenburns_image(Path(user_img), out):
result_clips.append(out)
# 3. 캐릭터 에셋 + 배경 합성
background = manifest.get('background', '')
if background and Path(background).exists() and len(result_clips) < max_clips:
# 배경 이미지 → Ken Burns 클립 (표정별 합성)
for seg_idx, expr_png in enumerate(expressions[:3]):
if len(result_clips) >= max_clips:
break
out_bg = clips_dir / f'clip_bg_{seg_idx+1:02d}.mp4'
if _kenburns_image(Path(background), out_bg):
# 표정 오버레이
if expr_png and Path(expr_png).exists():
out_char = clips_dir / f'clip_char_{seg_idx+1:02d}.mp4'
if _overlay_character(out_bg, out_char, expr_png, char_overlay_cfg):
out_bg.unlink(missing_ok=True)
result_clips.append(out_char)
else:
result_clips.append(out_bg)
else:
result_clips.append(out_bg)
# 4. Pexels 스톡 클립
keywords = script.get('keywords', [])
stock_idx = len(result_clips)
for keyword in keywords:
if len(result_clips) >= max_clips:
break
if pexels_key:
videos = _search_pexels(keyword, pexels_key, prefer_vertical)
for v in videos[:2]:
if len(result_clips) >= max_clips:
break
stock_idx += 1
raw = clips_dir / f'raw_{stock_idx:02d}.mp4'
if _download_clip(v['url'], raw):
out = clips_dir / f'clip_stock_{stock_idx:02d}.mp4'
if _prepare_clip(raw, out):
raw.unlink(missing_ok=True)
# 캐릭터 오버레이 (포즈)
if overlay_enabled and char_pose and Path(char_pose).exists():
out_o = clips_dir / f'clip_o_{stock_idx:02d}.mp4'
if _overlay_character(out, out_o, char_pose, char_overlay_cfg):
out.unlink(missing_ok=True)
result_clips.append(out_o)
else:
result_clips.append(out)
else:
result_clips.append(out)
else:
raw.unlink(missing_ok=True)
# 5. Pixabay 폴백
for keyword in keywords:
if len(result_clips) >= max_clips:
break
if pixabay_key:
videos = _search_pixabay(keyword, pixabay_key, prefer_vertical)
for v in videos[:2]:
if len(result_clips) >= max_clips:
break
stock_idx += 1
raw = clips_dir / f'raw_px_{stock_idx:02d}.mp4'
if _download_clip(v['url'], raw):
out = clips_dir / f'clip_px_{stock_idx:02d}.mp4'
if _prepare_clip(raw, out):
raw.unlink(missing_ok=True)
result_clips.append(out)
else:
raw.unlink(missing_ok=True)
# 6. 폴백: 배경 이미지만 있는 단순 클립
if len(result_clips) < min_clips:
logger.warning(f'클립 부족 ({len(result_clips)}/{min_clips}) — 배경 반복 폴백')
fallback_img = Path(background) if background and Path(background).exists() else None
if not fallback_img:
# 단색 배경 생성
fallback_img = clips_dir / 'fallback_bg.png'
_generate_solid_bg(fallback_img)
while len(result_clips) < min_clips:
stock_idx += 1
out = clips_dir / f'clip_fallback_{stock_idx:02d}.mp4'
if _kenburns_image(fallback_img, out):
result_clips.append(out)
else:
break
logger.info(f'클립 준비 완료: {len(result_clips)}개 → {clips_dir}')
return result_clips[:max_clips]
def _generate_solid_bg(output_path: Path, color: str = '#1a1a2e') -> None:
"""단색 배경 PNG 생성 (Pillow 사용, 없으면 FFmpeg)."""
try:
from PIL import Image
img = Image.new('RGB', (1080, 1920), color)
img.save(str(output_path))
except Exception:
ffmpeg = _get_ffmpeg()
try:
subprocess.run(
[ffmpeg, '-y', '-f', 'lavfi',
'-i', f'color=c={color.lstrip("#")}:size=1080x1920:rate=1',
'-frames:v', '1', str(output_path)],
check=True, capture_output=True, timeout=30,
)
except Exception as e:
logger.warning(f'단색 배경 생성 실패: {e}')
+371
View File
@@ -0,0 +1,371 @@
"""
bots/shorts/tts_engine.py
역할: 쇼츠 스크립트 텍스트 → 음성(WAV) + 단어별 타임스탬프(JSON) 생성
엔진 우선순위 (shorts_config.json tts.engine_priority):
1. ElevenLabs — 최고 품질, ELEVENLABS_API_KEY 필요
2. Google Cloud TTS — 중간 품질, GOOGLE_TTS_API_KEY 필요
3. Edge TTS — 무료 폴백, API 키 불필요
출력:
data/shorts/tts/{timestamp}.wav
data/shorts/tts/{timestamp}_timestamps.json
[{word: str, start: float, end: float}, ...]
"""
import asyncio
import json
import logging
import os
import re
import struct
import tempfile
import wave
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# ─── 공통 유틸 ────────────────────────────────────────────────
def _load_config() -> dict:
cfg_path = Path(__file__).parent.parent.parent / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _concat_script(script: dict) -> str:
"""스크립트 dict → 읽기용 단일 텍스트. 문장 사이 공백 추가."""
parts = [script.get('hook', '')]
parts.extend(script.get('body', []))
parts.append(script.get('closer', ''))
return ' '.join(p for p in parts if p)
def _add_pause(wav_path: Path, pause_ms: int = 300) -> None:
"""WAV 파일 끝에 무음 pause_ms 밀리초 추가 (인플레이스)."""
with wave.open(str(wav_path), 'rb') as wf:
params = wf.getparams()
frames = wf.readframes(wf.getnframes())
silence_frames = int(params.framerate * pause_ms / 1000)
silence = b'\x00' * silence_frames * params.nchannels * params.sampwidth
with wave.open(str(wav_path), 'wb') as wf:
wf.setparams(params)
wf.writeframes(frames + silence)
def _get_wav_duration(wav_path: Path) -> float:
with wave.open(str(wav_path), 'rb') as wf:
return wf.getnframes() / wf.getframerate()
# ─── ElevenLabs ───────────────────────────────────────────────
def _tts_elevenlabs(text: str, output_path: Path, cfg: dict) -> list[dict]:
"""
ElevenLabs TTS + 단어별 타임스탬프.
Returns: [{word, start, end}, ...]
"""
import requests
api_key = os.environ.get('ELEVENLABS_API_KEY', '')
if not api_key:
raise RuntimeError('ELEVENLABS_API_KEY not set')
el_cfg = cfg.get('tts', {}).get('elevenlabs', {})
voice_id = el_cfg.get('voice_id', 'pNInz6obpgDQGcFmaJgB')
model_id = el_cfg.get('model', 'eleven_multilingual_v2')
stability = el_cfg.get('stability', 0.5)
similarity = el_cfg.get('similarity_boost', 0.8)
speed = el_cfg.get('speed', 1.1)
url = f'https://api.elevenlabs.io/v1/text-to-speech/{voice_id}/with-timestamps'
headers = {'xi-api-key': api_key, 'Content-Type': 'application/json'}
payload = {
'text': text,
'model_id': model_id,
'voice_settings': {
'stability': stability,
'similarity_boost': similarity,
'speed': speed,
},
}
resp = requests.post(url, headers=headers, json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
# 오디오 디코딩
import base64
audio_b64 = data.get('audio_base64', '')
audio_bytes = base64.b64decode(audio_b64)
# ElevenLabs는 mp3 반환 → wav 변환
mp3_tmp = output_path.with_suffix('.mp3')
mp3_tmp.write_bytes(audio_bytes)
_mp3_to_wav(mp3_tmp, output_path)
mp3_tmp.unlink(missing_ok=True)
# 타임스탬프 파싱
alignment = data.get('alignment', {})
chars = alignment.get('characters', [])
starts = alignment.get('character_start_times_seconds', [])
ends = alignment.get('character_end_times_seconds', [])
timestamps = _chars_to_words(chars, starts, ends)
return timestamps
def _chars_to_words(chars: list, starts: list, ends: list) -> list[dict]:
"""ElevenLabs 문자 레벨 타임스탬프 → 단어 레벨."""
words = []
cur_word = ''
cur_start = 0.0
cur_end = 0.0
for ch, st, en in zip(chars, starts, ends):
if ch in (' ', '\n'):
if cur_word:
words.append({'word': cur_word, 'start': round(cur_start, 3), 'end': round(cur_end, 3)})
cur_word = ''
else:
if not cur_word:
cur_start = st
cur_word += ch
cur_end = en
if cur_word:
words.append({'word': cur_word, 'start': round(cur_start, 3), 'end': round(cur_end, 3)})
return words
def _mp3_to_wav(mp3_path: Path, wav_path: Path) -> None:
try:
from pydub import AudioSegment
AudioSegment.from_mp3(str(mp3_path)).export(str(wav_path), format='wav')
return
except Exception:
pass
# ffmpeg 폴백
import subprocess
ffmpeg = _get_ffmpeg()
subprocess.run(
[ffmpeg, '-y', '-i', str(mp3_path), str(wav_path)],
check=True, capture_output=True,
)
def _get_ffmpeg() -> str:
ffmpeg_env = os.environ.get('FFMPEG_PATH', '')
if ffmpeg_env and Path(ffmpeg_env).exists():
return ffmpeg_env
return 'ffmpeg'
# ─── Google Cloud TTS ─────────────────────────────────────────
def _tts_google_cloud(text: str, output_path: Path, cfg: dict) -> list[dict]:
"""
Google Cloud TTS (REST API) + SSML time_pointing으로 타임스탬프 추출.
Returns: [{word, start, end}, ...]
"""
import requests
api_key = os.environ.get('GOOGLE_TTS_API_KEY', '')
if not api_key:
raise RuntimeError('GOOGLE_TTS_API_KEY not set')
gc_cfg = cfg.get('tts', {}).get('google_cloud', {})
voice_name = gc_cfg.get('voice_name', 'ko-KR-Neural2-C')
speaking_rate = gc_cfg.get('speaking_rate', 1.1)
# SSML: 단어별 mark 삽입
words = text.split()
ssml_parts = []
for i, w in enumerate(words):
ssml_parts.append(f'<mark name="w{i}"/>{w}')
ssml_text = ' '.join(ssml_parts)
ssml = f'<speak>{ssml_text}<mark name="end"/></speak>'
url = f'https://texttospeech.googleapis.com/v1beta1/text:synthesize?key={api_key}'
payload = {
'input': {'ssml': ssml},
'voice': {'languageCode': voice_name[:5], 'name': voice_name},
'audioConfig': {
'audioEncoding': 'LINEAR16',
'speakingRate': speaking_rate,
'sampleRateHertz': 44100,
},
'enableTimePointing': ['SSML_MARK'],
}
resp = requests.post(url, json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
import base64
audio_bytes = base64.b64decode(data['audioContent'])
output_path.write_bytes(audio_bytes)
# 타임스탬프 파싱
timepoints = data.get('timepoints', [])
timestamps = _gcloud_marks_to_words(words, timepoints)
return timestamps
def _gcloud_marks_to_words(words: list[str], timepoints: list[dict]) -> list[dict]:
"""Google Cloud TTS mark 타임포인트 → 단어별 {word, start, end}."""
mark_map = {tp['markName']: tp['timeSeconds'] for tp in timepoints}
total_dur = mark_map.get('end', 0.0)
result = []
for i, w in enumerate(words):
start = mark_map.get(f'w{i}', 0.0)
end = mark_map.get(f'w{i+1}', total_dur)
result.append({'word': w, 'start': round(start, 3), 'end': round(end, 3)})
return result
# ─── Edge TTS + Whisper ───────────────────────────────────────
def _tts_edge(text: str, output_path: Path, cfg: dict) -> list[dict]:
"""
Edge TTS (무료) → WAV 생성 후 Whisper로 단어별 타임스탬프 추출.
Returns: [{word, start, end}, ...]
"""
import edge_tts
edge_cfg = cfg.get('tts', {}).get('edge_tts', {})
voice = edge_cfg.get('voice', 'ko-KR-SunHiNeural')
rate = edge_cfg.get('rate', '+10%')
mp3_tmp = output_path.with_suffix('.mp3')
async def _generate():
communicate = edge_tts.Communicate(text, voice, rate=rate)
await communicate.save(str(mp3_tmp))
asyncio.get_event_loop().run_until_complete(_generate())
# mp3 → wav
_mp3_to_wav(mp3_tmp, output_path)
mp3_tmp.unlink(missing_ok=True)
# Whisper로 타임스탬프 추출
timestamps = _whisper_timestamps(output_path)
return timestamps
def _whisper_timestamps(wav_path: Path) -> list[dict]:
"""openai-whisper를 사용해 단어별 타임스탬프 추출. 없으면 균등 분할."""
try:
import whisper # type: ignore
model = whisper.load_model('tiny')
result = model.transcribe(str(wav_path), word_timestamps=True, language='ko')
words = []
for seg in result.get('segments', []):
for w in seg.get('words', []):
words.append({
'word': w['word'].strip(),
'start': round(w['start'], 3),
'end': round(w['end'], 3),
})
if words:
return words
except Exception as e:
logger.warning(f'Whisper 타임스탬프 실패: {e} — 균등 분할 사용')
return _uniform_timestamps(wav_path)
def _uniform_timestamps(wav_path: Path) -> list[dict]:
"""Whisper 없을 때 균등 분할 타임스탬프 (캡션 품질 저하 감수)."""
duration = _get_wav_duration(wav_path)
with wave.open(str(wav_path), 'rb') as wf:
pass # just to confirm it's readable
# WAV 파일에서 텍스트를 다시 알 수 없으므로 빈 리스트 반환
# (caption_renderer가 균등 분할을 처리)
return []
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def generate_tts(
script: dict,
output_dir: Path,
timestamp: str,
cfg: Optional[dict] = None,
) -> tuple[Path, list[dict]]:
"""
스크립트 dict → WAV + 단어별 타임스탬프.
Args:
script: {hook, body, closer, ...}
output_dir: data/shorts/tts/
timestamp: 파일명 prefix (e.g. "20260328_120000")
cfg: shorts_config.json dict (없으면 자동 로드)
Returns:
(wav_path, timestamps) — timestamps: [{word, start, end}, ...]
"""
if cfg is None:
cfg = _load_config()
output_dir.mkdir(parents=True, exist_ok=True)
wav_path = output_dir / f'{timestamp}.wav'
ts_path = output_dir / f'{timestamp}_timestamps.json'
text = _concat_script(script)
pause_ms = cfg.get('tts', {}).get('inter_sentence_pause_ms', 300)
priority = cfg.get('tts', {}).get('engine_priority', ['elevenlabs', 'google_cloud', 'edge_tts'])
engine_map = {
'elevenlabs': _tts_elevenlabs,
'google_cloud': _tts_google_cloud,
'edge_tts': _tts_edge,
}
timestamps: list[dict] = []
last_error: Optional[Exception] = None
for engine_name in priority:
fn = engine_map.get(engine_name)
if fn is None:
continue
try:
logger.info(f'TTS 엔진 시도: {engine_name}')
timestamps = fn(text, wav_path, cfg)
logger.info(f'TTS 완료 ({engine_name}): {wav_path.name}')
break
except Exception as e:
logger.warning(f'TTS 엔진 실패 ({engine_name}): {e}')
last_error = e
if wav_path.exists():
wav_path.unlink()
if not wav_path.exists():
raise RuntimeError(f'모든 TTS 엔진 실패. 마지막 오류: {last_error}')
# 문장 끝 무음 추가
try:
_add_pause(wav_path, pause_ms)
except Exception as e:
logger.warning(f'무음 추가 실패: {e}')
# 타임스탬프 저장
ts_path.write_text(json.dumps(timestamps, ensure_ascii=False, indent=2), encoding='utf-8')
logger.info(f'타임스탬프 저장: {ts_path.name} ({len(timestamps)}단어)')
return wav_path, timestamps
def load_timestamps(ts_path: Path) -> list[dict]:
"""저장된 타임스탬프 JSON 로드."""
return json.loads(ts_path.read_text(encoding='utf-8'))
+415
View File
@@ -0,0 +1,415 @@
"""
bots/shorts/video_assembler.py
역할: 준비된 클립 + TTS 오디오 + ASS 자막 → 최종 쇼츠 MP4 조립
FFmpeg 전용 (CapCut 없음):
1. 각 클립을 오디오 길이에 맞게 비율 배분
2. xfade crossfade로 연결
3. ASS 자막 burn-in
4. TTS 오디오 합성 + BGM 덕킹
5. 페이드인/페이드아웃
6. 루프 최적화: 마지막 클립 = 첫 클립 복사 (리플레이 유도)
출력:
data/shorts/rendered/{timestamp}.mp4
"""
import json
import logging
import os
import subprocess
import tempfile
import wave
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _get_ffmpeg() -> str:
ffmpeg_env = os.environ.get('FFMPEG_PATH', '')
if ffmpeg_env and Path(ffmpeg_env).exists():
return ffmpeg_env
return 'ffmpeg'
def _get_wav_duration(wav_path: Path) -> float:
try:
with wave.open(str(wav_path), 'rb') as wf:
return wf.getnframes() / wf.getframerate()
except Exception:
# ffprobe 폴백
try:
result = subprocess.run(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', str(wav_path)],
capture_output=True, text=True, timeout=10,
)
return float(result.stdout.strip())
except Exception:
return 20.0
def _get_video_duration(video_path: Path) -> float:
try:
result = subprocess.run(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', str(video_path)],
capture_output=True, text=True, timeout=10,
)
return float(result.stdout.strip())
except Exception:
return 6.0
# ─── 클립 연결 ────────────────────────────────────────────────
def _trim_clip(src: Path, dst: Path, duration: float, ffmpeg: str) -> bool:
"""클립을 duration 초로 트리밍."""
cmd = [
ffmpeg, '-y', '-i', str(src),
'-t', f'{duration:.3f}',
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an', '-r', '30',
str(dst),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=120)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'트리밍 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
def _concat_with_xfade(clips: list[Path], output: Path, crossfade: float, ffmpeg: str) -> bool:
"""
xfade 트랜지션으로 클립 연결.
2개 이상 클립의 경우 순차 xfade 적용.
"""
if len(clips) == 1:
import shutil
shutil.copy2(str(clips[0]), str(output))
return True
# 각 클립 길이 확인
durations = [_get_video_duration(c) for c in clips]
# ffmpeg complex filtergraph 구성
inputs = []
for c in clips:
inputs += ['-i', str(c)]
# xfade chain: [0][1]xfade, [xfade1][2]xfade, ...
filter_parts = []
offset = 0.0
prev_label = '[0:v]'
for i in range(1, len(clips)):
offset += durations[i - 1] - crossfade
out_label = f'[xf{i}]'
filter_parts.append(
f'{prev_label}[{i}:v]xfade=transition=fade:duration={crossfade}:offset={offset:.3f}{out_label}'
)
prev_label = out_label
filter_complex = ';'.join(filter_parts)
cmd = [
ffmpeg, '-y',
*inputs,
'-filter_complex', filter_complex,
'-map', prev_label,
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an', '-r', '30',
str(output),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=300)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'xfade 연결 실패: {e.stderr.decode(errors="ignore")[:300]}')
# 폴백: 단순 concat (트랜지션 없음)
return _concat_simple(clips, output, ffmpeg)
def _concat_simple(clips: list[Path], output: Path, ffmpeg: str) -> bool:
"""트랜지션 없는 단순 concat (폴백)."""
list_file = output.parent / 'concat_list.txt'
lines = [f"file '{c.as_posix()}'" for c in clips]
list_file.write_text('\n'.join(lines), encoding='utf-8')
cmd = [
ffmpeg, '-y',
'-f', 'concat', '-safe', '0',
'-i', str(list_file),
'-c:v', 'libx264', '-crf', '23', '-preset', 'fast',
'-an', '-r', '30',
str(output),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=300)
list_file.unlink(missing_ok=True)
return True
except subprocess.CalledProcessError as e:
logger.error(f'단순 concat 실패: {e.stderr.decode(errors="ignore")[:200]}')
list_file.unlink(missing_ok=True)
return False
# ─── 오디오 합성 ─────────────────────────────────────────────
def _mix_audio(tts_wav: Path, bgm_path: Optional[Path], bgm_db: float,
total_dur: float, output: Path, ffmpeg: str) -> bool:
"""TTS + BGM 혼합 (BGM 덕킹)."""
if bgm_path and bgm_path.exists():
cmd = [
ffmpeg, '-y',
'-i', str(tts_wav),
'-stream_loop', '-1', '-i', str(bgm_path),
'-filter_complex', (
f'[1:a]volume={bgm_db}dB,atrim=0:{total_dur:.3f}[bgm];'
f'[0:a][bgm]amix=inputs=2:duration=first[aout]'
),
'-map', '[aout]',
'-c:a', 'aac', '-b:a', '192k',
'-t', f'{total_dur:.3f}',
str(output),
]
else:
cmd = [
ffmpeg, '-y',
'-i', str(tts_wav),
'-c:a', 'aac', '-b:a', '192k',
'-t', f'{total_dur:.3f}',
str(output),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=120)
return True
except subprocess.CalledProcessError as e:
logger.warning(f'오디오 혼합 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
# ─── 최종 합성 ────────────────────────────────────────────────
def _assemble_final(
video: Path, audio: Path, ass_path: Optional[Path],
output: Path, fade_in: float, fade_out: float,
total_dur: float, cfg: dict, ffmpeg: str,
) -> bool:
"""
비디오 + 오디오 + ASS 자막 → 최종 MP4.
페이드인/아웃 + 루프 최적화 (0.2s 무음 끝에 추가).
"""
vid_cfg = cfg.get('video', {})
crf = vid_cfg.get('crf', 18)
codec = vid_cfg.get('codec', 'libx264')
audio_codec = vid_cfg.get('audio_codec', 'aac')
audio_bitrate = vid_cfg.get('audio_bitrate', '192k')
# 페이드인/아웃 필터
fade_filter = (
f'fade=t=in:st=0:d={fade_in},'
f'fade=t=out:st={total_dur - fade_out:.3f}:d={fade_out}'
)
# ASS 자막 burn-in
if ass_path and ass_path.exists():
ass_posix = ass_path.as_posix().replace(':', '\\:')
vf = f'{fade_filter},ass={ass_posix}'
else:
vf = fade_filter
cmd = [
ffmpeg, '-y',
'-i', str(video),
'-i', str(audio),
'-vf', vf,
'-af', (
f'afade=t=in:st=0:d={fade_in},'
f'afade=t=out:st={total_dur - fade_out:.3f}:d={fade_out},'
f'apad=pad_dur=0.2' # 루프 최적화: 0.2s 무음
),
'-c:v', codec, '-crf', str(crf), '-preset', 'medium',
'-c:a', audio_codec, '-b:a', audio_bitrate,
'-r', str(vid_cfg.get('fps', 30)),
'-shortest',
str(output),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=600)
return True
except subprocess.CalledProcessError as e:
logger.error(f'최종 합성 실패: {e.stderr.decode(errors="ignore")[:400]}')
return False
# ─── 파일 크기 체크 ──────────────────────────────────────────
def _check_filesize(path: Path, max_mb: int = 50) -> bool:
size_mb = path.stat().st_size / (1024 * 1024)
logger.info(f'출력 파일 크기: {size_mb:.1f}MB')
return size_mb <= max_mb
def _rerender_smaller(src: Path, dst: Path, ffmpeg: str) -> bool:
"""파일 크기 초과 시 CRF 23으로 재인코딩."""
cmd = [
ffmpeg, '-y', '-i', str(src),
'-c:v', 'libx264', '-crf', '23', '-preset', 'medium',
'-c:a', 'aac', '-b:a', '128k',
str(dst),
]
try:
subprocess.run(cmd, check=True, capture_output=True, timeout=600)
return True
except subprocess.CalledProcessError as e:
logger.error(f'재인코딩 실패: {e.stderr.decode(errors="ignore")[:200]}')
return False
# ─── 메인 엔트리포인트 ────────────────────────────────────────
def assemble(
clips: list[Path],
tts_wav: Path,
ass_path: Optional[Path],
output_dir: Path,
timestamp: str,
cfg: Optional[dict] = None,
work_dir: Optional[Path] = None,
) -> Path:
"""
클립 + TTS + 자막 → 최종 쇼츠 MP4.
Args:
clips: [clip_path, ...] — 준비된 1080×1920 MP4 목록
tts_wav: TTS 오디오 WAV 경로
ass_path: ASS 자막 경로 (None이면 자막 없음)
output_dir: data/shorts/rendered/
timestamp: 파일명 prefix
cfg: shorts_config.json dict
work_dir: 임시 작업 디렉터리 (None이면 자동 생성)
Returns:
rendered_path
Raises:
RuntimeError — 조립 실패 또는 품질 게이트 미통과
"""
if cfg is None:
cfg = _load_config()
output_dir.mkdir(parents=True, exist_ok=True)
ffmpeg = _get_ffmpeg()
vid_cfg = cfg.get('video', {})
crossfade = vid_cfg.get('crossfade_sec', 0.3)
fade_in = vid_cfg.get('fade_in_sec', 0.5)
fade_out = vid_cfg.get('fade_out_sec', 0.5)
bgm_path_str = vid_cfg.get('bgm_path', '')
bgm_db = vid_cfg.get('bgm_volume_db', -18)
bgm_path = BASE_DIR / bgm_path_str if bgm_path_str else None
audio_dur = _get_wav_duration(tts_wav)
logger.info(f'TTS 길이: {audio_dur:.1f}')
# 품질 게이트: 15초 미만 / 60초 초과
if audio_dur < 10:
raise RuntimeError(f'TTS 길이 너무 짧음: {audio_dur:.1f}초 (최소 10초)')
if audio_dur > 65:
raise RuntimeError(f'TTS 길이 너무 김: {audio_dur:.1f}초 (최대 65초)')
if not clips:
raise RuntimeError('클립 없음 — 조립 불가')
# 임시 작업 디렉터리
import contextlib
import shutil
tmp_cleanup = work_dir is None
if work_dir is None:
work_dir = output_dir / f'_work_{timestamp}'
work_dir.mkdir(parents=True, exist_ok=True)
try:
# ── 루프 최적화: 클립 목록 끝에 첫 클립 추가 ──────────────
loop_clips = list(clips)
if len(clips) > 1:
loop_clip = work_dir / 'loop_clip.mp4'
if _trim_clip(clips[0], loop_clip, min(2.0, _get_video_duration(clips[0])), ffmpeg):
loop_clips.append(loop_clip)
# ── 클립 길이 배분 ────────────────────────────────────────
total_clip_dur = audio_dur + fade_in + fade_out
n = len(loop_clips)
base_dur = total_clip_dur / n
clip_dur = max(3.0, min(base_dur, 8.0))
# 각 클립 트리밍
trimmed = []
for i, clip in enumerate(loop_clips):
t = work_dir / f'trimmed_{i:02d}.mp4'
src_dur = _get_video_duration(clip)
actual_dur = min(clip_dur, src_dur)
if actual_dur < 1.0:
actual_dur = src_dur
if _trim_clip(clip, t, actual_dur, ffmpeg):
trimmed.append(t)
else:
logger.warning(f'클립 {i} 트리밍 실패 — 건너뜀')
if not trimmed:
raise RuntimeError('트리밍된 클립 없음')
# ── 클립 연결 ─────────────────────────────────────────────
concat_out = work_dir / 'concat.mp4'
if not _concat_with_xfade(trimmed, concat_out, crossfade, ffmpeg):
raise RuntimeError('클립 연결 실패')
# ── 오디오 혼합 ───────────────────────────────────────────
audio_out = work_dir / 'audio_mixed.aac'
if not _mix_audio(tts_wav, bgm_path, bgm_db, audio_dur + 0.2, audio_out, ffmpeg):
# BGM 없이 TTS만
audio_out = tts_wav
# ── 최종 합성 ─────────────────────────────────────────────
final_out = output_dir / f'{timestamp}.mp4'
if not _assemble_final(
concat_out, audio_out, ass_path,
final_out, fade_in, fade_out, audio_dur,
cfg, ffmpeg,
):
raise RuntimeError('최종 합성 실패')
# ── 파일 크기 게이트 ──────────────────────────────────────
if not _check_filesize(final_out, max_mb=50):
logger.warning('파일 크기 초과 (>50MB) — CRF 23으로 재인코딩')
rerender_out = output_dir / f'{timestamp}_small.mp4'
if _rerender_smaller(final_out, rerender_out, ffmpeg):
final_out.unlink()
rerender_out.rename(final_out)
# ── 최종 길이 검증 ─────────────────────────────────────────
final_dur = _get_video_duration(final_out)
if final_dur < 10:
raise RuntimeError(f'최종 영상 길이 너무 짧음: {final_dur:.1f}')
if final_dur > 65:
logger.warning(f'최종 영상 길이 초과: {final_dur:.1f}초 (YouTube Shorts 제한 60초)')
logger.info(f'쇼츠 조립 완료: {final_out.name} ({final_dur:.1f}초)')
return final_out
finally:
if tmp_cleanup and work_dir.exists():
import shutil
shutil.rmtree(work_dir, ignore_errors=True)
+251
View File
@@ -0,0 +1,251 @@
"""
bots/shorts/youtube_uploader.py
역할: 렌더링된 쇼츠 MP4 → YouTube Data API v3 업로드
OAuth2: 기존 Blogger token.json 재사용 (youtube.upload 스코프 추가 필요).
AI Disclosure: YouTube 정책 준수 — 합성 콘텐츠 레이블 자동 설정.
업로드 쿼터: 하루 max daily_upload_limit (기본 6) 체크.
출력:
data/shorts/published/{timestamp}.json
{video_id, url, title, upload_time, article_id}
"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent.parent.parent
TOKEN_PATH = BASE_DIR / 'token.json'
PUBLISHED_DIR = BASE_DIR / 'data' / 'shorts' / 'published'
AI_DISCLOSURE_KO = '이 영상은 AI 도구를 활용하여 제작되었습니다.'
YOUTUBE_SCOPES = [
'https://www.googleapis.com/auth/blogger',
'https://www.googleapis.com/auth/youtube.upload',
'https://www.googleapis.com/auth/youtube.readonly',
'https://www.googleapis.com/auth/webmasters',
]
def _load_config() -> dict:
cfg_path = BASE_DIR / 'config' / 'shorts_config.json'
if cfg_path.exists():
return json.loads(cfg_path.read_text(encoding='utf-8'))
return {}
def _get_youtube_service():
"""YouTube Data API v3 서비스 객체 생성 (기존 OAuth token.json 재사용)."""
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
if not TOKEN_PATH.exists():
raise RuntimeError(f'OAuth 토큰 없음: {TOKEN_PATH} — scripts/get_token.py 실행 필요')
creds_data = json.loads(TOKEN_PATH.read_text(encoding='utf-8'))
client_id = os.environ.get('GOOGLE_CLIENT_ID', creds_data.get('client_id', ''))
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET', creds_data.get('client_secret', ''))
creds = Credentials(
token=creds_data.get('token'),
refresh_token=creds_data.get('refresh_token') or os.environ.get('GOOGLE_REFRESH_TOKEN'),
token_uri='https://oauth2.googleapis.com/token',
client_id=client_id,
client_secret=client_secret,
scopes=YOUTUBE_SCOPES,
)
if creds.expired and creds.refresh_token:
creds.refresh(Request())
# 갱신된 토큰 저장
creds_data['token'] = creds.token
TOKEN_PATH.write_text(json.dumps(creds_data, indent=2), encoding='utf-8')
return build('youtube', 'v3', credentials=creds)
def _count_today_uploads(cfg: dict) -> int:
"""오늘 업로드 횟수 카운트."""
PUBLISHED_DIR.mkdir(parents=True, exist_ok=True)
today = datetime.now().strftime('%Y%m%d')
count = 0
for f in PUBLISHED_DIR.glob(f'{today}_*.json'):
try:
data = json.loads(f.read_text(encoding='utf-8'))
if data.get('video_id'):
count += 1
except Exception:
pass
return count
def _build_description(article: dict, script: dict) -> str:
"""업로드 설명 생성: 블로그 링크 + 해시태그 + AI 공시."""
title = article.get('title', '')
blog_url = article.get('url', article.get('link', ''))
corner = article.get('corner', '')
keywords = script.get('keywords', [])
lines = []
if title:
lines.append(title)
if blog_url:
lines.append(f'\n자세한 내용: {blog_url}')
lines.append('')
# 해시태그
tags = ['#Shorts', f'#{corner}'] if corner else ['#Shorts']
tags += [f'#{k.replace(" ", "")}' for k in keywords[:3]]
lines.append(' '.join(tags))
# AI 공시 (YouTube 정책 준수)
lines.append('')
lines.append(AI_DISCLOSURE_KO)
return '\n'.join(lines)
def _build_tags(article: dict, script: dict, cfg: dict) -> list[str]:
"""태그 목록 생성."""
base_tags = cfg.get('youtube', {}).get('default_tags', ['shorts', 'AI', '테크'])
corner = article.get('corner', '')
keywords = script.get('keywords', [])
tags = list(base_tags)
if corner:
tags.append(corner)
tags.extend(keywords[:5])
return list(dict.fromkeys(tags)) # 중복 제거
# ─── 업로드 ──────────────────────────────────────────────────
def upload(
video_path: Path,
article: dict,
script: dict,
timestamp: str,
cfg: Optional[dict] = None,
) -> dict:
"""
쇼츠 MP4 → YouTube 업로드.
Args:
video_path: 렌더링된 MP4 경로
article: article dict (title, url, corner 등)
script: shorts 스크립트 (hook, keywords 등)
timestamp: 파일명 prefix (발행 기록용)
cfg: shorts_config.json dict
Returns:
{video_id, url, title, upload_time, article_id}
Raises:
RuntimeError — 업로드 실패 또는 쿼터 초과
"""
if cfg is None:
cfg = _load_config()
yt_cfg = cfg.get('youtube', {})
daily_limit = yt_cfg.get('daily_upload_limit', 6)
# 쿼터 체크
today_count = _count_today_uploads(cfg)
if today_count >= daily_limit:
raise RuntimeError(f'YouTube 일일 업로드 한도 초과: {today_count}/{daily_limit}')
# 메타데이터 구성
title = script.get('hook', article.get('title', ''))[:100]
description = _build_description(article, script)
tags = _build_tags(article, script, cfg)
try:
from googleapiclient.http import MediaFileUpload
youtube = _get_youtube_service()
body = {
'snippet': {
'title': title,
'description': description,
'tags': tags,
'categoryId': yt_cfg.get('category_id', '28'),
},
'status': {
'privacyStatus': yt_cfg.get('privacy_status', 'public'),
'madeForKids': yt_cfg.get('made_for_kids', False),
'selfDeclaredMadeForKids': False,
},
}
media = MediaFileUpload(
str(video_path),
mimetype='video/mp4',
resumable=True,
chunksize=5 * 1024 * 1024, # 5MB chunks
)
request = youtube.videos().insert(
part='snippet,status',
body=body,
media_body=media,
)
logger.info(f'YouTube 업로드 시작: {video_path.name}')
response = None
while response is None:
status, response = request.next_chunk()
if status:
logger.debug(f'업로드 진행: {int(status.progress() * 100)}%')
video_id = response.get('id', '')
video_url = f'https://www.youtube.com/shorts/{video_id}'
logger.info(f'YouTube 업로드 완료: {video_url}')
# AI 합성 콘텐츠 레이블 설정 (YouTube 정책 준수)
_set_ai_disclosure(youtube, video_id)
except Exception as e:
raise RuntimeError(f'YouTube 업로드 실패: {e}') from e
# 발행 기록 저장
PUBLISHED_DIR.mkdir(parents=True, exist_ok=True)
record = {
'video_id': video_id,
'url': video_url,
'title': title,
'upload_time': datetime.now().isoformat(),
'article_id': article.get('slug', ''),
'script_hook': script.get('hook', ''),
}
record_path = PUBLISHED_DIR / f'{timestamp}.json'
record_path.write_text(json.dumps(record, ensure_ascii=False, indent=2), encoding='utf-8')
logger.info(f'발행 기록 저장: {record_path.name}')
return record
def _set_ai_disclosure(youtube, video_id: str) -> None:
"""
YouTube 합성 콘텐츠 레이블 설정 (v2 — AI 공시 정책 준수).
contentDetails.contentRating 업데이트.
"""
try:
youtube.videos().update(
part='contentDetails',
body={
'id': video_id,
'contentDetails': {
'contentRating': {
# Altered/synthetic content declaration
},
},
},
).execute()
logger.debug('AI 합성 콘텐츠 레이블 설정 완료')
except Exception as e:
# 레이블 실패는 경고만 (업로드 자체는 성공)
logger.warning(f'AI 공시 레이블 설정 실패: {e}')