# Source: Stock-trading-programming/scripts/collect_minute_data.py (103 lines, 3.0 KiB, Python)

"""
Collect KIS intraday minute bars for selected tickers.
Usage:
python scripts/collect_minute_data.py --tickers 005930,000660
python scripts/collect_minute_data.py --top 30
"""
import argparse
import asyncio
import csv
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Put the repository root first on sys.path before the `app` imports below;
# presumably this lets them resolve when the file is run directly as a script.
sys.path.insert(0, str(ROOT))
from app.main import load_env
from app.execution.kis_client import KISClient
# Base directory for collected minute-bar CSVs; a per-date subfolder is added
# underneath it at run time.
OUT_ROOT = ROOT / "data" / "external" / "minute"
def _date_dir(date_text: str | None) -> str:
if date_text:
return date_text.replace("-", "")
return datetime.now().strftime("%Y%m%d")
def _load_cached_tickers(limit: int) -> list[str]:
cache = ROOT / "data" / "universe_cache.json"
if not cache.exists():
return []
data = json.loads(cache.read_text(encoding="utf-8"))
return list(data.get("tickers", []))[:limit]
async def _resolve_tickers(kis: KISClient, args) -> list[str]:
    """Choose the tickers to collect.

    Sources are tried in order and the first non-empty one wins:

    1. ``args.tickers`` — a comma-separated list; blank entries are dropped.
       When this option is given, its result is returned even if it ends up
       empty.
    2. The local universe cache, limited to ``args.top`` entries.
    3. The volume ranking returned by ``kis.get_volume_rank``.
    """
    explicit = args.tickers
    if explicit:
        return [symbol for symbol in map(str.strip, explicit.split(",")) if symbol]

    from_cache = _load_cached_tickers(args.top)
    if from_cache:
        return from_cache

    ranking = await kis.get_volume_rank(top_n=args.top)
    return [entry["ticker"] for entry in ranking]
def _write_csv(path: Path, rows: list[dict]):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["date", "time", "ticker", "open", "high", "low", "close", "volume"])
writer.writeheader()
writer.writerows(rows)
async def main_async(args):
    """Fetch minute bars for every selected ticker and save one CSV per ticker.

    Loads the environment, optionally sets ``KIS_MOCK`` to ``"false"`` when
    ``args.real_quotes`` is set, obtains an access token, resolves the ticker
    list and then requests each ticker in turn. A failure for one ticker is
    reported on stderr and does not stop the run. The loop pauses for
    ``args.sleep`` seconds after each ticker.

    Note that ``args.date`` only selects the output directory name; it is not
    passed to the minute-bar request.
    """
    load_env()
    if args.real_quotes:
        os.environ["KIS_MOCK"] = "false"

    client = KISClient()
    await client.get_access_token()

    symbols = await _resolve_tickers(client, args)
    target_dir = OUT_ROOT / _date_dir(args.date)

    written = 0
    for symbol in symbols:
        try:
            bars = await client.get_ohlcv_minute(symbol, hour=args.hour)
            if not bars:
                print(f"no rows {symbol}")
            else:
                _write_csv(target_dir / f"{symbol}.csv", bars)
                written += 1
                print(f"saved {symbol}: {len(bars)} rows")
        except Exception as exc:
            # Best effort: keep going with the remaining tickers.
            print(f"failed {symbol}: {exc}", file=sys.stderr)
        await asyncio.sleep(args.sleep)

    print(f"minute collection done: saved={written}/{len(symbols)}, dir={target_dir}")
def main():
    """Parse the command-line options and run the collector to completion."""
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_specs = (
        ("--date", {"help": "YYYY-MM-DD or YYYYMMDD. Defaults to today."}),
        ("--tickers", {"help": "Comma-separated tickers."}),
        ("--top", {"type": int, "default": 30}),
        ("--hour", {"default": "153000", "help": "KIS upper-bound time HHMMSS."}),
        ("--sleep", {"type": float, "default": 1.1}),
        (
            "--real-quotes",
            {"action": "store_true", "help": "Use real quote API even if .env is mock."},
        ),
    )
    cli = argparse.ArgumentParser()
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    asyncio.run(main_async(cli.parse_args()))
# Run the collector only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()