""" kis_client.py KIS Open API REST + WebSocket 래퍼 - 토큰 자동 발급/갱신 - 모의투자/실거래 모드 자동 전환 - rate limit 제어 (초당 20건) """ import os import json import time import asyncio import aiohttp import logging from datetime import datetime, timedelta from typing import Optional, Dict, Any, Callable logger = logging.getLogger(__name__) # ── 모드별 베이스 URL ── URL_REAL = "https://openapi.koreainvestment.com:9443" URL_MOCK = "https://openapivts.koreainvestment.com:29443" class KISClient: """ KIS Open API 클라이언트 모의투자/실거래 모드를 .env의 KIS_MOCK 값으로 자동 전환 """ def __init__(self): self.is_mock = os.getenv("KIS_MOCK", "true").lower() == "true" self.base_url = URL_MOCK if self.is_mock else URL_REAL # 모드별 키 자동 선택 if self.is_mock: self.app_key = os.getenv("KIS_MOCK_APP_KEY", "") self.app_secret = os.getenv("KIS_MOCK_APP_SECRET", "") self.account_no = os.getenv("KIS_MOCK_ACCOUNT_NO", "") else: self.app_key = os.getenv("KIS_APP_KEY", "") self.app_secret = os.getenv("KIS_APP_SECRET", "") self.account_no = os.getenv("KIS_ACCOUNT_NO", "") # 계좌번호 파싱 (앞 8자리 + 뒤 2자리) self._parse_account() # 토큰 관련 self._access_token : Optional[str] = None self._token_expires_at: Optional[datetime] = None # 토큰 파일 캐시 경로 (재시작 시 재사용) mode_tag = "mock" if self.is_mock else "real" self._token_cache_file = os.path.join( os.path.dirname(__file__), "..", "..", "data", f"kis_token_{mode_tag}.json" ) self._load_token_from_file() # rate limit: 모의투자 1건/초, 실거래 5건/초 self._rate_limit = 1 if self.is_mock else 5 self._semaphore = asyncio.Semaphore(1) self._req_times : list = [] mode = "모의투자" if self.is_mock else "실거래" logger.info(f"KISClient 초기화 완료 [{mode}] 계좌: {self.account_no}") def _parse_account(self): """계좌번호 파싱: '50123456-01' → ('50123456', '01')""" raw = self.account_no.replace("-", "") if len(raw) >= 10: self.acct_prefix = raw[:8] self.acct_suffix = raw[8:10] else: self.acct_prefix = raw self.acct_suffix = "01" # ───────────────────────────────────────── # 토큰 관리 # ───────────────────────────────────────── def _load_token_from_file(self): """재시작 시 파일 캐시에서 토큰 복원""" try: if os.path.exists(self._token_cache_file): with open(self._token_cache_file, encoding="utf-8") as f: cached = json.load(f) expires_at = datetime.fromisoformat(cached["expires_at"]) if datetime.now() < expires_at - timedelta(minutes=30): self._access_token = cached["access_token"] self._token_expires_at = expires_at logger.info("KIS 토큰 파일 캐시 복원 완료") except Exception: pass def _save_token_to_file(self): """토큰을 파일에 저장 (재시작 시 재사용)""" try: os.makedirs(os.path.dirname(self._token_cache_file), exist_ok=True) with open(self._token_cache_file, "w", encoding="utf-8") as f: json.dump({ "access_token": self._access_token, "expires_at": self._token_expires_at.isoformat(), }, f) except Exception: pass async def get_access_token(self) -> str: """액세스 토큰 발급/갱신 (만료 30분 전 자동 갱신)""" now = datetime.now() if (self._access_token and self._token_expires_at and now < self._token_expires_at - timedelta(minutes=30)): return self._access_token url = f"{self.base_url}/oauth2/tokenP" body = { "grant_type" : "client_credentials", "appkey" : self.app_key, "appsecret" : self.app_secret, } async with aiohttp.ClientSession() as session: async with session.post(url, json=body) as resp: data = await resp.json() if "access_token" not in data: raise RuntimeError(f"토큰 발급 실패: {data}") self._access_token = data["access_token"] # 유효기간 24시간 self._token_expires_at = now + timedelta(hours=24) self._save_token_to_file() logger.info("KIS 액세스 토큰 발급/갱신 완료") return self._access_token # ───────────────────────────────────────── # REST API 기본 호출 # ───────────────────────────────────────── async def _request( self, method : str, path : str, tr_id : str, params : Optional[Dict] = None, body : Optional[Dict] = None, ) -> Dict[str, Any]: """ KIS REST API 공통 호출 - rate limit 제어 (초당 20건) - 토큰 자동 첨부 """ token = await self.get_access_token() url = f"{self.base_url}{path}" headers = { "content-type" : "application/json; charset=utf-8", "authorization" : f"Bearer {token}", "appkey" : self.app_key, "appsecret" : self.app_secret, "tr_id" : tr_id, "custtype" : "P", # 개인 } async with self._semaphore: now = time.monotonic() self._req_times = [t for t in self._req_times if now - t < 1.0] if len(self._req_times) >= self._rate_limit: wait = 1.0 - (now - self._req_times[0]) if wait > 0: await asyncio.sleep(wait) self._req_times.append(time.monotonic()) async with aiohttp.ClientSession() as session: if method == "GET": async with session.get(url, headers=headers, params=params) as r: data = await r.json() else: async with session.post(url, headers=headers, json=body) as r: data = await r.json() # 응답 코드 체크 rt_cd = data.get("rt_cd", "") if rt_cd != "0": msg = data.get("msg1", "알 수 없는 오류") logger.error(f"KIS API 오류 [{tr_id}]: {rt_cd} - {msg}") raise RuntimeError(f"KIS API 오류: {msg}") return data # ───────────────────────────────────────── # 시세 조회 # ───────────────────────────────────────── async def get_price(self, ticker: str) -> Dict: """주식 현재가 조회 (FHKST01010100)""" data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/quotations/inquire-price", tr_id = "FHKST01010100", params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD" : ticker, } ) o = data["output"] return { "ticker" : ticker, "current" : int(o["stck_prpr"]), # 현재가 "open" : int(o["stck_oprc"]), # 시가 "high" : int(o["stck_hgpr"]), # 고가 "low" : int(o["stck_lwpr"]), # 저가 "prev_close" : int(o["stck_sdpr"]), # 전일 종가 "volume" : int(o["acml_vol"]), # 누적 거래량 "change_pct" : float(o["prdy_ctrt"]), # 등락률 "market_cap" : int(o.get("hts_avls", 0)) * 100_000_000, # 시가총액 (억→원) } async def get_ohlcv_daily(self, ticker: str, start: str, end: str) -> list: """ 주식 기간별 시세 (일봉) - 백테스트/AI 분석용 start, end: 'YYYYMMDD' """ data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", tr_id = "FHKST03010100", params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD" : ticker, "FID_INPUT_DATE_1" : start, "FID_INPUT_DATE_2" : end, "FID_PERIOD_DIV_CODE" : "D", # 일봉 "FID_ORG_ADJ_PRC" : "0", } ) result = [] for row in data.get("output2", []): result.append({ "date" : row["stck_bsop_date"], "open" : int(row["stck_oprc"]), "high" : int(row["stck_hgpr"]), "low" : int(row["stck_lwpr"]), "close" : int(row["stck_clpr"]), "volume": int(row["acml_vol"]), }) return result # ───────────────────────────────────────── # 주문 # ───────────────────────────────────────── async def order_buy( self, ticker : str, qty : int, price : int = 0, # 0 = 시장가 order_type: str = "01", # 01=시장가, 00=지정가 ) -> Dict: """주식 매수 주문""" dry_run = os.getenv("DRY_RUN", "true").lower() == "true" if dry_run: price_info = await self.get_price(ticker) current = price_info["current"] logger.info(f"[DRY_RUN] 매수 {ticker} {qty}주 @ {current:,}원") return {"dry_run": True, "ticker": ticker, "qty": qty, "entry_price": current} # 모의/실거래 TR 구분 tr_id = "VTTC0802U" if self.is_mock else "TTTC0802U" data = await self._request( method = "POST", path = "/uapi/domestic-stock/v1/trading/order-cash", tr_id = tr_id, body = { "CANO" : self.acct_prefix, "ACNT_PRDT_CD": self.acct_suffix, "PDNO" : ticker, "ORD_DVSN" : order_type, "ORD_QTY" : str(qty), "ORD_UNPR" : str(price), } ) logger.info(f"매수 주문 완료: {ticker} {qty}주") return data async def order_sell( self, ticker : str, qty : int, price : int = 0, order_type: str = "01", ) -> Dict: """주식 매도 주문""" dry_run = os.getenv("DRY_RUN", "true").lower() == "true" if dry_run: price_info = await self.get_price(ticker) current = price_info["current"] logger.info(f"[DRY_RUN] 매도 {ticker} {qty}주 @ {current:,}원") return {"dry_run": True, "ticker": ticker, "qty": qty, "exit_price": current} tr_id = "VTTC0801U" if self.is_mock else "TTTC0801U" data = await self._request( method = "POST", path = "/uapi/domestic-stock/v1/trading/order-cash", tr_id = tr_id, body = { "CANO" : self.acct_prefix, "ACNT_PRDT_CD": self.acct_suffix, "PDNO" : ticker, "ORD_DVSN" : order_type, "ORD_QTY" : str(qty), "ORD_UNPR" : str(price), } ) logger.info(f"매도 주문 완료: {ticker} {qty}주") return data # ───────────────────────────────────────── # 잔고 조회 # ───────────────────────────────────────── async def get_balance(self) -> Dict: """주식 잔고 조회 (보유 종목 + 예수금)""" tr_id = "VTTC8434R" if self.is_mock else "TTTC8434R" data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/trading/inquire-balance", tr_id = tr_id, params = { "CANO" : self.acct_prefix, "ACNT_PRDT_CD" : self.acct_suffix, "AFHR_FLPR_YN" : "N", "OFL_YN" : "", "INQR_DVSN" : "02", "UNPR_DVSN" : "01", "FUND_STTL_ICLD_YN" : "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN" : "01", "CTX_AREA_FK100" : "", "CTX_AREA_NK100" : "", } ) holdings = [] for item in data.get("output1", []): qty = int(item.get("hldg_qty", "0")) if qty > 0: holdings.append({ "ticker" : item["pdno"], "name" : item["prdt_name"], "qty" : qty, "avg_price" : int(item["pchs_avg_pric"].replace(".", "")), "current" : int(item["prpr"]), "pnl_pct" : float(item["evlu_pfls_rt"]), }) cash = int(data["output2"][0].get("dnca_tot_amt", "0")) if data.get("output2") else 0 return { "holdings" : holdings, "cash" : cash, "total_cnt": len(holdings), } # ───────────────────────────────────────── # AI 판단용 수급 데이터 # ───────────────────────────────────────── async def get_volume_rank(self, top_n: int = 30) -> list: """거래량 순위 상위 종목 (AI 판단용)""" data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/quotations/volume-rank", tr_id = "FHPST01710000", params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_COND_SCR_DIV_CODE" : "20171", "FID_INPUT_ISCD" : "0000", "FID_DIV_CLS_CODE" : "0", "FID_BLNG_CLS_CODE" : "0", "FID_TRGT_CLS_CODE" : "111111111", "FID_TRGT_EXLS_CLS_CODE": "000000", "FID_INPUT_PRICE_1" : "", "FID_INPUT_PRICE_2" : "", "FID_VOL_CNT" : "", "FID_INPUT_DATE_1" : "", } ) result = [] for i, row in enumerate(data.get("output", [])[:top_n]): result.append({ "rank" : i + 1, "ticker" : row["mksc_shrn_iscd"], "name" : row["hts_kor_isnm"], "volume" : int(row["acml_vol"]), "change_pct": float(row["prdy_ctrt"]), }) return result async def get_foreign_institution_rank(self, top_n: int = 30) -> Dict: """외국인/기관 순매수 상위 (AI 판단용)""" data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/quotations/inquire-investor", tr_id = "FHKST04430000", params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD" : "0000", "FID_INPUT_DATE_1" : "", "FID_INPUT_DATE_2" : "", "FID_PERIOD_DIV_CODE" : "D", } ) foreign = [] institution = [] for row in data.get("output", [])[:top_n]: entry = { "ticker": row.get("mksc_shrn_iscd", ""), "name" : row.get("hts_kor_isnm", ""), "amount": int(row.get("frgn_ntby_qty", "0")), } foreign.append(entry) entry2 = { "ticker": row.get("mksc_shrn_iscd", ""), "name" : row.get("hts_kor_isnm", ""), "amount": int(row.get("orgn_ntby_qty", "0")), } institution.append(entry2) return { "foreign" : sorted(foreign, key=lambda x: x["amount"], reverse=True)[:top_n], "institution": sorted(institution, key=lambda x: x["amount"], reverse=True)[:top_n], } async def get_sector_trend(self) -> list: """업종별 등락률 (AI 판단용)""" data = await self._request( method = "GET", path = "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", tr_id = "FHKST03010100", params = { "FID_COND_MRKT_DIV_CODE": "U", # 업종 "FID_INPUT_ISCD" : "0001", "FID_INPUT_DATE_1" : "", "FID_INPUT_DATE_2" : "", "FID_PERIOD_DIV_CODE" : "D", "FID_ORG_ADJ_PRC" : "0", } ) result = [] for row in data.get("output1", []): result.append({ "sector" : row.get("hts_kor_isnm", ""), "change_pct": float(row.get("prdy_ctrt", "0")), }) return result # ───────────────────────────────────────── # WebSocket 클라이언트 (실시간 시세) # ───────────────────────────────────────── class KISWebSocket: """ KIS 실시간 시세 WebSocket - 체결가 (H0STCNT0) - 호가 (H0STASP0) - VI (H0STVI0) """ WS_URL_REAL = "ws://ops.koreainvestment.com:21000" WS_URL_MOCK = "ws://ops.koreainvestment.com:31000" def __init__(self, kis_client: KISClient): self.kis = kis_client self.ws_url = self.WS_URL_MOCK if kis_client.is_mock else self.WS_URL_REAL self._ws = None self._handlers : Dict[str, Callable] = {} # ticker → callback self._vi_handler: Optional[Callable] = None self._running = False def on_price(self, ticker: str, handler: Callable): """실시간 체결가 핸들러 등록""" self._handlers[ticker] = handler def on_vi(self, handler: Callable): """VI 발동 핸들러 등록""" self._vi_handler = handler async def subscribe(self, tickers: list): """종목 구독 시작""" token = await self.kis.get_access_token() # 접속키 발급 async with aiohttp.ClientSession() as session: resp = await session.post( f"{self.kis.base_url}/oauth2/Approval", json={ "grant_type": "client_credentials", "appkey" : self.kis.app_key, "secretkey" : self.kis.app_secret, } ) key_data = await resp.json() approval_key = key_data.get("approval_key", "") async with aiohttp.ClientSession() as session: async with session.ws_connect(self.ws_url) as ws: self._ws = ws self._running = True logger.info(f"WebSocket 연결 완료: {len(tickers)}종목 구독 시작") # 종목별 구독 등록 for ticker in tickers: for tr_id in ["H0STCNT0", "H0STVI0"]: await ws.send_json({ "header": { "approval_key": approval_key, "custtype" : "P", "tr_type" : "1", # 등록 "content-type": "utf-8", }, "body": { "input": { "tr_id" : tr_id, "tr_key" : ticker, } } }) logger.info("구독 등록 완료") # 메시지 수신 루프 async for msg in ws: if not self._running: break if msg.type == aiohttp.WSMsgType.TEXT: await self._handle_message(msg.data) elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): logger.error("WebSocket 연결 끊김") self._running = False break async def _handle_message(self, raw: str): """WebSocket 메시지 파싱""" try: # KIS WebSocket 메시지 포맷: 헤더|바디 if raw.startswith("{"): # JSON 형식 (시스템 메시지) return parts = raw.split("|") if len(parts) < 4: return tr_id = parts[1] data = parts[3].split("^") if tr_id == "H0STCNT0": # 실시간 체결가 ticker = data[0] price = int(data[2]) volume = int(data[9]) handler = self._handlers.get(ticker) if handler: await handler(ticker, price, volume) elif tr_id == "H0STVI0": # VI 발동/해제 ticker = data[0] vi_status = data[1] # 1=발동, 2=해제 ref_price = int(data[5]) if len(data) > 5 else 0 if self._vi_handler: await self._vi_handler(ticker, vi_status, ref_price) except Exception as e: logger.error(f"WebSocket 메시지 파싱 오류: {e}") async def close(self): self._running = False if self._ws: await self._ws.close() logger.info("WebSocket 연결 종료")