
Kafka

Kafka is a distributed event streaming platform used to build real-time data pipelines and streaming applications.

Core Concepts

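| Concept | Description |
| --- | --- |
| Producer | Client that publishes messages to topics |
| Consumer | Client that reads messages from topics |
| Broker | A Kafka server node that stores partitions and serves client requests |
| Topic | A named stream (category) of messages |
| Partition | An ordered, append-only log within a topic; multiple partitions allow parallel processing |
| Consumer Group | A set of consumers that share a topic's partitions (each partition is read by one member at a time), balancing the load |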
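To make the Consumer Group row concrete: within one group, each partition is assigned to exactly one consumer, so adding consumers (up to the partition count) spreads the load. The sketch below mimics the idea behind Kafka's range-style assignment for a single topic. The class and method names are hypothetical. Real assignment is done by the client's configured assignor during a rebalance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: range-style split of one topic's partitions among a group's consumers. */
final class RangeAssignment {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        // Sort consumer ids so every member computes the same result
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null);

        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size(); // the first `withExtra` consumers get one more

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++); // contiguous range of partition numbers
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions, 2 consumers in the same group
        System.out.println(assign(List.of("consumer-b", "consumer-a"), 5));
        // prints {consumer-a=[0, 1, 2], consumer-b=[3, 4]}

        // 2 partitions, 3 consumers: one consumer stays idle
        System.out.println(assign(List.of("c1", "c2", "c3"), 2));
        // prints {c1=[0], c2=[1], c3=[]}
    }
}
```

The second call shows why a group with more consumers than partitions leaves some consumers idle.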

Quick Start

pom.xml

xml
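<dependencies>
    <!-- Optional: spring-kafka already depends on kafka-clients; an explicit version
         overrides the managed one, so keep it compatible with your spring-kafka version -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>
    </dependency>
    <!-- No version: assumes it is managed by the Spring Boot parent/BOM -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>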

Configuration (application.yml)

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
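Spring Boot translates the keys above into ordinary Kafka client settings. The sketch below shows the same setup written with the plain client property names, which helps when reading Kafka's own documentation or using kafka-clients without Spring. It only builds java.util.Properties objects, so it compiles without Kafka. The class name PlainClientConfig is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: the application.yml settings expressed as plain Kafka client properties. */
final class PlainClientConfig {

    static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "my-group");
        // Where to start when the group has no committed offset for a partition
        p.put("auto.offset.reset", "earliest");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        // With kafka-clients on the classpath, these would be passed to
        // new KafkaProducer<String, String>(producerProps()) and
        // new KafkaConsumer<String, String>(consumerProps())
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```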

Producer

Sending Messages

java
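import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import com.alibaba.fastjson2.JSON; // assumed JSON library (fastjson2); add its dependency separately

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String key, String value) {
    // send() is asynchronous; in Spring Kafka 3.x it returns a CompletableFuture<SendResult<K, V>>
    kafkaTemplate.send(topic, key, value)
        .whenComplete((result, ex) -> {
            if (ex == null) {
                // RecordMetadata tells where the broker stored the record
                System.out.println("Sent: " + value
                    + " (partition " + result.getRecordMetadata().partition()
                    + ", offset " + result.getRecordMetadata().offset() + ")");
            } else {
                System.out.println("Send failed: " + ex.getMessage());
            }
        });
}

// Sending an object: serialize it to a JSON string first (no key, so no per-key ordering)
public void sendObject(String topic, Object obj) {
    kafkaTemplate.send(topic, JSON.toJSONString(obj));
}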

Producer Tuning

yaml
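spring:
  kafka:
    producer:
      acks: all                    # the leader waits for all in-sync replicas to acknowledge
      retries: 3                   # number of retries for failed sends
      batch-size: 16384            # max bytes per partition batch
      buffer-memory: 33554432      # total bytes the producer may buffer before send() blocks
      properties:
        linger.ms: 5               # wait up to 5 ms for more records to fill a batch
        enable.idempotence: true   # prevent duplicates caused by retries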
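Here is why enable.idempotence matters when retries are on. Suppose a send times out after the broker has already written the batch. Without idempotence, the retry would append a duplicate. With idempotence, the producer tags each batch with a producer id and a per-partition sequence number, and the broker discards a batch whose sequence it has already written. The class below is a deliberately simplified, hypothetical model of that broker-side check for one partition. The real protocol tracks several in-flight batches and reports errors through the client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of a partition log with idempotent-producer deduplication. */
final class IdempotentPartitionLog {

    private final List<String> log = new ArrayList<>();
    // Last sequence number written per producer id
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    /** Returns true if the record was appended, false if it was recognized as a duplicate. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSeq.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: a retry of a send whose acknowledgement was lost
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        log.add(value);
        lastSeq.put(producerId, sequence);
        return true;
    }

    List<String> records() {
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        IdempotentPartitionLog partition = new IdempotentPartitionLog();
        partition.append(1L, 0, "order-1");
        partition.append(1L, 1, "order-2");
        // The acknowledgement for sequence 1 was lost, so the producer retries it
        boolean appended = partition.append(1L, 1, "order-2");
        System.out.println(appended + " " + partition.records()); // prints false [order-1, order-2]
    }
}
```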

Consumer

Consuming Messages

java
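import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import com.alibaba.fastjson2.JSON; // assumed JSON library (fastjson2); add its dependency separately

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(String message) {
    System.out.println("Received: " + message);
}

// Consuming an object: receive the raw record and parse its JSON value (User is an application class).
// Without groupId the listener uses spring.kafka.consumer.group-id; listeners in the same group
// on the same topic share its partitions instead of each receiving every message.
@KafkaListener(topics = "my-topic")
public void consumeObject(ConsumerRecord<String, String> record) {
    User user = JSON.parseObject(record.value(), User.class);
    System.out.println("Received user: " + user + " from partition " + record.partition());
}

// Consuming specific partitions: partitions 0 and 1 are assigned directly,
// so this listener does not take part in consumer-group rebalancing
@KafkaListener(
    topicPartitions = @TopicPartition(
        topic = "my-topic",
        partitions = {"0", "1"}
    )
)
public void consumePartition(String message) {
    System.out.println("Received: " + message);
}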

Consumer Tuning

yaml
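spring:
  kafka:
    consumer:
      max-poll-records: 500         # max records returned by one poll()
      fetch-min-size: 1             # min bytes the broker returns for a fetch
      fetch-max-wait: 500ms         # max time the broker waits to reach fetch-min-size
      enable-auto-commit: false     # no client auto-commit; Spring's listener container commits offsets per its ack-mode (BATCH by default)
      properties:
        session.timeout.ms: 30000     # consumer is removed from the group if no heartbeat arrives within 30 s
        max.poll.interval.ms: 300000  # max gap between poll() calls; processing one batch must fit in 5 min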
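Turning off auto-commit is about controlling when the consumed position is saved. If offsets are committed only after records have been processed, a crash means some records are processed again after restart (at-least-once delivery), but none are skipped. The hypothetical simulation below models one partition as a list plus a committed offset. It reads in batches of max-poll-records and crashes once before committing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation: commit-after-processing gives at-least-once delivery. */
final class CommitAfterProcessing {

    static List<String> run(List<String> partition, int maxPollRecords, int crashAfterBatch) {
        List<String> processed = new ArrayList<>();
        int committed = 0;       // next offset to read, as stored for the group
        int batchesSeen = 0;
        boolean crashed = false;

        while (committed < partition.size()) {
            // poll(): read up to maxPollRecords starting at the committed offset
            int end = Math.min(committed + maxPollRecords, partition.size());
            List<String> batch = partition.subList(committed, end);
            processed.addAll(batch);   // process the records
            batchesSeen++;

            if (!crashed && batchesSeen == crashAfterBatch) {
                crashed = true;        // crash before committing: the offset is not advanced
                continue;              // "restart" and resume from the last committed offset
            }
            committed = end;           // commit only after processing succeeded
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("m0", "m1", "m2", "m3", "m4");
        // Batch [m2, m3] is processed, the consumer crashes before committing, and it is read again
        System.out.println(run(partition, 2, 2)); // prints [m0, m1, m2, m3, m2, m3, m4]
    }
}
```

Because of this redelivery, processing logic behind a manually committed consumer is usually written to be idempotent.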

Message Ordering

Ordering Within a Partition

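Kafka guarantees ordering only within a single partition. Records with the same key are routed to the same partition, so using the order ID as the key keeps all messages for one order in the order they were sent. The String key serializer is already configured above. With enable.idempotence: true (see Producer Tuning), retries do not reorder records within a partition.

java
// Same key (orderId) -> same partition -> messages for one order stay in send order
kafkaTemplate.send("order-topic", orderId, orderMessage);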
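The producer decides which partition a keyed record lands on. For a record with a non-null key and no explicit partition, the Java client hashes the serialized key bytes with murmur2 and takes the positive result modulo the partition count. The standalone re-implementation below is for illustration only: the class name is hypothetical, it assumes StringSerializer's UTF-8 encoding, and in practice you rely on the client's own partitioner. It also shows why changing a topic's partition count moves keys to different partitions.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical re-implementation of key-hash partitioning (murmur2 of the key bytes, mod partitions). */
final class KeyPartitioner {

    // 32-bit murmur2, as used by the Kafka Java client for key hashing
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length; // seed mixed with the length

        // Mix four bytes at a time (little-endian)
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1-3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The repeated key always maps to the same partition
        for (String orderId : new String[] {"order-1001", "order-1002", "order-1001"}) {
            System.out.println(orderId + " -> partition " + partitionFor(orderId, 6));
        }
    }
}
```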

Global Ordering Across Partitions

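Kafka does not order records across partitions. If strict global ordering is required, use a topic with a single partition, which gives up parallel consumption.

java
// A topic created with exactly one partition: every record goes to partition 0
kafkaTemplate.send("single-partition-topic", message);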
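Suppose a topic must keep several partitions but consumers still need a total order. One application-level option (not a Kafka feature) works like this: the producer stamps each message with a global sequence number, and a single consumer buffers out-of-order arrivals until the next expected number shows up. The hypothetical Resequencer below sketches that buffer with a PriorityQueue. It assumes sequence numbers start at 0 with no gaps, and that one consumer instance reads all partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical application-level resequencer: releases messages strictly in sequence order. */
final class Resequencer {

    record Message(long seq, String payload) {}

    // Held-back messages, smallest sequence number first
    private final PriorityQueue<Message> pending =
            new PriorityQueue<>((a, b) -> Long.compare(a.seq(), b.seq()));
    private long nextExpected = 0;

    /** Accepts one message (in any order) and returns every message that is now ready, in order. */
    List<Message> offer(Message msg) {
        List<Message> ready = new ArrayList<>();
        if (msg.seq() < nextExpected) {
            return ready; // already released earlier: ignore the duplicate
        }
        pending.add(msg);
        while (!pending.isEmpty() && pending.peek().seq() == nextExpected) {
            ready.add(pending.poll());
            nextExpected++;
        }
        return ready;
    }

    public static void main(String[] args) {
        Resequencer r = new Resequencer();
        // Arrival order after interleaved reads from different partitions: 1, 0, 2
        System.out.println(r.offer(new Message(1, "b"))); // nothing ready yet
        System.out.println(r.offer(new Message(0, "a"))); // releases 0 and 1
        System.out.println(r.offer(new Message(2, "c"))); // releases 2
    }
}
```

The trade-off is that one slow partition stalls everything behind it and the buffer can grow without bound. This is why a single-partition topic is often the simpler choice.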

Transactions

java
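import org.springframework.transaction.annotation.Transactional;

// Requires spring.kafka.producer.transaction-id-prefix; Spring Boot then auto-configures
// a KafkaTransactionManager, which @Transactional uses
@Transactional
public void sendTransactional() {
    kafkaTemplate.send("topic1", "message1");
    kafkaTemplate.send("topic2", "message2");
    // If the method throws a RuntimeException, the Kafka transaction is aborted and
    // consumers reading with isolation.level=read_committed see neither message
}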
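The @Transactional example only works when the producer is transactional. Below is a minimal Spring Boot configuration sketch; the prefix value tx- is an arbitrary example. Setting transaction-id-prefix makes Spring Boot create a transactional producer factory and a KafkaTransactionManager. Setting the consumer isolation level to read-committed makes consumers skip records from aborted transactions.

```yaml
spring:
  kafka:
    producer:
      # Any non-empty prefix enables transactions; "tx-" is only an example value
      transaction-id-prefix: tx-
      acks: all
    consumer:
      # Only return records from committed transactions (the client default is read_uncommitted)
      isolation-level: read-committed
```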

Summary

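  1. Core concepts: Producer, Consumer, Broker, Topic, Partition, Consumer Group
  2. Producer: sending messages and tuning the producer configuration
  3. Consumer: consuming messages and committing offsets manually
  4. Message ordering: guaranteed within a partition; global ordering across partitions needs extra handling
  5. Transactions: atomic writes across partitions; combined with read_committed consumers, they enable exactly-once processing within Kafka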
