目 录CONTENT

文章目录

Kafka—5.Kafka生产者关键配置

路口、下车
2026-01-20 / 0 评论 / 0 点赞 / 6 阅读 / 0 字
温馨提示:
本文最后更新于2026-01-20,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itlaoqi.ex00200</groupId>
    <artifactId>ex00200</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Kafka客户端库,用于与Kafka集群进行通信 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- Logback日志框架,用于记录应用程序的日志 -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.10</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.0</version>
        </dependency>
    </dependencies>
</project>
public class Order {
    private String orderId;
    private long timestamp;
    private String product;
    private int quantity;
    private double price;

    // 无参构造函数
    public Order() {}

    // 全参构造函数
    public Order(String orderId, long timestamp, String product, int quantity, double price) {
        this.orderId = orderId;
        this.timestamp = timestamp;
        this.product = product;
        this.quantity = quantity;
        this.price = price;
    }

    // Getter 和 Setter 方法
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String product) {
        this.product = product;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    // toString 方法,方便打印对象信息
    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + '\'' +
                ", timestamp=" + timestamp +
                ", product='" + product + '\'' +
                ", quantity=" + quantity +
                ", price=" + price +
                '}';
    }
}

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class OrderProducer {
    private static final Logger log = LoggerFactory.getLogger(OrderProducer.class);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties kaProperties = new Properties();
        //它用于指定初始连接的服务器地址列表。
        kaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.230:9092"); // 指定Kafka集群地址
        kaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置键序列化类
        kaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); // 设置值序列化类
        /*
         acks配置掌控着生产者消息发送的确认策略。
         acks = 0时,生产者发完即走,速度最快但可能丢消息;
         acks = 1,会等主副本确认,有一定可靠性;
         acks = -1/all则需所有同步副本确认,可靠性高但性能损耗大。
         */
        kaProperties.put(ProducerConfig.ACKS_CONFIG, "all"); // 配置消息确认模式为所有副本确认
        kaProperties.put(ProducerConfig.RETRIES_CONFIG, "3"); // 设置消息发送失败重试次数
        /*
        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION是 Kafka 生产者配置参数。
        它用于限制每个连接中可以同时发送但尚未收到响应的请求数量。
        如果设置为 1,能保证消息按顺序写入分区。
        在启用幂等性或事务时,这个值强制设为 1,有助于确保消息的顺序和事务的一致性。
        如果大于 1,可能出现乱序写入,但能提升吞吐量。
         */
        kaProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // 限制每个连接的未完成请求数量

        try (Producer<String, Order> producer = new KafkaProducer<>(kaProperties)) {
            Order order = new Order("orders", System.currentTimeMillis(), "Laptop", 1, 999.99);
            /*
            ProducerRecord 是 Kafka 生产者发送消息的重要组件。
            它用于指定消息发送的目标主题,这决定了消息的基本流向。
            可以设定分区,能精准控制消息存储分区,若不指定,Kafka 会按策略分配。
            其中的键能够用于分区分配,确保相关消息在同一分区。
            时间戳记录消息产生时间,方便后续时间相关处理。
             */
            ProducerRecord<String, Order> producerRecord = new ProducerRecord<>("orders", null, order);
            // 发送ProducerRecord对象,并同步获取发送结果
            RecordMetadata result = producer.send(producerRecord).get();
            log.info("offset = {}, topic = {}, timestamp = {}", result.offset(), result.topic(), result.timestamp());
        } catch (Exception e) {
            log.error("Error sending message", e);
        }
    }


    public static class JsonSerializer implements org.apache.kafka.common.serialization.Serializer<Order> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public byte[] serialize(String topic, Order data) {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                log.error("Error serializing JSON message", e);
                throw new RuntimeException("Error serializing JSON message", e);
            }
        }
    }
}

0

评论区