분산시스템이란?

분산환경에서의 데이터 전달 방식에 대해

Posted by Yan on September 17, 2024

분산 시스템

  • 목표를 달성하기 위하여 여러 개의 컴퓨터 리소스를 사용하는 시스템
  • 시스템은 2개 이상의 컴포넌트로 구성되어 있다.
    • 엔터프라이즈 애플리케이션
    • 마이크로 서비스 아키택처 어플리케이션
    • 모놀리틱 아키텍처 어플리케이션 + 검색엔진
  • 네트워크를 사용하여 컴포넌트 간의 기능을 통합한다.

분산 시스템에서 데이터를 전달하는 방법

Remote API를 사용한 데이터 전달 방식

  • 서버-클라 구조
    • 서버에 데이터를 조작(CUD)하는 경우
    • 서버의 데이터를 조회(R)하는 경우
  • 사용자 요청에 즉각 응답하는 API를 주로 사용하는 방식
  • 비교적 간단하게 개발할 수 있다.

Message Queue를 사용한 데이터 전파 방식

  • Publisher - Consumer구조
    • Consumer가 데이터를 조작한다. (CRUD)
    • Message Queue 컴포넌트가 추가된다.
  • 배치작업, 비동기 작업에서 주로 사용한다.
  • 비교적 개발이 복잡해진다.

효율적인 방법은 뭘까?

분산 시스템에서 컴포넌트들을 네트워크로 연결하는 방법

  • 네트워크는 시스템을 연결하는 유일한 수단이 되는데, 네트워크는 신뢰할 수 없는 매체이다.
    • 패킷 손실이 있을 수 있고, 네트워크 지연, 네트워크 다운이 있을 수 있다.
    • 항상 데이터 유실에 대비해야 한다.

데이터 전달을 보장하는 방법론들

1. At-most once delivery (최대 한번만 전달한다)

  • Producer는 메세지를 최대 한 번만 전송한다. (Fire and Forget)
  • Consumer는 메세지를 최대 한 번만 수신한다.
    • 네트워큭 유실되거나, Producer/Consumer에서 발생한 예외가 있으면 메세지가 유실될 수 있다.

2. At-least once delivery (최소 한번 전달한다)

  • Producer는 메세지를 최대 한 번 이상 발송한다.
    • 발송 메세제의 상태를 관리한다.
  • Consumer는 메세지를 최소 한 번 이상 수신한다.
    • ACK 유실로 인한 재발송이 가능하다.

3. Exactly-once-delivery (정확히 한 번 전달)

  • 메세지는 정확하게 한 번만 전송한다.
세 가지 방법론의 장단점 정리
분류 장점 단점
At-most once delivery 개발하기 쉽다. 메세지가 유실될 수 있다.
At-least once delivery Producer는 메세지 발송이 보장되고, 효과 대비 개발이 쉽다. Consumer는 멱등성을 보장해야한다.
Exactly-once-delivery 누락과 중복이 없다. 가장 어려운 개발 난이도, Producer, Consumer에서 모든 상태 관리가 필요하다, MQ에 의존한 개발이 필요하다. MQ추가로 인해 시스템 복잡도가 증가한다.

RDB를 사용하는 애플리케이션에서의 전달 방법

  • 서비스별 데이터베이스 패턴
  • TransactionSynchronizationManager, TransactionSynchronization : 콜백 메소드 호출

서비스별 데이터베이스 패턴 (Database per Service Pattern)

  • 마이크로 서비스 아키텍처 패턴
  • 모던 애플리케이션의 일반적인 형태

주의점

해당 패턴을 구현할 때, @Transactional 어노테이션을 서비스에 붙이고, DB에 저장하는 로직 수행 후, 다른 서비스에 전파를 했는데 만약 그 후에 커밋 전 rollback이 발생한다면 정합성이 맞지 않다. 이를 방지하기 위해 시도해볼 수 있는 방법이 몇 가지 있다.

트랜잭션 Commit 이벤트를 사용하는 방법
  • @TransactionalEventLisnter 어노테이션 사용 : 다른 컴포넌트에 데이터를 전달해야 하는 경우, propagate메소드에 @TransactionalEventLisnter(phase = TransactionPhase.AFTER_COMMIT) 어노테이션을 사용한다.
    • 만약 네트워크 문제로 propagte가 실패하고 DB에 커밋만 되면 어떻게 처리할 것인가? @Retryable어노테이션을 사용하여 재시도 전략을 새운다. 재시도도 실패할 수 있으므로, 재시도 최대 횟수를 정하여 수행한다. @Retryable(maxAttemps = 3, backOff = @BackOff(delay = 100L))와 같이.
    • 만약 재시도도 최대 횟수까지 실패한다면?
마이크로 서비스 아키택처 패턴을 적용해보자!
Transactional Outbox Pattern
  • RDB를 Message Queue로 사용한다. 이벤트나 메세지가 발행되면 RDB에 하나의 트랜잭션으로 같이 저장한다.
  • OLTP에 Event Message를 포한하는 패턴

이때 메세지 큐처럼 사용할 테이블을 설계할 때 중요한 필드

Field Name Data Type Description
event_id BIGINT 이벤트의 순서를 보장
created_at DateTime(3~6) 이벤트 발생 시간
status smallint Ready(0), Done(1)
payload jsonb JSON type의 Message payload

