""" app/ai/morning.py 장 전 분석 스크립트 (08:30 실행) 1. 네이버 금융 뉴스 크롤링 → data/news/YYYY-MM-DD.json 2. KIS API 수급/지수 수집 → data/market/YYYY-MM-DD.json 3. Claude AI 분석 → data/daily_context.json 4. Discord 알림 실행: python app/ai/morning.py python -m app.ai.morning """ import asyncio import json import logging import os import re import sys from datetime import datetime from pathlib import Path # ── .env 로드 ────────────────────────────────────────────────────────────── def _load_env(): env_path = Path(".env") if not env_path.exists(): return with open(env_path, encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") k = k.strip() v = v.strip() if " #" in v: v = v[: v.index(" #")] v = v.strip().strip('"').strip("'") if k and v and k not in os.environ: os.environ[k] = v _load_env() ROOT = Path(__file__).parent.parent.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) os.makedirs("logs", exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("logs/stockbot.log", encoding="utf-8"), ], ) logger = logging.getLogger(__name__) # 외부 임포트는 .env 로드 후 import aiohttp from bs4 import BeautifulSoup try: import anthropic as _anthropic_mod _ANTHROPIC_OK = True except ImportError: _ANTHROPIC_OK = False logger.warning("anthropic 패키지 없음 → pip install anthropic") from app.execution.kis_client import KISClient from app.monitor.notifier import send from app.config import AI_CONTEXT_PATH # ── 상수 ─────────────────────────────────────────────────────────────────── TODAY = datetime.now().strftime("%Y-%m-%d") NEWS_PATH = f"data/news/{TODAY}.json" MARKET_PATH = f"data/market/{TODAY}.json" CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-sonnet-4-6") _NEWS_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ), "Accept-Language": "ko-KR,ko;q=0.9", "Referer": "https://finance.naver.com/", } # ── AI fallback ───────────────────────────────────────────────────────────── _FALLBACK_CONTEXT = { "date": TODAY, "generated_at": "", "trade_allowed": True, "market_sentiment": "중립", "sentiment_score": 50, "risk_level": "보통", "hot_sectors": [], "avoid_sectors": [], "boosted_tickers": [], "blacklist_tickers": [], "position_size_multiplier": 0.8, "reason": "AI 분석 실패 - 기본값 적용", } # ── 뉴스 크롤링 ───────────────────────────────────────────────────────────── async def _fetch_naver_news(session: aiohttp.ClientSession) -> list[str]: """네이버 금융 메인 뉴스 헤드라인 수집""" url = "https://finance.naver.com/news/mainnews.naver" async with session.get( url, headers=_NEWS_HEADERS, timeout=aiohttp.ClientTimeout(total=15) ) as resp: raw = await resp.read() try: html = raw.decode("euc-kr") except Exception: html = raw.decode("utf-8", errors="replace") soup = BeautifulSoup(html, "html.parser") headlines = [] # 네이버 금융 뉴스 리스트 구조 탐색 for tag in soup.select("ul.simpleNewsList li a, .newsListBody .articleSubject a, .mainNewsList .articleSubject a"): text = tag.get_text(strip=True) if len(text) >= 10: headlines.append(text) # fallback: href에 'news' 포함된 a 태그 if not headlines: for tag in soup.find_all("a", href=re.compile(r"news")): text = tag.get_text(strip=True) if 10 <= len(text) <= 120: headlines.append(text) # 중복 제거 후 최대 15개 seen: set[str] = set() result: list[str] = [] for h in headlines: if h not in seen: seen.add(h) result.append(h) if len(result) >= 15: break return result async def fetch_news() -> list[str]: """뉴스 크롤링 (실패 시 빈 리스트 반환)""" try: async with aiohttp.ClientSession() as session: headlines = await _fetch_naver_news(session) logger.info(f"뉴스 {len(headlines)}건 수집 완료") return headlines except Exception as e: logger.warning(f"뉴스 크롤링 실패 (계속 진행): {e}") return [] # ── KIS 시장 데이터 ────────────────────────────────────────────────────────── async def fetch_market_data(kis: KISClient) -> dict: """KIS API로 수급/업종 데이터 수집""" data: dict = { "volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": [], } # 1. 거래량 순위 try: data["volume_rank"] = await kis.get_volume_rank(top_n=20) logger.info(f"거래량 순위 {len(data['volume_rank'])}종목 수집") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"거래량 순위 실패: {e}") # 2. 외국인/기관 순매수 try: fi = await kis.get_foreign_institution_rank(top_n=20) data["foreign_buy"] = fi["foreign"][:10] data["institution_buy"] = fi["institution"][:10] logger.info("외국인/기관 수급 수집 완료") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"수급 데이터 실패: {e}") # 3. 업종 동향 try: data["sectors"] = await kis.get_sector_trend() logger.info(f"업종 동향 {len(data['sectors'])}개 수집") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"업종 동향 실패: {e}") return data # ── Claude AI 분석 ─────────────────────────────────────────────────────────── def _build_prompt(news: list[str], market: dict) -> str: news_text = "\n".join(f"- {h}" for h in news) if news else "- (수집 실패)" vol_text = "\n".join( f" {r['rank']}. {r['name']}({r['ticker']}) {r['change_pct']:+.1f}%" for r in market.get("volume_rank", [])[:10] ) or " (없음)" foreign_text = "\n".join( f" {r['name']}({r['ticker']}) {r['amount']:+,}주" for r in market.get("foreign_buy", [])[:5] if r.get("amount", 0) > 0 ) or " (없음)" sector_text = "\n".join( f" {s.get('sector', '')} {s.get('change_pct', 0):+.2f}%" for s in market.get("sectors", [])[:10] if s.get("sector") ) or " (없음)" now_time = datetime.now().strftime("%H:%M:%S") return f"""오늘({TODAY}) 한국 주식시장 장 전 분석을 수행하고 JSON만 반환해주세요. ## 뉴스 헤드라인 {news_text} ## 거래량 상위 종목 (전일 등락률) {vol_text} ## 외국인 순매수 상위 {foreign_text} ## 업종별 동향 {sector_text} 다음 JSON 형식으로만 응답하세요 (마크다운 코드블록 없이): {{ "date": "{TODAY}", "generated_at": "{now_time}", "trade_allowed": true, "market_sentiment": "강세 또는 중립 또는 약세", "sentiment_score": 0~100 정수, "risk_level": "낮음 또는 보통 또는 높음", "hot_sectors": ["주목 섹터"], "avoid_sectors": ["회피 섹터"], "boosted_tickers": ["거래량+외국인 동시 상위 종목코드"], "blacklist_tickers": ["악재 종목코드"], "position_size_multiplier": 0.5~1.5 소수, "reason": "50자 이내 시장 요약" }}""" def analyze_with_claude(news: list[str], market: dict) -> dict: """Claude API로 시장 분석 → daily_context dict 반환""" if not _ANTHROPIC_OK: raise RuntimeError("anthropic 패키지 미설치") api_key = os.getenv("ANTHROPIC_API_KEY", "") if not api_key: raise RuntimeError("ANTHROPIC_API_KEY 환경변수 미설정") client = _anthropic_mod.Anthropic(api_key=api_key) prompt = _build_prompt(news, market) resp = client.messages.create( model=CLAUDE_MODEL, max_tokens=600, messages=[{"role": "user", "content": prompt}], ) raw = resp.content[0].text.strip() # 코드블록 감싸진 경우 추출 match = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw) if match: raw = match.group(1).strip() ctx = json.loads(raw) # 필수 필드 기본값 보완 ctx.setdefault("date", TODAY) ctx.setdefault("generated_at", datetime.now().strftime("%H:%M:%S")) ctx.setdefault("trade_allowed", True) ctx.setdefault("position_size_multiplier", 1.0) return ctx # ── 메인 ───────────────────────────────────────────────────────────────────── async def main(): logger.info("=" * 40) logger.info(f"claude_morning 시작 [{TODAY}]") for d in ["data/news", "data/market"]: os.makedirs(d, exist_ok=True) # 1. 뉴스 수집 logger.info("[1/4] 뉴스 크롤링...") news = await fetch_news() Path(NEWS_PATH).write_text( json.dumps({"date": TODAY, "headlines": news}, ensure_ascii=False, indent=2), encoding="utf-8", ) # 2. KIS 시장 데이터 logger.info("[2/4] KIS 시장 데이터 수집...") kis = KISClient() try: await kis.get_access_token() market = await fetch_market_data(kis) except Exception as e: logger.warning(f"KIS 데이터 수집 실패 (계속 진행): {e}") market = {"volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": []} Path(MARKET_PATH).write_text( json.dumps({"date": TODAY, **market}, ensure_ascii=False, indent=2), encoding="utf-8", ) # 3. Claude AI 분석 logger.info("[3/4] Claude AI 분석...") try: ctx = analyze_with_claude(news, market) logger.info( f"AI 분석 완료: {ctx['market_sentiment']}({ctx['sentiment_score']}점) " f"리스크={ctx['risk_level']}" ) except Exception as e: logger.error(f"AI 분석 실패: {e} → fallback 적용") ctx = {**_FALLBACK_CONTEXT, "generated_at": datetime.now().strftime("%H:%M:%S"), "reason": f"AI 분석 실패 - {e}"} # 4. daily_context.json 저장 Path(AI_CONTEXT_PATH).write_text( json.dumps(ctx, ensure_ascii=False, indent=2), encoding="utf-8", ) logger.info(f"[4/4] daily_context.json 저장: {AI_CONTEXT_PATH}") # 5. Discord 알림 hot = ", ".join(ctx.get("hot_sectors", [])) or "없음" avoid = ", ".join(ctx.get("avoid_sectors", [])) or "없음" boosted = ", ".join(ctx.get("boosted_tickers", [])) or "없음" trade_flag = "✅ 거래허용" if ctx.get("trade_allowed", True) else "🚫 거래중단" msg = ( f"[장전분석] {TODAY} {ctx.get('generated_at', '')}\n" f"시장: {ctx['market_sentiment']}({ctx['sentiment_score']}점) " f"| 리스크: {ctx['risk_level']} | {trade_flag}\n" f"주목 섹터: {hot}\n" f"회피 섹터: {avoid}\n" f"관심 종목: {boosted}\n" f"비중 배율: ×{ctx.get('position_size_multiplier', 1.0)}\n" f"📝 {ctx.get('reason', '')}" ) await send(msg) logger.info("claude_morning 완료") if __name__ == "__main__": asyncio.run(main())