Files
Stock-trading-programming/app/main.py
T

1355 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
main.py
단타 자동매매 시스템 메인 진입점
기획서 v2.1 기준
실행:
python -m app.main (Docker 컨테이너)
python app/main.py (로컬 테스트)
환경변수:
KIS_MOCK=true → 모의투자 모드
DRY_RUN=true → 신호만 확인, 주문 전송 안 함
"""
import io
import json
import os
import sys
import asyncio
import logging
from datetime import datetime, time, timedelta
from pathlib import Path
# Force stdout to UTF-8 so Korean log text is not garbled on consoles whose
# default encoding is something else.
_stdout = sys.stdout
if hasattr(_stdout, "reconfigure"):
    # Text streams that support it can be switched in place.
    _stdout.reconfigure(encoding="utf-8")
elif hasattr(_stdout, "buffer"):
    # Otherwise wrap the underlying binary buffer in a new UTF-8 text stream.
    sys.stdout = io.TextIOWrapper(
        _stdout.buffer,
        encoding="utf-8",
        line_buffering=True,
    )
# .env 로드
def load_env():
    """Load ``KEY=VALUE`` pairs from ``./.env`` into ``os.environ``.

    Rules applied to each line:

    * Blank lines, lines starting with ``#`` and lines without ``=`` are
      skipped.
    * A value that starts with a quote and has a matching closing quote is
      taken verbatim from between the quotes, so ``KEY="abc #123"`` keeps
      its ``#``.  Anything after the closing quote (for example an inline
      comment) is ignored.
    * An unquoted value is cut at the first ``" #"`` (inline comment) and
      stripped of surrounding whitespace and stray quotes.
    * Empty values are ignored, and variables already present in the
      environment are never overwritten.

    Does nothing when ``.env`` does not exist in the current directory.
    """
    env_path = Path(".env")
    if not env_path.exists():
        return
    with open(env_path, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key = key.strip()
            value = value.strip()

            # Look for a properly closed leading quote first; only unquoted
            # values may carry an inline comment.
            quote = value[:1]
            closing = value.find(quote, 1) if quote in ('"', "'") else -1
            if closing != -1:
                value = value[1:closing]
            else:
                # Inline comment removal (e.g. "true # mock mode" -> "true").
                if " #" in value:
                    value = value[:value.index(" #")]
                value = value.strip().strip('"').strip("'")

            if key and value and key not in os.environ:
                os.environ[key] = value
load_env()

# Add the project root to sys.path (needed when the file is run directly,
# e.g. ``python app/main.py``).
ROOT = Path(__file__).parent.parent
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Logging setup.
# FileHandler opens its file as soon as it is constructed, so the directory
# has to exist before basicConfig() runs at import time.
os.makedirs("logs", exist_ok=True)

# Accept LOG_LEVEL in any letter case and fall back to INFO when the name is
# not a real logging level (getattr alone would raise or return a function).
_log_level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), None)
if not isinstance(_log_level, int):
    _log_level = logging.INFO

logging.basicConfig(
    level=_log_level,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("logs/stockbot.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)
from app.execution.kis_client import KISClient
from app.execution.order_executor import OrderExecutor
from app.strategy.volatility_breakout import VolatilityBreakout
from app.risk.manager import RiskManager
from app.monitor.notifier import (
notify_buy, notify_tp1, notify_tp2, notify_sl,
notify_force_exit, notify_risk, notify_daily_summary,
notify_error, notify_ai_result, notify_ai_blocked,
notify_ai_fallback, send
)
from app.db.models import init_db, get_conn
from app.ml.predictor import ScalpingModel
from app.config import (
MAX_UNIVERSE, FORCE_EXIT, MAX_POSITIONS,
MAX_HOLD_MIN, KOSPI_MIN_CHG, MAX_DAILY_ENTRIES,
MAX_HOURLY_STOP_LOSS, ENTRY_PAUSE_WINDOWS,
ENTRY_LIMIT_ENFORCE
)
class SingleInstanceLock:
"""Process-wide lock so only one StockBot can run per workspace."""
def __init__(self, path: str | Path):
self.path = Path(path)
self._fh = None
self._mode = None
def acquire(self) -> bool:
self.path.parent.mkdir(parents=True, exist_ok=True)
self._fh = open(self.path, "a+", encoding="utf-8")
if os.name == "nt":
import msvcrt
self._fh.seek(0, os.SEEK_END)
if self._fh.tell() == 0:
self._fh.write("0")
self._fh.flush()
self._fh.seek(0)
try:
msvcrt.locking(self._fh.fileno(), msvcrt.LK_NBLCK, 1)
except OSError:
self._fh.close()
self._fh = None
return False
self._mode = "msvcrt"
else:
import fcntl
try:
fcntl.flock(self._fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
self._fh.close()
self._fh = None
return False
self._mode = "fcntl"
self._fh.seek(0)
self._fh.truncate()
self._fh.write(str(os.getpid()))
self._fh.flush()
return True
def release(self):
if self._fh is None:
return
try:
if self._mode == "msvcrt":
import msvcrt
self._fh.seek(0)
msvcrt.locking(self._fh.fileno(), msvcrt.LK_UNLCK, 1)
elif self._mode == "fcntl":
import fcntl
fcntl.flock(self._fh.fileno(), fcntl.LOCK_UN)
finally:
self._fh.close()
self._fh = None
class StockBot:
def __init__(self):
    """Create the collaborators and the in-memory trading state.

    ``self.risk`` is left as ``None`` here; ``initialize()`` creates the
    RiskManager once the account balance is known.
    """
    # Collaborators.
    self.kis = KISClient()
    self.executor = OrderExecutor(self.kis)
    self.strategy = VolatilityBreakout()

    # In-memory trading state.
    self.positions = {}        # ticker -> {name, entry, qty, tp1_done, entry_time}
    self.universe = []         # tickers currently being watched
    self.ticker_names = {}     # ticker -> display name cache
    self.ticker_sectors = {}   # ticker -> sector name cache
    self.sl_tickers = set()    # tickers stopped out today; re-entry is blocked
    self.risk = None           # RiskManager (created after the balance check)
    self.running = False
    self.scalping_model = ScalpingModel()

    # Intraday context (used to detect updates of midday_context.json).
    self._midday_ctx_mtime: float = 0.0
    self._midday_pos_mult: float = 1.0   # midday position_size_multiplier
    self._midday_loaded: bool = False
    self._last_diag: float = 0.0         # timestamp of the last signal-diagnosis log

    if self.kis.is_mock:
        mode = "모의투자"
    else:
        mode = "실거래"
    dry = ""
    if os.getenv("DRY_RUN", "true") == "true":
        dry = " [DRY_RUN]"
    logger.info(f"StockBot 시작 [{mode}]{dry}")
# ─────────────────────────────────────────
# 장중 컨텍스트 감시
# ─────────────────────────────────────────
def _check_midday_context(self):
    """Pick up a new ``data/midday_context.json`` and apply it immediately.

    The file is only parsed when its mtime is newer than the last one seen.
    A file whose ``date`` field is not today's date is ignored, and its
    mtime is remembered so the same stale file is not re-read on every call.
    A file that fails to load is retried on the next call.

    On success this updates the midday position multiplier, the strategy
    context (``hot_sectors``, ``avoid_sectors``, ``blacklist_tickers``,
    ``lunch_trade_allowed``) and sets ``self._midday_loaded``.
    """
    path = Path("data/midday_context.json")
    try:
        mtime = path.stat().st_mtime
    except OSError:
        # Missing (or unreadable) file: nothing to load yet.
        return
    if mtime <= self._midday_ctx_mtime:
        return

    try:
        ctx = json.loads(path.read_text(encoding="utf-8"))
        if ctx.get("date") != datetime.now().strftime("%Y-%m-%d"):
            # Stale context from another day: skip it, and remember this
            # mtime so the unchanged file is not parsed again every tick.
            self._midday_ctx_mtime = mtime
            return

        lunch_allowed = ctx.get("lunch_trade_allowed", True)
        if not lunch_allowed:
            logger.warning("midday_context: 점심 세션 진입 중단 설정")
        self._midday_pos_mult = float(ctx.get("position_size_multiplier", 1.0))

        # Sector and blacklist updates.
        strategy_ctx = self.strategy.context
        if "hot_sectors" in ctx:
            strategy_ctx["hot_sectors"] = ctx["hot_sectors"]
        if "avoid_sectors" in ctx:
            strategy_ctx["avoid_sectors"] = ctx["avoid_sectors"]
        new_blacklist = ctx.get("blacklist_tickers", [])
        if new_blacklist:
            # Merge into the existing list without duplicates.
            blacklist = strategy_ctx.setdefault("blacklist_tickers", [])
            for ticker in new_blacklist:
                if ticker not in blacklist:
                    blacklist.append(ticker)

        # Store the flag that check_entries() uses to block lunch entries.
        strategy_ctx["lunch_trade_allowed"] = lunch_allowed

        self._midday_ctx_mtime = mtime
        self._midday_loaded = True
        logger.info(
            f"midday_context 로드 완료 — 점심 세션 시작 "
            f"(포지션 배율: ×{self._midday_pos_mult}, "
            f"진입허용: {ctx.get('lunch_trade_allowed', True)})"
        )
    except Exception as e:
        logger.warning(f"midday_context 로드 실패: {e}")
def _today_entry_count(self) -> int:
    """Return the number of distinct BUY entries stored in ``trades`` for today.

    An entry is identified by its (ticker, entry_time) pair.
    """
    query = "SELECT COUNT(DISTINCT ticker || '|' || entry_time) FROM trades WHERE date=? AND side='BUY'"
    params = (datetime.now().strftime("%Y-%m-%d"),)
    with get_conn() as conn:
        cursor = conn.execute(query, params)
        row = cursor.fetchone()
    return int(row[0] or 0)
def _recent_stop_loss_count(self, minutes: int = 60) -> int:
    """Count today's trades closed with ``exit_reason='SL'`` in the last ``minutes``.

    The window start is compared against ``exit_time`` as an ``HH:MM:SS``
    string.
    """
    window_start = datetime.now() - timedelta(minutes=minutes)
    cutoff = window_start.strftime("%H:%M:%S")
    today = datetime.now().strftime("%Y-%m-%d")
    with get_conn() as conn:
        cursor = conn.execute("""
