토픽 자동생성 옵션

auto.create.topics.enable 옵션을 사용하면 토픽이 존재하지 않을 경우 자동 생성하게 된다.

프로듀서 예제

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
 
long now = System.currentTimeMillis();
Producer<String, String> producer = new KafkaProducer<>(prop);
 
for (int i = 0; i < 10_000; i++) {
    int finalI = i;
    producer.send(new ProducerRecord<>("test", ("testValue-" + now) + i), (recordMetadata, e) -> {
        System.out.println(String.format(
                "COMPLETE-%d (partition=%d)",
                finalI, recordMetadata.partition()
        ));
    });
}
 
System.out.println("END");
producer.close();

프로듀서 주요 옵션

https://kafka.apache.org/documentation/#producerconfigs

bootstrap.servers

카프카 클러스터 초기 연결을 하는데 사용할 호스트와 포트의 목록

클러스터 노드 중 일부만 입력하더라도 클러스터 내 다른 카프카 노드를 발견하는데 문제는 없지만, 일부 노드에 장애가 발생하였을 경우를 위해서라도 전체에 준하는 목록 입력을 권장

acks

요청을 완료하기 위해 받아야할 ack의 수. 레코드의 전송 보장을 위한 설정 (디폴트: acks=1)

buffer.memory

프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 최대 메모리의 크기(byte). 버퍼가 가득찬 상태에서 max.block.ms 시간이 초과하면 예외가 발생

compression.type

서버로 데이터를 보낼 때 사용할 압축 타입. (디폴트: none) (gzip, snappy, lz4, zstd)

retries

메시지 전송 중 일시적인 오류 발생 시 재시도 횟수