# Files
#
# 180 lines
# 5.3 KiB
# Python

from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, roc_auc_score
from sklearn.model_selection import train_test_split
# ROOT is the directory two levels above this file's resolved location. It is
# placed at the front of sys.path before the `app.ml.features` import that
# follows — presumably so the `app` package resolves when this file is run
# directly as a script (confirm against the repository layout).
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from app.ml.features import LABEL_COLUMNS, build_feature_matrix, select_feature_columns
# CSV datasets combined when --inputs is not supplied on the command line.
# The paths are relative, so they resolve against the current working directory.
DEFAULT_INPUTS = [
    Path(dataset)
    for dataset in (
        "data/training_dataset.csv",
        "data/external_training_dataset.csv",
    )
]
def _read_csv(path: Path) -> pd.DataFrame:
    """Read one CSV dataset, treating a missing or empty file as "no rows".

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed frame, or an empty ``DataFrame`` when the file does not
        exist, has zero bytes, or holds nothing pandas can parse into columns
        (for example only blank lines).
    """
    if not path.exists() or path.stat().st_size == 0:
        return pd.DataFrame()
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A file can be non-zero bytes and still blank (just newlines); pandas
        # raises for that instead of returning an empty frame.
        return pd.DataFrame()
def _load_inputs(paths: list[Path]) -> pd.DataFrame:
    """Combine every non-empty input CSV into a single frame.

    Each frame that contributes rows gets a ``source_file`` column holding the
    path it was read from. Inputs that load as empty are skipped.

    Args:
        paths: CSV files to read, in order.

    Returns:
        The row-wise concatenation with a fresh integer index, or an empty
        ``DataFrame`` when no input contributed any rows.
    """
    loaded: list[pd.DataFrame] = []
    for csv_path in paths:
        table = _read_csv(csv_path)
        if table.empty:
            continue
        table["source_file"] = str(csv_path)
        loaded.append(table)
    return pd.concat(loaded, ignore_index=True, sort=False) if loaded else pd.DataFrame()
def _train_one_target(
    x: pd.DataFrame,
    y: pd.Series,
    random_state: int,
) -> tuple[RandomForestClassifier, dict[str, float | int]]:
    """Fit a random forest for one label column and score it on a hold-out split.

    Args:
        x: Feature rows, split together with ``y``.
        y: Labels for the same rows. ``positive_rows`` is computed as
            ``y.sum()``, so 0/1 labels are assumed — confirm against the
            dataset export.
        random_state: Seed used for both the train/test split and the forest.

    Returns:
        The fitted classifier and a metrics dict with row counts, accuracy,
        precision and, when it can be computed, ``roc_auc``.
    """
    # Stratify only when it is possible: exactly two classes, each with at
    # least two rows. Otherwise fall back to a plain random split.
    stratify = y if y.nunique() == 2 and y.value_counts().min() >= 2 else None
    x_train, x_test, y_train, y_test = train_test_split(
        x,
        y,
        test_size=0.25,
        random_state=random_state,
        stratify=stratify,
    )
    model = RandomForestClassifier(
        n_estimators=300,
        max_depth=7,
        min_samples_leaf=10,
        class_weight="balanced_subsample",
        random_state=random_state,
        n_jobs=-1,
    )
    model.fit(x_train, y_train)
    prediction = model.predict(x_test)
    metrics: dict[str, float | int] = {
        "rows": int(len(y)),
        "train_rows": int(len(y_train)),
        "test_rows": int(len(y_test)),
        "positive_rows": int(y.sum()),
        "accuracy": float(accuracy_score(y_test, prediction)),
        "precision": float(precision_score(y_test, prediction, zero_division=0)),
    }
    # ROC AUC needs both classes in the hold-out labels AND a second
    # probability column from the model. With an unstratified split, a class
    # with a single row can fall entirely into the test set; the forest is then
    # fitted on one class and predict_proba returns only one column.
    if y_test.nunique() == 2 and len(model.classes_) == 2:
        positive_scores = model.predict_proba(x_test)[:, 1]
        metrics["roc_auc"] = float(roc_auc_score(y_test, positive_scores))
    return model, metrics
