feat: YouTube Shorts 파이프라인 완성 및 HJW TV 업로드 연동

- youtube_uploader.py: YOUTUBE_REFRESH_TOKEN/CLIENT_ID/CLIENT_SECRET 환경변수 폴백 추가
  (token.json 없는 Docker 환경에서 브랜드 계정 인증 가능)
- shorts_config.json: corners_eligible를 실제 블로그 코너명으로 수정
- caption_renderer.py: render_captions() 반환값 누락 수정
- get_token.py: web→installed 타입 변환, port 8080 고정, prompt=consent 추가
- get_youtube_token.py: YouTube 전용 OAuth 토큰 발급 스크립트 (별도 클라이언트)
- CLAUDE.md: 프로젝트 개요, 배포 방법, 핵심 파일, YouTube 채널 정보 추가
- publisher_bot.py: 이미지 분산 배치, SEO 검증, 버그 수정
- scheduler.py: 알림 강화, atomic write, 중복 방지, hot reload 개선

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
JOUNGWOOK KWON
2026-04-06 09:27:48 +09:00
parent 15dfc39f0f
commit fb5e6ddbdf
8 changed files with 410 additions and 41 deletions

View File

@@ -464,6 +464,19 @@ def fetch_featured_image(article: dict) -> str:
except Exception:
pass
# 5) Unsplash 무료 이미지 검색 (API 키 불필요 — source 파라미터로 크레딧 자동 표시)
title = article.get('title', '')
unsplash_query = title[:50] if title else (search_keywords[0] if search_keywords else '')
if unsplash_query:
try:
unsplash_url = f'https://source.unsplash.com/800x450/?{_quote(unsplash_query)}'
resp = requests.head(unsplash_url, timeout=8, allow_redirects=True)
if resp.status_code == 200 and 'images.unsplash.com' in resp.url:
logger.info(f"Unsplash 이미지 사용: {unsplash_query[:30]}{resp.url[:60]}")
return resp.url
except Exception:
pass
return ''
@@ -512,6 +525,7 @@ def build_full_html(article: dict, body_html: str, toc_html: str) -> str:
f'width="100%" style="max-height:420px;object-fit:cover;border-radius:8px;'
f'margin-bottom:1.2em;" />'
)
n_imgs = len(img_tags)
if n_imgs == 1:
# 1장: 본문 최상단에 배치
body_html = img_tags[0] + '\n' + body_html
@@ -525,11 +539,17 @@ def build_full_html(article: dict, body_html: str, toc_html: str) -> str:
blocks = block_pattern.split(body_html)
boundary_indices = [i for i in range(1, len(blocks), 2)]
if len(boundary_indices) >= n_imgs + 1:
spacing = len(boundary_indices) // (n_imgs + 1)
insert_positions = [spacing * (k + 1) for k in range(n_imgs)]
# 균등 분산: spacing=0 방지를 위해 비율 기반 계산
insert_positions = [
int(len(boundary_indices) * (k + 1) / (n_imgs + 1))
for k in range(n_imgs)
]
# 중복 위치 제거 (spacing이 너무 좁을 때)
insert_positions = sorted(set(insert_positions))
for img_idx, pos in enumerate(reversed(insert_positions)):
bi = boundary_indices[min(pos, len(boundary_indices) - 1)]
blocks.insert(bi, '\n' + img_tags[n_imgs - 1 - img_idx] + '\n')
img_tag_idx = min(len(img_tags) - 1, len(insert_positions) - 1 - img_idx)
blocks.insert(bi, '\n' + img_tags[img_tag_idx] + '\n')
body_html = ''.join(blocks)
else:
body_html = '\n'.join(img_tags) + '\n' + body_html
@@ -715,6 +735,46 @@ def load_pending_review_file(filepath: str) -> dict:
return json.load(f)
def validate_seo(article: dict) -> list[str]:
"""#10 발행 전 SEO 기본 요건 검증 — 경고 목록 반환"""
warnings = []
title = article.get('title', '') or ''
meta = article.get('meta', '') or ''
body = article.get('body', '') or ''
tags = article.get('tags', []) or []
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(',') if t.strip()]
# 제목 길이 (30~70자 권장)
if len(title) < 15:
warnings.append(f"제목이 너무 짧음 ({len(title)}자, 최소 15자 권장)")
elif len(title) > 80:
warnings.append(f"제목이 너무 김 ({len(title)}자, 80자 이내 권장)")
# 메타 설명 (50~160자 권장)
if len(meta) < 30:
warnings.append(f"메타 설명이 너무 짧음 ({len(meta)}자, 최소 30자 권장)")
elif len(meta) > 160:
warnings.append(f"메타 설명이 너무 김 ({len(meta)}자, 160자 이내 권장)")
# H2 태그 (최소 2개 권장)
h2_count = body.lower().count('<h2')
if h2_count < 2:
warnings.append(f"H2 소제목이 부족 ({h2_count}개, 최소 2개 권장)")
# 태그 (최소 3개 권장)
if len(tags) < 3:
warnings.append(f"태그가 부족 ({len(tags)}개, 최소 3개 권장)")
# 본문 길이 (최소 500자)
import re as _re
text_only = _re.sub(r'<[^>]+>', '', body)
if len(text_only) < 500:
warnings.append(f"본문이 너무 짧음 ({len(text_only)}자, 최소 500자 권장)")
return warnings
# ─── 메인 발행 함수 ──────────────────────────────────
def publish(article: dict) -> bool:
@@ -727,6 +787,12 @@ def publish(article: dict) -> bool:
Returns: True(발행 성공) / False(수동 검토 대기)
"""
logger.info(f"발행 시도: {article.get('title', '')}")
# #10 SEO 검증
seo_warnings = validate_seo(article)
if seo_warnings:
logger.warning(f"SEO 경고: {'; '.join(seo_warnings)}")
safety_cfg = load_config('safety_keywords.json')
# 안전장치 검사
@@ -792,6 +858,11 @@ def approve_pending(filepath: str) -> bool:
article.pop('pending_reason', None)
article.pop('created_at', None)
# #10 SEO 검증
seo_warnings = validate_seo(article)
if seo_warnings:
logger.warning(f"SEO 경고 (승인 발행): {'; '.join(seo_warnings)}")
# 안전장치 우회하여 강제 발행
body_html, toc_html = markdown_to_html(article.get('body', ''))
body_html = insert_adsense_placeholders(body_html)

View File

@@ -97,12 +97,95 @@ _awaiting_article_image: dict[int, str] = {}
_publish_enabled = True
# 대화 히스토리 영속 파일
_HISTORY_FILE = DATA_DIR / 'conversation_history.json'
# 스케줄러 중복 실행 방지 Lock
_LOCK_FILE = BASE_DIR / 'scheduler.lock'
def _telegram_notify(text: str):
"""동기 텔레그램 알림 (스케줄 잡에서 사용)"""
import requests as _req
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
return
try:
_req.post(
f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage',
json={'chat_id': TELEGRAM_CHAT_ID, 'text': text, 'parse_mode': 'HTML'},
timeout=10,
)
except Exception as e:
logger.error(f"Telegram 알림 실패: {e}")
def _load_conversation_history():
"""파일에서 대화 히스토리 복원"""
global _conversation_history
if _HISTORY_FILE.exists():
try:
data = json.loads(_HISTORY_FILE.read_text(encoding='utf-8'))
_conversation_history = {int(k): v for k, v in data.items()}
logger.info(f"대화 히스토리 복원: {len(_conversation_history)}개 채팅")
except Exception as e:
logger.warning(f"대화 히스토리 로드 실패: {e}")
def _save_conversation_history():
"""대화 히스토리를 파일에 저장 (atomic write)"""
try:
import tempfile
_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
data = json.dumps(_conversation_history, ensure_ascii=False, indent=2)
tmp_fd, tmp_path = tempfile.mkstemp(
dir=str(_HISTORY_FILE.parent), suffix='.tmp'
)
try:
os.write(tmp_fd, data.encode('utf-8'))
os.fsync(tmp_fd)
finally:
os.close(tmp_fd)
os.replace(tmp_path, str(_HISTORY_FILE))
except Exception as e:
logger.warning(f"대화 히스토리 저장 실패: {e}")
def _acquire_lock() -> bool:
"""스케줄러 중복 실행 방지 Lock 획득"""
import fcntl
try:
_LOCK_FILE.parent.mkdir(parents=True, exist_ok=True)
lock_fd = open(_LOCK_FILE, 'w')
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
lock_fd.write(str(os.getpid()))
lock_fd.flush()
# fd를 전역에 유지해야 lock이 유지됨
globals()['_lock_fd'] = lock_fd
logger.info(f"스케줄러 Lock 획득 (PID {os.getpid()})")
return True
except (OSError, IOError):
logger.error("스케줄러가 이미 실행 중입니다! 중복 실행 방지됨.")
return False
def load_schedule() -> dict:
with open(CONFIG_DIR / 'schedule.json', 'r', encoding='utf-8') as f:
return json.load(f)
def _reload_configs():
"""engine.json, schedule.json, .env 등 설정 핫 리로드"""
global TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, IMAGE_MODE
logger.info("설정 파일 핫 리로드 시작")
load_dotenv(override=True)
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID', '')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY', '')
ANTHROPIC_BASE_URL = os.getenv('ANTHROPIC_BASE_URL', '')
IMAGE_MODE = os.getenv('IMAGE_MODE', 'manual').lower()
logger.info("설정 핫 리로드 완료")
# ─── 스케줄 작업 ──────────────────────────────────────
def job_collector():
@@ -126,6 +209,34 @@ def job_ai_writer():
logger.error(f"AI 글 작성 트리거 오류: {e}")
def _is_duplicate_topic(topic: str) -> bool:
"""#9 최근 7일 내 동일/유사 주제가 이미 발행되었는지 검사"""
from difflib import SequenceMatcher
published_dir = DATA_DIR / 'published'
if not published_dir.exists():
return False
now = datetime.now()
for f in published_dir.glob('*.json'):
try:
# 파일명에서 날짜 추출 (YYYYMMDD_...)
date_str = f.name[:8]
file_date = datetime.strptime(date_str, '%Y%m%d')
if (now - file_date).days > 7:
continue
article = json.loads(f.read_text(encoding='utf-8'))
prev_topic = article.get('topic', '') or article.get('title', '')
similarity = SequenceMatcher(None, topic, prev_topic).ratio()
if similarity > 0.7:
logger.info(f"중복 주제 감지 ({similarity:.0%}): '{topic}''{prev_topic}'")
return True
except (ValueError, json.JSONDecodeError):
continue
except Exception as e:
logger.debug(f"중복 검사 오류 ({f.name}): {e}")
continue
return False
def _trigger_openclaw_writer():
topics_dir = DATA_DIR / 'topics'
drafts_dir = DATA_DIR / 'drafts'
@@ -143,7 +254,12 @@ def _trigger_openclaw_writer():
if draft_check.exists() or original_check.exists():
continue
topic_data = json.loads(topic_file.read_text(encoding='utf-8'))
logger.info(f"글 작성 요청: {topic_data.get('topic', '')}")
topic_text = topic_data.get('topic', '')
# #9 중복 주제 검사
if _is_duplicate_topic(topic_text):
logger.info(f"중복 주제 건너뜀: {topic_text}")
continue
logger.info(f"글 작성 요청: {topic_text}")
_call_openclaw(topic_data, original_check)
@@ -506,6 +622,7 @@ def job_distribute_instagram():
_distribute_instagram()
except Exception as e:
logger.error(f"인스타그램 배포 오류: {e}")
_telegram_notify(f"⚠️ 인스타그램 배포 오류: {e}")
def _distribute_instagram():
@@ -538,6 +655,7 @@ def job_distribute_instagram_reels():
_distribute_instagram_reels()
except Exception as e:
logger.error(f"Instagram Reels 배포 오류: {e}")
_telegram_notify(f"⚠️ Instagram Reels 배포 오류: {e}")
def _distribute_instagram_reels():
@@ -569,6 +687,7 @@ def job_distribute_x():
_distribute_x()
except Exception as e:
logger.error(f"X 배포 오류: {e}")
_telegram_notify(f"⚠️ X 배포 오류: {e}")
def _distribute_x():
@@ -599,6 +718,7 @@ def job_distribute_tiktok():
_distribute_shorts('tiktok')
except Exception as e:
logger.error(f"TikTok 배포 오류: {e}")
_telegram_notify(f"⚠️ TikTok 배포 오류: {e}")
def job_distribute_youtube():
@@ -610,6 +730,7 @@ def job_distribute_youtube():
_distribute_shorts('youtube')
except Exception as e:
logger.error(f"YouTube 배포 오류: {e}")
_telegram_notify(f"⚠️ YouTube 배포 오류: {e}")
def _distribute_shorts(platform: str):
@@ -622,18 +743,41 @@ def _distribute_shorts(platform: str):
today = datetime.now().strftime('%Y%m%d')
outputs_dir = DATA_DIR / 'outputs'
# #7 YouTube 일일 업로드 제한
yt_daily_limit = 0
if platform == 'youtube':
try:
engine_cfg = json.loads((CONFIG_DIR / 'engine.json').read_text(encoding='utf-8'))
yt_daily_limit = engine_cfg.get('publishing', {}).get('youtube', {}).get('daily_upload_limit', 6)
except Exception:
yt_daily_limit = 6
for shorts_file in sorted(outputs_dir.glob(f'{today}_*_shorts.mp4')):
done_flag = shorts_file.with_suffix(f'.{platform}_done')
if done_flag.exists():
continue
# YouTube 제한: 루프 내에서 매번 체크
if platform == 'youtube' and yt_daily_limit:
done_count = len(list(outputs_dir.glob(f'{today}_*_shorts.youtube_done')))
if done_count >= yt_daily_limit:
logger.info(f"YouTube 일일 업로드 제한 도달 ({done_count}/{yt_daily_limit}) — 중단")
break
slug = shorts_file.stem.replace(f'{today}_', '').replace('_shorts', '')
article = _load_article_by_slug(today, slug)
if not article:
logger.warning(f"{platform}: 원본 article 없음 ({slug})")
continue
success = dist_bot.publish_shorts(article, str(shorts_file))
if success:
done_flag.touch()
try:
success = dist_bot.publish_shorts(article, str(shorts_file))
if success:
done_flag.touch()
logger.info(f"{platform} 업로드 완료: {shorts_file.name}")
else:
_telegram_notify(f"⚠️ {platform} 업로드 실패: {shorts_file.name}")
except Exception as e:
logger.error(f"{platform} 업로드 오류 ({shorts_file.name}): {e}")
_telegram_notify(f"⚠️ {platform} 업로드 오류: {shorts_file.name}\n{e}")
def _load_article_by_slug(date_str: str, slug: str) -> dict:
@@ -712,8 +856,26 @@ def job_novel_pipeline():
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
status = "🟢 발행 활성" if _publish_enabled else "🔴 발행 중단"
mode_label = {'manual': '수동', 'request': '요청', 'auto': '자동'}.get(IMAGE_MODE, IMAGE_MODE)
today = datetime.now().strftime('%Y%m%d')
# #4 일일 리포트 강화 — 오늘 현황 요약
topics_count = len(list((DATA_DIR / 'topics').glob(f'{today}_*.json'))) if (DATA_DIR / 'topics').exists() else 0
originals_count = len(list((DATA_DIR / 'originals').glob(f'{today}_*.json'))) if (DATA_DIR / 'originals').exists() else 0
pending_count = len(list((DATA_DIR / 'pending_review').glob('*_pending.json'))) if (DATA_DIR / 'pending_review').exists() else 0
published_count = len(list((DATA_DIR / 'published').glob(f'{today}_*.json'))) if (DATA_DIR / 'published').exists() else 0
outputs_dir = DATA_DIR / 'outputs'
shorts_count = len(list(outputs_dir.glob(f'{today}_*_shorts.mp4'))) if outputs_dir.exists() else 0
yt_done = len(list(outputs_dir.glob(f'{today}_*.youtube_done'))) if outputs_dir.exists() else 0
await update.message.reply_text(
f"블로그 엔진 상태: {status}\n이미지 모드: {mode_label} ({IMAGE_MODE})"
f"📊 블로그 엔진 상태: {status}\n"
f"이미지 모드: {mode_label}\n\n"
f"── 오늘 ({today[:4]}-{today[4:6]}-{today[6:]}) ──\n"
f"📥 수집: {topics_count}\n"
f"✍️ 작성: {originals_count}\n"
f"⏳ 검토 대기: {pending_count}\n"
f"✅ 발행: {published_count}\n"
f"🎬 쇼츠: {shorts_count}개 (YT업로드: {yt_done}개)"
)
@@ -729,6 +891,12 @@ async def cmd_resume_publish(update: Update, context: ContextTypes.DEFAULT_TYPE)
await update.message.reply_text("🟢 발행이 재개되었습니다.")
async def cmd_reload(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""#8 설정 핫 리로드"""
_reload_configs()
await update.message.reply_text("🔄 설정 파일을 다시 로드했습니다. (engine.json, .env 등)")
async def cmd_collect(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("🔄 글감 수집을 시작합니다...")
loop = asyncio.get_event_loop()
@@ -1768,6 +1936,7 @@ async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
)
reply = response.content[0].text
history.append({"role": "assistant", "content": reply})
_save_conversation_history()
await update.message.reply_text(reply)
except Exception as e:
logger.error(f"Claude API 오류: {e}")
@@ -1982,6 +2151,15 @@ def setup_scheduler() -> AsyncIOScheduler:
async def main():
logger.info("=== 블로그 엔진 스케줄러 시작 ===")
# #6 중복 실행 방지
if not _acquire_lock():
logger.error("스케줄러 종료: 이미 다른 인스턴스가 실행 중")
return
# #3 대화 히스토리 복원
_load_conversation_history()
scheduler = setup_scheduler()
scheduler.start()
@@ -2002,6 +2180,7 @@ async def main():
app.add_handler(CommandHandler('topic', cmd_topic))
app.add_handler(CommandHandler('topics', cmd_show_topics))
app.add_handler(CommandHandler('convert', cmd_convert))
app.add_handler(CommandHandler('reload', cmd_reload))
# 이미지 관련 (request / manual 공통 사용 가능)
app.add_handler(CommandHandler('images', cmd_images))

View File

@@ -375,6 +375,7 @@ def render_captions(
ass_content = header + '\n'.join(events) + '\n'
ass_path.write_text(ass_content, encoding='utf-8-sig') # BOM for Windows compatibility
logger.info(f'ASS 자막 생성: {ass_path.name} ({len(timestamps)}단어, {len(lines)}라인)')
return ass_path
# ── Standalone test ──────────────────────────────────────────────
@@ -429,4 +430,3 @@ if __name__ == '__main__':
assert exists and size > 0, "ASS 파일 생성 실패"
print("\n✅ 모든 테스트 통과")
return ass_path

View File

@@ -40,31 +40,56 @@ def _load_config() -> dict:
def _get_youtube_service():
"""YouTube Data API v3 서비스 객체 생성 (기존 OAuth token.json 재사용)."""
"""YouTube Data API v3 서비스 객체 생성 (token.json 우선, env fallback)."""
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
if not TOKEN_PATH.exists():
raise RuntimeError(f'OAuth 토큰 없음: {TOKEN_PATH} — scripts/get_token.py 실행 필요')
creds = None
creds_data = json.loads(TOKEN_PATH.read_text(encoding='utf-8'))
client_id = os.environ.get('GOOGLE_CLIENT_ID', creds_data.get('client_id', ''))
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET', creds_data.get('client_secret', ''))
# 1) token.json 파일 우선
if TOKEN_PATH.exists():
creds_data = json.loads(TOKEN_PATH.read_text(encoding='utf-8'))
client_id = os.environ.get('GOOGLE_CLIENT_ID', creds_data.get('client_id', ''))
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET', creds_data.get('client_secret', ''))
creds = Credentials(
token=creds_data.get('token'),
refresh_token=creds_data.get('refresh_token') or os.environ.get('GOOGLE_REFRESH_TOKEN'),
token_uri='https://oauth2.googleapis.com/token',
client_id=client_id,
client_secret=client_secret,
)
creds = Credentials(
token=creds_data.get('token'),
refresh_token=creds_data.get('refresh_token') or os.environ.get('GOOGLE_REFRESH_TOKEN'),
token_uri='https://oauth2.googleapis.com/token',
client_id=client_id,
client_secret=client_secret,
scopes=YOUTUBE_SCOPES,
)
if creds.expired and creds.refresh_token:
creds.refresh(Request())
# 갱신된 토큰 저장
creds_data['token'] = creds.token
TOKEN_PATH.write_text(json.dumps(creds_data, indent=2), encoding='utf-8')
# 2) .env의 YOUTUBE_REFRESH_TOKEN으로 직접 생성 (Docker 환경 대응)
if not creds:
refresh_token = os.environ.get('YOUTUBE_REFRESH_TOKEN', '') or os.environ.get('GOOGLE_REFRESH_TOKEN', '')
client_id = os.environ.get('YOUTUBE_CLIENT_ID', '') or os.environ.get('GOOGLE_CLIENT_ID', '')
client_secret = os.environ.get('YOUTUBE_CLIENT_SECRET', '') or os.environ.get('GOOGLE_CLIENT_SECRET', '')
if not all([refresh_token, client_id, client_secret]):
raise RuntimeError(
'OAuth 인증 정보 없음: token.json 또는 '
'YOUTUBE_REFRESH_TOKEN/YOUTUBE_CLIENT_ID/YOUTUBE_CLIENT_SECRET 환경변수 필요'
)
creds = Credentials(
token=None,
refresh_token=refresh_token,
token_uri='https://oauth2.googleapis.com/token',
client_id=client_id,
client_secret=client_secret,
)
logger.info('token.json 없음 — YOUTUBE_REFRESH_TOKEN 환경변수로 인증')
# 토큰 갱신
if creds.expired or not creds.token:
if creds.refresh_token:
creds.refresh(Request())
# token.json이 있으면 갱신된 토큰 저장
if TOKEN_PATH.exists():
creds_data = json.loads(TOKEN_PATH.read_text(encoding='utf-8'))
creds_data['token'] = creds.token
TOKEN_PATH.write_text(json.dumps(creds_data, indent=2), encoding='utf-8')
else:
raise RuntimeError('refresh_token 없음 — 재인증 필요')
return build('youtube', 'v3', credentials=creds)