diff --git a/build.gradle b/build.gradle index 2f8ccad9..d3b74fa6 100644 --- a/build.gradle +++ b/build.gradle @@ -69,12 +69,17 @@ dependencies { // spring security implementation 'org.springframework.boot:spring-boot-starter-security' + //netty for message broker + implementation 'org.springframework.boot:spring-boot-starter-reactor-netty' + + // REST Docs / test asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor' testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc' testImplementation 'org.springframework.restdocs:spring-restdocs-core' testImplementation 'org.springframework.security:spring-security-test' testImplementation 'org.springframework.boot:spring-boot-starter-data-redis' + testImplementation 'org.springframework.boot:spring-boot-starter-data-redis' testImplementation 'org.mockito:mockito-core:4.6.1' // === Testcontainers BOM (버전 중앙관리) === diff --git a/src/asciidoc/api/chat.adoc b/src/asciidoc/api/chat.adoc index 830ae63b..c983bb74 100644 --- a/src/asciidoc/api/chat.adoc +++ b/src/asciidoc/api/chat.adoc @@ -95,12 +95,12 @@ Authorization:Bearer {ACCESS_TOKEN} ---- SUBSCRIBE id:sub-0 -destination:/sub/chat/room/{chatRoomId} +destination:/exchange/amq.topic/chat.room.{chatRoomId} ^@ ---- -서버는 이후 `/sub/chat/room/{chatRoomId}` 로 전달되는 MESSAGE 프레임을 +서버는 이후 `/exchange/amq.topic/chat.room.{chatRoomId}` 로 전달되는 MESSAGE 프레임을 해당 세션으로 push 해야합니다. ''' @@ -135,7 +135,7 @@ content-type:application/json ---- MESSAGE subscription:sub-0 -destination:/sub/chat/room/{chatRoomId} +destination:/exchange/amq.topic/chat.room.{chatRoomId} message-id:msg-15 { @@ -178,6 +178,9 @@ content-type:application/json ''' ==== - SUBSCRIBE frame (유저 개인 큐 구독) +[WARNING] +현재 미구현 상태입니다. + /user/queue 는 특정 유저에게만 전달되는 개인용 메시지를 받기 위한 채널입니다. 클라이언트는 다음과 같이 단순히 구독만 하면 됩니다. @@ -201,6 +204,9 @@ destination:/user/queue/notifications ''' ==== - USER QUEUE MESSAGE frame (개인 메시지) +[WARNING] +현재 미구현 상태입니다. + 서버 → 클라이언트 [source,stomp] diff --git a/src/main/java/com/studypals/global/websocket/StompAuthChannelInterceptor.java b/src/main/java/com/studypals/global/websocket/StompAuthChannelInterceptor.java index 07b18347..7907595f 100644 --- a/src/main/java/com/studypals/global/websocket/StompAuthChannelInterceptor.java +++ b/src/main/java/com/studypals/global/websocket/StompAuthChannelInterceptor.java @@ -5,6 +5,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @@ -16,7 +17,6 @@ import lombok.RequiredArgsConstructor; import com.studypals.domain.chatManage.dao.ChatRoomMemberRepository; -import com.studypals.domain.memberManage.worker.MemberReader; import com.studypals.global.exceptions.errorCode.AuthErrorCode; import com.studypals.global.exceptions.errorCode.ChatErrorCode; import com.studypals.global.exceptions.exception.AuthException; @@ -46,13 +46,15 @@ public class StompAuthChannelInterceptor implements ChannelInterceptor { private final JwtUtils jwtUtils; private final ChatRoomMemberRepository chatRoomMemberRepository; - private final MemberReader memberReader; private final UserSubscribeInfoRepository userSubscribeInfoRepository; private static final String ACCESS_HEADER = "Authorization"; private final AtomicInteger connectCnt = new AtomicInteger(0); + @Value("${chat.subscribe.address.default}") + private String chatSubscribeAddressDefault; + /** * 메시지가 controller 로 바인딩 되기 전 과정을 수행합니다. 보통 {@code CONNECT, SUBSCRIBE, SEND} 에 대한 * intercept 가 가능합니다. @@ -67,8 +69,11 @@ public Message preSend(Message message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (accessor == null) throw new IllegalArgumentException("not invalid protocol"); - if (accessor.getCommand() == null) throw new IllegalArgumentException("header not exist"); + // heartbeat 통과 + if (accessor.getCommand() == null) { + return message; + } try { switch (accessor.getCommand()) { case CONNECT -> handleConnect(accessor); @@ -123,15 +128,10 @@ private void handleSubscribe(StompHeaderAccessor accessor) { ChatErrorCode.CHAT_SUBSCRIBE_FAIL, "[StompAuthChannelInterceptor#handleSubscribe] destination null"); - if (!destination.startsWith("/sub/chat/room/")) { - return; - } // url 로 부터 구독하고자 하는 방의 id 를 추출 String roomId = extractRoomIdFromDestination(destination); String sessionId = accessor.getSessionId(); - // todo: delete before prod - if (roomId.equals("hello")) return; if (sessionId == null) return; // 방 문자열 구조가 UUID 인지 @@ -165,19 +165,25 @@ private void handleSubscribe(StompHeaderAccessor accessor) { } private void handleUnsubscribe(StompHeaderAccessor accessor) { - String roomId = extractRoomIdFromDestination(accessor.getDestination()); + String destination = accessor.getDestination(); + String roomId = extractRoomIdFromDestination(destination); String sessionId = accessor.getSessionId(); + if (roomId == null || sessionId == null) { + return; + } userSubscribeInfoRepository.deleteMapById(sessionId, roomId); } private String extractRoomIdFromDestination(String destination) { // 예: "/sub/chat/room/{roomId}" 형식에서 {roomId}만 추출 - if (destination != null && destination.contains("/chat/room/")) { - return destination.substring(destination.lastIndexOf("/") + 1); + if (!StringUtils.hasText(destination)) { + return null; } - throw new ChatException( - ChatErrorCode.CHAT_SUBSCRIBE_FAIL, - "[StompAuthChannelInterceptor#handleSubscribe] destination format invalid"); + if (destination.startsWith(chatSubscribeAddressDefault)) { + return destination.substring(chatSubscribeAddressDefault.length()); + } + + return null; } private void validateRoomId(String roomId) { diff --git a/src/main/java/com/studypals/global/websocket/StompRelayProp.java b/src/main/java/com/studypals/global/websocket/StompRelayProp.java new file mode 100644 index 00000000..43ca54e2 --- /dev/null +++ b/src/main/java/com/studypals/global/websocket/StompRelayProp.java @@ -0,0 +1,20 @@ +package com.studypals.global.websocket; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Stomp relay 용 메시지 브로커 연결 설정 + * + * @author jack8 + * @see WebsocketConfig + * @since 2026-01-14 + */ +@ConfigurationProperties(prefix = "stomp.relay") +public record StompRelayProp( + String host, + Integer port, + String username, + String password, + String virtualHost, + Integer systemHeartbeatSendInterval, + Integer systemHeartbeatReceiveInterval) {} diff --git a/src/main/java/com/studypals/global/websocket/WebsocketConfig.java b/src/main/java/com/studypals/global/websocket/WebsocketConfig.java index e167c380..2c9fe4d7 100644 --- a/src/main/java/com/studypals/global/websocket/WebsocketConfig.java +++ b/src/main/java/com/studypals/global/websocket/WebsocketConfig.java @@ -1,5 +1,6 @@ package com.studypals.global.websocket; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; @@ -25,9 +26,11 @@ @Configuration @EnableWebSocketMessageBroker @RequiredArgsConstructor +@EnableConfigurationProperties(StompRelayProp.class) public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { private final StompAuthChannelInterceptor stompAuthChannelInterceptor; + private final StompRelayProp stompProp; @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { @@ -41,9 +44,20 @@ public void configureClientInboundChannel(ChannelRegistration registration) { @Override public void configureMessageBroker(MessageBrokerRegistry config) { - config.enableSimpleBroker("/sub", "/queue"); config.setApplicationDestinationPrefixes("/pub", "/req"); config.setUserDestinationPrefix("/user"); + + config.enableStompBrokerRelay("/exchange", "/queue") + .setRelayHost(stompProp.host()) + .setRelayPort(stompProp.port()) + .setClientLogin(stompProp.username()) + .setClientPasscode(stompProp.password()) + .setSystemLogin(stompProp.username()) + .setSystemPasscode(stompProp.password()) + .setVirtualHost(stompProp.virtualHost()) + .setTaskScheduler(taskScheduler()) + .setSystemHeartbeatSendInterval(stompProp.systemHeartbeatSendInterval()) + .setSystemHeartbeatReceiveInterval(stompProp.systemHeartbeatReceiveInterval()); } @Override diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 42dea073..36f87e9e 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,6 +51,18 @@ spring.data.mongodb.username=${MONGO_USER} spring.data.mongodb.password=${MONGO_PWD} spring.data.mongodb.authentication-database=${MONGO_AUTH_DB} + +# =============================== +# RabbitMQ (prod) +# =============================== +stomp.relay.host=${RABBITMQ_HOST} +stomp.relay.port=${RABBITMQ_PORT} +stomp.relay.username=${RABBITMQ_USER} +stomp.relay.password=${RABBITMQ_PWD} +stomp.relay.virtual-host=${RABBITMQ_VHOST} +stomp.relay.system-heartbeat-send-interval=10000 +stomp.relay.system-heartbeat-receive-interval=10000 + # =============================== # Server (prod) # =============================== @@ -71,7 +83,8 @@ logging.level.org.springframework.web.servlet.mvc.method.annotation.ExceptionHan # Feature flags / misc # =============================== debug.message.print=true -chat.subscribe.address.default=/sub/chat/room/ +chat.subscribe.address.default=/exchange/amq.topic/chat.room. + # =============================== # File Upload diff --git a/src/test/java/com/studypals/testModules/testSupport/TestEnvironment.java b/src/test/java/com/studypals/testModules/testSupport/TestEnvironment.java index 31a34e75..e32752f3 100644 --- a/src/test/java/com/studypals/testModules/testSupport/TestEnvironment.java +++ b/src/test/java/com/studypals/testModules/testSupport/TestEnvironment.java @@ -37,14 +37,13 @@ public abstract class TestEnvironment { .withPassword("testpassword") .withCommand("--innodb_redo_log_capacity=512M", "--skip-log-bin") .withTmpFs(Collections.singletonMap("/var/lib/mysql", "rw")) - .withLogConsumer(new Slf4jLogConsumer(log).withPrefix("MYSQL")) - .withReuse(true); + .withLogConsumer(new Slf4jLogConsumer(log).withPrefix("MYSQL")); MYSQL.start(); - REDIS = new GenericContainer<>("redis:7.2").withExposedPorts(6379).withReuse(true); + REDIS = new GenericContainer<>("redis:7.2").withExposedPorts(6379); REDIS.start(); - MONGO = new MongoDBContainer("mongo:7.0").withReuse(true); + MONGO = new MongoDBContainer("mongo:7.0"); MONGO.start(); } }