""" Collect daily Korean-market features for model training. Outputs: data/external/daily/YYYYMMDD/stocks.csv data/external/daily/YYYYMMDD/indexes.csv """ import argparse import asyncio import csv import os import sys from datetime import datetime, timedelta from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) OUT_ROOT = ROOT / "data" / "external" / "daily" ETF_KEYWORDS = ( "인버스", "레버리지", "선물", "KODEX", "TIGER", "KBSTAR", "HANARO", "ARIRANG", "KOSEF", "SOL", "ACE", "RISE", "PLUS", ) def _yyyymmdd(date_text: str | None) -> str: if date_text: return date_text.replace("-", "") return datetime.now().strftime("%Y%m%d") def _standardize_stock_ohlcv(df: pd.DataFrame, date_yyyymmdd: str) -> pd.DataFrame: df = df.reset_index() columns = list(df.columns) rename = {columns[0]: "ticker"} standard = ["open", "high", "low", "close", "volume", "amount", "change_pct"] for source, target in zip(columns[1:], standard): rename[source] = target df = df.rename(columns=rename) df.insert(0, "date", date_yyyymmdd) return df[[c for c in ["date", "ticker", *standard] if c in df.columns]] def _standardize_index_row(row: dict, date_yyyymmdd: str, code: str, name: str) -> dict: values = list(row.values()) keys = ["open", "high", "low", "close", "volume", "amount", "change_pct"] out = {"date": date_yyyymmdd, "code": code, "name": name} for key, value in zip(keys, values): out[key] = value return out def _is_etf(ticker: str, name: str) -> bool: if ticker.startswith("Q") or len(ticker) != 6: return True return any(keyword in name for keyword in ETF_KEYWORDS) def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]): path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8-sig", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path): try: from pykrx import stock except ImportError as exc: raise RuntimeError("pykrx is not installed. Install requirements first.") from exc stocks_raw = stock.get_market_ohlcv_by_ticker(date_yyyymmdd, market="ALL") stocks = _standardize_stock_ohlcv(stocks_raw, date_yyyymmdd) stocks.to_csv(out_dir / "stocks.csv", index=False, encoding="utf-8-sig") index_rows = [] for code, name in (("1001", "KOSPI"), ("2001", "KOSDAQ")): try: df = stock.get_index_ohlcv_by_date(date_yyyymmdd, date_yyyymmdd, code) if not df.empty: index_rows.append(_standardize_index_row(df.iloc[-1].to_dict(), date_yyyymmdd, code, name)) except Exception as exc: print(f"index fetch failed {name}: {exc}", file=sys.stderr) pd.DataFrame(index_rows).to_csv(out_dir / "indexes.csv", index=False, encoding="utf-8-sig") return len(stocks), len(index_rows) async def collect_with_kis(date_yyyymmdd: str, out_dir: Path, top: int): from app.main import load_env from app.execution.kis_client import KISClient load_env() os.environ["KIS_MOCK"] = "false" kis = KISClient() await kis.get_access_token() rank = await kis.get_volume_rank(top_n=top * 2) tickers = [ r["ticker"] for r in rank if not _is_etf(r["ticker"], r["name"]) ][:top] end_dt = datetime.strptime(date_yyyymmdd, "%Y%m%d") start = (end_dt - timedelta(days=14)).strftime("%Y%m%d") stock_rows = [] for ticker in tickers: try: rows = await kis.get_ohlcv_daily(ticker, start=start, end=date_yyyymmdd) except Exception as exc: print(f"daily fetch failed {ticker}: {exc}", file=sys.stderr) continue for row in rows: stock_rows.append({ "date": row["date"], "ticker": ticker, "open": row["open"], "high": row["high"], "low": row["low"], "close": row["close"], "volume": row["volume"], "amount": row["close"] * row["volume"], }) await asyncio.sleep(0.25) _write_csv( out_dir / "stocks.csv", stock_rows, ["date", "ticker", "open", "high", "low", "close", "volume", "amount"], ) _write_csv( out_dir / "indexes.csv", [], ["date", "code", "name", "open", "high", "low", "close", "volume", "amount", "change_pct"], ) return len(stock_rows), 0 def main(): parser = argparse.ArgumentParser() parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.") parser.add_argument("--top", type=int, default=30) args = parser.parse_args() date_yyyymmdd = _yyyymmdd(args.date) out_dir = OUT_ROOT / date_yyyymmdd out_dir.mkdir(parents=True, exist_ok=True) try: stock_count, index_count = collect_with_pykrx(date_yyyymmdd, out_dir) except Exception as exc: print(f"pykrx daily features failed, falling back to KIS: {exc}", file=sys.stderr) stock_count, index_count = asyncio.run(collect_with_kis(date_yyyymmdd, out_dir, args.top)) print(f"saved daily features: stocks={stock_count}, indexes={index_count}, dir={out_dir}") if __name__ == "__main__": main()