SELECT COUNT(*) FROM trades
WHERE date=? AND exit_reason='SL' AND exit_time >= ?
""", (today, cutoff))
        row = cursor.fetchone()
    return int(row[0] or 0)
def _entry_gate_reason(self, now_str: str) -> str:
    """Return why new entries are blocked right now, or ``""`` if they are not.

    Checks, in order: the configured entry pause windows, the daily entry
    limit, and the number of stop losses in the last 60 minutes.  The two
    limits only block when ``ENTRY_LIMIT_ENFORCE`` is truthy.

    Args:
        now_str: Current time as ``"HH:MM"``; compared as a string against
            the ``(start, end)`` pairs in ``ENTRY_PAUSE_WINDOWS``.
    """
    for start, end in ENTRY_PAUSE_WINDOWS:
        if start <= now_str < end:
            return f"entry pause window {start}-{end}"

    # This runs on every loop tick; when the limits are not enforced their
    # counts cannot block anything, so skip the two DB queries entirely.
    if not ENTRY_LIMIT_ENFORCE:
        return ""

    entries = self._today_entry_count()
    if entries >= MAX_DAILY_ENTRIES:
        return f"daily entry limit reached {entries}/{MAX_DAILY_ENTRIES}"

    stop_losses = self._recent_stop_loss_count(60)
    if stop_losses >= MAX_HOURLY_STOP_LOSS:
        return f"{stop_losses} stop losses in last 60 minutes"
    return ""
def _entry_warning_reason(self) -> str:
    """Return a ``"; "``-joined list of soft-limit warnings, or ``""``.

    Unlike ``_entry_gate_reason`` this never looks at
    ``ENTRY_LIMIT_ENFORCE``; it only reports that a limit has been reached.
    """
    entries = self._today_entry_count()
    stop_losses = self._recent_stop_loss_count(60)
    checks = (
        (
            entries >= MAX_DAILY_ENTRIES,
            f"daily entries high {entries}/{MAX_DAILY_ENTRIES}",
        ),
        (
            stop_losses >= MAX_HOURLY_STOP_LOSS,
            f"{stop_losses} stop losses in last 60 minutes",
        ),
    )
    return "; ".join(message for triggered, message in checks if triggered)
def _log_entry_acceptance(
    self,
    ticker: str,
    name: str,
    current: float,
    target: float,
    qty: int,
    multiplier: float,
    reason: str,
):
    """Write one INFO line describing an entry that passed every check."""
    fields = (name, ticker, current, target, qty, multiplier, reason)
    logger.info(
        "ENTRY accepted %s(%s) current=%s target=%.0f qty=%s mult=%.2f reason=%s",
        *fields,
    )
def _save_entry_snapshot(
    self,
    trade_id: int | None,
    ticker: str,
    name: str,
    price_info: dict,
    target: float,
    entry_price: float,
    stop_price: float,
    qty: int,
    signal: dict,
    combined_mult: float,
    model_scores: dict | None = None,
):
    """Insert one ``entry_snapshots`` row capturing the state at entry time.

    The row combines the quote (``price_info``), the strategy's previous-day
    data and today's open for ``ticker``, the current AI context, the signal
    flags and the optional model scores.  ``qty`` is accepted but not stored.
    """
    ctx = self.strategy.context
    prev = self.strategy.prev_data.get(ticker, {})
    now = datetime.now()
    scores = model_scores or {}

    def ctx_list_json(key: str) -> str:
        # List-valued context entries are stored as JSON text.
        return json.dumps(ctx.get(key, []), ensure_ascii=False)

    values = (
        trade_id,
        now.strftime("%Y-%m-%d"),
        ticker,
        name,
        now.strftime("%H:%M:%S"),
        price_info.get("current"),
        entry_price,
        target,
        stop_price,
        self.strategy.today_open.get(ticker),
        prev.get("high"),
        prev.get("low"),
        prev.get("amount"),
        price_info.get("volume"),
        price_info.get("change_pct"),
        ctx.get("market_sentiment"),
        ctx.get("sentiment_score"),
        ctx.get("risk_level"),
        1 if ctx.get("trade_allowed", True) else 0,
        ctx_list_json("hot_sectors"),
        ctx_list_json("avoid_sectors"),
        ctx_list_json("boosted_tickers"),
        ctx_list_json("blacklist_tickers"),
        1 if signal.get("boosted") else 0,
        scores.get("label_win"),
        scores.get("label_stop_loss"),
        scores.get("model_version"),
        signal.get("multiplier", 1.0),
        combined_mult,
        signal.get("reason", ""),
        "VB",
        now.isoformat(timespec="seconds"),
    )
    with get_conn() as conn:
        conn.execute("""
