이번 포스트에는 예약 게시글 업로드를 구현하면서 새로 배웠던 내용을 다시 한번 정리해보겠습니다. 예약 게시글 업로드를 구현하기 위해 ThreadPoolTaskScheduler, RabbitMQ를 사용하였고, 각 기술의 특징에 대해서 설명해보겠습니다.
ThreadPoolTaskScheduler
먼저 ThreadPoolTaskScheduler입니다. ThreadPoolTaskScheduler는 Spring Framework에서 제공하는 스케줄링용 스레드 풀 기반의 TaskScheduler 구현체입니다. ThreadPoolTaskScheduler를 사용하기 위해 Bean으로 등록을 해주었습니다.
@Configuration
public class SchedulerConfig {
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("ScheduledTask-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
return scheduler;
}
}
- setPoolSize(int poolSize) : 동시에 실행 가능한 최대 스케줄링 스레드 수
- 10으로 설정하면 최대 10개의 작업이 동시에 병렬로 실행이 가능합니다.
- 너무 작으면 작업이 밀릴 수 있고, 너무 크면 자원 낭비가 발생합니다.
- setThreadNamePrefix(String prefix) : 생성되는 스레드들의 이름 접두사 설정
- 예를 들어 설정값이 "ScheduledTask-"이면 생성되는 스레드 이름은 ScheduledTask-1, ScheduledTask-2, ... 형식으로 붙습니다.
- setWaitForTasksToCompleteOnShutdown(boolean wait) : 애플리케이션 종료 시 현재 실행 중인 작업이 완료될 때까지 기다릴지 여부
- true로 설정하면 shutdown 시점에 실행 중인 스케줄 작업이 끝날 때까지 대기합니다.
- setAwaitTerminationSeconds(int seconds) : setWaitForTasksToCompleteOnShutdown에 true가 설정되어 있을 경우 경우 작업 종료를 기다릴 최대 시간(초)입니다.
- 설정된 시간 이내에 작업이 완료되지 않으면 강제로 종료됩니다.
다음은 ThreadPoolTaskScheduler을 활용한 서비스 레이어 코드를 살펴보겠습니다.
@Service
@Transactional
class PostService(
private val postRepository: PostRepository,
private val taskScheduler: TaskScheduler
) {
fun register(content: String, timestamp: LocalDateTime?) {
if (timestamp == null) {
postRepository.save(Post(content = content))
} else {
taskScheduler.schedule(
{
postRepository.save(Post(content = content))
},
timestamp!!.toInstant()
)
}
}
}
해당 코드에서는 schedule(Runnable task, Instant startTime) 함수를 호출해서 스케줄러에 등록을 하고 있습니다. 첫 번째 인자에는 Runnable 타입의 작업이 들어가고, 두 번째 인자로는 Instant 타입의 시작 시간이 들어갑니다. 현재 시간이 startTime이 되면 task에 들어가있는 작업이 실행되는 흐름입니다.
이처럼 ThreadPoolTaskScheduler을 사용해서 예약 게시글을 충분히 구현할 수 있지만, 단점이 존재합니다. 바로 스케줄러의 등록된 태스크는 메모리에 저장되기 때문에 휘발성 데이터입니다. 또한 중복 태스크를 필터링할 수 있는 기능을 제공해주지 않아 별도로 구현을 해야한다는 점이 있습니다.
그래서 위 내용을 보완하기 위해 데이터베이스에 태스크를 관리하는 방법이 있습니다.
RabbitMQ
두 번째로는 RabbitMQ입니다. RabbitMQ는 시스템 간 비동기 메시지 전달을 도와주는 메시지 큐 시스템입니다. 여기서 TTL + DLX 조합으로 예약 기능을 구현할 수 있습니다.
TTL (Time To Live)
TTL은 메시지가 큐에 남아 있을 수 있는 최대 생존 시간(밀리초)을 의미합니다. TTL이 지나면 해당 메시지는 만료되고 큐에 DLX(Dead Letter Exchange)가 설정되어 있으면 해당 메시지는 DLX로 전달됩니다.
DLX (Dead Letter Exchange)
메시지가 TTL 만료가 되거나, 큐 용량 초과, 메시지 거절에 해당하면 죽은 메시지(Dead Letter)가 됩니다. 이 때 메시지들은 해당 큐에 설정된 DLX(대체 Exchange)로 전달됩니다. DLX는 새로운 큐로 메시지를 이동시키는 역할을 합니다.
RabbitMQ를 설정하는 코드와 함께 각 Bean에 기능을 설명해보겠습니다.
@Configuration
class RabbitMQConfig {
@Bean
fun actualQueue(): Queue = Queue("actual.queue", true)
@Bean
fun actualExchange(): DirectExchange = DirectExchange("actual.exchange")
@Bean
fun actualBinding(): Binding = BindingBuilder
.bind(actualQueue())
.to(actualExchange())
.with("actual.routing.key")
@Bean
fun delayQueue(): Queue {
val args = mapOf(
"x-dead-letter-exchange" to "actual.exchange",
"x-dead-letter-routing-key" to "actual.routing.key",
)
return Queue("delay.queue", true, false, false, args)
}
@Bean
fun delayExchange(): DirectExchange = DirectExchange("delay.exchange")
@Bean
fun delayBinding(): Binding = BindingBuilder
.bind(delayQueue())
.to(delayExchange())
.with("delay.routing.key")
}
fun actualQueue(): Queue
지연된 메시지를 최종적으로 전달받아 소비자가 처리할 큐입니다. delay.queue에서 TTL이 만료된 메시지는 DLX를 통해 이 큐로 전달됩니다. 첫 번째 인자는 큐의 이름이며, 두 번째 인자인 `durable = true`는 서버가 재시작되어도 큐가 유지되도록 설정합니다.
fun actualExchange(): DirectExchange
실제 소비 큐로 메시지를 전달하기 위한 Exchange입니다. DirectExchange는 라우팅 키가 정확히 일치할 때만 메시지를 전달하며, 프로듀서 또는 DLX가 이 Exchange에 메시지를 보내면, 바인딩된 라우팅 키에 따라 해당 큐로 라우팅됩니다.
fun actualBinding(): Binding
actual.queue 큐를 actual.exchange에 "actual.routing.key"로 연결해주는 역할을 합니다.
fun delayQueue(): Queue
지연 처리를 위해 임시로 메시지를 보관하는 큐입니다. 이 큐는 메시지를 TTL 동안 보관한 뒤, 만료되면 지정된 DLX로 메시지를 전달합니다.
fun delayExchange(): DirectExchange
프로듀서가 지연 메시지를 보낼 때 사용하는 Exchange입니다. delay.queue와 연결되어 있으며, 지정된 라우팅 키를 기준으로 메시지를 delay.queue로 전달합니다.
fun delayBinding(): Binding
delay.queue 큐를 delay.exchange에 "delay.routing.key"로 연결해주는 역할을 합니다.
@Service
@Transactional
class PostService(
private val postRepository: PostRepository,
private val taskScheduler: TaskScheduler
) {
fun register(content: String, timestamp: OffsetDateTime?) {
if (timestamp == null) {
postRepository.save(Post(content = content))
} else {
val delayMillis = Duration.between(LocalDateTime.now(), timestamp.toLocalDateTime())
.toMillis()
val message = MessageBuilder
.withBody(content.toByteArray())
.setExpiration(delayMillis.toString())
.build()
rabbitTemplate.send("delay.exchange", "delay.routing.key", message)
}
}
@RabbitListener(queues = ["actual.queue"])
fun handleScheduledPost(content: String) {
postRepository.save(Post(content = content))
}
}
예약 시간이 존재하면, 현재 시간과의 차이를 계산해 그만큼의 TTL을 설정한 메시지를 RabbitMQ 딜레이 큐로 전송합니다.
메시지는 딜레이 큐에서 설정된 시간이 지나면 actual.queue로 이동하게 되고, 이 큐를 감지하고 있는 @RabbitListener가 메시지를 소비하는 순간, 메시지에 담긴 게시물 내용으로 게시물을 데이터베이스에 저장합니다.
'기술 회고' 카테고리의 다른 글
맞춤 추천 게시글 시스템 구축 - 구현 2편 (2) | 2025.04.19 |
---|---|
맞춤 추천 게시글 시스템 구축 - 구현 1편 (2) | 2025.04.17 |
맞춤 추천 게시글 시스템 구축 - 이론편 (0) | 2025.04.17 |