- publish() 성공 시 topics/ 에서 해당 topic 파일 자동 삭제 - /write, /collect 목록에 발행 제목 유사도 80% 필터 추가 - _load_published_titles(), _filter_unpublished() 헬퍼 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
729 lines
27 KiB
Python
729 lines
27 KiB
Python
"""
|
|
발행봇 (publisher_bot.py)
|
|
역할: AI가 작성한 글을 Blogger에 자동 발행
|
|
- 마크다운 → HTML 변환
|
|
- 목차 자동 생성
|
|
- AdSense 플레이스홀더 삽입
|
|
- Schema.org Article JSON-LD
|
|
- 안전장치 (팩트체크/위험 키워드/출처 부족 → 수동 검토)
|
|
- Blogger API v3 발행
|
|
- Search Console URL 제출
|
|
- Telegram 알림
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import markdown
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from dotenv import load_dotenv
|
|
from google.oauth2.credentials import Credentials
|
|
from google.auth.transport.requests import Request
|
|
from googleapiclient.discovery import build
|
|
|
|
load_dotenv()
|
|
|
|
BASE_DIR = Path(__file__).parent.parent
|
|
CONFIG_DIR = BASE_DIR / 'config'
|
|
DATA_DIR = BASE_DIR / 'data'
|
|
LOG_DIR = BASE_DIR / 'logs'
|
|
TOKEN_PATH = BASE_DIR / 'token.json'
|
|
LOG_DIR.mkdir(exist_ok=True)
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s [%(levelname)s] %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(LOG_DIR / 'publisher.log', encoding='utf-8'),
|
|
logging.StreamHandler(),
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
|
|
TELEGRAM_CHAT_ID = os.getenv('TELEGRAM_CHAT_ID', '')
|
|
BLOG_MAIN_ID = os.getenv('BLOG_MAIN_ID', '')
|
|
|
|
SCOPES = [
|
|
'https://www.googleapis.com/auth/blogger',
|
|
'https://www.googleapis.com/auth/webmasters',
|
|
]
|
|
|
|
|
|
def load_config(filename: str) -> dict:
|
|
with open(CONFIG_DIR / filename, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
|
|
# ─── Google 인증 ─────────────────────────────────────
|
|
|
|
def get_google_credentials() -> Credentials:
|
|
creds = None
|
|
# 1) token.json 파일 우선
|
|
if TOKEN_PATH.exists():
|
|
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
|
|
if not creds or not creds.valid:
|
|
if creds and creds.expired and creds.refresh_token:
|
|
creds.refresh(Request())
|
|
with open(TOKEN_PATH, 'w') as f:
|
|
f.write(creds.to_json())
|
|
# 2) .env의 GOOGLE_REFRESH_TOKEN으로 직접 생성 (Docker 환경 대응)
|
|
if not creds or not creds.valid:
|
|
refresh_token = os.getenv('GOOGLE_REFRESH_TOKEN', '')
|
|
client_id = os.getenv('GOOGLE_CLIENT_ID', '')
|
|
client_secret = os.getenv('GOOGLE_CLIENT_SECRET', '')
|
|
if refresh_token and client_id and client_secret:
|
|
creds = Credentials(
|
|
token=None,
|
|
refresh_token=refresh_token,
|
|
token_uri='https://oauth2.googleapis.com/token',
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
scopes=SCOPES,
|
|
)
|
|
creds.refresh(Request())
|
|
with open(TOKEN_PATH, 'w') as f:
|
|
f.write(creds.to_json())
|
|
logger.info("Google 인증 성공 (.env refresh token)")
|
|
if not creds or not creds.valid:
|
|
raise RuntimeError("Google 인증 실패. scripts/get_token.py 를 먼저 실행하세요.")
|
|
return creds
|
|
|
|
|
|
# ─── 안전장치 ─────────────────────────────────────────
|
|
|
|
def check_safety(article: dict, safety_cfg: dict) -> tuple[bool, str]:
|
|
"""
|
|
수동 검토가 필요한지 판단.
|
|
Returns: (needs_review, reason)
|
|
"""
|
|
corner = article.get('corner', '')
|
|
body = article.get('body', '')
|
|
sources = article.get('sources', [])
|
|
quality_score = article.get('quality_score', 100)
|
|
|
|
# 팩트체크 코너는 무조건 수동 검토
|
|
manual_corners = safety_cfg.get('always_manual_review', ['팩트체크'])
|
|
if corner in manual_corners:
|
|
return True, f'코너 "{corner}" 는 항상 수동 검토 필요'
|
|
|
|
# 위험 키워드 감지
|
|
all_keywords = (
|
|
safety_cfg.get('crypto_keywords', []) +
|
|
safety_cfg.get('criticism_keywords', []) +
|
|
safety_cfg.get('investment_keywords', []) +
|
|
safety_cfg.get('legal_keywords', [])
|
|
)
|
|
for kw in all_keywords:
|
|
if kw in body:
|
|
return True, f'위험 키워드 감지: "{kw}"'
|
|
|
|
# 출처 2개 미만
|
|
min_sources = safety_cfg.get('min_sources_required', 2)
|
|
if len(sources) < min_sources:
|
|
return True, f'출처 {len(sources)}개 — {min_sources}개 이상 필요'
|
|
|
|
# 품질 점수 미달
|
|
min_score = safety_cfg.get('min_quality_score_for_auto', 75)
|
|
if quality_score < min_score:
|
|
return True, f'품질 점수 {quality_score}점 (자동 발행 최소: {min_score}점)'
|
|
|
|
return False, ''
|
|
|
|
|
|
# ─── HTML 변환 ─────────────────────────────────────────
|
|
|
|
def markdown_to_html(md_text: str) -> str:
|
|
"""마크다운 → HTML 변환 (목차 extension 포함)"""
|
|
md = markdown.Markdown(
|
|
extensions=['toc', 'tables', 'fenced_code', 'attr_list'],
|
|
extension_configs={
|
|
'toc': {
|
|
'title': '목차',
|
|
'toc_depth': '2-3',
|
|
}
|
|
}
|
|
)
|
|
html = md.convert(md_text)
|
|
toc = md.toc # 목차 HTML
|
|
return html, toc
|
|
|
|
|
|
def insert_adsense_placeholders(html: str) -> str:
|
|
"""두 번째 H2 뒤와 결론 섹션 앞에 AdSense 플레이스홀더 삽입"""
|
|
AD_SLOT_1 = '\n<!-- AD_SLOT_1 -->\n'
|
|
AD_SLOT_2 = '\n<!-- AD_SLOT_2 -->\n'
|
|
|
|
soup = BeautifulSoup(html, 'lxml')
|
|
h2_tags = soup.find_all('h2')
|
|
|
|
# 두 번째 H2 뒤에 AD_SLOT_1 삽입
|
|
if len(h2_tags) >= 2:
|
|
second_h2 = h2_tags[1]
|
|
ad_tag = BeautifulSoup(AD_SLOT_1, 'html.parser')
|
|
second_h2.insert_after(ad_tag)
|
|
|
|
# 결론 H2 앞에 AD_SLOT_2 삽입
|
|
for h2 in soup.find_all('h2'):
|
|
if any(kw in h2.get_text() for kw in ['결론', '마무리', '정리', '요약', 'conclusion']):
|
|
ad_tag2 = BeautifulSoup(AD_SLOT_2, 'html.parser')
|
|
h2.insert_before(ad_tag2)
|
|
break
|
|
|
|
return str(soup)
|
|
|
|
|
|
def build_json_ld(article: dict, blog_url: str = '') -> str:
|
|
"""Schema.org Article JSON-LD 생성"""
|
|
schema = {
|
|
"@context": "https://schema.org",
|
|
"@type": "Article",
|
|
"headline": article.get('title', ''),
|
|
"description": article.get('meta', ''),
|
|
"datePublished": datetime.now(timezone.utc).isoformat(),
|
|
"dateModified": datetime.now(timezone.utc).isoformat(),
|
|
"author": {
|
|
"@type": "Person",
|
|
"name": "테크인사이더"
|
|
},
|
|
"publisher": {
|
|
"@type": "Organization",
|
|
"name": "테크인사이더",
|
|
"logo": {
|
|
"@type": "ImageObject",
|
|
"url": ""
|
|
}
|
|
},
|
|
"mainEntityOfPage": {
|
|
"@type": "WebPage",
|
|
"@id": blog_url
|
|
}
|
|
}
|
|
return f'<script type="application/ld+json">\n{json.dumps(schema, ensure_ascii=False, indent=2)}\n</script>'
|
|
|
|
|
|
def _is_platform_logo(image_url: str) -> bool:
|
|
"""플랫폼 로고/아이콘 이미지인지 판별 — 대표 이미지로 부적합"""
|
|
skip_patterns = [
|
|
'logo', 'icon', 'avatar', 'banner', '/ad/',
|
|
'google.com/images/branding', 'googlenews', 'google-news',
|
|
'lh3.googleusercontent.com', # Google News CDN 썸네일
|
|
'facebook.com', 'twitter.com', 'naver.com/favicon',
|
|
'default_image', 'placeholder', 'noimage', 'no-image',
|
|
'og-default', 'share-default', 'sns_', 'common/',
|
|
]
|
|
url_lower = image_url.lower()
|
|
return any(p in url_lower for p in skip_patterns)
|
|
|
|
|
|
def _fetch_og_image(url: str) -> str:
|
|
"""원본 기사 URL에서 og:image 메타태그 크롤링"""
|
|
if not url or not url.startswith('http'):
|
|
return ''
|
|
# Google 뉴스 리다이렉트인 경우 실제 기사 URL 추출 시도 (head는 리다이렉트 안됨 → get 사용)
|
|
if 'news.google.com' in url:
|
|
try:
|
|
resp = requests.get(url, timeout=15, allow_redirects=True,
|
|
headers={'User-Agent': 'Mozilla/5.0 (compatible; BlogBot/1.0)'})
|
|
if resp.url and 'news.google.com' not in resp.url:
|
|
url = resp.url
|
|
except Exception:
|
|
pass
|
|
try:
|
|
resp = requests.get(url, timeout=10, headers={
|
|
'User-Agent': 'Mozilla/5.0 (compatible; BlogBot/1.0)',
|
|
})
|
|
if resp.status_code != 200:
|
|
return ''
|
|
soup = BeautifulSoup(resp.text, 'lxml')
|
|
# og:image
|
|
og = soup.find('meta', property='og:image')
|
|
if og and og.get('content', '').startswith('http'):
|
|
if not _is_platform_logo(og['content']):
|
|
return og['content']
|
|
# twitter:image
|
|
tw = soup.find('meta', attrs={'name': 'twitter:image'})
|
|
if tw and tw.get('content', '').startswith('http'):
|
|
if not _is_platform_logo(tw['content']):
|
|
return tw['content']
|
|
# 본문 첫 번째 큰 이미지
|
|
for img in soup.find_all('img', src=True):
|
|
src = img['src']
|
|
if src.startswith('http') and not _is_platform_logo(src):
|
|
return src
|
|
except Exception as e:
|
|
logger.warning(f"og:image 크롤링 실패 ({url}): {e}")
|
|
return ''
|
|
|
|
|
|
def _search_real_article_image(sources: list, topic: str = '') -> str:
|
|
"""Google News 소스 → DuckDuckGo 검색으로 실제 기사 URL 찾기 → og:image 크롤링"""
|
|
from urllib.parse import quote as _quote, urlparse, parse_qs, unquote
|
|
from bs4 import BeautifulSoup as _BS
|
|
|
|
for src in sources[:3]:
|
|
title = src.get('title', '')
|
|
if not title:
|
|
continue
|
|
# "기사 제목 - 매체명" → 매체명 제거
|
|
clean_title = re.sub(r'\s*[-–—]\s*\S+$', '', title).strip()
|
|
if len(clean_title) < 5:
|
|
clean_title = title
|
|
try:
|
|
ddg_url = f'https://html.duckduckgo.com/html/?q={_quote(clean_title)}'
|
|
resp = requests.get(ddg_url, timeout=10, headers={
|
|
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
|
|
})
|
|
if resp.status_code != 200:
|
|
continue
|
|
soup = _BS(resp.text, 'lxml')
|
|
# DuckDuckGo는 redirect URL 사용: //duckduckgo.com/l/?uddg=실제URL
|
|
for a_tag in soup.select('a.result__a')[:3]:
|
|
href = a_tag.get('href', '')
|
|
# uddg 파라미터에서 실제 URL 추출
|
|
real_url = href
|
|
if 'uddg=' in href:
|
|
parsed = parse_qs(urlparse(href).query)
|
|
uddg = parsed.get('uddg', [''])[0]
|
|
if uddg:
|
|
real_url = unquote(uddg)
|
|
if not real_url.startswith('http'):
|
|
continue
|
|
if 'news.google.com' in real_url:
|
|
continue
|
|
# 실제 기사 URL에서 og:image 크롤링
|
|
img = _fetch_og_image(real_url)
|
|
if img:
|
|
logger.info(f"원문 기사 이미지 발견: {real_url[:50]} → {img[:60]}")
|
|
return img
|
|
except Exception as e:
|
|
logger.debug(f"DuckDuckGo 검색 실패: {e}")
|
|
return ''
|
|
|
|
|
|
def fetch_featured_image(article: dict) -> str:
|
|
"""대표 이미지: RSS 이미지 → 원문 기사 → og:image → Wikipedia 순으로 시도"""
|
|
# 1) RSS 수집 시 가져온 소스 이미지 (플랫폼 로고 제외)
|
|
source_image = article.get('source_image', '')
|
|
if source_image and source_image.startswith('http') and not _is_platform_logo(source_image):
|
|
try:
|
|
resp = requests.head(source_image, timeout=5, allow_redirects=True)
|
|
if resp.status_code == 200:
|
|
return source_image
|
|
except Exception:
|
|
pass
|
|
|
|
# 2) 원본 기사 URL에서 og:image 크롤링
|
|
source_url = article.get('source_url', '')
|
|
og_image = _fetch_og_image(source_url)
|
|
if og_image:
|
|
return og_image
|
|
|
|
# 3) Google News 소스 → DuckDuckGo로 실제 기사 검색 → og:image
|
|
sources = article.get('sources', [])
|
|
if sources:
|
|
real_image = _search_real_article_image(sources, article.get('title', ''))
|
|
if real_image:
|
|
return real_image
|
|
|
|
# 4) Pexels API (키가 있을 때)
|
|
pexels_key = os.getenv('PEXELS_API_KEY', '')
|
|
if pexels_key:
|
|
tags = article.get('tags', [])
|
|
if isinstance(tags, str):
|
|
tags = [t.strip() for t in tags.split(',')]
|
|
query = tags[0] if tags else article.get('corner', 'technology')
|
|
try:
|
|
resp = requests.get(
|
|
'https://api.pexels.com/v1/search',
|
|
headers={'Authorization': pexels_key},
|
|
params={'query': query, 'per_page': 1, 'orientation': 'landscape'},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code == 200:
|
|
photos = resp.json().get('photos', [])
|
|
if photos:
|
|
return photos[0]['src']['large']
|
|
except Exception as e:
|
|
logger.warning(f"Pexels 이미지 검색 실패: {e}")
|
|
|
|
# 5) Wikipedia 썸네일 (무료, API 키 불필요)
|
|
tags = article.get('tags', [])
|
|
if isinstance(tags, str):
|
|
tags = [t.strip() for t in tags.split(',')]
|
|
search_keywords = [t for t in tags if t and len(t) <= 15][:8]
|
|
from urllib.parse import quote as _quote
|
|
for kw in search_keywords:
|
|
for lang in ['ko', 'en']:
|
|
try:
|
|
wiki_url = f'https://{lang}.wikipedia.org/api/rest_v1/page/summary/{_quote(kw)}'
|
|
resp = requests.get(wiki_url, timeout=6,
|
|
headers={'User-Agent': 'Mozilla/5.0 (compatible; BlogBot/1.0)'})
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
thumb = data.get('thumbnail', {}).get('source', '')
|
|
if thumb and thumb.startswith('http') and not _is_platform_logo(thumb):
|
|
thumb = re.sub(r'/\d+px-', '/800px-', thumb)
|
|
logger.info(f"Wikipedia({lang}) 이미지 사용: {kw} → {thumb[:60]}")
|
|
return thumb
|
|
except Exception:
|
|
pass
|
|
|
|
return ''
|
|
|
|
|
|
def _insert_toc_after_image(body_html: str, toc_block: str) -> str:
|
|
"""본문에 대표이미지가 있으면 이미지 뒤에, 없으면 맨 앞에 TOC 삽입"""
|
|
import re as _re
|
|
# 본문 시작이 <img 태그이면 그 뒤에 삽입
|
|
m = _re.match(r'(<img\s[^>]*/>)\s*', body_html)
|
|
if m:
|
|
return m.group(0) + toc_block + body_html[m.end():]
|
|
return toc_block + body_html
|
|
|
|
|
|
def build_full_html(article: dict, body_html: str, toc_html: str) -> str:
|
|
"""최종 HTML 조합: 대표이미지 + JSON-LD + 목차 + 본문 + 면책 문구"""
|
|
json_ld = build_json_ld(article)
|
|
disclaimer = article.get('disclaimer', '')
|
|
|
|
# 본문에 이미 <img> 태그가 있으면 대표 이미지 삽입 건너뜀
|
|
has_image = '<img ' in body_html.lower()
|
|
|
|
html_parts = []
|
|
if not has_image:
|
|
image_url = fetch_featured_image(article)
|
|
if image_url:
|
|
title = article.get('title', '').replace('"', '"')
|
|
# Blogger 호환: div 래핑 없이 직접 img 삽입 (본문 첫 줄에 배치)
|
|
img_tag = (
|
|
f'<img src="{image_url}" alt="{title}" '
|
|
f'width="100%" style="max-height:420px;object-fit:cover;border-radius:8px;'
|
|
f'margin-bottom:1.2em;" />'
|
|
)
|
|
# body_html 맨 앞에 이미지 삽입 (Blogger가 div를 제거하는 문제 방지)
|
|
body_html = img_tag + '\n' + body_html
|
|
|
|
html_parts.append(json_ld)
|
|
# 목차: h2가 3개 이상이고 TOC에 실제 링크가 있을 때만 표시
|
|
h2_count = body_html.lower().count('<h2')
|
|
toc_has_links = toc_html and '<a ' in toc_html and h2_count >= 3
|
|
if toc_has_links:
|
|
# 이미지 뒤, 본문 앞에 목차 삽입
|
|
toc_block = f'<div class="toc-wrapper">{toc_html}</div>\n'
|
|
body_html = _insert_toc_after_image(body_html, toc_block)
|
|
html_parts.append(body_html)
|
|
|
|
# 원문 출처 링크
|
|
sources = article.get('sources', [])
|
|
source_url = article.get('source_url', '')
|
|
source_name = article.get('source_name', '') or article.get('source', '')
|
|
if sources or source_url:
|
|
html_parts.append('<hr/>')
|
|
html_parts.append('<div class="source-info" style="margin:1.5em 0;padding:1em;'
|
|
'background:#f8f9fa;border-left:4px solid #ddd;border-radius:4px;'
|
|
'font-size:0.9em;color:#555;">')
|
|
html_parts.append('<b>📌 원문 출처</b><br/>')
|
|
seen = set()
|
|
if sources:
|
|
for src in sources:
|
|
url = src.get('url', '')
|
|
title = src.get('title', '') or url
|
|
if url and url not in seen:
|
|
seen.add(url)
|
|
html_parts.append(f'• <a href="{url}" target="_blank" rel="noopener">{title}</a><br/>')
|
|
if source_url and source_url not in seen:
|
|
label = source_name or source_url
|
|
html_parts.append(f'• <a href="{source_url}" target="_blank" rel="noopener">{label}</a><br/>')
|
|
html_parts.append('</div>')
|
|
|
|
if disclaimer:
|
|
html_parts.append(f'<p class="disclaimer"><small>{disclaimer}</small></p>')
|
|
|
|
return '\n'.join(html_parts)
|
|
|
|
|
|
# ─── Blogger API ──────────────────────────────────────
|
|
|
|
def publish_to_blogger(article: dict, html_content: str, creds: Credentials) -> dict:
|
|
"""Blogger API v3로 글 발행"""
|
|
service = build('blogger', 'v3', credentials=creds)
|
|
blog_id = BLOG_MAIN_ID
|
|
|
|
labels = [article.get('corner', '')]
|
|
tags = article.get('tags', [])
|
|
if isinstance(tags, str):
|
|
tags = [t.strip() for t in tags.split(',')]
|
|
labels.extend(tags)
|
|
labels = list(set(filter(None, labels)))
|
|
|
|
body = {
|
|
'title': article.get('title', ''),
|
|
'content': html_content,
|
|
'labels': labels,
|
|
}
|
|
|
|
result = service.posts().insert(
|
|
blogId=blog_id,
|
|
body=body,
|
|
isDraft=False,
|
|
).execute()
|
|
|
|
return result
|
|
|
|
|
|
def submit_to_search_console(url: str, creds: Credentials):
|
|
"""Google Search Console URL 색인 요청"""
|
|
try:
|
|
service = build('searchconsole', 'v1', credentials=creds)
|
|
# URL Inspection API (실제 indexing 요청)
|
|
# 참고: 일반적으로 Blogger sitemap이 자동 제출되므로 보조 수단
|
|
logger.info(f"Search Console 제출: {url}")
|
|
# indexing API는 별도 서비스 계정 필요. 여기서는 로그만 남김.
|
|
# 실제 색인 촉진은 Blogger 내장 sitemap에 의존
|
|
except Exception as e:
|
|
logger.warning(f"Search Console 제출 실패: {e}")
|
|
|
|
|
|
# ─── Telegram ────────────────────────────────────────
|
|
|
|
def send_telegram(text: str, parse_mode: str = 'HTML'):
|
|
"""Telegram 메시지 전송"""
|
|
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
|
|
logger.warning("Telegram 설정 없음 — 알림 건너뜀")
|
|
return
|
|
url = f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage'
|
|
payload = {
|
|
'chat_id': TELEGRAM_CHAT_ID,
|
|
'text': text,
|
|
'parse_mode': parse_mode,
|
|
}
|
|
try:
|
|
resp = requests.post(url, json=payload, timeout=10)
|
|
resp.raise_for_status()
|
|
except Exception as e:
|
|
logger.error(f"Telegram 전송 실패: {e}")
|
|
|
|
|
|
def send_pending_review_alert(article: dict, reason: str):
|
|
"""수동 검토 대기 알림 (Telegram)"""
|
|
title = article.get('title', '(제목 없음)')
|
|
corner = article.get('corner', '')
|
|
preview = article.get('body', '')[:300].replace('<', '<').replace('>', '>')
|
|
msg = (
|
|
f"🔍 <b>[수동 검토 필요]</b>\n\n"
|
|
f"📌 <b>{title}</b>\n"
|
|
f"코너: {corner}\n"
|
|
f"사유: {reason}\n\n"
|
|
f"미리보기:\n{preview}...\n\n"
|
|
f"명령: <code>승인</code> 또는 <code>거부</code>"
|
|
)
|
|
send_telegram(msg)
|
|
|
|
|
|
# ─── 발행 이력 ───────────────────────────────────────
|
|
|
|
def log_published(article: dict, post_result: dict):
|
|
"""발행 이력 저장"""
|
|
published_dir = DATA_DIR / 'published'
|
|
published_dir.mkdir(exist_ok=True)
|
|
record = {
|
|
'title': article.get('title', ''),
|
|
'corner': article.get('corner', ''),
|
|
'url': post_result.get('url', ''),
|
|
'post_id': post_result.get('id', ''),
|
|
'published_at': datetime.now(timezone.utc).isoformat(),
|
|
'quality_score': article.get('quality_score', 0),
|
|
'tags': article.get('tags', []),
|
|
'sources': article.get('sources', []),
|
|
}
|
|
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{record['post_id']}.json"
|
|
with open(published_dir / filename, 'w', encoding='utf-8') as f:
|
|
json.dump(record, f, ensure_ascii=False, indent=2)
|
|
return record
|
|
|
|
|
|
def _cleanup_published_topic(article: dict):
|
|
"""발행 완료된 topic 파일을 topics/ 에서 삭제"""
|
|
import hashlib
|
|
topics_dir = DATA_DIR / 'topics'
|
|
topic_text = article.get('topic', '') or article.get('title', '')
|
|
if not topic_text:
|
|
return
|
|
topic_id = hashlib.md5(topic_text.encode()).hexdigest()[:8]
|
|
for f in topics_dir.glob(f'*_{topic_id}.json'):
|
|
try:
|
|
f.unlink()
|
|
logger.info(f"발행 완료 topic 파일 삭제: {f.name}")
|
|
except Exception as e:
|
|
logger.debug(f"topic 파일 삭제 실패: {e}")
|
|
|
|
|
|
def save_pending_review(article: dict, reason: str):
|
|
"""수동 검토 대기 글 저장"""
|
|
pending_dir = DATA_DIR / 'pending_review'
|
|
pending_dir.mkdir(exist_ok=True)
|
|
record = {**article, 'pending_reason': reason, 'created_at': datetime.now().isoformat()}
|
|
filename = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_pending.json"
|
|
with open(pending_dir / filename, 'w', encoding='utf-8') as f:
|
|
json.dump(record, f, ensure_ascii=False, indent=2)
|
|
return pending_dir / filename
|
|
|
|
|
|
def load_pending_review_file(filepath: str) -> dict:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
|
|
# ─── 메인 발행 함수 ──────────────────────────────────
|
|
|
|
def publish(article: dict) -> bool:
|
|
"""
|
|
article: OpenClaw blog-writer가 출력한 파싱된 글 dict
|
|
{
|
|
title, meta, slug, tags, corner, body (markdown),
|
|
coupang_keywords, sources, disclaimer, quality_score
|
|
}
|
|
Returns: True(발행 성공) / False(수동 검토 대기)
|
|
"""
|
|
logger.info(f"발행 시도: {article.get('title', '')}")
|
|
safety_cfg = load_config('safety_keywords.json')
|
|
|
|
# 안전장치 검사
|
|
needs_review, review_reason = check_safety(article, safety_cfg)
|
|
if needs_review:
|
|
logger.warning(f"수동 검토 대기: {review_reason}")
|
|
save_pending_review(article, review_reason)
|
|
send_pending_review_alert(article, review_reason)
|
|
return False
|
|
|
|
# 변환봇이 미리 생성한 HTML이 있으면 재사용, 없으면 직접 변환
|
|
if article.get('_html_content'):
|
|
full_html = article['_html_content']
|
|
else:
|
|
# 마크다운 → HTML (fallback)
|
|
body_html, toc_html = markdown_to_html(article.get('body', ''))
|
|
body_html = insert_adsense_placeholders(body_html)
|
|
full_html = build_full_html(article, body_html, toc_html)
|
|
|
|
# Google 인증
|
|
try:
|
|
creds = get_google_credentials()
|
|
except RuntimeError as e:
|
|
logger.error(str(e))
|
|
return False
|
|
|
|
# Blogger 발행
|
|
try:
|
|
post_result = publish_to_blogger(article, full_html, creds)
|
|
post_url = post_result.get('url', '')
|
|
logger.info(f"발행 완료: {post_url}")
|
|
except Exception as e:
|
|
logger.error(f"Blogger 발행 실패: {e}")
|
|
return False
|
|
|
|
# Search Console 제출
|
|
if post_url:
|
|
submit_to_search_console(post_url, creds)
|
|
|
|
# 발행 이력 저장
|
|
log_published(article, post_result)
|
|
|
|
# 발행 완료된 topic 파일 정리
|
|
_cleanup_published_topic(article)
|
|
|
|
# Telegram 알림
|
|
title = article.get('title', '')
|
|
corner = article.get('corner', '')
|
|
send_telegram(
|
|
f"✅ <b>발행 완료!</b>\n\n"
|
|
f"📌 <b>{title}</b>\n"
|
|
f"코너: {corner}\n"
|
|
f"URL: {post_url}"
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
def approve_pending(filepath: str) -> bool:
|
|
"""수동 검토 대기 글 승인 후 발행"""
|
|
try:
|
|
article = load_pending_review_file(filepath)
|
|
article.pop('pending_reason', None)
|
|
article.pop('created_at', None)
|
|
|
|
# 안전장치 우회하여 강제 발행
|
|
body_html, toc_html = markdown_to_html(article.get('body', ''))
|
|
body_html = insert_adsense_placeholders(body_html)
|
|
full_html = build_full_html(article, body_html, toc_html)
|
|
|
|
creds = get_google_credentials()
|
|
post_result = publish_to_blogger(article, full_html, creds)
|
|
post_url = post_result.get('url', '')
|
|
log_published(article, post_result)
|
|
|
|
# 대기 파일 삭제
|
|
Path(filepath).unlink(missing_ok=True)
|
|
|
|
send_telegram(
|
|
f"✅ <b>[수동 승인] 발행 완료!</b>\n\n"
|
|
f"📌 {article.get('title', '')}\n"
|
|
f"URL: {post_url}"
|
|
)
|
|
logger.info(f"수동 승인 발행 완료: {post_url}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"승인 발행 실패: {e}")
|
|
return False
|
|
|
|
|
|
def reject_pending(filepath: str):
|
|
"""수동 검토 대기 글 거부 (파일 삭제)"""
|
|
try:
|
|
article = load_pending_review_file(filepath)
|
|
Path(filepath).unlink(missing_ok=True)
|
|
send_telegram(f"🗑 <b>[거부]</b> {article.get('title', '')} — 폐기됨")
|
|
logger.info(f"수동 검토 거부: {filepath}")
|
|
except Exception as e:
|
|
logger.error(f"거부 처리 실패: {e}")
|
|
|
|
|
|
def get_pending_list() -> list[dict]:
|
|
"""수동 검토 대기 목록 반환"""
|
|
pending_dir = DATA_DIR / 'pending_review'
|
|
pending_dir.mkdir(exist_ok=True)
|
|
result = []
|
|
for f in sorted(pending_dir.glob('*_pending.json')):
|
|
try:
|
|
data = json.loads(f.read_text(encoding='utf-8'))
|
|
data['_filepath'] = str(f)
|
|
result.append(data)
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# 테스트용: 샘플 아티클 발행 시도
|
|
sample = {
|
|
'title': '테스트 글',
|
|
'meta': '테스트 메타 설명',
|
|
'slug': 'test-article',
|
|
'tags': ['테스트', 'AI'],
|
|
'corner': '쉬운세상',
|
|
'body': '## 제목\n\n본문 내용입니다.\n\n## 결론\n\n마무리입니다.',
|
|
'coupang_keywords': ['키보드'],
|
|
'sources': [
|
|
{'url': 'https://example.com/1', 'title': '출처1', 'date': '2026-03-24'},
|
|
{'url': 'https://example.com/2', 'title': '출처2', 'date': '2026-03-24'},
|
|
],
|
|
'disclaimer': '',
|
|
'quality_score': 80,
|
|
}
|
|
result = publish(sample)
|
|
print('발행 결과:', result)
|