feat: v3 멀티플랫폼 자동화 엔진 — 변환/배포 엔진 + 쇼츠 + README

## 변환 엔진 (bots/converters/)
- blog_converter: HTML 자동감지 + Schema.org JSON-LD + AdSense 플레이스홀더
- card_converter: Pillow 1080×1080 인스타그램 카드 이미지
- thread_converter: X 스레드 280자 자동 분할
- newsletter_converter: 주간 HTML 뉴스레터
- shorts_converter: TTS + ffmpeg 뉴스앵커 쇼츠 영상 (1080×1920)

## 배포 엔진 (bots/distributors/)
- image_host: ImgBB 업로드 / 로컬 HTTP 서버
- instagram_bot: Instagram Graph API (컨테이너 → 폴링 → 발행)
- x_bot: X API v2 OAuth1 스레드 게시
- tiktok_bot: TikTok Content Posting API v2 청크 업로드
- youtube_bot: YouTube Data API v3 재개가능 업로드

## 기타
- article_parser: KEY_POINTS 파싱 추가 (SNS/TTS용 핵심 3줄)
- publisher_bot: HTML 본문 직접 발행 지원
- scheduler: 시차 배포 스케줄 + Telegram 변환/배포 명령 추가
- remote_claude: Claude Agent SDK Telegram 연동
- templates/shorts_template.json: 코너별 색상/TTS/트랜지션 설정
- scripts/download_fonts.py: NotoSansKR / 맑은고딕 자동 설치
- .gitignore: .claude/, 기획문서, 생성 미디어 파일 추가
- .env.example: 플레이스홀더 텍스트 (실제 값 없음)
- README: v3 아키텍처 전체 문서화 (설치/API키/상세설명/FAQ)
- requirements.txt: openai, pydub 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sinmb79
2026-03-25 18:15:07 +09:00
parent 6d6ba14e76
commit b54f8e198e
24 changed files with 4367 additions and 274 deletions
+10
View File
@@ -44,6 +44,15 @@ def parse_output(raw_output: str) -> Optional[dict]:
coupang_raw = sections.get('COUPANG_KEYWORDS', '')
coupang_keywords = [k.strip() for k in coupang_raw.split(',') if k.strip()]
# KEY_POINTS 파싱 (변환 엔진용 핵심 3줄)
key_points_raw = sections.get('KEY_POINTS', '')
key_points = []
for line in key_points_raw.splitlines():
line = line.strip().lstrip('•-*').strip()
if line:
key_points.append(line)
key_points = key_points[:3] # 최대 3개
return {
'title': sections.get('TITLE', ''),
'meta': sections.get('META', ''),
@@ -52,6 +61,7 @@ def parse_output(raw_output: str) -> Optional[dict]:
'corner': sections.get('CORNER', ''),
'body': sections.get('BODY', ''),
'coupang_keywords': coupang_keywords,
'key_points': key_points,
'sources': sources,
'disclaimer': sections.get('DISCLAIMER', ''),
}
View File
+172
View File
@@ -0,0 +1,172 @@
"""
블로그 변환봇 (converters/blog_converter.py)
역할: 원본 마크다운 → 블로그 HTML 변환 (LAYER 2)
- 마크다운 → HTML (목차, 테이블, 코드블록)
- AdSense 플레이스홀더 삽입
- Schema.org Article JSON-LD
- 쿠팡 링크봇 호출
출력: data/outputs/{date}_{slug}_blog.html
"""
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
import markdown
from bs4 import BeautifulSoup
BASE_DIR = Path(__file__).parent.parent.parent
sys.path.insert(0, str(BASE_DIR / 'bots'))
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
OUTPUT_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'converter.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
BLOG_BASE_URL = 'https://the4thpath.com'
def markdown_to_html(md_text: str) -> tuple[str, str]:
"""마크다운 → HTML (목차 포함)"""
md = markdown.Markdown(
extensions=['toc', 'tables', 'fenced_code', 'attr_list'],
extension_configs={
'toc': {'title': '목차', 'toc_depth': '2-3'}
}
)
html = md.convert(md_text)
toc = md.toc
return html, toc
def insert_adsense_placeholders(html: str) -> str:
"""두 번째 H2 뒤 + 결론 H2 앞에 AdSense 슬롯 삽입"""
AD_SLOT_1 = '\n<!-- AD_SLOT_1 -->\n'
AD_SLOT_2 = '\n<!-- AD_SLOT_2 -->\n'
soup = BeautifulSoup(html, 'lxml')
h2_tags = soup.find_all('h2')
if len(h2_tags) >= 2:
ad_tag = BeautifulSoup(AD_SLOT_1, 'html.parser')
h2_tags[1].insert_after(ad_tag)
for h2 in soup.find_all('h2'):
if any(kw in h2.get_text() for kw in ['결론', '마무리', '정리', '요약', 'conclusion']):
ad_tag2 = BeautifulSoup(AD_SLOT_2, 'html.parser')
h2.insert_before(ad_tag2)
break
return str(soup)
def build_json_ld(article: dict, post_url: str = '') -> str:
"""Schema.org Article JSON-LD"""
schema = {
"@context": "https://schema.org",
"@type": "Article",
"headline": article.get('title', ''),
"description": article.get('meta', ''),
"datePublished": datetime.now(timezone.utc).isoformat(),
"dateModified": datetime.now(timezone.utc).isoformat(),
"author": {"@type": "Person", "name": "테크인사이더"},
"publisher": {
"@type": "Organization",
"name": "The 4th Path",
"logo": {"@type": "ImageObject", "url": f"{BLOG_BASE_URL}/logo.png"}
},
"mainEntityOfPage": {"@type": "WebPage", "@id": post_url or BLOG_BASE_URL},
}
return f'<script type="application/ld+json">\n{json.dumps(schema, ensure_ascii=False, indent=2)}\n</script>'
def build_full_html(article: dict, body_html: str, toc_html: str,
post_url: str = '') -> str:
"""JSON-LD + 목차 + 본문 + 면책 조합"""
json_ld = build_json_ld(article, post_url)
disclaimer = article.get('disclaimer', '')
parts = [json_ld]
if toc_html:
parts.append(f'<div class="toc-wrapper">{toc_html}</div>')
parts.append(body_html)
if disclaimer:
parts.append(f'<hr/><p class="disclaimer"><small>{disclaimer}</small></p>')
return '\n'.join(parts)
def _is_html_body(body: str) -> bool:
"""AI가 이미 HTML을 출력했는지 감지 (마크다운 변환 건너뜀)"""
stripped = body.lstrip()
return stripped.startswith('<') and any(
tag in stripped[:200].lower()
for tag in ['<style', '<div', '<h1', '<section', '<article', '<p>']
)
def convert(article: dict, save_file: bool = True) -> str:
"""
article dict → 블로그 HTML 문자열 반환.
save_file=True이면 data/outputs/에 저장.
BODY가 이미 HTML이면 마크다운 변환을 건너뜀.
"""
logger.info(f"블로그 변환 시작: {article.get('title', '')}")
body = article.get('body', '')
if _is_html_body(body):
# AI가 Blogger-ready HTML을 직접 출력한 경우 — 변환 없이 그대로 사용
logger.info("HTML 본문 감지 — 마크다운 변환 건너뜀")
body_html = body
toc_html = ''
else:
# 레거시 마크다운 → HTML 변환
body_html, toc_html = markdown_to_html(body)
# AdSense 삽입
body_html = insert_adsense_placeholders(body_html)
# 쿠팡 링크 삽입
try:
import linker_bot
body_html = linker_bot.process(article, body_html)
except Exception as e:
logger.warning(f"쿠팡 링크 삽입 실패 (건너뜀): {e}")
# 최종 HTML
html = build_full_html(article, body_html, toc_html)
if save_file:
slug = article.get('slug', 'article')
date_str = datetime.now().strftime('%Y%m%d')
filename = f"{date_str}_{slug}_blog.html"
output_path = OUTPUT_DIR / filename
output_path.write_text(html, encoding='utf-8')
logger.info(f"저장: {output_path}")
logger.info("블로그 변환 완료")
return html
if __name__ == '__main__':
sample = {
'title': '테스트 글',
'meta': '테스트 설명',
'slug': 'test-article',
'corner': '쉬운세상',
'body': '## 소개\n\n본문입니다.\n\n## 결론\n\n마무리입니다.',
'coupang_keywords': [],
'disclaimer': '',
'key_points': ['포인트 1', '포인트 2', '포인트 3'],
}
html = convert(sample)
print(html[:500])
+182
View File
@@ -0,0 +1,182 @@
"""
카드 변환봇 (converters/card_converter.py)
역할: 원본 마크다운 → 인스타그램 카드 이미지 (LAYER 2)
- 크기: 1080×1080 (정사각형)
- 배경: 흰색 + 골드 액센트 (#c8a84e)
- 폰트: Noto Sans KR (없으면 기본 폰트)
- 구성: 로고 + 코너 배지 + 제목 + 핵심 3줄 + URL
출력: data/outputs/{date}_{slug}_card.png
"""
import logging
import textwrap
from datetime import datetime
from pathlib import Path
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
OUTPUT_DIR.mkdir(exist_ok=True)
ASSETS_DIR = BASE_DIR / 'assets'
FONTS_DIR = ASSETS_DIR / 'fonts'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'converter.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
# 디자인 상수
CARD_SIZE = (1080, 1080)
COLOR_WHITE = (255, 255, 255)
COLOR_GOLD = (200, 168, 78) # #c8a84e
COLOR_DARK = (30, 30, 30)
COLOR_GRAY = (120, 120, 120)
COLOR_GOLD_LIGHT = (255, 248, 220)
CORNER_COLORS = {
'쉬운세상': (52, 152, 219), # 파랑
'숨은보물': (46, 204, 113), # 초록
'바이브리포트': (155, 89, 182), # 보라
'팩트체크': (231, 76, 60), # 빨강
'한컷': (241, 196, 15), # 노랑
}
BLOG_URL = 'the4thpath.com'
BRAND_NAME = 'The 4th Path'
SUB_BRAND = 'by 22B Labs'
def _load_font(size: int):
"""Noto Sans KR 폰트 로드 (없으면 기본 폰트)"""
try:
from PIL import ImageFont
for fname in ['NotoSansKR-Bold.ttf', 'NotoSansKR-Regular.ttf', 'NotoSansKR-Medium.ttf']:
font_path = FONTS_DIR / fname
if font_path.exists():
return ImageFont.truetype(str(font_path), size)
# Windows 기본 한글 폰트 시도
for path in [
'C:/Windows/Fonts/malgun.ttf',
'C:/Windows/Fonts/malgunbd.ttf',
'C:/Windows/Fonts/NanumGothic.ttf',
]:
if Path(path).exists():
return ImageFont.truetype(path, size)
except Exception:
pass
try:
from PIL import ImageFont
return ImageFont.load_default()
except Exception:
return None
def _draw_rounded_rect(draw, xy, radius: int, fill):
"""PIL로 둥근 사각형 그리기"""
from PIL import ImageDraw
x1, y1, x2, y2 = xy
draw.rectangle([x1 + radius, y1, x2 - radius, y2], fill=fill)
draw.rectangle([x1, y1 + radius, x2, y2 - radius], fill=fill)
draw.ellipse([x1, y1, x1 + radius * 2, y1 + radius * 2], fill=fill)
draw.ellipse([x2 - radius * 2, y1, x2, y1 + radius * 2], fill=fill)
draw.ellipse([x1, y2 - radius * 2, x1 + radius * 2, y2], fill=fill)
draw.ellipse([x2 - radius * 2, y2 - radius * 2, x2, y2], fill=fill)
def convert(article: dict, save_file: bool = True) -> str:
"""
article dict → 카드 이미지 PNG.
Returns: 저장 경로 문자열 (save_file=False면 빈 문자열)
"""
try:
from PIL import Image, ImageDraw
except ImportError:
logger.error("Pillow가 설치되지 않음. pip install Pillow")
return ''
title = article.get('title', '')
corner = article.get('corner', '쉬운세상')
key_points = article.get('key_points', [])
logger.info(f"카드 변환 시작: {title}")
# 캔버스
img = Image.new('RGB', CARD_SIZE, COLOR_WHITE)
draw = ImageDraw.Draw(img)
# 골드 상단 바 (80px)
draw.rectangle([0, 0, 1080, 80], fill=COLOR_GOLD)
# 브랜드명 (좌상단)
font_brand = _load_font(36)
font_sub = _load_font(22)
font_corner = _load_font(26)
font_title = _load_font(52)
font_point = _load_font(38)
font_url = _load_font(28)
if font_brand:
draw.text((40, 22), BRAND_NAME, font=font_brand, fill=COLOR_WHITE)
if font_sub:
draw.text((460, 28), SUB_BRAND, font=font_sub, fill=(240, 235, 210))
# 코너 배지
badge_color = CORNER_COLORS.get(corner, COLOR_GOLD)
_draw_rounded_rect(draw, [40, 110, 250, 160], 20, badge_color)
if font_corner:
draw.text((60, 122), corner, font=font_corner, fill=COLOR_WHITE)
# 제목 (멀티라인, 최대 3줄)
title_lines = textwrap.wrap(title, width=18)[:3]
y_title = 200
for line in title_lines:
if font_title:
draw.text((40, y_title), line, font=font_title, fill=COLOR_DARK)
y_title += 65
# 구분선
draw.rectangle([40, y_title + 10, 1040, y_title + 14], fill=COLOR_GOLD)
# 핵심 포인트
y_points = y_title + 40
for i, point in enumerate(key_points[:3]):
# 불릿 원
draw.ellipse([40, y_points + 8, 64, y_points + 32], fill=COLOR_GOLD)
if font_point:
point_short = textwrap.shorten(point, width=22, placeholder='...')
draw.text((76, y_points), point_short, font=font_point, fill=COLOR_DARK)
y_points += 60
# 하단 바 (URL + 브랜딩)
draw.rectangle([0, 980, 1080, 1080], fill=COLOR_GOLD)
if font_url:
draw.text((40, 1008), BLOG_URL, font=font_url, fill=COLOR_WHITE)
# 저장
output_path = ''
if save_file:
slug = article.get('slug', 'article')
date_str = datetime.now().strftime('%Y%m%d')
filename = f"{date_str}_{slug}_card.png"
output_path = str(OUTPUT_DIR / filename)
img.save(output_path, 'PNG')
logger.info(f"카드 저장: {output_path}")
logger.info("카드 변환 완료")
return output_path
if __name__ == '__main__':
sample = {
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'slug': 'chatgpt-guide',
'corner': '쉬운세상',
'key_points': ['무료로 바로 시작 가능', 'GPT-4는 유료지만 3.5도 충분', '프롬프트가 결과를 결정한다'],
}
path = convert(sample)
print(f"저장: {path}")
+141
View File
@@ -0,0 +1,141 @@
"""
뉴스레터 변환봇 (converters/newsletter_converter.py)
역할: 원본 마크다운 → 주간 뉴스레터 HTML 발췌 (LAYER 2)
- TITLE + META + KEY_POINTS 발췌
- 주간 단위로 모아서 뉴스레터 HTML 생성
출력: data/outputs/weekly_{date}_newsletter.html
Phase 3에서 Substack 등 연동 예정
"""
import json
import logging
from datetime import datetime
from pathlib import Path
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
OUTPUT_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'converter.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
BLOG_BASE_URL = 'https://the4thpath.com'
def extract_newsletter_item(article: dict, blog_url: str = '') -> dict:
"""단일 글에서 뉴스레터용 발췌 추출"""
return {
'title': article.get('title', ''),
'meta': article.get('meta', ''),
'corner': article.get('corner', ''),
'key_points': article.get('key_points', []),
'url': blog_url or f"{BLOG_BASE_URL}/{article.get('slug', '')}",
'extracted_at': datetime.now().isoformat(),
}
def build_newsletter_html(items: list[dict], week_str: str = '') -> str:
"""주간 뉴스레터 HTML 생성"""
if not week_str:
week_str = datetime.now().strftime('%Y년 %m월 %d일 주간')
article_blocks = []
for item in items:
points_html = ''.join(
f'<li>{p}</li>' for p in item.get('key_points', [])
)
block = f"""
<div style="margin-bottom:32px;padding-bottom:24px;border-bottom:1px solid #eee;">
<p style="color:#c8a84e;font-size:12px;margin:0 0 4px;">{item.get('corner','')}</p>
<h2 style="font-size:20px;margin:0 0 8px;">
<a href="{item.get('url','')}" style="color:#1a1a1a;text-decoration:none;">{item.get('title','')}</a>
</h2>
<p style="color:#555;font-size:14px;margin:0 0 12px;">{item.get('meta','')}</p>
<ul style="color:#333;font-size:14px;margin:0;padding-left:20px;">
{points_html}
</ul>
<p style="margin:12px 0 0;">
<a href="{item.get('url','')}" style="color:#c8a84e;font-size:13px;">전체 읽기 →</a>
</p>
</div>"""
article_blocks.append(block)
articles_html = '\n'.join(article_blocks)
return f"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>The 4th Path 주간 뉴스레터 — {week_str}</title>
</head>
<body style="font-family:'Noto Sans KR',sans-serif;max-width:600px;margin:0 auto;padding:20px;color:#1a1a1a;">
<div style="background:#c8a84e;padding:24px;margin-bottom:32px;">
<h1 style="color:#fff;margin:0;font-size:28px;">The 4th Path</h1>
<p style="color:#fff5dc;margin:4px 0 0;font-size:14px;">{week_str} 뉴스레터</p>
</div>
{articles_html}
<div style="margin-top:32px;padding-top:16px;border-top:2px solid #c8a84e;text-align:center;">
<p style="color:#888;font-size:12px;">
<a href="{BLOG_BASE_URL}" style="color:#c8a84e;">the4thpath.com</a> | by 22B Labs
</p>
</div>
</body>
</html>"""
def generate_weekly(articles: list[dict], urls: list[str] = None,
save_file: bool = True) -> str:
"""
여러 글을 모아 주간 뉴스레터 HTML 생성.
articles: article dict 리스트
urls: 각 글의 발행 URL (없으면 slug로 생성)
"""
logger.info(f"주간 뉴스레터 생성 시작: {len(articles)}개 글")
items = []
for i, article in enumerate(articles):
url = (urls[i] if urls and i < len(urls) else '')
items.append(extract_newsletter_item(article, url))
week_str = datetime.now().strftime('%Y년 %m월 %d')
html = build_newsletter_html(items, week_str)
if save_file:
date_str = datetime.now().strftime('%Y%m%d')
filename = f"weekly_{date_str}_newsletter.html"
output_path = OUTPUT_DIR / filename
output_path.write_text(html, encoding='utf-8')
logger.info(f"뉴스레터 저장: {output_path}")
logger.info("주간 뉴스레터 생성 완료")
return html
if __name__ == '__main__':
samples = [
{
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'meta': 'ChatGPT를 처음 사용하는 분을 위한 단계별 가이드',
'slug': 'chatgpt-guide',
'corner': '쉬운세상',
'key_points': ['무료로 바로 시작', 'GPT-3.5로도 충분', '프롬프트가 핵심'],
},
{
'title': '개발자 생산성 10배 높이는 AI 도구 5선',
'meta': '실제 사용해본 AI 개발 도구 정직한 리뷰',
'slug': 'ai-dev-tools',
'corner': '숨은보물',
'key_points': ['Cursor로 코딩 속도 3배', 'Copilot과 차이점', '무료 플랜으로 충분'],
},
]
html = generate_weekly(samples)
print(html[:500])
+875
View File
@@ -0,0 +1,875 @@
"""
쇼츠 변환봇 (converters/shorts_converter.py)
역할: 원본 마크다운 → 뉴스앵커 포맷 쇼츠 MP4 (LAYER 2)
설계서: shorts-video-template-spec.txt
파이프라인:
1. 슬라이드 구성 결정 (intro/headline/point×3/data?/outro)
2. 각 섹션 TTS 생성 → 개별 WAV
3. DALL-E 배경 이미지 생성 (선택)
4. Pillow UI 오버레이 합성 → 슬라이드 PNG × N
5. 슬라이드 → 개별 클립 MP4 (Ken Burns zoompan)
6. xfade 전환으로 클립 결합
7. BGM 믹스 (8%)
8. SRT 자막 burn-in
9. 최종 MP4 저장
출력: data/outputs/{date}_{slug}_shorts.mp4 (1080×1920, 30~60초)
사전 조건:
pip install Pillow pydub google-cloud-texttospeech openai gTTS
ffmpeg 설치 후 PATH 등록 또는 FFMPEG_PATH 환경변수
"""
import base64
import json
import logging
import os
import subprocess
import textwrap
from datetime import datetime
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
ASSETS_DIR = BASE_DIR / 'assets'
FONTS_DIR = ASSETS_DIR / 'fonts'
TEMPLATE_PATH = BASE_DIR / 'templates' / 'shorts_template.json'
BGM_PATH = ASSETS_DIR / 'bgm.mp3'
LOG_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.FileHandler(LOG_DIR / 'converter.log', encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
logger.addHandler(handler)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
FFMPEG = os.getenv('FFMPEG_PATH', 'ffmpeg')
FFPROBE = os.getenv('FFPROBE_PATH', 'ffprobe')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')
GOOGLE_TTS_API_KEY = os.getenv('GOOGLE_TTS_API_KEY', '')
# 컬러 상수
COLOR_DARK = (10, 10, 13) # #0a0a0d
COLOR_DARK2 = (15, 10, 30) # #0f0a1e
COLOR_GOLD = (200, 168, 78) # #c8a84e
COLOR_WHITE = (255, 255, 255)
COLOR_BLACK = (0, 0, 0)
COLOR_TICKER_BG = (0, 0, 0, 200)
# ─── 설정 로드 ────────────────────────────────────────
def _load_template() -> dict:
if TEMPLATE_PATH.exists():
return json.loads(TEMPLATE_PATH.read_text(encoding='utf-8'))
return {}
# ─── 폰트 헬퍼 ───────────────────────────────────────
def _load_font(size: int, bold: bool = False):
"""NotoSansKR 로드, 없으면 Windows 맑은고딕, 없으면 기본 폰트"""
try:
from PIL import ImageFont
candidates = (
['NotoSansKR-Bold.ttf', 'NotoSansKR-Medium.ttf'] if bold
else ['NotoSansKR-Regular.ttf', 'NotoSansKR-Medium.ttf']
)
for fname in candidates:
p = FONTS_DIR / fname
if p.exists():
return ImageFont.truetype(str(p), size)
win_font = 'malgunbd.ttf' if bold else 'malgun.ttf'
wp = Path(f'C:/Windows/Fonts/{win_font}')
if wp.exists():
return ImageFont.truetype(str(wp), size)
return ImageFont.load_default()
except Exception:
return None
def _text_size(draw, text: str, font) -> tuple[int, int]:
"""PIL 버전 호환 텍스트 크기 측정"""
try:
bb = draw.textbbox((0, 0), text, font=font)
return bb[2] - bb[0], bb[3] - bb[1]
except AttributeError:
return draw.textsize(text, font=font)
# ─── Pillow 헬퍼 ─────────────────────────────────────
def _hex_to_rgb(hex_color: str) -> tuple[int, int, int]:
h = hex_color.lstrip('#')
return tuple(int(h[i:i+2], 16) for i in (0, 2, 4))
def _draw_rounded_rect(draw, xy, radius: int, fill):
x1, y1, x2, y2 = xy
r = radius
draw.rectangle([x1 + r, y1, x2 - r, y2], fill=fill)
draw.rectangle([x1, y1 + r, x2, y2 - r], fill=fill)
for cx, cy in [(x1, y1), (x2 - 2*r, y1), (x1, y2 - 2*r), (x2 - 2*r, y2 - 2*r)]:
draw.ellipse([cx, cy, cx + 2*r, cy + 2*r], fill=fill)
def _draw_gradient_overlay(img, top_alpha: int = 0, bottom_alpha: int = 200):
"""하단 다크 그라데이션 오버레이"""
from PIL import Image
W, H = img.size
overlay = Image.new('RGBA', (W, H), (0, 0, 0, 0))
import struct
for y in range(H // 2, H):
t = (y - H // 2) / (H // 2)
alpha = int(top_alpha + (bottom_alpha - top_alpha) * t)
for x in range(W):
overlay.putpixel((x, y), (0, 0, 0, alpha))
return Image.alpha_composite(img.convert('RGBA'), overlay).convert('RGB')
def _wrap_text_lines(text: str, font, max_width: int, draw) -> list[str]:
"""폰트 기준 줄 바꿈"""
words = text.split()
lines = []
current = ''
for word in words:
test = (current + ' ' + word).strip()
w, _ = _text_size(draw, test, font)
if w <= max_width:
current = test
else:
if current:
lines.append(current)
current = word
if current:
lines.append(current)
return lines
# ─── TTS ──────────────────────────────────────────────
def _tts_google_rest(text: str, output_path: str, voice: str, speed: float) -> bool:
"""Google Cloud TTS REST API (API Key 방식)"""
if not GOOGLE_TTS_API_KEY:
return False
try:
import requests as req
url = f'https://texttospeech.googleapis.com/v1/text:synthesize?key={GOOGLE_TTS_API_KEY}'
lang = 'ko-KR' if voice.startswith('ko') else 'en-US'
payload = {
'input': {'text': text},
'voice': {'languageCode': lang, 'name': voice},
'audioConfig': {
'audioEncoding': 'LINEAR16',
'speakingRate': speed,
'pitch': 0,
},
}
resp = req.post(url, json=payload, timeout=30)
resp.raise_for_status()
audio_b64 = resp.json().get('audioContent', '')
if audio_b64:
Path(output_path).write_bytes(base64.b64decode(audio_b64))
return True
except Exception as e:
logger.warning(f"Google Cloud TTS 실패: {e}")
return False
def _tts_gtts(text: str, output_path: str) -> bool:
"""gTTS 무료 (mp3 → pydub으로 wav 변환)"""
try:
from gtts import gTTS
mp3_path = output_path.replace('.wav', '_tmp.mp3')
tts = gTTS(text=text, lang='ko', slow=False)
tts.save(mp3_path)
# mp3 → wav
_run_ffmpeg(['-i', mp3_path, '-ar', '24000', output_path], quiet=True)
Path(mp3_path).unlink(missing_ok=True)
return Path(output_path).exists()
except Exception as e:
logger.warning(f"gTTS 실패: {e}")
return False
def synthesize_section(text: str, output_path: str, voice: str, speed: float) -> bool:
"""섹션별 TTS 생성 (Google Cloud REST → gTTS fallback)"""
if _tts_google_rest(text, output_path, voice, speed):
return True
return _tts_gtts(text, output_path)
def get_audio_duration(wav_path: str) -> float:
"""ffprobe로 오디오 파일 길이(초) 측정"""
try:
result = subprocess.run(
[FFPROBE, '-v', 'quiet', '-print_format', 'json',
'-show_format', wav_path],
capture_output=True, text=True, timeout=10
)
data = json.loads(result.stdout)
return float(data['format']['duration'])
except Exception:
# 폴백: 텍스트 길이 추정 (한국어 약 4자/초)
return max(2.0, len(text) / 4.0) if 'text' in dir() else 5.0
# ─── DALL-E 배경 이미지 ────────────────────────────────
def generate_background_dalle(prompt: str, corner: str) -> Optional['Image']:
"""
DALL-E 3로 배경 이미지 생성 (1024×1792 → 1080×1920 리사이즈).
OPENAI_API_KEY 없으면 None 반환 → 단색 배경 사용.
"""
if not OPENAI_API_KEY:
return None
try:
from openai import OpenAI
from PIL import Image
import io, requests as req
client = OpenAI(api_key=OPENAI_API_KEY)
full_prompt = prompt + ' No text, no letters, no numbers, no watermarks.'
response = client.images.generate(
model='dall-e-3',
prompt=full_prompt,
size='1024x1792',
quality='standard',
n=1,
)
img_url = response.data[0].url
img_bytes = req.get(img_url, timeout=30).content
img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
img = img.resize((1080, 1920), Image.LANCZOS)
logger.info(f"DALL-E 배경 생성 완료: {corner}")
return img
except Exception as e:
logger.warning(f"DALL-E 배경 생성 실패 (단색 사용): {e}")
return None
def solid_background(color: tuple) -> 'Image':
"""단색 배경 이미지 생성"""
from PIL import Image
return Image.new('RGB', (1080, 1920), color)
# ─── 슬라이드 합성 ────────────────────────────────────
def compose_intro_slide(cfg: dict) -> str:
"""인트로 슬라이드: 다크 배경 + 로고 + 브랜드"""
from PIL import Image, ImageDraw
img = solid_background(COLOR_DARK)
draw = ImageDraw.Draw(img)
W, H = 1080, 1920
# 골드 수평선 (상단 1/3)
draw.rectangle([60, H//3 - 2, W - 60, H//3], fill=COLOR_GOLD)
# 브랜드명
font_brand = _load_font(cfg.get('font_title_size', 72), bold=True)
font_sub = _load_font(cfg.get('font_body_size', 48))
font_meta = _load_font(cfg.get('font_meta_size', 32))
brand = cfg.get('brand_name', 'The 4th Path')
sub = cfg.get('brand_sub', 'Independent Tech Media')
by_text = cfg.get('brand_by', 'by 22B Labs')
if font_brand:
bw, bh = _text_size(draw, brand, font_brand)
draw.text(((W - bw) // 2, H // 3 + 60), brand, font=font_brand, fill=COLOR_GOLD)
if font_sub:
sw, sh = _text_size(draw, sub, font_sub)
draw.text(((W - sw) // 2, H // 3 + 60 + (bh if font_brand else 72) + 24),
sub, font=font_sub, fill=COLOR_WHITE)
if font_meta:
mw, mh = _text_size(draw, by_text, font_meta)
draw.text(((W - mw) // 2, H * 2 // 3), by_text, font=font_meta, fill=COLOR_GOLD)
path = str(_tmp_slide('intro'))
img.save(path)
return path
def compose_headline_slide(article: dict, cfg: dict, bg_img=None) -> str:
"""헤드라인 슬라이드: DALL-E 배경 + 코너 배지 + 제목 + 날짜"""
from PIL import Image, ImageDraw
corner = article.get('corner', '쉬운세상')
corner_cfg = cfg.get('corners', {}).get(corner, {})
corner_color = _hex_to_rgb(corner_cfg.get('color', '#c8a84e'))
if bg_img is None:
bg_img = solid_background((20, 20, 35))
img = _draw_gradient_overlay(bg_img.copy())
draw = ImageDraw.Draw(img)
W, H = 1080, 1920
font_badge = _load_font(36)
font_title = _load_font(cfg.get('font_title_size', 72), bold=True)
font_meta = _load_font(cfg.get('font_meta_size', 32))
# 코너 배지
_draw_rounded_rect(draw, [60, 120, 60 + len(corner) * 28 + 40, 190], 20, corner_color)
if font_badge:
draw.text((80, 133), corner, font=font_badge, fill=COLOR_WHITE)
# 제목 (최대 3줄)
title = article.get('title', '')
if font_title:
lines = _wrap_text_lines(title, font_title, W - 120, draw)[:3]
y = H // 2 - (len(lines) * 90) // 2
for line in lines:
draw.text((60, y), line, font=font_title, fill=COLOR_WHITE)
y += 90
# 날짜 + 브랜드
meta_text = f"{datetime.now().strftime('%Y.%m.%d')} · 22B Labs"
if font_meta:
draw.text((60, H - 160), meta_text, font=font_meta, fill=COLOR_GOLD)
# 하단 골드 선
draw.rectangle([0, H - 100, W, H - 96], fill=COLOR_GOLD)
path = str(_tmp_slide('headline'))
img.save(path)
return path
def compose_point_slide(point: str, num: int, article: dict, cfg: dict,
bg_img=None) -> str:
"""포인트 슬라이드: 번호 배지 + 핵심 포인트 + 뉴스 티커"""
from PIL import Image, ImageDraw
corner = article.get('corner', '쉬운세상')
corner_cfg = cfg.get('corners', {}).get(corner, {})
corner_color = _hex_to_rgb(corner_cfg.get('color', '#c8a84e'))
if bg_img is None:
bg_img = solid_background((20, 15, 35))
# 배경 어둡게
from PIL import ImageEnhance
img = ImageEnhance.Brightness(bg_img.copy()).enhance(0.4)
draw = ImageDraw.Draw(img)
W, H = 1080, 1920
font_num = _load_font(80, bold=True)
font_point = _load_font(cfg.get('font_body_size', 48))
font_ticker = _load_font(cfg.get('font_ticker_size', 28))
# 번호 원형 배지
badges = ['', '', '']
badge_char = badges[num - 1] if num <= 3 else str(num)
if font_num:
draw.ellipse([60, 160, 200, 300], fill=corner_color)
bw, bh = _text_size(draw, badge_char, font_num)
draw.text((60 + (140 - bw) // 2, 160 + (140 - bh) // 2),
badge_char, font=font_num, fill=COLOR_WHITE)
# 포인트 텍스트
if font_point:
lines = _wrap_text_lines(point, font_point, W - 120, draw)[:4]
y = H // 2 - (len(lines) * 70) // 2
for line in lines:
draw.text((60, y), line, font=font_point, fill=COLOR_WHITE)
y += 70
# 뉴스 티커 바 (하단)
ticker_text = cfg.get('ticker_text', 'The 4th Path · {corner} · {date}')
ticker_text = ticker_text.format(
corner=corner, date=datetime.now().strftime('%Y.%m.%d')
)
draw.rectangle([0, H - 100, W, H], fill=COLOR_BLACK)
if font_ticker:
draw.text((30, H - 78), ticker_text, font=font_ticker, fill=COLOR_GOLD)
path = str(_tmp_slide(f'point{num}'))
img.save(path)
return path
def compose_data_slide(article: dict, cfg: dict) -> str:
"""데이터 카드 슬라이드: 다크 배경 + 수치 카드 2~3개"""
from PIL import Image, ImageDraw
img = solid_background(COLOR_DARK2)
draw = ImageDraw.Draw(img)
W, H = 1080, 1920
font_num = _load_font(100, bold=True)
font_label = _load_font(40)
font_meta = _load_font(30)
# KEY_POINTS에서 수치 추출 시도 (간단 파싱)
key_points = article.get('key_points', [])
import re
data_items = []
for kp in key_points:
nums = re.findall(r'\d[\d,.%억만조]+|\d+[%배x]', kp)
if nums:
data_items.append({'value': nums[0], 'label': kp[:20]})
# 수치가 없으면 포인트를 카드로 표시
if not data_items:
data_items = [{'value': f'0{i+1}', 'label': kp[:20]}
for i, kp in enumerate(key_points[:3])]
# 카드 그리기 (최대 3개)
card_w = 420
card_h = 300
items = data_items[:3]
cols = min(len(items), 2)
x_start = (W - cols * card_w - (cols - 1) * 30) // 2
y_start = H // 2 - card_h // 2 - (len(items) > 2) * (card_h // 2 + 20)
for i, item in enumerate(items):
col = i % cols
row = i // cols
x = x_start + col * (card_w + 30)
y = y_start + row * (card_h + 30)
_draw_rounded_rect(draw, [x, y, x + card_w, y + card_h], 16,
(30, 25, 60))
draw.rectangle([x, y, x + card_w, y + 6], fill=COLOR_GOLD) # 상단 강조선
if font_num:
vw, vh = _text_size(draw, item['value'], font_num)
draw.text((x + (card_w - vw) // 2, y + 60),
item['value'], font=font_num, fill=COLOR_GOLD)
if font_label:
lw, lh = _text_size(draw, item['label'], font_label)
draw.text((x + (card_w - lw) // 2, y + 190),
item['label'], font=font_label, fill=COLOR_WHITE)
# 출처 표시
sources = article.get('sources', [])
if sources and font_meta:
src_title = sources[0].get('title', '')[:40]
draw.text((60, H - 200), f'출처: {src_title}', font=font_meta,
fill=(150, 150, 150))
path = str(_tmp_slide('data'))
img.save(path)
return path
def compose_outro_slide(cfg: dict) -> str:
"""아웃트로 슬라이드: 다크 배경 + CTA + URL"""
from PIL import Image, ImageDraw
img = solid_background(COLOR_DARK)
draw = ImageDraw.Draw(img)
W, H = 1080, 1920
font_brand = _load_font(64, bold=True)
font_cta = _load_font(48)
font_url = _load_font(52, bold=True)
font_sub = _load_font(36)
# 골드 선 장식
draw.rectangle([60, H // 3, W - 60, H // 3 + 4], fill=COLOR_GOLD)
draw.rectangle([60, H * 2 // 3 + 80, W - 60, H * 2 // 3 + 84], fill=COLOR_GOLD)
cta = '더 자세한 내용은'
url = cfg.get('outro_url', 'the4thpath.com')
follow = cfg.get('outro_cta', '팔로우하면 매일 이런 정보를 받습니다')
brand = cfg.get('brand_name', 'The 4th Path')
y = H // 3 + 60
for text, font, color in [
(cta, font_cta, COLOR_WHITE),
(url, font_url, COLOR_GOLD),
('', None, None),
(brand, font_brand, COLOR_WHITE),
(follow, font_sub, (180, 180, 180)),
]:
if not font:
y += 40
continue
tw, th = _text_size(draw, text, font)
draw.text(((W - tw) // 2, y), text, font=font, fill=color)
y += th + 24
path = str(_tmp_slide('outro'))
img.save(path)
return path
# ─── ffmpeg 헬퍼 ──────────────────────────────────────
def _run_ffmpeg(args: list, quiet: bool = False) -> bool:
cmd = [FFMPEG, '-y'] + args
if quiet:
cmd = [FFMPEG, '-y', '-loglevel', 'error'] + args
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
logger.error(f"ffmpeg 오류: {result.stderr[-400:]}")
return result.returncode == 0
def _check_ffmpeg() -> bool:
try:
r = subprocess.run([FFMPEG, '-version'], capture_output=True, timeout=5)
return r.returncode == 0
except Exception:
return False
def make_clip(slide_png: str, audio_wav: str, output_mp4: str) -> float:
"""
슬라이드 PNG + 오디오 WAV → MP4 클립 (Ken Burns zoompan).
Returns: 클립 실제 길이(초)
"""
duration = get_audio_duration(audio_wav) + 0.3 # 약간 여유
ok = _run_ffmpeg([
'-loop', '1', '-i', slide_png,
'-i', audio_wav,
'-c:v', 'libx264', '-tune', 'stillimage',
'-c:a', 'aac', '-b:a', '192k',
'-pix_fmt', 'yuv420p',
'-vf', (
'scale=1080:1920,'
'zoompan=z=\'min(zoom+0.0003,1.05)\':'
'x=\'iw/2-(iw/zoom/2)\':'
'y=\'ih/2-(ih/zoom/2)\':'
'd=1:s=1080x1920:fps=30'
),
'-shortest',
'-r', '30',
output_mp4,
], quiet=True)
return duration if ok else 0.0
def concat_clips_xfade(clips: list[dict], output_mp4: str,
transition: str = 'fade', trans_dur: float = 0.5) -> bool:
"""
여러 클립을 xfade 전환으로 결합.
clips: [{'video': path, 'audio': path, 'duration': float}, ...]
"""
if len(clips) < 2:
return _run_ffmpeg(['-i', clips[0]['mp4'], '-c', 'copy', output_mp4])
# xfade filter_complex 구성
n = len(clips)
inputs = []
for c in clips:
inputs += ['-i', c['mp4']]
# 비디오 xfade 체인
filter_parts = []
offset = 0.0
prev_v = '[0:v]'
prev_a = '[0:a]'
for i in range(1, n):
offset = sum(c['duration'] for c in clips[:i]) - trans_dur * i
out_v = f'[f{i}v]' if i < n - 1 else '[video]'
out_a = f'[f{i}a]' if i < n - 1 else '[audio]'
filter_parts.append(
f'{prev_v}[{i}:v]xfade=transition={transition}:'
f'duration={trans_dur}:offset={offset:.3f}{out_v}'
)
filter_parts.append(
f'{prev_a}[{i}:a]acrossfade=d={trans_dur}{out_a}'
)
prev_v = out_v
prev_a = out_a
filter_complex = '; '.join(filter_parts)
ok = _run_ffmpeg(
inputs + [
'-filter_complex', filter_complex,
'-map', '[video]', '-map', '[audio]',
'-c:v', 'libx264', '-c:a', 'aac',
'-pix_fmt', 'yuv420p',
output_mp4,
]
)
return ok
def mix_bgm(video_mp4: str, bgm_path: str, output_mp4: str,
volume: float = 0.08) -> bool:
"""BGM을 낮은 볼륨으로 믹스"""
if not Path(bgm_path).exists():
logger.warning(f"BGM 파일 없음 ({bgm_path}) — BGM 없이 진행")
import shutil
shutil.copy2(video_mp4, output_mp4)
return True
return _run_ffmpeg([
'-i', video_mp4,
'-i', bgm_path,
'-filter_complex',
f'[1:a]volume={volume}[bgm];[0:a][bgm]amix=inputs=2:duration=first[a]',
'-map', '0:v', '-map', '[a]',
'-c:v', 'copy', '-c:a', 'aac',
'-shortest',
output_mp4,
])
def burn_subtitles(video_mp4: str, srt_path: str, output_mp4: str) -> bool:
"""SRT 자막 burn-in"""
font_name = 'NanumGothic'
# Windows 맑은고딕 폰트명 확인
for fname in ['NotoSansKR-Regular.ttf', 'malgun.ttf']:
fp = FONTS_DIR / fname
if not fp.exists():
fp = Path(f'C:/Windows/Fonts/{fname}')
if fp.exists():
font_name = fp.stem
break
style = (
f'FontName={font_name},'
'FontSize=22,'
'PrimaryColour=&H00FFFFFF,'
'OutlineColour=&H80000000,'
'BorderStyle=4,'
'BackColour=&H80000000,'
'Outline=0,Shadow=0,'
'MarginV=120,'
'Alignment=2,'
'Bold=1'
)
# srt 경로에서 역슬래시 → 슬래시 (ffmpeg 호환)
srt_esc = str(srt_path).replace('\\', '/').replace(':', '\\:')
return _run_ffmpeg([
'-i', video_mp4,
'-vf', f'subtitles={srt_esc}:force_style=\'{style}\'',
'-c:v', 'libx264', '-c:a', 'copy',
output_mp4,
])
# ─── SRT 생성 ─────────────────────────────────────────
def build_srt(script_sections: list[dict]) -> str:
"""
섹션별 자막 생성.
script_sections: [{'text': str, 'start': float, 'duration': float}, ...]
"""
lines = []
for i, section in enumerate(script_sections, 1):
start = section['start']
end = start + section['duration']
# 문장을 2줄로 분할
text = section['text']
mid = len(text) // 2
if len(text) > 30:
space = text.rfind(' ', 0, mid)
if space > 0:
text = text[:space] + '\n' + text[space+1:]
lines += [str(i), f'{_sec_to_srt(start)} --> {_sec_to_srt(end)}', text, '']
return '\n'.join(lines)
def _sec_to_srt(s: float) -> str:
h, rem = divmod(int(s), 3600)
m, sec = divmod(rem, 60)
ms = int((s - int(s)) * 1000)
return f'{h:02d}:{m:02d}:{sec:02d},{ms:03d}'
# ─── 임시 파일 경로 ────────────────────────────────────
_tmp_dir: Optional[Path] = None
def _set_tmp_dir(d: Path):
global _tmp_dir
_tmp_dir = d
def _tmp_slide(name: str) -> Path:
return _tmp_dir / f'slide_{name}.png'
def _tmp_wav(name: str) -> Path:
return _tmp_dir / f'tts_{name}.wav'
def _tmp_clip(name: str) -> Path:
return _tmp_dir / f'clip_{name}.mp4'
# ─── 메인 클래스 ──────────────────────────────────────
class ShortsConverter:
"""
뉴스앵커 포맷 쇼츠 변환기.
사용:
sc = ShortsConverter()
mp4_path = sc.generate(article)
"""
def __init__(self):
self.cfg = _load_template()
def generate(self, article: dict) -> str:
"""메인 파이프라인. Returns: 최종 MP4 경로 또는 ''"""
import tempfile
if not _check_ffmpeg():
logger.error("ffmpeg 없음. PATH 또는 FFMPEG_PATH 확인")
return ''
key_points = article.get('key_points', [])
if not key_points:
logger.warning("KEY_POINTS 없음 — 쇼츠 생성 불가")
return ''
title = article.get('title', '')
corner = article.get('corner', '쉬운세상')
slug = article.get('slug', 'article')
date_str = datetime.now().strftime('%Y%m%d')
corner_cfg = self.cfg.get('corners', {}).get(corner, {})
tts_speed = corner_cfg.get('tts_speed', self.cfg.get('tts_speaking_rate_default', 1.05))
transition = corner_cfg.get('transition', 'fade')
trans_dur = self.cfg.get('transition_duration', 0.5)
voice = self.cfg.get('tts_voice_ko', 'ko-KR-Wavenet-A')
is_oncut = corner == '한컷'
force_data = corner_cfg.get('force_data_card', False)
logger.info(f"쇼츠 변환 시작: {title} / {corner}")
with tempfile.TemporaryDirectory() as tmp:
_set_tmp_dir(Path(tmp))
# ── 1. DALL-E 배경 생성 ─────────────────
bg_prompt = corner_cfg.get('bg_prompt_style')
bg_img = generate_background_dalle(bg_prompt, corner) if bg_prompt else None
# ── 2. TTS 스크립트 구성 ────────────────
title_short = title[:40] + ('...' if len(title) > 40 else '')
scripts = {
'intro': f'오늘은 {title_short}에 대해 알아보겠습니다.',
'headline': f'{title_short}',
}
for i, kp in enumerate(key_points[:3], 1):
scripts[f'point{i}'] = kp
if force_data or (not is_oncut and len(key_points) > 2):
scripts['data'] = '관련 데이터를 확인해보겠습니다.'
scripts['outro'] = (
f'자세한 내용은 {self.cfg.get("outro_url","the4thpath.com")}에서 확인하세요. '
'팔로우 부탁드립니다.'
)
# ── 3. 슬라이드 합성 ────────────────────
slides = {
'intro': compose_intro_slide(self.cfg),
'headline': compose_headline_slide(article, self.cfg, bg_img),
}
for i, kp in enumerate(key_points[:3], 1):
slides[f'point{i}'] = compose_point_slide(kp, i, article, self.cfg, bg_img)
if 'data' in scripts:
slides['data'] = compose_data_slide(article, self.cfg)
slides['outro'] = compose_outro_slide(self.cfg)
# ── 4. TTS 합성 + 클립 생성 ──────────────
clips = []
for key in scripts:
wav_path = str(_tmp_wav(key))
clip_path = str(_tmp_clip(key))
slide_path = slides.get(key)
if not slide_path or not Path(slide_path).exists():
continue
ok = synthesize_section(scripts[key], wav_path, voice, tts_speed)
if not ok:
logger.warning(f"TTS 실패: {key} — 슬라이드만 사용")
# 무음 WAV 생성 (2초)
_run_ffmpeg(['-f', 'lavfi', '-i', 'anullsrc=r=24000:cl=mono',
'-t', '2', wav_path], quiet=True)
dur = make_clip(slide_path, wav_path, clip_path)
if dur > 0:
clips.append({'mp4': clip_path, 'duration': dur})
if not clips:
logger.error("생성된 클립 없음")
return ''
# ── 5. 클립 결합 (xfade) ─────────────────
merged = str(Path(tmp) / 'merged.mp4')
if len(clips) == 1:
import shutil
shutil.copy2(clips[0]['mp4'], merged)
else:
if not concat_clips_xfade(clips, merged, transition, trans_dur):
logger.error("클립 결합 실패")
return ''
# ── 6. BGM 믹스 ──────────────────────────
with_bgm = str(Path(tmp) / 'with_bgm.mp4')
mix_bgm(merged, str(BGM_PATH), with_bgm, self.cfg.get('bgm_volume', 0.08))
source_for_srt = with_bgm if Path(with_bgm).exists() else merged
# ── 7. SRT 자막 생성 ─────────────────────
srt_sections = []
t = 0.0
for clip_data in clips:
srt_sections.append({'text': '', 'start': t, 'duration': clip_data['duration']})
t += clip_data['duration'] - trans_dur
# 섹션별 텍스트 채우기
keys = list(scripts.keys())
for i, section in enumerate(srt_sections):
if i < len(keys):
section['text'] = scripts[keys[i]]
srt_content = build_srt([s for s in srt_sections if s['text']])
srt_path = str(Path(tmp) / 'subtitles.srt')
Path(srt_path).write_text(srt_content, encoding='utf-8-sig')
# ── 8. 자막 burn-in ───────────────────────
output_path = str(OUTPUT_DIR / f'{date_str}_{slug}_shorts.mp4')
if not burn_subtitles(source_for_srt, srt_path, output_path):
# 자막 실패 시 자막 없는 버전으로
import shutil
shutil.copy2(source_for_srt, output_path)
logger.info(f"쇼츠 생성 완료: {output_path}")
return output_path
# ─── 모듈 레벨 진입점 (scheduler 호환) ────────────────
def convert(article: dict, card_path: str = '', save_file: bool = True) -> str:
"""
scheduler.py/_run_conversion_pipeline()에서 호출하는 진입점.
card_path: 사용하지 않음 (이전 버전 호환 파라미터)
"""
sc = ShortsConverter()
return sc.generate(article)
if __name__ == '__main__':
sample = {
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'slug': 'chatgpt-shorts-test',
'corner': '쉬운세상',
'key_points': [
'무료로 바로 시작할 수 있다',
'GPT-3.5도 일반 용도엔 충분하다',
'프롬프트의 질이 결과를 결정한다',
],
'sources': [{'title': 'OpenAI 공식 블로그', 'url': 'https://openai.com'}],
}
sc = ShortsConverter()
path = sc.generate(sample)
print(f'완료: {path}')
+152
View File
@@ -0,0 +1,152 @@
"""
X 스레드 변환봇 (converters/thread_converter.py)
역할: 원본 마크다운 → X(트위터) 스레드 JSON (LAYER 2)
- TITLE + KEY_POINTS → 280자 트윗 3-5개로 분할
- 첫 트윗: 흥미 유발 + 코너 해시태그
- 중간 트윗: 핵심 포인트
- 마지막 트윗: 블로그 링크 + CTA
출력: data/outputs/{date}_{slug}_thread.json
"""
import json
import logging
import textwrap
from datetime import datetime
from pathlib import Path
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
OUTPUT_DIR = BASE_DIR / 'data' / 'outputs'
OUTPUT_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'converter.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
BLOG_BASE_URL = 'https://the4thpath.com'
TWEET_MAX = 280
CORNER_HASHTAGS = {
'쉬운세상': '#쉬운세상 #AI활용 #디지털라이프',
'숨은보물': '#숨은보물 #AI도구 #생산성',
'바이브리포트': '#바이브리포트 #트렌드 #AI시대',
'팩트체크': '#팩트체크 #AI뉴스',
'한컷': '#한컷 #AI만평',
}
BRAND_TAG = '#The4thPath'
def _split_to_tweet(text: str, max_len: int = TWEET_MAX) -> list[str]:
"""텍스트를 280자 단위로 자연스럽게 분할"""
if len(text) <= max_len:
return [text]
tweets = []
sentences = text.replace('. ', '.\n').replace('다. ', '다.\n').split('\n')
current = ''
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
test = (current + ' ' + sentence).strip() if current else sentence
if len(test) <= max_len:
current = test
else:
if current:
tweets.append(current)
# 문장 자체가 너무 길면 강제 분할
if len(sentence) > max_len:
chunks = textwrap.wrap(sentence, max_len - 5)
tweets.extend(chunks[:-1])
current = chunks[-1] if chunks else ''
else:
current = sentence
if current:
tweets.append(current)
return tweets or [text[:max_len]]
def convert(article: dict, blog_url: str = '', save_file: bool = True) -> list[dict]:
"""
article dict → X 스레드 트윗 리스트.
각 트윗: {'order': int, 'text': str, 'char_count': int}
"""
title = article.get('title', '')
corner = article.get('corner', '')
key_points = article.get('key_points', [])
tags = article.get('tags', [])
slug = article.get('slug', 'article')
logger.info(f"스레드 변환 시작: {title}")
hashtags = CORNER_HASHTAGS.get(corner, '')
tag_str = ' '.join(f'#{t}' for t in tags[:3] if t)
if tag_str:
hashtags = hashtags + ' ' + tag_str
tweets = []
# 트윗 1: 흥미 유발 + 제목 + 코너 해시태그
intro_text = f"👀 {title}\n\n{hashtags} {BRAND_TAG}"
if len(intro_text) <= TWEET_MAX:
tweets.append(intro_text)
else:
short_title = textwrap.shorten(title, width=100, placeholder='...')
tweets.append(f"👀 {short_title}\n\n{hashtags}")
# 트윗 2-4: 핵심 포인트
for i, point in enumerate(key_points[:3], 1):
bullets = ['', '', '']
bullet = bullets[i - 1] if i <= 3 else f'{i}.'
tweet_text = f"{bullet} {point}"
if len(tweet_text) <= TWEET_MAX:
tweets.append(tweet_text)
else:
split_tweets = _split_to_tweet(tweet_text)
tweets.extend(split_tweets)
# 마지막 트윗: CTA + 블로그 링크
post_url = blog_url or f"{BLOG_BASE_URL}/{slug}"
cta_text = f"전체 내용 보기 👇\n{post_url}\n\n{BRAND_TAG}"
tweets.append(cta_text)
result = [
{'order': i + 1, 'text': t, 'char_count': len(t)}
for i, t in enumerate(tweets)
]
if save_file:
date_str = datetime.now().strftime('%Y%m%d')
filename = f"{date_str}_{slug}_thread.json"
output_path = OUTPUT_DIR / filename
output_path.write_text(
json.dumps(result, ensure_ascii=False, indent=2), encoding='utf-8'
)
logger.info(f"스레드 저장: {output_path} ({len(result)}개 트윗)")
logger.info("스레드 변환 완료")
return result
if __name__ == '__main__':
sample = {
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'slug': 'chatgpt-guide',
'corner': '쉬운세상',
'tags': ['ChatGPT', 'AI', '가이드'],
'key_points': [
'무료로 바로 시작할 수 있다 — chat.openai.com 접속',
'GPT-4는 유료지만 GPT-3.5도 일반 용도엔 충분하다',
'프롬프트의 질이 답변의 질을 결정한다',
],
}
threads = convert(sample)
for t in threads:
print(f"[{t['order']}] ({t['char_count']}자) {t['text']}")
print()
View File
+224
View File
@@ -0,0 +1,224 @@
"""
이미지 호스팅 헬퍼 (distributors/image_host.py)
역할: 로컬 카드 이미지 → 공개 URL 변환
Instagram Graph API는 공개 URL이 필요하므로
카드 이미지를 외부에서 접근 가능한 URL로 변환한다.
지원 방식:
1. ImgBB (무료 API, 키 필요) ← IMGBB_API_KEY 설정 시
2. Blogger 미디어 업로드 (기존 OAuth) ← 기본값 (추가 비용 없음)
3. 로컬 HTTP 서버 (개발/테스트용) ← LOCAL_IMAGE_SERVER=true 시
"""
import base64
import json
import logging
import os
from pathlib import Path
import requests
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'distributor.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
IMGBB_API_KEY = os.getenv('IMGBB_API_KEY', '')
IMGBB_API_URL = 'https://api.imgbb.com/v1/upload'
# ─── 방식 1: ImgBB ────────────────────────────────────
def upload_to_imgbb(image_path: str, expiration: int = 0) -> str:
"""
ImgBB에 이미지 업로드.
expiration: 0=영구, 초 단위 만료 시간 (예: 86400=1일)
Returns: 공개 URL 또는 ''
"""
if not IMGBB_API_KEY:
logger.debug("IMGBB_API_KEY 없음 — ImgBB 건너뜀")
return ''
try:
with open(image_path, 'rb') as f:
image_data = base64.b64encode(f.read()).decode('utf-8')
payload = {
'key': IMGBB_API_KEY,
'image': image_data,
}
if expiration > 0:
payload['expiration'] = expiration
resp = requests.post(IMGBB_API_URL, data=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
if result.get('success'):
url = result['data']['url']
logger.info(f"ImgBB 업로드 완료: {url}")
return url
else:
logger.warning(f"ImgBB 오류: {result.get('error', {})}")
return ''
except Exception as e:
logger.error(f"ImgBB 업로드 실패: {e}")
return ''
# ─── 방식 2: Blogger 미디어 업로드 ───────────────────
def upload_to_blogger(image_path: str) -> str:
"""
Blogger에 이미지를 첨부파일로 업로드 후 공개 URL 반환.
기존 Google OAuth (token.json) 재사용.
Returns: 공개 URL 또는 ''
"""
try:
import sys
sys.path.insert(0, str(BASE_DIR / 'bots'))
from publisher_bot import get_google_credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
blog_id = os.getenv('BLOG_MAIN_ID', '')
if not blog_id:
logger.warning("BLOG_MAIN_ID 없음")
return ''
creds = get_google_credentials()
service = build('blogger', 'v3', credentials=creds)
# Blogger API: 미디어 업로드 (pages나 posts에 이미지 첨부)
# 참고: Blogger는 직접 미디어 API가 없으므로 임시 draft 포스트로 업로드
media = MediaFileUpload(image_path, mimetype='image/png', resumable=True)
# 임시 draft 포스트에 이미지 삽입 → URL 추출 → 포스트 삭제
img_data = open(image_path, 'rb').read()
img_b64 = base64.b64encode(img_data).decode()
img_html = f'<img src="data:image/png;base64,{img_b64}" />'
draft = service.posts().insert(
blogId=blog_id,
body={'title': '__img_upload__', 'content': img_html},
isDraft=True,
).execute()
post_url = draft.get('url', '')
post_id = draft.get('id', '')
# draft 삭제
if post_id:
service.posts().delete(blogId=blog_id, postId=post_id).execute()
# base64 embedded는 직접 URL이 아니므로 ImgBB fallback 필요
# Blogger는 외부 이미지 호스팅 역할을 하지 않음
# → 실제 운영 시 ImgBB 또는 CDN 사용 권장
logger.warning("Blogger 미디어 업로드: base64 방식은 인스타 공개 URL로 부적합. ImgBB 권장.")
return ''
except Exception as e:
logger.error(f"Blogger 업로드 실패: {e}")
return ''
# ─── 방식 3: 로컬 HTTP 서버 (개발용) ─────────────────
_local_server = None
def start_local_server(port: int = 8765) -> str:
"""
로컬 HTTP 파일 서버 시작 (개발/테스트용).
Returns: base URL (예: http://192.168.1.100:8765)
"""
import socket
import threading
import http.server
import functools
global _local_server
if _local_server:
return _local_server
outputs_dir = str(BASE_DIR / 'data' / 'outputs')
handler = functools.partial(
http.server.SimpleHTTPRequestHandler, directory=outputs_dir
)
server = http.server.HTTPServer(('0.0.0.0', port), handler)
def run():
server.serve_forever()
thread = threading.Thread(target=run, daemon=True)
thread.start()
# 로컬 IP 확인
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
local_ip = s.getsockname()[0]
s.close()
except Exception:
local_ip = '127.0.0.1'
base_url = f'http://{local_ip}:{port}'
_local_server = base_url
logger.info(f"로컬 이미지 서버 시작: {base_url}")
return base_url
def get_local_url(image_path: str, port: int = 8765) -> str:
"""로컬 서버 URL 반환 (개발/ngrok 사용 시)"""
base_url = start_local_server(port)
filename = Path(image_path).name
return f'{base_url}/{filename}'
# ─── 메인 함수 ───────────────────────────────────────
def get_public_url(image_path: str) -> str:
"""
이미지 파일 → 공개 URL 반환.
우선순위: ImgBB → 로컬 서버(개발용)
"""
if not Path(image_path).exists():
logger.error(f"이미지 파일 없음: {image_path}")
return ''
# 1. ImgBB (API 키 있을 때)
url = upload_to_imgbb(image_path, expiration=86400 * 7) # 7일
if url:
return url
# 2. 로컬 HTTP 서버 (ngrok 또는 내부망 테스트용)
if os.getenv('LOCAL_IMAGE_SERVER', '').lower() == 'true':
url = get_local_url(image_path)
logger.warning(f"로컬 서버 URL 사용 (인터넷 접근 필요): {url}")
return url
logger.warning(
"공개 URL 생성 불가. .env에 IMGBB_API_KEY를 설정하거나 "
"LOCAL_IMAGE_SERVER=true로 설정하세요."
)
return ''
if __name__ == '__main__':
import sys
if len(sys.argv) > 1:
url = get_public_url(sys.argv[1])
print(f"공개 URL: {url}")
else:
print("사용법: python image_host.py <이미지경로>")
+207
View File
@@ -0,0 +1,207 @@
"""
인스타그램 배포봇 (distributors/instagram_bot.py)
역할: 카드 이미지 → Instagram Graph API 업로드 (LAYER 3)
- 피드 포스트: 카드 이미지 업로드
- 릴스: 쇼츠 영상 업로드 (Phase 2)
- 캡션: KEY_POINTS + 해시태그 + 블로그 링크(프로필)
사전 조건:
- Facebook Page + Instagram Business 계정 연결
- Instagram Graph API 앱 등록
- .env: INSTAGRAM_ACCESS_TOKEN, INSTAGRAM_ACCOUNT_ID
"""
import json
import logging
import os
import time
from pathlib import Path
import requests
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR = BASE_DIR / 'data'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'distributor.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
INSTAGRAM_ACCESS_TOKEN = os.getenv('INSTAGRAM_ACCESS_TOKEN', '')
INSTAGRAM_ACCOUNT_ID = os.getenv('INSTAGRAM_ACCOUNT_ID', '')
GRAPH_API_BASE = 'https://graph.facebook.com/v19.0'
BLOG_URL = 'https://the4thpath.com'
BRAND_TAG = '#The4thPath #테크인사이더 #22BLabs'
def _check_credentials() -> bool:
if not INSTAGRAM_ACCESS_TOKEN or not INSTAGRAM_ACCOUNT_ID:
logger.warning("Instagram 자격증명 없음 (.env: INSTAGRAM_ACCESS_TOKEN, INSTAGRAM_ACCOUNT_ID)")
return False
return True
def build_caption(article: dict) -> str:
"""인스타그램 캡션 생성"""
title = article.get('title', '')
corner = article.get('corner', '')
key_points = article.get('key_points', [])
tags = article.get('tags', [])
lines = [f"{title}", ""]
if key_points:
for point in key_points[:3]:
lines.append(f"{point}")
lines.append("")
lines.append(f"전체 내용: 프로필 링크 🔗")
lines.append("")
hashtags = [f'#{corner.replace(" ", "")}'] if corner else []
hashtags += [f'#{t}' for t in tags[:5] if t]
hashtags.append(BRAND_TAG)
lines.append(' '.join(hashtags))
return '\n'.join(lines)
def upload_image_container(image_url: str, caption: str) -> str:
"""
인스타 이미지 컨테이너 생성.
image_url: 공개 접근 가능한 이미지 URL (Instagram이 직접 다운로드)
Returns: container_id
"""
if not _check_credentials():
return ''
url = f"{GRAPH_API_BASE}/{INSTAGRAM_ACCOUNT_ID}/media"
params = {
'image_url': image_url,
'caption': caption,
'access_token': INSTAGRAM_ACCESS_TOKEN,
}
try:
resp = requests.post(url, data=params, timeout=30)
resp.raise_for_status()
container_id = resp.json().get('id', '')
logger.info(f"이미지 컨테이너 생성: {container_id}")
return container_id
except Exception as e:
logger.error(f"Instagram 컨테이너 생성 실패: {e}")
return ''
def publish_container(container_id: str) -> str:
"""컨테이너 → 실제 발행. Returns: post_id"""
if not _check_credentials() or not container_id:
return ''
# 컨테이너 준비 대기 (최대 60초)
status_url = f"{GRAPH_API_BASE}/{container_id}"
for _ in range(12):
try:
status_resp = requests.get(
status_url,
params={'fields': 'status_code', 'access_token': INSTAGRAM_ACCESS_TOKEN},
timeout=10
)
status = status_resp.json().get('status_code', '')
if status == 'FINISHED':
break
if status in ('ERROR', 'EXPIRED'):
logger.error(f"컨테이너 오류: {status}")
return ''
except Exception:
pass
time.sleep(5)
# 발행
publish_url = f"{GRAPH_API_BASE}/{INSTAGRAM_ACCOUNT_ID}/media_publish"
params = {
'creation_id': container_id,
'access_token': INSTAGRAM_ACCESS_TOKEN,
}
try:
resp = requests.post(publish_url, data=params, timeout=30)
resp.raise_for_status()
post_id = resp.json().get('id', '')
logger.info(f"Instagram 발행 완료: {post_id}")
return post_id
except Exception as e:
logger.error(f"Instagram 발행 실패: {e}")
return ''
def publish_card(article: dict, image_path_or_url: str) -> bool:
"""
카드 이미지를 인스타그램 피드에 게시.
image_path_or_url: 로컬 파일 경로 또는 공개 URL
- 로컬 경로인 경우 image_host.py로 공개 URL 변환
- http/https URL인 경우 그대로 사용
"""
if not _check_credentials():
logger.info("Instagram 미설정 — 발행 건너뜀")
return False
logger.info(f"Instagram 발행 시작: {article.get('title', '')}")
# 로컬 경로 → 공개 URL 변환
image_url = image_path_or_url
if not image_path_or_url.startswith('http'):
import sys
sys.path.insert(0, str(Path(__file__).parent))
from image_host import get_public_url
image_url = get_public_url(image_path_or_url)
if not image_url:
logger.error("공개 URL 변환 실패 — .env에 IMGBB_API_KEY 설정 필요")
return False
caption = build_caption(article)
container_id = upload_image_container(image_url, caption)
if not container_id:
return False
post_id = publish_container(container_id)
if not post_id:
return False
_log_published(article, post_id, 'instagram_card')
return True
def _log_published(article: dict, post_id: str, platform: str):
"""플랫폼별 발행 이력 저장"""
pub_dir = DATA_DIR / 'published'
pub_dir.mkdir(exist_ok=True)
from datetime import datetime
record = {
'platform': platform,
'post_id': post_id,
'title': article.get('title', ''),
'corner': article.get('corner', ''),
'published_at': datetime.now().isoformat(),
}
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{platform}_{post_id}.json"
with open(pub_dir / filename, 'w', encoding='utf-8') as f:
json.dump(record, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
# 테스트 (실제 API 없이 출력 확인)
sample = {
'title': '테스트 글',
'corner': '쉬운세상',
'key_points': ['포인트 1', '포인트 2'],
'tags': ['AI', '테스트'],
}
print(build_caption(sample))
+214
View File
@@ -0,0 +1,214 @@
"""
틱톡 배포봇 (distributors/tiktok_bot.py)
역할: 쇼츠 MP4 → TikTok Content Posting API 업로드 (LAYER 3)
Phase 2.
사전 조건:
- TikTok Developer 계정 + 앱 등록 (Content Posting API 승인)
- .env: TIKTOK_ACCESS_TOKEN, TIKTOK_OPEN_ID
"""
import json
import logging
import os
import time
from pathlib import Path
import requests
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR = BASE_DIR / 'data'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'distributor.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
TIKTOK_ACCESS_TOKEN = os.getenv('TIKTOK_ACCESS_TOKEN', '')
TIKTOK_OPEN_ID = os.getenv('TIKTOK_OPEN_ID', '')
TIKTOK_API_BASE = 'https://open.tiktokapis.com/v2'
CORNER_HASHTAGS = {
'쉬운세상': ['쉬운세상', 'AI활용', '디지털라이프', 'The4thPath'],
'숨은보물': ['숨은보물', 'AI도구', '생산성', 'The4thPath'],
'바이브리포트': ['바이브리포트', '트렌드', 'AI시대', 'The4thPath'],
'팩트체크': ['팩트체크', 'AI뉴스', 'The4thPath'],
'한컷': ['한컷만평', 'AI시사', 'The4thPath'],
}
def _check_credentials() -> bool:
if not TIKTOK_ACCESS_TOKEN:
logger.warning("TIKTOK_ACCESS_TOKEN 없음")
return False
return True
def _get_headers() -> dict:
return {
'Authorization': f'Bearer {TIKTOK_ACCESS_TOKEN}',
'Content-Type': 'application/json; charset=UTF-8',
}
def build_caption(article: dict) -> str:
"""틱톡 캡션 생성 (제목 + 핵심 1줄 + 해시태그)"""
title = article.get('title', '')
key_points = article.get('key_points', [])
corner = article.get('corner', '')
caption_parts = [title]
if key_points:
caption_parts.append(key_points[0])
hashtags = CORNER_HASHTAGS.get(corner, ['The4thPath'])
tag_str = ' '.join(f'#{t}' for t in hashtags)
caption_parts.append(tag_str)
return '\n'.join(caption_parts)
def init_upload(video_size: int, video_duration: float) -> tuple[str, str]:
"""
TikTok 업로드 초기화 (Direct Post).
Returns: (upload_url, publish_id)
"""
url = f'{TIKTOK_API_BASE}/post/publish/video/init/'
payload = {
'post_info': {
'title': '', # 영상에서 추출되므로 빈칸 가능
'privacy_level': 'PUBLIC_TO_EVERYONE',
'disable_duet': False,
'disable_comment': False,
'disable_stitch': False,
},
'source_info': {
'source': 'FILE_UPLOAD',
'video_size': video_size,
'chunk_size': min(video_size, 64 * 1024 * 1024), # 64MB
'total_chunk_count': 1,
},
}
try:
resp = requests.post(url, json=payload, headers=_get_headers(), timeout=30)
resp.raise_for_status()
data = resp.json().get('data', {})
upload_url = data.get('upload_url', '')
publish_id = data.get('publish_id', '')
logger.info(f"TikTok 업로드 초기화: publish_id={publish_id}")
return upload_url, publish_id
except Exception as e:
logger.error(f"TikTok 업로드 초기화 실패: {e}")
return '', ''
def upload_chunk(upload_url: str, video_path: str, video_size: int) -> bool:
"""동영상 업로드"""
try:
with open(video_path, 'rb') as f:
video_data = f.read()
headers = {
'Content-Range': f'bytes 0-{video_size-1}/{video_size}',
'Content-Length': str(video_size),
'Content-Type': 'video/mp4',
}
resp = requests.put(upload_url, data=video_data, headers=headers, timeout=300)
if resp.status_code in (200, 201, 206):
logger.info("TikTok 동영상 업로드 완료")
return True
logger.error(f"TikTok 업로드 HTTP {resp.status_code}: {resp.text[:200]}")
return False
except Exception as e:
logger.error(f"TikTok 업로드 실패: {e}")
return False
def check_publish_status(publish_id: str, max_wait: int = 120) -> bool:
"""발행 상태 확인 (최대 max_wait초 대기)"""
url = f'{TIKTOK_API_BASE}/post/publish/status/fetch/'
payload = {'publish_id': publish_id}
for _ in range(max_wait // 5):
try:
resp = requests.post(url, json=payload, headers=_get_headers(), timeout=10)
resp.raise_for_status()
status = resp.json().get('data', {}).get('status', '')
if status == 'PUBLISH_COMPLETE':
logger.info("TikTok 발행 완료")
return True
if status in ('FAILED', 'CANCELED'):
logger.error(f"TikTok 발행 실패: {status}")
return False
except Exception as e:
logger.warning(f"상태 확인 오류: {e}")
time.sleep(5)
logger.warning("TikTok 발행 상태 확인 시간 초과")
return False
def publish_shorts(article: dict, video_path: str) -> bool:
"""
쇼츠 MP4 → TikTok 업로드.
video_path: shorts_converter.convert()가 생성한 MP4
"""
if not _check_credentials():
logger.info("TikTok 미설정 — 발행 건너뜀")
return False
if not Path(video_path).exists():
logger.error(f"영상 파일 없음: {video_path}")
return False
title = article.get('title', '')
logger.info(f"TikTok 발행 시작: {title}")
video_size = Path(video_path).stat().st_size
# 업로드 초기화
upload_url, publish_id = init_upload(video_size, 30.0)
if not upload_url or not publish_id:
return False
# 동영상 업로드
if not upload_chunk(upload_url, video_path, video_size):
return False
# 발행 상태 확인
if not check_publish_status(publish_id):
return False
_log_published(article, publish_id, 'tiktok')
return True
def _log_published(article: dict, post_id: str, platform: str):
pub_dir = DATA_DIR / 'published'
pub_dir.mkdir(exist_ok=True)
from datetime import datetime
record = {
'platform': platform,
'post_id': post_id,
'title': article.get('title', ''),
'corner': article.get('corner', ''),
'published_at': datetime.now().isoformat(),
}
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{platform}_{post_id}.json"
with open(pub_dir / filename, 'w', encoding='utf-8') as f:
json.dump(record, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
sample = {
'title': '테스트 글',
'corner': '쉬운세상',
'key_points': ['포인트 1'],
}
print(build_caption(sample))
+149
View File
@@ -0,0 +1,149 @@
"""
X(트위터) 배포봇 (distributors/x_bot.py)
역할: X 스레드 JSON → X API v2로 순차 트윗 게시 (LAYER 3)
사전 조건:
- X Developer 계정 + 앱 등록
- .env: X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_SECRET
"""
import json
import logging
import os
import time
from pathlib import Path
import requests
from dotenv import load_dotenv
from requests_oauthlib import OAuth1
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR = BASE_DIR / 'data'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'distributor.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
X_API_KEY = os.getenv('X_API_KEY', '')
X_API_SECRET = os.getenv('X_API_SECRET', '')
X_ACCESS_TOKEN = os.getenv('X_ACCESS_TOKEN', '')
X_ACCESS_SECRET = os.getenv('X_ACCESS_SECRET', '')
X_API_V2 = 'https://api.twitter.com/2/tweets'
def _check_credentials() -> bool:
if not all([X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_SECRET]):
logger.warning("X API 자격증명 없음 (.env: X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_SECRET)")
return False
return True
def _get_auth() -> OAuth1:
return OAuth1(X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_SECRET)
def post_tweet(text: str, reply_to_id: str = '') -> str:
"""
단일 트윗 게시.
reply_to_id: 스레드 연결용 이전 트윗 ID
Returns: 트윗 ID
"""
if not _check_credentials():
return ''
payload = {'text': text}
if reply_to_id:
payload['reply'] = {'in_reply_to_tweet_id': reply_to_id}
try:
auth = _get_auth()
resp = requests.post(X_API_V2, json=payload, auth=auth, timeout=15)
resp.raise_for_status()
tweet_id = resp.json().get('data', {}).get('id', '')
logger.info(f"트윗 게시: {tweet_id} ({len(text)}자)")
return tweet_id
except Exception as e:
logger.error(f"트윗 게시 실패: {e}")
return ''
def publish_thread(article: dict, thread_data: list[dict]) -> bool:
"""
스레드 JSON → 순차 트윗 게시.
thread_data: thread_converter.convert() 반환값
"""
if not _check_credentials():
logger.info("X API 미설정 — 발행 건너뜀")
return False
title = article.get('title', '')
logger.info(f"X 스레드 발행 시작: {title} ({len(thread_data)}개 트윗)")
prev_id = ''
tweet_ids = []
for tweet in sorted(thread_data, key=lambda x: x['order']):
text = tweet['text']
tweet_id = post_tweet(text, prev_id)
if not tweet_id:
logger.error(f"스레드 중단: {tweet['order']}번 트윗 실패")
return False
tweet_ids.append(tweet_id)
prev_id = tweet_id
time.sleep(1) # rate limit 방지
logger.info(f"X 스레드 발행 완료: {len(tweet_ids)}")
_log_published(article, tweet_ids[0] if tweet_ids else '', 'x_thread')
return True
def publish_thread_from_file(article: dict, thread_file: str) -> bool:
"""파일에서 스레드 데이터 로드 후 게시"""
try:
data = json.loads(Path(thread_file).read_text(encoding='utf-8'))
return publish_thread(article, data)
except Exception as e:
logger.error(f"스레드 파일 로드 실패: {e}")
return False
def _log_published(article: dict, post_id: str, platform: str):
pub_dir = DATA_DIR / 'published'
pub_dir.mkdir(exist_ok=True)
from datetime import datetime
record = {
'platform': platform,
'post_id': post_id,
'title': article.get('title', ''),
'corner': article.get('corner', ''),
'published_at': datetime.now().isoformat(),
}
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{platform}_{post_id}.json"
with open(pub_dir / filename, 'w', encoding='utf-8') as f:
json.dump(record, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
import sys
sys.path.insert(0, str(BASE_DIR / 'bots' / 'converters'))
import thread_converter
sample = {
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'slug': 'chatgpt-guide',
'corner': '쉬운세상',
'tags': ['ChatGPT', 'AI'],
'key_points': ['무료로 바로 시작', 'GPT-3.5로도 충분', '프롬프트가 핵심'],
}
threads = thread_converter.convert(sample, save_file=False)
for t in threads:
print(f"[{t['order']}] {t['text']}\n")
+186
View File
@@ -0,0 +1,186 @@
"""
유튜브 배포봇 (distributors/youtube_bot.py)
역할: 쇼츠 MP4 → YouTube Data API v3 업로드 (LAYER 3)
Phase 2.
사전 조건:
- Google Cloud에서 YouTube Data API v3 활성화 (기존 프로젝트에 추가)
- .env: YOUTUBE_CHANNEL_ID (기존 Google OAuth token.json 재사용)
"""
import json
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = Path(__file__).parent.parent.parent
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR = BASE_DIR / 'data'
TOKEN_PATH = BASE_DIR / 'token.json'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'distributor.log', encoding='utf-8'),
logging.StreamHandler(),
]
)
logger = logging.getLogger(__name__)
YOUTUBE_CHANNEL_ID = os.getenv('YOUTUBE_CHANNEL_ID', '')
YOUTUBE_SCOPES = [
'https://www.googleapis.com/auth/youtube.upload',
'https://www.googleapis.com/auth/youtube',
]
CORNER_TAGS = {
'쉬운세상': ['AI활용', '디지털라이프', '쉬운세상', 'The4thPath', 'AI가이드'],
'숨은보물': ['숨은보물', 'AI도구', '생산성', 'The4thPath', 'AI툴'],
'바이브리포트': ['트렌드', 'AI시대', '바이브리포트', 'The4thPath'],
'팩트체크': ['팩트체크', 'AI뉴스', 'The4thPath'],
'한컷': ['한컷만평', 'AI시사', 'The4thPath'],
}
def _get_credentials():
"""기존 Google OAuth token.json 재사용"""
try:
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
if not TOKEN_PATH.exists():
raise RuntimeError("token.json 없음. scripts/get_token.py 먼저 실행")
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), YOUTUBE_SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
TOKEN_PATH.write_text(creds.to_json())
return creds
except Exception as e:
logger.error(f"YouTube 인증 실패: {e}")
return None
def build_video_metadata(article: dict) -> dict:
"""유튜브 업로드용 메타데이터 구성"""
title = article.get('title', '')
meta = article.get('meta', '')
corner = article.get('corner', '')
key_points = article.get('key_points', [])
slug = article.get('slug', '')
# 쇼츠는 #Shorts 태그 필수
description_parts = [meta, '']
if key_points:
for point in key_points[:3]:
description_parts.append(f'{point}')
description_parts.append('')
description_parts.append('the4thpath.com')
description_parts.append('#Shorts')
tags = CORNER_TAGS.get(corner, ['The4thPath']) + ['Shorts', 'AI']
return {
'snippet': {
'title': f'{title} #Shorts',
'description': '\n'.join(description_parts),
'tags': tags,
'categoryId': '28', # Science & Technology
},
'status': {
'privacyStatus': 'public',
'selfDeclaredMadeForKids': False,
},
}
def publish_shorts(article: dict, video_path: str) -> bool:
"""
쇼츠 MP4 → YouTube 업로드.
video_path: shorts_converter.convert()가 생성한 MP4
"""
if not Path(video_path).exists():
logger.error(f"영상 파일 없음: {video_path}")
return False
logger.info(f"YouTube 쇼츠 발행 시작: {article.get('title', '')}")
creds = _get_credentials()
if not creds:
return False
try:
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
service = build('youtube', 'v3', credentials=creds)
metadata = build_video_metadata(article)
media = MediaFileUpload(
video_path,
mimetype='video/mp4',
resumable=True,
chunksize=5 * 1024 * 1024, # 5MB chunks
)
request = service.videos().insert(
part='snippet,status',
body=metadata,
media_body=media,
)
response = None
while response is None:
status, response = request.next_chunk()
if status:
pct = int(status.progress() * 100)
logger.info(f"업로드 진행: {pct}%")
video_id = response.get('id', '')
video_url = f'https://www.youtube.com/shorts/{video_id}'
logger.info(f"YouTube 쇼츠 발행 완료: {video_url}")
_log_published(article, video_id, 'youtube_shorts', video_url)
return True
except Exception as e:
logger.error(f"YouTube 업로드 실패: {e}")
return False
def _log_published(article: dict, post_id: str, platform: str, url: str = ''):
pub_dir = DATA_DIR / 'published'
pub_dir.mkdir(exist_ok=True)
from datetime import datetime
record = {
'platform': platform,
'post_id': post_id,
'url': url,
'title': article.get('title', ''),
'corner': article.get('corner', ''),
'published_at': datetime.now().isoformat(),
}
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{platform}_{post_id}.json"
with open(pub_dir / filename, 'w', encoding='utf-8') as f:
json.dump(record, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
sample = {
'title': 'ChatGPT 처음 쓰는 사람을 위한 완전 가이드',
'meta': 'ChatGPT를 처음 쓰는 분을 위한 단계별 가이드',
'slug': 'chatgpt-guide',
'corner': '쉬운세상',
'key_points': ['무료로 바로 시작', 'GPT-3.5로도 충분', '프롬프트가 핵심'],
}
meta = build_video_metadata(sample)
import pprint
pprint.pprint(meta)
+8 -8
View File
@@ -340,14 +340,14 @@ def publish(article: dict) -> bool:
send_pending_review_alert(article, review_reason)
return False
# 마크다운 → HTML
body_html, toc_html = markdown_to_html(article.get('body', ''))
# AdSense 플레이스홀더
body_html = insert_adsense_placeholders(body_html)
# 최종 HTML 조합
full_html = build_full_html(article, body_html, toc_html)
# 변환봇이 미리 생성한 HTML이 있으면 재사용, 없으면 직접 변환
if article.get('_html_content'):
full_html = article['_html_content']
else:
# 마크다운 → HTML (fallback)
body_html, toc_html = markdown_to_html(article.get('body', ''))
body_html = insert_adsense_placeholders(body_html)
full_html = build_full_html(article, body_html, toc_html)
# Google 인증
try:
+109
View File
@@ -0,0 +1,109 @@
"""
Remote Claude Bot
텔레그램 메시지를 Claude Agent SDK에 전달하고 결과를 돌려보냅니다.
실행: python bots/remote_claude.py
"""
import asyncio
import logging
import os
from pathlib import Path
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, MessageHandler, CommandHandler, filters, ContextTypes
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
load_dotenv()
BASE_DIR = Path(__file__).parent.parent
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = int(os.getenv('TELEGRAM_CHAT_ID', '0'))
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
)
logger = logging.getLogger(__name__)
MAX_MSG_LEN = 4000
def split_message(text: str) -> list[str]:
chunks = []
while len(text) > MAX_MSG_LEN:
chunks.append(text[:MAX_MSG_LEN])
text = text[MAX_MSG_LEN:]
if text:
chunks.append(text)
return chunks
async def run_claude(prompt: str) -> str:
result_text = ""
try:
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
cwd=str(BASE_DIR),
allowed_tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"],
permission_mode="bypassPermissions",
max_turns=30,
)
):
if isinstance(message, ResultMessage):
result_text = message.result
except Exception as e:
result_text = f"오류: {e}"
return result_text or "(완료)"
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat_id != TELEGRAM_CHAT_ID:
return
prompt = update.message.text.strip()
logger.info(f"명령 수신: {prompt[:80]}")
await update.message.reply_text("처리 중...")
await context.bot.send_chat_action(chat_id=TELEGRAM_CHAT_ID, action="typing")
result = await run_claude(prompt)
for chunk in split_message(result):
await update.message.reply_text(chunk)
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.message.chat_id != TELEGRAM_CHAT_ID:
return
await update.message.reply_text(
"Remote Claude Bot\n\n"
"자연어로 아무 지시나 입력하세요.\n\n"
"예시:\n"
"• scheduler.py 상태 확인해줘\n"
"• 수집봇 지금 실행해줘\n"
"• .env 파일 내용 보여줘\n"
"• requirements.txt에 패키지 추가해줘\n"
"• 오늘 로그 확인해줘\n\n"
"/help — 이 메시지"
)
def main():
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN이 없습니다.")
return
app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
app.add_handler(CommandHandler('help', cmd_help))
app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.Chat(TELEGRAM_CHAT_ID),
handle_message
))
logger.info(f"Remote Claude Bot 시작 (chat_id={TELEGRAM_CHAT_ID})")
app.run_polling(drop_pending_updates=True)
if __name__ == '__main__':
main()
+310 -25
View File
@@ -17,6 +17,8 @@ from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import anthropic
load_dotenv()
BASE_DIR = Path(__file__).parent.parent
@@ -40,6 +42,42 @@ logger = logging.getLogger(__name__)
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID', '')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY', '')
_claude_client: anthropic.Anthropic | None = None
_conversation_history: dict[int, list] = {}
CLAUDE_SYSTEM_PROMPT = """당신은 The 4th Path 블로그 자동 수익 엔진의 AI 어시스턴트입니다.
이 시스템(v3)은 4계층 구조로 운영됩니다:
[LAYER 1] AI 콘텐츠 생성: OpenClaw(GPT-5.4)가 원본 마크다운 1개 생성
[LAYER 2] 변환 엔진: 원본 → 블로그HTML / 인스타카드 / X스레드 / 뉴스레터 자동 변환
[LAYER 3] 배포 엔진: Blogger / Instagram / X / TikTok / YouTube 순차 발행
[LAYER 4] 분석봇: 성과 수집 + 주간 리포트 + 피드백 루프
봇 구성:
- collector_bot: 트렌드/RSS 수집 (07:00)
- ai_writer: OpenClaw 글 작성 트리거 (08:00)
- blog_converter: 마크다운→HTML (08:30)
- card_converter: 인스타 카드 1080×1080 (08:30)
- thread_converter: X 스레드 변환 (08:30)
- publisher_bot: Blogger 발행 (09:00)
- instagram_bot: 인스타 발행 (10:00)
- x_bot: X 스레드 게시 (11:00)
- analytics_bot: 분석/리포트 (22:00)
사용 가능한 텔레그램 명령:
/status — 봇 상태
/topics — 오늘 수집된 글감
/pending — 검토 대기 글 목록
/approve [번호] — 글 승인 및 발행
/reject [번호] — 글 거부
/report — 주간 리포트
/images — 이미지 제작 현황
/convert — 수동 변환 실행
사용자의 자연어 요청을 이해하고 적절히 안내하거나 답변해주세요.
한국어로 간결하게 답변하세요."""
IMAGE_MODE = os.getenv('IMAGE_MODE', 'manual').lower()
# request 모드에서 이미지 대기 시 사용하는 상태 변수
# {chat_id: prompt_id} — 다음에 받은 이미지를 어느 프롬프트에 연결할지 기억
@@ -107,7 +145,79 @@ def _call_openclaw(topic_data: dict, output_path: Path):
output_path.write_text(json.dumps(topic_data, ensure_ascii=False, indent=2), encoding='utf-8')
def job_convert():
"""08:30 — 변환 엔진: 원본 마크다운 → 5개 포맷 생성"""
if not _publish_enabled:
logger.info("[스케줄] 발행 중단 — 변환 건너뜀")
return
logger.info("[스케줄] 변환 엔진 시작")
try:
_run_conversion_pipeline()
except Exception as e:
logger.error(f"변환 엔진 오류: {e}")
def _run_conversion_pipeline():
"""originals/ 폴더의 미변환 원본을 5개 포맷으로 변환"""
originals_dir = DATA_DIR / 'originals'
originals_dir.mkdir(exist_ok=True)
today = datetime.now().strftime('%Y%m%d')
converters_path = str(BASE_DIR / 'bots' / 'converters')
sys.path.insert(0, converters_path)
sys.path.insert(0, str(BASE_DIR / 'bots'))
for orig_file in sorted(originals_dir.glob(f'{today}_*.json')):
converted_flag = orig_file.with_suffix('.converted')
if converted_flag.exists():
continue
try:
article = json.loads(orig_file.read_text(encoding='utf-8'))
slug = article.get('slug', 'article')
# 1. 블로그 HTML
import blog_converter
blog_converter.convert(article, save_file=True)
# 2. 인스타 카드
import card_converter
card_path = card_converter.convert(article, save_file=True)
if card_path:
article['_card_path'] = card_path
# 3. X 스레드
import thread_converter
thread_converter.convert(article, save_file=True)
# 4. 쇼츠 영상 (Phase 2 — card 생성 후 시도, 실패해도 계속)
if card_path:
try:
import shorts_converter
shorts_converter.convert(article, card_path=card_path, save_file=True)
except Exception as shorts_err:
logger.debug(f"쇼츠 변환 건너뜀 (Phase 2): {shorts_err}")
# 5. 뉴스레터 발췌 (주간 묶음용 — 개별 저장은 weekly_report에서)
# newsletter_converter는 주간 단위로 묶어서 처리
# 변환 완료 플래그
converted_flag.touch()
logger.info(f"변환 완료: {slug}")
# drafts에 복사 (발행봇이 읽도록)
drafts_dir = DATA_DIR / 'drafts'
drafts_dir.mkdir(exist_ok=True)
draft_path = drafts_dir / orig_file.name
if not draft_path.exists():
draft_path.write_text(
orig_file.read_text(encoding='utf-8'), encoding='utf-8'
)
except Exception as e:
logger.error(f"변환 오류 ({orig_file.name}): {e}")
def job_publish(slot: int):
"""09:00 — 블로그 발행 (슬롯별)"""
if not _publish_enabled:
logger.info(f"[스케줄] 발행 중단 — 슬롯 {slot} 건너뜀")
return
@@ -118,6 +228,125 @@ def job_publish(slot: int):
logger.error(f"발행봇 오류: {e}")
def job_distribute_instagram():
"""10:00 — 인스타그램 카드 발행"""
if not _publish_enabled:
return
logger.info("[스케줄] 인스타그램 발행")
try:
_distribute_instagram()
except Exception as e:
logger.error(f"인스타그램 배포 오류: {e}")
def _distribute_instagram():
sys.path.insert(0, str(BASE_DIR / 'bots' / 'distributors'))
import instagram_bot
today = datetime.now().strftime('%Y%m%d')
outputs_dir = DATA_DIR / 'outputs'
for card_file in sorted(outputs_dir.glob(f'{today}_*_card.png')):
ig_flag = card_file.with_suffix('.ig_done')
if ig_flag.exists():
continue
slug = card_file.stem.replace(f'{today}_', '').replace('_card', '')
article = _load_article_by_slug(today, slug)
if not article:
logger.warning(f"Instagram: 원본 article 없음 ({slug})")
continue
# image_host.py가 로컬 경로 → 공개 URL 변환 처리
success = instagram_bot.publish_card(article, str(card_file))
if success:
ig_flag.touch()
logger.info(f"Instagram 발행 완료: {card_file.name}")
def job_distribute_x():
"""11:00 — X 스레드 게시"""
if not _publish_enabled:
return
logger.info("[스케줄] X 스레드 게시")
try:
_distribute_x()
except Exception as e:
logger.error(f"X 배포 오류: {e}")
def _distribute_x():
sys.path.insert(0, str(BASE_DIR / 'bots' / 'distributors'))
import x_bot
today = datetime.now().strftime('%Y%m%d')
outputs_dir = DATA_DIR / 'outputs'
for thread_file in sorted(outputs_dir.glob(f'{today}_*_thread.json')):
x_flag = thread_file.with_suffix('.x_done')
if x_flag.exists():
continue
slug = thread_file.stem.replace(f'{today}_', '').replace('_thread', '')
article = _load_article_by_slug(today, slug)
if not article:
continue
thread_data = json.loads(thread_file.read_text(encoding='utf-8'))
success = x_bot.publish_thread(article, thread_data)
if success:
x_flag.touch()
def job_distribute_tiktok():
"""18:00 — TikTok 쇼츠 업로드"""
if not _publish_enabled:
return
logger.info("[스케줄] TikTok 쇼츠 업로드")
try:
_distribute_shorts('tiktok')
except Exception as e:
logger.error(f"TikTok 배포 오류: {e}")
def job_distribute_youtube():
"""20:00 — YouTube 쇼츠 업로드"""
if not _publish_enabled:
return
logger.info("[스케줄] YouTube 쇼츠 업로드")
try:
_distribute_shorts('youtube')
except Exception as e:
logger.error(f"YouTube 배포 오류: {e}")
def _distribute_shorts(platform: str):
"""틱톡/유튜브 쇼츠 MP4 배포 공통 로직"""
sys.path.insert(0, str(BASE_DIR / 'bots' / 'distributors'))
if platform == 'tiktok':
import tiktok_bot as dist_bot
else:
import youtube_bot as dist_bot
today = datetime.now().strftime('%Y%m%d')
outputs_dir = DATA_DIR / 'outputs'
for shorts_file in sorted(outputs_dir.glob(f'{today}_*_shorts.mp4')):
done_flag = shorts_file.with_suffix(f'.{platform}_done')
if done_flag.exists():
continue
slug = shorts_file.stem.replace(f'{today}_', '').replace('_shorts', '')
article = _load_article_by_slug(today, slug)
if not article:
logger.warning(f"{platform}: 원본 article 없음 ({slug})")
continue
success = dist_bot.publish_shorts(article, str(shorts_file))
if success:
done_flag.touch()
def _load_article_by_slug(date_str: str, slug: str) -> dict:
"""날짜+slug로 원본 article 로드"""
originals_dir = DATA_DIR / 'originals'
for f in originals_dir.glob(f'{date_str}_*{slug}*.json'):
try:
return json.loads(f.read_text(encoding='utf-8'))
except Exception:
pass
return {}
def _publish_next():
drafts_dir = DATA_DIR / 'drafts'
drafts_dir.mkdir(exist_ok=True)
@@ -127,14 +356,12 @@ def _publish_next():
if article.get('_pending_openclaw'):
continue
sys.path.insert(0, str(BASE_DIR / 'bots'))
sys.path.insert(0, str(BASE_DIR / 'bots' / 'converters'))
import publisher_bot
import linker_bot
import markdown as md_lib
body_html = md_lib.markdown(
article.get('body', ''), extensions=['toc', 'tables', 'fenced_code']
)
body_html = linker_bot.process(article, body_html)
article['body'] = body_html
import blog_converter
# 변환봇으로 HTML 생성 (이미 변환된 경우 outputs에서 읽음)
html = blog_converter.convert(article, save_file=False)
article['_html_content'] = html
article['_body_is_html'] = True
publisher_bot.publish(article)
draft_file.unlink(missing_ok=True)
@@ -271,6 +498,26 @@ async def cmd_report(update: Update, context: ContextTypes.DEFAULT_TYPE):
analytics_bot.weekly_report()
async def cmd_convert(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""수동 변환 실행"""
await update.message.reply_text("변환 엔진 실행 중...")
try:
_run_conversion_pipeline()
outputs_dir = DATA_DIR / 'outputs'
today = datetime.now().strftime('%Y%m%d')
blogs = len(list(outputs_dir.glob(f'{today}_*_blog.html')))
cards = len(list(outputs_dir.glob(f'{today}_*_card.png')))
threads = len(list(outputs_dir.glob(f'{today}_*_thread.json')))
await update.message.reply_text(
f"변환 완료\n"
f"블로그 HTML: {blogs}\n"
f"인스타 카드: {cards}\n"
f"X 스레드: {threads}"
)
except Exception as e:
await update.message.reply_text(f"변환 오류: {e}")
# ─── 이미지 관련 명령 (request 모드) ────────────────
async def cmd_images(update: Update, context: ContextTypes.DEFAULT_TYPE):
@@ -438,6 +685,8 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text.strip()
chat_id = update.message.chat_id
cmd_map = {
'발행 중단': cmd_stop_publish,
'발행 재개': cmd_resume_publish,
@@ -445,26 +694,45 @@ async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
'이번 주 리포트': cmd_report,
'대기 중인 글 보여줘': cmd_pending,
'이미지 목록': cmd_images,
'변환 실행': cmd_convert,
'오늘 뭐 발행했어?': cmd_status,
}
if text in cmd_map:
await cmd_map[text](update, context)
else:
return
# Claude API로 자연어 처리
if not ANTHROPIC_API_KEY:
await update.message.reply_text(
"사용 가능한 명령:\n"
"• 발행 중단 / 발행 재개\n"
"• 오늘 수집된 글감 보여줘\n"
"• 대기 중인 글 보여줘\n"
"• 이번 주 리포트\n"
"• 이미지 목록\n\n"
"슬래시 명령:\n"
"/approve [번호] — 글 승인\n"
"/reject [번호] — 글 거부\n"
"/images — 이미지 제작 현황\n"
"/imgpick [번호] — 프롬프트 선택\n"
"/imgbatch — 프롬프트 배치 전송\n"
"/imgcancel — 이미지 대기 취소\n"
"/status — 봇 상태"
"Claude API 키가 없습니다. .env 파일에 ANTHROPIC_API_KEY를 입력하세요."
)
return
global _claude_client
if _claude_client is None:
_claude_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
history = _conversation_history.setdefault(chat_id, [])
history.append({"role": "user", "content": text})
# 대화 기록이 너무 길면 최근 20개만 유지
if len(history) > 20:
history[:] = history[-20:]
try:
await context.bot.send_chat_action(chat_id=chat_id, action="typing")
response = _claude_client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
system=CLAUDE_SYSTEM_PROMPT,
messages=history,
)
reply = response.content[0].text
history.append({"role": "assistant", "content": reply})
await update.message.reply_text(reply)
except Exception as e:
logger.error(f"Claude API 오류: {e}")
await update.message.reply_text(f"오류가 발생했습니다: {e}")
# ─── 스케줄러 설정 + 메인 ─────────────────────────────
@@ -473,6 +741,7 @@ def setup_scheduler() -> AsyncIOScheduler:
scheduler = AsyncIOScheduler(timezone='Asia/Seoul')
schedule_cfg = load_schedule()
# schedule.json 기반 동적 잡 (기존)
job_map = {
'collector': job_collector,
'ai_writer': job_ai_writer,
@@ -486,9 +755,24 @@ def setup_scheduler() -> AsyncIOScheduler:
if fn:
scheduler.add_job(fn, 'cron', hour=job['hour'], minute=job['minute'], id=job['id'])
# 고정 스케줄
# v3 고정 스케줄: 시차 배포
# 07:00 수집봇 (schedule.json에서 관리)
# 08:00 AI 글 작성 (schedule.json에서 관리)
scheduler.add_job(job_convert, 'cron', hour=8, minute=30, id='convert') # 08:30 변환
scheduler.add_job(lambda: job_publish(1), 'cron',
hour=9, minute=0, id='blog_publish') # 09:00 블로그
scheduler.add_job(job_distribute_instagram, 'cron',
hour=10, minute=0, id='instagram_dist') # 10:00 인스타
scheduler.add_job(job_distribute_x, 'cron',
hour=11, minute=0, id='x_dist') # 11:00 X
scheduler.add_job(job_distribute_tiktok, 'cron',
hour=18, minute=0, id='tiktok_dist') # 18:00 틱톡
scheduler.add_job(job_distribute_youtube, 'cron',
hour=20, minute=0, id='youtube_dist') # 20:00 유튜브
scheduler.add_job(job_analytics_daily, 'cron',
hour=22, minute=0, id='daily_report') # 22:00 분석
scheduler.add_job(job_analytics_weekly, 'cron',
day_of_week='sun', hour=22, minute=30, id='weekly_report')
day_of_week='sun', hour=22, minute=30, id='weekly_report') # 일요일 주간
# request 모드: 매주 월요일 10:00 이미지 프롬프트 배치 전송
if IMAGE_MODE == 'request':
@@ -496,7 +780,7 @@ def setup_scheduler() -> AsyncIOScheduler:
day_of_week='mon', hour=10, minute=0, id='image_batch')
logger.info("이미지 request 모드: 매주 월요일 10:00 배치 전송 등록")
logger.info("스케줄러 설정 완료")
logger.info("스케줄러 설정 완료 (v3 시차 배포 포함)")
return scheduler
@@ -515,6 +799,7 @@ async def main():
app.add_handler(CommandHandler('pending', cmd_pending))
app.add_handler(CommandHandler('report', cmd_report))
app.add_handler(CommandHandler('topics', cmd_show_topics))
app.add_handler(CommandHandler('convert', cmd_convert))
# 이미지 관련 (request / manual 공통 사용 가능)
app.add_handler(CommandHandler('images', cmd_images))