This repository has been archived by the owner on Oct 29, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
138 lines (120 loc) · 3.63 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2017 Fraunhofer Institute for Applied Information Technology FIT
package main
import (
MQTT "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
"encoding/json"
"github.com/satori/go.uuid"
"code.linksmart.eu/sc/service-catalog/catalog"
SC "code.linksmart.eu/sc/service-catalog/client"
)
// SensorThingPayload is the JSON document published for every agent response
// handled by the payload loop in startPublisher.
type SensorThingPayload struct {
// Result holds the agent payload converted to a string. It is serialized
// under the key "result" and omitted when empty.
Result string `json:"result,omitempty"`
// Time is serialized under the key "phenomenonTime" and omitted when empty.
// startPublisher fills it with the current UTC time formatted as RFC 3339.
Time string `json:"phenomenonTime,omitempty"`
}
// SensorThingTopic groups three string identifiers: a sensor ID, an area ID
// and a sensor name.
// NOTE(review): this type is not referenced anywhere in the visible part of
// this file; presumably it describes the components of a publish topic —
// confirm against its users before relying on that.
type SensorThingTopic struct {
SensorID string
AreaID string
SensorName string
}
// Publisher forwards agent responses and agent status changes to an MQTT
// broker. It is created with newPublisher, run with startPublisher and
// terminated with stopPublisher.
type Publisher struct {
// toPublish delivers agent responses; startPublisher wraps each one in a
// SensorThingPayload and publishes it.
toPublish chan AgentResponse
// status2Publish delivers agent status changes; startPublisher publishes
// each one as a small JSON status document.
status2Publish chan AgentStatus
// brokerUrl is the broker address handed to the MQTT client options.
brokerUrl string
// id is used as the MQTT client ID.
id string
// stop unblocks startPublisher: receiving any value makes it return.
stop chan bool
// manager is nil after newPublisher and is set by startPublisher; it
// provides the topic prefix (mConfig.Prefix) and the things lookup.
manager *AgentManager
}
// AgentStatus is the message type carried by Publisher.status2Publish.
type AgentStatus struct {
// status selects the published document: true yields
// { "status" : "running" }, false yields { "status" : "not available" }.
status bool
// agentName is appended to "<Prefix>thing/" to form the publish topic.
agentName string
}
// newPublisher allocates a Publisher configured from aConfig.
//
// The broker address and client ID are copied from aConfig.Broker and
// aConfig.Id. All three channels (toPublish, status2Publish, stop) are
// created unbuffered. The manager field is left nil; startPublisher sets it.
func newPublisher(aConfig LSTPConfig) *Publisher {
	p := new(Publisher)

	// Connection settings taken from the configuration.
	p.id = aConfig.Id
	p.brokerUrl = aConfig.Broker

	// Unbuffered channels: every send blocks until the publisher receives it.
	p.stop = make(chan bool)
	p.toPublish = make(chan AgentResponse)
	p.status2Publish = make(chan AgentStatus)

	return p
}
// stopPublisher asks a running startPublisher call to return.
//
// The stop channel is unbuffered, so this call blocks until startPublisher
// receives the signal.
func (p *Publisher) stopPublisher() {
	const shutdown = true
	p.stop <- shutdown
}
// startPublisher connects to the configured MQTT broker, optionally registers
// this service in the Service Catalog, and then publishes everything arriving
// on p.toPublish and p.status2Publish until a value is received on p.stop.
//
// The call blocks for the whole lifetime of the publisher. It panics (via
// log.Panic) when the broker connection cannot be established. On return the
// MQTT client is disconnected and, if the catalog registration succeeded, the
// function returned by the catalog client is invoked.
//
// am is stored in p.manager. Its configuration supplies the topic prefix, the
// registration switch and the catalog endpoint; its things lookup supplies
// the sensor description used in payload topics.
func (p *Publisher) startPublisher(am *AgentManager) {
	p.manager = am

	// Default handler installed on the client options; it only logs the
	// topic and payload of the message it is given.
	var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
		log.Printf("[Publisher:MessageHandler] TOPIC: %s\n", msg.Topic())
		log.Printf("[Publisher:MessageHandler] MSG: %s\n", msg.Payload())
	}

	opts := MQTT.NewClientOptions()
	log.Println("[Publisher:startPublisher] Using MQTT broker: ", p.brokerUrl)
	opts.AddBroker(p.brokerUrl)
	opts.SetClientID(p.id)
	opts.SetDefaultPublishHandler(f)

	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Panic(token.Error())
	}
	defer client.Disconnect(250)
	log.Println("[Publisher:startPublisher] Connected to : ", p.brokerUrl)

	if am.mConfig.ServiceRegistration {
		service := catalog.Service{
			ID:          uuid.NewV4().String(),
			Name:        "_linksmart-tp._tcp",
			Description: "A publishing device connector",
			APIs:        map[string]string{"MQTT API Specs": am.mConfig.Broker},
			Docs: []catalog.Doc{{
				Description: "MQTT API of the ThingPublsher service",
				URL:         "",
				Type:        "application/asyncapi+json;version=1.0",
				APIs:        []string{"MQTT API Specs"},
			}},
			TTL: 120,
		}
		stopRegistrator, err := SC.RegisterServiceAndKeepalive(am.mConfig.ServiceCatalog, service, nil)
		if err != nil {
			// Keep publishing without a catalog entry, and do not defer
			// the returned function: it may be nil when registration fails.
			log.Println("[Publisher:startPublisher] Service registration failed: ", err)
		} else {
			defer stopRegistrator()
		}
	} else {
		log.Println("[Publisher:startPublisher] Service registration disabled.")
	}

	// publish sends payload to topic with QoS 1 (not retained), waits for the
	// token to complete and logs a failure instead of silently dropping it.
	publish := func(origin, topic string, payload interface{}) {
		token := client.Publish(topic, 1, false, payload)
		if token.Wait() && token.Error() != nil {
			log.Printf("[Publisher:%s] Publish to %s failed: %s", origin, topic, token.Error())
		}
	}

	// NOTE(review): both consumer loops below run forever; they are not
	// terminated when this function returns, so they keep draining their
	// channels (and publishing on the disconnected client) after a stop.
	// Ending them would make later senders block, so confirm with the
	// callers before changing that.
	go func() {
		log.Println("[Publisher:startPublisher] Payload Publisher started.")
		for {
			data := <-p.toPublish
			// Build payload and topic in a separate goroutine so that a slow
			// publish does not hold up the next response.
			go func(resp AgentResponse) {
				payload := SensorThingPayload{
					Result: string(resp.Payload),
					Time:   time.Now().UTC().Format(time.RFC3339),
				}
				payloadJSON, err := json.Marshal(payload)
				if err != nil {
					log.Printf("[Publisher:payloadloop] Error: %s", err)
					return
				}
				// NOTE(review): this assumes a thing is registered for
				// resp.AgentId and has at least one datastream; otherwise
				// the index expression panics. It also reads things from a
				// goroutine — confirm the manager guards concurrent access.
				topic := p.manager.mConfig.Prefix + "Datastreams(" + resp.AgentId + ")/" +
					p.manager.things[resp.AgentId].Datastreams[0].Sensor.Description
				publish("payloadloop", topic, payloadJSON)
			}(data)
		}
	}()

	go func() {
		log.Println("[Publisher:startPublisher] Status Publisher started.")
		for {
			status := <-p.status2Publish
			// Build payload and topic in a separate goroutine, as above.
			go func(st AgentStatus) {
				payload := "{ \"status\" : \"not available\" }"
				if st.status {
					payload = "{ \"status\" : \"running\" }"
				}
				topic := p.manager.mConfig.Prefix + "thing/" + st.agentName
				publish("statusloop", topic, payload)
			}(status)
		}
	}()

	// Block until stopPublisher signals; the deferred calls then unregister
	// the service (if registered) and disconnect the MQTT client.
	<-p.stop
	log.Println("[Publisher:startPublisher] Publisher stopped.")
}