网站首页 > 文章精选 正文
1 写入方式
producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)
1. 顺序写磁盘的优势
减少寻道时间:磁盘顺序写避免了频繁的磁头移动,减少了寻道时间,提升了效率。
预读机制:操作系统会预读连续数据,减少 I/O 操作次数,提高吞吐率。
批处理:Kafka 将多个消息打包写入磁盘,减少了 I/O 操作次数,进一步提升了效率。
2. 随机写内存的劣势
内存碎片:频繁的随机写会导致内存碎片,降低内存利用率。
缓存失效:随机写可能导致缓存频繁失效,增加缓存 miss 率,影响性能。
锁竞争:多线程随机写可能引发锁竞争,增加系统开销。
2 分区(Partition)
消息发送时都被发送到一个 topic,其本质就是一个目录,每个 Topic 被划分为多个 Partition,每个 Partition 是一个独立的、有序的、不可变的消息队列。其组织结构如下图所示:
我们可以看到,每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partition log 上,其中的每一个消息都被赋予了一个唯一的offset 值。
1)分区的原因
(1)方便在集群中水平扩展: 每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了; 通过增加 Partition 数量,可以突破单机存储限制,理论上支持 无限数据规模(例如 PB 级数据)
(2)可以提高并发,提升吞吐量:因为可以以 Partition 为单位读写了。 不同 Partition 可以分布在不同的 Broker(Kafka 服务器)上,读写操作可以并行进行。
(3) 可以灵活调整: 若数据量激增,可以通过增加 Partition 数量来扩展 Topic 容量,而无需停机。
2)分区的原则
1. 默认分区策略(DefaultPartitioner)
- 有 Key 的消息:
1. 对 Key 进行哈希(默认使用 MurmurHash2 算法),然后对分区总数取模: partition = hash(key) % numPartitions
2. 确保相同 Key 的消息始终写入同一分区(保证分区内有序)。
- 无 Key 的消息(Kafka 2.4+ 优化):
1. 粘性分区策略(Sticky Partitioning): 生产者会先将无 Key 的消息“粘性”地批量发送到随机选择的一个分区,直到该批次完成或达到 batch.size 限制后,再切换到另一个随机分区。
优点:减少批次切换的开销,提升吞吐量,同时避免早期轮询策略的负载不均问题。
//DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
2. 自定义分区策略
可通过实现
org.apache.kafka.clients.producer.Partitioner 接口自定义分区逻辑,例如:按业务规则路由(如用户ID前缀、地域等)。
动态调整分区权重(如热点数据分散到更多分区)。
配置方式:
properties.put("partitioner.class", "com.example.CustomPartitioner");
com.example.CustomPartitioner: 是第一步实现Partitioner接口的自定义实现类的全限定名
3.副本(Replication)
同 一 个 partition 可 能 会 有 多 个 replication ( 对 应 server.properties 配 置 中 的default.replication.factor=N)也可以在创建Topic时,指定副本数量(--replication-factor N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入 replication之后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一 个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据。 而且Kafka 需要为每个分区分配副本(Replicas)到不同的 Broker 上。
4. 写入流程
producer 写入消息流程如下:
1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit
的 offset)并向 producer 发送 ACK
Producer发送消息的细节:
Producer 在消息发送的过程中,涉及到两个线程,main线程和sender线程,其中main线程是消息的生产线程,而sender线程是jvm单例的线程,专门用于消息的发送。
在jvm的内存中开辟了一块缓存空间叫RecordAccumulator(消息累加器/消息收集器),用于将多条消息合并成一个批次,然后由sender线程发送给kafka集群。
我们的一条消息在生产过程会调用send方法然后经过拦截器经过序列化器,再经过分区器确定消息发送在具体topic下的哪个分区,然后发送到对应的消息累加器中,消息累加器是多个双端队列。并且每个队列和主题分区都具有一一映射关系。消息在累加器中,进行合并,达到了对应的size(batch.size)或者等待超过对应的等待时间(linger.ms),都会触发sender线程的发送。sender线程有一个请求池,默认缓存五个请求( max.in.flight.requests.per.connection ),发送消息后,会等待服务端的ack,如果没收到ack就会重试默认重试int最大值( retries )。如果ack成功就会删除累加器中的消息批次,并响应给到生产端。
当双端队列中的DQueue满足 batch.size 或者 linger.ms 条件时触发sender线程。
5. 生产者拦截器
Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。
对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会 对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor
按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
//获取配置信息和初始化数据时调用。
configure(configs)
//该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在 消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好 保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
onSend(ProducerRecord)
//KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
onAcknowledgement(RecordMetadata, Exception)
//关闭 interceptor,主要用于执行一些资源清理工作
close()
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
案例:
- 需求:
实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间 戳信息加到消息 value 的最后面;第二个 interceptor 会在消息发送后更新成功发送消息数或 失败发送消息数。
- 代码实现
增加时间戳拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 增加时间戳拦截器
*/
public class TimeInterceptor implements ProducerInterceptor<String,String> {
@Override
//发送消息之前,会调用此方法
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 创建一个新的 record,把时间戳写入消息体的最后面
return new ProducerRecord(record.topic(), record.partition(),
record.timestamp(), record.key(),
record.value().toString()+ "," + System.currentTimeMillis() );
}
@Override
//成功发送数据完毕,服务器返回的响应之前, 或者是发送失败, 会调用该方法
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
//生产者关闭的时候,会调用此方法
public void close() {
}
@Override
//创建生产者对象的时候调用
public void configure(Map<String, ?> configs) {
}
}
- 统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String,String> {
private int successCounter = 0;
private int errorCounter = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("成功: " + successCounter);
System.out.println("失败: " + errorCounter);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
- producer 主程序
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Arrays;
import java.util.Properties;
public class InterceptorProducer {
public static void main(String[] args) {
//1.设置生产者配置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092");
// key序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//构建拦截链
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(),CounterInterceptor.class.getName()));
//2.创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test",
"message" + i);
producer.send(record);
}
// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
producer.close();
}
}
- 测试
- (1)在 kafka 上启动消费者,然后运行客户端 java 程序。
bin/kafka-console-consumer.sh --bootstrap-server worker2:9092 --from-beginning --topic test
message0,1742651703108
message1,1742651703619
message2,1742651703620
message3,1742651703620
message4,1742651703620
message5,1742651703620
message6,1742651703620
message7,1742651703620
message8,1742651703620
message9,1742651703620
(2) 观察 idea控制台输出数据如下
成功: 10
失败: 0
6 序列化器
1、序列化器概述
生产者需要用序列化器(Serializer)把对象转化为字节数组才能通过网络发送给 Kafka。而在消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换为相应的对象。上面对应程序中的序列化器也使用了客户端自带的 org.apache.kafka.common.serialization.StringSerializer,除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,该接口有三个方法:
- void configure(Mapvar1, boolean var2);
- byte[] serialize(String var1, T var2);
- void close();
configure() 方法用来配置当前类,serialize() 方法用来执行序列化操作,close() 方法用来关闭当前的序列化器,一般情况下,close() 是一个空方法。生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。
2、StringSerializer
接下来,我们看一下 Kafka 自带的 StringSerializer ,将 String 类型转为 byte[] 类型:
public class StringSerializer implements Serializer<String> {
private Charset encoding = StandardCharsets.UTF_8;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String) {
String encodingName = (String) encodingValue;
try {
encoding = Charset.forName(encodingName);
} catch (UnsupportedCharsetException | IllegalCharsetNameException e) {
throw new SerializationException("Unsupported encoding " + encodingName, e);
}
}
}
@Override
public byte[] serialize(String topic, String data) {
if (data == null)
return null;
else
return data.getBytes(encoding);
}
}
如果你需要自定义序列化器,其实也很简单, 只需要编写一个类去实现org.apache.kafka.common.serialization.StringSerializer接口, 然后重写它的方法. 并在设置序列化时,设置为你自定义的类全限定名就可以了.
7 分区器
1 分区的好处
- 便于合理使用存储资源, kafka会尽量让Partition分配到不同的Broker上存储,那这样可以把海量的数据按照分区分割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
- 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据.
2 生产者发送消息分区策略
1)默认的分区器
默认的分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Partitioner接口,这个接口定义了 2 个方法,具体如下所示:
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- void close();
其中 partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别为:主题、key、序列化后的 key、value、序列化后的 value,以及集群的元数据信息。通过这些可以实现功能丰富的分区器。close() 方法在关闭分区器的时候用来回收一些资源。
kafka生产者支持三种三种分区策略 1) 指定分区; 2)指定key,计算hash得分区; 3)指定随机粘性分区
我们可以查看KafkaProducer类的partition()方法代码
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//如果消息指定分区, 就使用指定分区
if (record.partition() != null)
return record.partition();
//分区器默认的是DefaultPartitioner
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}
在默认分区器 DefaultPartitioner 的实现中,如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入到同一分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区中。下面是DefaultPartitioner类的partition()分区方法代码
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {
//如果key为空, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行实验(和上一次的分区不一样)
//例如: 第一次随机选择0号分区,等到0号分区当前批次满了(默认16K)或者linger.ms设置的时间到了,kafka再随机一个分区进行使用(如果还是0会继续随机)
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
//如果key不为空,将key的hash值与topic的Partition数进行取余得到Partition的值
return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
2)自定义分区器 如果研发人员可以根据企业需求,自己重新实现分区器。
1)需求 例如我们实现一个分区器实现,发送过来的数据中如果包含 Hi,就发往 0 号分区,不包含 Hi,就发往 1 号分区。 2)实现步骤 (1)定义类实现 Partitioner 接口。 (2)重写 partition()方法。
package com.fs.kafka.demo2;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
/**
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String string = value.toString();
if (string.startsWith("hello")){
return 0;
}else{
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
(3)使用分区器的方法,在生产者的配置中添加分区器参数。
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
8 消息收集器
消息收集器(RecordAccumulator)
为了提高生产者的吞吐量,我们通过收集器将多条消息合并成一批统一发送。在broker中将消息批量存入。减少多次的网络IO。
消息收集器默认32m,也就是buffer.memory配置
收集器的存储形式为ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,可以看出来就是一个分区对应一个双端队列,队列中存储的是ProducerBatch一般大小是16k根据batch.size配置,新的消息会append到ProducerBatch中,满16k就会创建新的ProducerBatch,并且触发sender线程进行发送。
9 消息发送线程(Sender)
消息保存在内存后,Sender线程就会把符合条件的消息按照批次进行发送。除了发送消息,元数据的加载也是通过Sender线程来处理的。
Sender线程发送消息以及接收消息,都是基于java NIO的Selector。通过Selector把消息发出去,并通过Selector接收消息。
Sender线程默认容纳5个未确认的消息,消息发送失败后会进行重试。
10 消息确认机制-ACK
producer提供了三种消息确认的模式,通过配置acks来实现
acks为0时, 表示生产者将数据发送出去就不管了,不等待任何返回。这种情况下数据传输效率最高,但是数据可靠性最低,当 server挂掉的时候就会丢数据;
acks为1时(默认),表示数据发送到Kafka后,经过leader成功接收消息的的确认,才算发送成功,如果leader宕机了,就会丢失数据。
acks为-1/all时,表示生产者需要等待ISR中的所有follower都确认接收到数据后才算发送完成,这样数据不会丢失,因此可靠性最高,性能最低。
猜你喜欢
- 2025-05-10 Java手写一个bitmap(java手写代码)
- 2025-05-10 MySQL有哪些实现方式?何为插入,何为更新?
- 2025-05-10 自学 C++ 第 6 课 二维数组找最值
- 2025-05-10 斐波那契查找算法(斐波那契查找算法java)
- 2025-05-10 YARN 资源调度器 CapacityScheduler 原理
- 2025-05-10 8张图带你全面了解kafka的核心机制
- 2025-05-10 java数据类型的转换以及精度丢失(java中基本数据类型转换)
- 2025-05-10 C语言中用宏实现求两个数中的最大数
- 2025-05-10 异或的魅力!图解「数组中两个数的最大异或值」
- 2025-05-10 基础函数20例,案例解读,再不掌握就真的Out了
- 05-14TS,TypeScript,Windows环境下构建环境,安装、编译且运行
- 05-14TypeScript 也能开发AI应用了!
- 05-14搞懂 TypeScript 装饰器
- 05-14前端小哥哥:如何使用typescript开发实战项目?
- 05-14在 React 项目中,一般怎么处理错误?
- 05-14react19 常用状态管理
- 05-14Vue3开发极简入门(2):TypeScript定义对象类型
- 05-14C#与TypeScript语法深度对比
- 最近发表
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 计算机网络的拓扑结构是指() (45)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)
- mysql数据库面试题 (57)