"""Collect KIS intraday minute bars for selected tickers.

Usage:
    python scripts/collect_minute_data.py --tickers 005930,000660
    python scripts/collect_minute_data.py --top 30
"""
import argparse
import asyncio
import csv
import json
import os
import sys
from datetime import datetime
from pathlib import Path

# Make the repository root importable so the ``app`` package resolves when
# this file is run directly as a script.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

from app.main import load_env
from app.execution.kis_client import KISClient

# Per-day CSV output lives under data/external/minute/<YYYYMMDD>/.
OUT_ROOT = ROOT / "data" / "external" / "minute"

# Upper-bound request times (HHMMSS) used when neither --hours nor --hour
# is supplied.
DEFAULT_HOURS = (
    "093000",
    "100000",
    "103000",
    "110000",
    "113000",
    "120000",
    "123000",
    "130000",
    "133000",
    "140000",
)

# Substrings of an instrument name that mark it as an ETF/ETN-style product.
ETF_KEYWORDS = (
    "인버스",
    "레버리지",
    "선물",
    "KODEX",
    "TIGER",
    "KBSTAR",
    "HANARO",
    "ARIRANG",
    "KOSEF",
    "SOL",
    "ACE",
    "RISE",
    "PLUS",
)

# Column order of every CSV written by this script.
_CSV_FIELDS = ("date", "time", "ticker", "open", "high", "low", "close", "volume")


def _date_dir(date_text: str | None) -> str:
    """Return the output directory name (YYYYMMDD) for *date_text*.

    Dashes are stripped from a supplied date; when no date is given the
    current local date is used.
    """
    if date_text:
        return date_text.replace("-", "")
    return datetime.now().strftime("%Y%m%d")


def _load_cached_tickers(limit: int) -> list[str]:
    """Return up to *limit* tickers from ``data/universe_cache.json``.

    Returns an empty list when the cache file is missing, unreadable, not
    valid JSON, or not a JSON object, so the caller can fall back to another
    ticker source instead of aborting the whole run.
    """
    cache = ROOT / "data" / "universe_cache.json"
    if not cache.exists():
        return []
    try:
        data = json.loads(cache.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses.
        print(f"ignoring unreadable universe cache {cache}: {exc}", file=sys.stderr)
        return []
    if not isinstance(data, dict):
        print(f"ignoring universe cache {cache}: expected a JSON object", file=sys.stderr)
        return []
    return list(data.get("tickers") or [])[:limit]


def _is_etf(ticker: str, name: str) -> bool:
    """Return True when the instrument should be treated as an ETF/ETN.

    A ticker that starts with ``Q`` or is not exactly six characters long is
    always excluded; otherwise the name is checked against ``ETF_KEYWORDS``.
    """
    if ticker.startswith("Q") or len(ticker) != 6:
        return True
    return any(keyword in name for keyword in ETF_KEYWORDS)


async def _resolve_tickers(kis: KISClient, args) -> list[str]:
    """Decide which tickers to collect.

    Sources are tried in this order:
      1. the explicit ``--tickers`` list,
      2. the cached universe (truncated to ``--top``),
      3. the KIS volume ranking, filtered for ETFs unless ``--include-etf``.
    """
    if args.tickers:
        return [t.strip() for t in args.tickers.split(",") if t.strip()]

    cached = _load_cached_tickers(args.top)
    if cached:
        return cached

    # Request twice as many candidates as needed so that enough remain after
    # ETF/ETN products are filtered out.
    rank = await kis.get_volume_rank(top_n=args.top * 2)
    tickers = [
        r["ticker"]
        for r in rank
        if args.include_etf or not _is_etf(r["ticker"], r["name"])
    ]
    return tickers[:args.top]


def _write_csv(path: Path, rows: list[dict]) -> None:
    """Write *rows* to *path* as CSV, creating parent directories as needed.

    The file is encoded as UTF-8 with a BOM (``utf-8-sig``). ``DictWriter``
    is used with its default ``extrasaction``, so a row carrying a key
    outside ``_CSV_FIELDS`` raises ``ValueError``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=list(_CSV_FIELDS))
        writer.writeheader()
        writer.writerows(rows)


async def _collect_ticker_rows(kis: KISClient, ticker: str, hours: list[str], sleep: float) -> list[dict]:
    """Fetch minute bars for *ticker* at every time in *hours*.

    Rows are de-duplicated on ``(date, time, ticker)``; when two requests
    return the same key, the row from the later request wins. The result is
    sorted by date, then time.
    """
    by_key = {}
    for hour in hours:
        rows = await kis.get_ohlcv_minute(ticker, hour=hour)
        for row in rows:
            key = (row.get("date"), row.get("time"), row.get("ticker"))
            by_key[key] = row
        # Pause after every request, including the last one, so the first
        # request for the next ticker is spaced out as well (presumably an
        # API rate limit; confirm against the KIS client).
        await asyncio.sleep(sleep)
    return sorted(by_key.values(), key=lambda r: (r.get("date", ""), r.get("time", "")))


async def main_async(args):
    """Run the collection for the parsed command-line *args*.

    One CSV per ticker is written to ``OUT_ROOT/<YYYYMMDD>/``. A failure on
    one ticker is reported on stderr and does not stop the remaining tickers.
    """
    load_env()
    if args.real_quotes:
        # Set after load_env() so this override is applied last.
        os.environ["KIS_MOCK"] = "false"

    kis = KISClient()
    await kis.get_access_token()
    tickers = await _resolve_tickers(kis, args)

    # Precedence: --hours, then --hour, then DEFAULT_HOURS.
    hours = [h.strip() for h in (args.hours or args.hour or "").split(",") if h.strip()]
    if not hours:
        hours = list(DEFAULT_HOURS)

    # NOTE: args.date only names the output directory; it is not passed to
    # the minute-bar request.
    out_dir = OUT_ROOT / _date_dir(args.date)
    saved = 0
    for ticker in tickers:
        try:
            rows = await _collect_ticker_rows(kis, ticker, hours, args.sleep)
            if rows:
                _write_csv(out_dir / f"{ticker}.csv", rows)
                saved += 1
                print(f"saved {ticker}: {len(rows)} rows")
            else:
                print(f"no rows {ticker}")
        except Exception as exc:
            # Best-effort batch job: report the failure and keep going.
            print(f"failed {ticker}: {exc}", file=sys.stderr)
    print(f"minute collection done: saved={saved}/{len(tickers)}, dir={out_dir}")


def main():
    """Parse command-line arguments and run the async collector."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.")
    parser.add_argument("--tickers", help="Comma-separated tickers.")
    parser.add_argument("--top", type=int, default=30)
    parser.add_argument("--hour", default="", help="Single KIS upper-bound time HHMMSS.")
    # The default must stay empty: a non-empty default would always take
    # precedence in main_async and make --hour a dead option. The built-in
    # schedule is applied there when both options are empty.
    parser.add_argument(
        "--hours",
        default="",
        help=f"Comma-separated KIS upper-bound times. Default: {','.join(DEFAULT_HOURS)}.",
    )
    parser.add_argument("--sleep", type=float, default=1.1)
    parser.add_argument("--real-quotes", action="store_true", help="Use real quote API even if .env is mock.")
    parser.add_argument("--include-etf", action="store_true", help="Include ETF/ETN products in collection.")
    args = parser.parse_args()
    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()