auto.create.topics.enable 옵션을 사용하면 토픽이 존재하지 않을 경우 자동 생성하게 된다.
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
long now = System.currentTimeMillis();
Producer<String, String> producer = new KafkaProducer<>(prop);
for (int i = 0; i < 10_000; i++) {
int finalI = i;
producer.send(new ProducerRecord<>("test", ("testValue-" + now) + i), (recordMetadata, e) -> {
System.out.println(String.format(
"COMPLETE-%d (partition=%d)",
finalI, recordMetadata.partition()
));
});
}
System.out.println("END");
producer.close();
https://kafka.apache.org/documentation/#producerconfigs
카프카 클러스터 초기 연결을 하는데 사용할 호스트와 포트의 목록
클러스터 노드 중 일부만 입력하더라도 클러스터 내 다른 카프카 노드를 발견하는데 문제는 없지만, 일부 노드에 장애가 발생하였을 경우를 위해서라도 전체에 준하는 목록 입력을 권장
요청을 완료하기 위해 받아야할 ack의 수. 레코드의 전송 보장을 위한 설정 (디폴트: acks=1)
acks=0: ack를 기다리지 않음. 빠른 메시지 처리량을 얻을 수 있지만, 성공 여부를 보장하지 않아 메시지 손실이 발생할 수 있음. 각 레코드의 offset 값은 항상 -1로 세팅되며, 재시도 설정도 적용되지 않음 (성공여부를 알지 못하기 때문)acks=1: 리더가 자신의 로그에는 기록하지만 모든 팔로워의 ack를 기다리지 않음. 리더가 문제가 생겼지만 아직 팔로워에 복제 전이라면 메시지 손실이 발생할 수도 있음acks=all: 리더는 ISR 전체의 ack를 기다림. 하나의 팔로워가 있는 한 데이터 손실이 발생하지 않음을 보장함. 가장 강력한 데이터 무손실을 보장함.프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 최대 메모리의 크기(byte). 버퍼가 가득찬 상태에서 max.block.ms 시간이 초과하면 예외가 발생
서버로 데이터를 보낼 때 사용할 압축 타입. (디폴트: none) (gzip, snappy, lz4, zstd)
메시지 전송 중 일시적인 오류 발생 시 재시도 횟수