请选择 进入手机版 | 继续访问电脑版

springboot整合kafka

[复制链接]
期待幸福 发表于 2021-1-1 18:31:14 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
前言

kafka集群的搭建、kafka manager集群管理软件,我在上一篇内里已经写完了。参考:https://blog.csdn.net/m0_37606574/article/details/111952708
 
Springboot整合kafka

我springboot起步依赖是2.4.0版本,版本比力新,以下代码是从一个知乎用户分享的案例,亲测有效。
 
添加maven依赖
  1.    org.springframework.kafka   spring-kafka   2.4.0.RELEASE
复制代码
 
添加消费者设置
[code]import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConsumerConfig {    private static final String GROUP0_ID = "group0";    private static final String GROUP1_ID = "group1";    /**     * 1. setAckMode: 消费者手动提交ack     *     * RECORD: 每处置处罚完一条记载后提交。     * BATCH(默认): 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。     * TIME: 每次隔断ackTime的时间提交。     * COUNT: 处置处罚完poll的一批数据后而且隔断上次提交处置处罚的记载数高出了设置的ackCount就提交。     * COUNT_TIME: TIME和COUNT中任意一条满足即提交。     * MANUAL: 手动调用Acknowledgment.acknowledge()后,而且处置处罚完poll的这批数据后提交。     * MANUAL_IMMEDIATE: 手动调用Acknowledgment.acknowledge()后立即提交。     *     * 2. factory.setConcurrency(3);     * 此处设置的目标在于:假设 topic test 下有 0、1、2三个 partition,Spring Boot中只有一个 @KafkaListener() 消费者订阅此 topic,此处设置并发为3,     * 启动后 会有三个差异的消费者分别订阅 p0、p1、p2,本地实际有三个消费者线程。     * 而 factory.setConcurrency(1); 的话 本地只有一个消费者线程, p0、p1、p2被同一个消费者订阅。     * 由于 一个partition只能被同一个消费者组下的一个消费者订阅,对于只有一个 partition的topic,纵然设置 并发为3,也只会有一个消费者,多余的消费者没有 partition可以订阅。     *     * 3. factory.setBatchListener(true);     * 设置批量消费 ,每个批次数量在Kafka设置参数ConsumerConfig.MAX_POLL_RECORDS_CONFIG中设置,     * 限制的是 一次批量吸收的最大条数,而不是 等到到达最大条数才吸收,这点容易被误解。     * 实际测试时,吸收是实时的,当生产者大量写入时,一次批量吸收的消息数量为 设置的最大条数。     */    @Bean    KafkaListenerContainerFactory kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory                factory = new ConcurrentKafkaListenerContainerFactory();        // 设置消费者工厂        factory.setConsumerFactory(consumerFactory());        // 设置为批量消费,每个批次数量在Kafka设置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG        factory.setBatchListener(true);        // 消费者组中线程数量,消费者数量
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )