diff --git a/.gitignore b/.gitignore index 9b541f1..97c3265 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ data/daily_context.json data/universe_cache.json data/training_dataset*.csv data/external_training_dataset*.csv +data/external/ data/kis_token_*.json data/news/ data/market/ diff --git a/app/ml/features.py b/app/ml/features.py index d14f6c7..a97c7a2 100644 --- a/app/ml/features.py +++ b/app/ml/features.py @@ -15,16 +15,24 @@ EXCLUDED_COLUMNS = { "name", "entry_time", "exit_time", + "exit_price", "sample_time", "created_at", "exit_reason", "strategy", "reason", + "pnl", "source_file", "ai_win_score", "ai_stop_loss_score", "ai_model_version", } +EXCLUDED_PREFIXES = ( + "price_", + "ret_", + "mfe_", + "mae_", +) def select_feature_columns(df: pd.DataFrame, targets: Iterable[str] = LABEL_COLUMNS) -> list[str]: @@ -32,7 +40,11 @@ def select_feature_columns(df: pd.DataFrame, targets: Iterable[str] = LABEL_COLU numeric_columns = [ column for column in df.columns - if column not in excluded and pd.api.types.is_numeric_dtype(df[column]) + if ( + column not in excluded + and not column.startswith(EXCLUDED_PREFIXES) + and pd.api.types.is_numeric_dtype(df[column]) + ) ] return sorted(numeric_columns) diff --git a/scripts/build_external_training_dataset.py b/scripts/build_external_training_dataset.py index 9659a92..1700d32 100644 --- a/scripts/build_external_training_dataset.py +++ b/scripts/build_external_training_dataset.py @@ -31,16 +31,29 @@ def _num(value, default=0.0): return default -def _load_daily_amounts() -> dict[tuple[str, str], dict]: - result = {} +def _load_daily_amounts() -> dict[str, list[dict]]: + by_ticker = defaultdict(dict) for file in DAILY_ROOT.glob("*/stocks.csv"): rows = _read_csv(file) for row in rows: date = str(row.get("date") or file.parent.name) ticker = str(row.get("ticker") or row.get("티커") or "") if ticker: - result[(date, ticker)] = row - return result + row["date"] = date + by_ticker[ticker][date] = row + return { + ticker: [rows[date] for date in sorted(rows)] + for ticker, rows in by_ticker.items() + } + + +def _previous_daily_row(daily: dict[str, list[dict]], date: str, ticker: str) -> dict: + previous = {} + for row in daily.get(ticker, []): + if str(row.get("date", "")) >= date: + break + previous = row + return previous def _future_metrics(rows: list[dict], idx: int, entry_price: float): @@ -84,7 +97,7 @@ def _rows_for_file(path: Path, daily: dict, k: float, breakout_only: bool): for date in sorted(by_date): day_rows = by_date[date] ticker = day_rows[0]["ticker"] - daily_row = daily.get((date, ticker), {}) + daily_row = _previous_daily_row(daily, date, ticker) prev_high = _num(daily_row.get("high")) prev_low = _num(daily_row.get("low")) prev_amount = _num(daily_row.get("amount")) diff --git a/scripts/collect_daily_features.py b/scripts/collect_daily_features.py index 3cd9c16..1769339 100644 --- a/scripts/collect_daily_features.py +++ b/scripts/collect_daily_features.py @@ -6,15 +6,23 @@ Outputs: data/external/daily/YYYYMMDD/indexes.csv """ import argparse +import asyncio +import csv +import os import sys -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) OUT_ROOT = ROOT / "data" / "external" / "daily" +ETF_KEYWORDS = ( + "인버스", "레버리지", "선물", "KODEX", "TIGER", "KBSTAR", + "HANARO", "ARIRANG", "KOSEF", "SOL", "ACE", "RISE", "PLUS", +) def _yyyymmdd(date_text: str | None) -> str: @@ -44,6 +52,20 @@ def _standardize_index_row(row: dict, date_yyyymmdd: str, code: str, name: str) return out +def _is_etf(ticker: str, name: str) -> bool: + if ticker.startswith("Q") or len(ticker) != 6: + return True + return any(keyword in name for keyword in ETF_KEYWORDS) + + +def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]): + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8-sig", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path): try: from pykrx import stock @@ -67,16 +89,73 @@ def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path): return len(stocks), len(index_rows) +async def collect_with_kis(date_yyyymmdd: str, out_dir: Path, top: int): + from app.main import load_env + from app.execution.kis_client import KISClient + + load_env() + os.environ["KIS_MOCK"] = "false" + + kis = KISClient() + await kis.get_access_token() + + rank = await kis.get_volume_rank(top_n=top * 2) + tickers = [ + r["ticker"] for r in rank + if not _is_etf(r["ticker"], r["name"]) + ][:top] + + end_dt = datetime.strptime(date_yyyymmdd, "%Y%m%d") + start = (end_dt - timedelta(days=14)).strftime("%Y%m%d") + + stock_rows = [] + for ticker in tickers: + try: + rows = await kis.get_ohlcv_daily(ticker, start=start, end=date_yyyymmdd) + except Exception as exc: + print(f"daily fetch failed {ticker}: {exc}", file=sys.stderr) + continue + for row in rows: + stock_rows.append({ + "date": row["date"], + "ticker": ticker, + "open": row["open"], + "high": row["high"], + "low": row["low"], + "close": row["close"], + "volume": row["volume"], + "amount": row["close"] * row["volume"], + }) + await asyncio.sleep(0.25) + + _write_csv( + out_dir / "stocks.csv", + stock_rows, + ["date", "ticker", "open", "high", "low", "close", "volume", "amount"], + ) + _write_csv( + out_dir / "indexes.csv", + [], + ["date", "code", "name", "open", "high", "low", "close", "volume", "amount", "change_pct"], + ) + return len(stock_rows), 0 + + def main(): parser = argparse.ArgumentParser() parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.") + parser.add_argument("--top", type=int, default=30) args = parser.parse_args() date_yyyymmdd = _yyyymmdd(args.date) out_dir = OUT_ROOT / date_yyyymmdd out_dir.mkdir(parents=True, exist_ok=True) - stock_count, index_count = collect_with_pykrx(date_yyyymmdd, out_dir) + try: + stock_count, index_count = collect_with_pykrx(date_yyyymmdd, out_dir) + except Exception as exc: + print(f"pykrx daily features failed, falling back to KIS: {exc}", file=sys.stderr) + stock_count, index_count = asyncio.run(collect_with_kis(date_yyyymmdd, out_dir, args.top)) print(f"saved daily features: stocks={stock_count}, indexes={index_count}, dir={out_dir}") diff --git a/scripts/collect_minute_data.py b/scripts/collect_minute_data.py index 47f4605..2ce774f 100644 --- a/scripts/collect_minute_data.py +++ b/scripts/collect_minute_data.py @@ -23,6 +23,14 @@ from app.execution.kis_client import KISClient OUT_ROOT = ROOT / "data" / "external" / "minute" +DEFAULT_HOURS = ( + "093000", "100000", "103000", "110000", "113000", + "120000", "123000", "130000", "133000", "140000", +) +ETF_KEYWORDS = ( + "인버스", "레버리지", "선물", "KODEX", "TIGER", "KBSTAR", + "HANARO", "ARIRANG", "KOSEF", "SOL", "ACE", "RISE", "PLUS", +) def _date_dir(date_text: str | None) -> str: @@ -39,6 +47,12 @@ def _load_cached_tickers(limit: int) -> list[str]: return list(data.get("tickers", []))[:limit] +def _is_etf(ticker: str, name: str) -> bool: + if ticker.startswith("Q") or len(ticker) != 6: + return True + return any(keyword in name for keyword in ETF_KEYWORDS) + + async def _resolve_tickers(kis: KISClient, args) -> list[str]: if args.tickers: return [t.strip() for t in args.tickers.split(",") if t.strip()] @@ -47,8 +61,12 @@ async def _resolve_tickers(kis: KISClient, args) -> list[str]: if cached: return cached - rank = await kis.get_volume_rank(top_n=args.top) - return [r["ticker"] for r in rank] + rank = await kis.get_volume_rank(top_n=args.top * 2) + tickers = [ + r["ticker"] for r in rank + if args.include_etf or not _is_etf(r["ticker"], r["name"]) + ] + return tickers[:args.top] def _write_csv(path: Path, rows: list[dict]): @@ -59,6 +77,17 @@ def _write_csv(path: Path, rows: list[dict]): writer.writerows(rows) +async def _collect_ticker_rows(kis: KISClient, ticker: str, hours: list[str], sleep: float) -> list[dict]: + by_key = {} + for hour in hours: + rows = await kis.get_ohlcv_minute(ticker, hour=hour) + for row in rows: + key = (row.get("date"), row.get("time"), row.get("ticker")) + by_key[key] = row + await asyncio.sleep(sleep) + return sorted(by_key.values(), key=lambda r: (r.get("date", ""), r.get("time", ""))) + + async def main_async(args): load_env() if args.real_quotes: @@ -67,12 +96,15 @@ async def main_async(args): kis = KISClient() await kis.get_access_token() tickers = await _resolve_tickers(kis, args) + hours = [h.strip() for h in (args.hours or args.hour or "").split(",") if h.strip()] + if not hours: + hours = list(DEFAULT_HOURS) out_dir = OUT_ROOT / _date_dir(args.date) saved = 0 for ticker in tickers: try: - rows = await kis.get_ohlcv_minute(ticker, hour=args.hour) + rows = await _collect_ticker_rows(kis, ticker, hours, args.sleep) if rows: _write_csv(out_dir / f"{ticker}.csv", rows) saved += 1 @@ -81,7 +113,6 @@ async def main_async(args): print(f"no rows {ticker}") except Exception as exc: print(f"failed {ticker}: {exc}", file=sys.stderr) - await asyncio.sleep(args.sleep) print(f"minute collection done: saved={saved}/{len(tickers)}, dir={out_dir}") @@ -91,9 +122,11 @@ def main(): parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.") parser.add_argument("--tickers", help="Comma-separated tickers.") parser.add_argument("--top", type=int, default=30) - parser.add_argument("--hour", default="153000", help="KIS upper-bound time HHMMSS.") + parser.add_argument("--hour", default="", help="Single KIS upper-bound time HHMMSS.") + parser.add_argument("--hours", default=",".join(DEFAULT_HOURS), help="Comma-separated KIS upper-bound times.") parser.add_argument("--sleep", type=float, default=1.1) parser.add_argument("--real-quotes", action="store_true", help="Use real quote API even if .env is mock.") + parser.add_argument("--include-etf", action="store_true", help="Include ETF/ETN products in collection.") args = parser.parse_args() asyncio.run(main_async(args)) diff --git a/scripts/run_training_pipeline.ps1 b/scripts/run_training_pipeline.ps1 index 669c617..b79e865 100644 --- a/scripts/run_training_pipeline.ps1 +++ b/scripts/run_training_pipeline.ps1 @@ -49,7 +49,7 @@ Invoke-PythonStep -Name "collecting KIS minute data" -Args @("scripts\collect_mi Invoke-PythonStep -Name "exporting bot training dataset" -Args @("scripts\export_training_dataset.py", "data\training_dataset.csv") -Required $true -Invoke-PythonStep -Name "building external training dataset" -Args @("scripts\build_external_training_dataset.py", "--out", "data\external_training_dataset.csv") -Required $true +Invoke-PythonStep -Name "building external training dataset" -Args @("scripts\build_external_training_dataset.py", "--out", "data\external_training_dataset.csv", "--all-minutes") -Required $true Invoke-PythonStep -Name "training model" -Args @("scripts\train_ai_model.py") -Required $true