Consumer의 subscribe, poll, commit 로직
중복 읽기 상황(poll 이후 commit을 하지 않은 상태에서 작업 도중 다운)
읽기 누락 상황(poll 이후 commit을 한 상태에서 작업 도중 다운)
subscribe()
를 통해 읽을 토픽을 등록poll()
메서드를 이용해 주기적으로 브로커의 토픽 파티션에서 메세지를 가져옴__comsumer_offset
에 다음에 읽을 offset 위치를 작성Consumer의 auto commit
consumer offset 적용 유형
auto.commit.interval.ms를 넘은 경우 다음 poll()에서 commit
auto.enable.commit=true
인 경우 읽은 메세지를 브로커에 바로 commit 적용하지 않음auto.commit.interval.ms
로 설정된 주기(default = 5000
)마다 자동으로 commit 수행Consumer의 수동 동기/비동기 commit
enable.auto.commit = false
commitSync()
호출commitAsync()
호출Consumer에서 토픽의 특정 파티션 할당하기
특정 파티션 읽기
특정 파티션 특정 offset 읽기
Consumer에게 여러 개의 파티션이 있는 토픽에서 특정 파티션만 할당 가능
배치 처리시 특정 key레벨 파티션을 특정 Consumer에 할당하여 처리할 경우 사용
KafkaConsumer assign()
메서드에 TopicPartition 객체로 특정 파티션을 인자로 입력하여 할당
TopicPartition topicPartition = new TopicPartition({topicName}, 0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
특정 메세지가 누락되었을 경우 해당 메세지를 다시 읽어 오기 위해 유지 보수 차원에서 일반적으로 사용
TopicPartition 객체로 할당할 특정 파티션을 설정하고 seek()
메서드로 읽어올 offset 설정(처리 보다는 확인 용도)
TopicPartition topicPartition = new TopicPartition({topicName}, 1);
kafkaConsumer.assign(Arrays.asList(topicPartitions));
kafkaConsumer.seek(topicPartition, 6L);
// 만약 __consumer_offset의 offset이 6보다 작은 경우 처음부터 읽음