Kafka
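Kafka is a distributed stream-processing platform for building real-time data pipelines and streaming applications.
Core Concepts
| Concept | Description |
|---|---|
| Producer | Client that publishes messages to topics |
| Consumer | Client that reads messages from topics |
| Broker | A Kafka server node; a cluster consists of one or more brokers |
| Topic | A named stream (category) of messages |
| Partition | An ordered, append-only slice of a topic; spreading a topic over partitions enables parallel processing |
| Consumer Group | A set of consumers that split a topic's partitions among themselves for load balancing |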
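The Consumer Group row is what lets consumption scale. Each partition is read by exactly one consumer in a group, so adding consumers (up to the partition count) spreads the load.

The sketch below models that split for a single topic in the style of range assignment:
- Member IDs are sorted.
- Each member gets partitions / members consecutive partitions.
- The first partitions % members members get one extra.

The class and method names are hypothetical. In a real client, the configured assignor does this during a rebalance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: simplified range-style assignment of ONE topic's
// partitions to the members of a consumer group (not Kafka's assignor code).
class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return result; // no consumers, nothing to assign
        }
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members); // deterministic order by member id
        int perMember = numPartitions / members.size();
        int withExtra = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = perMember + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++); // each partition goes to exactly one member
            }
            result.put(members.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // three consumers in one group sharing a 7-partition topic
        System.out.println(assign(List.of("c2", "c1", "c3"), 7));
    }
}
```

With 3 consumers and 7 partitions this prints {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}. With more consumers than partitions, the surplus consumers receive empty lists and sit idle.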
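Quick Start
pom.xml
```xml
<dependencies>
    <!-- Optional: spring-kafka already brings kafka-clients in transitively;
         if you pin it explicitly, keep the version aligned with spring-kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>
    </dependency>
    <!-- No version: relies on Spring Boot's dependency management
         (e.g. spring-boot-starter-parent) to supply it -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
Configuration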
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest  # where to start when the group has no committed offset yet
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
Producer
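Sending Messages
```java
import com.alibaba.fastjson.JSON; // fastjson: needs its own dependency, not in the pom above
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Spring Kafka 3.x: send() returns a CompletableFuture<SendResult<K, V>>
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Sent: " + value);
                    } else {
                        System.err.println("Send failed: " + ex.getMessage());
                    }
                });
    }

    // Send an object by serializing it to a JSON string first
    public void sendObject(String topic, Object obj) {
        kafkaTemplate.send(topic, JSON.toJSONString(obj));
    }
}
```
Configuration Tuning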
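batch-size and linger.ms trade a little latency for throughput. Records bound for the same partition are collected into a batch. The batch is sent when it is full or when the linger time has elapsed since it started.

The model below shows those two triggers. Its names are hypothetical, and it assumes a simulated clock, a single partition, and one byte per character. The real producer also sends early in other cases, such as flush(), which this ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of producer batching for ONE partition:
// a batch is sent when the next record would push it past batchSize bytes,
// or once lingerMs has passed since the batch's first record.
// Not Kafka's RecordAccumulator; time is a simulated clock in milliseconds.
class BatchingSketch {

    private final int batchSize;
    private final long lingerMs;
    private final List<List<String>> sentBatches = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private int currentBytes = 0;
    private long firstRecordAt = 0;

    BatchingSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    void append(String record, long nowMs) {
        tick(nowMs); // the linger time may already have expired
        int bytes = record.length(); // assumption: 1 byte per (ASCII) character
        if (!current.isEmpty() && currentBytes + bytes > batchSize) {
            send(); // batch is full: ship it and start a new one
        }
        if (current.isEmpty()) {
            firstRecordAt = nowMs;
        }
        current.add(record);
        currentBytes += bytes;
    }

    // Called as the clock advances: send a non-empty batch whose linger time is up
    void tick(long nowMs) {
        if (!current.isEmpty() && nowMs - firstRecordAt >= lingerMs) {
            send();
        }
    }

    private void send() {
        sentBatches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        BatchingSketch b = new BatchingSketch(10, 5);
        b.append("aaaa", 0);
        b.append("bbbb", 1);
        b.append("cccc", 2); // 12 bytes would exceed 10: the first batch is sent
        b.tick(7);           // 5 ms after "cccc" arrived: the second batch is sent
        System.out.println(b.sentBatches()); // [[aaaa, bbbb], [cccc]]
    }
}
```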
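```yaml
spring:
  kafka:
    producer:
      acks: all                  # wait until all in-sync replicas have the record
      retries: 3                 # retry transient send failures
      batch-size: 16384          # max batch size per partition, in bytes (16 KB)
      buffer-memory: 33554432    # total memory for buffering unsent records, in bytes (32 MB)
      properties:
        linger.ms: 5             # wait up to 5 ms for more records to fill a batch
        enable.idempotence: true # idempotent producer: retries do not create duplicates
```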
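enable.idempotence (paired with acks: all) is what keeps retries from writing duplicates. Conceptually, the producer gets a producer ID and numbers the batches it sends to each partition. The broker then handles each batch by its sequence number:
- It appends the batch only if the number is the next one expected for that producer.
- It treats a repeated number as a retry of something already written.
- It rejects a number that leaves a gap.

The model below is a heavily simplified, hypothetical version of that check for one partition. It assumes one record per batch and ignores epochs and the broker's caching window. It is not the broker's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the broker-side idempotent-producer check
// for a single partition (one record per "batch"). Not Kafka's implementation.
class IdempotentPartitionSketch {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> log = new ArrayList<>();
    // producerId -> last sequence number appended to this partition
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    Result append(long producerId, int seq, String value) {
        int last = lastSeq.getOrDefault(producerId, -1); // sequences start at 0
        if (seq <= last) {
            return Result.DUPLICATE;    // a retry of something already written: drop it
        }
        if (seq != last + 1) {
            return Result.OUT_OF_ORDER; // a gap: an earlier batch is missing
        }
        log.add(value);
        lastSeq.put(producerId, seq);
        return Result.APPENDED;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        IdempotentPartitionSketch p = new IdempotentPartitionSketch();
        p.append(1L, 0, "a");
        p.append(1L, 1, "b");
        p.append(1L, 1, "b"); // retry after the acknowledgment was lost
        System.out.println(p.log()); // [a, b]
    }
}
```

The duplicate is detected from the sequence number alone. As a result, the retry after a lost acknowledgment leaves a single copy of b in the log.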
Consumer
Consuming Messages
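```java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received: " + message);
    }

    // Consume an object: the value arrives as a JSON string and is parsed here.
    // No groupId, so this falls back to spring.kafka.consumer.group-id (my-group) and
    // shares my-topic's partitions with consume() above: each record reaches only one of them.
    @KafkaListener(topics = "my-topic")
    public void consumeObject(ConsumerRecord<String, String> record) {
        User user = JSON.parseObject(record.value(), User.class); // User: your own POJO
        System.out.println("Received user: " + user);
    }

    // Consume specific partitions (manual assignment: no group rebalancing)
    @KafkaListener(topicPartitions = @TopicPartition(topic = "my-topic", partitions = {"0", "1"}))
    public void consumePartition(String message) {
        System.out.println("Received: " + message);
    }
}
```
Configuration Tuning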
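Of the consumer settings, enable-auto-commit has the most direct effect on delivery guarantees. The committed offset decides where a consumer resumes after a restart or rebalance.

In Kafka the committed offset is the offset of the next record to read. Committing only after processing therefore gives at-least-once delivery: records that were processed but not yet committed are read again. The hypothetical model below shows that redelivery, using an in-memory list as a stand-in for one partition. It does not use the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory model of one partition and a consumer that commits
// offsets only after processing a batch (at-least-once). Not the Kafka client API.
class CommitAfterProcessSketch {

    private final List<String> partition;
    private long committed = 0; // offset of the NEXT record to read, as in Kafka

    CommitAfterProcessSketch(List<String> partition) {
        this.partition = partition;
    }

    // Process up to maxRecords starting at the committed offset. If crashAfter >= 0,
    // simulate a crash after processing that many records, before the commit.
    List<String> pollAndProcess(int maxRecords, int crashAfter) {
        List<String> processed = new ArrayList<>();
        long offset = committed;
        while (offset < partition.size() && processed.size() < maxRecords) {
            processed.add(partition.get((int) offset));
            offset++;
            if (processed.size() == crashAfter) {
                return processed; // crashed: the commit below never happens
            }
        }
        committed = offset; // commit only after the whole batch was processed
        return processed;
    }

    long committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        CommitAfterProcessSketch c = new CommitAfterProcessSketch(List.of("m0", "m1", "m2", "m3"));
        System.out.println(c.pollAndProcess(2, -1)); // [m0, m1], then commits offset 2
        System.out.println(c.pollAndProcess(2, 1));  // [m2], crash before the commit
        System.out.println(c.pollAndProcess(2, -1)); // [m2, m3]: m2 is delivered again
    }
}
```

With the client's auto-commit on, offsets are committed on a timer instead. A crash can then skip records whose offsets were committed before processing finished. Keeping it off and letting the listener container commit after processing avoids that.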
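```yaml
spring:
  kafka:
    consumer:
      max-poll-records: 500      # max records returned by one poll()
      fetch-min-size: 1          # min bytes the broker should return for a fetch
      fetch-max-wait: 500ms      # max time the broker waits to reach fetch-min-size
      enable-auto-commit: false  # client auto-commit off: Spring's listener container commits offsets
                                 # (default AckMode BATCH); for fully manual commits also set
                                 # spring.kafka.listener.ack-mode: manual and call Acknowledgment.acknowledge()
      properties:
        session.timeout.ms: 30000     # no heartbeat within this window: consumer removed, group rebalances
        max.poll.interval.ms: 300000  # max gap between poll() calls before the consumer leaves the group
```
Message Ordering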
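Ordering in Kafka comes down to which partition a record lands on. Consider a record with a non-null key and no explicit partition. The Java client's default partitioner hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count.

The class below re-implements that calculation in plain Java so it runs without dependencies. The class name is hypothetical. The reference code lives in kafka-clients (Utils.murmur2 and Utils.toPositive) and is worth comparing against for your client version.

```java
import java.nio.charset.StandardCharsets;

// Dependency-free sketch of the murmur2-based partition choice the Kafka Java
// client makes for records with a non-null key (compare with Utils.murmur2).
class KeyPartitionerSketch {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // mix in the remaining 1-3 bytes (the cases intentionally fall through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // StringSerializer encodes keys as UTF-8, so hash the UTF-8 bytes
    static int partitionForKey(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions; // toPositive, then modulo
    }

    public static void main(String[] args) {
        // every record keyed "order-42" goes to the same partition of a 6-partition topic
        System.out.println(partitionForKey("order-42", 6));
        System.out.println(partitionForKey("order-42", 6));
    }
}
```

The mapping depends on numPartitions. Adding partitions to a topic can therefore move a key to a different partition, so per-key ordering only holds while the partition count stays fixed.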
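Ordering Within a Single Partition
Kafka guarantees order only within a partition. Records with the same key are written to the same partition as long as the topic's partition count does not change.

Give related messages the same key, for example all events of one order. With retries enabled, keep enable.idempotence: true so that a retried batch cannot overtake a later one.
```yaml
# Producer: keys are serialized as strings
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
```
```java
// Same key (orderId) => same partition => the messages of one order stay in order
kafkaTemplate.send("order-topic", orderId, orderMessage);
```
Global Ordering Across Partitions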
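Kafka defines no order between partitions. A consumer may receive records from one partition before records from another partition that were sent earlier, even though each partition's own order is kept.

The toy model below routes A* and B* records to two partitions and drains them in the opposite order. Its names are hypothetical, and a prefix check stands in for key hashing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a 2-partition topic: per-partition order is kept,
// but nothing orders records across partitions.
class CrossPartitionOrderSketch {

    // Route each record to a partition: "A..." -> 0, anything else -> 1
    // (a stand-in for hashing the record key).
    static Map<Integer, List<String>> route(List<String> sent) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String record : sent) {
            int p = record.startsWith("A") ? 0 : 1;
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(record);
        }
        return partitions;
    }

    // A consumer that happens to fetch whole partitions in the given order.
    static List<String> drain(Map<Integer, List<String>> partitions, List<Integer> fetchOrder) {
        List<String> received = new ArrayList<>();
        for (int p : fetchOrder) {
            received.addAll(partitions.getOrDefault(p, List.of()));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("A1", "B1", "A2", "B2");
        System.out.println(drain(route(sent), List.of(1, 0))); // [B1, B2, A1, A2]
    }
}
```

The output keeps A1 before A2 and B1 before B2, but it loses the global send order A1, B1, A2, B2.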
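```java
// For a strict global order, send to a topic created with exactly one partition.
// Trade-off: no parallelism, since only one consumer per group can read it.
kafkaTemplate.send("single-partition-topic", message);
```
Transactions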
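A Kafka transaction makes a group of writes atomic, possibly across several topics and partitions, for readers that use isolation.level=read_committed. Those readers see all of the writes once the transaction commits and none of them if it aborts. The default read_uncommitted returns every record.

The hypothetical model below tags each record in one partition with its transaction's outcome to show the difference. The real broker tracks this with transaction marker records and the last stable offset (LSO); this model only imitates the visible result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of what consumers see in one partition that holds
// transactional records. Real Kafka uses control (marker) records and the
// last stable offset (LSO); this only mimics the outcome.
class IsolationLevelSketch {

    enum TxnState { COMMITTED, ABORTED, OPEN }

    record Rec(String value, TxnState state) {}

    static List<String> readUncommitted(List<Rec> log) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            out.add(r.value()); // everything, including aborted and in-flight writes
        }
        return out;
    }

    static List<String> readCommitted(List<Rec> log) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (r.state() == TxnState.OPEN) {
                break; // stop at the LSO: nothing past an undecided transaction yet
            }
            if (r.state() == TxnState.COMMITTED) {
                out.add(r.value());
            } // ABORTED records are skipped
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> log = List.of(
                new Rec("m1", TxnState.COMMITTED),
                new Rec("m2", TxnState.COMMITTED),
                new Rec("x1", TxnState.ABORTED),
                new Rec("y1", TxnState.OPEN),
                new Rec("z1", TxnState.COMMITTED));
        System.out.println(readUncommitted(log)); // [m1, m2, x1, y1, z1]
        System.out.println(readCommitted(log));   // [m1, m2]
    }
}
```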
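```java
// Prerequisite: set spring.kafka.producer.transaction-id-prefix (e.g. tx-).
// Spring Boot then makes KafkaTemplate transactional and auto-configures a
// KafkaTransactionManager; without it the two sends below are independent.
@Transactional
public void sendTransactional() {
    kafkaTemplate.send("topic1", "message1");
    kafkaTemplate.send("topic2", "message2");
    // If the method throws, the transaction is aborted and consumers using
    // isolation.level=read_committed see neither message
}
```
Without @Transactional, kafkaTemplate.executeInTransaction(...) runs a callback inside a local Kafka transaction.
Summary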
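- Core concepts: Producer, Consumer, Broker, Topic, Partition
- Producer: sending messages, configuration tuning
- Consumer: consuming messages, committing offsets manually
- Message ordering: guaranteed within a single partition; ordering across partitions needs extra handling (e.g. a single-partition topic)
- Transactions: provide exactly-once semantics for writes and read-process-write flows within Kafka, as seen by read_committed consumers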
