""" app/ai/morning.py 장 전 데이터 수집 스크립트 (claude_morning 헬퍼) Claude Code headless가 이 스크립트를 실행해 데이터를 수집한 뒤, 수집 결과를 직접 분석해 daily_context.json을 작성한다. 실행: python app/ai/morning.py # 수집 + 파일 저장 python app/ai/morning.py --print # 수집 결과를 stdout으로 출력 (Claude 분석용) """ import asyncio import html as html_lib import json import logging import os import re import sys import xml.etree.ElementTree as ET from datetime import datetime from pathlib import Path def _load_env(): env_path = Path(".env") if not env_path.exists(): return with open(env_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") k = k.strip() v = v.strip() if " #" in v: v = v[: v.index(" #")] v = v.strip().strip('"').strip("'") if k and v and k not in os.environ: os.environ[k] = v _load_env() ROOT = Path(__file__).parent.parent.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) os.makedirs("logs", exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(sys.stderr), # stdout은 Claude 분석용으로 비워둠 logging.FileHandler("logs/stockbot.log", encoding="utf-8"), ], ) logger = logging.getLogger(__name__) import aiohttp from app.execution.kis_client import KISClient from app.monitor.notifier import send as discord_send TODAY = datetime.now().strftime("%Y-%m-%d") NEWS_PATH = f"data/news/{TODAY}.json" MARKET_PATH = f"data/market/{TODAY}.json" _HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ), "Accept-Language": "ko-KR,ko;q=0.9", } # 경제/증권 RSS 피드 (소스명, URL) RSS_FEEDS = [ ("한경_증권", "https://www.hankyung.com/feed/finance"), ("한경_경제", "https://www.hankyung.com/feed/economy"), ("연합뉴스_경제", "https://www.yna.co.kr/rss/economy.xml"), ("매경_증권", "https://www.mk.co.kr/rss/30000001/"), ] # ── RSS 뉴스 수집 ───────────────────────────────────────────────────────────── async def fetch_rss_news() -> list[str]: """4개 경제지 RSS에서 증권/경제 헤드라인 수집 (소스당 최대 20건)""" headlines: list[str] = [] seen: set[str] = set() async with aiohttp.ClientSession(headers=_HEADERS) as session: for source_name, url in RSS_FEEDS: try: async with session.get( url, timeout=aiohttp.ClientTimeout(total=10) ) as resp: raw = await resp.read() try: text = raw.decode("utf-8") except UnicodeDecodeError: text = raw.decode("euc-kr", errors="replace") root = ET.fromstring(text) count = 0 for item in root.findall(".//item"): el = item.find("title") if el is None or not el.text: continue title = html_lib.unescape(el.text.strip()) title = re.sub(r"", "", title).strip() title = re.sub(r"<[^>]+>", "", title).strip() if len(title) >= 10 and title not in seen: seen.add(title) headlines.append(title) count += 1 if count >= 20: break logger.info(f"RSS [{source_name}] {count}건") except Exception as e: logger.warning(f"RSS 실패 [{source_name}]: {e}") logger.info(f"RSS 뉴스 총 {len(headlines)}건 수집") return headlines # ── 네이버 종목별 뉴스 ──────────────────────────────────────────────────────── async def fetch_stock_news_naver(stocks: list[dict]) -> dict: """네이버 검색 API로 거래량 상위 종목별 최신 뉴스 5건 수집""" client_id = os.getenv("NAVER_CLIENT_ID", "") client_secret = os.getenv("NAVER_CLIENT_SECRET", "") if not client_id or not client_secret: logger.warning("NAVER API 키 없음 — 종목별 뉴스 스킵") return {} naver_headers = { "X-Naver-Client-Id": client_id, "X-Naver-Client-Secret": client_secret, } result: dict = {} async with aiohttp.ClientSession(headers=naver_headers) as session: for stock in stocks[:20]: ticker = stock.get("ticker", "") name = stock.get("name", "") if not name or name == ticker: continue try: params = {"query": name, "display": 5, "sort": "date"} async with session.get( "https://openapi.naver.com/v1/search/news.json", params=params, timeout=aiohttp.ClientTimeout(total=5), ) as resp: data = await resp.json() items = data.get("items", []) news_list = [ html_lib.unescape(re.sub(r"<[^>]+>", "", item.get("title", ""))) for item in items if item.get("title") ] if news_list: result[ticker] = {"name": name, "headlines": news_list} await asyncio.sleep(0.11) # 초당 9건 이하 유지 except Exception as e: logger.warning(f"종목 뉴스 실패 [{name}]: {e}") logger.info(f"종목별 뉴스 {len(result)}개 종목 수집") return result # ── KIS 시장 데이터 ─────────────────────────────────────────────────────────── async def fetch_market_data(kis: KISClient) -> dict: data: dict = { "volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": [], } try: data["volume_rank"] = await kis.get_volume_rank(top_n=30) logger.info(f"거래량 순위 {len(data['volume_rank'])}종목") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"거래량 순위 실패: {e}") try: fi = await kis.get_foreign_institution_rank(top_n=20) data["foreign_buy"] = fi["foreign"][:10] data["institution_buy"] = fi["institution"][:10] logger.info("외국인/기관 수급 수집") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"수급 데이터 실패: {e}") try: data["sectors"] = await kis.get_sector_trend() logger.info(f"업종 동향 {len(data['sectors'])}개") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"업종 동향 실패: {e}") return data # ── 메인 ───────────────────────────────────────────────────────────────────── async def main(print_mode: bool = False): logger.info(f"데이터 수집 시작 [{TODAY}]") for d in ["data/news", "data/market"]: os.makedirs(d, exist_ok=True) # 1. RSS 뉴스 수집 (4개 언론사) news = await fetch_rss_news() # 2. KIS 시장 데이터 — 데이터 수집 전용이므로 실거래 API 사용 (주문 없음) _orig_mock = os.environ.get("KIS_MOCK", "true") os.environ["KIS_MOCK"] = "false" kis = KISClient() os.environ["KIS_MOCK"] = _orig_mock market: dict = {"volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": []} try: await kis.get_access_token() market = await fetch_market_data(kis) except Exception as e: logger.warning(f"KIS 수집 실패: {e}") # 3. 네이버 종목별 뉴스 (거래량 상위 20종목) stock_news = await fetch_stock_news_naver(market["volume_rank"]) # 파일 저장 Path(NEWS_PATH).write_text( json.dumps({"date": TODAY, "headlines": news}, ensure_ascii=False, indent=2), encoding="utf-8", ) Path(MARKET_PATH).write_text( json.dumps({"date": TODAY, **market, "stock_news": stock_news}, ensure_ascii=False, indent=2), encoding="utf-8", ) logger.info(f"수집 완료 → {NEWS_PATH}, {MARKET_PATH}") if print_mode: print(json.dumps( { "date": TODAY, "news_headlines": news, # RSS 전체 (~80건) "volume_rank": market["volume_rank"][:20], "foreign_buy_top10": market["foreign_buy"], "institution_buy_top10": market["institution_buy"], "sectors": market["sectors"][:15], "stock_news": stock_news, # 종목별 뉴스 }, ensure_ascii=False, indent=2, )) async def send_discord(): """daily_context.json을 읽어 Discord로 장전 분석 요약 전송""" ctx_path = Path("data/daily_context.json") if not ctx_path.exists(): logger.error("daily_context.json 없음 — Discord 전송 스킵") return ctx = json.loads(ctx_path.read_text(encoding="utf-8")) hot = ", ".join(ctx.get("hot_sectors", [])) or "없음" avoid = ", ".join(ctx.get("avoid_sectors", [])) or "없음" boosted = ", ".join(ctx.get("boosted_tickers", [])) or "없음" flag = "✅ 거래허용" if ctx.get("trade_allowed", True) else "🚫 거래중단" msg = ( f"[장전분석] {ctx['date']} {ctx.get('generated_at', '')}\n" f"시장: {ctx['market_sentiment']}({ctx['sentiment_score']}점) | 리스크: {ctx['risk_level']} | {flag}\n" f"주목 섹터: {hot}\n" f"회피 섹터: {avoid}\n" f"관심 종목: {boosted}\n" f"비중 배율: x{ctx.get('position_size_multiplier', 1.0)}\n" f"📝 {ctx.get('reason', '')}" ) await discord_send(msg) logger.info("Discord 장전 분석 전송 완료") print("Discord 알림 전송 완료.") if __name__ == "__main__": print_mode = "--print" in sys.argv discord_mode = "--send-discord" in sys.argv if discord_mode: asyncio.run(send_discord()) else: asyncio.run(main(print_mode=print_mode))