INSERT INTO entry_snapshots
(trade_id, date, ticker, name, entry_time, current_price,
entry_price, target_price, stop_price, today_open, prev_high,
prev_low, prev_amount, volume, change_pct, market_sentiment,
sentiment_score, risk_level, trade_allowed, hot_sectors,
avoid_sectors, boosted_tickers, blacklist_tickers, ai_boosted,
ai_win_score, ai_stop_loss_score, ai_model_version,
position_size_multiplier, combined_multiplier, entry_reason,
strategy, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", values)
def _build_entry_feature_row(
    self,
    ticker: str,
    price_info: dict,
    target: float,
    entry_price: float,
    stop_price: float,
    signal: dict,
    combined_mult: float,
) -> dict:
    """Assemble the feature dict passed to the scalping model for one candidate.

    Missing quote, previous-day or context values come through as ``None``;
    the boolean-like fields are encoded as 0/1.
    """
    strategy = self.strategy
    ctx = strategy.context
    prev = strategy.prev_data.get(ticker, {})
    trade_allowed = ctx.get("trade_allowed", True)
    return dict(
        current_price=price_info.get("current"),
        entry_price=entry_price,
        target_price=target,
        stop_price=stop_price,
        today_open=strategy.today_open.get(ticker),
        prev_high=prev.get("high"),
        prev_low=prev.get("low"),
        prev_amount=prev.get("amount"),
        volume=price_info.get("volume"),
        change_pct=price_info.get("change_pct"),
        sentiment_score=ctx.get("sentiment_score"),
        trade_allowed=1 if trade_allowed else 0,
        ai_boosted=1 if signal.get("boosted") else 0,
        position_size_multiplier=signal.get("multiplier", 1.0),
        combined_multiplier=combined_mult,
    )
def _score_entry_candidate(
    self,
    ticker: str,
    name: str,
    price_info: dict,
    target: float,
    entry_price: float,
    stop_price: float,
    signal: dict,
    combined_mult: float,
) -> dict:
    """Score an entry with the scalping model.

    Returns ``{}`` when the model is not available or when scoring raises
    (the failure is logged as a warning).  Non-empty scores get a
    ``model_version`` key added and are logged before being returned.
    """
    model = self.scalping_model
    if not model.available:
        return {}
    try:
        features = self._build_entry_feature_row(
            ticker=ticker,
            price_info=price_info,
            target=target,
            entry_price=entry_price,
            stop_price=stop_price,
            signal=signal,
            combined_mult=combined_mult,
        )
        scores = model.score(features)
        if scores:
            scores["model_version"] = model.version
            win = scores.get("label_win", -1)
            stop_loss = scores.get("label_stop_loss", -1)
            logger.info(
                "AI_SCORE %s(%s) win=%.3f stop_loss=%.3f",
                name, ticker, win, stop_loss,
            )
        return scores
    except Exception as e:
        logger.warning("AI score failed %s(%s): %s", name, ticker, e)
        return {}
def _save_post_entry_snapshot(
self,
trade_id: int | None,
ticker: str,
elapsed_sec: int,
entry_price: float,
price_info: dict,
mfe_pct: float,
mae_pct: float,
position_open: bool,
):
if trade_id is None:
return
current = price_info.get("current")
if not entry_price or current is None:
return
now = datetime.now()
return_pct = (current - entry_price) / entry_price * 100
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO post_entry_snapshots
(trade_id, date, ticker, sample_time, elapsed_sec, entry_price,
current_price, return_pct, mfe_pct, mae_pct, volume,
change_pct, position_open, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
trade_id,
now.strftime("%Y-%m-%d"),
ticker,
now.strftime("%H:%M:%S"),
elapsed_sec,
entry_price,
current,
return_pct,
mfe_pct,
mae_pct,
price_info.get("volume"),
price_info.get("change_pct"),
1 if position_open else 0,
now.isoformat(timespec="seconds"),
))
async def _track_post_entry(
self,
trade_id: int | None,
ticker: str,
name: str,
entry_price: float,
):
if trade_id is None:
return
checkpoints = (60, 180, 300, 600)
start_ts = datetime.now().timestamp()
high = entry_price
low = entry_price
for elapsed_sec in checkpoints:
delay = max(0, start_ts + elapsed_sec - datetime.now().timestamp())
await asyncio.sleep(delay)
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
high = max(high, current)
low = min(low, current)
mfe_pct = (high - entry_price) / entry_price * 100
mae_pct = (low - entry_price) / entry_price * 100
self._save_post_entry_snapshot(
trade_id=trade_id,
ticker=ticker,
elapsed_sec=elapsed_sec,
entry_price=entry_price,
price_info=price_info,
mfe_pct=mfe_pct,
mae_pct=mae_pct,
position_open=ticker in self.positions,
)
logger.info(
"POST_ENTRY %s(%s) t=%ss current=%s mfe=%.2f%% mae=%.2f%% open=%s",
name,
ticker,
elapsed_sec,
current,
mfe_pct,
mae_pct,
ticker in self.positions,
)
except Exception as e:
logger.warning("post-entry snapshot failed %s t=%ss: %s", ticker, elapsed_sec, e)
# ─────────────────────────────────────────
# 초기화
# ─────────────────────────────────────────
async def initialize(self):
    """Prepare the bot: DB, API token, risk manager, restored state, start notice."""
    init_db()
    await self.kis.get_access_token()

    # The RiskManager needs the starting cash, so query the balance first.
    balance = await self.kis.get_balance()
    cash = balance["cash"]
    self.risk = RiskManager(init_cash=cash)
    logger.info(f"초기 예수금: {cash:,}")

    # Restore state persisted before a restart: open positions, today's
    # stopped-out tickers (re-entry stays blocked) and re-entry controls.
    for restore in (
        self._restore_positions_from_db,
        self._restore_sl_tickers_from_db,
        self._restore_reentry_controls_from_db,
    ):
        restore()

    mode_label = "모의투자" if self.kis.is_mock else "실거래"
    await send(f"[시작] 단타봇 가동 | 예수금: {cash:,}원 | {mode_label}")
def _restore_positions_from_db(self):
    """Rebuild ``self.positions`` from the ``positions`` table after a restart.

    Rows are unpacked positionally, so this assumes the table's column order
    is (ticker, name, entry_time, entry_price, quantity, tp1_done,
    target_price, stop_price, ai_boosted) — verify against the schema.
    ``entry_time`` is stored as ``HH:MM:SS`` and is given today's date.
    """
    with get_conn() as conn:
        rows = conn.execute("SELECT * FROM positions").fetchall()

    for row in rows:
        (ticker, name, entry_time, entry_price, qty,
         tp1_done, target_price, stop_price, ai_boosted) = row
        today = datetime.now()
        clock = datetime.strptime(entry_time, "%H:%M:%S")
        restored_entry_time = clock.replace(
            year=today.year, month=today.month, day=today.day
        )
        self.positions[ticker] = {
            "name": name,
            "entry": entry_price,
            "qty": qty,
            "tp1_done": bool(tp1_done),
            "entry_time": restored_entry_time,
            "sl_price": stop_price,
            "boosted": bool(ai_boosted),
        }

    if self.positions:
        logger.info(f"DB 포지션 복원: {list(self.positions.keys())}")
