기술 회고

예약 게시글 업로드 1편

Stitchhhh 2025. 5. 30. 01:07

이번 포스트에는 예약 게시글 업로드를 구현하면서 새로 배웠던 내용을 다시 한번 정리해보겠습니다. 예약 게시글 업로드를 구현하기 위해 ThreadPoolTaskScheduler, RabbitMQ를 사용하였고, 각 기술의 특징에 대해서 설명해보겠습니다.

ThreadPoolTaskScheduler

먼저 ThreadPoolTaskScheduler입니다. ThreadPoolTaskScheduler는 Spring Framework에서 제공하는 스케줄링용 스레드 풀 기반의 TaskScheduler 구현체입니다. ThreadPoolTaskScheduler를 사용하기 위해 Bean으로 등록을 해주었습니다.

@Configuration
public class SchedulerConfig {

    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("ScheduledTask-");
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(30);
        return scheduler;
    }
}

 

  • setPoolSize(int poolSize) : 동시에 실행 가능한 최대 스케줄링 스레드 수
    • 10으로 설정하면 최대 10개의 작업이 동시에 병렬로 실행이 가능합니다.
    • 너무 작으면 작업이 밀릴 수 있고, 너무 크면 자원 낭비가 발생합니다.
  • setThreadNamePrefix(String prefix) : 생성되는 스레드들의 이름 접두사 설정
    • 예를 들어 설정값이 "ScheduledTask-"이면 생성되는 스레드 이름은 ScheduledTask-1, ScheduledTask-2, ... 형식으로 붙습니다.
  • setWaitForTasksToCompleteOnShutdown(boolean wait) : 애플리케이션 종료 시 현재 실행 중인 작업이 완료될 때까지 기다릴지 여부
    • true로 설정하면 shutdown 시점에 실행 중인 스케줄 작업이 끝날 때까지 대기합니다.
  • setAwaitTerminationSeconds(int seconds) : setWaitForTasksToCompleteOnShutdown에 true가 설정되어 있을 경우 경우 작업 종료를 기다릴 최대 시간(초)입니다.
    • 설정된 시간 이내에 작업이 완료되지 않으면 강제로 종료됩니다.

다음은 ThreadPoolTaskScheduler을 활용한 서비스 레이어 코드를 살펴보겠습니다.

@Service
@Transactional
class PostService(
    private val postRepository: PostRepository,
    private val taskScheduler: TaskScheduler
) {

    fun register(content: String, timestamp: LocalDateTime?) {
    	if (timestamp == null) {
        	postRepository.save(Post(content = content))
        } else {
        	taskScheduler.schedule(
            	{
                    postRepository.save(Post(content = content))
                },
                timestamp!!.toInstant()
            )	
        }
    }
}

 

해당 코드에서는 schedule(Runnable task, Instant startTime) 함수를 호출해서 스케줄러에 등록을 하고 있습니다. 첫 번째 인자에는 Runnable 타입의 작업이 들어가고, 두 번째 인자로는 Instant 타입의 시작 시간이 들어갑니다. 현재 시간이 startTime이 되면 task에 들어가있는 작업이 실행되는 흐름입니다.

 

이처럼 ThreadPoolTaskScheduler을 사용해서 예약 게시글을 충분히 구현할 수 있지만, 단점이 존재합니다. 바로 스케줄러의 등록된 태스크는 메모리에 저장되기 때문에 휘발성 데이터입니다. 또한 중복 태스크를 필터링할 수 있는 기능을 제공해주지 않아 별도로 구현을 해야한다는 점이 있습니다.

 

그래서 위 내용을 보완하기 위해 데이터베이스에 태스크를 관리하는 방법이 있습니다.

RabbitMQ

두 번째로는 RabbitMQ입니다. RabbitMQ는 시스템 간 비동기 메시지 전달을 도와주는 메시지 큐 시스템입니다. 여기서 TTL + DLX 조합으로 예약 기능을 구현할 수 있습니다.

TTL (Time To Live)

TTL은 메시지가 큐에 남아 있을 수 있는 최대 생존 시간(밀리초)을 의미합니다. TTL이 지나면 해당 메시지는 만료되고 큐에 DLX(Dead Letter Exchange)가 설정되어 있으면 해당 메시지는 DLX로 전달됩니다.

