실시간 웹소켓 리팩토링
This commit is contained in:
@@ -52,10 +52,16 @@ const subscribers = new Map<string, Set<RealtimeCallback>>();
|
||||
const subscriberCounts = new Map<string, number>(); // 실제 소켓 구독 요청 여부 추적용
|
||||
|
||||
let socket: WebSocket | null = null;
|
||||
let pingInterval: number | undefined;
|
||||
let isConnecting = false; // 연결 진행 중 상태 잠금
|
||||
let reconnectRetryTimer: number | undefined;
|
||||
let lastAppKeyConflictAt = 0;
|
||||
let reconnectAttempt = 0;
|
||||
let manualDisconnectRequested = false;
|
||||
|
||||
const MAX_AUTO_RECONNECT_ATTEMPTS = 8;
|
||||
const RECONNECT_BASE_DELAY_MS = 1_000;
|
||||
const RECONNECT_MAX_DELAY_MS = 30_000;
|
||||
const RECONNECT_JITTER_MS = 300;
|
||||
|
||||
export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
isConnected: false,
|
||||
@@ -63,6 +69,9 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
|
||||
connect: async (options) => {
|
||||
const forceApprovalRefresh = options?.forceApprovalRefresh ?? false;
|
||||
manualDisconnectRequested = false;
|
||||
window.clearTimeout(reconnectRetryTimer);
|
||||
reconnectRetryTimer = undefined;
|
||||
const currentSocket = socket;
|
||||
|
||||
if (currentSocket?.readyState === WebSocket.CLOSING) {
|
||||
@@ -70,12 +79,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
}
|
||||
|
||||
// 1. 이미 연결되어 있거나, 연결 시도 중이면 중복 실행 방지
|
||||
if (
|
||||
socket?.readyState === WebSocket.OPEN ||
|
||||
socket?.readyState === WebSocket.CONNECTING ||
|
||||
socket?.readyState === WebSocket.CLOSING ||
|
||||
isConnecting
|
||||
) {
|
||||
if (isSocketUnavailableForNewConnect(socket) || isConnecting) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -89,10 +93,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
const wsConnection = await getOrFetchWsConnection();
|
||||
|
||||
// 비동기 대기 중에 다른 연결이 성사되었는지 다시 확인
|
||||
if (
|
||||
socket?.readyState === WebSocket.OPEN ||
|
||||
socket?.readyState === WebSocket.CONNECTING
|
||||
) {
|
||||
if (isSocketOpenOrConnecting(socket)) {
|
||||
isConnecting = false;
|
||||
return;
|
||||
}
|
||||
@@ -113,6 +114,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
if (socket !== ws) return;
|
||||
|
||||
set({ isConnected: true, error: null });
|
||||
reconnectAttempt = 0;
|
||||
console.log("[KisWebSocket] Connected");
|
||||
|
||||
// 재연결 시 기존 구독 복구
|
||||
@@ -127,23 +129,46 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// PINGPONG (Keep-alive)
|
||||
window.clearInterval(pingInterval);
|
||||
pingInterval = window.setInterval(() => {
|
||||
if (socket?.readyState === WebSocket.OPEN) {
|
||||
socket.send("PINGPONG"); // 일부 환경에서는 PINGPONG 텍스트 전송
|
||||
}
|
||||
}, 100_000); // 100초 주기
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
ws.onclose = (event) => {
|
||||
if (socket === ws) {
|
||||
isConnecting = false;
|
||||
set({ isConnected: false });
|
||||
console.log("[KisWebSocket] Disconnected");
|
||||
window.clearInterval(pingInterval);
|
||||
socket = null;
|
||||
|
||||
const hasSubscribers = hasActiveRealtimeSubscribers();
|
||||
const canAutoReconnect =
|
||||
!manualDisconnectRequested &&
|
||||
hasSubscribers &&
|
||||
reconnectAttempt < MAX_AUTO_RECONNECT_ATTEMPTS;
|
||||
|
||||
if (canAutoReconnect) {
|
||||
reconnectAttempt += 1;
|
||||
const delayMs = getReconnectDelayMs(reconnectAttempt);
|
||||
console.warn(
|
||||
`[KisWebSocket] Disconnected (code=${event.code}, reason=${event.reason || "none"}) -> reconnect in ${delayMs}ms (attempt ${reconnectAttempt}/${MAX_AUTO_RECONNECT_ATTEMPTS})`,
|
||||
);
|
||||
|
||||
window.clearTimeout(reconnectRetryTimer);
|
||||
reconnectRetryTimer = window.setTimeout(() => {
|
||||
const refreshApproval = reconnectAttempt % 3 === 0;
|
||||
void get().reconnect({ refreshApproval });
|
||||
}, delayMs);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasSubscribers && reconnectAttempt >= MAX_AUTO_RECONNECT_ATTEMPTS) {
|
||||
set({
|
||||
error:
|
||||
"실시간 연결이 반복 종료되어 자동 재연결을 중단했습니다. 새로고침 또는 수동 재연결을 시도해 주세요.",
|
||||
});
|
||||
}
|
||||
|
||||
reconnectAttempt = 0;
|
||||
console.log(
|
||||
`[KisWebSocket] Disconnected (code=${event.code}, reason=${event.reason || "none"})`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -165,7 +190,17 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
// PINGPONG 응답 또는 제어 메시지 처리
|
||||
if (data.startsWith("{")) {
|
||||
const control = parseControlMessage(data);
|
||||
if (control?.rt_cd && control.rt_cd !== "0") {
|
||||
if (!control) return;
|
||||
|
||||
if (control.trId === "PINGPONG") {
|
||||
// KIS 샘플 구현과 동일하게 원문을 그대로 echo하여 연결 유지를 보조합니다.
|
||||
if (socket === ws && ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(data);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (control.rtCd && control.rtCd !== "0") {
|
||||
const errorMessage = buildControlErrorMessage(control);
|
||||
set({
|
||||
error: errorMessage,
|
||||
@@ -173,7 +208,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
|
||||
// KIS 제어 메시지: ALREADY IN USE appkey
|
||||
// 이전 세션이 닫히기 전에 재연결될 때 간헐적으로 발생합니다.
|
||||
if (control.msg_cd === "OPSP8996") {
|
||||
if (control.msgCd === "OPSP8996") {
|
||||
const now = Date.now();
|
||||
if (now - lastAppKeyConflictAt > 5_000) {
|
||||
lastAppKeyConflictAt = now;
|
||||
@@ -183,6 +218,14 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
}, 1_200);
|
||||
}
|
||||
}
|
||||
|
||||
// 승인키가 유효하지 않을 때는 승인키 재발급 후 재연결합니다.
|
||||
if (control.msgCd === "OPSP0011") {
|
||||
window.clearTimeout(reconnectRetryTimer);
|
||||
reconnectRetryTimer = window.setTimeout(() => {
|
||||
void get().reconnect({ refreshApproval: true });
|
||||
}, 1_200);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -212,6 +255,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
|
||||
reconnect: async (options) => {
|
||||
const refreshApproval = options?.refreshApproval ?? false;
|
||||
manualDisconnectRequested = false;
|
||||
const currentSocket = socket;
|
||||
get().disconnect();
|
||||
if (currentSocket && currentSocket.readyState !== WebSocket.CLOSED) {
|
||||
@@ -223,6 +267,7 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
},
|
||||
|
||||
disconnect: () => {
|
||||
manualDisconnectRequested = true;
|
||||
const currentSocket = socket;
|
||||
if (
|
||||
currentSocket &&
|
||||
@@ -236,8 +281,9 @@ export const useKisWebSocketStore = create<KisWebSocketState>((set, get) => ({
|
||||
socket = null;
|
||||
}
|
||||
set({ isConnected: false });
|
||||
window.clearInterval(pingInterval);
|
||||
window.clearTimeout(reconnectRetryTimer);
|
||||
reconnectRetryTimer = undefined;
|
||||
reconnectAttempt = 0;
|
||||
isConnecting = false;
|
||||
},
|
||||
|
||||
@@ -310,9 +356,12 @@ function sendSubscription(
|
||||
}
|
||||
|
||||
interface KisWsControlMessage {
|
||||
rt_cd?: string;
|
||||
msg_cd?: string;
|
||||
trId?: string;
|
||||
trKey?: string;
|
||||
rtCd?: string;
|
||||
msgCd?: string;
|
||||
msg1?: string;
|
||||
encrypt?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -323,8 +372,32 @@ interface KisWsControlMessage {
|
||||
*/
|
||||
function parseControlMessage(rawData: string): KisWsControlMessage | null {
|
||||
try {
|
||||
const parsed = JSON.parse(rawData) as KisWsControlMessage;
|
||||
return parsed && typeof parsed === "object" ? parsed : null;
|
||||
const parsed = JSON.parse(rawData) as {
|
||||
header?: {
|
||||
tr_id?: string;
|
||||
tr_key?: string;
|
||||
encrypt?: string;
|
||||
};
|
||||
body?: {
|
||||
rt_cd?: string;
|
||||
msg_cd?: string;
|
||||
msg1?: string;
|
||||
};
|
||||
rt_cd?: string;
|
||||
msg_cd?: string;
|
||||
msg1?: string;
|
||||
};
|
||||
|
||||
if (!parsed || typeof parsed !== "object") return null;
|
||||
|
||||
return {
|
||||
trId: parsed.header?.tr_id,
|
||||
trKey: parsed.header?.tr_key,
|
||||
encrypt: parsed.header?.encrypt,
|
||||
rtCd: parsed.body?.rt_cd ?? parsed.rt_cd,
|
||||
msgCd: parsed.body?.msg_cd ?? parsed.msg_cd,
|
||||
msg1: parsed.body?.msg1 ?? parsed.msg1,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -337,13 +410,67 @@ function parseControlMessage(rawData: string): KisWsControlMessage | null {
|
||||
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onmessage
|
||||
*/
|
||||
function buildControlErrorMessage(message: KisWsControlMessage) {
|
||||
if (message.msg_cd === "OPSP8996") {
|
||||
if (message.msgCd === "OPSP8996") {
|
||||
return "실시간 연결이 다른 세션과 충돌해 재연결을 시도합니다.";
|
||||
}
|
||||
const detail = [message.msg1, message.msg_cd].filter(Boolean).join(" / ");
|
||||
const detail = [message.msg1, message.msgCd].filter(Boolean).join(" / ");
|
||||
return detail ? `실시간 제어 메시지 오류: ${detail}` : "실시간 제어 메시지 오류";
|
||||
}
|
||||
|
||||
/**
|
||||
* @description 활성화된 웹소켓 구독이 존재하는지 반환합니다.
|
||||
* @returns 구독 중인 TR/심볼이 1개 이상이면 true
|
||||
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onclose
|
||||
*/
|
||||
function hasActiveRealtimeSubscribers() {
|
||||
for (const count of subscriberCounts.values()) {
|
||||
if (count > 0) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @description 자동 재연결 시도 횟수에 따라 지수 백오프 지연시간(ms)을 계산합니다.
|
||||
* @param attempt 1부터 시작하는 재연결 시도 횟수
|
||||
* @returns 지연시간(ms)
|
||||
* @see features/kis-realtime/stores/kisWebSocketStore.ts ws.onclose
|
||||
*/
|
||||
function getReconnectDelayMs(attempt: number) {
|
||||
const exponential = RECONNECT_BASE_DELAY_MS * 2 ** Math.max(0, attempt - 1);
|
||||
const clamped = Math.min(exponential, RECONNECT_MAX_DELAY_MS);
|
||||
const jitter = Math.floor(Math.random() * RECONNECT_JITTER_MS);
|
||||
return clamped + jitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @description 소켓이 OPEN 또는 CONNECTING 상태인지 검사합니다.
|
||||
* @param target 검사 대상 소켓
|
||||
* @returns 연결 유지/진행 상태면 true
|
||||
* @see features/kis-realtime/stores/kisWebSocketStore.ts connect
|
||||
*/
|
||||
function isSocketOpenOrConnecting(target: WebSocket | null) {
|
||||
if (!target) return false;
|
||||
return (
|
||||
target.readyState === WebSocket.OPEN ||
|
||||
target.readyState === WebSocket.CONNECTING
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description 새 연결을 시작하면 안 되는 소켓 상태인지 검사합니다.
|
||||
* @param target 검사 대상 소켓
|
||||
* @returns OPEN/CONNECTING/CLOSING 중 하나면 true
|
||||
* @see features/kis-realtime/stores/kisWebSocketStore.ts connect
|
||||
*/
|
||||
function isSocketUnavailableForNewConnect(target: WebSocket | null) {
|
||||
if (!target) return false;
|
||||
return (
|
||||
target.readyState === WebSocket.OPEN ||
|
||||
target.readyState === WebSocket.CONNECTING ||
|
||||
target.readyState === WebSocket.CLOSING
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @description 특정 웹소켓 인스턴스가 완전히 닫힐 때까지 대기합니다.
|
||||
* @param target 대기할 웹소켓 인스턴스
|
||||
|
||||
Reference in New Issue
Block a user