1. Kafka Consumer 개요

    Untitled

    1. 브로커의 메시지를 읽는 역할
    2. 모든 Consumer들은 고유한 그룹아이디 group.id를 가지는 Consumer Group에 소속되어야함
    3. 개별 Consumer Group내에 여러 개의 Consumer 들은 토픽 파티션 별로 분배
  2. Consumer의 subscribe, poll, commit 로직

    Untitled

    1. Consumersubscribe()를 호출하여 읽을 토픽을 등록
    2. Consumerpoll() 메서드를 이용해 주기적으로 브로커의 토픽 파티션에서 메세지를 가져옴
    3. 메시지를 성공적으로 가져온 경우 commit__consumer_offset에 기록해서 다음에 읽을 offset 위치 참조
  3. Consumer의 주요 수행 개요

    1. KafkaConsumerFetcher, ConsumerClientNetwork 등의 주요 내부 객체와 별도의 Heart Beat Thread 생성
    2. Fetch, ConsumerClientNetwork 객체는 Broker의 토픽 파티션에서 메세지를 fetch ****poll 수행
    3. Heart Beat Thread는 Consumer의 정상적인 활동을 Group Coordinator에 보고하는 역할 수행
    4. Group Coordinator는 주어진 시간동안 Heart Beat를 받지 못하면 Consumer들의 Rebalance를 수행 명령
  4. Consumer Client API 처리 로직

    Untitled

    1. Consumer Config 설정(bootstrap.servers, key/value deserializer, group.id 등)
    2. config를 반영하여 Consumer 객체 생성
    3. 읽을 토픽을 subscribe() 를 통해 설정
    4. 지속적으로 poll()을 호출하여 Topic의 메세지 읽음
    5. close()를 통해 객체 반환(중요)
  5. Consumer의 구성 요소와 poll() 메서드

    Consumer 구성

    Consumer 구성

    1. poll(Duration.ofMillis(1000))

      1. 브로커나 Consumer 내부 Queue에 데이터가 있다면 바로 데이터를 반환(시간과 상관없이)
      2. 데이터가 없는 경우 1000ms 동안 데이터 fetch를 브로커에 계속 수행하고 결과 반환
    2. poll() 메서드의 내부 동작

      Untitled

      1. fetcherConsumer 내부의 Linked Queue에서 메세지를 가져옴
      2. Linked Queue에 메세지를 채우는 역할은 ConsumerNetworkClient가 수행
  6. Consumer Fetcher 관련 Config

    Untitled

    1. 기본적으로 FetcherLinked Queue에서 데이터를 가져오되, Linked Queue에 데이터가 없는 경우 ConsumerNetworkClient에서 데이터를 브로커로 부터 가져올 것을 요청

    2. fetch.min.bytes

      1. Fetcher가 읽어들이는 최소 bytes
      2. 브로커는 지정된 fetch.min.bytes 이상의 새로운 메세지가 쌓이기 전까지는 전송을 하지 않음
      3. default = 1
    3. fetch.max.wait.ms

      1. 브로커에 fetch.min.bytes 이상의 메세지가 쌓일 때까지 최대 대기 시간
      2. default = 500ms
    4. fetch.max.bytes

      1. fetcher가 한번에 가져올 수 있는 최대 데이터
      2. default = 500MB
    5. max.partition.fetch.bytes

      1. fetcher가 파티션별 한번에 가져올 수 있는 bytes
      2. default = 1MB
    6. max.poll.records

      1. fetcher가 한번에 가져올 수 있는 레코드 수
      2. default = 500
    7. config 정리

      Untitled

  7. Consumer의 auto.offset.reset

    1. Consumer가 새로 만들어져 특정 topic을 subscribe하는 경우 어떤 offset부터 읽을지 지정(다른 컨슈머 그룹 기준)
    2. 하지만, 동일 컨슈머 그룹에서 새로운 Consumer가 만들어지면 ealiest로 설정하더라도 offset을 0부터 읽지는 않음
    3. 만약, topic이 삭제되는 경우 offset정보가 없으므로 offset이 0부터 설정
    4. 즉, auto.offset.reset__consumer_offset의 정보가 없는 경우 어디부터 읽을지 설정하는 config
    5. 가장 처음부터(ealiest) 또는 가장 마지막부터(latest) 설정 가능