def _restore_sl_tickers_from_db(self):
    """Reload today's stopped-out tickers so re-entry stays blocked after a restart."""
    params = (datetime.now().strftime("%Y-%m-%d"),)
    with get_conn() as conn:
        rows = conn.execute(
            "SELECT DISTINCT ticker FROM trades WHERE date=? AND exit_reason='SL'",
            params,
        ).fetchall()
    self.sl_tickers.update(ticker for (ticker,) in rows)
    if self.sl_tickers:
        logger.info(f"당일 SL 종목 복원(재진입 차단): {self.sl_tickers}")
def _restore_reentry_controls_from_db(self):
    """Replay today's TIME/FORCE/TP1/TP2 exits into the strategy after a restart.

    For every such exit (oldest first) whose ticker is not currently held,
    ``strategy.mark_final_exit`` is called with the exit reason and time.
    Rows whose ``exit_time`` cannot be parsed as ``HH:MM:SS`` are skipped.
    """
    params = (datetime.now().strftime("%Y-%m-%d"),)
    with get_conn() as conn:
        rows = conn.execute("""
SELECT ticker, exit_time, exit_reason
FROM trades
WHERE date=?
AND exit_time IS NOT NULL
AND exit_reason IN ('TIME', 'FORCE', 'TP1', 'TP2')
ORDER BY exit_time
""", params).fetchall()

    restored = []
    for ticker, exit_time, reason in rows:
        if ticker in self.positions:
            # Still held: the exit recorded in the DB was not the final one.
            continue
        try:
            now = datetime.now()
            clock = datetime.strptime(exit_time, "%H:%M:%S")
            exit_dt = clock.replace(year=now.year, month=now.month, day=now.day)
        except (TypeError, ValueError):
            continue
        self.strategy.mark_final_exit(ticker, reason, exit_dt)
        restored.append(f"{ticker}:{reason}")

    if restored:
        logger.info("재진입 제한 상태 복원: %s", ", ".join(restored))
def _db_save_position(self, ticker: str, pos: dict, target_price: float):
    """Insert or replace the ``positions`` row for ``ticker`` from the in-memory ``pos``."""
    row = (
        ticker,
        pos["name"],
        pos["entry_time"].strftime("%H:%M:%S"),
        pos["entry"],
        pos["qty"],
        int(bool(pos.get("tp1_done"))),
        target_price,
        pos["sl_price"],
        int(bool(pos.get("boosted"))),
    )
    with get_conn() as conn:
        conn.execute("""
INSERT OR REPLACE INTO positions
(ticker, name, entry_time, entry_price, quantity,
tp1_done, target_price, stop_price, ai_boosted)
VALUES (?,?,?,?,?,?,?,?,?)
""", row)
def _db_delete_position(self, ticker: str):
    """Remove the ``positions`` row for ``ticker`` (no-op when there is none)."""
    params = (ticker,)
    with get_conn() as db:
        db.execute("DELETE FROM positions WHERE ticker=?", params)
def _db_has_open_position(self, ticker: str) -> bool:
    """Return ``True`` when the ``positions`` table has a row for ``ticker``."""
    with get_conn() as db:
        cursor = db.execute("SELECT 1 FROM positions WHERE ticker=?", (ticker,))
        found = cursor.fetchone()
    return found is not None
def _db_reserve_position(
    self,
    ticker: str,
    name: str,
    entry_price: float,
    qty: int,
    target_price: float,
    stop_price: float,
    ai_boosted: bool,
) -> bool:
    """Atomically reserve a ticker before sending a buy order.

    Uses ``INSERT OR IGNORE`` so an existing row is left untouched.  Returns
    ``True`` only when this call inserted the row.
    """
    row = (
        ticker,
        name,
        datetime.now().strftime("%H:%M:%S"),
        entry_price,
        qty,
        0,  # tp1_done
        target_price,
        stop_price,
        1 if ai_boosted else 0,
    )
    with get_conn() as conn:
        cursor = conn.execute("""
INSERT OR IGNORE INTO positions
(ticker, name, entry_time, entry_price, quantity,
tp1_done, target_price, stop_price, ai_boosted)
VALUES (?,?,?,?,?,?,?,?,?)
""", row)
        inserted = cursor.rowcount == 1
    return inserted
@staticmethod
def _is_rate_limit_error(err) -> bool:
    """Return ``True`` when ``str(err)`` looks like an API rate-limit message.

    Matches the Korean markers case-sensitively and the English markers
    case-insensitively.
    """
    text = str(err)
    if any(marker in text for marker in ("초당", "거래건수")):
        return True
    lowered = text.lower()
    return any(marker in lowered for marker in ("rate limit", "too many"))
async def _get_price_with_retry(self, ticker: str, purpose: str, attempts: int = 3):
    """Fetch a quote, retrying with a growing delay on rate-limit errors.

    Any other exception, or a rate-limit error on the last attempt, is
    re-raised.  ``purpose`` only labels the retry log line.
    """
    delays = (1.2, 2.5, 5.0)
    last_delay_index = len(delays) - 1
    attempt = 0
    while attempt < attempts:
        attempt += 1
        try:
            return await self.kis.get_price(ticker)
        except Exception as e:
            out_of_attempts = attempt >= attempts
            if out_of_attempts or not self._is_rate_limit_error(e):
                raise
            wait = delays[min(attempt - 1, last_delay_index)]
            logger.warning(
                "%s price retry %s/%s for %s after rate limit: %s",
                purpose, attempt, attempts, ticker, e,
            )
            await asyncio.sleep(wait)
async def _sell_with_retry(
    self,
    ticker: str,
    name: str,
    qty: int,
    reason: str,
    attempts: int = 3,
) -> dict:
    """Send a sell order, retrying with a growing delay on rate-limit errors.

    Returns the executor's result dict: the first successful one, or the
    failed one as soon as the error is not a rate limit or the attempts are
    used up.
    """
    delays = (1.2, 2.5, 5.0)
    last_delay_index = len(delays) - 1
    attempt = 0
    while attempt < attempts:
        attempt += 1
        result = await self.executor.sell(ticker, name, qty, reason)
        if result.get("success"):
            return result
        error = result.get("error", "")
        retryable = attempt < attempts and self._is_rate_limit_error(error)
        if not retryable:
            return result
        wait = delays[min(attempt - 1, last_delay_index)]
        logger.warning(
            "SELL retry %s/%s for %s after rate limit: %s",
            attempt, attempts, ticker, error,
        )
        await asyncio.sleep(wait)
# ─────────────────────────────────────────
# 유니버스 갱신 (08:30)
# ─────────────────────────────────────────
# ETF/ETN/인버스/레버리지 종목 필터
# Name substrings that mark a row as an ETF/ETN-style product; _is_etf()
# matches them with a plain substring test against the instrument name.
# The Korean entries read "inverse", "leverage" and "futures"; the
# Latin-letter entries are presumably ETF brand names — TODO confirm.
_ETF_KEYWORDS = ('인버스', '레버리지', '선물', 'KODEX', 'TIGER', 'KBSTAR',
                 'HANARO', 'ARIRANG', 'KOSEF', 'SOL', 'ACE', 'RISE', 'PLUS')
