请选择 进入手机版 | 继续访问电脑版

clickhouse 九(同步kafka数据)

[复制链接]
余峻 发表于 2020-12-31 17:52:34 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
1 步调

kafka作为消息队列通常用来收集各个服务产生的数据,而下游各种数据服务订阅消费数据,本文通过使用clickhouse 自带的kafka 引擎,来同步消费数据。
同步步调:


  • kafka中创建topic,创建消费者并消费该topic(查看消费情况)
  • 建立目标表(通常是MergeTree引擎系列),用来存储kafka中的数据;
  • 建立kafka引擎表,用于接入kafka数据源;
  • 创建Materialized View(物化视图), 监听kafka中的数据并将数据同步到clickhouse的目标表中;
同步流程图如下:

2 创建测试数据源

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-reader# 创建消费者指定topic./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning  --topic kafka-reader --group kafka-reader-group
复制代码
3 创建数据储存目标表

  1. CREATE TABLE target(day Date,level String,message String) ENGINE = SummingMergeTree(day, (day, level), 8192);
复制代码
4 创建kafka消费表

1 使用kafka引擎创建queue表来毗连kafka并读取topic中的数据。该数据表订阅了名为kafka-reader的消息主题,且消费组的名称为kafka-reader-group,⽽消息的格式采⽤了JSONEachRow。
2 在此之后,查询这张数据表就可以大概看到Kafka的数据了。但是再次查询这张便就会没有数据了,这是因为Kafka表引擎在执⾏查询之后就会删除表内的数据。
  1.   CREATE TABLE queue (    timestamp DateTime,    level String,    message String  )  ENGINE = KafkaSETTINGS kafka_broker_list = '192.168.9.226:9092',       kafka_topic_list = 'kafka-reader',           kafka_row_delimiter = '\n',       kafka_group_name = 'kafka-reader-group',       kafka_format = 'JSONEachRow'
复制代码
参数解析–须要参数:


  • kafka_broker_list – 以逗号分隔的kafka的brokers 列表 (192.168.9.226:9092)。
  • kafka_topic_list – topic 列表 (kafka-reader)。
  • kafka_group_name – Kafka 消费组名称 (kafka-reader-group)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。JSONEachRow也就是平凡的json格式,使用与 SQL 部分的 FORMAT 函数相同表现方法。
参数解析–可选参数:


  • kafka_row_delimiter - 每个消息体之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。比方,普罗托船长 需要 schema - 文件路径以及根对象 schema.capnp:Message 的名字。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不敷,则指定更多的消费者。消费者的总数不应该凌驾 topic 中分区的数量,因为每个分区只能分配一个消费者。
5 创建Materialized View(物化视图)传输数据

创建好的物化视图,它将会在后台收集数据。可以连续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
  1. CREATE MATERIALIZED VIEW consumer TO target    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total    FROM queue GROUP BY day, level;
复制代码
6 测试

生产者添加数据:

查询目标表,查看消费数据
  1. SELECT *FROM target┌────────day─┬─level─┬─message─┐│ 2020-12-01 │ 11    │ 不开心   ││ 2020-12-30 │ 13    │ 写博客   ││ 2020-12-31 │ 15    │ 买可乐   ││ 2020-12-31 │ 17    │ 真好喝   │└────────────┴───────┴─────────┘
复制代码
查询consumer物化视图表,一般得到的数据和目标表差不多,除非实时数据许多,停止吸收topic数据或更改转换逻辑需要停用物化视图,更改完之后再启用物化视图
  1. # 停用 DETACH TABLE consumer;# 启用   ATTACH TABLE consumer;
复制代码
总结

clickhouse消费kafka数据的过程中,通过kafka引擎表作为一个管道吸收流入的数据,而物化视图负责将kafka引擎表的数据实时同步到目标表中,我们通过不同sql语句封装将kafka数据导入到不同目标表中,这是不错消费方式。
创作不易,喜欢的话可以点赞加关注哦!阿里嘎多民那桑!
参考文章: clickhouse官方文档

来源:https://blog.csdn.net/weixin_39025362/article/details/111989248
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )