1. Consumer의 subscribe, poll, commit 로직

    Untitled

    Untitled

    중복 읽기 상황(poll 이후 commit을 하지 않은 상태에서  작업 도중 다운)

    중복 읽기 상황(poll 이후 commit을 하지 않은 상태에서 작업 도중 다운)

    읽기 누락 상황(poll 이후 commit을 한 상태에서 작업 도중 다운)

    읽기 누락 상황(poll 이후 commit을 한 상태에서 작업 도중 다운)

    1. Consumersubscribe()를 통해 읽을 토픽을 등록
    2. poll() 메서드를 이용해 주기적으로 브로커의 토픽 파티션에서 메세지를 가져옴
    3. 메세지를 가져왔으면 commit을 통해서 __comsumer_offset에 다음에 읽을 offset 위치를 작성
  2. Consumer의 auto commit

    consumer offset 적용 유형

    consumer offset 적용 유형

    auto.commit.interval.ms를 넘은 경우 다음 poll()에서 commit

    auto.commit.interval.ms를 넘은 경우 다음 poll()에서 commit

    1. Consumerconfigauto.enable.commit=true인 경우 읽은 메세지를 브로커에 바로 commit 적용하지 않음
    2. auto.commit.interval.ms로 설정된 주기(default = 5000)마다 자동으로 commit 수행
    3. Consumer가 읽어온 메세지보다 브로커의 commit이 오래되었으므로 Consumer의 장애/재기동 및 Rebalancing 후 브로커에서 이미 읽어온 메세지를 다시 읽어와서 중복 처리될 수 있음
    4. 중복 처리 참고: https://blog.voidmainvoid.net/262
  3. Consumer의 수동 동기/비동기 commit

    Untitled

    Untitled

    1. Consumer Client는 일정 주기마다 자동으로 commit하지 않고 API를 이용하여 동기 또는 비동기 commit을 적용할 수 있음
    2. 동기와 비동기에서 가장 큰 차이는 commit이 실패했을 때 재시도
    3. Consumer의 Manual 동기/비동기 commit 구현 방법
      1. enable.auto.commit = false
      2. 동기 commitKafkaConsumercommitSync() 호출
      3. 비동기 commitKafkaConsumercommitAsync() 호출
  4. Consumer에서 토픽의 특정 파티션 할당하기

    특정 파티션 읽기

    특정 파티션 읽기

    특정 파티션 특정 offset 읽기

    특정 파티션 특정 offset 읽기

    1. Consumer에게 여러 개의 파티션이 있는 토픽에서 특정 파티션만 할당 가능

    2. 배치 처리시 특정 key레벨 파티션을 특정 Consumer에 할당하여 처리할 경우 사용

    3. KafkaConsumer assign() 메서드에 TopicPartition 객체로 특정 파티션을 인자로 입력하여 할당

      TopicPartition topicPartition = new TopicPartition({topicName}, 0);
      
      kafkaConsumer.assign(Arrays.asList(topicPartition));
      
    4. 특정 메세지가 누락되었을 경우 해당 메세지를 다시 읽어 오기 위해 유지 보수 차원에서 일반적으로 사용

    5. TopicPartition 객체로 할당할 특정 파티션을 설정하고 seek() 메서드로 읽어올 offset 설정(처리 보다는 확인 용도)

      TopicPartition topicPartition = new TopicPartition({topicName}, 1);
      
      kafkaConsumer.assign(Arrays.asList(topicPartitions));
      
      kafkaConsumer.seek(topicPartition, 6L);
      
      // 만약 __consumer_offset의 offset이 6보다 작은 경우 처음부터 읽음