first vibe coding

This commit is contained in:
jongjae0305
2026-05-14 15:14:50 +09:00
commit bfff65e55b
40 changed files with 2795 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]
View File
View File
+36
View File
@@ -0,0 +1,36 @@
"""
config.py - 전략 파라미터 전용 (기획서 v3.0 기준)
Claude Code가 이 파일을 읽고 필요시 수정함
"""
# ── 변동성 돌파 ──
STRATEGY_K = 0.5
ENTRY_START = "09:00"
ENTRY_END = "14:30"
FORCE_EXIT = "14:50" # 절대 변경 불가
TP1_PCT = 0.02 # 1차 익절 +2% → 50% 매도
TP2_PCT = 0.03 # 2차 익절 +3% → 전량
SL_PCT = 0.015 # 손절 -1.5%
MAX_HOLD_MIN = 120
# ── 리스크 ──
POS_SIZE_PCT = 0.20
MAX_POSITIONS = 2
DAILY_SL_PCT = 0.03
CONSEC_LOSS = 3
AI_RISK_SL_MAP = {"낮음": 0.015, "보통": 0.015, "높음": 0.010}
# ── 유니버스 ──
MIN_TRADE_AMOUNT = 10_000_000_000
MAX_UNIVERSE = 30
KOSPI_MIN_CHG = -1.0
# ── AI ──
AI_CONTEXT_PATH = "data/daily_context.json"
AI_MIN_SCORE = 40
AI_BOOST_MULTI = 1.5
# ── 비용 ──
FEE_RATE = 0.00015
TAX_RATE = 0.0018
SLIPPAGE = 0.001
View File
+22
View File
@@ -0,0 +1,22 @@
"""
data/collector.py - KIS WebSocket 실시간 시세 수신
"""
import asyncio, logging
from app.execution.kis_client import KISWebSocket
logger = logging.getLogger(__name__)
class DataCollector:
def __init__(self, kis_client, on_price, on_vi):
self.ws = KISWebSocket(kis_client)
self.on_price = on_price
self.on_vi = on_vi
async def start(self, tickers: list):
self.ws.on_price("*", self.on_price)
self.ws.on_vi(self.on_vi)
await self.ws.subscribe(tickers)
async def stop(self):
await self.ws.close()
+38
View File
@@ -0,0 +1,38 @@
"""
data/universe.py - 종목 풀 갱신 (08:30)
"""
import json, os, logging
from app.config import MAX_UNIVERSE
logger = logging.getLogger(__name__)
CACHE_PATH = "data/universe_cache.json"
async def update_universe(kis_client, ai_context: dict) -> list:
"""거래량 순위 기반 유니버스 갱신 + AI 필터"""
try:
rank = await kis_client.get_volume_rank(top_n=MAX_UNIVERSE * 2)
tickers = [r["ticker"] for r in rank]
blacklist = ai_context.get("blacklist_tickers", [])
tickers = [t for t in tickers if t not in blacklist]
boosted = ai_context.get("boosted_tickers", [])
tickers = (
[t for t in boosted if t in tickers] +
[t for t in tickers if t not in boosted]
)[:MAX_UNIVERSE]
os.makedirs(os.path.dirname(CACHE_PATH), exist_ok=True)
with open(CACHE_PATH, "w") as f:
json.dump({"tickers": tickers, "boosted": boosted}, f)
logger.info(f"유니버스 갱신: {len(tickers)}종목 (부스트: {len(boosted)})")
return tickers
except Exception as e:
logger.error(f"유니버스 갱신 실패: {e}")
if os.path.exists(CACHE_PATH):
with open(CACHE_PATH) as f:
return json.load(f).get("tickers", [])
return []
View File
+88
View File
@@ -0,0 +1,88 @@
"""
db/models.py - SQLite 스키마 생성
기획서 v2.1 기준 4개 테이블
"""
import sqlite3
import os
DB_PATH = os.getenv("DB_PATH", "data/stockbot.db")
def init_db():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# 체결 내역
c.execute("""
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
ticker TEXT NOT NULL,
name TEXT,
entry_time TEXT NOT NULL,
exit_time TEXT,
entry_price REAL NOT NULL,
exit_price REAL,
quantity INTEGER NOT NULL,
side TEXT NOT NULL,
exit_reason TEXT,
pnl REAL,
fee REAL,
slippage REAL,
strategy TEXT DEFAULT 'VB',
ai_boosted INTEGER DEFAULT 0
)""")
# 일일 요약
c.execute("""
CREATE TABLE IF NOT EXISTS daily_summary (
date TEXT PRIMARY KEY,
total_trades INTEGER DEFAULT 0,
win_trades INTEGER DEFAULT 0,
lose_trades INTEGER DEFAULT 0,
gross_pnl REAL DEFAULT 0,
total_fee REAL DEFAULT 0,
net_pnl REAL DEFAULT 0,
max_drawdown REAL DEFAULT 0,
trading_stopped INTEGER DEFAULT 0
)""")
# 포지션 (장중 현황)
c.execute("""
CREATE TABLE IF NOT EXISTS positions (
ticker TEXT PRIMARY KEY,
name TEXT,
entry_time TEXT,
entry_price REAL,
quantity INTEGER,
tp1_done INTEGER DEFAULT 0,
target_price REAL,
stop_price REAL,
ai_boosted INTEGER DEFAULT 0
)""")
# AI 판단 이력
c.execute("""
CREATE TABLE IF NOT EXISTS ai_context_log (
date TEXT PRIMARY KEY,
generated_at TEXT,
trade_allowed INTEGER,
market_sentiment TEXT,
sentiment_score INTEGER,
risk_level TEXT,
hot_sectors TEXT,
avoid_sectors TEXT,
boosted_tickers TEXT,
blacklist_tickers TEXT,
position_size_mult REAL,
reason TEXT,
claude_tokens_used INTEGER,
api_call_success INTEGER DEFAULT 1
)""")
conn.commit()
conn.close()
print(f"DB 초기화 완료: {DB_PATH}")
def get_conn():
return sqlite3.connect(DB_PATH)
+78
View File
@@ -0,0 +1,78 @@
"""
db/repository.py - DB 접근 레이어
"""
import json
from datetime import datetime
from app.db.models import get_conn
def save_trade(ticker, name, entry_time, entry_price,
quantity, side, ai_boosted=False):
with get_conn() as conn:
conn.execute("""
INSERT INTO trades
(date, ticker, name, entry_time, entry_price, quantity, side, ai_boosted)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (datetime.now().strftime("%Y-%m-%d"), ticker, name,
entry_time, entry_price, quantity, side,
1 if ai_boosted else 0))
def update_trade_exit(ticker, exit_time, exit_price, exit_reason, pnl, fee):
with get_conn() as conn:
conn.execute("""
UPDATE trades SET exit_time=?, exit_price=?,
exit_reason=?, pnl=?, fee=?
WHERE ticker=? AND exit_time IS NULL
ORDER BY id DESC LIMIT 1
""", (exit_time, exit_price, exit_reason, pnl, fee, ticker))
def save_daily_summary(date, total, wins, losses,
gross_pnl, fee, net_pnl, mdd, stopped):
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO daily_summary
VALUES (?,?,?,?,?,?,?,?,?)
""", (date, total, wins, losses, gross_pnl, fee, net_pnl, mdd, stopped))
def save_ai_context(ctx: dict, tokens_used: int = 0, success: bool = True):
with get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO ai_context_log
(date, generated_at, trade_allowed, market_sentiment,
sentiment_score, risk_level, hot_sectors, avoid_sectors,
boosted_tickers, blacklist_tickers, position_size_mult,
reason, claude_tokens_used, api_call_success)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
ctx.get("date"), ctx.get("generated_at"),
1 if ctx.get("trade_allowed") else 0,
ctx.get("market_sentiment"), ctx.get("sentiment_score"),
ctx.get("risk_level"),
json.dumps(ctx.get("hot_sectors", []), ensure_ascii=False),
json.dumps(ctx.get("avoid_sectors", []), ensure_ascii=False),
json.dumps(ctx.get("boosted_tickers", []), ensure_ascii=False),
json.dumps(ctx.get("blacklist_tickers", []), ensure_ascii=False),
ctx.get("position_size_multiplier", 1.0),
ctx.get("reason", ""),
tokens_used, 1 if success else 0,
))
def get_today_trades(date: str = None):
date = date or datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute(
"SELECT * FROM trades WHERE date=?", (date,)
).fetchall()
return rows
def get_open_positions():
with get_conn() as conn:
rows = conn.execute(
"SELECT * FROM positions"
).fetchall()
return rows
View File
+550
View File
@@ -0,0 +1,550 @@
"""
kis_client.py
KIS Open API REST + WebSocket 래퍼
- 토큰 자동 발급/갱신
- 모의투자/실거래 모드 자동 전환
- rate limit 제어 (초당 20건)
"""
import os
import json
import time
import asyncio
import aiohttp
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Callable
logger = logging.getLogger(__name__)
# ── 모드별 베이스 URL ──
URL_REAL = "https://openapi.koreainvestment.com:9443"
URL_MOCK = "https://openapivts.koreainvestment.com:29443"
class KISClient:
"""
KIS Open API 클라이언트
모의투자/실거래 모드를 .env의 KIS_MOCK 값으로 자동 전환
"""
def __init__(self):
self.is_mock = os.getenv("KIS_MOCK", "true").lower() == "true"
self.base_url = URL_MOCK if self.is_mock else URL_REAL
# 모드별 키 자동 선택
if self.is_mock:
self.app_key = os.getenv("KIS_MOCK_APP_KEY", "")
self.app_secret = os.getenv("KIS_MOCK_APP_SECRET", "")
self.account_no = os.getenv("KIS_MOCK_ACCOUNT_NO", "")
else:
self.app_key = os.getenv("KIS_APP_KEY", "")
self.app_secret = os.getenv("KIS_APP_SECRET", "")
self.account_no = os.getenv("KIS_ACCOUNT_NO", "")
# 계좌번호 파싱 (앞 8자리 + 뒤 2자리)
self._parse_account()
# 토큰 관련
self._access_token : Optional[str] = None
self._token_expires_at: Optional[datetime] = None
# rate limit: 초당 20건
self._semaphore = asyncio.Semaphore(20)
self._req_times : list = []
mode = "모의투자" if self.is_mock else "실거래"
logger.info(f"KISClient 초기화 완료 [{mode}] 계좌: {self.account_no}")
def _parse_account(self):
"""계좌번호 파싱: '50123456-01' → ('50123456', '01')"""
raw = self.account_no.replace("-", "")
if len(raw) >= 10:
self.acct_prefix = raw[:8]
self.acct_suffix = raw[8:10]
else:
self.acct_prefix = raw
self.acct_suffix = "01"
# ─────────────────────────────────────────
# 토큰 관리
# ─────────────────────────────────────────
async def get_access_token(self) -> str:
"""액세스 토큰 발급/갱신 (만료 30분 전 자동 갱신)"""
now = datetime.now()
if (self._access_token
and self._token_expires_at
and now < self._token_expires_at - timedelta(minutes=30)):
return self._access_token
url = f"{self.base_url}/oauth2/tokenP"
body = {
"grant_type" : "client_credentials",
"appkey" : self.app_key,
"appsecret" : self.app_secret,
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body) as resp:
data = await resp.json()
if "access_token" not in data:
raise RuntimeError(f"토큰 발급 실패: {data}")
self._access_token = data["access_token"]
# 유효기간 24시간
self._token_expires_at = now + timedelta(hours=24)
logger.info("KIS 액세스 토큰 발급/갱신 완료")
return self._access_token
# ─────────────────────────────────────────
# REST API 기본 호출
# ─────────────────────────────────────────
async def _request(
self,
method : str,
path : str,
tr_id : str,
params : Optional[Dict] = None,
body : Optional[Dict] = None,
) -> Dict[str, Any]:
"""
KIS REST API 공통 호출
- rate limit 제어 (초당 20건)
- 토큰 자동 첨부
"""
token = await self.get_access_token()
url = f"{self.base_url}{path}"
headers = {
"content-type" : "application/json; charset=utf-8",
"authorization" : f"Bearer {token}",
"appkey" : self.app_key,
"appsecret" : self.app_secret,
"tr_id" : tr_id,
"custtype" : "P", # 개인
}
async with self._semaphore:
# 초당 20건 rate limit
now = time.monotonic()
self._req_times = [t for t in self._req_times if now - t < 1.0]
if len(self._req_times) >= 20:
wait = 1.0 - (now - self._req_times[0])
if wait > 0:
await asyncio.sleep(wait)
self._req_times.append(time.monotonic())
async with aiohttp.ClientSession() as session:
if method == "GET":
async with session.get(url, headers=headers, params=params) as r:
data = await r.json()
else:
async with session.post(url, headers=headers, json=body) as r:
data = await r.json()
# 응답 코드 체크
rt_cd = data.get("rt_cd", "")
if rt_cd != "0":
msg = data.get("msg1", "알 수 없는 오류")
logger.error(f"KIS API 오류 [{tr_id}]: {rt_cd} - {msg}")
raise RuntimeError(f"KIS API 오류: {msg}")
return data
# ─────────────────────────────────────────
# 시세 조회
# ─────────────────────────────────────────
async def get_price(self, ticker: str) -> Dict:
"""주식 현재가 조회 (FHKST01010100)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-price",
tr_id = "FHKST01010100",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : ticker,
}
)
o = data["output"]
return {
"ticker" : ticker,
"current" : int(o["stck_prpr"]), # 현재가
"open" : int(o["stck_oprc"]), # 시가
"high" : int(o["stck_hgpr"]), # 고가
"low" : int(o["stck_lwpr"]), # 저가
"prev_close" : int(o["stck_sdpr"]), # 전일 종가
"volume" : int(o["acml_vol"]), # 누적 거래량
"change_pct" : float(o["prdy_ctrt"]), # 등락률
"market_cap" : int(o.get("hts_avls", 0)) * 100_000_000, # 시가총액 (억→원)
}
async def get_ohlcv_daily(self, ticker: str, start: str, end: str) -> list:
"""
주식 기간별 시세 (일봉) - 백테스트/AI 분석용
start, end: 'YYYYMMDD'
"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
tr_id = "FHKST03010100",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : ticker,
"FID_INPUT_DATE_1" : start,
"FID_INPUT_DATE_2" : end,
"FID_PERIOD_DIV_CODE" : "D", # 일봉
"FID_ORG_ADJ_PRC" : "0",
}
)
result = []
for row in data.get("output2", []):
result.append({
"date" : row["stck_bsop_date"],
"open" : int(row["stck_oprc"]),
"high" : int(row["stck_hgpr"]),
"low" : int(row["stck_lwpr"]),
"close" : int(row["stck_clpr"]),
"volume": int(row["acml_vol"]),
})
return result
# ─────────────────────────────────────────
# 주문
# ─────────────────────────────────────────
async def order_buy(
self,
ticker : str,
qty : int,
price : int = 0, # 0 = 시장가
order_type: str = "01", # 01=시장가, 00=지정가
) -> Dict:
"""주식 매수 주문"""
dry_run = os.getenv("DRY_RUN", "true").lower() == "true"
if dry_run:
logger.info(f"[DRY_RUN] 매수 {ticker} {qty}주 @ {price or '시장가'}")
return {"dry_run": True, "ticker": ticker, "qty": qty}
# 모의/실거래 TR 구분
tr_id = "VTTC0802U" if self.is_mock else "TTTC0802U"
data = await self._request(
method = "POST",
path = "/uapi/domestic-stock/v1/trading/order-cash",
tr_id = tr_id,
body = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD": self.acct_suffix,
"PDNO" : ticker,
"ORD_DVSN" : order_type,
"ORD_QTY" : str(qty),
"ORD_UNPR" : str(price),
}
)
logger.info(f"매수 주문 완료: {ticker} {qty}")
return data
async def order_sell(
self,
ticker : str,
qty : int,
price : int = 0,
order_type: str = "01",
) -> Dict:
"""주식 매도 주문"""
dry_run = os.getenv("DRY_RUN", "true").lower() == "true"
if dry_run:
logger.info(f"[DRY_RUN] 매도 {ticker} {qty}주 @ {price or '시장가'}")
return {"dry_run": True, "ticker": ticker, "qty": qty}
tr_id = "VTTC0801U" if self.is_mock else "TTTC0801U"
data = await self._request(
method = "POST",
path = "/uapi/domestic-stock/v1/trading/order-cash",
tr_id = tr_id,
body = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD": self.acct_suffix,
"PDNO" : ticker,
"ORD_DVSN" : order_type,
"ORD_QTY" : str(qty),
"ORD_UNPR" : str(price),
}
)
logger.info(f"매도 주문 완료: {ticker} {qty}")
return data
# ─────────────────────────────────────────
# 잔고 조회
# ─────────────────────────────────────────
async def get_balance(self) -> Dict:
"""주식 잔고 조회 (보유 종목 + 예수금)"""
tr_id = "VTTC8001R" if self.is_mock else "TTTC8001R"
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/trading/inquire-balance",
tr_id = tr_id,
params = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD" : self.acct_suffix,
"AFHR_FLPR_YN" : "N",
"OFL_YN" : "",
"INQR_DVSN" : "02",
"UNPR_DVSN" : "01",
"FUND_STTL_ICLD_YN" : "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN" : "01",
"CTX_AREA_FK100" : "",
"CTX_AREA_NK100" : "",
}
)
holdings = []
for item in data.get("output1", []):
qty = int(item.get("hldg_qty", "0"))
if qty > 0:
holdings.append({
"ticker" : item["pdno"],
"name" : item["prdt_name"],
"qty" : qty,
"avg_price" : int(item["pchs_avg_pric"].replace(".", "")),
"current" : int(item["prpr"]),
"pnl_pct" : float(item["evlu_pfls_rt"]),
})
cash = int(data["output2"][0].get("dnca_tot_amt", "0")) if data.get("output2") else 0
return {
"holdings" : holdings,
"cash" : cash,
"total_cnt": len(holdings),
}
# ─────────────────────────────────────────
# AI 판단용 수급 데이터
# ─────────────────────────────────────────
async def get_volume_rank(self, top_n: int = 30) -> list:
"""거래량 순위 상위 종목 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/volume-rank",
tr_id = "FHPST01710000",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_COND_SCR_DIV_CODE" : "20171",
"FID_INPUT_ISCD" : "0000",
"FID_DIV_CLS_CODE" : "0",
"FID_BLNG_CLS_CODE" : "0",
"FID_TRGT_CLS_CODE" : "111111111",
"FID_TRGT_EXLS_CLS_CODE": "000000",
"FID_INPUT_PRICE_1" : "",
"FID_INPUT_PRICE_2" : "",
"FID_VOL_CNT" : "",
"FID_INPUT_DATE_1" : "",
}
)
result = []
for i, row in enumerate(data.get("output", [])[:top_n]):
result.append({
"rank" : i + 1,
"ticker" : row["mksc_shrn_iscd"],
"name" : row["hts_kor_isnm"],
"volume" : int(row["acml_vol"]),
"change_pct": float(row["prdy_ctrt"]),
})
return result
async def get_foreign_institution_rank(self, top_n: int = 30) -> Dict:
"""외국인/기관 순매수 상위 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-investor",
tr_id = "FHKST04430000",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : "0000",
"FID_INPUT_DATE_1" : "",
"FID_INPUT_DATE_2" : "",
"FID_PERIOD_DIV_CODE" : "D",
}
)
foreign = []
institution = []
for row in data.get("output", [])[:top_n]:
entry = {
"ticker": row.get("mksc_shrn_iscd", ""),
"name" : row.get("hts_kor_isnm", ""),
"amount": int(row.get("frgn_ntby_qty", "0")),
}
foreign.append(entry)
entry2 = {
"ticker": row.get("mksc_shrn_iscd", ""),
"name" : row.get("hts_kor_isnm", ""),
"amount": int(row.get("orgn_ntby_qty", "0")),
}
institution.append(entry2)
return {
"foreign" : sorted(foreign, key=lambda x: x["amount"], reverse=True)[:top_n],
"institution": sorted(institution, key=lambda x: x["amount"], reverse=True)[:top_n],
}
async def get_sector_trend(self) -> list:
"""업종별 등락률 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
tr_id = "FHKST03010100",
params = {
"FID_COND_MRKT_DIV_CODE": "U", # 업종
"FID_INPUT_ISCD" : "0001",
"FID_INPUT_DATE_1" : "",
"FID_INPUT_DATE_2" : "",
"FID_PERIOD_DIV_CODE" : "D",
"FID_ORG_ADJ_PRC" : "0",
}
)
result = []
for row in data.get("output1", []):
result.append({
"sector" : row.get("hts_kor_isnm", ""),
"change_pct": float(row.get("prdy_ctrt", "0")),
})
return result
# ─────────────────────────────────────────
# WebSocket 클라이언트 (실시간 시세)
# ─────────────────────────────────────────
class KISWebSocket:
"""
KIS 실시간 시세 WebSocket
- 체결가 (H0STCNT0)
- 호가 (H0STASP0)
- VI (H0STVI0)
"""
WS_URL_REAL = "ws://ops.koreainvestment.com:21000"
WS_URL_MOCK = "ws://ops.koreainvestment.com:31000"
def __init__(self, kis_client: KISClient):
self.kis = kis_client
self.ws_url = self.WS_URL_MOCK if kis_client.is_mock else self.WS_URL_REAL
self._ws = None
self._handlers : Dict[str, Callable] = {} # ticker → callback
self._vi_handler: Optional[Callable] = None
self._running = False
def on_price(self, ticker: str, handler: Callable):
"""실시간 체결가 핸들러 등록"""
self._handlers[ticker] = handler
def on_vi(self, handler: Callable):
"""VI 발동 핸들러 등록"""
self._vi_handler = handler
async def subscribe(self, tickers: list):
"""종목 구독 시작"""
token = await self.kis.get_access_token()
# 접속키 발급
async with aiohttp.ClientSession() as session:
resp = await session.post(
f"{self.kis.base_url}/oauth2/Approval",
json={
"grant_type": "client_credentials",
"appkey" : self.kis.app_key,
"secretkey" : self.kis.app_secret,
}
)
key_data = await resp.json()
approval_key = key_data.get("approval_key", "")
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.ws_url) as ws:
self._ws = ws
self._running = True
logger.info(f"WebSocket 연결 완료: {len(tickers)}종목 구독 시작")
# 종목별 구독 등록
for ticker in tickers:
for tr_id in ["H0STCNT0", "H0STVI0"]:
await ws.send_json({
"header": {
"approval_key": approval_key,
"custtype" : "P",
"tr_type" : "1", # 등록
"content-type": "utf-8",
},
"body": {
"input": {
"tr_id" : tr_id,
"tr_key" : ticker,
}
}
})
logger.info("구독 등록 완료")
# 메시지 수신 루프
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
logger.error("WebSocket 연결 끊김")
self._running = False
break
async def _handle_message(self, raw: str):
"""WebSocket 메시지 파싱"""
try:
# KIS WebSocket 메시지 포맷: 헤더|바디
if raw.startswith("{"):
# JSON 형식 (시스템 메시지)
return
parts = raw.split("|")
if len(parts) < 4:
return
tr_id = parts[1]
data = parts[3].split("^")
if tr_id == "H0STCNT0":
# 실시간 체결가
ticker = data[0]
price = int(data[2])
volume = int(data[9])
handler = self._handlers.get(ticker)
if handler:
await handler(ticker, price, volume)
elif tr_id == "H0STVI0":
# VI 발동/해제
ticker = data[0]
vi_status = data[1] # 1=발동, 2=해제
ref_price = int(data[5]) if len(data) > 5 else 0
if self._vi_handler:
await self._vi_handler(ticker, vi_status, ref_price)
except Exception as e:
logger.error(f"WebSocket 메시지 파싱 오류: {e}")
async def close(self):
self._running = False
if self._ws:
await self._ws.close()
logger.info("WebSocket 연결 종료")
+99
View File
@@ -0,0 +1,99 @@
"""
execution/order_executor.py
주문 실행 모듈
DRY_RUN=true 시 실제 주문 전송 없음
"""
import os
import asyncio
import logging
from datetime import datetime
from app.execution.kis_client import KISClient
from app.db.models import get_conn
from app.config import FEE_RATE, TAX_RATE
logger = logging.getLogger(__name__)
class OrderExecutor:
def __init__(self, kis: KISClient):
self.kis = kis
self.dry_run = os.getenv("DRY_RUN", "true").lower() == "true"
def _calc_fee(self, price: float, qty: int, is_buy: bool) -> float:
amt = price * qty
return amt * (FEE_RATE + (0 if is_buy else TAX_RATE))
async def buy(self, ticker: str, name: str,
qty: int, reason: str = "",
ai_boosted: bool = False) -> dict:
"""시장가 매수"""
try:
result = await self.kis.order_buy(ticker, qty)
price = result.get("entry_price", 0)
# DB 저장
fee = self._calc_fee(price, qty, True)
self._save_trade(
ticker=ticker, name=name,
entry_price=price, qty=qty,
side="BUY", fee=fee,
ai_boosted=ai_boosted,
)
mode = "[DRY]" if self.dry_run else ""
logger.info(f"{mode} 매수 {name}({ticker}) {qty}주 @ {price:,}")
return {"success": True, "price": price, "qty": qty}
except Exception as e:
logger.error(f"매수 실패 {ticker}: {e}")
return {"success": False, "error": str(e)}
async def sell(self, ticker: str, name: str,
qty: int, reason: str = "") -> dict:
"""시장가 매도"""
try:
result = await self.kis.order_sell(ticker, qty)
price = result.get("exit_price", 0)
fee = self._calc_fee(price, qty, False)
self._update_trade_exit(
ticker=ticker, exit_price=price,
qty=qty, reason=reason, fee=fee,
)
mode = "[DRY]" if self.dry_run else ""
logger.info(f"{mode} 매도 {name}({ticker}) {qty}주 @ {price:,}원 [{reason}]")
return {"success": True, "price": price, "qty": qty}
except Exception as e:
logger.error(f"매도 실패 {ticker}: {e}")
return {"success": False, "error": str(e)}
def _save_trade(self, ticker, name, entry_price,
qty, side, fee, ai_boosted=False):
with get_conn() as conn:
conn.execute("""
INSERT INTO trades
(date, ticker, name, entry_time, entry_price,
quantity, side, fee, ai_boosted)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.now().strftime("%Y-%m-%d"),
ticker, name,
datetime.now().strftime("%H:%M:%S"),
entry_price, qty, side, fee,
1 if ai_boosted else 0,
))
def _update_trade_exit(self, ticker, exit_price,
qty, reason, fee):
with get_conn() as conn:
conn.execute("""
UPDATE trades
SET exit_time=?, exit_price=?, exit_reason=?, fee=fee+?
WHERE ticker=? AND exit_time IS NULL
ORDER BY id DESC LIMIT 1
""", (
datetime.now().strftime("%H:%M:%S"),
exit_price, reason, fee, ticker,
))
+458
View File
@@ -0,0 +1,458 @@
"""
main.py
단타 자동매매 시스템 메인 진입점
기획서 v2.1 기준
실행:
python -m app.main (Docker 컨테이너)
python app/main.py (로컬 테스트)
환경변수:
KIS_MOCK=true → 모의투자 모드
DRY_RUN=true → 신호만 확인, 주문 전송 안 함
"""
import os
import sys
import asyncio
import logging
from datetime import datetime, time
from pathlib import Path
# .env 로드
def load_env():
env_path = Path(".env")
if not env_path.exists():
return
with open(env_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k = k.strip()
v = v.strip().strip('"').strip("'")
if k and v and k not in os.environ:
os.environ[k] = v
load_env()
# 로깅 설정
logging.basicConfig(
level=getattr(logging, os.getenv("LOG_LEVEL", "INFO")),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/stockbot.log", encoding="utf-8"),
]
)
logger = logging.getLogger(__name__)
from app.execution.kis_client import KISClient
from app.execution.order_executor import OrderExecutor
from app.strategy.volatility_breakout import VolatilityBreakout
from app.risk.manager import RiskManager
from app.monitor.notifier import (
notify_buy, notify_tp1, notify_tp2, notify_sl,
notify_force_exit, notify_risk, notify_daily_summary,
notify_error, notify_ai_result, notify_ai_blocked,
notify_ai_fallback, send
)
from app.db.models import init_db, get_conn
from app.config import (
MAX_UNIVERSE, FORCE_EXIT, MAX_POSITIONS,
MAX_HOLD_MIN, KOSPI_MIN_CHG
)
class StockBot:
def __init__(self):
self.kis = KISClient()
self.executor = OrderExecutor(self.kis)
self.strategy = VolatilityBreakout()
self.positions = {} # ticker → {name, entry, qty, tp1_done, entry_time}
self.universe = [] # 감시 종목 리스트
self.risk = None # RiskManager (잔고 확인 후 초기화)
self.running = False
mode = "모의투자" if self.kis.is_mock else "실거래"
dry = " [DRY_RUN]" if os.getenv("DRY_RUN","true")=="true" else ""
logger.info(f"StockBot 시작 [{mode}]{dry}")
# ─────────────────────────────────────────
# 초기화
# ─────────────────────────────────────────
async def initialize(self):
"""시스템 초기화"""
init_db()
await self.kis.get_access_token()
# 잔고 조회 → RiskManager 초기화
balance = await self.kis.get_balance()
cash = balance["cash"]
self.risk = RiskManager(init_cash=cash)
logger.info(f"초기 예수금: {cash:,}")
await send(f"[시작] 단타봇 가동 | 예수금: {cash:,}원 | "
f"{'모의투자' if self.kis.is_mock else '실거래'}")
# ─────────────────────────────────────────
# 유니버스 갱신 (08:30)
# ─────────────────────────────────────────
async def update_universe(self):
"""종목 풀 갱신 + 전일 데이터 수집"""
logger.info("유니버스 갱신 시작")
try:
# 거래량 순위 상위 30종목
rank = await self.kis.get_volume_rank(top_n=MAX_UNIVERSE)
tickers = [r["ticker"] for r in rank]
# AI 블랙리스트 제거
ctx = self.strategy.context
blacklist = ctx.get("blacklist_tickers", [])
tickers = [t for t in tickers if t not in blacklist]
# boosted 종목 상단 배치
boosted = ctx.get("boosted_tickers", [])
tickers = (
[t for t in boosted if t in tickers] +
[t for t in tickers if t not in boosted]
)[:MAX_UNIVERSE]
self.universe = tickers
logger.info(f"유니버스: {len(tickers)}종목")
# 전일 OHLCV 수집 (목표가 계산용)
today = datetime.now().strftime("%Y%m%d")
for ticker in self.universe:
try:
ohlcv = await self.kis.get_ohlcv_daily(
ticker,
start=today,
end=today,
)
if len(ohlcv) >= 2:
prev = ohlcv[-2]
self.strategy.set_prev_data(
ticker,
high = prev["high"],
low = prev["low"],
amount= prev.get("amount",
prev.get("volume",0) * prev.get("close",0))
)
except Exception as e:
logger.warning(f"전일 데이터 실패 {ticker}: {e}")
await asyncio.sleep(0.1) # rate limit
except Exception as e:
logger.error(f"유니버스 갱신 실패: {e}")
# ─────────────────────────────────────────
# 시가 수집 + 목표가 계산 (08:50)
# ─────────────────────────────────────────
async def calc_targets(self):
"""당일 시가 기반 목표가 계산"""
logger.info("목표가 계산 시작")
for ticker in self.universe:
try:
price_info = await self.kis.get_price(ticker)
self.strategy.set_today_open(ticker, price_info["open"])
target = self.strategy.get_target(ticker)
if target > 0:
logger.debug(f"{ticker} 목표가: {target:,.0f}")
await asyncio.sleep(0.05)
except Exception as e:
logger.warning(f"시가 수집 실패 {ticker}: {e}")
# ─────────────────────────────────────────
# 메인 매매 루프 (09:00~14:50)
# ─────────────────────────────────────────
async def trading_loop(self):
"""1초 단위 메인 루프"""
logger.info("매매 루프 시작")
self.running = True
while self.running:
now = datetime.now()
now_str = now.strftime("%H:%M")
# 14:50 강제 청산
if now_str >= FORCE_EXIT:
await self.force_exit_all()
self.running = False
break
# 14:30 이후 신규 진입 중단
if now_str > "14:30":
await asyncio.sleep(1)
continue
# 09:00 이전 대기
if now_str < "09:00":
await asyncio.sleep(1)
continue
# 점심 (11:30~13:00) 신규 진입 중단
if "11:30" <= now_str < "13:00":
await asyncio.sleep(1)
continue
# 리스크 체크
if not self.risk.can_trade():
await asyncio.sleep(5)
continue
# 보유 포지션 청산 체크
await self.check_exits()
# 신규 진입 체크
if self.risk.can_add_position(len(self.positions)):
await self.check_entries()
await asyncio.sleep(1)
# ─────────────────────────────────────────
# 진입 체크
# ─────────────────────────────────────────
async def check_entries(self):
"""유니버스 전체 진입 신호 확인"""
for ticker in self.universe:
if ticker in self.positions:
continue
if len(self.positions) >= MAX_POSITIONS:
break
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = price_info.get("name", ticker)
# 전략 신호 체크
signal = self.strategy.check_entry(
ticker=ticker,
name=name,
current_price=current,
)
if not signal["signal"]:
continue
# 포지션 사이즈 계산
balance = await self.kis.get_balance()
cash = balance["cash"]
invest = self.risk.get_pos_size(
cash, signal.get("multiplier", 1.0)
)
qty = max(1, int(invest // current))
# 매수 실행
result = await self.executor.buy(
ticker=ticker, name=name,
qty=qty, reason=signal["reason"],
ai_boosted=signal.get("boosted", False),
)
if result["success"]:
entry_price = result["price"] or current
sl_price = entry_price * (1 - self.risk.get_sl_pct())
tp1_price = entry_price * (1 + 0.02)
self.positions[ticker] = {
"name" : name,
"entry" : entry_price,
"qty" : qty,
"tp1_done" : False,
"entry_time": datetime.now(),
"sl_price" : sl_price,
"boosted" : signal.get("boosted", False),
}
await notify_buy(
ticker=ticker, name=name,
price=entry_price,
target=int(entry_price * 1.03),
stop=int(sl_price),
boosted=signal.get("boosted", False),
)
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"진입 체크 오류 {ticker}: {e}")
# ─────────────────────────────────────────
# 청산 체크
# ─────────────────────────────────────────
async def check_exits(self):
"""보유 포지션 청산 신호 확인"""
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
name = pos["name"]
# 시간 청산: MAX_HOLD_MIN 초과
hold_min = (datetime.now() - pos["entry_time"]).seconds / 60
if hold_min >= MAX_HOLD_MIN:
await self._do_exit(ticker, pos, current, qty=pos["qty"], reason="TIME")
continue
# 전략 청산 신호
signal = self.strategy.check_exit(
ticker=ticker,
entry_price=pos["entry"],
current_price=current,
qty=pos["qty"],
tp1_done=pos["tp1_done"],
sl_pct=self.risk.get_sl_pct(),
)
if signal["signal"]:
await self._do_exit(
ticker, pos, current,
qty=signal["qty"],
reason=signal["reason"],
)
await asyncio.sleep(0.05)
except Exception as e:
logger.error(f"청산 체크 오류 {ticker}: {e}")
async def _do_exit(self, ticker: str, pos: dict,
current: float, qty: int, reason: str):
"""실제 청산 실행"""
name = pos["name"]
result = await self.executor.sell(ticker, name, qty, reason)
if not result["success"]:
return
exit_price = result["price"] or current
pnl = (exit_price - pos["entry"]) * qty
pnl_pct = (exit_price - pos["entry"]) / pos["entry"] * 100
self.risk.record_trade(pnl)
if reason == "TP1":
pos["tp1_done"] = True
pos["qty"] -= qty
if pos["qty"] <= 0:
del self.positions[ticker]
await notify_tp1(ticker, name, pnl_pct)
elif reason in ("TP2", "SL", "TIME", "FORCE"):
del self.positions[ticker]
if reason == "TP2":
await notify_tp2(ticker, name, pnl_pct)
elif reason == "SL":
await notify_sl(ticker, name, pnl_pct)
# L2/L3 체크 후 디스코드 경고
if not self.risk.can_trade():
await notify_risk(
self.risk.stop_reason.split(":")[0],
self.risk.stop_reason
)
# ─────────────────────────────────────────
# 강제 청산 (14:50)
# ─────────────────────────────────────────
async def force_exit_all(self):
"""14:50 전량 강제 청산"""
logger.info("14:50 강제 청산 시작")
for ticker, pos in list(self.positions.items()):
try:
price_info = await self.kis.get_price(ticker)
current = price_info["current"]
await self._do_exit(
ticker, pos, current,
qty=pos["qty"], reason="FORCE"
)
except Exception as e:
logger.error(f"강제 청산 실패 {ticker}: {e}")
await notify_force_exit()
logger.info("강제 청산 완료")
# ─────────────────────────────────────────
# 일일 결산 (15:10)
# ─────────────────────────────────────────
async def daily_summary(self):
"""당일 결산 로그 및 디스코드 알림"""
today = datetime.now().strftime("%Y-%m-%d")
with get_conn() as conn:
rows = conn.execute("""
SELECT pnl FROM trades
WHERE date=? AND exit_time IS NOT NULL
""", (today,)).fetchall()
pnls = [r[0] for r in rows if r[0] is not None]
total = len(pnls)
wins = sum(1 for p in pnls if p > 0)
losses = total - wins
net = sum(pnls)
await notify_daily_summary(total, wins, losses, net)
self.risk.reset_daily()
logger.info(f"결산: {total}회 / 승{wins}{losses} / {net:+,.0f}")
# ─────────────────────────────────────────
# 스케줄러
# ─────────────────────────────────────────
async def run():
bot = StockBot()
await bot.initialize()
while True:
now = datetime.now().strftime("%H:%M")
# 07:30 AI 판단 (컨텍스트 로드)
if now == "08:05":
bot.strategy.load_ai_context()
ctx = bot.strategy.context
await notify_ai_result(
ctx["market_sentiment"],
ctx["sentiment_score"],
ctx.get("hot_sectors", []),
ctx.get("avoid_sectors", []),
ctx.get("reason", ""),
)
bot.risk.set_risk_level(ctx.get("risk_level", "보통"))
# 08:30 유니버스 갱신
elif now == "08:30":
await bot.update_universe()
# 08:50 목표가 계산
elif now == "08:50":
await bot.calc_targets()
# 09:00 매매 루프 시작
elif now == "09:00":
await bot.trading_loop()
# 15:10 결산
elif now == "15:10":
await bot.daily_summary()
await asyncio.sleep(30)
if __name__ == "__main__":
os.makedirs("logs", exist_ok=True)
os.makedirs("data", exist_ok=True)
logger.info("=" * 50)
logger.info("단타 자동매매 시스템 시작")
logger.info(f"모드: {'모의투자' if os.getenv('KIS_MOCK','true')=='true' else '실거래'}")
logger.info(f"DRY_RUN: {os.getenv('DRY_RUN','true')}")
logger.info("=" * 50)
asyncio.run(run())
View File
+86
View File
@@ -0,0 +1,86 @@
"""
monitor/dashboard.py - Streamlit 대시보드
실행: streamlit run monitor/dashboard.py --server.port 8501
"""
import streamlit as st
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime, timedelta
DB_PATH = os.getenv("DB_PATH", "data/stockbot.db")
AI_CTX = os.getenv("AI_CONTEXT_PATH", "data/daily_context.json")
st.set_page_config(page_title="단타봇 대시보드", layout="wide")
st.title("📈 단타 자동매매 대시보드")
st.caption(f"마지막 갱신: {datetime.now().strftime('%H:%M:%S')}")
# ── AI 판단 현황 ──
st.subheader("🤖 오늘의 AI 판단")
try:
with open(AI_CTX, encoding="utf-8") as f:
ctx = json.load(f)
col1, col2, col3, col4 = st.columns(4)
col1.metric("시장 분위기", ctx.get("market_sentiment", "-"))
col2.metric("감성 점수", f"{ctx.get('sentiment_score', 0)}")
col3.metric("리스크 레벨", ctx.get("risk_level", "-"))
col4.metric("거래 허용", "" if ctx.get("trade_allowed") else "")
st.info(f"💬 {ctx.get('reason', '')}")
col_l, col_r = st.columns(2)
col_l.write(f"**주목 섹터:** {', '.join(ctx.get('hot_sectors', []))}")
col_r.write(f"**회피 섹터:** {', '.join(ctx.get('avoid_sectors', []))}")
except:
st.warning("AI 판단 데이터 없음 (08:00 이후 생성)")
st.divider()
# ── 오늘 매매 현황 ──
st.subheader("📊 오늘 매매 현황")
try:
conn = sqlite3.connect(DB_PATH)
today = datetime.now().strftime("%Y-%m-%d")
df = pd.read_sql(
"SELECT * FROM trades WHERE date=?", conn, params=(today,)
)
conn.close()
if df.empty:
st.info("오늘 매매 없음")
else:
total = len(df)
wins = (df["pnl"] > 0).sum()
net = df["pnl"].sum()
win_rt = wins / total * 100 if total else 0
c1, c2, c3, c4 = st.columns(4)
c1.metric("총 매매", f"{total}")
c2.metric("승률", f"{win_rt:.0f}%")
c3.metric("순손익", f"{net:+,.0f}")
c4.metric("승/패", f"{wins}/{total-wins}")
st.dataframe(df[["entry_time","ticker","name","entry_price",
"exit_price","pnl","exit_reason"]].fillna("-"),
use_container_width=True)
except Exception as e:
st.error(f"DB 연결 실패: {e}")
st.divider()
# ── 주간 손익 ──
st.subheader("📅 최근 7일 손익")
try:
conn = sqlite3.connect(DB_PATH)
week_ago = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
df_w = pd.read_sql(
"SELECT date, SUM(pnl) as net_pnl, COUNT(*) as trades "
"FROM trades WHERE date >= ? GROUP BY date ORDER BY date",
conn, params=(week_ago,)
)
conn.close()
if not df_w.empty:
st.bar_chart(df_w.set_index("date")["net_pnl"])
st.dataframe(df_w, use_container_width=True)
except:
st.info("주간 데이터 없음")
st.button("🔄 새로고침", on_click=st.rerun)
+79
View File
@@ -0,0 +1,79 @@
"""
monitor/notifier.py
디스코드 Webhook 알림 (단방향)
aiohttp만 사용 - 별도 라이브러리 없음
"""
import os
import asyncio
import aiohttp
import logging
logger = logging.getLogger(__name__)
WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "")
async def send(message: str) -> None:
"""디스코드 Webhook 메시지 전송"""
if not WEBHOOK_URL:
logger.warning(f"[Discord 미설정] {message}")
return
try:
async with aiohttp.ClientSession() as session:
await session.post(
WEBHOOK_URL,
json={"content": message},
timeout=aiohttp.ClientTimeout(total=5)
)
except Exception as e:
logger.error(f"Discord 알림 실패: {e}")
# ── 이벤트별 메시지 ──
async def notify_ai_result(sentiment: str, score: int,
hot: list, avoid: list, reason: str):
hot_str = ",".join(hot) if hot else "없음"
avoid_str = ",".join(avoid) if avoid else "없음"
await send(
f"[AI분석] 시장: {sentiment}({score}점) | "
f"주목: {hot_str} | 회피: {avoid_str}\n💬 {reason}"
)
async def notify_ai_blocked(ticker: str, name: str, reason: str):
await send(f"[AI차단] {name}({ticker}) - {reason}")
async def notify_buy(ticker: str, name: str, price: int,
target: int, stop: int, boosted: bool = False):
star = "" if boosted else ""
await send(
f"[매수{star}] {name}({ticker}) {price:,}원 | "
f"목표 {target:,} | 손절 {stop:,}"
)
async def notify_tp1(ticker: str, name: str, pct: float):
await send(f"[익절1] {name}({ticker}) +{pct:.1f}% / 잔여 50%")
async def notify_tp2(ticker: str, name: str, pct: float):
await send(f"[익절2] {name}({ticker}) +{pct:.1f}% / 전량 청산")
async def notify_sl(ticker: str, name: str, pct: float):
await send(f"[손절] {name}({ticker}) {pct:.1f}% / 즉시 청산")
async def notify_force_exit():
await send("[14:50 강제청산] 전 포지션 청산 완료")
async def notify_risk(level: str, message: str):
await send(f"[경고-{level}] {message}")
async def notify_daily_summary(trades: int, wins: int,
losses: int, net_pnl: float):
win_rate = wins / trades * 100 if trades else 0
await send(
f"[결산] 매매 {trades}회 / 승 {wins}{losses} "
f"({win_rate:.0f}%) / 순손익 {net_pnl:+,.0f}"
)
async def notify_error(message: str):
await send(f"[긴급] {message}")
async def notify_ai_fallback():
await send("[경고] AI 판단 실패 → 기본값 적용 (비중 80%)")
+8
View File
@@ -0,0 +1,8 @@
aiohttp==3.9.5
python-dotenv==1.0.1
APScheduler==3.10.4
beautifulsoup4==4.12.3
redis==5.0.7
streamlit==1.36.0
pandas==2.2.2
numpy==1.26.4
View File
+107
View File
@@ -0,0 +1,107 @@
"""
risk/manager.py
리스크 매니저 L1~L5
기획서 v2.1 기준
"""
import logging
from app.config import (
SL_PCT, DAILY_SL_PCT, CONSEC_LOSS,
AI_RISK_SL_MAP, POS_SIZE_PCT, MAX_POSITIONS
)
logger = logging.getLogger(__name__)
class RiskManager:
def __init__(self, init_cash: float):
self.init_cash = init_cash
self.daily_pnl = 0.0
self.weekly_pnl = 0.0
self.monthly_pnl = 0.0
self.consec_loss = 0
self.trading_stopped = False
self.stop_reason = ""
self.risk_level = "보통"
def set_risk_level(self, level: str):
"""AI 판단 결과로 risk_level 설정"""
self.risk_level = level
def get_sl_pct(self) -> float:
"""현재 risk_level에 따른 손절 비율 반환"""
return AI_RISK_SL_MAP.get(self.risk_level, SL_PCT)
def get_pos_size(self, cash: float, multiplier: float = 1.0) -> float:
"""포지션 사이즈 계산 (AI multiplier 반영)"""
return cash * POS_SIZE_PCT * multiplier
# ── 손실 기록 ──
def record_trade(self, pnl: float):
"""매매 결과 기록 및 손실 한도 체크"""
self.daily_pnl += pnl
self.weekly_pnl += pnl
self.monthly_pnl += pnl
if pnl < 0:
self.consec_loss += 1
else:
self.consec_loss = 0
self._check_limits()
def _check_limits(self):
"""L1~L5 손실 한도 체크"""
# L2: 일일 누적 손실 -3%
if self.daily_pnl / self.init_cash < -DAILY_SL_PCT:
self._stop("L2", f"일일 손실 {self.daily_pnl/self.init_cash*100:.1f}% 도달")
# L3: 연속 손절 3회
if self.consec_loss >= CONSEC_LOSS:
self._stop("L3", f"{CONSEC_LOSS}연속 손절 발생")
# L4: 주간 누적 -7%
if self.weekly_pnl / self.init_cash < -0.07:
self._stop("L4", f"주간 손실 {self.weekly_pnl/self.init_cash*100:.1f}%")
# L5: 월간 누적 -15%
if self.monthly_pnl / self.init_cash < -0.15:
self._stop("L5", f"월간 손실 {self.monthly_pnl/self.init_cash*100:.1f}%")
def _stop(self, level: str, reason: str):
self.trading_stopped = True
self.stop_reason = f"{level}: {reason}"
logger.warning(f"매매 중단 - {self.stop_reason}")
# ── 상태 조회 ──
def can_trade(self) -> bool:
return not self.trading_stopped
def can_add_position(self, current_positions: int) -> bool:
return (not self.trading_stopped
and current_positions < MAX_POSITIONS)
def reset_daily(self):
"""매일 장 시작 전 일일 손익 초기화"""
self.daily_pnl = 0.0
self.consec_loss = 0
self.trading_stopped = False
self.stop_reason = ""
def reset_weekly(self):
self.weekly_pnl = 0.0
def reset_monthly(self):
self.monthly_pnl = 0.0
def status(self) -> dict:
return {
"trading_stopped": self.trading_stopped,
"stop_reason" : self.stop_reason,
"daily_pnl" : self.daily_pnl,
"weekly_pnl" : self.weekly_pnl,
"monthly_pnl" : self.monthly_pnl,
"consec_loss" : self.consec_loss,
"risk_level" : self.risk_level,
"sl_pct" : self.get_sl_pct(),
}
View File
+19
View File
@@ -0,0 +1,19 @@
"""
strategy/base.py - 전략 추상 클래스
"""
from abc import ABC, abstractmethod
class BaseStrategy(ABC):
@abstractmethod
def check_entry(self, ticker: str, name: str,
current_price: float, **kwargs) -> dict:
"""진입 신호 체크. signal, reason, boosted, multiplier 반환"""
pass
@abstractmethod
def check_exit(self, ticker: str, entry_price: float,
current_price: float, qty: int,
tp1_done: bool, sl_pct: float) -> dict:
"""청산 신호 체크. signal, reason, qty 반환"""
pass
+217
View File
@@ -0,0 +1,217 @@
"""
strategy/volatility_breakout.py
변동성 돌파 전략 + AI 필터
기획서 v2.1 기준
"""
import json
import logging
import os
from datetime import datetime
from app.config import (
STRATEGY_K, TP1_PCT, TP2_PCT,
ENTRY_START, ENTRY_END,
AI_CONTEXT_PATH, AI_MIN_SCORE,
AI_BOOST_MULTI, MIN_TRADE_AMOUNT,
KOSPI_MIN_CHG
)
logger = logging.getLogger(__name__)
# AI fallback 기본값
DEFAULT_CONTEXT = {
"trade_allowed" : True,
"market_sentiment" : "중립",
"sentiment_score" : 50,
"risk_level" : "보통",
"hot_sectors" : [],
"avoid_sectors" : [],
"boosted_tickers" : [],
"blacklist_tickers" : [],
"position_size_multiplier": 0.8,
"reason" : "AI 판단 실패 - 기본값 적용",
}
class VolatilityBreakout:
"""
변동성 돌파 전략
목표가 = 당일 시가 + (전일 고가 - 전일 저가) × K
현재가 >= 목표가 → 진입 신호
"""
def __init__(self):
self.context = DEFAULT_CONTEXT.copy()
self.prev_data = {} # ticker → {high, low, amount} 전일 데이터
self.today_open = {} # ticker → 당일 시가
self.targets = {} # ticker → 목표가
# ── AI 컨텍스트 로드 ──
def load_ai_context(self) -> dict:
"""daily_context.json 로드, 실패 시 fallback"""
try:
if not os.path.exists(AI_CONTEXT_PATH):
logger.warning("daily_context.json 없음 → fallback")
return DEFAULT_CONTEXT.copy()
with open(AI_CONTEXT_PATH, encoding="utf-8") as f:
ctx = json.load(f)
# 날짜 체크 (오늘 날짜 파일인지 확인)
today = datetime.now().strftime("%Y-%m-%d")
if ctx.get("date") != today:
logger.warning(f"AI 컨텍스트 날짜 불일치: {ctx.get('date')} → fallback")
return DEFAULT_CONTEXT.copy()
# 최소 감성 점수 체크
if ctx.get("sentiment_score", 50) < AI_MIN_SCORE:
ctx["trade_allowed"] = False
logger.info(f"감성점수 {ctx['sentiment_score']} < {AI_MIN_SCORE} → 거래 중단")
self.context = ctx
logger.info(
f"AI 컨텍스트 로드: {ctx['market_sentiment']}({ctx['sentiment_score']}점) "
f"/ {ctx['reason']}"
)
return ctx
except Exception as e:
logger.error(f"AI 컨텍스트 로드 실패: {e} → fallback")
return DEFAULT_CONTEXT.copy()
# ── 목표가 계산 ──
def set_prev_data(self, ticker: str, high: float,
low: float, amount: float):
"""전일 고가/저가/거래대금 저장"""
self.prev_data[ticker] = {
"high" : high,
"low" : low,
"amount": amount,
}
def set_today_open(self, ticker: str, open_price: float):
"""당일 시가로 목표가 계산"""
prev = self.prev_data.get(ticker)
if not prev or prev["amount"] < MIN_TRADE_AMOUNT:
return
prev_range = prev["high"] - prev["low"]
if prev_range <= 0:
return
target = open_price + prev_range * STRATEGY_K
self.today_open[ticker] = open_price
self.targets[ticker] = target
def get_target(self, ticker: str) -> float:
return self.targets.get(ticker, 0.0)
# ── 진입 신호 판단 ──
def check_entry(self, ticker: str, name: str,
current_price: float, sector: str = "") -> dict:
"""
진입 신호 체크
반환: {"signal": bool, "reason": str, "boosted": bool, "multiplier": float}
"""
result = {"signal": False, "reason": "", "boosted": False, "multiplier": 1.0}
# 시간 체크
now = datetime.now().strftime("%H:%M")
if not (ENTRY_START <= now <= ENTRY_END):
result["reason"] = f"진입 시간 외 ({now})"
return result
# 목표가 확인
target = self.targets.get(ticker, 0)
if target <= 0:
result["reason"] = "목표가 없음"
return result
# 기술적 조건: 현재가 >= 목표가
if current_price < target:
result["reason"] = f"목표가 미달 ({current_price:,} < {target:,.0f})"
return result
# ── AI 필터 ──
ctx = self.context
# trade_allowed 체크
if not ctx.get("trade_allowed", True):
result["reason"] = f"AI 거래 중단: {ctx.get('reason', '')}"
return result
# 블랙리스트 체크
if ticker in ctx.get("blacklist_tickers", []):
result["reason"] = "AI 블랙리스트"
return result
# 섹터 회피 체크
avoid = ctx.get("avoid_sectors", [])
if sector and any(s in sector for s in avoid):
result["reason"] = f"AI 섹터 회피 ({sector})"
return result
# 부스트 체크
boosted = ticker in ctx.get("boosted_tickers", [])
multiplier = ctx.get("position_size_multiplier", 1.0)
if boosted:
multiplier = min(multiplier * AI_BOOST_MULTI, 1.5)
result.update({
"signal" : True,
"reason" : f"목표가 돌파 ({current_price:,} >= {target:,.0f})",
"boosted" : boosted,
"multiplier": multiplier,
"target" : target,
})
return result
# ── 청산 신호 판단 ──
def check_exit(self, ticker: str, entry_price: float,
current_price: float, qty: int,
tp1_done: bool, sl_pct: float) -> dict:
"""
청산 신호 체크
우선순위: 손절 > 1차 익절 > 2차 익절
반환: {"signal": bool, "reason": str, "qty": int}
"""
result = {"signal": False, "reason": "", "qty": 0}
sl_price = entry_price * (1 - sl_pct)
tp1_price = entry_price * (1 + TP1_PCT)
tp2_price = entry_price * (1 + TP2_PCT)
# 손절 (최우선)
if current_price <= sl_price:
result.update({
"signal": True,
"reason": "SL",
"qty" : qty,
"price" : sl_price,
})
return result
# 2차 익절
if current_price >= tp2_price:
result.update({
"signal": True,
"reason": "TP2",
"qty" : qty - (qty // 2 if not tp1_done else 0),
"price" : tp2_price,
})
return result
# 1차 익절 (아직 안 했으면)
if not tp1_done and current_price >= tp1_price:
result.update({
"signal": True,
"reason": "TP1",
"qty" : qty // 2,
"price" : tp1_price,
})
return result
return result