# Keys probed, in this order, on a volume-rank row to find a sector name;
# _sector_from_rank_row() returns the first non-empty value.
_SECTOR_FIELDS = (
    "sector",
    "sector_name",
    "bstp_kor_isnm",
    "bstp_cls_name",
    "bstp_name",
)
# (sector keyword, company-name hints) pairs used by
# _infer_avoid_sector_from_name(): when an avoid-sector label contains the
# keyword and the company name contains any hint, the company is assigned
# to that avoid-sector.  Decoded, the three keywords are:
#   건설 (construction), 전기가스 (electricity/gas), 주류 (liquor).
_AVOID_SECTOR_NAME_HINTS = (
    ("\uac74\uc124", ("\uac74\uc124", "\ub300\uc6b0\uac74\uc124", "\ud604\ub300\uac74\uc124", "GS\uac74\uc124", "DL\uc774\uc564\uc528", "HDC\ud604\ub300\uc0b0\uc5c5\uac1c\ubc1c")),
    ("\uc804\uae30\uac00\uc2a4", ("\ud55c\uad6d\uc804\ub825", "\ud55c\uc804", "\ud55c\uad6d\uac00\uc2a4", "\uc9c0\uc5ed\ub09c\ubc29", "\uc804\uae30", "\uac00\uc2a4")),
    ("\uc8fc\ub958", ("\ud558\uc774\ud2b8\uc9c4\ub85c", "\ub86f\ub370\uce60\uc131", "\ubb34\ud559", "\ubcf4\ud574\uc591\uc870", "\uad6d\uc21c\ub2f9", "\uc8fc\ub958")),
)
@staticmethod
def _is_etf(ticker: str, name: str) -> bool:
    """Return ``True`` for rows that must be excluded from the stock universe.

    A row is excluded when its code starts with ``Q`` or is not exactly six
    characters long (ETN or irregular code), or when its name contains any
    of ``StockBot._ETF_KEYWORDS``.
    """
    if ticker.startswith("Q") or len(ticker) != 6:
        return True
    for keyword in StockBot._ETF_KEYWORDS:
        if keyword in name:
            return True
    return False
@classmethod
def _sector_from_rank_row(cls, row: dict) -> str:
    """Return the first truthy sector field of ``row`` as a stripped string.

    Fields are tried in ``cls._SECTOR_FIELDS`` order; ``""`` is returned
    when none of them holds a truthy value.
    """
    candidates = (row.get(field) for field in cls._SECTOR_FIELDS)
    first_hit = next((value for value in candidates if value), None)
    if first_hit is None:
        return ""
    return str(first_hit).strip()
def _infer_avoid_sector_from_name(self, name: str) -> str:
    """Guess which avoid-sector a company belongs to from its name alone.

    For each label in the context's ``avoid_sectors`` (spaces ignored), the
    label is returned when either

    * a ``_AVOID_SECTOR_NAME_HINTS`` keyword occurs in the label and one of
      that keyword's hints occurs in the name, or
    * the label with ``"\uc5c5"`` removed is at least two characters long
      and occurs in the name.

    Returns ``""`` when the name is empty or nothing matches.
    """
    squeezed_name = (name or "").replace(" ", "")
    if not squeezed_name:
        return ""

    for raw_sector in self.strategy.context.get("avoid_sectors", []):
        label = str(raw_sector or "").strip()
        if not label:
            continue
        label_key = label.replace(" ", "")

        hinted = any(
            key in label_key and any(hint in squeezed_name for hint in hints)
            for key, hints in self._AVOID_SECTOR_NAME_HINTS
        )
        if hinted:
            return label

        generic_token = label_key.replace("\uc5c5", "")
        if len(generic_token) >= 2 and generic_token in squeezed_name:
            return label
    return ""
async def update_universe(self):
    """Refresh the watched tickers and collect their previous-day data.

    Takes the volume ranking, caches names and sectors, drops ETF-like rows
    and blacklisted tickers, puts the context's boosted tickers first and
    keeps at most ``MAX_UNIVERSE``.  Then, for each ticker without
    previous-day data, fetches the daily candles of the last seven calendar
    days and hands the most recent one to the strategy.  Any failure of the
    refresh as a whole is logged, not raised.
    """
    logger.info("유니버스 갱신 시작")
    try:
        # Ask for spare rows so MAX_UNIVERSE survive the ETF filter.
        rank = await self.kis.get_volume_rank(top_n=MAX_UNIVERSE + 20)

        # Refresh the name/sector caches for every ranked row.
        for entry in rank:
            code = entry["ticker"]
            self.ticker_names[code] = entry["name"]
            sector = self._sector_from_rank_row(entry)
            if sector:
                self.ticker_sectors[code] = sector

        candidates = [
            entry["ticker"]
            for entry in rank
            if not self._is_etf(entry["ticker"], entry["name"])
        ]

        ctx = self.strategy.context
        blacklist = ctx.get("blacklist_tickers", [])
        candidates = [t for t in candidates if t not in blacklist]

        # Boosted tickers go first, in the context's order.
        boosted = ctx.get("boosted_tickers", [])
        boosted_first = [t for t in boosted if t in candidates]
        remainder = [t for t in candidates if t not in boosted]
        tickers = (boosted_first + remainder)[:MAX_UNIVERSE]

        self.universe = tickers
        logger.info(f"유니버스: {len(tickers)}종목 (ETF 제외)")

        # Query a 7-day range ending yesterday so weekends and public
        # holidays are covered.
        today = datetime.now()
        start_dt = (today - timedelta(days=7)).strftime("%Y%m%d")
        end_dt = (today - timedelta(days=1)).strftime("%Y%m%d")

        for ticker in self.universe:
            if self.strategy.has_prev_data(ticker):
                # Already collected; no API call needed.
                continue
            try:
                ohlcv = await self.kis.get_ohlcv_daily(
                    ticker,
                    start=start_dt,
                    end=end_dt,
                )
                if not ohlcv:
                    logger.warning(f"전일 OHLCV 없음 {ticker} ({start_dt}~{end_dt})")
                else:
                    prev = ohlcv[-1]  # most recent trading day
                    day_high = prev["high"]
                    day_low = prev["low"]
                    # Fallback traded amount when the row has no "amount".
                    fallback_amount = prev.get("volume", 0) * prev.get("close", 0)
                    self.strategy.set_prev_data(
                        ticker,
                        high=day_high,
                        low=day_low,
                        amount=prev.get("amount", fallback_amount),
                    )
            except Exception as e:
                logger.warning(f"전일 데이터 실패 {ticker}: {e}")
            # Throttle successive API calls.
            await asyncio.sleep(1.1)
    except Exception as e:
        logger.error(f"유니버스 갱신 실패: {e}")
# ─────────────────────────────────────────
# 시가 수집 + 목표가 계산 (08:50)
# ─────────────────────────────────────────
async def calc_targets(self):
    """Record today's open for every universe ticker and log its target price.

    A ticker whose quote fails is logged and skipped.  The final log line
    reports how many tickers ended up with a positive target.
    """
    logger.info("목표가 계산 시작")
    usable = 0
    for ticker in self.universe:
        try:
            quote = await self.kis.get_price(ticker)
            open_price = quote["open"]
            self.strategy.set_today_open(ticker, open_price)
            target = self.strategy.get_target(ticker)
            name = self.ticker_names.get(ticker, ticker)
            if target > 0:
                logger.info(f"목표가: {name}({ticker}) {target:,.0f}원 [시가 {quote['open']:,}]")
                usable += 1
            # Throttle successive API calls.
            await asyncio.sleep(1.1)
        except Exception as e:
            logger.warning(f"시가 수집 실패 {ticker}: {e}")
    logger.info(f"목표가 계산 완료: {usable}/{len(self.universe)}종목 유효")
