Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ dependencies {
// spring security
implementation 'org.springframework.boot:spring-boot-starter-security'

//netty for message broker
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'


// REST Docs / test
asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor'
testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
testImplementation 'org.springframework.restdocs:spring-restdocs-core'
testImplementation 'org.springframework.security:spring-security-test'
testImplementation 'org.springframework.boot:spring-boot-starter-data-redis'
testImplementation 'org.springframework.boot:spring-boot-starter-data-redis'
testImplementation 'org.mockito:mockito-core:4.6.1'

// === Testcontainers BOM (버전 중앙관리) ===
Expand Down
12 changes: 9 additions & 3 deletions src/asciidoc/api/chat.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ Authorization:Bearer {ACCESS_TOKEN}
----
SUBSCRIBE
id:sub-0
destination:/sub/chat/room/{chatRoomId}
destination:/exchange/amq.topic/chat.room.{chatRoomId}

^@
----

서버는 이후 `/sub/chat/room/{chatRoomId}` 로 전달되는 MESSAGE 프레임을
서버는 이후 `/exchange/amq.topic/chat.room.{chatRoomId}` 로 전달되는 MESSAGE 프레임을
해당 세션으로 push 해야합니다.

'''
Expand Down Expand Up @@ -135,7 +135,7 @@ content-type:application/json
----
MESSAGE
subscription:sub-0
destination:/sub/chat/room/{chatRoomId}
destination:/exchange/amq.topic/chat.room.{chatRoomId}
message-id:msg-15

{
Expand Down Expand Up @@ -178,6 +178,9 @@ content-type:application/json
'''
==== - SUBSCRIBE frame (유저 개인 큐 구독)

[WARNING]
현재 미구현 상태입니다.

/user/queue 는 특정 유저에게만 전달되는 개인용 메시지를 받기 위한 채널입니다.
클라이언트는 다음과 같이 단순히 구독만 하면 됩니다.

Expand All @@ -201,6 +204,9 @@ destination:/user/queue/notifications
'''
==== - USER QUEUE MESSAGE frame (개인 메시지)

[WARNING]
현재 미구현 상태입니다.

서버 → 클라이언트

[source,stomp]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
Expand All @@ -16,7 +17,6 @@
import lombok.RequiredArgsConstructor;

import com.studypals.domain.chatManage.dao.ChatRoomMemberRepository;
import com.studypals.domain.memberManage.worker.MemberReader;
import com.studypals.global.exceptions.errorCode.AuthErrorCode;
import com.studypals.global.exceptions.errorCode.ChatErrorCode;
import com.studypals.global.exceptions.exception.AuthException;
Expand Down Expand Up @@ -46,13 +46,15 @@ public class StompAuthChannelInterceptor implements ChannelInterceptor {

private final JwtUtils jwtUtils;
private final ChatRoomMemberRepository chatRoomMemberRepository;
private final MemberReader memberReader;
private final UserSubscribeInfoRepository userSubscribeInfoRepository;

private static final String ACCESS_HEADER = "Authorization";

private final AtomicInteger connectCnt = new AtomicInteger(0);

@Value("${chat.subscribe.address.default}")
private String chatSubscribeAddressDefault;

/**
* 메시지가 controller 로 바인딩 되기 전 과정을 수행합니다. 보통 {@code CONNECT, SUBSCRIBE, SEND} 에 대한
* intercept 가 가능합니다.
Expand All @@ -67,8 +69,11 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

if (accessor == null) throw new IllegalArgumentException("not invalid protocol");
if (accessor.getCommand() == null) throw new IllegalArgumentException("header not exist");

// heartbeat 통과
if (accessor.getCommand() == null) {
return message;
}
try {
switch (accessor.getCommand()) {
case CONNECT -> handleConnect(accessor);
Expand Down Expand Up @@ -123,15 +128,10 @@ private void handleSubscribe(StompHeaderAccessor accessor) {
ChatErrorCode.CHAT_SUBSCRIBE_FAIL,
"[StompAuthChannelInterceptor#handleSubscribe] destination null");

if (!destination.startsWith("/sub/chat/room/")) {
return;
}
// url 로 부터 구독하고자 하는 방의 id 를 추출
String roomId = extractRoomIdFromDestination(destination);
String sessionId = accessor.getSessionId();

// todo: delete before prod
if (roomId.equals("hello")) return;
if (sessionId == null) return;

// 방 문자열 구조가 UUID 인지
Expand Down Expand Up @@ -165,19 +165,25 @@ private void handleSubscribe(StompHeaderAccessor accessor) {
}

private void handleUnsubscribe(StompHeaderAccessor accessor) {
String roomId = extractRoomIdFromDestination(accessor.getDestination());
String destination = accessor.getDestination();
String roomId = extractRoomIdFromDestination(destination);
String sessionId = accessor.getSessionId();
if (roomId == null || sessionId == null) {
return;
}
userSubscribeInfoRepository.deleteMapById(sessionId, roomId);
}

private String extractRoomIdFromDestination(String destination) {
// 예: "/sub/chat/room/{roomId}" 형식에서 {roomId}만 추출
if (destination != null && destination.contains("/chat/room/")) {
return destination.substring(destination.lastIndexOf("/") + 1);
if (!StringUtils.hasText(destination)) {
return null;
}
throw new ChatException(
ChatErrorCode.CHAT_SUBSCRIBE_FAIL,
"[StompAuthChannelInterceptor#handleSubscribe] destination format invalid");
if (destination.startsWith(chatSubscribeAddressDefault)) {
return destination.substring(chatSubscribeAddressDefault.length());
}

return null;
}

private void validateRoomId(String roomId) {
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/studypals/global/websocket/StompRelayProp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.studypals.global.websocket;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Stomp relay 용 메시지 브로커 연결 설정
*
* @author jack8
* @see WebsocketConfig
* @since 2026-01-14
*/
@ConfigurationProperties(prefix = "stomp.relay")
public record StompRelayProp(
String host,
Integer port,
String username,
String password,
String virtualHost,
Integer systemHeartbeatSendInterval,
Integer systemHeartbeatReceiveInterval) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.studypals.global.websocket;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
Expand All @@ -25,9 +26,11 @@
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
@EnableConfigurationProperties(StompRelayProp.class)
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

private final StompAuthChannelInterceptor stompAuthChannelInterceptor;
private final StompRelayProp stompProp;

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
Expand All @@ -41,9 +44,20 @@ public void configureClientInboundChannel(ChannelRegistration registration) {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/sub", "/queue");
config.setApplicationDestinationPrefixes("/pub", "/req");
config.setUserDestinationPrefix("/user");

config.enableStompBrokerRelay("/exchange", "/queue")
.setRelayHost(stompProp.host())
.setRelayPort(stompProp.port())
.setClientLogin(stompProp.username())
.setClientPasscode(stompProp.password())
.setSystemLogin(stompProp.username())
.setSystemPasscode(stompProp.password())
.setVirtualHost(stompProp.virtualHost())
.setTaskScheduler(taskScheduler())
.setSystemHeartbeatSendInterval(stompProp.systemHeartbeatSendInterval())
.setSystemHeartbeatReceiveInterval(stompProp.systemHeartbeatReceiveInterval());
}

@Override
Expand Down
15 changes: 14 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ spring.data.mongodb.username=${MONGO_USER}
spring.data.mongodb.password=${MONGO_PWD}
spring.data.mongodb.authentication-database=${MONGO_AUTH_DB}


# ===============================
# RabbitMQ (prod)
# ===============================
stomp.relay.host=${RABBITMQ_HOST}
stomp.relay.port=${RABBITMQ_PORT}
stomp.relay.username=${RABBITMQ_USER}
stomp.relay.password=${RABBITMQ_PWD}
stomp.relay.virtual-host=${RABBITMQ_VHOST}
stomp.relay.system-heartbeat-send-interval=10000
stomp.relay.system-heartbeat-receive-interval=10000

# ===============================
# Server (prod)
# ===============================
Expand All @@ -71,7 +83,8 @@ logging.level.org.springframework.web.servlet.mvc.method.annotation.ExceptionHan
# Feature flags / misc
# ===============================
debug.message.print=true
chat.subscribe.address.default=/sub/chat/room/
chat.subscribe.address.default=/exchange/amq.topic/chat.room.


# ===============================
# File Upload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ public abstract class TestEnvironment {
.withPassword("testpassword")
.withCommand("--innodb_redo_log_capacity=512M", "--skip-log-bin")
.withTmpFs(Collections.singletonMap("/var/lib/mysql", "rw"))
.withLogConsumer(new Slf4jLogConsumer(log).withPrefix("MYSQL"))
.withReuse(true);
.withLogConsumer(new Slf4jLogConsumer(log).withPrefix("MYSQL"));
MYSQL.start();

REDIS = new GenericContainer<>("redis:7.2").withExposedPorts(6379).withReuse(true);
REDIS = new GenericContainer<>("redis:7.2").withExposedPorts(6379);
REDIS.start();

MONGO = new MongoDBContainer("mongo:7.0").withReuse(true);
MONGO = new MongoDBContainer("mongo:7.0");
MONGO.start();
}
}
Expand Down