kafka学习_kafka实战教程
项目背景公司要开发一套用户广告归因系统,投放专家目前在头条、广点通、小米、华为、oppo、vivo等渠道投放广告信息。我们开发的系统在用户点击广告后,需要记录用户的设备信息和广告的关系信息,方便后面做用户归因。
项目背景公司要开发一套用户广告归因系统,投放专家目前在头条、广点通、小米、华为、oppo、vivo等渠道投放广告信息我们开发的系统在用户点击广告后,需要记录用户的设备信息和广告的关系信息,方便后面做用户归因。
广告的点击数据并发量巨大,为了提高用户的体验,使用消息队列异步写入数据消费者消费 kafka 中的消息,并把消息写入ClickHouse ,而ClickHouse在写入数据太频繁会报异常信息:Exception: Too many parts (302). Merges are processing significantly slower than inserts。
我们的痛点需要解决以下问题1、消息能够快速消费,并且无积压消息2、解决批量写入ClickHouse异常问题。流程图如下:

Kafka参数配置在开发前,我们需要对kafka的常用配置做下了解kafka 的消费者配置类是ConsumerConfig,可以使用它来设置消费者的配置信息下面我对它的常用配置信息做下介绍1、enable.auto.commit。
该属性指定了消费者是否自动提交偏移量,默认值是true为了尽量避免出现重复数据(假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费)和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。
如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率2、auto.commit.interval.ms自动提交间隔范围:[0,Integer.MAX],默认值是 5000 (5 s)。
3、手动提交:commitSync/commitAsync手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)相同点:都会将本次poll的一批数据最大的偏移量提交。
不同点:commitSync会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败,导致重复消费4、max.poll.records。
Consumer每次调用poll()时取到的records的最大数5、fetch.min.bytes该属性指定了消费者从服务器获取记录的最小字节数broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。
这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。
如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载6、fetch.max.wait.ms我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。
而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。
项目实战我们的项目使用Spring boot 开发分以下场景演示项目:1、引入kafka 依赖包2、配置消费者信息3、@KafkaListener 注解介绍4、单次消息消费5、批量消息消费6、并发消息消费。
引入kafka 开发包org.springframework.kafkaspring-kafka
>2.5.4.RELEASEkafka配置kafka:bootstrap-servers:127.0.0.1:50001producer:
key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializer
properties:# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafkalinger.ms:200sasl.mechanism:PLAIN
security.protocol:SASL_PLAINTEXTconsumer:group-id:link-click-consumer# 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset
# earliest:表示自动重置到 partition 的最小 offset# latest:默认为 latest,表示自动重置到 partition 的最大 offset# none:不自动进行 offset 重置,抛
auto-offset-reset:earliest# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offsetenable-auto-commit:
false## 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000auto-commit-interval:2000max-poll-records:
200poll-timeout:1000heartbeat-interval:3000key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:org.apache.kafka.common.serialization.StringDeserializerproperties:# 使用 Kafka 消费分组机制时,消费者超时时间。
当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间。
session.timeout.ms:60000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔这个值必须小于 session.timeout.ms,一般小于它的三分之一heartbeat.interval.ms:。
3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者。
max.poll.interval.ms:300000fetch.max.wait.ms:1000# 该属性指定了消费者从服务器消费数据的最小字节数fetch.min.bytes:2048request.timeout.ms:
600000sasl.mechanism:PLAINsecurity.protocol:SASL_PLAINTEXTlistener:# 在侦听器容器中运行的线程数concurrency:2type:batch
max-poll-records:200#当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,#spring-kafka提供了通过ackMode的值表示不同的手动提交方式
#手动调用Acknowledgment.acknowledge()后立即提交ack-mode:manual_immediate# 消费者监听的topic不存在时,项目会报错,设置为falsemissing-topics-fatal:
false@KafkaListener 注解@KafkaListener是Spring Kafka提供的注解,用于监听Kafka Topic上的消息当一个方法被@KafkaListener注解修饰后,该方法将会自动注册为一个Kafka消息监听器。
当Kafka Topic上有消息到达时,该方法将会被调用@KafkaListener注解可以用在方法上,也可以用在类上用在方法上时,方法的参数可以是以下几种类型:ConsumerRecord:表示Kafka消息的实体对象,包含了消息的键值对以及相关的元数据信息。
Message:表示Kafka消息的包装对象,包含了消息的实体对象以及相关的元数据信息消息实体对象类型:表示只监听指定消息类型的情况当使用消息实体对象类型作为方法参数类型时,需要在注解中指定消息类型,例如:。
@KafkaListener(topics = "link-click-topic", groupId = "link-click-consumer", containerFactory = "kafkaListenerContainerFactory"
, concurrency = "3", errorHandler = "myErrorHandler", autoStartup = "true") publicvoidlistenToTestTopic
(ClickMessage message){ //处理业务数据//doBuiness(); }@KafkaListener注解中的属性含义如下:topics:监听的Kafka Topic名称,支持设置多个Topic,用逗号分隔。
groupId:该监听器所属的消费者组ID,默认为groupcontainerFactory:指定使用的Kafka消息监听器容器工厂,默认为kafkaListenerContainerFactoryerrorHandler:指定错误处理器的名称。
concurrency:指定Kafka消息监听器容器的并发数,默认为1autoStartup:指定是否在Spring容器初始化时自动启动该监听器,默认为true使用@KafkaListener注解可以轻松实现Kafka消息的消费,同时也可以基于Spring的依赖注入和AOP能力,方便地实现消息处理逻辑的复用和管理。
单次消费消息
Kafka配置信息spring:kafka:consumer:# 手动ack-mode:manual_immediate#设置是否批量消费,默认 single(单条),batch(批量)type:single
# 自动提交 offset 默认 trueenable-auto-commit:false/** * kafka的单次消费消息 */@KafkaListener(topics = "link-click-topic"
, groupId = "link-click-consumer") publicvoid onMessage(ConsumerRecord record, Consumer consumer) {
try { String message = record.value(); //处理业务数据//doBuiness(); consumer.commitSync(); log.info(
"Consumer>>>>>>>>>>>>>end"); } catch (Exception e) { log.error("Consumer.onMessage#error ."
, e); thrownew BizException("消息消费失败", e); } }单次消费消息,每次消费者只能从broker拉取一条消息消费,,严重影响 消息消费性能,我们当然要批量消费消息啦!
批量消费消息
Kafka 配置信息:fetch.max.wait.ms这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。
fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为500(ms)如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待500ms。
这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数spring:kafka:consumer:# 手动ack-mode:manual_immediate
#设置是否批量消费,默认 single(单条),batch(批量)type:batch# 自动提交 offset 默认 trueenable-auto-commit:false# 批量消费最大数量max-poll-records:
200# 该属性指定了消费者最大 的等待时间,默认是 500msfetch.max.wait.ms:1000# 该属性指定了消费者从服务器消费数据的最小字节数fetch.min.bytes:2048/** * kafka的批量消费监听器 */
@KafkaListener(topics = "link-click-topic", groupId = "link-click-consumer") publicvoid onMessage(List
String, String>> records, Consumer consumer) { try { log.info("Consumer.batch#size={}"
, records == null ? 0 : records.size()); if (CollectionUtil.isEmpty(records)) {
//分别是commitSync(同步提交)和commitAsync(异步提交) consumer.commitSync(); return
; } for (ConsumerRecord record : records) {
String message = record.value(); if (StringUtils.isBlank(message)) {
continue; } //处理业务数据//doBuiness(); } consumer.commitSync(); log.info(
"Consumer>>>>>>>>>>>>>end"); } catch (Exception e) { log.error("Consumer.onMessage#error ."
, e); thrownew BizException("消息消费失败", e); } }并发消费
如果我们的机器数量少于Kafka的分区数,为提高消费效率,需要开启并发消费分区数 = 机器数 * concurrency ,如我们的分区数为6,有3台消费者消费消息,则可以配置concurrency = 2。
注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态注意Consumer 方法上的注解配置 concurrencyspring:kafka:consumer:group-id:
consumer-groupbootstrap-servers:127.0.0.1:9092max-poll-records:200# 一次 poll 最多返回的记录数listener:type:batch
# 开启批量监听concurrency:2# 设置并发数/** * kafka的批量消费监听器 */@KafkaListener(topics = "link-click-topic"
, groupId = "link-click-consumer", concurrency = "2") publicvoid onMessage(List
, String>> records, Consumer consumer) { try { log.info("Consumer.batch#size={}", records ==
null ? 0 : records.size()); if (CollectionUtil.isEmpty(records)) { //分别是commitSync(同步提交)和commitAsync(异步提交)
consumer.commitSync(); return; } for (ConsumerRecord<
String, String> record : records) { String message = record.value();
if (StringUtils.isBlank(message)) { continue; }
//处理业务数据//doBuiness(); } consumer.commitSync(); log.info("Consumer>>>>>>>>>>>>>end"
); } catch (Exception e) { log.error("Consumer.onMessage#error .", e);
thrownew BizException("消息消费失败", e); } }
- 标签:
- 编辑:
- 相关文章
-
kafka学习_kafka实战教程
项目背景公司要开发一套用户广告归因系统,投放专家目前在头条、广点通、小米、华为、oppo、vivo等渠道投放广告信息。我们开发的系…
-
潍坊托福英语学习班_潍坊托福考点怎么样
连续教学12载,育成雅思创辉煌记者了解到,潍坊育成教育从事教育教学已经12年,英语教学得到了潍坊学生家长的一致好评,学校聘请的海外名…
- 学学习计划_学习计划表格
- 初中学习目标_初中三年的规划与目标
- 哪里可以学习思维导图的课程_哪里有思维导图培训班 中小学生
- 二建学习顺序_学二建的顺序
- 期中学习总结_期中学期总结