코드 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class CreateTaskService implements CreateTaskUserCase {
  
  @Transactional
  public CreateTaskResponse createTask(CreateTaskCommand createTaskCommand) {
    Task task = createTaskCommand.toTask();

    taskRepository.save(task);
    eventRepository.save(CreatTaskEvent.of(task));

    return CreateTaskResponse.of(task);
  }
}
Polling Publisher Pattern

위의 Outbox Pattern으로 RDB에 저장한 이벤트를 어떻게 전파할 것인가?

  • RDB Message Queue Polling & Publishing : DB에 저장된 이벤트를 주기적으로 polling하고, 발행하는 요소를 추가한다.

코드 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class MessagePublisher {

  @Scheduled(cron = "0/5 * * * * * *")
  @Transactional // 만약 exception으로 인해 rollback이 발생할 경우 대비
  public void publish() {
    LocalDateTime now = LocalDateTime.now();
    eventRepository.findByCreatedAtBefore(now, EventStatus.READY)
                   .stream()
                   .map(event -> restTemplate.execute(event))
                   .map(event -> event.done())
                   .forEach(eventRepository.save());
  }
}
마이크로 서비스 아키텍처 패턴을 적용할 때 장단점
  • 장점
    • REST-API환경에서 At-least-once 구현 가능
  • 단점
    • Polling, Publisher 과정으로 인한 지연처리. 일정 주기마다 publish하기 때문. (또한 exception 발생할 경우 무제한으로 지연될 수 있다.)
    • 데이터 베이스 부하. 이벤트를 저장하기 때문.
    • 데이터베이스에 비례한 처리 속도가 나온다.

Rabbit MQ를 이용한 데이터 전달 방법

Rabbit MQ란?

  • AMQP(Advenced Message Queueing Protocol)을 구현한 메세지 브로커
  • Publish/Subscribe 방식 지원
  • ACK를 메세지 응답 처리 메커니즘으로 처리
    • Producer Confirm : producer의 메세지 발행 확인. CorrelationData가 중간 매개체 역할을 한다.
    • Consumer Ack : consumer의 메세지 확인. Channel을 매개변수로 이용. @RabbitListener어노테이션 사용.
      • MessageListenr인터페이스를 사용하여 consumer ack를 구현할 수 있다.

만약 NAK만 나오고, 처리 안 된 메세지가 queue에 쌓인다면?

DeadLetterQueue로 처리 안 된 메세지를 이동시킨다. 어떤 메세지를 이동시킬까?

  • basic reject, basic NAK로 처리할 수 없는 경우
  • Queue의 메세지가 오래되어 TTL을 넘긴 경우
  • Queue가 가득차서 더 이상 처리할 수 없는 경우

Retry 후 DeadLetter로 이동하는 코드 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connFactory) {

  var containerFactory = new SimpleRabbitListnerContainerFactory();
  containerFactory.setConnectionFactory(connFactory);
  containerFactory.setAdviceChain(
    RetryInterceptorBuilder.stateless()
                           .maxAttempts(3)      // 최대 시도 횟수
                           .backOffOptions(1000, 2, 2000) // retry 간격(1초 단위로 설정)
                           .recoverer(new RejctAndDontRequeueRecoverer()) // 원래 있던 requeue말고, 새 requeue로 넣는다.
                           .build()
  );
  return containerFactory;
}

DeadLetter Queue리스너는 어떻게 생겼나?

1
2
3
4
5
6
7
@RabbitListener(
        queues = "example.member.deadletter"
        containerFactory = "deadLetterContainerFactory"
)
public void onDeadLetterMessage(Map<String, Object> rawEvent) {
  // alert failure or fallbacks
}

Kafka를 사용한 데이터 전달 방법

Producer Confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
@RequiredArgsConstructor
public class Producer {

  public void sendEvent(CreateTaskEvent event) {
    ListenableFuture<SendResult<String, CreateTaskEvent>> future =  kafkaTemplate.send(TOPIC_TASK, event);
    future.addCallback(
      result -> log.info("offset: {}", result.getRecordMetadata().offset()),
      throwable -> log.error("fail to publish", throwable)
    ); // 2개의 callback을 넣어줄 수 있다. success용, failure용 2개다.
  }
}

Consumer ACK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@FunctionalInterface
public interface AcknowledgingMessageListener<K, V> extends MessageListener<K, V> {
  default void onMessage(ConsumerRecord<K, V> data) {
    throw new UnsupportedOperationException("Container should never call this");
  }
  
  void onMessage(ConsumerRecord<K, V> var1, Acknowledgment var2);
}

@Override
@KafkaListener(containerFactory = "kafkaListenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
  try {
    // do something
    acknowledgement.acknowledge(); // consumer ACK이 발송됨
  } catch (Exception e) {
    log.lerror("Error to recieve message", e);
  }
}

결론

분산 시스템에서 데이터를 전달하는 효율적인 방법

  • Event Driven Architecture의 기본은 데이터 전달이다.
  • 최소 At-least-once로 설정해야 한다.
  • Producer Confirm, Consumer ACK를 고려해야 한다.
reference

NHN FORWARD 22 분산시스템에서 데이터를 전달하는 효율적인 방법