Files
auto-trade/features/kis-realtime/stores/kisWebSocketStore.ts
2026-02-13 16:41:10 +09:00

442 lines
15 KiB
TypeScript

import { create } from "zustand";
import { useKisRuntimeStore } from "@/features/settings/store/use-kis-runtime-store";
import { buildKisRealtimeMessage } from "@/features/kis-realtime/utils/websocketUtils";
/**
* @file features/kis-realtime/stores/kisWebSocketStore.ts
* @description KIS 실시간 웹소켓 연결을 전역에서 하나로 관리하는 스토어입니다.
* 중복 연결을 방지하고, 여러 컴포넌트에서 동일한 데이터를 구독할 때 효율적으로 처리합니다.
*/
type RealtimeCallback = (data: string) => void;
interface KisWebSocketState {
isConnected: boolean;
error: string | null;
/**
* 웹소켓 연결을 수립합니다.
* 이미 연결되어 있거나 연결 중이면 무시합니다.
*/
connect: (options?: { forceApprovalRefresh?: boolean }) => Promise<void>;
/**
* 웹소켓 연결을 강제로 재시작합니다.
* 필요 시 승인키를 새로 발급받아 재연결합니다.
*/
reconnect: (options?: { refreshApproval?: boolean }) => Promise<void>;
/**
* 웹소켓 연결을 종료합니다.
* 모든 구독이 해제됩니다.
*/
disconnect: () => void;
/**
* 특정 TR ID와 종목 코드로 실시간 데이터를 구독합니다.
* @param trId 거래 ID (예: H0STCNT0)
* @param symbol 종목 코드 (예: 005930)
* @param callback 데이터 수신 시 실행할 콜백 함수
* @returns 구독 해제 함수 (useEffect cleanup에서 호출하세요)
*/
subscribe: (
trId: string,
symbol: string,
callback: RealtimeCallback,
) => () => void;
}
// 구독자 목록 관리 (Key: "TR_ID|SYMBOL", Value: Set<Callback>)
// 스토어 외부 변수로 관리하여 불필요한 리렌더링을 방지합니다.
const subscribers = new Map<string, Set<RealtimeCallback>>();
const subscriberCounts = new Map<string, number>(); // 실제 소켓 구독 요청 여부 추적용
let socket: WebSocket | null = null;
let pingInterval: number | undefined;
let isConnecting = false; // 연결 진행 중 상태 잠금
let reconnectRetryTimer: number | undefined;
let lastAppKeyConflictAt = 0;
export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
isConnected: false,
error: null,
connect: async (options) => {
const forceApprovalRefresh = options?.forceApprovalRefresh ?? false;
const currentSocket = socket;
if (currentSocket?.readyState === WebSocket.CLOSING) {
await waitForSocketClose(currentSocket);
}
// 1. 이미 연결되어 있거나, 연결 시도 중이면 중복 실행 방지
if (
socket?.readyState === WebSocket.OPEN ||
socket?.readyState === WebSocket.CONNECTING ||
socket?.readyState === WebSocket.CLOSING ||
isConnecting
) {
return;
}
try {
isConnecting = true;
const { getOrFetchWsConnection, clearWsConnectionCache } =
useKisRuntimeStore.getState();
if (forceApprovalRefresh) {
clearWsConnectionCache();
}
const wsConnection = await getOrFetchWsConnection();
// 비동기 대기 중에 다른 연결이 성사되었는지 다시 확인
if (
socket?.readyState === WebSocket.OPEN ||
socket?.readyState === WebSocket.CONNECTING
) {
isConnecting = false;
return;
}
if (!wsConnection) {
throw new Error("웹소켓 접속 키 발급에 실패했습니다.");
}
// 소켓 생성
// socket 변수에 할당하기 전에 로컬 변수로 제어하여 이벤트 핸들러 클로저 문제 방지
const ws = new WebSocket(`${wsConnection.wsUrl}/tryitout`);
socket = ws;
ws.onopen = () => {
isConnecting = false;
// socket 변수가 다른 인스턴스로 바뀌었을 가능성은 낮지만(락 때문),
// 안전을 위해 이벤트 발생 주체인 ws를 사용 또는 현재 socket 확인
if (socket !== ws) return;
set({ isConnected: true, error: null });
console.log("[KisWebSocket] Connected");
// 재연결 시 기존 구독 복구
const approvalKey = wsConnection.approvalKey;
if (approvalKey) {
subscriberCounts.forEach((_, key) => {
const [trId, symbol] = key.split("|");
// OPEN 상태일 때만 전송
if (ws.readyState === WebSocket.OPEN) {
sendSubscription(ws, approvalKey, trId, symbol, "1"); // 구독
}
});
}
// PINGPONG (Keep-alive)
window.clearInterval(pingInterval);
pingInterval = window.setInterval(() => {
if (socket?.readyState === WebSocket.OPEN) {
socket.send("PINGPONG"); // 일부 환경에서는 PINGPONG 텍스트 전송
}
}, 100_000); // 100초 주기
};
ws.onclose = () => {
if (socket === ws) {
isConnecting = false;
set({ isConnected: false });
console.log("[KisWebSocket] Disconnected");
window.clearInterval(pingInterval);
socket = null;
}
};
ws.onerror = (event) => {
if (socket === ws) {
isConnecting = false;
console.error("[KisWebSocket] Error", event);
set({
isConnected: false,
error: "웹소켓 연결 중 오류가 발생했습니다.",
});
}
};
ws.onmessage = (event) => {
const data = event.data;
if (typeof data !== "string") return;
// PINGPONG 응답 또는 제어 메시지 처리
if (data.startsWith("{")) {
const control = parseControlMessage(data);
if (control?.rt_cd && control.rt_cd !== "0") {
const errorMessage = buildControlErrorMessage(control);
set({
error: errorMessage,
});
// KIS 제어 메시지: ALREADY IN USE appkey
// 이전 세션이 닫히기 전에 재연결될 때 간헐적으로 발생합니다.
if (control.msg_cd === "OPSP8996") {
const now = Date.now();
if (now - lastAppKeyConflictAt > 5_000) {
lastAppKeyConflictAt = now;
window.clearTimeout(reconnectRetryTimer);
reconnectRetryTimer = window.setTimeout(() => {
void get().reconnect({ refreshApproval: false });
}, 1_200);
}
}
}
return;
}
if (data[0] === "0" || data[0] === "1") {
// 데이터 포맷: 0|TR_ID|KEY|...
const parts = data.split("|");
if (parts.length >= 4) {
const trId = parts[1];
const body = parts[3];
const values = body.split("^");
const symbol = values[0] ?? "";
// UI 흐름: 소켓 수신 -> TR/심볼 정규화 매칭 -> 해당 구독 콜백 실행 -> 훅 파서(parseKisRealtime*) -> 화면 반영
dispatchRealtimeMessageToSubscribers(trId, symbol, data);
}
}
};
} catch (err) {
isConnecting = false;
set({
isConnected: false,
error: err instanceof Error ? err.message : "연결 실패",
});
}
},
reconnect: async (options) => {
const refreshApproval = options?.refreshApproval ?? false;
const currentSocket = socket;
get().disconnect();
if (currentSocket && currentSocket.readyState !== WebSocket.CLOSED) {
await waitForSocketClose(currentSocket);
}
await get().connect({
forceApprovalRefresh: refreshApproval,
});
},
disconnect: () => {
const currentSocket = socket;
if (
currentSocket &&
(currentSocket.readyState === WebSocket.OPEN ||
currentSocket.readyState === WebSocket.CONNECTING ||
currentSocket.readyState === WebSocket.CLOSING)
) {
currentSocket.close();
}
if (currentSocket?.readyState === WebSocket.CLOSED && socket === currentSocket) {
socket = null;
}
set({ isConnected: false });
window.clearInterval(pingInterval);
window.clearTimeout(reconnectRetryTimer);
isConnecting = false;
},
subscribe: (trId, symbol, callback) => {
const key = `${trId}|${symbol}`;
// 1. 구독자 목록에 추가
if (!subscribers.has(key)) {
subscribers.set(key, new Set());
}
subscribers.get(key)!.add(callback);
// 2. 소켓 서버에 구독 요청 (첫 번째 구독자인 경우)
const currentCount = subscriberCounts.get(key) || 0;
if (currentCount === 0) {
const { wsApprovalKey } = useKisRuntimeStore.getState();
if (socket?.readyState === WebSocket.OPEN && wsApprovalKey) {
sendSubscription(socket, wsApprovalKey, trId, symbol, "1"); // "1": 등록
}
}
subscriberCounts.set(key, currentCount + 1);
// **연결이 안 되어 있으면 연결 시도**
if (!socket || socket.readyState !== WebSocket.OPEN) {
get().connect();
}
// 3. 구독 해제 함수 반환
return () => {
const callbacks = subscribers.get(key);
if (callbacks) {
callbacks.delete(callback);
if (callbacks.size === 0) {
subscribers.delete(key);
}
}
const count = subscriberCounts.get(key) || 0;
if (count > 0) {
subscriberCounts.set(key, count - 1);
if (count - 1 === 0) {
// 마지막 구독자가 사라지면 소켓 구독 해제
const { wsApprovalKey } = useKisRuntimeStore.getState();
if (socket?.readyState === WebSocket.OPEN && wsApprovalKey) {
sendSubscription(socket, wsApprovalKey, trId, symbol, "2"); // "2": 해제
}
}
}
};
},
}));
// 헬퍼: 구독/해제 메시지 전송
function sendSubscription(
ws: WebSocket,
appKey: string,
trId: string,
symbol: string,
trType: "1" | "2",
) {
try {
const msg = buildKisRealtimeMessage(appKey, symbol, trId, trType);
ws.send(JSON.stringify(msg));
console.debug(
`[KisWebSocket] ${trType === "1" ? "Sub" : "Unsub"} ${trId} ${symbol}`,
);
} catch (e) {
console.warn("[KisWebSocket] Send error", e);
}
}
interface KisWsControlMessage {
rt_cd?: string;
msg_cd?: string;
msg1?: string;
}
/**
* @description 웹소켓 제어 메시지(JSON)를 파싱합니다.
* @param rawData 원본 메시지 문자열
* @returns 파싱된 제어 메시지 또는 null
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onmessage
*/
function parseControlMessage(rawData: string): KisWsControlMessage | null {
try {
const parsed = JSON.parse(rawData) as KisWsControlMessage;
return parsed && typeof parsed === "object" ? parsed : null;
} catch {
return null;
}
}
/**
* @description KIS 웹소켓 제어 오류를 사용자용 짧은 문구로 변환합니다.
* @param message KIS 제어 메시지
* @returns 표시용 오류 문자열
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onmessage
*/
function buildControlErrorMessage(message: KisWsControlMessage) {
if (message.msg_cd === "OPSP8996") {
return "실시간 연결이 다른 세션과 충돌해 재연결을 시도합니다.";
}
const detail = [message.msg1, message.msg_cd].filter(Boolean).join(" / ");
return detail ? `실시간 제어 메시지 오류: ${detail}` : "실시간 제어 메시지 오류";
}
/**
* @description 특정 웹소켓 인스턴스가 완전히 닫힐 때까지 대기합니다.
* @param target 대기할 웹소켓 인스턴스
* @param timeoutMs 최대 대기 시간(ms)
* @returns close/error/timeout 중 먼저 완료되면 resolve
* @see features/kis-realtime/stores/kisWebSocketStore.ts connect/reconnect
*/
function waitForSocketClose(target: WebSocket, timeoutMs = 2_000) {
if (target.readyState === WebSocket.CLOSED) {
return Promise.resolve();
}
return new Promise<void>((resolve) => {
let settled = false;
const onClose = () => finish();
const onError = () => finish();
const timeoutId = window.setTimeout(() => finish(), timeoutMs);
const finish = () => {
if (settled) return;
settled = true;
window.clearTimeout(timeoutId);
target.removeEventListener("close", onClose);
target.removeEventListener("error", onError);
resolve();
};
target.addEventListener("close", onClose);
target.addEventListener("error", onError);
});
}
/**
* @description 실시간 데이터(TR/종목코드)와 등록된 구독자를 매칭해 콜백을 실행합니다.
* 종목코드 접두(prefix) 차이(A005930/J005930 등)와 구독 심볼 형식 차이를 허용합니다.
* @param trId 수신 TR ID
* @param rawSymbol 수신 데이터의 원본 종목코드
* @param payload 웹소켓 원문 메시지
* @see features/trade/hooks/useTradeTickSubscription.ts 체결 구독 콜백
* @see features/trade/hooks/useOrderbookSubscription.ts 호가 구독 콜백
*/
function dispatchRealtimeMessageToSubscribers(
trId: string,
rawSymbol: string,
payload: string,
) {
const callbackSet = new Set<RealtimeCallback>();
const normalizedIncomingSymbol = normalizeRealtimeSymbol(rawSymbol);
// 1) 정확히 일치하는 key 우선
const exactKey = `${trId}|${rawSymbol}`;
subscribers.get(exactKey)?.forEach((callback) => callbackSet.add(callback));
// 2) 숫자 6자리 기준(정규화)으로 일치하는 key 매칭
subscribers.forEach((callbacks, key) => {
const [subscribedTrId, subscribedSymbol = ""] = key.split("|");
if (subscribedTrId !== trId) return;
if (!normalizedIncomingSymbol) return;
const normalizedSubscribedSymbol = normalizeRealtimeSymbol(subscribedSymbol);
if (!normalizedSubscribedSymbol) return;
if (normalizedIncomingSymbol !== normalizedSubscribedSymbol) return;
callbacks.forEach((callback) => callbackSet.add(callback));
});
// 3) 심볼 매칭이 실패한 경우에도 같은 TR 전체 콜백으로 안전 fallback
if (callbackSet.size === 0) {
subscribers.forEach((callbacks, key) => {
const [subscribedTrId] = key.split("|");
if (subscribedTrId !== trId) return;
callbacks.forEach((callback) => callbackSet.add(callback));
});
}
callbackSet.forEach((callback) => callback(payload));
}
/**
* @description 실시간 종목코드를 비교 가능한 6자리 숫자 코드로 정규화합니다.
* @param value 원본 종목코드 (예: 005930, A005930)
* @returns 정규화된 6자리 코드. 파싱 불가 시 원본 trim 값 반환
* @see features/kis-realtime/stores/kisWebSocketStore.ts dispatchRealtimeMessageToSubscribers
*/
function normalizeRealtimeSymbol(value: string) {
const trimmed = value.trim();
if (!trimmed) return "";
const digits = trimmed.replace(/\D/g, "");
if (digits.length >= 6) {
return digits.slice(-6);
}
return trimmed;
}