Writing an MQTT client

Now that Mosquitto is running with authentication and TLS, it is time to write a small client that can send a payload to an MQTT topic. We will write the client in Go and keep it as simple as possible. If you have never worked with Go, go to https://golang.org/dl to download and install Go. The installation instructions are very easy to follow. You can use any supported operating system to follow along.

To connect to our MQTT server, we will use the Eclipse Paho MQTT Go client. To install it, issue the following command:

go get github.com/eclipse/paho.mqtt.golang

The client depends on two other packages. Install them with the following two commands:

go get golang.org/x/net/websocket
go get golang.org/x/net/proxy

To create the client, create a new folder under the GOPATH you configured during installation of Go and create a main.go file in it. Start with the following code:

package main

import (
    "crypto/tls"
    "fmt"
    "math/rand"
    "time"

    MQTT "github.com/eclipse/paho.mqtt.golang"
)

The import statement imports the packages used by the application. We will need the crypto/tls package to configure TLS options that we pass to Paho. More specifically, we will instruct Paho to skip verification steps for the certificate we installed in Mosquitto (see Installing Mosquitto). The fmt package is used to print some output to the sceen, math/rand to generate random floating point numbers and time to pause between sends. The most important package we use is the Paho MQTT package, aliased as MQTT above.

The main() function, the entry point for the application, is shown below. Replace <your-IP> and other such parameters with the text that is applicable to you.

func main() {
    server := "tcps://<your-IP>:8883"

    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker(server)
    connOpts.SetClientID("simulator")
    connOpts.SetCleanSession(true)
    connOpts.SetUsername("<your-admin>")
    connOpts.SetPassword("<your-password>")
    connOpts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fmt.Println("Connected to server ", server)

    for payload := range generate(time.Second, "simulator", "airq") {
        client.Publish("airq/antwerp/ahlers/asterix", byte(0), false, payload)
        fmt.Printf("Published message %s... Sleeping...\n", payload)
    }

}

We first specify connection options in connOpts. Note the SetTLSConfig line where we configure the MQTT client to not verify the certificates. That is OK in our scenario. With the connection options, we create a new client and then try to connect. When the connection fails, we panic and if not, we start publishing JSON data on a topic called airq/antwerp/ahlers/asterix.

Note the second parameter in the call to client.Publish(), which is the quality of service or QoS level. QoS 0 means the message will be delivered only once or not at all. It is the most unreliable transfer mode but it is ok to use if clients send data regularly and it is acceptable to miss a few data points. MQTT uses TCP as the transport protocol, so even with QoS set to 0, you still have TCP guarantees.

The payload comes from the following generator function:

func generate(sleep time.Duration, device string, devicetype string) <-chan string {
    c := make(chan string)
    go func() {
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
        // forever generate data
        for {
            temperature := 20 + r.Float64()*10
            humidity := 40 + r.Float64()*10
            c <- fmt.Sprintf(`{"device": "%s", "type": "%s","temperature": %.2f , "humidity": %.2f}`,
                device, devicetype, temperature, humidity)
            time.Sleep(sleep)
        }
    }()

    return c
}

The generate() function takes a time.Duration like time.Second, a device and a device type. The function returns a channel. In the call to payload := range generate() we iterate over that channel indefinitely until we exit the program with CTRL-C. The channel we return is created in the generate() function where an anonymous goroutine takes care of generating the JSON payload as a string and sending it through the channel. Instead of using a generator, we could have just used a for loop inside main() to generate the payloads. I opted to use the generator with a channel because it is more fun and you see this pattern often in Go programs.

In the connection options, clean session is set to true. Clean session means a non persistent connection is used. In that case, Mosquitto does not store subscription information or undelivered messages for clients. When a client only publishes messages, this mode is ideal and lowers server load.

Depending on your scenario, you will want higher QoS levels like 1 or 2, potentially in combination with the clean session option. If you use QoS level 1, there is a possibility that messages are delivered more than once. In that scenario, it is crucial that your backend can handle duplicates. If the next chapters, we will write backend components that cannot handle duplication which leads to the recommendation of using only QoS 0 or QoS 2.

When you run the application with go run main.go you should see the following output:

With MQTT.fx, you can subscribe to mytopic to see the following output:

In the next steps, we want to persist the data we send in a database. There are many databases to choose from, from the traditional relational databases such as SQL Server or PostgreSQL to NoSQL databases such as Cassandra. We will use InfluxDB, a time series databases written in Go. Head on to the next section to find out more!

results matching ""

    No results matching ""