请选择 进入手机版 | 继续访问电脑版

golang MQTT消息服务

[复制链接]
陈雪霜 发表于 2021-1-1 09:59:38 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作物IoT的公司固然要有物联网的服务,在公司规模还没有形成之初,就筹划要将公司全部智能产物线的设备全部联网,与服务端有很好的通信,这样既可以拿到设备的使用数据,通过大数据的分析形成用户画相,还可以下放其他的一些物联网的服务。这是所有IoT的公司需要构建的根本服务,在做根本架构的时候,数据库选了阿里开发的tablestore,tablestrore适合大数据的公司存储海量的运行数据。消息系统则选了mqtt,来由是mqtt的低消耗、低带宽、轻量化的特点,下面是mqtt简述:
  MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以少少的代码和有限的带宽,为毗连远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
  MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简朴、开放和易于实现的,这些特点使它适用范围非常广泛。在许多情况下,包罗受限的环境中,如:呆板与呆板(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
详细的介绍看这里:
https://www.runoob.com/w3cnote/mqtt-intro.html
MQTT协议实现方式
  实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、署理(Broker)(服务器)、订阅者(Subscribe)。此中,消息的发布者和订阅者都是客户端,消息署理是服务器,消息发布者可以同时是订阅者。
  MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
  (1)Topic,可以明白为消息的范例,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以明白为消息的内容,是指订阅者详细要使用的内容。
看图会比力直观:

实际的使用中客户端和服务端都即可以是生产者,也可以是消费者,将生产的消息发送到broker队列,谁订阅谁消费。这样,大要上工作步调就非常明白了:
  1.注册客户端/服务端
2.获取broker地点
3.订阅需要的topic
4.生产/消费消息
以我开发的客户端的代码为例,供各人讨论:
1.模仿新设备开机激活,拿到mac,sn等信息
  1. //设备注册请求// func Actdev(device []string) (DeviceName string,Sk string,productAlias string,Isact bool,ProductKey string){func Actdev(device models.DevInfo) (error,models.DevInfo){    body := Body{}    Header := make(map[string]string)    form := make(map[string]string)    var dev map[string]string    //盘算sign    // var sign string    var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10)    dev = map[string]string{        "Product-Key":   device.ProductKey,        "Mac":      device.Mac ,        "Sn":   device.Sn,        "Model": device.ModelName,        "Version":  "V010101RCN01C016011B1901081S",        //"Time": strconv.FormatInt(time.Now().UnixNano() / 1e9,10),        "Time":ttime,    }    // log.Info(dev)    sign := md5.Hmac(device.ProductSecert, dev)    // log.Info("sign: %s\n",sign)    url := "http://beta.evice/v1/register"        //添加 header    Header["Content-Type"]="application/x-www-form-urlencoded"    Header["Product-Key"]=device.ProductKey    Header["Mac"]=device.Mac    Header["Sn"]= device.Sn    Header["Model"]=device.ModelName    Header["Version"]="V010101RCN01C016011B1901081S"    Header["Time"]= ttime    form["sign"]= sign    err,rsp := Postdata(Header, form, dev, url)    if err !=nil{//fail        log.Info(string(rsp))        return err,device    }else{//ok                err1 := json.Unmarshal(rsp, &body)                // log.Info(string(rsp))                if err1 != nil {            log.Info(err1)            return err,device                }                // for k,v :=range(Header){                //         log.Info(k+":"+v)                // }                if body.Code == 10000 {                        // log.Info("--------Device active success!")            // log.Info(body.Code)            device.ProductAlias= body.Data.ProductAlias            device.DeviceName= body.Data.DeviceName            device.DeviceSecret= body.Data.DeviceSecret                        // log.Info("\n")                }else{            log.Info(body.Code)            return nil,device        }        }    return nil, device}
复制代码
2.设备激活注册乐成后获取broker地点:
  1. //传入下发devname获取brokerfunc Getbroker(device models.DevInfo) (error){     body := Body{}    Header := make(map[string]string)    form := make(map[string]string)    var dev map[string]string    key := device.ProductKey + device.DeviceSecret    var ttime string = strconv.FormatInt(time.Now().UnixNano() / 1e9,10)    Header["Content-Type"]="application/x-www-form-urlencoded"    Header["Product-Key"]=device.ProductKey    Header["Mac"]=device.Mac    Header["Sn"]= device.Sn    Header["Model"]=device.ModelName    Header["Version"]="V010101RCN01C016011B1901081S"    Header["Time"]= ttime    nsign := md5.Hmac(key, dev)    //log.Info("sign: %s\n",nsign )    url2 := "http://beta./broker/v1/address?" + "deviceName=" + device.DeviceName + "&sign=" + nsign        rsp, err := Get(url2, Header, form)    if err !=nil{//fail        // log.Info(string(rsp))        return err    }else{//ok                err1 := json.Unmarshal(rsp, &body)                // log.Info(string(rsp))                if err1 != nil {            log.Info(err1)            return err1                }                // for k,v :=range(Header){                //         log.Info(k+":"+v)                // }                if body.Code == 10000 {                        // log.Info("--------GET broker success!")            // log.Info(body.Code)            // dev["ProductAlias"]= Alias                        // dev["deviceName"]= body.Data.DeviceName            // dev["DeviceSecret"]= Secret            // log.Info(dev)                        // log.Info("\n")                }else{            log.Info(body.Code)            return nil        }    }    return nil}
复制代码
3.获取到broker地点后注册mqtt客户端:
  1. func Newmqclient(device models.DevInfo) (Client,error)  {        broker:= "tcp://127.0.0.1:1883"//支持大多数通信协议        opts := mqtt.NewClientOptions().AddBroker(broker)//beta        devname :=  device.DeviceName+"@"+device.ProductAlias        passwd:= device.DeviceSecret        opts.Username = devname        opts.Password = passwd        //timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)        //timestamp := time.Now().Format("2006-01-02 15:04:05.000000")        clientID:="|aiot|device|"+devname+","+"deviceId="+device.DeviceID+","+"sn="+device.Sn+ "|"        opts.SetClientID(clientID)        // opts.SetDefaultPublishHandler(defaultHandler)        //opts.SetConnectionLostHandler(onLostHandler)        opts.SetAutoReconnect(true)        client := NewClient(broker,devname,passwd,clientID)        // client = mqtt.NewClient(opts)        var err = client.Connect()        if err != nil {                return nil, err        }         // log.Info(opts.ClientID)        // deviceproductkey = productkey        ClientSuccess++        log.Info("%v | MQ is connected  %d\n",device.DeviceName,ClientSuccess)        // DevicemqName = deviceName        //log := timestamp+" | "+deviceName+"connect to MQ " +strings.ToUpper(strconv.FormatBool(Isconn))+"\n"        //logs.Logfile(log)        // for {        //         time.Sleep(3*time.Second)        // }        return client,nil}
复制代码
  这里先简朴介绍mqtt,后续有完整的事件再详细介绍mqtt的其他方法
4.订阅topic:
  1. func (c *defaultClient) SubscribeUplink(client Client, topic string, device models.DevInfo) Token {                return c.client.Subscribe(topic, QoS, func(mqtt MQTT.Client, msg MQTT.Message) {                // Determine the actual topic                log.Info("Success SubscribeUplink with device: %v", device.DeviceName)                CheckReq(client, device, msg.Payload(), topic)//listen        })        }
复制代码
5.监听消息并处置惩罚:
  1. // CheckReq ...func CheckReq(client Client, device models.DevInfo, msg []byte, topic string) {        //log.Infof("[MESSSAGE_RECEIVED] topic=%s, msg=%s, device=%v", topic, string(msg), device.DeviceName)        var msgInfo models.MsgInfo        json.Unmarshal(msg, &msgInfo)        // replyId := msgInfo.MsgId        ota_check_launch := "/aiot/{ProductKey}/{DeviceName}/running/launch"        ota_check_online    := "/aiot/{ProductKey}/{DeviceName}/req/status/online"        ota_version_confirm := "/ota/device/{ProductKey}/{DeviceName}/req/version/inform"        ota_version_notify  := "/ota/device/{ProductKey}/{DeviceName}/req/version/notify"        ota_check_launch =strings.Replace(ota_check_launch, "{ProductKey}", device.ProductKey, -1)        ota_check_online = strings.Replace(ota_check_online, "{ProductKey}", device.ProductKey, -1)        ota_check_online = strings.Replace(ota_check_online, "{DeviceName}", device.DeviceName, -1)        ota_version_confirm = strings.Replace(ota_version_confirm, "{ProductKey}", device.ProductKey, -1)        ota_version_confirm = strings.Replace(ota_version_confirm, "{DeviceName}", device.DeviceName, -1)        ota_version_notify = strings.Replace(ota_version_notify, "{ProductKey}", device.ProductKey, -1)        ota_version_notify = strings.Replace(ota_version_notify, "{DeviceName}", device.DeviceName, -1)        //log.Infof("Received Topic: %s, device: %s, match: %s", topic, device.DeviceName, ota_check_online)        switch topic {        case ota_check_online:                //log.Infof("Processing... topic=%s,match=%s,method=%s", topic, ota_check_online, "OTA_CHECK_ONLINE")                // DeviceStatusOnlineReq(client, device, replyId)        case ota_version_confirm:                //log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_confirm, "OTA_VERSION_CONFORM")                // DeviceVersionInformReq(client, device, replyId)        case ota_version_notify:                //log.Debugf("o=%s,t=%s,method=%s", topic, ota_version_notify, "OTA_VERSION_NOTIFY")                // DevicePushVersionReq(client, device, replyId, string(msg))        case ota_check_launch:                log.Info(topic)        }}
复制代码
6.发布消息:
  1. func LaunchReq(client Client, device models.DevInfo) {        topic := "/aiot/" + device.ProductKey + "/" + device.DeviceName + "/running/launch"        // log.Info(topic)        msgInfo := models.MsgInfo{                Code:    10000,                Data:    "",                MsgId:   strconv.FormatInt(time.Now().Unix(), 10),                Time:    time.Now().UnixNano() / 1e6,                Version: "1.0",        }        jsonBytes, _ := json.Marshal(msgInfo)        if client != nil {                client.PublishUplink(topic, string(jsonBytes))                Publistcount++                now :=time.Now().Format("2006-01-02 15:04:05")                // db.Logfile(now +" | "+strconv.Itoa(Publistcount)+" | "+topic)                log.Info("published %d    "+topic, Publistcount)                db.InsertDB(device.DeviceName,now)        }}
复制代码
  模仿发布设备开机的消息
实际应用中大概更复杂,这里只介绍最根本的使用,欢迎有兴趣的童靴留言讨论

来源:https://blog.csdn.net/weixin_41479678/article/details/111992838
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则


专注素材教程免费分享
全国免费热线电话

18768367769

周一至周日9:00-23:00

反馈建议

27428564@qq.com 在线QQ咨询

扫描二维码关注我们

Powered by Discuz! X3.4© 2001-2013 Comsenz Inc.( 蜀ICP备2021001884号-1 )