Files
Stock-trading-programming/app/main.py
T
whdwo798 1690f4e248 버그 수정 — 스케줄러 한글 경로 깨짐 + Python 로그 UTF-8 강제
- setup_scheduler.ps1: chcp 65001, StockBot_Bot 제거, XML 경로 자동 교정
- run_morning/midday/evening.ps1: chcp 65001 + PYTHONUTF8=1 추가
- app/main.py: sys.stdout.reconfigure(encoding=utf-8) 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 13:54:15 +09:00

686 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
main.py
단타 자동매매 시스템 메인 진입점
기획서 v2.1 기준
실행:
python -m app.main (Docker 컨테이너)
python app/main.py (로컬 테스트)
환경변수:
KIS_MOCK=true → 모의투자 모드
DRY_RUN=true → 신호만 확인, 주문 전송 안 함
"""
import io
import json
import os
import sys
import asyncio
import logging
from datetime import datetime, time
from pathlib import Path
# 한글 로그 깨짐 방지 — stdout을 UTF-8로 강제
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
elif hasattr(sys.stdout, "buffer"):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", line_buffering=True)
# .env 로드
def load_env():
env_path = Path(".env")
if not env_path.exists():
return
with open(env_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k = k.strip()
v = v.strip()
# 인라인 주석 제거 (예: true # 모의투자 → true)
if " #" in v:
v = v[:v.index(" #")]
v = v.strip().strip('"').strip("'")
if k and v and k not in os.environ:
os.environ[k] = v
load_env()
# 프로젝트 루트를 sys.path에 추가 (로컬 실행 시 필요)
ROOT = Path(__file__).parent.parent
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
# 로깅 설정
logging.basicConfig(
level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/stockbot.log", encoding="utf-8"),
]
)
logger = logging.getLogger(__name__)
from app.execution.kis_client import KISClient
from app.execution.order_executor import OrderExecutor
from app.strategy.volatility_breakout import VolatilityBreakout
from app.risk.manager import RiskManager
from app.monitor.notifier import (
notify_buy, notify_tp1, notify_tp2, notify_sl,
notify_force_exit, notify_risk, notify_daily_summary,
notify_error, notify_ai_result, notify_ai_blocked,
notify_ai_fallback, send
)
from app.db.models import init_db, get_conn
from app.config import (
MAX_UNIVERSE, FORCE_EXIT, MAX_POSITIONS,
MAX_HOLD_MIN, KOSPI_MIN_CHG
)
class StockBot:
def __init__(self):
self.kis = KISClient()
self.executor = OrderExecutor(self.kis)
self.strategy = VolatilityBreakout()
self.positions = {} # ticker → {name, entry, qty, tp1_done, entry_time}
self.universe = [] # 감시 종목 리스트
self.ticker_names = {} # ticker → 종목명 캐시
self.sl_tickers = set() # 당일 SL 당한 종목 — 재진입 차단
self.risk = None # RiskManager (잔고 확인 후 초기화)
self.running = False
# 장중 컨텍스트 (midday_context.json 갱신 감지용)
self._midday_ctx_mtime : float = 0.0
self._midday_pos_mult : float = 1.0 # midday position_size_multiplier
self._midday_loaded : bool = False
mode = "모의투자" if self.kis.is_mock else "실거래"
dry = " [DRY_RUN]" if os.getenv("DRY_RUN","true")=="true" else ""
logger.info(f"StockBot 시작 [{mode}]{dry}")
# ─────────────────────────────────────────
# 장중 컨텍스트 감시
# ─────────────────────────────────────────
def _check_midday_context(self):
"""midday_context.json 갱신 감지 → 즉시 점심 세션 파라미터 반영"""
path = Path("data/midday_context.json")
if not path.exists():
return
try:
mtime = path.stat().st_mtime
except OSError:
return
if mtime <= self._midday_ctx_mtime:
return
try:
ctx = json.loads(path.read_text(encoding="utf-8"))
if ctx.get("date") != datetime.now().strftime("%Y-%m-%d"):
return
if not ctx.get("lunch_trade_allowed", True):
logger.warning("midday_context: 점심 세션 진입 중단 설정")
self._midday_pos_mult = float(ctx.get("position_size_multiplier", 1.0))
# 섹터·블랙리스트 업데이트
if "hot_sectors" in ctx:
self.strategy.context["hot_sectors"] = ctx["hot_sectors"]
if "avoid_sectors" in ctx:
self.strategy.context["avoid_sectors"] = ctx["avoid_sectors"]
for t in ctx.get("blacklist_tickers", []):
bl = self.strategy.context.setdefault("blacklist_tickers", [])
if t not in bl:
bl.append(t)
# lunch_trade_allowed=false이면 진입 자체를 막는 플래그 저장
self.strategy.context["lunch_trade_allowed"] = ctx.get("lunch_trade_allowed", True)
self._midday_ctx_mtime = mtime
self._midday_loaded = True
logger.info(
f"midday_context 로드 완료 — 점심 세션 시작 "
f"(포지션 배율: ×{self._midday_pos_mult}, "
f"진입허용: {ctx.get('lunch_trade_allowed', True)})"
)
except Exception as e:
logger.warning(f"midday_context 로드 실패: {e}")
# ─────────────────────────────────────────
# 초기화
# ─────────────────────────────────────────
async def initialize(self):
"""시스템 초기화"""
init_db()
await self.kis.get_access_token()
# 잔고 조회 → RiskManager 초기화
balance = await self.kis.get_balance()
cash = balance["cash"]
self.risk = RiskManager(init_cash=cash)
logger.info(f"초기 예수금: {cash:,}")
# DB에서 열린 포지션 복원 (재시작 시)
self._restore_positions_from_db()
# 당일 SL 종목 복원 (재시작 후에도 재진입 차단 유지)
self._restore_sl_tickers_from_db()
await send(f"[시작] 단타봇 가동 | 예수금: {cash:,}원 | "
f"{'모의투자' if self.kis.is_mock else '실거래'}")
def _restore_positions_from_db(self):
"""재시작 시 DB positions 테이블에서 인메모리 복원"""
with get_conn() as conn:
rows = conn.execute("SELECT * FROM positions").fetchall()
for r in rows:
ticker, name, entry_time, entry_price, qty, tp1_done, target_price, stop_price, ai_boosted = r
self.positions[ticker] = {
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : bool(tp1_done),
"entry_time": datetime.strptime(entry_time, "%H:%M:%S").replace(
year=datetime.now().year,
month=datetime.now().month,
day=datetime.now().day),
"sl_price" : stop_price,
"boosted" : bool(ai_boosted),
}
if self.positions:
logger.info(f"DB 포지션 복원: {list(self.positions.keys())}")
def _restore_sl_tickers_from_db(self):
"""재시작 시 당일 SL 종목 복원 — 재진입 차단 유지"""
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute(
"SELECT DISTINCT ticker FROM trades WHERE date=? AND exit_reason='SL'",
(today,)
).fetchall()
for (ticker,) in rows:
self.sl_tickers.add(ticker)
if self.sl_tickers:
logger.info(f"당일 SL 종목 복원(재진입 차단): {self.sl_tickers}")
def _db_save_position(self, ticker: str, pos: dict, target_price: float):
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO positions
(ticker, name, entry_time, entry_price, quantity,
tp1_done, target_price, stop_price, ai_boosted)
VALUES (?,?,?,?,?,?,?,?,?)
""", (
ticker, pos["name"],
pos["entry_time"].strftime("%H:%M:%S"),
pos["entry"], pos["qty"],
1 if pos.get("tp1_done") else 0,
target_price, pos["sl_price"],
1 if pos.get("boosted") else 0,
))
def _db_delete_position(self, ticker: str):
with get_conn() as conn:
conn.execute("DELETE FROM positions WHERE ticker=?", (ticker,))
# ─────────────────────────────────────────
# 유니버스 갱신 (08:30)
# ─────────────────────────────────────────
# ETF/ETN/인버스/레버리지 종목 필터
_ETF_KEYWORDS = ('인버스', '레버리지', '선물', 'KODEX', 'TIGER', 'KBSTAR',
'HANARO', 'ARIRANG', 'KOSEF', 'SOL', 'ACE', 'RISE', 'PLUS')
@staticmethod
def _is_etf(ticker: str, name: str) -> bool:
if ticker.startswith('Q') or len(ticker) != 6: # ETN or 비정상 코드
return True
return any(kw in name for kw in StockBot._ETF_KEYWORDS)
async def update_universe(self):
"""종목 풀 갱신 + 전일 데이터 수집"""
logger.info("유니버스 갱신 시작")
try:
# ETF 필터 후 MAX_UNIVERSE 확보 위해 여유분 요청
rank = await self.kis.get_volume_rank(top_n=MAX_UNIVERSE + 20)
# 종목명 캐시 갱신 + ETF 필터
for r in rank:
self.ticker_names[r["ticker"]] = r["name"]
tickers = [r["ticker"] for r in rank
if not self._is_etf(r["ticker"], r["name"])]
ctx = self.strategy.context
blacklist = ctx.get("blacklist_tickers", [])
tickers = [t for t in tickers if t not in blacklist]
boosted = ctx.get("boosted_tickers", [])
tickers = (
[t for t in boosted if t in tickers] +
[t for t in tickers if t not in boosted]
)[:MAX_UNIVERSE]
self.universe = tickers
logger.info(f"유니버스: {len(tickers)}종목 (ETF 제외)")
# 전일 날짜 계산
from datetime import timedelta
today = datetime.now()
# 월요일이면 금요일로
offset = 3 if today.weekday() == 0 else 1
prev_date = (today - timedelta(days=offset)).strftime("%Y%m%d")
for ticker in self.universe:
# 이미 전일 데이터 있으면 skip
if self.strategy.has_prev_data(ticker):
continue
try:
ohlcv = await self.kis.get_ohlcv_daily(
ticker,
start=prev_date,
end=prev_date,
)
if ohlcv:
prev = ohlcv[-1]
self.strategy.set_prev_data(
ticker,
high = prev["high"],
low = prev["low"],
amount= prev.get("amount",
prev.get("volume", 0) * prev.get("close", 0))
)
except Exception as e:
logger.warning(f"전일 데이터 실패 {ticker}: {e}")
await asyncio.sleep(1.1) # 초당 2.5건으로 제한
except Exception as e:
logger.error(f"유니버스 갱신 실패: {e}")
# ─────────────────────────────────────────
# 시가 수집 + 목표가 계산 (08:50)
# ─────────────────────────────────────────
async def calc_targets(self):
"""당일 시가 기반 목표가 계산"""
logger.info("목표가 계산 시작")
for ticker in self.universe:
try:
price_info = await self.kis.get_price(ticker)
self.strategy.set_today_open(ticker, price_info["open"])
target = self.strategy.get_target(ticker)
if target > 0:
logger.debug(f"{ticker} 목표가: {target:,.0f}")
await asyncio.sleep(1.1)
except Exception as e:
logger.warning(f"시가 수집 실패 {ticker}: {e}")
# ─────────────────────────────────────────
# 메인 매매 루프 (09:00~14:50)
# ─────────────────────────────────────────
async def trading_loop(self):
"""1초 단위 메인 루프"""
logger.info("매매 루프 시작")
self.running = True
while self.running:
now = datetime.now()
now_str = now.strftime("%H:%M")
# 14:50 강제 청산
if now_str >= FORCE_EXIT:
await self.force_exit_all()
self.running = False
break
# 14:00 이후 신규 진입 중단 (청산은 계속)
if now_str > "14:00":
await self.check_exits()
await asyncio.sleep(1)
continue
# 09:00 이전 대기
if now_str < "09:00":
await asyncio.sleep(1)
continue
# midday_context.json 갱신 감지 (점심 세션 이벤트 기반 시작)
self._check_midday_context()
# 리스크 체크 (L2/L4/L5 하드 중단)
if not self.risk.can_trade():
await asyncio.sleep(5)
continue
# 보유 포지션 청산 체크
await self.check_exits()
# 신규 진입 체크
if self.risk.can_add_position(len(self.positions)):
await self.check_entries()
await asyncio.sleep(1)
# ─────────────────────────────────────────
# 진입 체크
# ─────────────────────────────────────────
async def check_entries(self):
"""유니버스 전체 진입 신호 확인"""
now_str = datetime.now().strftime("%H:%M")
# 14:00 이후 진입 차단
if now_str > "14:00":
return
# midday_context 로드 전(11:20~) 11:00 이후 신규 진입 일시 중단
# — midday_context.json이 생성되면 _check_midday_context()가 자동 해제
if now_str >= "11:00" and not self._midday_loaded:
return
# lunch_trade_allowed=false이면 점심 세션 진입 차단
if self._midday_loaded and not self.strategy.context.get("lunch_trade_allowed", True):
return
for ticker in self.universe:
if ticker in self.positions:
continue
if ticker in self.sl_tickers:
continue # 당일 SL 종목 재진입 차단
if len(self.positions) >= MAX_POSITIONS:
break
# 목표가 미계산 종목 스킵 (불필요한 API 호출 방지)
if self.strategy.get_target(ticker) <= 0:
continue
try:
price_info = await self.kis.get_price(ticker) # rate limiter가 자동 throttle
current = price_info["current"]
name = self.ticker_names.get(ticker, ticker)
signal = self.strategy.check_entry(
ticker=ticker,
name=name,
current_price=current,
)
if not signal["signal"]:
continue
balance = await self.kis.get_balance()
cash = balance["cash"]
# AI 신호 배율 × B안(연속 손절) 배율 × midday 배율
combined_mult = (
signal.get("multiplier", 1.0)
* self.risk.get_consec_multiplier()
* self._midday_pos_mult
)
invest = self.risk.get_pos_size(cash, combined_mult)
qty = max(1, int(invest // current))
result = await self.executor.buy(
ticker=ticker, name=name,
qty=qty, reason=signal["reason"],
ai_boosted=signal.get("boosted", False),
)
if result["success"]:
entry_price = result["price"] or current
sl_price = entry_price * (1 - self.risk.get_sl_pct())
pos = {
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : False,
"entry_time": datetime.now(),
"sl_price" : sl_price,
"boosted" : signal.get("boosted", False),
}
self.positions[ticker] = pos
self._db_save_position(
ticker, pos,
target_price=self.strategy.get_target(ticker),
)
await notify_buy(
ticker=ticker, name=name,
price=entry_price,
target=int(entry_price * 1.03),
stop=int(sl_price),
boosted=signal.get("boosted", False),
)
except Exception as e:
logger.error(f"진입 체크 오류 {ticker}: {e}")
# ─────────────────────────────────────────
# 청산 체크
# ─────────────────────────────────────────
async def check_exits(self):
"""보유 포지션 청산 신호 확인"""
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = pos["name"]
# 시간 청산: MAX_HOLD_MIN 초과
hold_min = (datetime.now() - pos["entry_time"]).seconds / 60
if hold_min >= MAX_HOLD_MIN:
await self._do_exit(ticker, pos, current, qty=pos["qty"], reason="TIME")
continue
# 전략 청산 신호
signal = self.strategy.check_exit(
ticker=ticker,
entry_price=pos["entry"],
current_price=current,
qty=pos["qty"],
tp1_done=pos["tp1_done"],
sl_pct=self.risk.get_sl_pct(),
)
if signal["signal"]:
await self._do_exit(
ticker, pos, current,
qty=signal["qty"],
reason=signal["reason"],
)
await asyncio.sleep(1.1)
except Exception as e:
logger.error(f"청산 체크 오류 {ticker}: {e}")
async def _do_exit(self, ticker: str, pos: dict,
current: float, qty: int, reason: str):
"""실제 청산 실행"""
name = pos["name"]
result = await self.executor.sell(ticker, name, qty, reason)
if not result["success"]:
return
exit_price = result["price"] or current
pnl = (exit_price - pos["entry"]) * qty
pnl_pct = (exit_price - pos["entry"]) / pos["entry"] * 100
self.risk.record_trade(pnl)
# B안: 연속 손절 2회·3회 도달 시 Discord 알림
if reason == "SL":
consec = self.risk.consec_loss
if consec in (2, 3):
mult = self.risk.get_consec_multiplier()
await notify_risk(
"L3-B",
f"{consec}연속 손절 — 포지션 크기 {int(mult * 100)}%로 축소"
)
if reason == "TP1":
pos["tp1_done"] = True
pos["qty"] -= qty
if pos["qty"] <= 0:
del self.positions[ticker]
self._db_delete_position(ticker)
else:
self._db_save_position(ticker, pos, self.strategy.get_target(ticker))
await notify_tp1(ticker, name, pnl_pct)
elif reason in ("TP2", "SL", "TIME", "FORCE"):
del self.positions[ticker]
self._db_delete_position(ticker)
if reason == "TP2":
await notify_tp2(ticker, name, pnl_pct)
elif reason == "SL":
self.sl_tickers.add(ticker) # 당일 재진입 차단
await notify_sl(ticker, name, pnl_pct)
# L2/L3 체크 후 디스코드 경고
if not self.risk.can_trade():
await notify_risk(
self.risk.stop_reason.split(":")[0],
self.risk.stop_reason
)
# ─────────────────────────────────────────
# 강제 청산 (14:50)
# ─────────────────────────────────────────
async def force_exit_all(self):
"""14:50 전량 강제 청산"""
logger.info("14:50 강제 청산 시작")
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
await self._do_exit(
ticker, pos, current,
qty=pos["qty"], reason="FORCE"
)
except Exception as e:
logger.error(f"강제 청산 실패 {ticker}: {e}")
await notify_force_exit()
logger.info("강제 청산 완료")
# ─────────────────────────────────────────
# 일일 결산 (15:10)
# ─────────────────────────────────────────
async def daily_summary(self):
"""당일 결산 로그 및 디스코드 알림 + DB 저장"""
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute("""
SELECT pnl, fee FROM trades
WHERE date=? AND exit_time IS NOT NULL
""", (today,)).fetchall()
pnls = [r[0] for r in rows if r[0] is not None]
fees = [r[1] for r in rows if r[1] is not None]
total = len(pnls)
wins = sum(1 for p in pnls if p > 0)
losses = total - wins
gross_pnl = sum(p for p in pnls if p > 0) - abs(sum(p for p in pnls if p < 0))
total_fee = sum(fees)
net = sum(pnls)
mdd = min(self.risk.daily_pnl / self.risk.init_cash * 100, 0.0)
stopped = 0 if self.risk.can_trade() else 1
# daily_summary 테이블 저장
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO daily_summary
(date, total_trades, win_trades, lose_trades,
gross_pnl, total_fee, net_pnl, max_drawdown, trading_stopped)
VALUES (?,?,?,?,?,?,?,?,?)
""", (today, total, wins, losses, gross_pnl, total_fee, net, mdd, stopped))
await notify_daily_summary(total, wins, losses, net)
self.risk.reset_daily()
logger.info(f"결산: {total}회 / 승{wins}{losses} / {net:+,.0f}원 (fee {total_fee:,.0f}원)")
# ─────────────────────────────────────────
# 스케줄러
# ─────────────────────────────────────────
async def run():
bot = StockBot()
await bot.initialize()
now = datetime.now().strftime("%H:%M")
if "09:00" <= now < "15:00":
logger.info("장 중 재시작 감지 → AI 컨텍스트 로드 + 유니버스/목표가 즉시 계산")
ctx = bot.strategy.load_ai_context()
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
await bot.calc_targets()
await bot.trading_loop() # 바로 매매루프 진입
# 매매루프 종료 후 15:10 결산까지 대기
while True:
now = datetime.now().strftime("%H:%M")
if now == "15:10":
await bot.daily_summary()
return
if now > "15:10":
return
await asyncio.sleep(30)
# 08:30 이후~09:00 이전 시작 시 컨텍스트·유니버스 즉시 로드
if "08:30" <= now < "09:00":
logger.info("장 전 재시작 감지(08:30~09:00) → AI 컨텍스트 로드 + 유니버스 즉시 갱신")
ctx = bot.strategy.load_ai_context()
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
while True:
now = datetime.now().strftime("%H:%M")
try:
# 08:30 AI 컨텍스트 로드 + 유니버스 갱신
# (claude_morning이 08:15에 시작해 08:30 전에 daily_context.json 생성)
if now == "08:30":
ctx = bot.strategy.load_ai_context()
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
await bot.update_universe()
# 08:50 목표가 계산
elif now == "08:50":
await bot.calc_targets()
# 09:00 매매 루프 시작
elif now == "09:00":
await bot.trading_loop()
# 15:10 결산
elif now == "15:10":
await bot.daily_summary()
except Exception as e:
logger.error(f"스케줄러 루프 오류 ({now}): {e}", exc_info=True)
await asyncio.sleep(30)
if __name__ == "__main__":
os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
logger.info("=" * 50)
logger.info("단타 자동매매 시스템 시작")
logger.info(f"모드: {'모의투자' if os.getenv('KIS_MOCK','true')=='true' else '실거래'}")
logger.info(f"DRY_RUN: {os.getenv('DRY_RUN','true')}")
logger.info("=" * 50)
asyncio.run(run())