실시간 웹소켓 리팩토링

This commit is contained in:
2026-02-23 15:37:22 +09:00
parent 276ef09d89
commit c17797061e
8 changed files with 408 additions and 71 deletions

View File

@@ -52,10 +52,16 @@ const subscribers = new Map<string, Set<RealtimeCallback>>();
const subscriberCounts = new Map<string, number>(); // 실제 소켓 구독 요청 여부 추적용
let socket: WebSocket | null = null;
let pingInterval: number | undefined;
let isConnecting = false; // 연결 진행 중 상태 잠금
let reconnectRetryTimer: number | undefined;
let lastAppKeyConflictAt = 0;
let reconnectAttempt = 0;
let manualDisconnectRequested = false;
const MAX_AUTO_RECONNECT_ATTEMPTS = 8;
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_DELAY_MS = 30_000;
const RECONNECT_JITTER_MS = 300;
export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
isConnected: false,
@@ -63,6 +69,9 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
connect: async (options) => {
const forceApprovalRefresh = options?.forceApprovalRefresh ?? false;
manualDisconnectRequested = false;
window.clearTimeout(reconnectRetryTimer);
reconnectRetryTimer = undefined;
const currentSocket = socket;
if (currentSocket?.readyState === WebSocket.CLOSING) {
@@ -70,12 +79,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
}
// 1. 이미 연결되어 있거나, 연결 시도 중이면 중복 실행 방지
if (
socket?.readyState === WebSocket.OPEN ||
socket?.readyState === WebSocket.CONNECTING ||
socket?.readyState === WebSocket.CLOSING ||
isConnecting
) {
if (isSocketUnavailableForNewConnect(socket) || isConnecting) {
return;
}
@@ -89,10 +93,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
const wsConnection = await getOrFetchWsConnection();
// 비동기 대기 중에 다른 연결이 성사되었는지 다시 확인
if (
socket?.readyState === WebSocket.OPEN ||
socket?.readyState === WebSocket.CONNECTING
) {
if (isSocketOpenOrConnecting(socket)) {
isConnecting = false;
return;
}
@@ -113,6 +114,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
if (socket !== ws) return;
set({ isConnected: true, error: null });
reconnectAttempt = 0;
console.log("[KisWebSocket] Connected");
// 재연결 시 기존 구독 복구
@@ -127,23 +129,46 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
}
});
}
// PINGPONG (Keep-alive)
window.clearInterval(pingInterval);
pingInterval = window.setInterval(() => {
if (socket?.readyState === WebSocket.OPEN) {
socket.send("PINGPONG"); // 일부 환경에서는 PINGPONG 텍스트 전송
}
}, 100_000); // 100초 주기
};
ws.onclose = () => {
ws.onclose = (event) => {
if (socket === ws) {
isConnecting = false;
set({ isConnected: false });
console.log("[KisWebSocket] Disconnected");
window.clearInterval(pingInterval);
socket = null;
const hasSubscribers = hasActiveRealtimeSubscribers();
const canAutoReconnect =
!manualDisconnectRequested &&
hasSubscribers &&
reconnectAttempt < MAX_AUTO_RECONNECT_ATTEMPTS;
if (canAutoReconnect) {
reconnectAttempt += 1;
const delayMs = getReconnectDelayMs(reconnectAttempt);
console.warn(
`[KisWebSocket] Disconnected (code=${event.code}, reason=${event.reason || "none"}) -> reconnect in ${delayMs}ms (attempt ${reconnectAttempt}/${MAX_AUTO_RECONNECT_ATTEMPTS})`,
);
window.clearTimeout(reconnectRetryTimer);
reconnectRetryTimer = window.setTimeout(() => {
const refreshApproval = reconnectAttempt % 3 === 0;
void get().reconnect({ refreshApproval });
}, delayMs);
return;
}
if (hasSubscribers && reconnectAttempt >= MAX_AUTO_RECONNECT_ATTEMPTS) {
set({
error:
"실시간 연결이 반복 종료되어 자동 재연결을 중단했습니다. 새로고침 또는 수동 재연결을 시도해 주세요.",
});
}
reconnectAttempt = 0;
console.log(
`[KisWebSocket] Disconnected (code=${event.code}, reason=${event.reason || "none"})`,
);
}
};
@@ -165,7 +190,17 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
// PINGPONG 응답 또는 제어 메시지 처리
if (data.startsWith("{")) {
const control = parseControlMessage(data);
if (control?.rt_cd && control.rt_cd !== "0") {
if (!control) return;
if (control.trId === "PINGPONG") {
// KIS 샘플 구현과 동일하게 원문을 그대로 echo하여 연결 유지를 보조합니다.
if (socket === ws && ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
return;
}
if (control.rtCd && control.rtCd !== "0") {
const errorMessage = buildControlErrorMessage(control);
set({
error: errorMessage,
@@ -173,7 +208,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
// KIS 제어 메시지: ALREADY IN USE appkey
// 이전 세션이 닫히기 전에 재연결될 때 간헐적으로 발생합니다.
if (control.msg_cd === "OPSP8996") {
if (control.msgCd === "OPSP8996") {
const now = Date.now();
if (now - lastAppKeyConflictAt > 5_000) {
lastAppKeyConflictAt = now;
@@ -183,6 +218,14 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
}, 1_200);
}
}
// 승인키가 유효하지 않을 때는 승인키 재발급 후 재연결합니다.
if (control.msgCd === "OPSP0011") {
window.clearTimeout(reconnectRetryTimer);
reconnectRetryTimer = window.setTimeout(() => {
void get().reconnect({ refreshApproval: true });
}, 1_200);
}
}
return;
}
@@ -212,6 +255,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
reconnect: async (options) => {
const refreshApproval = options?.refreshApproval ?? false;
manualDisconnectRequested = false;
const currentSocket = socket;
get().disconnect();
if (currentSocket && currentSocket.readyState !== WebSocket.CLOSED) {
@@ -223,6 +267,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
},
disconnect: () => {
manualDisconnectRequested = true;
const currentSocket = socket;
if (
currentSocket &&
@@ -236,8 +281,9 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
socket = null;
}
set({ isConnected: false });
window.clearInterval(pingInterval);
window.clearTimeout(reconnectRetryTimer);
reconnectRetryTimer = undefined;
reconnectAttempt = 0;
isConnecting = false;
},
@@ -310,9 +356,12 @@ function sendSubscription(
}
interface KisWsControlMessage {
rt_cd?: string;
msg_cd?: string;
trId?: string;
trKey?: string;
rtCd?: string;
msgCd?: string;
msg1?: string;
encrypt?: string;
}
/**
@@ -323,8 +372,32 @@ interface KisWsControlMessage {
*/
function parseControlMessage(rawData: string): KisWsControlMessage | null {
try {
const parsed = JSON.parse(rawData) as KisWsControlMessage;
return parsed && typeof parsed === "object" ? parsed : null;
const parsed = JSON.parse(rawData) as {
header?: {
tr_id?: string;
tr_key?: string;
encrypt?: string;
};
body?: {
rt_cd?: string;
msg_cd?: string;
msg1?: string;
};
rt_cd?: string;
msg_cd?: string;
msg1?: string;
};
if (!parsed || typeof parsed !== "object") return null;
return {
trId: parsed.header?.tr_id,
trKey: parsed.header?.tr_key,
encrypt: parsed.header?.encrypt,
rtCd: parsed.body?.rt_cd ?? parsed.rt_cd,
msgCd: parsed.body?.msg_cd ?? parsed.msg_cd,
msg1: parsed.body?.msg1 ?? parsed.msg1,
};
} catch {
return null;
}
@@ -337,13 +410,67 @@ function parseControlMessage(rawData: string): KisWsControlMessage | null {
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onmessage
*/
function buildControlErrorMessage(message: KisWsControlMessage) {
if (message.msg_cd === "OPSP8996") {
if (message.msgCd === "OPSP8996") {
return "실시간 연결이 다른 세션과 충돌해 재연결을 시도합니다.";
}
const detail = [message.msg1, message.msg_cd].filter(Boolean).join(" / ");
const detail = [message.msg1, message.msgCd].filter(Boolean).join(" / ");
return detail ? `실시간 제어 메시지 오류: ${detail}` : "실시간 제어 메시지 오류";
}
/**
* @description 활성화된 웹소켓 구독이 존재하는지 반환합니다.
* @returns 구독 중인 TR/심볼이 1개 이상이면 true
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onclose
*/
function hasActiveRealtimeSubscribers() {
for (const count of subscriberCounts.values()) {
if (count > 0) return true;
}
return false;
}
/**
* @description 자동 재연결 시도 횟수에 따라 지수 백오프 지연시간(ms)을 계산합니다.
* @param attempt 1부터 시작하는 재연결 시도 횟수
* @returns 지연시간(ms)
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onclose
*/
function getReconnectDelayMs(attempt: number) {
const exponential = RECONNECT_BASE_DELAY_MS * 2 ** Math.max(0, attempt - 1);
const clamped = Math.min(exponential, RECONNECT_MAX_DELAY_MS);
const jitter = Math.floor(Math.random() * RECONNECT_JITTER_MS);
return clamped + jitter;
}
/**
* @description 소켓이 OPEN 또는 CONNECTING 상태인지 검사합니다.
* @param target 검사 대상 소켓
* @returns 연결 유지/진행 상태면 true
* @see features/kis-realtime/stores/kisWebSocketStore.ts connect
*/
function isSocketOpenOrConnecting(target: WebSocket | null) {
if (!target) return false;
return (
target.readyState === WebSocket.OPEN ||
target.readyState === WebSocket.CONNECTING
);
}
/**
* @description 새 연결을 시작하면 안 되는 소켓 상태인지 검사합니다.
* @param target 검사 대상 소켓
* @returns OPEN/CONNECTING/CLOSING 중 하나면 true
* @see features/kis-realtime/stores/kisWebSocketStore.ts connect
*/
function isSocketUnavailableForNewConnect(target: WebSocket | null) {
if (!target) return false;
return (
target.readyState === WebSocket.OPEN ||
target.readyState === WebSocket.CONNECTING ||
target.readyState === WebSocket.CLOSING
);
}
/**
* @description 특정 웹소켓 인스턴스가 완전히 닫힐 때까지 대기합니다.
* @param target 대기할 웹소켓 인스턴스