from __future__ import annotations import argparse import json import sys from datetime import datetime from pathlib import Path import joblib import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score, precision_score, roc_auc_score from sklearn.model_selection import train_test_split ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from app.ml.features import LABEL_COLUMNS, build_feature_matrix, select_feature_columns DEFAULT_INPUTS = [ Path("data/training_dataset.csv"), Path("data/external_training_dataset.csv"), ] def _read_csv(path: Path) -> pd.DataFrame: if not path.exists() or path.stat().st_size == 0: return pd.DataFrame() return pd.read_csv(path) def _load_inputs(paths: list[Path]) -> pd.DataFrame: frames = [] for path in paths: frame = _read_csv(path) if not frame.empty: frame["source_file"] = str(path) frames.append(frame) if not frames: return pd.DataFrame() return pd.concat(frames, ignore_index=True, sort=False) def _train_one_target( x: pd.DataFrame, y: pd.Series, random_state: int, ) -> tuple[RandomForestClassifier, dict[str, float | int]]: stratify = y if y.nunique() == 2 and y.value_counts().min() >= 2 else None x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.25, random_state=random_state, stratify=stratify, ) model = RandomForestClassifier( n_estimators=300, max_depth=7, min_samples_leaf=10, class_weight="balanced_subsample", random_state=random_state, n_jobs=-1, ) model.fit(x_train, y_train) prediction = model.predict(x_test) metrics: dict[str, float | int] = { "rows": int(len(y)), "train_rows": int(len(y_train)), "test_rows": int(len(y_test)), "positive_rows": int(y.sum()), "accuracy": float(accuracy_score(y_test, prediction)), "precision": float(precision_score(y_test, prediction, zero_division=0)), } if y_test.nunique() == 2 and hasattr(model, "predict_proba"): metrics["roc_auc"] = float(roc_auc_score(y_test, model.predict_proba(x_test)[:, 1])) return model, metrics def train(args: argparse.Namespace) -> int: df = _load_inputs(args.inputs) if df.empty: print("No training rows found. Export datasets first.") return 0 targets = [target for target in args.targets if target in df.columns] if not targets: print("No supported label columns found.") return 0 if len(df) < args.min_rows: print(f"Only {len(df)} rows found. Need at least {args.min_rows} rows before training.") return 0 feature_columns = select_feature_columns(df, targets) if not feature_columns: print("No numeric feature columns found.") return 0 x, medians = build_feature_matrix(df, feature_columns) models = {} metrics = { "created_at": datetime.now().isoformat(timespec="seconds"), "input_files": [str(path) for path in args.inputs], "feature_count": len(feature_columns), "features": feature_columns, "targets": {}, } for target in targets: labeled = df[target].dropna() target_index = labeled.index y = pd.to_numeric(labeled, errors="coerce").dropna().astype(int) target_x = x.loc[y.index] if len(y) < args.min_rows: print(f"Skipping {target}: only {len(y)} labeled rows.") continue if y.nunique() < 2: print(f"Skipping {target}: target has only one class.") continue model, target_metrics = _train_one_target(target_x, y, args.random_state) models[target] = model metrics["targets"][target] = target_metrics print(f"Trained {target}: {target_metrics}") if not models: print("No model was trained.") return 0 args.out.parent.mkdir(parents=True, exist_ok=True) bundle = { "created_at": metrics["created_at"], "feature_columns": feature_columns, "medians": medians, "models": models, "metrics": metrics, } joblib.dump(bundle, args.out) metrics_path = args.metrics_out or args.out.with_suffix(".metrics.json") metrics_path.write_text(json.dumps(metrics, indent=2), encoding="utf-8") print(f"Saved model: {args.out}") print(f"Saved metrics: {metrics_path}") return 0 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Train scalping AI models from exported datasets.") parser.add_argument( "--inputs", nargs="+", type=Path, default=DEFAULT_INPUTS, help="CSV datasets to combine.", ) parser.add_argument("--out", type=Path, default=Path("models/scalping_model.joblib")) parser.add_argument("--metrics-out", type=Path, default=None) parser.add_argument("--min-rows", type=int, default=200) parser.add_argument("--random-state", type=int, default=42) parser.add_argument( "--targets", nargs="+", default=sorted(LABEL_COLUMNS), choices=sorted(LABEL_COLUMNS), ) return parser.parse_args() if __name__ == "__main__": raise SystemExit(train(parse_args()))