# ─────────────────────────────────────────
# 메인 매매 루프 (09:00~14:50)
# ─────────────────────────────────────────
async def trading_loop(self):
    """Run the main trading loop until the force-exit time or a fatal error streak.

    Each pass, based on the current ``HH:MM``:

    * at or after ``FORCE_EXIT``: liquidate everything and stop;
    * after 14:00: only check exits (no new entries);
    * before 09:00: wait;
    * otherwise: refresh the midday context, check exits, and check entries
      when the risk manager allows another position.

    When the risk manager has halted trading, new entries stop but held
    positions are still checked for exits so their stop-loss / take-profit
    rules keep working.

    Ten consecutive failed passes end the loop.  A pass that completes
    without raising resets the streak, whichever branch it took.
    NOTE(review): ending the loop this way leaves any open positions
    untouched — confirm that is the intended policy.
    """
    logger.info("매매 루프 시작")
    self.running = True
    consecutive_errors = 0

    while self.running:
        try:
            now_str = datetime.now().strftime("%H:%M")

            # Force-exit time reached: liquidate and leave the loop.
            if now_str >= FORCE_EXIT:
                await self.force_exit_all()
                self.running = False
                break

            # After 14:00 no new entries are made, but exits continue.
            if now_str > "14:00":
                await self.check_exits()
                consecutive_errors = 0
                await asyncio.sleep(1)
                continue

            # Before 09:00: wait.
            if now_str < "09:00":
                consecutive_errors = 0
                await asyncio.sleep(1)
                continue

            # Detect an updated midday_context.json.
            self._check_midday_context()

            # Risk halt: stop entering, but keep managing what is held.
            if not self.risk.can_trade():
                if self.positions:
                    await self.check_exits()
                consecutive_errors = 0
                await asyncio.sleep(5)
                continue

            # Exit checks for held positions.
            await self.check_exits()

            # New entries, if another position is allowed.
            if self.risk.can_add_position(len(self.positions)):
                await self.check_entries()

            consecutive_errors = 0
            await asyncio.sleep(1)

        except asyncio.CancelledError:
            raise
        except Exception as e:
            consecutive_errors += 1
            logger.error(
                f"매매 루프 오류 (연속 {consecutive_errors}회): "
                f"{type(e).__name__}: {e}",
                exc_info=True,
            )
            await notify_error(f"매매 루프 오류 {consecutive_errors}회: {type(e).__name__}: {e}")
            # Stop after 10 consecutive errors to avoid an endless error loop.
            if consecutive_errors >= 10:
                logger.critical("연속 오류 10회 — 매매 루프 강제 종료")
                await notify_error("연속 오류 10회 — 매매 루프 강제 종료")
                self.running = False
                break
            await asyncio.sleep(5)
