Files
Stock-trading-programming/app/main.py
T

678 lines
27 KiB
Python
Raw Normal View History

2026-05-14 15:14:50 +09:00
"""
main.py
단타 자동매매 시스템 메인 진입점
기획서 v2.1 기준
실행:
python -m app.main (Docker 컨테이너)
python app/main.py (로컬 테스트)
환경변수:
KIS_MOCK=true → 모의투자 모드
DRY_RUN=true → 신호만 확인, 주문 전송 안 함
"""
import json
2026-05-14 15:14:50 +09:00
import os
import sys
import asyncio
import logging
from datetime import datetime, time
from pathlib import Path
# .env 로드
def load_env():
env_path = Path(".env")
if not env_path.exists():
return
with open(env_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k = k.strip()
v = v.strip()
# 인라인 주석 제거 (예: true # 모의투자 → true)
if " #" in v:
v = v[:v.index(" #")]
2026-05-14 15:14:50 +09:00
v = v.strip().strip('"').strip("'")
if k and v and k not in os.environ:
os.environ[k] = v
load_env()
# 프로젝트 루트를 sys.path에 추가 (로컬 실행 시 필요)
ROOT = Path(__file__).parent.parent
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
2026-05-14 15:14:50 +09:00
# 로깅 설정
logging.basicConfig(
level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/stockbot.log", encoding="utf-8"),
]
)
logger = logging.getLogger(__name__)
from app.execution.kis_client import KISClient
from app.execution.order_executor import OrderExecutor
from app.strategy.volatility_breakout import VolatilityBreakout
from app.risk.manager import RiskManager
from app.monitor.notifier import (
notify_buy, notify_tp1, notify_tp2, notify_sl,
notify_force_exit, notify_risk, notify_daily_summary,
notify_error, notify_ai_result, notify_ai_blocked,
notify_ai_fallback, send
)
from app.db.models import init_db, get_conn
from app.config import (
MAX_UNIVERSE, FORCE_EXIT, MAX_POSITIONS,
MAX_HOLD_MIN, KOSPI_MIN_CHG
)
class StockBot:
def __init__(self):
self.kis = KISClient()
self.executor = OrderExecutor(self.kis)
self.strategy = VolatilityBreakout()
self.positions = {} # ticker → {name, entry, qty, tp1_done, entry_time}
self.universe = [] # 감시 종목 리스트
self.ticker_names = {} # ticker → 종목명 캐시
self.sl_tickers = set() # 당일 SL 당한 종목 — 재진입 차단
self.risk = None # RiskManager (잔고 확인 후 초기화)
self.running = False
2026-05-14 15:14:50 +09:00
# 장중 컨텍스트 (midday_context.json 갱신 감지용)
self._midday_ctx_mtime : float = 0.0
self._midday_pos_mult : float = 1.0 # midday position_size_multiplier
self._midday_loaded : bool = False
2026-05-14 15:14:50 +09:00
mode = "모의투자" if self.kis.is_mock else "실거래"
dry = " [DRY_RUN]" if os.getenv("DRY_RUN","true")=="true" else ""
logger.info(f"StockBot 시작 [{mode}]{dry}")
# ─────────────────────────────────────────
# 장중 컨텍스트 감시
# ─────────────────────────────────────────
def _check_midday_context(self):
"""midday_context.json 갱신 감지 → 즉시 점심 세션 파라미터 반영"""
path = Path("data/midday_context.json")
if not path.exists():
return
try:
mtime = path.stat().st_mtime
except OSError:
return
if mtime <= self._midday_ctx_mtime:
return
try:
ctx = json.loads(path.read_text(encoding="utf-8"))
if ctx.get("date") != datetime.now().strftime("%Y-%m-%d"):
return
if not ctx.get("lunch_trade_allowed", True):
logger.warning("midday_context: 점심 세션 진입 중단 설정")
self._midday_pos_mult = float(ctx.get("position_size_multiplier", 1.0))
# 섹터·블랙리스트 업데이트
if "hot_sectors" in ctx:
self.strategy.context["hot_sectors"] = ctx["hot_sectors"]
if "avoid_sectors" in ctx:
self.strategy.context["avoid_sectors"] = ctx["avoid_sectors"]
for t in ctx.get("blacklist_tickers", []):
bl = self.strategy.context.setdefault("blacklist_tickers", [])
if t not in bl:
bl.append(t)
# lunch_trade_allowed=false이면 진입 자체를 막는 플래그 저장
self.strategy.context["lunch_trade_allowed"] = ctx.get("lunch_trade_allowed", True)
self._midday_ctx_mtime = mtime
self._midday_loaded = True
logger.info(
f"midday_context 로드 완료 — 점심 세션 시작 "
f"(포지션 배율: ×{self._midday_pos_mult}, "
f"진입허용: {ctx.get('lunch_trade_allowed', True)})"
)
except Exception as e:
logger.warning(f"midday_context 로드 실패: {e}")
2026-05-14 15:14:50 +09:00
# ─────────────────────────────────────────
# 초기화
# ─────────────────────────────────────────
async def initialize(self):
"""시스템 초기화"""
init_db()
await self.kis.get_access_token()
# 잔고 조회 → RiskManager 초기화
balance = await self.kis.get_balance()
cash = balance["cash"]
self.risk = RiskManager(init_cash=cash)
logger.info(f"초기 예수금: {cash:,}")
# DB에서 열린 포지션 복원 (재시작 시)
self._restore_positions_from_db()
# 당일 SL 종목 복원 (재시작 후에도 재진입 차단 유지)
self._restore_sl_tickers_from_db()
2026-05-14 15:14:50 +09:00
await send(f"[시작] 단타봇 가동 | 예수금: {cash:,}원 | "
f"{'모의투자' if self.kis.is_mock else '실거래'}")
def _restore_positions_from_db(self):
"""재시작 시 DB positions 테이블에서 인메모리 복원"""
with get_conn() as conn:
rows = conn.execute("SELECT * FROM positions").fetchall()
for r in rows:
ticker, name, entry_time, entry_price, qty, tp1_done, target_price, stop_price, ai_boosted = r
self.positions[ticker] = {
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : bool(tp1_done),
"entry_time": datetime.strptime(entry_time, "%H:%M:%S").replace(
year=datetime.now().year,
month=datetime.now().month,
day=datetime.now().day),
"sl_price" : stop_price,
"boosted" : bool(ai_boosted),
}
if self.positions:
logger.info(f"DB 포지션 복원: {list(self.positions.keys())}")
def _restore_sl_tickers_from_db(self):
"""재시작 시 당일 SL 종목 복원 — 재진입 차단 유지"""
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute(
"SELECT DISTINCT ticker FROM trades WHERE date=? AND exit_reason='SL'",
(today,)
).fetchall()
for (ticker,) in rows:
self.sl_tickers.add(ticker)
if self.sl_tickers:
logger.info(f"당일 SL 종목 복원(재진입 차단): {self.sl_tickers}")
def _db_save_position(self, ticker: str, pos: dict, target_price: float):
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO positions
(ticker, name, entry_time, entry_price, quantity,
tp1_done, target_price, stop_price, ai_boosted)
VALUES (?,?,?,?,?,?,?,?,?)
""", (
ticker, pos["name"],
pos["entry_time"].strftime("%H:%M:%S"),
pos["entry"], pos["qty"],
1 if pos.get("tp1_done") else 0,
target_price, pos["sl_price"],
1 if pos.get("boosted") else 0,
))
def _db_delete_position(self, ticker: str):
with get_conn() as conn:
conn.execute("DELETE FROM positions WHERE ticker=?", (ticker,))
2026-05-14 15:14:50 +09:00
# ─────────────────────────────────────────
# 유니버스 갱신 (08:30)
# ─────────────────────────────────────────
# ETF/ETN/인버스/레버리지 종목 필터
_ETF_KEYWORDS = ('인버스', '레버리지', '선물', 'KODEX', 'TIGER', 'KBSTAR',
'HANARO', 'ARIRANG', 'KOSEF', 'SOL', 'ACE', 'RISE', 'PLUS')
@staticmethod
def _is_etf(ticker: str, name: str) -> bool:
if ticker.startswith('Q') or len(ticker) != 6: # ETN or 비정상 코드
return True
return any(kw in name for kw in StockBot._ETF_KEYWORDS)
2026-05-14 15:14:50 +09:00
async def update_universe(self):
"""종목 풀 갱신 + 전일 데이터 수집"""
logger.info("유니버스 갱신 시작")
try:
# ETF 필터 후 MAX_UNIVERSE 확보 위해 여유분 요청
rank = await self.kis.get_volume_rank(top_n=MAX_UNIVERSE + 20)
# 종목명 캐시 갱신 + ETF 필터
for r in rank:
self.ticker_names[r["ticker"]] = r["name"]
tickers = [r["ticker"] for r in rank
if not self._is_etf(r["ticker"], r["name"])]
2026-05-14 15:14:50 +09:00
ctx = self.strategy.context
blacklist = ctx.get("blacklist_tickers", [])
tickers = [t for t in tickers if t not in blacklist]
boosted = ctx.get("boosted_tickers", [])
tickers = (
[t for t in boosted if t in tickers] +
[t for t in tickers if t not in boosted]
)[:MAX_UNIVERSE]
self.universe = tickers
logger.info(f"유니버스: {len(tickers)}종목 (ETF 제외)")
2026-05-14 15:14:50 +09:00
# 전일 날짜 계산
from datetime import timedelta
today = datetime.now()
# 월요일이면 금요일로
offset = 3 if today.weekday() == 0 else 1
prev_date = (today - timedelta(days=offset)).strftime("%Y%m%d")
2026-05-14 15:14:50 +09:00
for ticker in self.universe:
# 이미 전일 데이터 있으면 skip
if self.strategy.has_prev_data(ticker):
continue
2026-05-14 15:14:50 +09:00
try:
ohlcv = await self.kis.get_ohlcv_daily(
ticker,
start=prev_date,
end=prev_date,
2026-05-14 15:14:50 +09:00
)
if ohlcv:
prev = ohlcv[-1]
2026-05-14 15:14:50 +09:00
self.strategy.set_prev_data(
ticker,
high = prev["high"],
low = prev["low"],
amount= prev.get("amount",
prev.get("volume", 0) * prev.get("close", 0))
2026-05-14 15:14:50 +09:00
)
except Exception as e:
logger.warning(f"전일 데이터 실패 {ticker}: {e}")
await asyncio.sleep(1.1) # 초당 2.5건으로 제한
2026-05-14 15:14:50 +09:00
except Exception as e:
logger.error(f"유니버스 갱신 실패: {e}")
# ─────────────────────────────────────────
# 시가 수집 + 목표가 계산 (08:50)
# ─────────────────────────────────────────
async def calc_targets(self):
"""당일 시가 기반 목표가 계산"""
logger.info("목표가 계산 시작")
for ticker in self.universe:
try:
price_info = await self.kis.get_price(ticker)
self.strategy.set_today_open(ticker, price_info["open"])
target = self.strategy.get_target(ticker)
if target > 0:
logger.debug(f"{ticker} 목표가: {target:,.0f}")
await asyncio.sleep(1.1)
2026-05-14 15:14:50 +09:00
except Exception as e:
logger.warning(f"시가 수집 실패 {ticker}: {e}")
# ─────────────────────────────────────────
# 메인 매매 루프 (09:00~14:50)
# ─────────────────────────────────────────
async def trading_loop(self):
"""1초 단위 메인 루프"""
logger.info("매매 루프 시작")
self.running = True
while self.running:
now = datetime.now()
now_str = now.strftime("%H:%M")
# 14:50 강제 청산
if now_str >= FORCE_EXIT:
await self.force_exit_all()
self.running = False
break
# 14:00 이후 신규 진입 중단
if now_str > "14:00":
2026-05-14 15:14:50 +09:00
await asyncio.sleep(1)
continue
# 09:00 이전 대기
if now_str < "09:00":
await asyncio.sleep(1)
continue
# midday_context.json 갱신 감지 (점심 세션 이벤트 기반 시작)
self._check_midday_context()
2026-05-14 15:14:50 +09:00
# 리스크 체크 (L2/L4/L5 하드 중단)
2026-05-14 15:14:50 +09:00
if not self.risk.can_trade():
await asyncio.sleep(5)
continue
# 보유 포지션 청산 체크
await self.check_exits()
# 신규 진입 체크
if self.risk.can_add_position(len(self.positions)):
await self.check_entries()
await asyncio.sleep(1)
# ─────────────────────────────────────────
# 진입 체크
# ─────────────────────────────────────────
async def check_entries(self):
"""유니버스 전체 진입 신호 확인"""
now_str = datetime.now().strftime("%H:%M")
# 14:00 이후 진입 차단
if now_str > "14:00":
return
# midday_context 로드 전(11:20~) 11:00 이후 신규 진입 일시 중단
# — midday_context.json이 생성되면 _check_midday_context()가 자동 해제
if now_str >= "11:00" and not self._midday_loaded:
return
# lunch_trade_allowed=false이면 점심 세션 진입 차단
if self._midday_loaded and not self.strategy.context.get("lunch_trade_allowed", True):
return
2026-05-14 15:14:50 +09:00
for ticker in self.universe:
if ticker in self.positions:
continue
if ticker in self.sl_tickers:
continue # 당일 SL 종목 재진입 차단
2026-05-14 15:14:50 +09:00
if len(self.positions) >= MAX_POSITIONS:
break
# 목표가 미계산 종목 스킵 (불필요한 API 호출 방지)
if self.strategy.get_target(ticker) <= 0:
continue
2026-05-14 15:14:50 +09:00
try:
price_info = await self.kis.get_price(ticker) # rate limiter가 자동 throttle
2026-05-14 15:14:50 +09:00
current = price_info["current"]
name = self.ticker_names.get(ticker, ticker)
2026-05-14 15:14:50 +09:00
signal = self.strategy.check_entry(
ticker=ticker,
name=name,
current_price=current,
)
if not signal["signal"]:
continue
balance = await self.kis.get_balance()
cash = balance["cash"]
# AI 신호 배율 × B안(연속 손절) 배율 × midday 배율
combined_mult = (
signal.get("multiplier", 1.0)
* self.risk.get_consec_multiplier()
* self._midday_pos_mult
)
invest = self.risk.get_pos_size(cash, combined_mult)
qty = max(1, int(invest // current))
2026-05-14 15:14:50 +09:00
result = await self.executor.buy(
ticker=ticker, name=name,
qty=qty, reason=signal["reason"],
ai_boosted=signal.get("boosted", False),
)
if result["success"]:
entry_price = result["price"] or current
sl_price = entry_price * (1 - self.risk.get_sl_pct())
pos = {
2026-05-14 15:14:50 +09:00
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : False,
"entry_time": datetime.now(),
"sl_price" : sl_price,
"boosted" : signal.get("boosted", False),
}
self.positions[ticker] = pos
self._db_save_position(
ticker, pos,
target_price=self.strategy.get_target(ticker),
)
2026-05-14 15:14:50 +09:00
await notify_buy(
ticker=ticker, name=name,
price=entry_price,
target=int(entry_price * 1.03),
stop=int(sl_price),
boosted=signal.get("boosted", False),
)
except Exception as e:
logger.error(f"진입 체크 오류 {ticker}: {e}")
# ─────────────────────────────────────────
# 청산 체크
# ─────────────────────────────────────────
async def check_exits(self):
"""보유 포지션 청산 신호 확인"""
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = pos["name"]
# 시간 청산: MAX_HOLD_MIN 초과
hold_min = (datetime.now() - pos["entry_time"]).seconds / 60
if hold_min >= MAX_HOLD_MIN:
await self._do_exit(ticker, pos, current, qty=pos["qty"], reason="TIME")
continue
# 전략 청산 신호
signal = self.strategy.check_exit(
ticker=ticker,
entry_price=pos["entry"],
current_price=current,
qty=pos["qty"],
tp1_done=pos["tp1_done"],
sl_pct=self.risk.get_sl_pct(),
)
if signal["signal"]:
await self._do_exit(
ticker, pos, current,
qty=signal["qty"],
reason=signal["reason"],
)
await asyncio.sleep(1.1)
2026-05-14 15:14:50 +09:00
except Exception as e:
logger.error(f"청산 체크 오류 {ticker}: {e}")
async def _do_exit(self, ticker: str, pos: dict,
current: float, qty: int, reason: str):
"""실제 청산 실행"""
name = pos["name"]
result = await self.executor.sell(ticker, name, qty, reason)
if not result["success"]:
return
exit_price = result["price"] or current
pnl = (exit_price - pos["entry"]) * qty
pnl_pct = (exit_price - pos["entry"]) / pos["entry"] * 100
self.risk.record_trade(pnl)
# B안: 연속 손절 2회·3회 도달 시 Discord 알림
if reason == "SL":
consec = self.risk.consec_loss
if consec in (2, 3):
mult = self.risk.get_consec_multiplier()
await notify_risk(
"L3-B",
f"{consec}연속 손절 — 포지션 크기 {int(mult * 100)}%로 축소"
)
2026-05-14 15:14:50 +09:00
if reason == "TP1":
pos["tp1_done"] = True
pos["qty"] -= qty
if pos["qty"] <= 0:
del self.positions[ticker]
self._db_delete_position(ticker)
else:
self._db_save_position(ticker, pos, self.strategy.get_target(ticker))
2026-05-14 15:14:50 +09:00
await notify_tp1(ticker, name, pnl_pct)
elif reason in ("TP2", "SL", "TIME", "FORCE"):
del self.positions[ticker]
self._db_delete_position(ticker)
2026-05-14 15:14:50 +09:00
if reason == "TP2":
await notify_tp2(ticker, name, pnl_pct)
elif reason == "SL":
self.sl_tickers.add(ticker) # 당일 재진입 차단
2026-05-14 15:14:50 +09:00
await notify_sl(ticker, name, pnl_pct)
# L2/L3 체크 후 디스코드 경고
if not self.risk.can_trade():
await notify_risk(
self.risk.stop_reason.split(":")[0],
self.risk.stop_reason
)
# ─────────────────────────────────────────
# 강제 청산 (14:50)
# ─────────────────────────────────────────
async def force_exit_all(self):
"""14:50 전량 강제 청산"""
logger.info("14:50 강제 청산 시작")
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
await self._do_exit(
ticker, pos, current,
qty=pos["qty"], reason="FORCE"
)
except Exception as e:
logger.error(f"강제 청산 실패 {ticker}: {e}")
await notify_force_exit()
logger.info("강제 청산 완료")
# ─────────────────────────────────────────
# 일일 결산 (15:10)
# ─────────────────────────────────────────
async def daily_summary(self):
"""당일 결산 로그 및 디스코드 알림 + DB 저장"""
2026-05-14 15:14:50 +09:00
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute("""
SELECT pnl, fee FROM trades
2026-05-14 15:14:50 +09:00
WHERE date=? AND exit_time IS NOT NULL
""", (today,)).fetchall()
pnls = [r[0] for r in rows if r[0] is not None]
fees = [r[1] for r in rows if r[1] is not None]
total = len(pnls)
wins = sum(1 for p in pnls if p > 0)
losses = total - wins
gross_pnl = sum(p for p in pnls if p > 0) - abs(sum(p for p in pnls if p < 0))
total_fee = sum(fees)
net = sum(pnls)
mdd = min(self.risk.daily_pnl / self.risk.init_cash * 100, 0.0)
stopped = 0 if self.risk.can_trade() else 1
# daily_summary 테이블 저장
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO daily_summary
(date, total_trades, win_trades, lose_trades,
gross_pnl, total_fee, net_pnl, max_drawdown, trading_stopped)
VALUES (?,?,?,?,?,?,?,?,?)
""", (today, total, wins, losses, gross_pnl, total_fee, net, mdd, stopped))
2026-05-14 15:14:50 +09:00
await notify_daily_summary(total, wins, losses, net)
self.risk.reset_daily()
logger.info(f"결산: {total}회 / 승{wins}{losses} / {net:+,.0f}원 (fee {total_fee:,.0f}원)")
2026-05-14 15:14:50 +09:00
# ─────────────────────────────────────────
# 스케줄러
# ─────────────────────────────────────────
async def run():
bot = StockBot()
await bot.initialize()
now = datetime.now().strftime("%H:%M")
if "09:00" <= now <= "14:30":
logger.info("장 중 재시작 감지 → AI 컨텍스트 로드 + 유니버스/목표가 즉시 계산")
ctx = bot.strategy.load_ai_context()
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
await bot.calc_targets()
await bot.trading_loop() # 바로 매매루프 진입
# 매매루프 종료 후 15:10 결산까지 대기
while True:
now = datetime.now().strftime("%H:%M")
if now == "15:10":
await bot.daily_summary()
return
if now > "15:10":
return
await asyncio.sleep(30)
# 08:30 이후~09:00 이전 시작 시 컨텍스트·유니버스 즉시 로드
if "08:30" <= now < "09:00":
logger.info("장 전 재시작 감지(08:30~09:00) → AI 컨텍스트 로드 + 유니버스 즉시 갱신")
ctx = bot.strategy.load_ai_context()
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
2026-05-14 15:14:50 +09:00
while True:
now = datetime.now().strftime("%H:%M")
try:
# 08:30 AI 컨텍스트 로드 + 유니버스 갱신
# (claude_morning이 08:15에 시작해 08:30 전에 daily_context.json 생성)
if now == "08:30":
ctx = bot.strategy.load_ai_context()
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
2026-05-14 15:14:50 +09:00
# 08:50 목표가 계산
elif now == "08:50":
await bot.calc_targets()
2026-05-14 15:14:50 +09:00
# 09:00 매매 루프 시작
elif now == "09:00":
await bot.trading_loop()
2026-05-14 15:14:50 +09:00
# 15:10 결산
elif now == "15:10":
await bot.daily_summary()
except Exception as e:
logger.error(f"스케줄러 루프 오류 ({now}): {e}", exc_info=True)
2026-05-14 15:14:50 +09:00
await asyncio.sleep(30)
if __name__ == "__main__":
os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
logger.info("=" * 50)
logger.info("단타 자동매매 시스템 시작")
logger.info(f"모드: {'모의투자' if os.getenv('KIS_MOCK','true')=='true' else '실거래'}")
logger.info(f"DRY_RUN: {os.getenv('DRY_RUN','true')}")
logger.info("=" * 50)
asyncio.run(run())