def train(args: argparse.Namespace) -> int:
    """Train one model per available target and save the bundle and its metrics.

    Args:
        args: Parsed options. This function reads ``inputs``, ``targets``,
            ``min_rows``, ``random_state``, ``out`` and ``metrics_out``.

    Returns:
        Always ``0``. When nothing can be trained, a message is printed and no
        files are written.
    """
    df = _load_inputs(args.inputs)
    if df.empty:
        print("No training rows found. Export datasets first.")
        return 0

    targets = [target for target in args.targets if target in df.columns]
    if not targets:
        print("No supported label columns found.")
        return 0
    if len(df) < args.min_rows:
        print(f"Only {len(df)} rows found. Need at least {args.min_rows} rows before training.")
        return 0

    feature_columns = select_feature_columns(df, targets)
    if not feature_columns:
        print("No numeric feature columns found.")
        return 0

    x, medians = build_feature_matrix(df, feature_columns)
    models = {}
    metrics = {
        "created_at": datetime.now().isoformat(timespec="seconds"),
        "input_files": [str(path) for path in args.inputs],
        "feature_count": len(feature_columns),
        "features": feature_columns,
        "targets": {},
    }

    for target in targets:
        # Keep only rows whose label is present and numeric; the surviving
        # index then selects the matching feature rows.
        labeled = df[target].dropna()
        y = pd.to_numeric(labeled, errors="coerce").dropna().astype(int)
        target_x = x.loc[y.index]
        if len(y) < args.min_rows:
            print(f"Skipping {target}: only {len(y)} labeled rows.")
            continue
        if y.nunique() < 2:
            print(f"Skipping {target}: target has only one class.")
            continue
        model, target_metrics = _train_one_target(target_x, y, args.random_state)
        models[target] = model
        metrics["targets"][target] = target_metrics
        print(f"Trained {target}: {target_metrics}")

    if not models:
        print("No model was trained.")
        return 0

    args.out.parent.mkdir(parents=True, exist_ok=True)
    bundle = {
        "created_at": metrics["created_at"],
        "feature_columns": feature_columns,
        "medians": medians,
        "models": models,
        "metrics": metrics,
    }
    joblib.dump(bundle, args.out)

    metrics_path = args.metrics_out or args.out.with_suffix(".metrics.json")
    # --metrics-out may point outside the model directory, so make sure its
    # parent exists too; otherwise the write fails after the model is saved.
    metrics_path.parent.mkdir(parents=True, exist_ok=True)
    metrics_path.write_text(json.dumps(metrics, indent=2), encoding="utf-8")
    print(f"Saved model: {args.out}")
    print(f"Saved metrics: {metrics_path}")
    return 0
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the training script's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read the process command line, which is what the script entry
            point relies on. Passing a list allows programmatic use.

    Returns:
        Namespace with ``inputs``, ``out``, ``metrics_out``, ``min_rows``,
        ``random_state`` and ``targets``.
    """
    parser = argparse.ArgumentParser(description="Train scalping AI models from exported datasets.")
    parser.add_argument(
        "--inputs",
        nargs="+",
        type=Path,
        default=DEFAULT_INPUTS,
        help="CSV datasets to combine.",
    )
    parser.add_argument("--out", type=Path, default=Path("models/scalping_model.joblib"))
    parser.add_argument("--metrics-out", type=Path, default=None)
    parser.add_argument("--min-rows", type=int, default=200)
    parser.add_argument("--random-state", type=int, default=42)
    # Only known label columns are accepted; by default all of them are tried.
    parser.add_argument(
        "--targets",
        nargs="+",
        default=sorted(LABEL_COLUMNS),
        choices=sorted(LABEL_COLUMNS),
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Run the trainer and use its integer result as the process exit status.
    sys.exit(train(parse_args()))