目 录CONTENT

文章目录

Kafka—4.Java程序接入Kakfa

路口、下车
2026-01-20 / 0 评论 / 0 点赞 / 5 阅读 / 0 字
温馨提示:
本文最后更新于2026-01-20,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.itlaoqi</groupId>
  <artifactId>ex00100</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <!-- Kafka客户端库,用于与Kafka集群进行通信 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.9.0</version>
    </dependency>

    <!-- Logback日志框架,用于记录应用程序的日志 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.10</version>
    </dependency>
  </dependencies>
</project>

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class HelloWorldProducer {

    public static void main(String[] args) {

        // 创建Properties对象以存储Kafka生产者的配置属性
        Properties kaProperties = new Properties();

        // 设置Kafka服务器的地址
        kaProperties.put("bootstrap.servers", "192.168.31.230:9092");

        // 设置键的序列化器为字符串类型
        kaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 设置值的序列化器为字符串类型
        kaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者并使用之前设置的配置属性
        try (Producer<String, String> producer = new KafkaProducer<>(kaProperties)) {

            // 创建一个ProducerRecord对象,指定主题和消息内容
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("kinaction_helloworld", "hello world again!");

            // 发送消息
            producer.send(producerRecord);

        }

    }
}

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class HelloWorldConsumer {

    final static Logger log = LoggerFactory.getLogger(HelloWorldConsumer.class);

    private volatile boolean keepConsuming = true;

    public static void main(String[] args) {
        // 创建Properties对象来存储Kafka消费者的配置属性
        Properties kaProperties = new Properties();  //<1>

        // 设置Kafka服务器的地址
        kaProperties.put("bootstrap.servers", "192.168.31.230:9092");


        // 设置消费者组的ID,同一组内的消费者会竞争消费分区
        kaProperties.put("group.id", "kinaction_helloconsumer");

        // 启用自动提交偏移量,表示消费者会自动向Kafka报告它读取到的最后一条消息的位置
        kaProperties.put("enable.auto.commit", "true");

        // 设置自动提交偏移量的时间间隔,这里设置为每1000毫秒提交一次
        kaProperties.put("auto.commit.interval.ms", "1000");

        // 设置键的反序列化器为字符串类型
        kaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置值的反序列化器为字符串类型
        kaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        // 创建HelloWorldConsumer实例
        HelloWorldConsumer helloWorldConsumer = new HelloWorldConsumer();

        // 使用配置属性开始消费Kafka主题的消息
        helloWorldConsumer.consume(kaProperties);

        // 添加关闭钩子,当JVM关闭时,调用HelloWorldConsumer的shutdown方法进行清理
        Runtime.getRuntime().addShutdownHook(new Thread(helloWorldConsumer::shutdown));
    }

    private void consume(Properties kaProperties) {
        // 创建Kafka消费者并使用之前设置的配置属性
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kaProperties)) {

            // 订阅指定的主题
            consumer.subscribe(List.of("kinaction_helloworld"));  //<2>

            // 当继续消费标志为真时循环执行
            while (keepConsuming) {

                // 拉取消息,最长等待250毫秒
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));  //<3>

                // 遍历接收到的消息记录
                for (ConsumerRecord<String, String> record : records) {   //<4>

                    // 打印消息的偏移量和值
                    log.info("kinaction_info offset = {}, kinaction_value = {}", record.offset(), record.value());
                }
            }
        }

    }

    private void shutdown() {
        keepConsuming = false;
    }
}

0

评论区