Files
Stock-trading-programming/app/main.py
T
whdwo798 e60b59a644 [2026-05-15] 작업 스케줄러 자동화 + main.py 타이밍 수정
- scripts/run_morning.ps1: claude_morning 실행 스크립트
- scripts/run_bot.bat: 매매봇 실행 스크립트
- scripts/setup_scheduler.ps1: 작업 스케줄러 등록 스크립트
- app/main.py: AI 컨텍스트 로드 타이밍 08:05→08:30으로 수정
  (claude_morning이 08:15 시작해 08:30 전에 완료되므로)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 14:02:53 +09:00

478 lines
18 KiB
Python

"""
main.py
단타 자동매매 시스템 메인 진입점
기획서 v2.1 기준
실행:
python -m app.main (Docker 컨테이너)
python app/main.py (로컬 테스트)
환경변수:
KIS_MOCK=true → 모의투자 모드
DRY_RUN=true → 신호만 확인, 주문 전송 안 함
"""
import os
import sys
import asyncio
import logging
from datetime import datetime, time
from pathlib import Path
# .env 로드
def load_env():
env_path = Path(".env")
if not env_path.exists():
return
with open(env_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k = k.strip()
v = v.strip()
# 인라인 주석 제거 (예: true # 모의투자 → true)
if " #" in v:
v = v[:v.index(" #")]
v = v.strip().strip('"').strip("'")
if k and v and k not in os.environ:
os.environ[k] = v
load_env()
# 프로젝트 루트를 sys.path에 추가 (로컬 실행 시 필요)
ROOT = Path(__file__).parent.parent
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# 로깅 설정
logging.basicConfig(
level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/stockbot.log", encoding="utf-8"),
]
)
logger = logging.getLogger(__name__)
from app.execution.kis_client import KISClient
from app.execution.order_executor import OrderExecutor
from app.strategy.volatility_breakout import VolatilityBreakout
from app.risk.manager import RiskManager
from app.monitor.notifier import (
notify_buy, notify_tp1, notify_tp2, notify_sl,
notify_force_exit, notify_risk, notify_daily_summary,
notify_error, notify_ai_result, notify_ai_blocked,
notify_ai_fallback, send
)
from app.db.models import init_db, get_conn
from app.config import (
MAX_UNIVERSE, FORCE_EXIT, MAX_POSITIONS,
MAX_HOLD_MIN, KOSPI_MIN_CHG
)
class StockBot:
def __init__(self):
self.kis = KISClient()
self.executor = OrderExecutor(self.kis)
self.strategy = VolatilityBreakout()
self.positions = {} # ticker → {name, entry, qty, tp1_done, entry_time}
self.universe = [] # 감시 종목 리스트
self.risk = None # RiskManager (잔고 확인 후 초기화)
self.running = False
mode = "모의투자" if self.kis.is_mock else "실거래"
dry = " [DRY_RUN]" if os.getenv("DRY_RUN","true")=="true" else ""
logger.info(f"StockBot 시작 [{mode}]{dry}")
# ─────────────────────────────────────────
# 초기화
# ─────────────────────────────────────────
async def initialize(self):
"""시스템 초기화"""
init_db()
await self.kis.get_access_token()
# 잔고 조회 → RiskManager 초기화
balance = await self.kis.get_balance()
cash = balance["cash"]
self.risk = RiskManager(init_cash=cash)
logger.info(f"초기 예수금: {cash:,}")
await send(f"[시작] 단타봇 가동 | 예수금: {cash:,}원 | "
f"{'모의투자' if self.kis.is_mock else '실거래'}")
# ─────────────────────────────────────────
# 유니버스 갱신 (08:30)
# ─────────────────────────────────────────
async def update_universe(self):
"""종목 풀 갱신 + 전일 데이터 수집"""
logger.info("유니버스 갱신 시작")
try:
rank = await self.kis.get_volume_rank(top_n=MAX_UNIVERSE)
tickers = [r["ticker"] for r in rank]
ctx = self.strategy.context
blacklist = ctx.get("blacklist_tickers", [])
tickers = [t for t in tickers if t not in blacklist]
boosted = ctx.get("boosted_tickers", [])
tickers = (
[t for t in boosted if t in tickers] +
[t for t in tickers if t not in boosted]
)[:MAX_UNIVERSE]
self.universe = tickers
logger.info(f"유니버스: {len(tickers)}종목")
# 전일 날짜 계산
from datetime import timedelta
today = datetime.now()
# 월요일이면 금요일로
offset = 3 if today.weekday() == 0 else 1
prev_date = (today - timedelta(days=offset)).strftime("%Y%m%d")
for ticker in self.universe:
# 이미 전일 데이터 있으면 skip
if self.strategy.has_prev_data(ticker):
continue
try:
ohlcv = await self.kis.get_ohlcv_daily(
ticker,
start=prev_date,
end=prev_date,
)
if ohlcv:
prev = ohlcv[-1]
self.strategy.set_prev_data(
ticker,
high = prev["high"],
low = prev["low"],
amount= prev.get("amount",
prev.get("volume", 0) * prev.get("close", 0))
)
except Exception as e:
logger.warning(f"전일 데이터 실패 {ticker}: {e}")
await asyncio.sleep(1.1) # 초당 2.5건으로 제한
except Exception as e:
logger.error(f"유니버스 갱신 실패: {e}")
# ─────────────────────────────────────────
# 시가 수집 + 목표가 계산 (08:50)
# ─────────────────────────────────────────
async def calc_targets(self):
"""당일 시가 기반 목표가 계산"""
logger.info("목표가 계산 시작")
for ticker in self.universe:
try:
price_info = await self.kis.get_price(ticker)
self.strategy.set_today_open(ticker, price_info["open"])
target = self.strategy.get_target(ticker)
if target > 0:
logger.debug(f"{ticker} 목표가: {target:,.0f}")
await asyncio.sleep(1.1)
except Exception as e:
logger.warning(f"시가 수집 실패 {ticker}: {e}")
# ─────────────────────────────────────────
# 메인 매매 루프 (09:00~14:50)
# ─────────────────────────────────────────
async def trading_loop(self):
"""1초 단위 메인 루프"""
logger.info("매매 루프 시작")
self.running = True
while self.running:
now = datetime.now()
now_str = now.strftime("%H:%M")
# 14:50 강제 청산
if now_str >= FORCE_EXIT:
await self.force_exit_all()
self.running = False
break
# 14:30 이후 신규 진입 중단
if now_str > "14:30":
await asyncio.sleep(1)
continue
# 09:00 이전 대기
if now_str < "09:00":
await asyncio.sleep(1)
continue
# 점심 (11:30~13:00) 신규 진입 중단
if "11:30" <= now_str < "13:00":
await asyncio.sleep(1)
continue
# 리스크 체크
if not self.risk.can_trade():
await asyncio.sleep(5)
continue
# 보유 포지션 청산 체크
await self.check_exits()
# 신규 진입 체크
if self.risk.can_add_position(len(self.positions)):
await self.check_entries()
await asyncio.sleep(1)
# ─────────────────────────────────────────
# 진입 체크
# ─────────────────────────────────────────
async def check_entries(self):
"""유니버스 전체 진입 신호 확인"""
for ticker in self.universe:
if ticker in self.positions:
continue
if len(self.positions) >= MAX_POSITIONS:
break
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = price_info.get("name", ticker)
# 전략 신호 체크
signal = self.strategy.check_entry(
ticker=ticker,
name=name,
current_price=current,
)
if not signal["signal"]:
continue
# 포지션 사이즈 계산
balance = await self.kis.get_balance()
cash = balance["cash"]
invest = self.risk.get_pos_size(
cash, signal.get("multiplier", 1.0)
)
qty = max(1, int(invest // current))
# 매수 실행
result = await self.executor.buy(
ticker=ticker, name=name,
qty=qty, reason=signal["reason"],
ai_boosted=signal.get("boosted", False),
)
if result["success"]:
entry_price = result["price"] or current
sl_price = entry_price * (1 - self.risk.get_sl_pct())
tp1_price = entry_price * (1 + 0.02)
self.positions[ticker] = {
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : False,
"entry_time": datetime.now(),
"sl_price" : sl_price,
"boosted" : signal.get("boosted", False),
}
await notify_buy(
ticker=ticker, name=name,
price=entry_price,
target=int(entry_price * 1.03),
stop=int(sl_price),
boosted=signal.get("boosted", False),
)
await asyncio.sleep(1.1)
except Exception as e:
logger.error(f"진입 체크 오류 {ticker}: {e}")
# ─────────────────────────────────────────
# 청산 체크
# ─────────────────────────────────────────
async def check_exits(self):
"""보유 포지션 청산 신호 확인"""
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = pos["name"]
# 시간 청산: MAX_HOLD_MIN 초과
hold_min = (datetime.now() - pos["entry_time"]).seconds / 60
if hold_min >= MAX_HOLD_MIN:
await self._do_exit(ticker, pos, current, qty=pos["qty"], reason="TIME")
continue
# 전략 청산 신호
signal = self.strategy.check_exit(
ticker=ticker,
entry_price=pos["entry"],
current_price=current,
qty=pos["qty"],
tp1_done=pos["tp1_done"],
sl_pct=self.risk.get_sl_pct(),
)
if signal["signal"]:
await self._do_exit(
ticker, pos, current,
qty=signal["qty"],
reason=signal["reason"],
)
await asyncio.sleep(1.1)
except Exception as e:
logger.error(f"청산 체크 오류 {ticker}: {e}")
async def _do_exit(self, ticker: str, pos: dict,
current: float, qty: int, reason: str):
"""실제 청산 실행"""
name = pos["name"]
result = await self.executor.sell(ticker, name, qty, reason)
if not result["success"]:
return
exit_price = result["price"] or current
pnl = (exit_price - pos["entry"]) * qty
pnl_pct = (exit_price - pos["entry"]) / pos["entry"] * 100
self.risk.record_trade(pnl)
if reason == "TP1":
pos["tp1_done"] = True
pos["qty"] -= qty
if pos["qty"] <= 0:
del self.positions[ticker]
await notify_tp1(ticker, name, pnl_pct)
elif reason in ("TP2", "SL", "TIME", "FORCE"):
del self.positions[ticker]
if reason == "TP2":
await notify_tp2(ticker, name, pnl_pct)
elif reason == "SL":
await notify_sl(ticker, name, pnl_pct)
# L2/L3 체크 후 디스코드 경고
if not self.risk.can_trade():
await notify_risk(
self.risk.stop_reason.split(":")[0],
self.risk.stop_reason
)
# ─────────────────────────────────────────
# 강제 청산 (14:50)
# ─────────────────────────────────────────
async def force_exit_all(self):
"""14:50 전량 강제 청산"""
logger.info("14:50 강제 청산 시작")
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
await self._do_exit(
ticker, pos, current,
qty=pos["qty"], reason="FORCE"
)
except Exception as e:
logger.error(f"강제 청산 실패 {ticker}: {e}")
await notify_force_exit()
logger.info("강제 청산 완료")
# ─────────────────────────────────────────
# 일일 결산 (15:10)
# ─────────────────────────────────────────
async def daily_summary(self):
"""당일 결산 로그 및 디스코드 알림"""
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute("""
SELECT pnl FROM trades
WHERE date=? AND exit_time IS NOT NULL
""", (today,)).fetchall()
pnls = [r[0] for r in rows if r[0] is not None]
total = len(pnls)
wins = sum(1 for p in pnls if p > 0)
losses = total - wins
net = sum(pnls)
await notify_daily_summary(total, wins, losses, net)
self.risk.reset_daily()
logger.info(f"결산: {total}회 / 승{wins}{losses} / {net:+,.0f}")
# ─────────────────────────────────────────
# 스케줄러
# ─────────────────────────────────────────
async def run():
bot = StockBot()
await bot.initialize()
now = datetime.now().strftime("%H:%M")
if "09:00" <= now <= "14:30":
logger.info("장 중 재시작 감지 → 유니버스/목표가 즉시 계산")
await bot.update_universe()
await bot.calc_targets()
await bot.trading_loop() # 바로 매매루프 진입
return
while True:
now = datetime.now().strftime("%H:%M")
# 08:30 AI 컨텍스트 로드 + 유니버스 갱신
# (claude_morning이 08:15에 시작해 08:30 전에 daily_context.json 생성)
if now == "08:30":
ctx = bot.strategy.load_ai_context()
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
# 08:50 목표가 계산
elif now == "08:50":
await bot.calc_targets()
# 09:00 매매 루프 시작
elif now == "09:00":
await bot.trading_loop()
# 15:10 결산
elif now == "15:10":
await bot.daily_summary()
await asyncio.sleep(30)
if __name__ == "__main__":
os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
logger.info("=" * 50)
logger.info("단타 자동매매 시스템 시작")
logger.info(f"모드: {'모의투자' if os.getenv('KIS_MOCK','true')=='true' else '실거래'}")
logger.info(f"DRY_RUN: {os.getenv('DRY_RUN','true')}")
logger.info("=" * 50)
asyncio.run(run())