1. Acks = 0 에서의 Producer

    Untitled

    1. producer는 Leader broker가 메시지를 받았는지에 대한 Ack 메시지를 받지 않고 다음 메시지를 전송
    2. 전송 실패/성공 여부에 대해 받지 않기 때문에 기록되지 않더라도 재처리가 없음
    3. 메시지 손실은 있지만 빠르게 전송 가능
  2. Acks = 1에서의 Producer

    Untitled

    1. producer는 Leader broker가 메시지 A를 정상적으로 받았는지 Ack 메시지를 받은 후 다음 메시지를 전송
    2. 메시지가 모든 follwer broker에 저장되었는지 확인하지는 않고 전송(Leader broker만 확인)
    3. 만약 Leader broker에서 Follwer broker로 메시지 복제 중에 다운될 경우 Follwer broker가 Leader broker가 되면서 복제되지 못한 메시지는 소실될 수 있음
  3. Acks = all에서의 Producer

    Untitled

    1. producer는 Leader broker가 메시지를 받은 뒤 Follwer broker로 복제를 모두 마친 뒤에 Acks 메시지를 받은 후 다음 메시지를 처리
    2. 즉, 메시지가 Follwer broker까지 안전하게 복사가 되었는지 확인 후 다음 메시지 전송
    3. 메시지가 소실되는 경우는 작지만 Follwer broker까지의 저장을 기다리므로 오래 걸릴 수 있음
    4. min.insync.replicas
      1. 최소 broker개수를 지정
      2. 2를 지정하고 Follwer broker들이 모두 다운된 경우 현재 사용가능한 브로커는 1대인데, 이때 설정값보다 작으므로 에외 발생
  4. producer의 Sync와 Callback Async에서의 acks와 retry

    1. callback 기반의 async에서도 동일하게 acks설정에 기반하여 retry동작
    2. callback 기반의 async에서는 retry에 따라 producer의 기존 메시지 전송 순서와 기록되는 메시지 순서는 달라질 수 있음
    3. sync방식에서 acks = 0일 경우 전송 후 ack/error를 기다리지 않음(fire and forget)
  5. Producer의 메시지 배치 전송의 이해

    Untitled

    1. **send()**메서드를 통해 ProducerRecord를 입력
    2. 하지만, 바로 전송되지 않고 내부 메모리에서 단일 메시지를 토픽 파티션에 따라 Record Batch 단위로 묶은 뒤 전송
    3. 메시지들은 Producer Client의 내부 메모리에 여러 개의 Batch들로 buffer.memory 설정 사이즈만큼 보관될 수 있음
    4. 또한, 여러 개의 Batch들로 한꺼번에 전송될 수 있음
  6. Record Accumulator

    Untitled

    1. Record AccumulatorPartitioner에 의해서 메시지 배치가 전송이 될 토픽과 Partition에 따라 저장되는 KafkaProducer 메모리 영역

    2. Sender ThreadRecord Accumulator에 누적된 메시지 배치를 꺼내서 브로커로 전송

    3. KafkaProducerMain Threadsend()메서드를 호출하고 Record Accumulator에 데이터 저장하고 Sender Thread는 별개로 데이터를 브로커로 전송

    4. linger.ms

      Untitled

      1. Sender Thread로 메시지를 보내기 전 대기 시간
      2. 만약 Batch가 다 차지 않더라도 보낼 때 최대 대기 시간을 지정할 수 있음
      3. 일정 시간 대기를 통해 Record Batch에 메시지를 보다 많이 채울 수 있도록
      4. Producer, Broker간의 전송이 빠른 경우 0으로 해도 무방(전송 응답이 빠른데 굳이 기다릴 필요 없음)
      5. 보통 20ms 이하로 설정 권장
    5. Sender Thread

      Untitled

      1. 기본적으로 전송할 준비가 되어 있으면 Record Accumulator를 읽어서 전송
      2. 전송할 때 1개 또는 여러개의 Batch를 가져갈 수 있음
      3. 또한 특정 Batch가 다 채워지지 않아도 전송 가능
      4. max.inflight.requests.per.connection을 통해 각 파티션별로 전송할 때 최대 Batch 개수 설정
  7. Producer의 동기와 비동기에서의 Batch

    1. 기본적으로 send()메서드는 비동기 전송하며 Batch 단위 전송
    2. Callback 기반의 비동기 전송은 RecordMetadata를 Client가 받을 수 있는 방식 제공
    3. Callback 기반의 비동기 전송은 여러 개의 메시지가 Batch로 만들어짐
    4. 동기 전송인 get()에서는 배치에 메시지가 1개(전송 성공에 대해 응답을 받을 때까지 Batch를 채우지 않기 때문)
  8. Producer의 메시지 전송/재전송 시간 파라미터 이해

    Untitled

    1. max.block.ms

      1. send()호출하여 Record Accumulator에 append를 시도하려고 할 때 Record Accumulator가 가득찬 경우
      2. Record Accumulator에 append하기 위해 기다리는 시간
      3. 시간을 초과한 경우 예외를 던짐
    2. request.timeout.ms

      1. Sender Thread가 브로커로 전송할 때 브로커가 에러 또는 응답이 없는 경우 기다리는 시간
      2. 시간 초과할 경우 예외를 던짐
    3. retry.backoff.ms

      1. Sender Thread가 응답이 없어 request.timeout.ms만큼 기다리고 재전송 전 기다리는 시간
    4. delivery.timeout.ms

      1. Sender Thread가 전송 실패시 retry를 수행하는 최대 시간
      2. 즉, Producer 메시지 전송에 허용된 최대 시간
      3. delivery.timeout.ms >= linger.ms + request.timeout.ms 또는 delivery.timeout.ms >= linger.ms + retry.backoff.ms + request.timeout.ms 지켜져야함
    5. retries(재전송 횟수)

      Untitled

      1. retries는 매우 큰 값을 기본값으로 가짐(int 횟수)
      2. 하지만, delivery.timeout.ms를 초과하게 되면 전송 실패 처리를 하기 때문에 retries 설정된 만큼 재전송하는 경우는 없음
  9. max.in.flight.requests.per.connection 이해

    Untitled

    1. 브로커 서버의 응답 없이 Producer의 sender Thread가 한번에 보낼 수 있는 메시지 배치의 개수

    2. default = 5, Kafka Producer의 메시지 전송 단위는 Batch

    3. 비동기 전송 시 브로커의 응답없이 한꺼번에 보낼 수 있는 Batch의 개수는 max.in.flight.requests.per.connection에 따름

    4. 배치 전송 시 발생하는 상황

      Untitled

      1. 위 상황에서 B0, B1 배치가 전송되었을 때 B1은 저장되었는데, B0는 저장 실패
      2. Producer는 B0를 재전송하여 성공적으로 기록
      3. 이런 경우 기대한 저장 순서와 달라질 수 있다는 점
  10. 최대 한번 전송, 적어도 한번 전송, 정확히 한번 전송

    1. 최대 한번 전송(ack = 0)

      Untitled

      1. Producer는 브로커로부터 Ack 또는 에러 메세지에 상관하지 않고 다음 메세지 처리
      2. 메세지가 소실될 수는 있지만 중복 전송은 일어나지 않음
    2. 적어도 한번 전송(ack = 1, all)

      Untitled

      1. Producer는 브로커로 부터 Ack를 받은 다음에 다음 메세지 전송
      2. 메시지 소실은 없지만 중복 전송을 할 수 있음
    3. 중복 없이 전송(idempotence)

      Untitled

      1. Producer는 브로커로 부터 Ack를 받은 다음에 다음 메세지를 전송하되, Producer ID와 메시지 Seq를 Header에 넣어 전송

      2. 메시지 Seq는 메시지의 고유 Seq번호, 0…순차적 증가, Producer ID는 매번 새롭게 작성

      3. 브로커에서 메시지 Seq가 중복되는 경우 → 메세지 로그에 기록하지 않고 Ack만 전송

      4. 브로커는 Producer가 보낸 메세지의 Seq가 브로커가 가지고 있는 메세지의 Seq보다 1만큼 큰 경우에만 브로커에 저장

      5. Producer의 idempotence 설정

        1. enable.idempotence = true
        2. acks = all
        3. retries는 0보다 큰 값
        4. max.in.flight.requests.per.connection은 1 ~ 5 사이(최대 5까지 설정 가능)
        5. 성능은 감소할 수 있지만 사용을 권장
      6. idempotence 기반에서 메시지 전송 순서 유지

        Untitled

        1. B0, B1, B2 순으로 Producer에서 생성된 메시지 배치
        2. idempotence 기반에서 max.in.flight.requests.per.connection 만큼 여러 개의 배치들이 Broker로 전송
        3. Broker는 메세지 배치 처리시 write된 배치의 마지막 메세지 Seq + 1 이 아닌 배치 메세지가 올 경우 OutOfOrderSequenceException을 생성하여 Producer에 전달
        4. 예제는 B0는 저장에 성공했지만 B1이 저장에 실패했을 때 Seq + 1이 B1이므로 B2에 대해서도 저장하지 않고 에러를 던짐
        5. 이후 B1, B2를 함께 retry
  11. idempotence를 위한 Producer 설정

    1. enable.idempotence = true
    2. acks = all
    3. retries는 0보다 큰 값
    4. max.in.flight.requests.per.connection은 1 ~ 5 사이의 값
    5. 유의 사항
      1. kafka 3.0버전부터는 Producer의 기본 설정이 idempotence = true
      2. 하지만 기본 설정중에 enable.idempotence=true를 제외하고 다른 파라미터를 잘못 설정하면 전송을 되지만 idempotence가 제대로 동작하지 않음
      3. 명시적으로 enable.idempotence=true를 설정한 뒤 다른 파라미터들을 잘못 설정하면 config오류가 발생하면서 Producer가 기동되지 않음
  12. Custom 파티셔너를 통해 특정 Partition 설정하기

    DefaultPartitioner의 partition 메서드 key값 유무에 따라 전략 결정

    DefaultPartitioner의 partition 메서드 key값 유무에 따라 전략 결정

    1. KafkaProducer는 기본적으로 DefaultPartitioner 클래스를 이용해서 메세지 전송 시 도착할 Partition을 지정

    2. DefaultPartitioner는 키를 가지는 메시지의 경우 키 값을 Hashing하여 키 값에 따라 파티션 별로 균일하게 전송

    3. 커스텀 작성하는 방법

      partitioner 인터페이스

      partitioner 인터페이스

      1. DefaultPartitioner가 구현하고 있는 partitioner 인터페이스를 구현
      2. partition()메서드에 로직을 직접 구현해서 사용