Files
Stock-trading-programming/app/execution/kis_client.py
T

600 lines
23 KiB
Python

"""
kis_client.py
KIS Open API REST + WebSocket 래퍼
- 토큰 자동 발급/갱신
- 모의투자/실거래 모드 자동 전환
- rate limit 제어 (초당 20건)
"""
import os
import json
import time
import asyncio
import aiohttp
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Callable
logger = logging.getLogger(__name__)
# ── 모드별 베이스 URL ──
URL_REAL = "https://openapi.koreainvestment.com:9443"
URL_MOCK = "https://openapivts.koreainvestment.com:29443"
class KISClient:
"""
KIS Open API 클라이언트
모의투자/실거래 모드를 .env의 KIS_MOCK 값으로 자동 전환
"""
def __init__(self):
self.is_mock = os.getenv("KIS_MOCK", "true").lower() == "true"
self.base_url = URL_MOCK if self.is_mock else URL_REAL
# 모드별 키 자동 선택
if self.is_mock:
self.app_key = os.getenv("KIS_MOCK_APP_KEY", "")
self.app_secret = os.getenv("KIS_MOCK_APP_SECRET", "")
self.account_no = os.getenv("KIS_MOCK_ACCOUNT_NO", "")
else:
self.app_key = os.getenv("KIS_APP_KEY", "")
self.app_secret = os.getenv("KIS_APP_SECRET", "")
self.account_no = os.getenv("KIS_ACCOUNT_NO", "")
# 계좌번호 파싱 (앞 8자리 + 뒤 2자리)
self._parse_account()
# 토큰 관련
self._access_token : Optional[str] = None
self._token_expires_at: Optional[datetime] = None
# 토큰 파일 캐시 경로 (재시작 시 재사용)
mode_tag = "mock" if self.is_mock else "real"
self._token_cache_file = os.path.join(
os.path.dirname(__file__), "..", "..", "data", f"kis_token_{mode_tag}.json"
)
self._load_token_from_file()
# rate limit: 모의투자 1건/초, 실거래 5건/초
self._rate_limit = 1 if self.is_mock else 5
self._semaphore = asyncio.Semaphore(1)
self._req_times : list = []
mode = "모의투자" if self.is_mock else "실거래"
logger.info(f"KISClient 초기화 완료 [{mode}] 계좌: {self.account_no}")
def _parse_account(self):
"""계좌번호 파싱: '50123456-01' → ('50123456', '01')"""
raw = self.account_no.replace("-", "")
if len(raw) >= 10:
self.acct_prefix = raw[:8]
self.acct_suffix = raw[8:10]
else:
self.acct_prefix = raw
self.acct_suffix = "01"
# ─────────────────────────────────────────
# 토큰 관리
# ─────────────────────────────────────────
def _load_token_from_file(self):
"""재시작 시 파일 캐시에서 토큰 복원"""
try:
if os.path.exists(self._token_cache_file):
with open(self._token_cache_file, encoding="utf-8") as f:
cached = json.load(f)
expires_at = datetime.fromisoformat(cached["expires_at"])
if datetime.now() < expires_at - timedelta(minutes=30):
self._access_token = cached["access_token"]
self._token_expires_at = expires_at
logger.info("KIS 토큰 파일 캐시 복원 완료")
except Exception:
pass
def _save_token_to_file(self):
"""토큰을 파일에 저장 (재시작 시 재사용)"""
try:
os.makedirs(os.path.dirname(self._token_cache_file), exist_ok=True)
with open(self._token_cache_file, "w", encoding="utf-8") as f:
json.dump({
"access_token": self._access_token,
"expires_at": self._token_expires_at.isoformat(),
}, f)
except Exception:
pass
async def get_access_token(self) -> str:
"""액세스 토큰 발급/갱신 (만료 30분 전 자동 갱신)"""
now = datetime.now()
if (self._access_token
and self._token_expires_at
and now < self._token_expires_at - timedelta(minutes=30)):
return self._access_token
url = f"{self.base_url}/oauth2/tokenP"
body = {
"grant_type" : "client_credentials",
"appkey" : self.app_key,
"appsecret" : self.app_secret,
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body) as resp:
data = await resp.json()
if "access_token" not in data:
# EGW00133: 분당 1회 제한 — 캐시 토큰이 아직 유효하면 그대로 사용
if data.get("error_code") == "EGW00133" and self._access_token:
logger.warning("KIS 토큰 발급 속도 제한(EGW00133) — 기존 캐시 토큰 유지")
return self._access_token
# 캐시 없으면 60초 대기 후 1회 재시도
if data.get("error_code") == "EGW00133":
logger.warning("KIS 토큰 발급 속도 제한(EGW00133) — 60초 후 재시도")
await asyncio.sleep(60)
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body) as resp:
data = await resp.json()
if "access_token" not in data:
raise RuntimeError(f"토큰 발급 실패: {data}")
self._access_token = data["access_token"]
# 유효기간 24시간
self._token_expires_at = now + timedelta(hours=24)
self._save_token_to_file()
logger.info("KIS 액세스 토큰 발급/갱신 완료")
return self._access_token
# ─────────────────────────────────────────
# REST API 기본 호출
# ─────────────────────────────────────────
async def _request(
self,
method : str,
path : str,
tr_id : str,
params : Optional[Dict] = None,
body : Optional[Dict] = None,
) -> Dict[str, Any]:
"""
KIS REST API 공통 호출
- rate limit 제어 (초당 20건)
- 토큰 자동 첨부
"""
token = await self.get_access_token()
url = f"{self.base_url}{path}"
headers = {
"content-type" : "application/json; charset=utf-8",
"authorization" : f"Bearer {token}",
"appkey" : self.app_key,
"appsecret" : self.app_secret,
"tr_id" : tr_id,
"custtype" : "P", # 개인
}
async with self._semaphore:
now = time.monotonic()
self._req_times = [t for t in self._req_times if now - t < 1.0]
if len(self._req_times) >= self._rate_limit:
wait = 1.0 - (now - self._req_times[0])
if wait > 0:
await asyncio.sleep(wait)
self._req_times.append(time.monotonic())
async with aiohttp.ClientSession() as session:
if method == "GET":
async with session.get(url, headers=headers, params=params) as r:
data = await r.json()
else:
async with session.post(url, headers=headers, json=body) as r:
data = await r.json()
# 응답 코드 체크
rt_cd = data.get("rt_cd", "")
if rt_cd != "0":
msg = data.get("msg1", "알 수 없는 오류")
logger.error(f"KIS API 오류 [{tr_id}]: {rt_cd} - {msg}")
raise RuntimeError(f"KIS API 오류: {msg}")
return data
# ─────────────────────────────────────────
# 시세 조회
# ─────────────────────────────────────────
async def get_price(self, ticker: str) -> Dict:
"""주식 현재가 조회 (FHKST01010100)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-price",
tr_id = "FHKST01010100",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : ticker,
}
)
o = data["output"]
return {
"ticker" : ticker,
"current" : int(o["stck_prpr"]), # 현재가
"open" : int(o["stck_oprc"]), # 시가
"high" : int(o["stck_hgpr"]), # 고가
"low" : int(o["stck_lwpr"]), # 저가
"prev_close" : int(o["stck_sdpr"]), # 전일 종가
"volume" : int(o["acml_vol"]), # 누적 거래량
"change_pct" : float(o["prdy_ctrt"]), # 등락률
"market_cap" : int(o.get("hts_avls", 0)) * 100_000_000, # 시가총액 (억→원)
}
async def get_ohlcv_daily(self, ticker: str, start: str, end: str) -> list:
"""
주식 기간별 시세 (일봉) - 백테스트/AI 분석용
start, end: 'YYYYMMDD'
"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
tr_id = "FHKST03010100",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : ticker,
"FID_INPUT_DATE_1" : start,
"FID_INPUT_DATE_2" : end,
"FID_PERIOD_DIV_CODE" : "D", # 일봉
"FID_ORG_ADJ_PRC" : "0",
}
)
result = []
for row in data.get("output2", []):
result.append({
"date" : row["stck_bsop_date"],
"open" : int(row["stck_oprc"]),
"high" : int(row["stck_hgpr"]),
"low" : int(row["stck_lwpr"]),
"close" : int(row["stck_clpr"]),
"volume": int(row["acml_vol"]),
})
return result
# ─────────────────────────────────────────
# 주문
# ─────────────────────────────────────────
async def order_buy(
self,
ticker : str,
qty : int,
price : int = 0, # 0 = 시장가
order_type: str = "01", # 01=시장가, 00=지정가
) -> Dict:
"""주식 매수 주문"""
dry_run = os.getenv("DRY_RUN", "true").lower() == "true"
if dry_run:
price_info = await self.get_price(ticker)
current = price_info["current"]
logger.info(f"[DRY_RUN] 매수 {ticker} {qty}주 @ {current:,}")
return {"dry_run": True, "ticker": ticker, "qty": qty, "entry_price": current}
# 모의/실거래 TR 구분
tr_id = "VTTC0802U" if self.is_mock else "TTTC0802U"
data = await self._request(
method = "POST",
path = "/uapi/domestic-stock/v1/trading/order-cash",
tr_id = tr_id,
body = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD": self.acct_suffix,
"PDNO" : ticker,
"ORD_DVSN" : order_type,
"ORD_QTY" : str(qty),
"ORD_UNPR" : str(price),
}
)
logger.info(f"매수 주문 완료: {ticker} {qty}")
return data
async def order_sell(
self,
ticker : str,
qty : int,
price : int = 0,
order_type: str = "01",
) -> Dict:
"""주식 매도 주문"""
dry_run = os.getenv("DRY_RUN", "true").lower() == "true"
if dry_run:
price_info = await self.get_price(ticker)
current = price_info["current"]
logger.info(f"[DRY_RUN] 매도 {ticker} {qty}주 @ {current:,}")
return {"dry_run": True, "ticker": ticker, "qty": qty, "exit_price": current}
tr_id = "VTTC0801U" if self.is_mock else "TTTC0801U"
data = await self._request(
method = "POST",
path = "/uapi/domestic-stock/v1/trading/order-cash",
tr_id = tr_id,
body = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD": self.acct_suffix,
"PDNO" : ticker,
"ORD_DVSN" : order_type,
"ORD_QTY" : str(qty),
"ORD_UNPR" : str(price),
}
)
logger.info(f"매도 주문 완료: {ticker} {qty}")
return data
# ─────────────────────────────────────────
# 잔고 조회
# ─────────────────────────────────────────
async def get_balance(self) -> Dict:
"""주식 잔고 조회 (보유 종목 + 예수금)"""
tr_id = "VTTC8434R" if self.is_mock else "TTTC8434R"
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/trading/inquire-balance",
tr_id = tr_id,
params = {
"CANO" : self.acct_prefix,
"ACNT_PRDT_CD" : self.acct_suffix,
"AFHR_FLPR_YN" : "N",
"OFL_YN" : "",
"INQR_DVSN" : "02",
"UNPR_DVSN" : "01",
"FUND_STTL_ICLD_YN" : "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN" : "01",
"CTX_AREA_FK100" : "",
"CTX_AREA_NK100" : "",
}
)
holdings = []
for item in data.get("output1", []):
qty = int(item.get("hldg_qty", "0"))
if qty > 0:
holdings.append({
"ticker" : item["pdno"],
"name" : item["prdt_name"],
"qty" : qty,
"avg_price" : int(item["pchs_avg_pric"].replace(".", "")),
"current" : int(item["prpr"]),
"pnl_pct" : float(item["evlu_pfls_rt"]),
})
cash = int(data["output2"][0].get("dnca_tot_amt", "0")) if data.get("output2") else 0
return {
"holdings" : holdings,
"cash" : cash,
"total_cnt": len(holdings),
}
# ─────────────────────────────────────────
# AI 판단용 수급 데이터
# ─────────────────────────────────────────
async def get_volume_rank(self, top_n: int = 30) -> list:
"""거래량 순위 상위 종목 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/volume-rank",
tr_id = "FHPST01710000",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_COND_SCR_DIV_CODE" : "20171",
"FID_INPUT_ISCD" : "0000",
"FID_DIV_CLS_CODE" : "0",
"FID_BLNG_CLS_CODE" : "0",
"FID_TRGT_CLS_CODE" : "111111111",
"FID_TRGT_EXLS_CLS_CODE": "000000",
"FID_INPUT_PRICE_1" : "",
"FID_INPUT_PRICE_2" : "",
"FID_VOL_CNT" : "",
"FID_INPUT_DATE_1" : "",
}
)
result = []
for i, row in enumerate(data.get("output", [])[:top_n]):
result.append({
"rank" : i + 1,
"ticker" : row["mksc_shrn_iscd"],
"name" : row["hts_kor_isnm"],
"volume" : int(row["acml_vol"]),
"change_pct": float(row["prdy_ctrt"]),
})
return result
async def get_foreign_institution_rank(self, top_n: int = 30) -> Dict:
"""외국인/기관 순매수 상위 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-investor",
tr_id = "FHKST04430000",
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD" : "0000",
"FID_INPUT_DATE_1" : "",
"FID_INPUT_DATE_2" : "",
"FID_PERIOD_DIV_CODE" : "D",
}
)
foreign = []
institution = []
for row in data.get("output", [])[:top_n]:
entry = {
"ticker": row.get("mksc_shrn_iscd", ""),
"name" : row.get("hts_kor_isnm", ""),
"amount": int(row.get("frgn_ntby_qty", "0")),
}
foreign.append(entry)
entry2 = {
"ticker": row.get("mksc_shrn_iscd", ""),
"name" : row.get("hts_kor_isnm", ""),
"amount": int(row.get("orgn_ntby_qty", "0")),
}
institution.append(entry2)
return {
"foreign" : sorted(foreign, key=lambda x: x["amount"], reverse=True)[:top_n],
"institution": sorted(institution, key=lambda x: x["amount"], reverse=True)[:top_n],
}
async def get_sector_trend(self) -> list:
"""업종별 등락률 (AI 판단용)"""
data = await self._request(
method = "GET",
path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
tr_id = "FHKST03010100",
params = {
"FID_COND_MRKT_DIV_CODE": "U", # 업종
"FID_INPUT_ISCD" : "0001",
"FID_INPUT_DATE_1" : "",
"FID_INPUT_DATE_2" : "",
"FID_PERIOD_DIV_CODE" : "D",
"FID_ORG_ADJ_PRC" : "0",
}
)
result = []
for row in data.get("output1", []):
result.append({
"sector" : row.get("hts_kor_isnm", ""),
"change_pct": float(row.get("prdy_ctrt", "0")),
})
return result
# ─────────────────────────────────────────
# WebSocket 클라이언트 (실시간 시세)
# ─────────────────────────────────────────
class KISWebSocket:
"""
KIS 실시간 시세 WebSocket
- 체결가 (H0STCNT0)
- 호가 (H0STASP0)
- VI (H0STVI0)
"""
WS_URL_REAL = "ws://ops.koreainvestment.com:21000"
WS_URL_MOCK = "ws://ops.koreainvestment.com:31000"
def __init__(self, kis_client: KISClient):
self.kis = kis_client
self.ws_url = self.WS_URL_MOCK if kis_client.is_mock else self.WS_URL_REAL
self._ws = None
self._handlers : Dict[str, Callable] = {} # ticker → callback
self._vi_handler: Optional[Callable] = None
self._running = False
def on_price(self, ticker: str, handler: Callable):
"""실시간 체결가 핸들러 등록"""
self._handlers[ticker] = handler
def on_vi(self, handler: Callable):
"""VI 발동 핸들러 등록"""
self._vi_handler = handler
async def subscribe(self, tickers: list):
"""종목 구독 시작"""
token = await self.kis.get_access_token()
# 접속키 발급
async with aiohttp.ClientSession() as session:
resp = await session.post(
f"{self.kis.base_url}/oauth2/Approval",
json={
"grant_type": "client_credentials",
"appkey" : self.kis.app_key,
"secretkey" : self.kis.app_secret,
}
)
key_data = await resp.json()
approval_key = key_data.get("approval_key", "")
async with aiohttp.ClientSession() as session:
async with session.ws_connect(self.ws_url) as ws:
self._ws = ws
self._running = True
logger.info(f"WebSocket 연결 완료: {len(tickers)}종목 구독 시작")
# 종목별 구독 등록
for ticker in tickers:
for tr_id in ["H0STCNT0", "H0STVI0"]:
await ws.send_json({
"header": {
"approval_key": approval_key,
"custtype" : "P",
"tr_type" : "1", # 등록
"content-type": "utf-8",
},
"body": {
"input": {
"tr_id" : tr_id,
"tr_key" : ticker,
}
}
})
logger.info("구독 등록 완료")
# 메시지 수신 루프
async for msg in ws:
if not self._running:
break
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
logger.error("WebSocket 연결 끊김")
self._running = False
break
async def _handle_message(self, raw: str):
"""WebSocket 메시지 파싱"""
try:
# KIS WebSocket 메시지 포맷: 헤더|바디
if raw.startswith("{"):
# JSON 형식 (시스템 메시지)
return
parts = raw.split("|")
if len(parts) < 4:
return
tr_id = parts[1]
data = parts[3].split("^")
if tr_id == "H0STCNT0":
# 실시간 체결가
ticker = data[0]
price = int(data[2])
volume = int(data[9])
handler = self._handlers.get(ticker)
if handler:
await handler(ticker, price, volume)
elif tr_id == "H0STVI0":
# VI 발동/해제
ticker = data[0]
vi_status = data[1] # 1=발동, 2=해제
ref_price = int(data[5]) if len(data) > 5 else 0
if self._vi_handler:
await self._vi_handler(ticker, vi_status, ref_price)
except Exception as e:
logger.error(f"WebSocket 메시지 파싱 오류: {e}")
async def close(self):
self._running = False
if self._ws:
await self._ws.close()
logger.info("WebSocket 연결 종료")