
Apache Druid: Loading Data from Kafka -- A Full Walkthrough

Posted by 黎平 on 2021-1-2 17:39:29
Contents

I. Create the Kafka topic and producer
II. Produce data to Kafka
III. Configure the Apache Druid DataSource
  1) Start
  2) Connect
  3) Parse Data
  4) Parse Time
  5) Transform (optional)
  6) Filter (optional)
  7) Configure Schema (key step)
  8) Partition
  9) Tune
  10) Publish
  11) Edit JSON spec
Case 1: Sales data query examples
Case 2: Ingest with server system time (milliseconds) as the timestamp
  Configure the Apache Druid DataSource
Case 3: Ingest with server system time (seconds) as the timestamp
  Configure the Apache Druid DataSource
I. Create the Kafka topic and producer

1. Create the topic

  kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales

2. Create a producer

  kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales

3. Create a consumer

  kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales --group topic_test1_g1
II. Produce data to Kafka

  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}
  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
  {"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
  {"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}
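Records like the ones above can also be generated programmatically before piping them into the console producer. A minimal sketch, assuming only the field names from the sample data (including the `monye` spelling); `make_record` is a hypothetical helper, not part of any Kafka or Druid API:

```python
import json
from datetime import datetime, timezone

def make_record(ts: datetime, category: str, area_name: str, monye: str) -> str:
    """Serialize one sales event in the same shape as the sample records."""
    return json.dumps({
        # Strict ISO-8601, e.g. 2020-08-08T01:03:00Z
        "timestamp": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "category": category,
        "areaName": area_name,
        "monye": monye,
    }, ensure_ascii=False)

line = make_record(datetime(2020, 8, 8, 1, 3, 0, tzinfo=timezone.utc), "手机", "北京", "1450")
print(line)
```

Note that a strict ISO-8601 time reads `01:03:00Z`; the sample records' `01:03.00z` form relies on lenient timestamp parsing.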
III. Configure the Apache Druid DataSource

1) Start


2) Connect


3) Parse Data


4) Parse Time


5) Transform (optional)


6) Filter (optional)


7) Configure Schema (key step)


8) Partition


9) Tune


10) Publish

Max parse exceptions: 2147483647
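The value 2147483647 is not arbitrary: it is the largest 32-bit signed integer (Java's `Integer.MAX_VALUE`), so setting `maxParseExceptions` to it effectively means "never fail the task on parse errors". A quick check:

```python
# 2147483647 == 2^31 - 1, the largest 32-bit signed integer (Java's Integer.MAX_VALUE).
# Using it as maxParseExceptions effectively disables the parse-exception limit.
max_parse_exceptions = 2**31 - 1
print(max_parse_exceptions)  # 2147483647
```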

11) Edit Json spec


{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "fast_sales",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "iso" },
        "dimensionsSpec": { "dimensions": ["areaName", "category"] }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "longSum", "name": "sum_monye", "fieldName": "monye", "expression": null }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "MINUTE",
      "rollup": true,
      "intervals": null
    },
    "transformSpec": { "filter": null, "transforms": [] }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": { "type": "concise" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "fast_sales",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "fast_sales",
    "useEarliestSequenceNumber": false,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}
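With `rollup: true` and `queryGranularity: MINUTE`, Druid pre-aggregates incoming rows that share the same minute-truncated timestamp and dimension values, storing one row per group with the `count` and `longSum` metrics. A sketch of the equivalent aggregation over the sample records (timestamps normalized to strict ISO; this mimics rollup in plain Python, it is not Druid code):

```python
from collections import defaultdict
from datetime import datetime

# Sample events: (timestamp, areaName, category, monye) — mirrors the records produced earlier.
events = [
    ("2020-08-08T01:03:00Z", "北京", "手机", 1450),
    ("2020-08-08T01:03:00Z", "北京", "手机", 1450),
    ("2020-08-08T01:03:00Z", "北京", "家电", 1550),
    ("2020-08-08T01:03:00Z", "深圳", "手机", 1000),
    ("2020-08-08T01:03:01Z", "深圳", "手机", 2000),
    ("2020-08-08T01:04:01Z", "深圳", "手机", 2200),
]

def truncate_to_minute(ts: str) -> str:
    """Truncate an ISO timestamp to minute granularity (queryGranularity: MINUTE)."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ")
    return dt.strftime("%Y-%m-%dT%H:%M:00Z")

# Rollup: one stored row per (minute, areaName, category), carrying count and longSum(monye).
rollup = defaultdict(lambda: {"count": 0, "sum_monye": 0})
for ts, area, category, monye in events:
    key = (truncate_to_minute(ts), area, category)
    rollup[key]["count"] += 1
    rollup[key]["sum_monye"] += monye

for key, aggs in sorted(rollup.items()):
    print(key, aggs)
```

The two Shenzhen events at 01:03:00 and 01:03:01 collapse into a single 01:03 row, which is why later queries sum the `sum_monye` metric rather than the raw `monye` column.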
Case 1: Sales data query examples

1) The data source

2) Recall the data produced to Kafka earlier:

  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"2020-08-08T01:03.00z","category":"家电","areaName":"北京","monye":"1550"}
  {"timestamp":"2020-08-08T01:03.00z","category":"手机","areaName":"深圳","monye":"1000"}
  {"timestamp":"2020-08-08T01:03.01z","category":"手机","areaName":"深圳","monye":"2000"}
  {"timestamp":"2020-08-08T01:04.01z","category":"手机","areaName":"深圳","monye":"2200"}
-- Query all rows

-- Query rows within a time range

-- Count the total number of ingested records

-- Total sales amount grouped by area and product category

-- Total sales amount grouped by area

-- Total sales amount grouped by product category

-- Filter by time range first, then group by area and product category to total the sales amount

Case 2: Ingest with server system time (milliseconds) as the timestamp

1. Create the topic

  kafka-topics.sh --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 1 --partitions 1 --topic fast_sales_test_timestamp

2. Create a producer

  kafka-console-producer.sh --broker-list node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp

3. Create a consumer

  kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic fast_sales_test_timestamp --group topic_test1_g1

4. Produce data to Kafka

-- An offset of 8 hours, in milliseconds, must be added to the server's epoch timestamp
1609549913324 corresponds to 2021-01-02 09:11:53; + 28800000 = 1609578713324, corresponding to 2021-01-02 17:11:53
1609550385297 corresponds to 2021-01-02 09:19:45; + 28800000 = 1609579185297, corresponding to 2021-01-02 17:19:45

  {"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"1609549913324","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"1609549913324","category":"家电","areaName":"北京","monye":"1550"}
  {"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"1000"}
  {"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2000"}
  {"timestamp":"1609550385297","category":"手机","areaName":"深圳","monye":"2200"}

After adding the 8-hour offset:

  {"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"1609578713324","category":"手机","areaName":"北京","monye":"1450"}
  {"timestamp":"1609578713324","category":"家电","areaName":"北京","monye":"1550"}
  {"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"1000"}
  {"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2000"}
  {"timestamp":"1609579185297","category":"手机","areaName":"深圳","monye":"2200"}
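The 8-hour shift is plain arithmetic: Druid interprets epoch milliseconds as UTC, so adding the UTC+8 offset makes Druid's UTC display read as the original Beijing wall-clock time. A sketch (a cleaner alternative would be to ingest true UTC epochs and convert time zones at query or display time):

```python
from datetime import datetime, timezone

EIGHT_HOURS_MS = 8 * 60 * 60 * 1000  # 28800000 ms, the UTC+8 offset

def shift_epoch_ms(epoch_ms: int) -> int:
    """Add the 8-hour offset so Druid's UTC display matches UTC+8 wall-clock time."""
    return epoch_ms + EIGHT_HOURS_MS

shifted = shift_epoch_ms(1609549913324)
print(shifted)  # 1609578713324
# Interpreted as UTC, the shifted epoch reads as the original Beijing wall-clock time:
print(datetime.fromtimestamp(shifted / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S"))
```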
Configure the Apache Druid DataSource

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "fast_sales_test_timestamp",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "millis" },
        "dimensionsSpec": { "dimensions": ["areaName", "category"] }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "longSum", "name": "sum_monye", "fieldName": "monye", "expression": null }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "MINUTE",
      "rollup": true,
      "intervals": null
    },
    "transformSpec": { "filter": null, "transforms": [] }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/usr/local/imply-3.0.4/var/tmp/1609509057384-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": { "type": "concise" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": true,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": true,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "fast_sales_test_timestamp",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT3600S",
    "consumerProperties": {
      "bootstrap.servers": "node-01:9092,node-02:9092,node-03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1800S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "fast_sales_test_timestamp",
    "useEarliestSequenceNumber": false,
    "type": "kafka"
  },
  "context": null,
  "suspended": false
}
 
Query examples:

  SELECT * FROM "fast_sales_test_timestamp" -- query all rows
  -- SELECT * FROM "fast_sales_test_timestamp" WHERE __time > '2021-01-02T09:11:00.000Z' -- query rows within a time range
  -- SELECT SUM("count") FROM "fast_sales_test_timestamp" -- count the total number of ingested records
  -- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName, category -- total sales amount by area and product category
  -- SELECT areaName, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY areaName -- total sales amount by area
  -- SELECT category, SUM(sum_monye) FROM "fast_sales_test_timestamp" GROUP BY category -- total sales amount by product category
  -- SELECT SUM(sum_monye) FROM "fast_sales_test_timestamp" -- total sales amount
  -- SELECT areaName, category, SUM(sum_monye) FROM "fast_sales_test_timestamp" -- WHERE __time
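The grouped totals these queries return can be checked by hand against the six sample records. A quick sketch of the GROUP BY arithmetic:

```python
# The six sample records (areaName, category, monye), as produced to the topic earlier.
records = [
    ("北京", "手机", 1450),
    ("北京", "手机", 1450),
    ("北京", "家电", 1550),
    ("深圳", "手机", 1000),
    ("深圳", "手机", 2000),
    ("深圳", "手机", 2200),
]

# Equivalent of: SELECT areaName, SUM(sum_monye) ... GROUP BY areaName
by_area = {}
for area, _category, monye in records:
    by_area[area] = by_area.get(area, 0) + monye

print(by_area)                # {'北京': 4450, '深圳': 5200}
print(sum(by_area.values()))  # 9650 — equivalent of SELECT SUM(sum_monye)
```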