diff --git a/app/ai/morning.py b/app/ai/morning.py index cebef38..20a2dd0 100644 --- a/app/ai/morning.py +++ b/app/ai/morning.py @@ -1,15 +1,13 @@ """ app/ai/morning.py -장 전 분석 스크립트 (08:30 실행) +장 전 데이터 수집 스크립트 (claude_morning 헬퍼) -1. 네이버 금융 뉴스 크롤링 → data/news/YYYY-MM-DD.json -2. KIS API 수급/지수 수집 → data/market/YYYY-MM-DD.json -3. Claude AI 분석 → data/daily_context.json -4. Discord 알림 +Claude Code headless가 이 스크립트를 실행해 데이터를 수집한 뒤, +수집 결과를 직접 분석해 daily_context.json을 작성한다. 실행: - python app/ai/morning.py - python -m app.ai.morning + python app/ai/morning.py # 수집 + 파일 저장 + python app/ai/morning.py --print # 수집 결과를 stdout으로 출력 (Claude 분석용) """ import asyncio @@ -22,7 +20,6 @@ from datetime import datetime from pathlib import Path -# ── .env 로드 ────────────────────────────────────────────────────────────── def _load_env(): env_path = Path(".env") if not env_path.exists(): @@ -53,34 +50,21 @@ logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ - logging.StreamHandler(sys.stdout), + logging.StreamHandler(sys.stderr), # stdout은 Claude 분석용으로 비워둠 logging.FileHandler("logs/stockbot.log", encoding="utf-8"), ], ) logger = logging.getLogger(__name__) -# 외부 임포트는 .env 로드 후 import aiohttp from bs4 import BeautifulSoup - -try: - import anthropic as _anthropic_mod - _ANTHROPIC_OK = True -except ImportError: - _ANTHROPIC_OK = False - logger.warning("anthropic 패키지 없음 → pip install anthropic") - from app.execution.kis_client import KISClient -from app.monitor.notifier import send -from app.config import AI_CONTEXT_PATH -# ── 상수 ─────────────────────────────────────────────────────────────────── TODAY = datetime.now().strftime("%Y-%m-%d") NEWS_PATH = f"data/news/{TODAY}.json" MARKET_PATH = f"data/market/{TODAY}.json" -CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-sonnet-4-6") -_NEWS_HEADERS = { +_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " @@ -90,83 +74,60 @@ _NEWS_HEADERS = { "Referer": "https://finance.naver.com/", } -# ── AI fallback ───────────────────────────────────────────────────────────── -_FALLBACK_CONTEXT = { - "date": TODAY, - "generated_at": "", - "trade_allowed": True, - "market_sentiment": "중립", - "sentiment_score": 50, - "risk_level": "보통", - "hot_sectors": [], - "avoid_sectors": [], - "boosted_tickers": [], - "blacklist_tickers": [], - "position_size_multiplier": 0.8, - "reason": "AI 분석 실패 - 기본값 적용", -} - # ── 뉴스 크롤링 ───────────────────────────────────────────────────────────── -async def _fetch_naver_news(session: aiohttp.ClientSession) -> list[str]: - """네이버 금융 메인 뉴스 헤드라인 수집""" - url = "https://finance.naver.com/news/mainnews.naver" - async with session.get( - url, headers=_NEWS_HEADERS, timeout=aiohttp.ClientTimeout(total=15) - ) as resp: - raw = await resp.read() - - try: - html = raw.decode("euc-kr") - except Exception: - html = raw.decode("utf-8", errors="replace") - - soup = BeautifulSoup(html, "html.parser") - - headlines = [] - # 네이버 금융 뉴스 리스트 구조 탐색 - for tag in soup.select("ul.simpleNewsList li a, .newsListBody .articleSubject a, .mainNewsList .articleSubject a"): - text = tag.get_text(strip=True) - if len(text) >= 10: - headlines.append(text) - - # fallback: href에 'news' 포함된 a 태그 - if not headlines: - for tag in soup.find_all("a", href=re.compile(r"news")): - text = tag.get_text(strip=True) - if 10 <= len(text) <= 120: - headlines.append(text) - - # 중복 제거 후 최대 15개 - seen: set[str] = set() - result: list[str] = [] - for h in headlines: - if h not in seen: - seen.add(h) - result.append(h) - if len(result) >= 15: - break - - return result - - async def fetch_news() -> list[str]: - """뉴스 크롤링 (실패 시 빈 리스트 반환)""" + url = "https://finance.naver.com/news/mainnews.naver" try: async with aiohttp.ClientSession() as session: - headlines = await _fetch_naver_news(session) - logger.info(f"뉴스 {len(headlines)}건 수집 완료") - return headlines + async with session.get( + url, headers=_HEADERS, timeout=aiohttp.ClientTimeout(total=15) + ) as resp: + raw = await resp.read() + + try: + html = raw.decode("euc-kr") + except Exception: + html = raw.decode("utf-8", errors="replace") + + soup = BeautifulSoup(html, "html.parser") + headlines: list[str] = [] + + for tag in soup.select( + "ul.simpleNewsList li a, " + ".newsListBody .articleSubject a, " + ".mainNewsList .articleSubject a" + ): + text = tag.get_text(strip=True) + if len(text) >= 10: + headlines.append(text) + + if not headlines: + for tag in soup.find_all("a", href=re.compile(r"news")): + text = tag.get_text(strip=True) + if 10 <= len(text) <= 120: + headlines.append(text) + + seen: set[str] = set() + result: list[str] = [] + for h in headlines: + if h not in seen: + seen.add(h) + result.append(h) + if len(result) >= 15: + break + + logger.info(f"뉴스 {len(result)}건 수집") + return result except Exception as e: - logger.warning(f"뉴스 크롤링 실패 (계속 진행): {e}") + logger.warning(f"뉴스 크롤링 실패: {e}") return [] # ── KIS 시장 데이터 ────────────────────────────────────────────────────────── async def fetch_market_data(kis: KISClient) -> dict: - """KIS API로 수급/업종 데이터 수집""" data: dict = { "volume_rank": [], "foreign_buy": [], @@ -174,28 +135,25 @@ async def fetch_market_data(kis: KISClient) -> dict: "sectors": [], } - # 1. 거래량 순위 try: data["volume_rank"] = await kis.get_volume_rank(top_n=20) - logger.info(f"거래량 순위 {len(data['volume_rank'])}종목 수집") + logger.info(f"거래량 순위 {len(data['volume_rank'])}종목") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"거래량 순위 실패: {e}") - # 2. 외국인/기관 순매수 try: fi = await kis.get_foreign_institution_rank(top_n=20) data["foreign_buy"] = fi["foreign"][:10] data["institution_buy"] = fi["institution"][:10] - logger.info("외국인/기관 수급 수집 완료") + logger.info("외국인/기관 수급 수집") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"수급 데이터 실패: {e}") - # 3. 업종 동향 try: data["sectors"] = await kis.get_sector_trend() - logger.info(f"업종 동향 {len(data['sectors'])}개 수집") + logger.info(f"업종 동향 {len(data['sectors'])}개") await asyncio.sleep(1.1) except Exception as e: logger.warning(f"업종 동향 실패: {e}") @@ -203,168 +161,53 @@ async def fetch_market_data(kis: KISClient) -> dict: return data -# ── Claude AI 분석 ─────────────────────────────────────────────────────────── - -def _build_prompt(news: list[str], market: dict) -> str: - news_text = "\n".join(f"- {h}" for h in news) if news else "- (수집 실패)" - - vol_text = "\n".join( - f" {r['rank']}. {r['name']}({r['ticker']}) {r['change_pct']:+.1f}%" - for r in market.get("volume_rank", [])[:10] - ) or " (없음)" - - foreign_text = "\n".join( - f" {r['name']}({r['ticker']}) {r['amount']:+,}주" - for r in market.get("foreign_buy", [])[:5] - if r.get("amount", 0) > 0 - ) or " (없음)" - - sector_text = "\n".join( - f" {s.get('sector', '')} {s.get('change_pct', 0):+.2f}%" - for s in market.get("sectors", [])[:10] - if s.get("sector") - ) or " (없음)" - - now_time = datetime.now().strftime("%H:%M:%S") - - return f"""오늘({TODAY}) 한국 주식시장 장 전 분석을 수행하고 JSON만 반환해주세요. - -## 뉴스 헤드라인 -{news_text} - -## 거래량 상위 종목 (전일 등락률) -{vol_text} - -## 외국인 순매수 상위 -{foreign_text} - -## 업종별 동향 -{sector_text} - -다음 JSON 형식으로만 응답하세요 (마크다운 코드블록 없이): -{{ - "date": "{TODAY}", - "generated_at": "{now_time}", - "trade_allowed": true, - "market_sentiment": "강세 또는 중립 또는 약세", - "sentiment_score": 0~100 정수, - "risk_level": "낮음 또는 보통 또는 높음", - "hot_sectors": ["주목 섹터"], - "avoid_sectors": ["회피 섹터"], - "boosted_tickers": ["거래량+외국인 동시 상위 종목코드"], - "blacklist_tickers": ["악재 종목코드"], - "position_size_multiplier": 0.5~1.5 소수, - "reason": "50자 이내 시장 요약" -}}""" - - -def analyze_with_claude(news: list[str], market: dict) -> dict: - """Claude API로 시장 분석 → daily_context dict 반환""" - if not _ANTHROPIC_OK: - raise RuntimeError("anthropic 패키지 미설치") - - api_key = os.getenv("ANTHROPIC_API_KEY", "") - if not api_key: - raise RuntimeError("ANTHROPIC_API_KEY 환경변수 미설정") - - client = _anthropic_mod.Anthropic(api_key=api_key) - prompt = _build_prompt(news, market) - - resp = client.messages.create( - model=CLAUDE_MODEL, - max_tokens=600, - messages=[{"role": "user", "content": prompt}], - ) - - raw = resp.content[0].text.strip() - - # 코드블록 감싸진 경우 추출 - match = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw) - if match: - raw = match.group(1).strip() - - ctx = json.loads(raw) - - # 필수 필드 기본값 보완 - ctx.setdefault("date", TODAY) - ctx.setdefault("generated_at", datetime.now().strftime("%H:%M:%S")) - ctx.setdefault("trade_allowed", True) - ctx.setdefault("position_size_multiplier", 1.0) - - return ctx - - # ── 메인 ───────────────────────────────────────────────────────────────────── -async def main(): - logger.info("=" * 40) - logger.info(f"claude_morning 시작 [{TODAY}]") +async def main(print_mode: bool = False): + logger.info(f"데이터 수집 시작 [{TODAY}]") for d in ["data/news", "data/market"]: os.makedirs(d, exist_ok=True) - # 1. 뉴스 수집 - logger.info("[1/4] 뉴스 크롤링...") + # 뉴스 수집 news = await fetch_news() Path(NEWS_PATH).write_text( json.dumps({"date": TODAY, "headlines": news}, ensure_ascii=False, indent=2), encoding="utf-8", ) - # 2. KIS 시장 데이터 - logger.info("[2/4] KIS 시장 데이터 수집...") + # KIS 시장 데이터 kis = KISClient() + market: dict = {"volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": []} try: await kis.get_access_token() market = await fetch_market_data(kis) except Exception as e: - logger.warning(f"KIS 데이터 수집 실패 (계속 진행): {e}") - market = {"volume_rank": [], "foreign_buy": [], "institution_buy": [], "sectors": []} + logger.warning(f"KIS 수집 실패: {e}") Path(MARKET_PATH).write_text( json.dumps({"date": TODAY, **market}, ensure_ascii=False, indent=2), encoding="utf-8", ) - # 3. Claude AI 분석 - logger.info("[3/4] Claude AI 분석...") - try: - ctx = analyze_with_claude(news, market) - logger.info( - f"AI 분석 완료: {ctx['market_sentiment']}({ctx['sentiment_score']}점) " - f"리스크={ctx['risk_level']}" - ) - except Exception as e: - logger.error(f"AI 분석 실패: {e} → fallback 적용") - ctx = {**_FALLBACK_CONTEXT, "generated_at": datetime.now().strftime("%H:%M:%S"), "reason": f"AI 분석 실패 - {e}"} + logger.info(f"수집 완료 → {NEWS_PATH}, {MARKET_PATH}") - # 4. daily_context.json 저장 - Path(AI_CONTEXT_PATH).write_text( - json.dumps(ctx, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - logger.info(f"[4/4] daily_context.json 저장: {AI_CONTEXT_PATH}") - - # 5. Discord 알림 - hot = ", ".join(ctx.get("hot_sectors", [])) or "없음" - avoid = ", ".join(ctx.get("avoid_sectors", [])) or "없음" - boosted = ", ".join(ctx.get("boosted_tickers", [])) or "없음" - trade_flag = "✅ 거래허용" if ctx.get("trade_allowed", True) else "🚫 거래중단" - - msg = ( - f"[장전분석] {TODAY} {ctx.get('generated_at', '')}\n" - f"시장: {ctx['market_sentiment']}({ctx['sentiment_score']}점) " - f"| 리스크: {ctx['risk_level']} | {trade_flag}\n" - f"주목 섹터: {hot}\n" - f"회피 섹터: {avoid}\n" - f"관심 종목: {boosted}\n" - f"비중 배율: ×{ctx.get('position_size_multiplier', 1.0)}\n" - f"📝 {ctx.get('reason', '')}" - ) - await send(msg) - - logger.info("claude_morning 완료") + if print_mode: + # Claude Code가 읽을 수 있도록 stdout으로 출력 + print(json.dumps( + { + "date": TODAY, + "news_headlines": news, + "volume_rank": market["volume_rank"][:10], + "foreign_buy_top5": market["foreign_buy"][:5], + "institution_buy_top5": market["institution_buy"][:5], + "sectors": market["sectors"][:15], + }, + ensure_ascii=False, + indent=2, + )) if __name__ == "__main__": - asyncio.run(main()) + print_mode = "--print" in sys.argv + asyncio.run(main(print_mode=print_mode)) diff --git a/app/main.py b/app/main.py index 335d862..f6776e3 100644 --- a/app/main.py +++ b/app/main.py @@ -450,13 +450,8 @@ async def run(): ) bot.risk.set_risk_level(ctx.get("risk_level", "보통")) - # 08:30 장전 분석 → 유니버스 갱신 + # 08:30 유니버스 갱신 (claude_morning은 Claude Code headless가 별도 실행) elif now == "08:30": - try: - from app.ai.morning import main as claude_morning - await claude_morning() - except Exception as e: - logger.error(f"claude_morning 실패 (계속 진행): {e}") await bot.update_universe() # 08:50 목표가 계산 diff --git a/app/requirements.txt b/app/requirements.txt index 965e21b..90ee012 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -1,5 +1,4 @@ aiohttp==3.9.5 -anthropic>=0.40.0 python-dotenv==1.0.1 APScheduler==3.10.4 beautifulsoup4==4.12.3