FaaS to the Rescue

In the previous chapter we configured a Particle Electron to send temperature and humidity data to Particle's Device Cloud. Besides interfacing with the sensor, all it took was a simple call to Particle.publish:

Particle.publish("os-iot", output, 60, PRIVATE);

Since the system we developed uses the Mosquitto MQTT server for ingestion, we need to find a way to forward the data from Device Cloud to Mosquitto. In this chapter, we will configure a Particle integration that uses a webhook to do just that. Using a webhook requires a service we can HTTP POST to. Although we could write a simple HTTP service in Go or any other language, we will use a Function as a Service (FaaS) platform for Kubernetes to host such a function. The FaaS platform we will use is Kubeless, a Bitnami project. More information about Kubeless can be found at http://kubeless.io/.

We will first install Kubeless, then create the function, securely publish it and configure the Particle integration.

Kubeless Installation

Installing Kubeless is not complicated at all. Head over to http://kubeless.io/docs/quick-start and follow the instructions. I installed Kubeless on GKE and used the kubeless-rbac-$RELEASE.yaml with $RELEASE set to v0.4.0. Also install the kubeless CLI from https://github.com/kubeless/kubeless/releases.

With Kubeless and the CLI installed, issue the following command:

kubeless function ls

Kubeless uses your current kubectl context to reach out to your cluster and ask for a list of installed functions. I already installed the mqtt-pusher function and a test function so my response is:

NAME            NAMESPACE       HANDLER                 RUNTIME         TYPE    TOPIC   DEPENDENCIES    STATUS
get-python      default         test.foobar             python2.7       HTTP                            1/1 READY
mqtt-pusher     default         mqtt-pusher.mqtt        nodejs6         HTTP            mqtt: ^2.16.0   1/1 READY

In the next sections, we will add the mqtt-pusher function step-by-step. As you can see, we will use Node.js for the function. Kubeless supports other runtimes such as Python, Ruby and .NET Core 2.0. There's also a custom runtime which lets you use any container image with any language or binary.

Besides the kubeless CLI, there is also a Kubeless UI at https://github.com/kubeless/kubeless-ui. If you follow the instructions on GitHub and deploy the UI, you can port-forward to it as follows:

kubectl port-forward <kubeless-UI-pod> 3000 -n <namespace>

Note that it is a best practice to use namespaces for applications and supporting infrastructure. I installed kubeless and kubeless-ui in the kubeless namespace.

The kubeless-ui interface is pretty basic but does the job:

Similar to kubeless function ls, all functions are listed. In addition, the UI shows the source code of each function and allows you to test the function directly from the UI.

Creating the mqtt-pusher function

To deploy a function, you use the kubeless function deploy command. If you have the function code in a file called mqtt-pusher.js, you create the function with the following command:

kubeless function deploy mqtt-pusher --runtime nodejs6 --handler mqtt-pusher.mqtt \
   --from-file mqtt-pusher.js --trigger-http

The above command creates a function called mqtt-pusher. The function is written in Node.js so we set the runtime accordingly. The function is triggered by an HTTP GET or POST so we set the trigger to HTTP. The handler is a combination of the file name (mqtt-pusher from mqtt-pusher.js) and the name of the exported function in the source code. Indeed, inside the source code you will find the following exported function:

module.exports = {
  mqtt: (req, res) => {
    // your code...
  },
};

req and res refer to the HTTP request and response object. You will use those parameters to interact with the HTTP runtime. The full code of the function can be found below:

var mqtt = require('mqtt');

module.exports = {
  mqtt: (req, res) => {
    // log JSON body
    console.log(req.body);

    // mqtt setup
    var client = mqtt.connect('mqtts://' + process.env.MQTT_MOSQUITTO_SERVICE_HOST, {
      username: process.env.MQTTUSER,
      password: process.env.MQTTPASS,
      rejectUnauthorized: false
    });

    client.on('connect', function () {
      console.log("client connected");

      // parse the JSON string in req.body.data
      var data = JSON.parse(req.body.data);

      var topic = data.topic;

      var payload = {
        temperature: data.temperature,
        humidity: data.humidity,
        device: data.device,
        type: data.type
      };

      client.publish(topic, JSON.stringify(payload));
      res.end();
    });
  },
};

The code above uses the mqtt Node.js module. We will need to instruct Kubeless to grab that module with npm (Node Package Manager) before it runs our code. In addition, the code also uses environment variables, grabbed with process.env. The MQTT_MOSQUITTO_SERVICE_HOST environment variable is automatically set by Kubernetes. The MQTTUSER and MQTTPASS variables will need to be supplied to kubeless. The following command specifies dependencies and environment variables:

kubeless function deploy mqtt-pusher --runtime nodejs6 --handler mqtt-pusher.mqtt \
   --from-file mqtt-pusher.js --trigger-http --dependencies package.json \
   --env MQTTUSER=<your-user> --env MQTTPASS=<your-password>
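Because the function depends on three environment variables, it can help to check them once at startup instead of discovering a missing value on the first request. The following is a minimal sketch, not part of the original function: readBrokerSettings is a hypothetical helper, and the values passed to it in the example are made up.

```javascript
// Read the broker settings from an environment-like object and fail fast
// when a required variable is missing. The variable names match this chapter;
// readBrokerSettings itself is a hypothetical helper.
function readBrokerSettings(env) {
  const required = ['MQTT_MOSQUITTO_SERVICE_HOST', 'MQTTUSER', 'MQTTPASS'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error('missing environment variables: ' + missing.join(', '));
  }
  return {
    url: 'mqtts://' + env.MQTT_MOSQUITTO_SERVICE_HOST,
    username: env.MQTTUSER,
    password: env.MQTTPASS,
  };
}

// Example with made-up values; in the function pod you would pass process.env.
const settings = readBrokerSettings({
  MQTT_MOSQUITTO_SERVICE_HOST: '10.0.0.5',
  MQTTUSER: 'demo-user',
  MQTTPASS: 'demo-pass',
});
console.log(settings.url);
```

Passing the environment in as an argument keeps the helper easy to try on your local machine, where the Kubernetes-provided variable does not exist.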

Like regular Node.js applications, the dependencies are specified in package.json. Create that file in the same folder that contains mqtt-pusher.js. It should have the following content:

{
  "name": "mqtt-app",
  "version": "1.0.0",
  "description": "mqtt client app",
  "main": "index.js",
  "author": "author",
  "license": "ISC",
  "dependencies": {
    "mqtt": "^2.16.0"
  }
}

You can also run npm init and follow the prompts. After npm init creates package.json, use npm install mqtt --save to install the package on your local machine and save the dependency to package.json.

You now have all the files you need to create the mqtt-pusher function with kubeless function deploy. After you create the function, you will notice that an extra pod is running. It has labels like created-by=kubeless and function=mqtt-pusher. To list the pods that have a function label:

kubectl get pods -l function

NAME                           READY     STATUS    RESTARTS   AGE
get-python-749f7fb9cb-nscwt    1/1       Running   2          17d
mqtt-pusher-668478885f-s482c   1/1       Running   3          10d

The above output shows two function pods. When your function code has an unhandled exception, the pod will exit and restart. From the RESTARTS column in the above output, it is clear that mqtt-pusher experienced a few unhandled exceptions. Indeed, the mqtt-pusher code is extremely light on exception handling! Sorry about that!
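One way to reduce those restarts is to wrap the handler so that an exception results in an error response instead of a crashed process. The sketch below is an illustration under assumptions: withErrorHandling is a hypothetical helper, the response object is a local stub, and only synchronous exceptions are caught. Errors raised inside callbacks, such as the connect callback of the MQTT client, would still need their own handling.

```javascript
// Wrap a handler so that a synchronous exception produces an HTTP 500
// instead of crashing the process. withErrorHandling is a hypothetical helper.
function withErrorHandling(handler) {
  return (req, res) => {
    try {
      handler(req, res);
    } catch (err) {
      console.error('handler failed:', err.message);
      res.statusCode = 500;
      res.end('error');
    }
  };
}

// A handler that throws on malformed input, as JSON.parse does.
const fragile = (req, res) => {
  const data = JSON.parse(req.body.data);
  res.end(data.topic);
};

// Stub response object for a local run; it records the status and body.
const res = { statusCode: 200, end(chunk) { this.body = chunk; } };
withErrorHandling(fragile)({ body: { data: 'not json' } }, res);
console.log(res.statusCode, res.body);
```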

As always, you can obtain the logs from the pod with:

kubectl logs mqtt-pusher-668478885f-s482c -f

The flag -f is used to follow the log. Following the log means you will see new log entries as they appear. If you use kubeless-ui, use the console button to see the logs:

When the Particle Electron is active and sending data, it is visible in the console output:

client connected
{ event: 'os-iot',
  data: '{"humidity":21.00,"temperature":25.60,"device":"particle_electron","type":"airqmobile",
    "topic":"airq/hamme/home/desk"}',
  published_at: '2018-03-21T10:45:09.572Z',
  coreid: '53002e001451343334363036' }

mqtt-pusher in more detail

In the previous section, the mqtt-pusher code was provided without further explanation. In this section, we will discuss the code but also make some changes. The MQTT connection logic will be moved outside the exported function. There is no need to call mqtt.connect whenever the HTTP handler is invoked:

var mqtt = require('mqtt');

// mqtt setup
var client = mqtt.connect('mqtts://' + process.env.MQTT_MOSQUITTO_SERVICE_HOST, {
  username: process.env.MQTTUSER,
  password: process.env.MQTTPASS,
  rejectUnauthorized: false
});

// just log client connected
client.on('connect', function () {
  console.log("client connected");
});

module.exports = {
  mqtt: (req, res) => {
    // log JSON body
    console.log(req.body);

    // parse the JSON string in req.body.data
    var data = JSON.parse(req.body.data);
    var topic = data.topic;
    if (topic) {
      var payload = {
        temperature: data.temperature,
        humidity: data.humidity,
        device: data.device,
        type: data.type
      };
      client.publish(topic, JSON.stringify(payload));
    }
    res.end();
  }
};

mqtt.connect simply needs to connect to our Mosquitto server that is running in the cluster. In the connection string, we use mqtts:// to indicate the use of TLS over the standard port 8883. We configured MQTT with a username and password so we need to pass both from our environment variables. Notice the use of rejectUnauthorized. Setting that option to false tells the client not to verify the validity of the broker's TLS certificate, so the connection is encrypted but the server's identity is not checked.
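If you would rather keep certificate verification enabled, the options object can carry the CA certificate that signed the broker's certificate. The sketch below only builds the options; buildTlsOptions is a hypothetical helper and the PEM text is a placeholder. It assumes that the certificate also matches the host name or IP address used in the connection string, otherwise verification will still fail.

```javascript
// Build connection options for mqtt.connect. When a CA certificate (PEM text)
// is supplied, verification stays enabled; otherwise fall back to this
// chapter's behaviour of skipping verification. buildTlsOptions is hypothetical.
function buildTlsOptions(username, password, caPem) {
  const options = { username, password };
  if (caPem) {
    options.ca = [caPem];
    options.rejectUnauthorized = true;
  } else {
    options.rejectUnauthorized = false;
  }
  return options;
}

// Placeholder PEM text; in practice you would read the CA file from disk.
const verified = buildTlsOptions('demo-user', 'demo-pass', '-----BEGIN CERTIFICATE-----...');
const unverified = buildTlsOptions('demo-user', 'demo-pass');
console.log(verified.rejectUnauthorized, unverified.rejectUnauthorized);
```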

Inside the handler, we expect the request body to contain JSON. The JSON sent by our device will be a field in the JSON sent by the Particle integration. That field is called data, it holds a JSON string, and we parse it from req.body.data. The device sends the MQTT topic it wants to use as data.topic so we extract that value. If topic exists, we publish the expected JSON to the MQTT topic.
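The mapping from webhook body to MQTT message can also be written as a small pure function, which makes it easy to try without a broker. This is a sketch rather than the code deployed in this chapter: toMqttMessage is a hypothetical name, and the sample body is modelled on the console output shown earlier.

```javascript
// Turn the body posted by the Particle webhook into an MQTT topic and payload.
// Returns null when the data field is missing, malformed, or has no topic.
// toMqttMessage is a hypothetical helper name.
function toMqttMessage(body) {
  let data;
  try {
    data = JSON.parse(body.data);
  } catch (err) {
    return null;
  }
  if (!data || !data.topic) {
    return null;
  }
  return {
    topic: data.topic,
    payload: JSON.stringify({
      temperature: data.temperature,
      humidity: data.humidity,
      device: data.device,
      type: data.type,
    }),
  };
}

// Sample body modelled on the console output shown earlier in the chapter.
const message = toMqttMessage({
  event: 'os-iot',
  data: '{"humidity":21.00,"temperature":25.60,"device":"particle_electron","type":"airqmobile","topic":"airq/hamme/home/desk"}',
});
console.log(message.topic);
console.log(message.payload);
```

Inside the handler, a null result would simply mean that nothing is published before res.end() is called.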

You can edit the code directly in kubeless-ui. You can also modify mqtt-pusher.js, delete the mqtt-pusher function and recreate it with kubeless function deploy. If there are errors in your code, the pod will fail to deploy. Use kubectl logs or the log viewer in kubeless-ui to check for errors.
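One way to catch such errors before deploying is to load the file locally and call the handler with stub objects. The sketch below is a hypothetical local check, not a kubeless feature: loadHandler resolves a handler string of the form file.function, and the demo writes a tiny stand-in module (hello.js) to a temporary directory so the example does not depend on the mqtt package.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Resolve a Kubeless-style handler string ("<file>.<function>") against a
// directory and return the exported function. loadHandler is a hypothetical
// helper for local checks, not part of the kubeless CLI.
function loadHandler(handler, dir) {
  const dot = handler.lastIndexOf('.');
  const file = handler.slice(0, dot) + '.js';
  const name = handler.slice(dot + 1);
  // require() throws here if the file has a syntax error.
  const mod = require(path.join(dir, file));
  if (typeof mod[name] !== 'function') {
    throw new Error(file + ' does not export a function named ' + name);
  }
  return mod[name];
}

// Demo: write a tiny module to a temporary directory and call its handler
// with stub request and response objects.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'fn-'));
fs.writeFileSync(
  path.join(dir, 'hello.js'),
  'module.exports = { greet: (req, res) => res.end("hi " + req.body.name) };'
);
const res = { end(chunk) { this.body = chunk; } };
loadHandler('hello.greet', dir)({ body: { name: 'electron' } }, res);
console.log(res.body);
```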

Accessing the function

For the Particle integration to work, it must be able to call the Kubeless function securely from the Internet. In this section, we will see how to publish the function and provide a TLS certificate.

Kubeless can configure this for you with kubeless route create. It even has an option to enable automatic certificate generation with --enableTLSAcme. Instead of using this option, we will expose the Kubernetes service through our own Ingress Controller and request the certificate with our own cert-manager installation.

When you create the mqtt-pusher function, you also create an mqtt-pusher service on port 8080 by default:

kubectl get svc -l function

NAME          CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
get-python    10.43.254.11    <none>        8080/TCP   17d
mqtt-pusher   10.43.253.237   <none>        8080/TCP   1h

As discussed in Adding an Ingress Controller, we can create an Ingress resource to access this service via our nginx Ingress Controller. Create the following Ingress resource:

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: mqttpush-domain-com
  annotations:
    kubernetes.io/ingress.class: nginx
spec:
  rules:
  - host: mqttpush.domain.com
    http:
      paths:
      - backend:
          serviceName: mqtt-pusher
          servicePort: 8080
  tls:
  - hosts:
    - mqttpush.domain.com
    secretName: mqttpush-domain-com-tls

Note that I used domain.com above. You should replace domain.com with your own domain and set up a wildcard A record (*.domain.com) that points to the external IP of the Ingress Controller. The wildcard ensures that mqttpush.domain.com resolves to the Ingress Controller. In turn, the Ingress Controller forwards the traffic to the Kubernetes service (backend in the spec).

secretName refers to a secret that contains a .crt and .key file. The secret does not need to exist at the moment you create the Ingress resource. To actually create the secret and certificate, we will use cert-manager from the Adding an Ingress Controller chapter. Create the certificate with the YAML below:

apiVersion: certmanager.k8s.io/v1alpha1
kind: Certificate
metadata:
  name: mqttpush-domain-com
  namespace: default
spec:
  secretName: mqttpush-domain-com-tls
  issuerRef:
    name: letsencrypt-prod
  commonName: mqttpush.domain.com
  dnsNames:
  - mqttpush.domain.com
  acme:
    config:
    - http01:
        ingress: mqttpush-domain-com
      domains:
      - mqttpush.domain.com

When you submit the above YAML file to Kubernetes, cert-manager should request the certificate from the Let's Encrypt production issuer and store the .crt and .key files in the secret referenced by secretName. You should now be able to call https://mqttpush.domain.com securely. We will do that from the Particle integration.
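Before wiring up Particle, a quick manual POST can confirm that the Ingress, the certificate and the function work together. The sketch below uses only the Node.js https module. It assumes the function answers on the root path (/) and that you replace the placeholder host with your own; buildRequest, postEvent and the sample event are hypothetical names and values.

```javascript
const https = require('https');

// Build the request options and body for posting a test event to the
// function. The host is the placeholder used in this chapter.
function buildRequest(host, event) {
  const body = JSON.stringify(event);
  return {
    options: {
      hostname: host,
      port: 443,
      path: '/', // assumption: the function is served on the root path
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(body),
      },
    },
    body,
  };
}

// Send the event and resolve with the HTTP status code.
function postEvent(host, event) {
  const { options, body } = buildRequest(host, event);
  return new Promise((resolve, reject) => {
    const req = https.request(options, (res) => {
      res.resume(); // discard the response body
      res.on('end', () => resolve(res.statusCode));
    });
    req.on('error', reject);
    req.end(body);
  });
}

// Made-up event in the same shape as the one Particle posts.
const sampleEvent = {
  event: 'os-iot',
  data: JSON.stringify({ topic: 'test/topic', temperature: 20, humidity: 40 }),
};

// Usage (not executed here because it needs your real host name):
// postEvent('mqttpush.domain.com', sampleEvent).then(console.log);
console.log(buildRequest('mqttpush.domain.com', sampleEvent).options.method);
```

A certificate problem would surface as a rejected promise, which makes this a convenient check that the Let's Encrypt certificate was issued.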

Configuring the Particle integration

In the Particle Console, select Integrations on the left and click New Integration. Select the Webhook type. Fill in the fields as below:

In the Particle firmware, we publish the os-iot event so that needs to match the event name. The URL is the Kubeless function we just created and we will perform an HTTP POST with the JSON in the request body. Although it is possible to provide a device filter, we will accept events from any device.

After the integration is created, you can open its properties for more details.

When you click a log entry, you obtain even more details:

The source event that triggered the webhook is the JSON that ends up at our Kubeless function. As you can see, the JSON contains a data field which contains the JSON we send from our device. Particle performs an HTTP request using the POST method. The response sent by our Kubeless function actually comes from our Ingress Controller (Server: nginx) but the function itself is served by the Node.js Express framework (X-Powered-By: Express) used under the hood by the Kubeless Node.js runtime. We do not send an integration response (undefined) but if we did, we could catch it on the device by setting up a handler:

void setup() {
  // Subscribe to the integration response event
  Particle.subscribe("hook-response/os-iot", myHandler, MY_DEVICES);
}

void myHandler(const char *event, const char *data) {
  // Handle the integration response
}

Metrics

If you take a look at your function logs, you will notice log entries about connections to /healthz and /metrics. /healthz is used by Kubernetes for monitoring the liveness and readiness of your pod and /metrics is used by Prometheus:

::ffff:10.40.10.1 - - [03/Mar/2018:22:40:24 +0000] "GET /healthz HTTP/1.1" 200 2 "-" "kube-probe/1.8+"
::ffff:10.40.3.11 - - [03/Mar/2018:22:40:30 +0000] "GET /metrics HTTP/1.1" 200 2385 "-" "Prometheus/2.1.0"
::ffff:10.40.10.1 - - [03/Mar/2018:22:40:54 +0000] "GET /healthz HTTP/1.1" 200 2 "-" "kube-probe/1.8+"
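Since probe and scrape requests dominate the log, it can be handy to tally the entries per path when you read the output of kubectl logs. The following is a small sketch for that purpose: countRequests is a hypothetical helper and it assumes the access-log layout shown above.

```javascript
// Count requests per "METHOD path" in access-log lines like the ones above.
// countRequests is a hypothetical helper.
function countRequests(lines) {
  const counts = {};
  for (const line of lines) {
    // The request line is the first double-quoted field: "GET /healthz HTTP/1.1"
    const match = line.match(/"([A-Z]+) (\S+) HTTP\/[\d.]+"/);
    if (!match) continue;
    const key = match[1] + ' ' + match[2];
    counts[key] = (counts[key] || 0) + 1;
  }
  return counts;
}

// The three log lines shown above.
const counts = countRequests([
  '::ffff:10.40.10.1 - - [03/Mar/2018:22:40:24 +0000] "GET /healthz HTTP/1.1" 200 2 "-" "kube-probe/1.8+"',
  '::ffff:10.40.3.11 - - [03/Mar/2018:22:40:30 +0000] "GET /metrics HTTP/1.1" 200 2385 "-" "Prometheus/2.1.0"',
  '::ffff:10.40.10.1 - - [03/Mar/2018:22:40:54 +0000] "GET /healthz HTTP/1.1" 200 2 "-" "kube-probe/1.8+"',
]);
console.log(counts);
```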

In the Prometheus UI, you can query the number of function calls, for instance:

This concludes our brief foray into Kubeless and FaaS territory. You have seen how to write a simple Kubeless function that acts as a bridge between the Particle Device Cloud and Mosquitto. If you are often in need of smaller functions to run reliably in a Kubernetes environment, think of a FaaS platform as a potential solution. In this chapter, we used Kubeless but there are many others like OpenFaaS and Fission.