DLX (Dead Letter Exchange)

메시지가 TTL 만료가 되거나, 큐 용량 초과, 메시지 거절에 해당하면 죽은 메시지(Dead Letter)가 됩니다. 이 때 메시지들은 해당 큐에 설정된 DLX(대체 Exchange)로 전달됩니다. DLX는 새로운 큐로 메시지를 이동시키는 역할을 합니다.

 

RabbitMQ를 설정하는 코드와 함께 각 Bean에 기능을 설명해보겠습니다.

@Configuration
class RabbitMQConfig {

    @Bean
    fun actualQueue(): Queue = Queue("actual.queue", true)

    @Bean
    fun actualExchange(): DirectExchange = DirectExchange("actual.exchange")

    @Bean
    fun actualBinding(): Binding = BindingBuilder
        .bind(actualQueue())
        .to(actualExchange())
        .with("actual.routing.key")

    @Bean
    fun delayQueue(): Queue {
        val args = mapOf(
            "x-dead-letter-exchange" to "actual.exchange",
            "x-dead-letter-routing-key" to "actual.routing.key",
        )
        return Queue("delay.queue", true, false, false, args)
    }

    @Bean
    fun delayExchange(): DirectExchange = DirectExchange("delay.exchange")

    @Bean
    fun delayBinding(): Binding = BindingBuilder
        .bind(delayQueue())
        .to(delayExchange())
        .with("delay.routing.key")
}

 

fun actualQueue(): Queue

지연된 메시지를 최종적으로 전달받아 소비자가 처리할 큐입니다. delay.queue에서 TTL이 만료된 메시지는 DLX를 통해 이 큐로 전달됩니다. 첫 번째 인자는 큐의 이름이며, 두 번째 인자인 `durable = true`는 서버가 재시작되어도 큐가 유지되도록 설정합니다.

fun actualExchange(): DirectExchange

실제 소비 큐로 메시지를 전달하기 위한 Exchange입니다. DirectExchange는 라우팅 키가 정확히 일치할 때만 메시지를 전달하며, 프로듀서 또는 DLX가 이 Exchange에 메시지를 보내면, 바인딩된 라우팅 키에 따라 해당 큐로 라우팅됩니다.

fun actualBinding(): Binding

actual.queue 큐를 actual.exchange에 "actual.routing.key"로 연결해주는 역할을 합니다.

fun delayQueue(): Queue

지연 처리를 위해 임시로 메시지를 보관하는 큐입니다. 이 큐는 메시지를 TTL 동안 보관한 뒤, 만료되면 지정된 DLX로 메시지를 전달합니다.

fun delayExchange(): DirectExchange

프로듀서가 지연 메시지를 보낼 때 사용하는 Exchange입니다. delay.queue와 연결되어 있으며, 지정된 라우팅 키를 기준으로 메시지를 delay.queue로 전달합니다.

fun delayBinding(): Binding

delay.queue 큐를 delay.exchange에 "delay.routing.key"로 연결해주는 역할을 합니다.

 

@Service
@Transactional
class PostService(
    private val postRepository: PostRepository,
    private val taskScheduler: TaskScheduler
) {

    fun register(content: String, timestamp: OffsetDateTime?) {
    	if (timestamp == null) {
        	postRepository.save(Post(content = content))
        } else {
        	val delayMillis = Duration.between(LocalDateTime.now(), timestamp.toLocalDateTime())
         		.toMillis()

        	val message = MessageBuilder
    			.withBody(content.toByteArray())
        		.setExpiration(delayMillis.toString())
          		.build()

        	rabbitTemplate.send("delay.exchange", "delay.routing.key", message)
        }
    }
    
    @RabbitListener(queues = ["actual.queue"])
    fun handleScheduledPost(content: String) {
        postRepository.save(Post(content = content))
    }
}

 

예약 시간이 존재하면, 현재 시간과의 차이를 계산해 그만큼의 TTL을 설정한 메시지를 RabbitMQ 딜레이 큐로 전송합니다.

 

메시지는 딜레이 큐에서 설정된 시간이 지나면 actual.queue로 이동하게 되고, 이 큐를 감지하고 있는 @RabbitListener가 메시지를 소비하는 순간, 메시지에 담긴 게시물 내용으로 게시물을 데이터베이스에 저장합니다.