[2026-05-28] 외부 데이터 학습 파이프라인 복구

This commit is contained in:
2026-05-28 20:13:27 +09:00
parent 57e945ef28
commit e1a32ce177
6 changed files with 152 additions and 14 deletions
+1
View File
@@ -4,6 +4,7 @@ data/daily_context.json
data/universe_cache.json data/universe_cache.json
data/training_dataset*.csv data/training_dataset*.csv
data/external_training_dataset*.csv data/external_training_dataset*.csv
data/external/
data/kis_token_*.json data/kis_token_*.json
data/news/ data/news/
data/market/ data/market/
+13 -1
View File
@@ -15,16 +15,24 @@ EXCLUDED_COLUMNS = {
"name", "name",
"entry_time", "entry_time",
"exit_time", "exit_time",
"exit_price",
"sample_time", "sample_time",
"created_at", "created_at",
"exit_reason", "exit_reason",
"strategy", "strategy",
"reason", "reason",
"pnl",
"source_file", "source_file",
"ai_win_score", "ai_win_score",
"ai_stop_loss_score", "ai_stop_loss_score",
"ai_model_version", "ai_model_version",
} }
EXCLUDED_PREFIXES = (
"price_",
"ret_",
"mfe_",
"mae_",
)
def select_feature_columns(df: pd.DataFrame, targets: Iterable[str] = LABEL_COLUMNS) -> list[str]: def select_feature_columns(df: pd.DataFrame, targets: Iterable[str] = LABEL_COLUMNS) -> list[str]:
@@ -32,7 +40,11 @@ def select_feature_columns(df: pd.DataFrame, targets: Iterable[str] = LABEL_COLU
numeric_columns = [ numeric_columns = [
column column
for column in df.columns for column in df.columns
if column not in excluded and pd.api.types.is_numeric_dtype(df[column]) if (
column not in excluded
and not column.startswith(EXCLUDED_PREFIXES)
and pd.api.types.is_numeric_dtype(df[column])
)
] ]
return sorted(numeric_columns) return sorted(numeric_columns)
+18 -5
View File
@@ -31,16 +31,29 @@ def _num(value, default=0.0):
return default return default
def _load_daily_amounts() -> dict[tuple[str, str], dict]: def _load_daily_amounts() -> dict[str, list[dict]]:
result = {} by_ticker = defaultdict(dict)
for file in DAILY_ROOT.glob("*/stocks.csv"): for file in DAILY_ROOT.glob("*/stocks.csv"):
rows = _read_csv(file) rows = _read_csv(file)
for row in rows: for row in rows:
date = str(row.get("date") or file.parent.name) date = str(row.get("date") or file.parent.name)
ticker = str(row.get("ticker") or row.get("티커") or "") ticker = str(row.get("ticker") or row.get("티커") or "")
if ticker: if ticker:
result[(date, ticker)] = row row["date"] = date
return result by_ticker[ticker][date] = row
return {
ticker: [rows[date] for date in sorted(rows)]
for ticker, rows in by_ticker.items()
}
def _previous_daily_row(daily: dict[str, list[dict]], date: str, ticker: str) -> dict:
previous = {}
for row in daily.get(ticker, []):
if str(row.get("date", "")) >= date:
break
previous = row
return previous
def _future_metrics(rows: list[dict], idx: int, entry_price: float): def _future_metrics(rows: list[dict], idx: int, entry_price: float):
@@ -84,7 +97,7 @@ def _rows_for_file(path: Path, daily: dict, k: float, breakout_only: bool):
for date in sorted(by_date): for date in sorted(by_date):
day_rows = by_date[date] day_rows = by_date[date]
ticker = day_rows[0]["ticker"] ticker = day_rows[0]["ticker"]
daily_row = daily.get((date, ticker), {}) daily_row = _previous_daily_row(daily, date, ticker)
prev_high = _num(daily_row.get("high")) prev_high = _num(daily_row.get("high"))
prev_low = _num(daily_row.get("low")) prev_low = _num(daily_row.get("low"))
prev_amount = _num(daily_row.get("amount")) prev_amount = _num(daily_row.get("amount"))
+81 -2
View File
@@ -6,15 +6,23 @@ Outputs:
data/external/daily/YYYYMMDD/indexes.csv data/external/daily/YYYYMMDD/indexes.csv
""" """
import argparse import argparse
import asyncio
import csv
import os
import sys import sys
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
import pandas as pd import pandas as pd
ROOT = Path(__file__).resolve().parent.parent ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
OUT_ROOT = ROOT / "data" / "external" / "daily" OUT_ROOT = ROOT / "data" / "external" / "daily"
ETF_KEYWORDS = (
"인버스", "레버리지", "선물", "KODEX", "TIGER", "KBSTAR",
"HANARO", "ARIRANG", "KOSEF", "SOL", "ACE", "RISE", "PLUS",
)
def _yyyymmdd(date_text: str | None) -> str: def _yyyymmdd(date_text: str | None) -> str:
@@ -44,6 +52,20 @@ def _standardize_index_row(row: dict, date_yyyymmdd: str, code: str, name: str)
return out return out
def _is_etf(ticker: str, name: str) -> bool:
if ticker.startswith("Q") or len(ticker) != 6:
return True
return any(keyword in name for keyword in ETF_KEYWORDS)
def _write_csv(path: Path, rows: list[dict], fieldnames: list[str]):
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path): def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path):
try: try:
from pykrx import stock from pykrx import stock
@@ -67,16 +89,73 @@ def collect_with_pykrx(date_yyyymmdd: str, out_dir: Path):
return len(stocks), len(index_rows) return len(stocks), len(index_rows)
async def collect_with_kis(date_yyyymmdd: str, out_dir: Path, top: int):
from app.main import load_env
from app.execution.kis_client import KISClient
load_env()
os.environ["KIS_MOCK"] = "false"
kis = KISClient()
await kis.get_access_token()
rank = await kis.get_volume_rank(top_n=top * 2)
tickers = [
r["ticker"] for r in rank
if not _is_etf(r["ticker"], r["name"])
][:top]
end_dt = datetime.strptime(date_yyyymmdd, "%Y%m%d")
start = (end_dt - timedelta(days=14)).strftime("%Y%m%d")
stock_rows = []
for ticker in tickers:
try:
rows = await kis.get_ohlcv_daily(ticker, start=start, end=date_yyyymmdd)
except Exception as exc:
print(f"daily fetch failed {ticker}: {exc}", file=sys.stderr)
continue
for row in rows:
stock_rows.append({
"date": row["date"],
"ticker": ticker,
"open": row["open"],
"high": row["high"],
"low": row["low"],
"close": row["close"],
"volume": row["volume"],
"amount": row["close"] * row["volume"],
})
await asyncio.sleep(0.25)
_write_csv(
out_dir / "stocks.csv",
stock_rows,
["date", "ticker", "open", "high", "low", "close", "volume", "amount"],
)
_write_csv(
out_dir / "indexes.csv",
[],
["date", "code", "name", "open", "high", "low", "close", "volume", "amount", "change_pct"],
)
return len(stock_rows), 0
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.") parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.")
parser.add_argument("--top", type=int, default=30)
args = parser.parse_args() args = parser.parse_args()
date_yyyymmdd = _yyyymmdd(args.date) date_yyyymmdd = _yyyymmdd(args.date)
out_dir = OUT_ROOT / date_yyyymmdd out_dir = OUT_ROOT / date_yyyymmdd
out_dir.mkdir(parents=True, exist_ok=True) out_dir.mkdir(parents=True, exist_ok=True)
stock_count, index_count = collect_with_pykrx(date_yyyymmdd, out_dir) try:
stock_count, index_count = collect_with_pykrx(date_yyyymmdd, out_dir)
except Exception as exc:
print(f"pykrx daily features failed, falling back to KIS: {exc}", file=sys.stderr)
stock_count, index_count = asyncio.run(collect_with_kis(date_yyyymmdd, out_dir, args.top))
print(f"saved daily features: stocks={stock_count}, indexes={index_count}, dir={out_dir}") print(f"saved daily features: stocks={stock_count}, indexes={index_count}, dir={out_dir}")
+38 -5
View File
@@ -23,6 +23,14 @@ from app.execution.kis_client import KISClient
OUT_ROOT = ROOT / "data" / "external" / "minute" OUT_ROOT = ROOT / "data" / "external" / "minute"
DEFAULT_HOURS = (
"093000", "100000", "103000", "110000", "113000",
"120000", "123000", "130000", "133000", "140000",
)
ETF_KEYWORDS = (
"인버스", "레버리지", "선물", "KODEX", "TIGER", "KBSTAR",
"HANARO", "ARIRANG", "KOSEF", "SOL", "ACE", "RISE", "PLUS",
)
def _date_dir(date_text: str | None) -> str: def _date_dir(date_text: str | None) -> str:
@@ -39,6 +47,12 @@ def _load_cached_tickers(limit: int) -> list[str]:
return list(data.get("tickers", []))[:limit] return list(data.get("tickers", []))[:limit]
def _is_etf(ticker: str, name: str) -> bool:
if ticker.startswith("Q") or len(ticker) != 6:
return True
return any(keyword in name for keyword in ETF_KEYWORDS)
async def _resolve_tickers(kis: KISClient, args) -> list[str]: async def _resolve_tickers(kis: KISClient, args) -> list[str]:
if args.tickers: if args.tickers:
return [t.strip() for t in args.tickers.split(",") if t.strip()] return [t.strip() for t in args.tickers.split(",") if t.strip()]
@@ -47,8 +61,12 @@ async def _resolve_tickers(kis: KISClient, args) -> list[str]:
if cached: if cached:
return cached return cached
rank = await kis.get_volume_rank(top_n=args.top) rank = await kis.get_volume_rank(top_n=args.top * 2)
return [r["ticker"] for r in rank] tickers = [
r["ticker"] for r in rank
if args.include_etf or not _is_etf(r["ticker"], r["name"])
]
return tickers[:args.top]
def _write_csv(path: Path, rows: list[dict]): def _write_csv(path: Path, rows: list[dict]):
@@ -59,6 +77,17 @@ def _write_csv(path: Path, rows: list[dict]):
writer.writerows(rows) writer.writerows(rows)
async def _collect_ticker_rows(kis: KISClient, ticker: str, hours: list[str], sleep: float) -> list[dict]:
by_key = {}
for hour in hours:
rows = await kis.get_ohlcv_minute(ticker, hour=hour)
for row in rows:
key = (row.get("date"), row.get("time"), row.get("ticker"))
by_key[key] = row
await asyncio.sleep(sleep)
return sorted(by_key.values(), key=lambda r: (r.get("date", ""), r.get("time", "")))
async def main_async(args): async def main_async(args):
load_env() load_env()
if args.real_quotes: if args.real_quotes:
@@ -67,12 +96,15 @@ async def main_async(args):
kis = KISClient() kis = KISClient()
await kis.get_access_token() await kis.get_access_token()
tickers = await _resolve_tickers(kis, args) tickers = await _resolve_tickers(kis, args)
hours = [h.strip() for h in (args.hours or args.hour or "").split(",") if h.strip()]
if not hours:
hours = list(DEFAULT_HOURS)
out_dir = OUT_ROOT / _date_dir(args.date) out_dir = OUT_ROOT / _date_dir(args.date)
saved = 0 saved = 0
for ticker in tickers: for ticker in tickers:
try: try:
rows = await kis.get_ohlcv_minute(ticker, hour=args.hour) rows = await _collect_ticker_rows(kis, ticker, hours, args.sleep)
if rows: if rows:
_write_csv(out_dir / f"{ticker}.csv", rows) _write_csv(out_dir / f"{ticker}.csv", rows)
saved += 1 saved += 1
@@ -81,7 +113,6 @@ async def main_async(args):
print(f"no rows {ticker}") print(f"no rows {ticker}")
except Exception as exc: except Exception as exc:
print(f"failed {ticker}: {exc}", file=sys.stderr) print(f"failed {ticker}: {exc}", file=sys.stderr)
await asyncio.sleep(args.sleep)
print(f"minute collection done: saved={saved}/{len(tickers)}, dir={out_dir}") print(f"minute collection done: saved={saved}/{len(tickers)}, dir={out_dir}")
@@ -91,9 +122,11 @@ def main():
parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.") parser.add_argument("--date", help="YYYY-MM-DD or YYYYMMDD. Defaults to today.")
parser.add_argument("--tickers", help="Comma-separated tickers.") parser.add_argument("--tickers", help="Comma-separated tickers.")
parser.add_argument("--top", type=int, default=30) parser.add_argument("--top", type=int, default=30)
parser.add_argument("--hour", default="153000", help="KIS upper-bound time HHMMSS.") parser.add_argument("--hour", default="", help="Single KIS upper-bound time HHMMSS.")
parser.add_argument("--hours", default=",".join(DEFAULT_HOURS), help="Comma-separated KIS upper-bound times.")
parser.add_argument("--sleep", type=float, default=1.1) parser.add_argument("--sleep", type=float, default=1.1)
parser.add_argument("--real-quotes", action="store_true", help="Use real quote API even if .env is mock.") parser.add_argument("--real-quotes", action="store_true", help="Use real quote API even if .env is mock.")
parser.add_argument("--include-etf", action="store_true", help="Include ETF/ETN products in collection.")
args = parser.parse_args() args = parser.parse_args()
asyncio.run(main_async(args)) asyncio.run(main_async(args))
+1 -1
View File
@@ -49,7 +49,7 @@ Invoke-PythonStep -Name "collecting KIS minute data" -Args @("scripts\collect_mi
Invoke-PythonStep -Name "exporting bot training dataset" -Args @("scripts\export_training_dataset.py", "data\training_dataset.csv") -Required $true Invoke-PythonStep -Name "exporting bot training dataset" -Args @("scripts\export_training_dataset.py", "data\training_dataset.csv") -Required $true
Invoke-PythonStep -Name "building external training dataset" -Args @("scripts\build_external_training_dataset.py", "--out", "data\external_training_dataset.csv") -Required $true Invoke-PythonStep -Name "building external training dataset" -Args @("scripts\build_external_training_dataset.py", "--out", "data\external_training_dataset.csv", "--all-minutes") -Required $true
Invoke-PythonStep -Name "training model" -Args @("scripts\train_ai_model.py") -Required $true Invoke-PythonStep -Name "training model" -Args @("scripts\train_ai_model.py") -Required $true