请选择 进入手机版 | 继续访问电脑版

java客户端作为kafka生产者测试

[复制链接]
余峻 发表于 2021-1-2 17:40:33 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
【README】

1、本文主要对 java客户端作为kafka 生产者举行测试, 消费者由 centos的kafka下令行线程饰演; 
2、消息发送: kafka的生产者采取异步发送消息的方式,在消息发送过程中,涉及到2个线程——main线程和sender线程,以及一个线程共享变量 RecordAccumulator。main线程将消息发送给 RecordAccumulator,sender线程不绝从 RecordAccumulator 中读取数据发送到 kafka broker;
3、开发情况
  1. -- pom.xml                                org.apache.kafka                  kafka-clients                  0.11.0.0                                org.slf4j            slf4j-simple            1.7.25            compile    -- log4j.propertieslog4j.rootLogger=INFO, stdout  log4j.appender.stdout=org.apache.log4j.ConsoleAppender  log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  log4j.appender.logfile=org.apache.log4j.FileAppender  log4j.appender.logfile.File=target/spring.log  log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  
复制代码
【0】 生产者同步发送消息

为啥需要同步发送? 因为 kafka可以包管单个分区内消息有序,但无法包管全局有序,即多个分区消息有序; 
存在一些业务场景,需要消息有序;
  1. /* 10.同步发送数据 */                 for (int i = 0; i < 10; i++) {                         try {                                Future future = producer.send(new ProducerRecord("first100", "first100-20210101--D" + i));                                RecordMetadata rMetadata = future.get(); // 调用future的get方法,让main线程阻塞,就可以实现同步发送                         } catch (Exception e) {                                e.printStackTrace();                        }                 }
复制代码
 
 

下面都是异步发送

【1】平凡生产者

1.1、生产者代码 
  1. /** * 平凡生产者  */public class MyProducer {        public static void main(String[] args) {                /* 1.创建kafka生产者的设置信息 */                Properties props = new Properties();                /*2.指定毗连的kafka集群, broker-list */                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");                  /*3.ack应答级别*/                props.put(ProducerConfig.ACKS_CONFIG, "all");                /*4.重试次数*/                 props.put(ProducerConfig.RETRIES_CONFIG, 3);                 /*5.批次巨细,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */                props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);                  /*6.等待时间, 等待时间高出1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */                props.put(ProducerConfig.LINGER_MS_CONFIG, 1);                 /*7. RecordAccumulator 缓冲区巨细*/                 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);                  /*8. key, value 的序列化类 */                 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());                                System.out.println(props);                 /* 9.创建生产者对象 */                KafkaProducer producer = new KafkaProducer(props);                  /* 10.发送数据 */                 for (int i = 0; i < 10; i++) {                         Future future = producer.send(new ProducerRecord("first100", "first100-20210101--D" + i));                        try {                                System.out.println(future.get().partition() + "-" + future.get().offset());                        } catch (Exception e) {                                e.printStackTrace();                        }                 }                /* 11.关闭资源 */                  producer.close();                System.out.println("kafka生产者写入数据完成");         } }-- 日志 0-1832030-1832040-1832050-1832060-1832070-1832080-1832090-1832100-1832110-183212kafka生产者写入数据完成
复制代码
1.2、消费者
  1. [root@centos201 ~]# kafka-console-consumer.sh --topic first100 --bootstrap-server centos201:9092first100-20210101--D0first100-20210101--D1first100-20210101--D2first100-20210101--D3first100-20210101--D4first100-20210101--D5first100-20210101--D6first100-20210101--D7first100-20210101--D8first100-20210101--D9
复制代码
【2】带回调的生产者

2.1、生产者
  1. /**                 * 带回调的生产者                  */                for (int i = 0; i < 10; i++) {                         Future future = producer.send(new ProducerRecord("first100", "first100-20210101--E" + i)                                        , (metadata, exception)-> {                                /* lambda 表达式  */                                System.out.println(metadata.partition() + " -- " + metadata.offset());                        });                }
复制代码
2.2、消费者
  1. first100-20210101--E0first100-20210101--E1first100-20210101--E2first100-20210101--E3first100-20210101--E4first100-20210101--E5first100-20210101--E6first100-20210101--E7first100-20210101--E8first100-20210101--E9
复制代码
【3】创建分区计谋的生产者 (指定分区)

0、查察topic, 4个分区,3个副本
  1. [root@centos201 ~]# kafka-topics.sh --describe --topic aaa --zookeeper centos201:2181Topic:aaa       PartitionCount:4        ReplicationFactor:3     Configs:        Topic: aaa      Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 1,2,3        Topic: aaa      Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 1,2,3        Topic: aaa      Partition: 2    Leader: 1       Replicas: 1,3,2 Isr: 2,1,3        Topic: aaa      Partition: 3    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
复制代码
3.1、生产者
  1. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); // 设置分区器                 /* 9.创建生产者对象 */                KafkaProducer producer = new KafkaProducer(props);                  /* 10.发送数据 */                 for (int i = 0; i < 10; i++) {                         Future future = producer.send(new ProducerRecord("aaa", "aaa-key", "aaa-20210101--B" + i)                                        , (metadata, exception)-> {                                                /* lambda 表达式  */                                                System.out.println(metadata.partition() + " -- " + metadata.offset());                        });                }-- 日志1 -- 1121 -- 1131 -- 1141 -- 1151 -- 1161 -- 1171 -- 1181 -- 1191 -- 1201 -- 121kafka生产者写入数据完成
复制代码
3.2、自界说分区器 
  1. /** * 自界说分区器 */public class MyPartitioner implements Partitioner {        @Override        public void configure(Map configs) {                        }        @Override        public int partition(String topic, Object key, byte[] keyBytes,                        Object value, byte[] valueBytes, Cluster cluster) {                Integer integer = cluster.partitionCountForTopic(topic);                                return 1;        }        @Override        public void close() {                        }}
复制代码
3.3、消费者
  1. [root@centos201 ~]# kafka-console-consumer.sh --topic aaa --bootstrap-server centos201:9092aaa-20210101--C0aaa-20210101--C1aaa-20210101--C2aaa-20210101--C3aaa-20210101--C4aaa-20210101--C5aaa-20210101--C6aaa-20210101--C7aaa-20210101--C8aaa-20210101--C9
复制代码
小结: 可以查察,即便topic 有4个分区,但我在自界说分区器中指定写入到分区1, 所以生产者只把消息写到分区1; 
 
 
 
 

来源:https://blog.csdn.net/PacosonSWJTU/article/details/112058396
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题

专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )