개발/java

Spring Boot 에 AWS SQS 적용하기

코딩하는꽃개 2023. 5. 22. 12:39
반응형

Amazon SQS(Simple Queue Service)란?

Amazon SQS(Simple Queue Service)는 아마존 웹 서비스(Amazon Web Services, AWS)의 메시지 대기열 서비스입니다. SQS를 사용하면 분산 시스템 간에 안정적이고 확장 가능한 방식으로 메시지를 전달할 수 있습니다. SQS는 복잡한 메시지 큐 관리를 간소화하여 개발자가 응용 프로그램에 대해 신속하게 메시지 기반 아키텍처를 구현할 수 있도록 도와줍니다.

 

SQS의 장점

  1. 신뢰성: SQS는 메시지를 전달하기 위해 내구성과 복제를 제공합니다. 메시지는 중복되거나 손실되지 않고 안전하게 보관되며, 필요한 경우 재시도 메커니즘을 통해 처리할 수 있습니다.
  2. 확장성: SQS는 많은 수의 메시지를 처리하기 위해 높은 확장성을 제공합니다. 필요에 따라 수신 및 송신 측을 수평으로 확장하여 대기 시간을 최소화하고 대기열 처리량을 향상시킬 수 있습니다.
  3. 간편한 관리: SQS는 메시지 큐에 대한 관리 작업을 간소화합니다. 큐 생성, 큐 크기 조정, 액세스 제어 설정 등을 손쉽게 수행할 수 있습니다.
  4. 다양한 운영 환경 지원: SQS는 여러 운영 환경(예: 서버리스, 마이크로서비스, 분산 시스템)에서 사용할 수 있도록 다양한 클라이언트 라이브러리 및 통합을 제공합니다.

 

 

SQS의 작동 방식

  • 메시지 전송: 메시지를 생성하여 SQS 큐에 보냅니다. 이 메시지는 대기열에 저장되고 대기 상태로 전환됩니다.
  • 메시지 처리: 대기열에 있는 메시지를 처리할 수 있는 수신 측(수신자)이 메시지를 가져와 처리합니다. 메시지는 삭제되거나 보이지 않는 상태로 대기열에서 숨김 처리될 수 있습니다.
  • 메시지 삭제: 메시지 처리가 완료되면 삭제 API를 사용하여 메시지를 명시적으로 삭제하거나, 메시지 처리 중 오류가 발생한 경우 재시도 메커니즘을 사용하여 메시지를 재처리하게 됩니다.

 

SQS의 사용 시나리오

  • SQS는 다양한 시나리오에서 사용될 수 있습니다. 몇 가지 예시는 다음과 같습니다:
    이벤트 드리븐 아키텍처: 마이크로서비스 아키텍처 또는 서버리스 환경에서 이벤트 기반 통신을 위해 SQS를 사용할 수 있습니다.
  • 워크로드 분산: 대량의 작업을 처리하기 위해 여러 작업자(worker) 인스턴스 간에 작업을 분산시키기 위해 SQS를 사용할 수 있습니다.
  • 비동기 처리: 웹 애플리케이션에서 메시지 큐를 사용하여 비동기 처리를 구현할 수 있습니다. 요청 처리 시간을 줄이고 응답 속도를 향상시킬 수 있습니다.

SQS Spring Boot에 적용하기

SQS를 사용하기 위해 AWS 콘솔 또는 AWS SDK를 활용할 수 있습니다. 

우선 AWS SDK의 의존성을 추가해 줍니다.

 

dependencies {
   ... (중략)
   // AWS
   implementation 'org.springframework.cloud:spring-cloud-starter-bootstrap:3.0.3'
   implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging:2.2.6.RELEASE'
   ... (중략)
}

 

의존성 추가가 완료 되었다면 Gradle 빌드 후 아래의 설정 클래스를 구현해 줍니다.

 

@Configuration
public class SqsConfig {

    @Value("${cloud.aws.credentials.access-key}")
    private String accessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretKey;

    @Value("${cloud.aws.region.static}")
    private String region;

    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {
        return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
    }

    @Bean
    public AmazonSQS amazonSQSClient() {
        AmazonSQSClientBuilder builder = AmazonSQSClientBuilder.standard().withCredentials(awsCredentialsProvider());
        builder.withRegion(region);
        return builder.build();
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        factory.setWaitTimeOut(10);
        factory.setTaskExecutor(messageThreadPoolTaskExecutor());
        return factory;
    }

    @Bean
    public ThreadPoolTaskExecutor messageThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadNamePrefix("sqs-task-");
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    /**
     * SQS 메세지 변환
     * String 형태의 SQS 메세지를 DTO로 변환시켜 줍니다.
     * ObjectMapper를 사용하며, 필드명을 지정하고 싶은 경우 @JsonProperty 어노테이션을 사용합니다.
     * @param objectMapper 오브젝트 맵퍼
     * @return 맵핑 컨버터
     */
    @Bean
    public MappingJackson2MessageConverter mappingJackson2MessageConverter(ObjectMapper objectMapper) {
        MappingJackson2MessageConverter jackson2MessageConverter = new MappingJackson2MessageConverter();
        jackson2MessageConverter.setObjectMapper(objectMapper);
        return jackson2MessageConverter;
    }
}

 

한번에 받을 메세지 양은 setMaxNumberOfMessages로 설정 가능합니다.

 

하단에 있는 mappingJackson2MessageConvert 빈 클래스의 경우 SQS 메세지를 DTO 클래스로 변환시키기 위해서 선언해 주어야 합니다. 해당 클래스를 구현하지 않을 경우 직접 정의한 DTO를 사용할 수 없습니다.

 

 

설정 클래스 구현이 완료되었다면 이제 SQS의 큐 컨슈머를 구현해 줍니다.

@SqsListener(value = "${sqs.name.s3}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void s3Consumer(EventRequestDTO<S3Record> message, Acknowledgment acknowledgment) {
    // 큐 제거
    acknowledgment.acknowledge();

    // 큐 딜리버리
    pipelineService.delivery(message);
}

 

컨슈머 구현체 파라미터 중 Acknowledgment가 있는데 이 부분에 대해서는 이 포스트를 참고해 주세요.

 

 

SQS SDK의 Acknowledgment에 대하여

Amazon SQS Acknowledgment의 종류 Amazon SQS(Simple Queue Service)에서 Acknowledgment(확인 응답)은 메시지를 처리한 후 메시지의 상태를 표시하고 해당 메시지를 안전하게 삭제하기 위해 사용됩니다. SQS는 다양

hanainu.tistory.com

 

 

결론

Amazon SQS는 안정적이고 확장 가능한 메시지 대기열 서비스로, 분산 시스템 간에 신뢰성 있게 메시지를 전달할 수 있습니다. 신뢰성, 확장성, 관리 용이성 등의 장점을 통해 메시지 기반 아키텍처를 쉽게 구현할 수 있습니다. SQS를 활용하여 안정적이고 유연한 애플리케이션을 개발해보세요!

 

 

 

반응형