# ─────────────────────────────────────────
# 진입 체크
# ─────────────────────────────────────────
async def check_entries(self):
    """Scan the universe for entry signals and buy where one fires.

    Entry is skipped entirely after 14:00, from 11:00 until the midday
    context has been loaded, while the midday context disallows lunch
    trading, and while ``_entry_gate_reason`` reports a block.

    For each remaining ticker (not held, not reserved in the DB, not stopped
    out today, with a positive target) the current price is fetched and the
    strategy is asked for a signal.  On a signal the ticker is reserved in
    the ``positions`` table, the buy order is sent, and on success the
    position is registered in memory and in the DB *before* any analytics
    run, so a failure while scoring or snapshotting can no longer leave a
    filled order unmanaged.  A failed or raising buy releases the
    reservation.

    Every five minutes a diagnostic line lists why each ticker was skipped.
    """
    now_str = datetime.now().strftime("%H:%M")

    # No entries after 14:00.
    if now_str > "14:00":
        return
    # From 11:00 entries pause until midday_context.json has been loaded;
    # _check_midday_context() lifts the pause when the file appears.
    if now_str >= "11:00" and not self._midday_loaded:
        return
    # The midday context may forbid lunch-session entries altogether.
    if self._midday_loaded and not self.strategy.context.get("lunch_trade_allowed", True):
        return

    gate_reason = self._entry_gate_reason(now_str)
    if gate_reason:
        logger.info("ENTRY blocked: %s", gate_reason)
        return
    warning_reason = self._entry_warning_reason()
    if warning_reason:
        logger.warning("ENTRY warning: %s", warning_reason)

    now_ts = datetime.now().timestamp()
    do_diag = (now_ts - self._last_diag) >= 300  # diagnostic log every 5 minutes
    diag = []

    for ticker in self.universe:
        if ticker in self.positions:
            if do_diag:
                diag.append(f"{ticker}:보유중")
            continue
        if self._db_has_open_position(ticker):
            if do_diag:
                diag.append(f"{ticker}:DB보유중")
            continue
        if ticker in self.sl_tickers:
            # Stopped out today: re-entry is blocked.
            if do_diag:
                diag.append(f"{ticker}:SL차단")
            continue
        if len(self.positions) >= MAX_POSITIONS:
            break

        # Skip tickers without a target (avoids a pointless API call).
        target = self.strategy.get_target(ticker)
        if target <= 0:
            if do_diag:
                diag.append(f"{ticker}:목표가없음")
            continue

        # True while a DB reservation exists that no confirmed buy backs yet.
        reserved = False
        try:
            price_info = await self._get_price_with_retry(ticker, "ENTRY")
            current = price_info["current"]
            name = self.ticker_names.get(ticker, ticker)
            sector = (
                self.ticker_sectors.get(ticker, "")
                or self._infer_avoid_sector_from_name(name)
            )
            signal = self.strategy.check_entry(
                ticker=ticker,
                name=name,
                current_price=current,
                sector=sector,
            )
            if not signal["signal"]:
                if do_diag:
                    diag.append(
                        f"{name}({ticker}):{signal['reason']}"
                        f"[현재가{current:,}/목표가{target:,.0f}]"
                    )
                continue

            balance = await self.kis.get_balance()
            cash = balance["cash"]
            # AI signal multiplier x consecutive-stop-loss multiplier x midday multiplier.
            combined_mult = (
                signal.get("multiplier", 1.0)
                * self.risk.get_consec_multiplier()
                * self._midday_pos_mult
            )
            invest = self.risk.get_pos_size(cash, combined_mult)
            qty = max(1, int(invest // current))
            boosted = signal.get("boosted", False)

            reserve_stop = current * (1 - self.risk.get_sl_pct())
            if not self._db_reserve_position(
                ticker=ticker,
                name=name,
                entry_price=current,
                qty=qty,
                target_price=target,
                stop_price=reserve_stop,
                ai_boosted=boosted,
            ):
                logger.info("ENTRY blocked: DB active position exists for %s", ticker)
                continue
            reserved = True

            self._log_entry_acceptance(
                ticker=ticker,
                name=name,
                current=current,
                target=target,
                qty=qty,
                multiplier=combined_mult,
                reason=signal["reason"],
            )
            result = await self.executor.buy(
                ticker=ticker, name=name,
                qty=qty, reason=signal["reason"],
                ai_boosted=boosted,
            )
            if not result["success"]:
                # The order did not go through: release the reservation.
                self._db_delete_position(ticker)
                reserved = False
                continue

            # From here on the reservation row backs a real holding and
            # must not be deleted by the error handler below.
            reserved = False
            entry_price = result["price"] or current
            sl_price = entry_price * (1 - self.risk.get_sl_pct())

            # Register the position first so exit checks manage it even if
            # anything after this point fails.
            pos = {
                "name": name,
                "entry": entry_price,
                "qty": qty,
                "tp1_done": False,
                "entry_time": datetime.now(),
                "sl_price": sl_price,
                "boosted": boosted,
            }
            self.positions[ticker] = pos
            self._db_save_position(
                ticker, pos,
                target_price=self.strategy.get_target(ticker),
            )

            # Analytics are best-effort: a failure is logged, not raised.
            trade_id = result.get("trade_id")
            try:
                model_scores = self._score_entry_candidate(
                    ticker=ticker,
                    name=name,
                    price_info=price_info,
                    target=target,
                    entry_price=entry_price,
                    stop_price=sl_price,
                    signal=signal,
                    combined_mult=combined_mult,
                )
                self._save_entry_snapshot(
                    trade_id=trade_id,
                    ticker=ticker,
                    name=name,
                    price_info=price_info,
                    target=target,
                    entry_price=entry_price,
                    stop_price=sl_price,
                    qty=qty,
                    signal=signal,
                    combined_mult=combined_mult,
                    model_scores=model_scores,
                )
            except Exception as e:
                logger.warning("entry snapshot failed %s(%s): %s", name, ticker, e)

            # Keep a strong reference to the background task: the event
            # loop only holds weak references, so an unreferenced task can
            # be garbage-collected before it finishes.
            tracker = asyncio.create_task(
                self._track_post_entry(
                    trade_id=trade_id,
                    ticker=ticker,
                    name=name,
                    entry_price=entry_price,
                )
            )
            tasks = getattr(self, "_post_entry_tasks", None)
            if tasks is None:
                tasks = self._post_entry_tasks = set()
            tasks.add(tracker)
            tracker.add_done_callback(tasks.discard)

            await notify_buy(
                ticker=ticker, name=name,
                price=entry_price,
                target=int(entry_price * 1.03),
                stop=int(sl_price),
                boosted=boosted,
            )
        except Exception as e:
            if reserved:
                self._db_delete_position(ticker)
            logger.error(f"진입 체크 오류 {ticker}: {type(e).__name__}: {e}")

    if do_diag:
        self._last_diag = now_ts
        if diag:
            logger.info(f"[신호진단] {' | '.join(diag)}")
        else:
            logger.info("[신호진단] 전 종목 신호 없음 (유니버스 비어있거나 모두 필터됨)")
# ─────────────────────────────────────────
# 청산 체크
# ─────────────────────────────────────────
async def check_exits(self):
    """Check every held position for a time-based or strategy exit.

    A position held for ``MAX_HOLD_MIN`` minutes or more is closed in full
    with reason ``TIME``; otherwise the strategy decides whether (and how
    much) to sell.  Errors are logged per ticker and followed by a
    5-second pause.
    """
    # Work on a snapshot: _do_exit() may remove entries from self.positions.
    held_positions = list(self.positions.items())
    for ticker, pos in held_positions:
        try:
            quote = await self._get_price_with_retry(ticker, "EXIT")
            current = quote["current"]

            # Time exit once the maximum holding time is reached.
            # NOTE(review): ``.seconds`` is only the seconds component of
            # the timedelta (it wraps for negative or multi-day spans) —
            # confirm ``total_seconds()`` is not what is wanted here.
            held_for = datetime.now() - pos["entry_time"]
            hold_min = held_for.seconds / 60
            if hold_min >= MAX_HOLD_MIN:
                await self._do_exit(ticker, pos, current, qty=pos["qty"], reason="TIME")
                continue

            # Strategy exit signal.
            verdict = self.strategy.check_exit(
                ticker=ticker,
                entry_price=pos["entry"],
                current_price=current,
                qty=pos["qty"],
                tp1_done=pos["tp1_done"],
                sl_pct=self.risk.get_sl_pct(),
            )
            if verdict["signal"]:
                await self._do_exit(
                    ticker, pos, current,
                    qty=verdict["qty"],
                    reason=verdict["reason"],
                )
            # Throttle successive API calls.
            await asyncio.sleep(1.1)
        except Exception as e:
            logger.error(f"청산 체크 오류 {ticker}: {type(e).__name__}: {e}")
            await asyncio.sleep(5)
async def _do_exit(self, ticker: str, pos: dict,
                   current: float, qty: int, reason: str):
    """Sell ``qty`` shares of a held position and update all bookkeeping.

    Returns without changing anything when the sell does not succeed.
    Otherwise the P&L is recorded with the risk manager; a ``TP1`` exit
    reduces the position (removing it if nothing is left), while ``TP2``,
    ``SL``, ``TIME`` and ``FORCE`` remove it.  Notifications are sent for
    TP1, TP2, SL, for the 2nd/3rd consecutive stop loss, and when the risk
    manager no longer allows trading.
    """
    name = pos["name"]
    result = await self._sell_with_retry(ticker, name, qty, reason)
    if not result["success"]:
        return

    exit_price = result["price"] or current
    entry = pos["entry"]
    pnl = (exit_price - entry) * qty
    pnl_pct = (exit_price - entry) / entry * 100
    self.risk.record_trade(pnl)

    # Notify on reaching 2 or 3 consecutive stop losses (position size shrinks).
    if reason == "SL" and self.risk.consec_loss in (2, 3):
        consec = self.risk.consec_loss
        mult = self.risk.get_consec_multiplier()
        await notify_risk(
            "L3-B",
            f"{consec}연속 손절 — 포지션 크기 {int(mult * 100)}%로 축소"
        )

    def close_position():
        # Drop the position from memory and DB and tell the strategy it is final.
        del self.positions[ticker]
        self._db_delete_position(ticker)
        self.strategy.mark_final_exit(ticker, reason)

    if reason == "TP1":
        pos["tp1_done"] = True
        pos["qty"] -= qty
        if pos["qty"] > 0:
            # Partial exit: persist the reduced position.
            self._db_save_position(ticker, pos, self.strategy.get_target(ticker))
        else:
            close_position()
        await notify_tp1(ticker, name, pnl_pct)
    elif reason in ("TP2", "SL", "TIME", "FORCE"):
        close_position()
        if reason == "TP2":
            await notify_tp2(ticker, name, pnl_pct)
        elif reason == "SL":
            self.sl_tickers.add(ticker)  # block re-entry for the rest of the day
            await notify_sl(ticker, name, pnl_pct)

    # Warn when this trade caused the risk manager to halt trading.
    if not self.risk.can_trade():
        stop_reason = self.risk.stop_reason
        await notify_risk(stop_reason.split(":")[0], stop_reason)
# ─────────────────────────────────────────
# 강제 청산 (14:50)
# ─────────────────────────────────────────
async def force_exit_all(self):
    """Sell every held position in full with reason ``FORCE``.

    A failure for one ticker is logged and the remaining tickers are still
    processed.  The force-exit notification is sent afterwards regardless.
    """
    logger.info("14:50 강제 청산 시작")
    # Snapshot first: _do_exit() removes entries from self.positions.
    snapshot = list(self.positions.items())
    for ticker, pos in snapshot:
        try:
            quote = await self._get_price_with_retry(ticker, "FORCE_EXIT")
            await self._do_exit(
                ticker,
                pos,
                quote["current"],
                qty=pos["qty"],
                reason="FORCE",
            )
        except Exception as e:
            logger.error(f"강제 청산 실패 {ticker}: {e}")
    await notify_force_exit()
    logger.info("강제 청산 완료")
# ─────────────────────────────────────────
# 일일 결산 (15:10)
# ─────────────────────────────────────────
async def daily_summary(self):
    """Summarise today's closed trades: store, notify, log, then reset risk state.

    Reads today's rows from ``trades`` that have an ``exit_time``, writes one
    ``daily_summary`` row (replacing any existing row for today), sends the
    summary and the exit-reason distribution, and finally calls
    ``risk.reset_daily()``.

    NOTE(review): ``losses`` counts every trade that is not a win, so
    break-even trades are counted as losses; ``gross_pnl`` as computed here
    always equals ``net`` — confirm both are intended.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    with get_conn() as conn:
        rows = conn.execute("""
SELECT pnl, fee, exit_reason FROM trades
WHERE date=? AND exit_time IS NOT NULL
""", (today,)).fetchall()

    pnls = [r[0] for r in rows if r[0] is not None]
    fees = [r[1] for r in rows if r[1] is not None]
    total = len(pnls)
    wins = sum(1 for p in pnls if p > 0)
    losses = total - wins
    gross_pnl = sum(p for p in pnls if p > 0) - abs(sum(p for p in pnls if p < 0))
    total_fee = sum(fees)
    net = sum(pnls)

    # Daily P&L as a percentage of starting cash, capped at 0.  Guard the
    # division: a zero starting balance must not abort the whole summary.
    init_cash = self.risk.init_cash
    if init_cash:
        mdd = min(self.risk.daily_pnl / init_cash * 100, 0.0)
    else:
        mdd = 0.0
    stopped = 0 if self.risk.can_trade() else 1

    exit_counts = {}
    for _, _, reason in rows:
        key = reason or "UNKNOWN"
        exit_counts[key] = exit_counts.get(key, 0) + 1

    # Persist to the daily_summary table.
    with get_conn() as conn:
        conn.execute("""
INSERT OR REPLACE INTO daily_summary
(date, total_trades, win_trades, lose_trades,
gross_pnl, total_fee, net_pnl, max_drawdown, trading_stopped)
VALUES (?,?,?,?,?,?,?,?,?)
""", (today, total, wins, losses, gross_pnl, total_fee, net, mdd, stopped))

    await notify_daily_summary(total, wins, losses, net)
    if exit_counts:
        dist = " / ".join(f"{k}:{v}" for k, v in sorted(exit_counts.items()))
        logger.info("Exit distribution: %s", dist)
        await send(f"[청산분포] {dist}")

    self.risk.reset_daily()
    # Wins and losses are labelled separately; printing them back to back
    # made "3 wins, 5 losses" read as "35".
    logger.info(f"결산: {total}회 / 승{wins} 패{losses} / {net:+,.0f}원 (fee {total_fee:,.0f}원)")
# ─────────────────────────────────────────
# 스케줄러
# ─────────────────────────────────────────
async def run():
    """Create the bot and drive its daily schedule.

    Started during market hours (09:00-15:00): load the AI context, build
    the universe and targets, trade immediately, then wait for 15:10, run
    the daily summary and return.

    Otherwise an endless scheduler loop (polled every 30 seconds) runs, at
    most once per calendar day each:

    * 08:30-09:00  load the AI context and refresh the universe
    * 08:50-09:00  compute target prices
    * 09:00-15:00  start the trading loop (computing targets first if that
      step was missed)
    * 15:10        daily summary

    Jobs are matched by time window and remembered per date rather than by
    an exact minute.  With the 30-second poll an exact-minute match fires
    twice inside the same minute (e.g. two daily summaries), and is missed
    for the whole day when an earlier job overruns or the process starts
    late (e.g. a start at 08:55 never computed targets).
    """
    bot = StockBot()
    await bot.initialize()

    completed: dict[str, str] = {}  # job name -> date ("YYYY-MM-DD") it last ran

    def today_str() -> str:
        return datetime.now().strftime("%Y-%m-%d")

    def pending(job: str) -> bool:
        return completed.get(job) != today_str()

    def mark_done(job: str) -> None:
        completed[job] = today_str()

    now = datetime.now().strftime("%H:%M")

    if "09:00" <= now < "15:00":
        logger.info("장 중 재시작 감지 → AI 컨텍스트 로드 + 유니버스/목표가 즉시 계산")
        ctx = bot.strategy.load_ai_context()
        await notify_ai_result(
            ctx["market_sentiment"],
            ctx["sentiment_score"],
            ctx.get("hot_sectors", []),
            ctx.get("avoid_sectors", []),
            ctx.get("reason", ""),
        )
        bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
        await bot.update_universe()
        await bot.calc_targets()
        await bot.trading_loop()  # go straight into the trading loop
        # After the trading loop ends, wait for the 15:10 summary.
        while True:
            now = datetime.now().strftime("%H:%M")
            if now == "15:10":
                await bot.daily_summary()
                return
            if now > "15:10":
                return
            await asyncio.sleep(30)

    # Started between 08:30 and 09:00: load the context and universe now.
    if "08:30" <= now < "09:00":
        logger.info("장 전 재시작 감지(08:30~09:00) → AI 컨텍스트 로드 + 유니버스 즉시 갱신")
        ctx = bot.strategy.load_ai_context()
        bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
        await bot.update_universe()
        mark_done("context")

    while True:
        now = datetime.now().strftime("%H:%M")
        try:
            if "08:30" <= now < "09:00" and pending("context"):
                # AI context + universe refresh.
                ctx = bot.strategy.load_ai_context()
                await notify_ai_result(
                    ctx["market_sentiment"],
                    ctx["sentiment_score"],
                    ctx.get("hot_sectors", []),
                    ctx.get("avoid_sectors", []),
                    ctx.get("reason", ""),
                )
                bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
                await bot.update_universe()
                mark_done("context")
            elif "08:50" <= now < "09:00" and pending("targets"):
                # Target price calculation.
                await bot.calc_targets()
                mark_done("targets")
            elif "09:00" <= now < "15:00" and pending("trading"):
                # Trading needs targets; compute them if that step was missed.
                if pending("targets"):
                    await bot.calc_targets()
                    mark_done("targets")
                # Mark before running: the loop is long-lived and must not be
                # restarted on the same day once it has ended.
                mark_done("trading")
                await bot.trading_loop()
            elif now == "15:10" and pending("summary"):
                # Daily summary.
                await bot.daily_summary()
                mark_done("summary")
        except Exception as e:
            logger.error(f"스케줄러 루프 오류 ({now}): {e}", exc_info=True)
        await asyncio.sleep(30)
if __name__ == "__main__":
    # Working directories used by the log file, the lock file and the
    # context JSON files.
    os.makedirs("logs", exist_ok=True)
    os.makedirs("data", exist_ok=True)

    # Refuse to start when another instance holds the lock.
    instance_lock = SingleInstanceLock("logs/stockbot.lock")
    if not instance_lock.acquire():
        logger.error("StockBot already running; duplicate process exiting")
        sys.exit(2)

    try:
        logger.info("=" * 50)
        logger.info("단타 자동매매 시스템 시작")
        logger.info(f"모드: {'모의투자' if os.getenv('KIS_MOCK','true')=='true' else '실거래'}")
        logger.info(f"DRY_RUN: {os.getenv('DRY_RUN','true')}")
        logger.info("=" * 50)
        asyncio.run(run())
    finally:
        # Release the lock explicitly on every exit path instead of relying
        # on process teardown to close the file.
        instance_lock.release()