diff --git a/container/application/ble_data.go b/container/application/ble_data.go deleted file mode 100644 index 9f1c486..0000000 --- a/container/application/ble_data.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2021 Hewlett Packard Enterprise (HPE) -// -// Licensed under the MIT License; -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.github.com/aruba-iotops-example-ble/LICENSE -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -type BleFrameType int32 - -type BleData struct { - // ble data - Data []byte - FrameType *BleFrameType - // device mac address - Mac string - // Received Signal Strength Indication - Rssi int32 - // AP mac address - ApMac string -} - -type IBeaconData struct { - DeviceClass string - UUID string - Major string - Minor string - Power string -} diff --git a/container/application/http_client.go b/container/application/http_client.go index 2eaacf8..5491520 100644 --- a/container/application/http_client.go +++ b/container/application/http_client.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.github.com/aruba-iotops-example-ble/LICENSE +// http://www.github.com/aruba-iotops-example-ble/LICENSE // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,6 +16,7 @@ package main import ( "bufio" "context" + "encoding/json" "log" "net/http" "time" @@ -30,7 +31,7 @@ func NewHTTPClient(url, apiKey, method string) *HTTPClient { URL: url, APIKey: apiKey, Method: method, - dataCh: make(chan []byte, 1), + dataCh: make(chan *BleData, 1), } } @@ -38,13 +39,13 @@ type HTTPClient struct { URL string APIKey string Method string // HTTP Method: GET/POST/HEAD/OPTIONS/PUT/PATCH/DELETE/TRACE/CONNECT - dataCh chan []byte + dataCh chan *BleData } // Connect establish an HTTP connection. // response data will be put into filed "dataCh". func (c *HTTPClient) Connect(ctx context.Context) { - log.Default().Println("Http request, url: " + c.URL + " ; apiKey: " + c.APIKey) + log.Println("Http request, url: " + c.URL + " ; apiKey: " + c.APIKey) req, _ := http.NewRequestWithContext(ctx, c.Method, c.URL, nil) req.Header.Set("apikey", c.APIKey) @@ -59,7 +60,7 @@ func (c *HTTPClient) Connect(ctx context.Context) { }() if err != nil { - log.Default().Println("HTTP request error!") + log.Println("HTTP request error!") // If connect failed, will retry after 1 second <-time.After(1 * time.Second) @@ -68,7 +69,7 @@ func (c *HTTPClient) Connect(ctx context.Context) { return } - reader := bufio.NewReader(resp.Body) + scanner := bufio.NewScanner(resp.Body) go func() { defer func() { @@ -77,24 +78,41 @@ func (c *HTTPClient) Connect(ctx context.Context) { } }() - for { - line, err := reader.ReadBytes('\n') - if err != nil { - log.Default().Println(err.Error()) - - return - } - - if len(line) > 0 { - c.dataCh <- line + for scanner.Scan() { + //log.Println(scanner.Text()) + bleData := bleDataFromResult(scanner.Bytes()) + if bleData != nil { + c.dataCh <- bleData } } + if err := scanner.Err(); err != nil { + log.Println(err.Error()) + } }() } -// GetDataCh return HTTP client filed "dataCh". -// HTTP response data will be put into this field "dataCh". -// method in "process_ble_data.go" file will consume this data from data channel. -func (c *HTTPClient) GetDataCh() <-chan []byte { +func bleDataFromResult(aResult []byte) *BleData { + bleData := new(BleData) + err := json.Unmarshal(aResult, bleData) + if err != nil { + return nil + } + return bleData +} + +// GetDataCh returns the streaming ble data channel +func (c *HTTPClient) GetDataCh() <-chan *BleData { return c.dataCh } + +type BleData struct { + Result struct { + Mac string `json:"mac"` + ApMac string `json:"apMac"` + Payload []byte `json:"payload"` + Rssi int `json:"rssi"` + FrameType string `json:"frameType"` + RadioMac string `json:"radioMac"` + MacAddressType string `json:"macAddressType"` + } `json:"result"` +} diff --git a/container/application/main.go b/container/application/main.go index 3deaaa6..cda484c 100644 --- a/container/application/main.go +++ b/container/application/main.go @@ -15,12 +15,11 @@ package main import ( "context" + "encoding/hex" + "encoding/json" "log" "net/http" "os" - "strings" - - "github.com/google/uuid" ) // IoT Operations example ble app is a demonstration of how to interact with HPE IoT Operations infrastructure service. @@ -41,48 +40,53 @@ func main() { // Example: http://apiGwUrl/$(api-method) apiGwURL := os.Getenv("APIGW_URL") - // clientID is used to identify your MQTT connections. - // The value should not be the same as the value in the MQTT web page - // (MQTT web page : http://www.hivemq.com/demos/websocket-client/). - clientID := strings.ReplaceAll(uuid.New().String(), "-", "") + // mqtt client + mqttClient := NewMqttClient() + mqttClient.Connect() - // MQTT url - serverURL := "wss://test.mosquitto.org:8091/mqtt" + // bleAPIURL: example app will get data from HPE IoT Operations infrastructure services through this API url + bleAPIURL := "http://" + apiGwURL + "/api/v3/ble/stream/packets" + httpClient := NewHTTPClient(bleAPIURL, apiKey, http.MethodGet) + httpClient.Connect(context.Background()) - // MQTT username/password - userName := "rw" - password := "readwrite" + ProcessBleData(httpClient.GetDataCh(), mqttClient.GetPubDataCh()) +} - // MQTT publish data topic. - // default topic name is "app2broker_topic". - // IoT Operations data will be sent into this topic, - // you can subscribe this topic in the MQTT web page. - pubTopic := os.Getenv("APP_TO_BROKER_TOPIC") - if pubTopic == "" { - pubTopic = "app2broker_topic" - } +const minBleDataLen = 30 - // MQTT subscribe data topic. - // default topic name is "broker2app_topic" - // you can send data into this topic through MQTT web page, - // Example app will accept data from that topic. - subTopic := os.Getenv("BROKER_TO_APP_TOPIC") - if subTopic == "" { - subTopic = "broker2app_topic" - } +type IBeaconData struct { + DeviceClass string + UUID string + Major string + Minor string + Power string +} - log.Println("Example app start") +// ProcessBleData get data from HPE IoT Operations infrastructure service. +// then decode and decorate and put data into data channel, +// data channel will be consumed by MQTT client. +func ProcessBleData(httpDataCh <-chan *BleData, mqttDataCh chan<- string) { + for bleData := range httpDataCh { + payload := bleData.Result.Payload + if len(payload) < minBleDataLen { + continue + } - // mqtt client - mqttClient := NewMqttClient(serverURL, userName, password, clientID, pubTopic, subTopic) - mqttClient.Connect() + // below is to convert iBeacon byte data to iBeacon string data. + // you need to overwrite this code when you decode your device data. + // note: field "data" is hexadecimal byte array. + // If you want to get string data. please process it with method hex.EncodeToString([]byte) + iBeaconData := &IBeaconData{ + DeviceClass: "iBeacon", + UUID: hex.EncodeToString(payload[9:25]), + Major: hex.EncodeToString(payload[25:27]), + Minor: hex.EncodeToString(payload[27:29]), + Power: hex.EncodeToString(payload[29:30]), + } + iBeacon, _ := json.Marshal(iBeaconData) - // bleAPIURL: example app will get data from HPE IoT Operations infrastructure services through this API url - bleAPIURL := "http://" + apiGwURL + "/api/v2/ble/stream/packets" - httpClient := NewHTTPClient(bleAPIURL, apiKey, http.MethodGet) - httpClient.Connect(context.Background()) + log.Println("iBeacon uuid: " + iBeaconData.UUID) - // bleClient: process ble data - bleClient := NewBleClient() - bleClient.ProcessBleData(httpClient.GetDataCh(), mqttClient.GetPubDataCh()) + mqttDataCh <- string(iBeacon) + } } diff --git a/container/application/main_test.go b/container/application/main_test.go new file mode 100644 index 0000000..05cef14 --- /dev/null +++ b/container/application/main_test.go @@ -0,0 +1,87 @@ +// Copyright 2021 Hewlett Packard Enterprise (HPE) +// +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.github.com/aruba-iotops-example-ble/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "context" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +const iBeaconRawHex = "0201041AFF4C000215F7826DA64FA24E988024BC5B71E0893E00000000C5" + +func TestExampleApp(t *testing.T) { + server := SEServerMock() + + // http client + log.Println("Request Ble data and transfer it to a third party server") + + BleRequestURL := server.URL + "/api/v3/ble/stream/packets" + httpClient := NewHTTPClient(BleRequestURL, "", http.MethodGet) + httpClient.Connect(context.Background()) + + mqttDataCh := make(chan string, 1) + + // bleClient process ble data + go ProcessBleData(httpClient.GetDataCh(), mqttDataCh) + + iBeaconData := &IBeaconData{} + + go func() { + result := <-mqttDataCh + _ = json.Unmarshal([]byte(result), iBeaconData) + }() + + <-time.After(20 * time.Millisecond) + + if strings.ToUpper(iBeaconData.UUID) != iBeaconRawHex[18:50] { + t.Error("Get iBeacon data failed") + } +} + +func SEServerMock() *httptest.Server { + log.Println("start HTTP server. send Ble data to client.") + // mock data + bleDataMock, _ := hex.DecodeString(iBeaconRawHex) + payload := base64.StdEncoding.EncodeToString(bleDataMock) + + testData := fmt.Sprintf(`{"result":{"mac":"dc:a6:32:3f:1f:33","apMac":"ff:ff:2c:5d:94:9f","payload":"%s","rssi":-46,"frameType":"BLE_FRAME_TYPE_ADV_IND","radioMac":"ff:11:df:f5:ba:b1","macAddressType":"BLE_MAC_ADDRESS_TYPE_PUBLIC"}}`, payload) + + // HTTP server + server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + if request.URL.EscapedPath() != "/api/v3/ble/stream/packets" { + _, _ = fmt.Fprintf(writer, "Reqeust path error") + } + if request.Method != http.MethodGet { + _, _ = fmt.Fprintf(writer, "Request method error") + } + + flusher, _ := writer.(http.Flusher) + + writer.Write(append([]byte(testData), []byte("\n")...)) + flusher.Flush() + + time.Sleep(1 * time.Second) + })) + + return server +} diff --git a/container/application/mqtt_client.go b/container/application/mqtt_client.go index 10ba36c..4289d5d 100644 --- a/container/application/mqtt_client.go +++ b/container/application/mqtt_client.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.github.com/aruba-iotops-example-ble/LICENSE +// http://www.github.com/aruba-iotops-example-ble/LICENSE // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,9 +16,12 @@ package main import ( "crypto/tls" "log" + "os" + "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" ) const mqttVersion = 4 @@ -28,8 +31,37 @@ const mqttVersion = 4 // It has two fields: PubDataCh、SubDataCh. // PubDataCh: put data into this field, then data will be sent to MQTT broker. // SubDataCh: get data from MQTT broker, then put data into this field. -func NewMqttClient(url string, userName string, password string, - clientID string, pubTopic string, subTopic string) *MqttClient { +func NewMqttClient() *MqttClient { + // clientID is used to identify your MQTT connections. + // The value should not be the same as the value in the MQTT web page + // (MQTT web page : http://www.hivemq.com/demos/websocket-client/). + clientID := strings.ReplaceAll(uuid.New().String(), "-", "") + + // MQTT url + url := "wss://test.mosquitto.org:8091/mqtt" + + // MQTT username/password + userName := "rw" + password := "readwrite" + + // MQTT publish data topic. + // default topic name is "app2broker_topic". + // IoT Operations data will be sent into this topic, + // you can subscribe this topic in the MQTT web page. + pubTopic := os.Getenv("APP_TO_BROKER_TOPIC") + if pubTopic == "" { + pubTopic = "app2broker_topic" + } + + // MQTT subscribe data topic. + // default topic name is "broker2app_topic" + // you can send data into this topic through MQTT web page, + // Example app will accept data from that topic. + subTopic := os.Getenv("BROKER_TO_APP_TOPIC") + if subTopic == "" { + subTopic = "broker2app_topic" + } + return &MqttClient{ URL: url, // MQTT broker url userName: userName, @@ -55,7 +87,7 @@ type MqttClient struct { // Connect establish a MQTT connection to publish and subscribe to MQTT broker. func (c *MqttClient) Connect() { - log.Default().Println("mqtt url : " + c.URL + " ; clientId :" + + log.Println("mqtt url : " + c.URL + " ; clientId :" + c.ClientID + " ; pubTopic: " + c.PubTopic + " ; subtopic: " + c.SubTopic) opts := mqtt.NewClientOptions(). @@ -69,20 +101,20 @@ func (c *MqttClient) Connect() { c.SubDataCh <- message.Payload() }) opts.OnConnect = func(client mqtt.Client) { - log.Default().Println("Mqtt Connected") + log.Println("Mqtt Connected") } opts.OnConnectionLost = func(client mqtt.Client, err error) { - log.Default().Printf("Connect lost: %v", err) + log.Printf("Connect lost: %v", err) } client := mqtt.NewClient(opts) token := client.Connect() if token.Wait() && token.Error() != nil { - log.Default().Println("mqtt connected fail! ") + log.Println("mqtt connected fail! ") if token.Error() != nil { - log.Default().Println(token.Error().Error()) + log.Println(token.Error().Error()) } // If connect failed, will retry after 1 second diff --git a/container/application/mqtt_client_test.go b/container/application/mqtt_client_test.go index f7a5263..5611e5f 100644 --- a/container/application/mqtt_client_test.go +++ b/container/application/mqtt_client_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.github.com/aruba-iotops-example-ble/LICENSE +// http://www.github.com/aruba-iotops-example-ble/LICENSE // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -20,42 +20,25 @@ import ( ) func TestMqttClient(t *testing.T) { - t.Parallel() - - mqttClient := NewMqttClient("wss://test.mosquitto.org:8091/mqtt", "rw", "readwrite", - "random_client", "iotops_topic", "iotops_topic") - - tests := []struct { - name string - mqttClient *MqttClient - }{ - {name: "pub and sub", mqttClient: mqttClient}, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - mqttClient.Connect() - - // pub - text := "iotops pub message : " + time.Now().String() - mqttClient.GetPubDataCh() <- text - - // sub - var data []byte - - go func() { - data = <-mqttClient.GetSubDataCh() - }() - - // verify - <-time.After(500 * time.Millisecond) - - if !strings.Contains(string(data), "iotops pub message") { - t.Error("mqtt publish data failed") - } - }) + t.Setenv("APP_TO_BROKER_TOPIC", "iotops_topic") + t.Setenv("BROKER_TO_APP_TOPIC", "iotops_topic") + mqttClient := NewMqttClient() + mqttClient.Connect() + + // pub + text := "iotops pub message : " + time.Now().String() + mqttClient.GetPubDataCh() <- text + + // sub + var data []byte + go func() { + data = <-mqttClient.GetSubDataCh() + }() + + // verify + <-time.After(500 * time.Millisecond) + + if !strings.Contains(string(data), "iotops pub message") { + t.Error("mqtt publish data failed") } } diff --git a/container/application/process_ble_data.go b/container/application/process_ble_data.go deleted file mode 100644 index 6a9f83e..0000000 --- a/container/application/process_ble_data.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2021 Hewlett Packard Enterprise (HPE) -// -// Licensed under the MIT License; -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.github.com/aruba-iotops-example-ble/LICENSE -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package main - -import ( - "encoding/hex" - "encoding/json" - "log" - "strings" -) - -const minBleDataLen = 30 - -// NewBleClient is to get data from HPE IoT Operations infrastructure service and decode data. -// It will send HTTP request to get data. -// After get response data, it will decode data and decorate data, then put data into data channel. -func NewBleClient() *BleClient { - return &BleClient{} -} - -type BleClient struct{} - -// ProcessBleData get data from HPE IoT Operations infrastructure service. -// then decode and decorate and put data into data channel, -// data channel will be consumed by MQTT client. -func (c *BleClient) ProcessBleData(httpDataCh <-chan []byte, mqttDataCh chan<- string) { - // the structure of data is : `data:{"key":"value"}` - // below code will replace `data:` with "", leaving only json structured data. - for data := range httpDataCh { - if strings.Contains(string(data), "data:") { - str := strings.ReplaceAll(string(data), "data:", "") - str = strings.ReplaceAll(str, "\n", "") - - // below is to convert iBeacon byte data to iBeacon string data. - // you need to overwrite this code when you decode your device data. - // note: field "data" is hexadecimal byte array. - // If you want to get string data. please process it with method hex.EncodeToString([]byte) - bleData := &BleData{} - _ = json.Unmarshal([]byte(str), bleData) - - if len(bleData.Data) < minBleDataLen { - continue - } - - iBeaconData := &IBeaconData{ - DeviceClass: "iBeacon", - UUID: hex.EncodeToString(bleData.Data[9:25]), - Major: hex.EncodeToString(bleData.Data[25:27]), - Minor: hex.EncodeToString(bleData.Data[27:29]), - Power: hex.EncodeToString(bleData.Data[29:30]), - } - iBeacon, _ := json.Marshal(iBeaconData) - - log.Default().Println("iBeacon uuid: " + iBeaconData.UUID) - - mqttDataCh <- string(iBeacon) - } - } -} diff --git a/container/application/process_ble_data_test.go b/container/application/process_ble_data_test.go deleted file mode 100644 index 7b6fd5b..0000000 --- a/container/application/process_ble_data_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2021 Hewlett Packard Enterprise (HPE) -// -// Licensed under the MIT License; -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.github.com/aruba-iotops-example-ble/LICENSE -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package main - -import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "log" - "net/http" - "net/http/httptest" - "strings" - "testing" - "time" -) - -func TestExampleApp(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - }{ - {name: "SE request"}, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - server := SEServerMock() - - // http client - log.Default().Println("Request Ble data and transfer it to a third party server") - - BleRequestURL := server.URL + "/api/v2/ble/stream/packets" - httpClient := NewHTTPClient(BleRequestURL, "", http.MethodGet) - httpClient.Connect(context.Background()) - - mqttDataCh := make(chan string, 1) - - // bleClient process ble data - go NewBleClient().ProcessBleData(httpClient.GetDataCh(), mqttDataCh) - - iBeaconData := &IBeaconData{} - - go func() { - result := <-mqttDataCh - _ = json.Unmarshal([]byte(result), iBeaconData) - }() - - <-time.After(20 * time.Millisecond) - - if strings.ToUpper(iBeaconData.UUID) != "F7826DA64FA24E988024BC5B71E0893E" { - t.Error("Get iBeacon data failed") - } - }) - } -} - -func SEServerMock() *httptest.Server { - log.Default().Println("start HTTP server. send Ble data to client.") - // mock data - var frameType BleFrameType = 3 - - bleDataMock, _ := hex.DecodeString("0201041AFF4C000215F7826DA64FA24E988024BC5B71E0893E00000000C5") - - // ble structure: 0201041AFF 4C00 02 15 4152554EF94A3B869470706978210A00(UUID) 0000(Major) 0000(Minor) C8(power) - bleData := &BleData{ - Mac: "50:31:ad:02:5c:93", - Data: bleDataMock, - Rssi: -57, - FrameType: &frameType, - ApMac: "11:22:33:44:55:66", - } - bleDataJSON, _ := json.Marshal(bleData) - testData := "data:" + string(bleDataJSON) + "\n" - - // HTTP server - server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if request.URL.EscapedPath() != "/api/v2/ble/stream/packets" { - _, _ = fmt.Fprintf(writer, "Reqeust path error") - } - if request.Method != http.MethodGet { - _, _ = fmt.Fprintf(writer, "Request method error") - } - - flusher, _ := writer.(http.Flusher) - - _, _ = fmt.Fprint(writer, testData) - flusher.Flush() - - time.Sleep(1 * time.Second) - })) - - return server -}