REST API to InfluxDB
In this chapter, we will create a REST API that you can query to retrieve a batch of recent data points. You will be able to ask for the last x data points like so:
curl http://<your-IP>/measurements/last/2
[{"device":"simulator","humidity":48.42,"room":"asterix","temperature":31.15,"time":"2018-02-04T21:58:16Z"},
{"device":"simulator","humidity":42.44,"room":"asterix","temperature":31.78,"time":"2018-02-04T21:58:15Z"}]
Instead of curl, you can also use your browser. Alternatively, you can query data points created since the last x seconds:
curl http://<your-IP>/measurements/since/2
[{"device":"simulator","humidity":45.56,"room":"asterix","temperature":26.83,"time":"2018-02-04T21:59:45Z"},
{"device":"simulator","humidity":43.07,"room":"asterix","temperature":25.42,"time":"2018-02-04T21:59:46Z"}]
If you have Excel, you can use Data - From Web to import the data. You will have to apply some transforms to get the data to look like below:
In a later chapter, we will also add a mobile application with Ionic:
Note that initially, we will not add authentication to the API which means you can connect without specifying a user name and password. We will also not add TLS yet and keep that for another chapter where you will see how to use an Ingress Controller with certificates.
Creating a REST API in Go
There are several paths you can take to create a REST API, ranging from the lower level built-in net/http package, to all kinds of frameworks that take a higher level approach. One such framework is goa, which takes a design-first approach. A design-first approach means you design your API using a DSL (domain-specific language) which is then fed to a code generator. You then add your custom business logic to the generated code.
Unless your API is extremely simple, using the net/http package only is not very productive. In most cases, a web toolkit is added such as Gorilla. There are others as well.
In this book, we will use goa to create our REST API. Let's get started!
Designing the API
First install goa and goagen with:
go get -u github.com/goadesign/goa/...
To create the API, first create a new folder under your Go src folder and create a folder called design in it. For instance:
src
|
--- influx-rest
|
--- design
In the design folder, you create regular .go files that will use goa's DSL to design the API. Let's start with the first part, api.go:
package design
import (
. "github.com/goadesign/goa/design"
. "github.com/goadesign/goa/design/apidsl"
)
var _ = API("Influx Rest API", func() {
Title("The Influx Rest API")
Description("An API which retrieves data from InfluxDB")
Host("localhost:80")
Scheme("http")
BasePath("/")
Origin("*", func() {
Headers("Content-Type")
Methods("GET")
})
Consumes("application/json")
Produces("application/json")
})
By convention, the package is called design and you need to import the two packages with dot sourcing. You should not use dot sourcing in your applications but for these design files, it is ok. The code probably looks a bit weird with the nested anonymous functions but that is just how the DSL was designed.
It is clear what sort of API we are creating here. It should listen on port 80, use http and the basepath is at the root. This means that our calls will start from / as in http://host/measurements/last/10. Using Origin, we configure the API to get calls from anywhere, but only for GET requests.
This is clearly not enough because we have not defined our resources and our responses. Although you can add these to api.go, I like to seperate them in their own files. Add two more files, resources.go and responses.go. In resources.go, add the following:
package design
import (
. "github.com/goadesign/goa/design"
. "github.com/goadesign/goa/design/apidsl"
)
var _ = Resource("measurement", func() {
BasePath("/measurements")
Action("last", func() {
Description("Get last measurements")
Routing(GET("/last/:number"))
Params(func() {
Param("number", Integer, "Last number of measurements", func() {
Minimum(1)
})
})
Response(OK, ArrayOf(MeasurementMedia))
Response(NotFound)
})
Action("since", func() {
Description("Get measurements since current time minus a number of seconds")
Routing(GET("/since/:seconds"))
Params(func() {
Param("seconds", Integer, "Number of seconds to subtract for current time", func() {
Minimum(1)
})
})
Response(OK, ArrayOf(MeasurementMedia))
Response(NotFound)
})
})
It is important that resources.go uses the same package and imports. We define only one resource, a measurement, that we can retrieve via /measurements. We can perform two actions on the resource, last and since. If you look at the definition of an action, it should be clear from the DSL how it works. To perform these actions, we use the HTTP GET method with a parameter of :number or :seconds depending on the action. Because it does not make sense to ask for the 0th data point, we define a minimum value.
Note that goa will enforce these settings when you call the API and that is the nice thing about it. There is no need to write code to validate the parameter because it is done automatically.
Above, we define two possible responses per action, an OK response and a NotFound response. When we respond with OK (http status code 200), we also return an array of MeasurementMedia which is defined in responses.go:
package design
import (
. "github.com/goadesign/goa/design"
. "github.com/goadesign/goa/design/apidsl"
)
var MeasurementMedia = MediaType("application/vnd.measurement+json", func() {
Description("A measurement")
Attributes(func() {
Attribute("time", DateTime, "Measurement time")
Attribute("device", String, "Device name")
Attribute("room", String, "Room")
Attribute("temperature", Number, "Temperature")
Attribute("humidity", Number, "Humidity")
Required("time", "device", "room", "temperature", "humidity")
})
View("default", func() {
Attribute("time")
Attribute("device")
Attribute("room")
Attribute("temperature")
Attribute("humidity")
})
})
The responses.go file defines a MediaType for a measurement. The attributes of the type are neatly defined with a name, a type and a description. A MediaType requires at least one view, which we define as well.
That is all you have to do to design the API. The next step is to generate code based on the design and to add your custom business logic.
Generating code
Generating code is done with the goagen application. Just run the command as shown below, specifying the path to the design folder, excluding the src folder. For example:
goagen bootstrap -d github.com/gbaeke/influx-rest/design
Several folders will be added, as shown in the screenshot below:
The app folder contains the generated code for the API as a package called app. The main application uses two files, main.go and measurement.go and they import this app package, in addition to a few other packages:
import (
"github.com/gbaeke/influx-rest/app"
"github.com/goadesign/goa"
"github.com/goadesign/goa/middleware"
)
The code in the app folder should not be modified. To make changes to your API, change the design package and then run goagen again. Alternatively, you can also run go generate from the folder that contains main.go (note the comment in main.go that makes this happen).
Implementing your business logic
To actually connect to InfluxDB and retrieve the requested values, we add code to measurement.go. Let's first take a look at main.go first:
//go:generate goagen bootstrap -d github.com/gbaeke/influx-rest/design
package main
import (
"github.com/gbaeke/influx-rest/app"
"github.com/goadesign/goa"
"github.com/goadesign/goa/middleware"
)
func main() {
// Create service
service := goa.New("Influx Rest API")
// Mount middleware
service.Use(middleware.RequestID())
service.Use(middleware.LogRequest(true))
service.Use(middleware.ErrorHandler(service, true))
service.Use(middleware.Recover())
// Mount "measurement" controller
c := NewMeasurementController(service)
app.MountMeasurementController(service, c)
// Start service
if err := service.ListenAndServe(":80"); err != nil {
service.LogError("startup", "err", err)
}
}
main.go was generated by goagen and we do not need to make changes here. Note that the web server is using port 80 and that goa has mounted a controller. It's the controller for the measurement resource, measurement.go, where we implement our business logic:
package main
import (
"fmt"
"github.com/gbaeke/influx-rest/app"
"github.com/goadesign/goa"
)
// MeasurementController implements the measurement resource.
type MeasurementController struct {
*goa.Controller
}
// NewMeasurementController creates a measurement controller.
func NewMeasurementController(service *goa.Service) *MeasurementController {
return &MeasurementController{Controller: service.NewController("MeasurementController")}
}
// Last runs the last action.
func (c *MeasurementController) Last(ctx *app.LastMeasurementContext) error {
// MeasurementController_Last: start_implement
query := fmt.Sprintf("select time, device, room, humidity, temperature from iotdata order by desc limit %d", ctx.Number)
return ctx.OK(readData(query))
// MeasurementController_Last: end_implement
}
// Since runs the since action.
func (c *MeasurementController) Since(ctx *app.SinceMeasurementContext) error {
// MeasurementController_Since: start_implement
query := fmt.Sprintf("select time, device, room, humidity, temperature from iotdata where time > now() - %ds", ctx.Seconds)
return ctx.OK(readData(query))
// MeasurementController_Since: end_implement
}
The only code we added is in func Last and Since. We simply format a query string applicable to the scenario and use the readData() function to return an array of measurements using ctx.OK. Note that the parameter, Number or Seconds, can be retrieved via the ctx variable.
readData() is implemented in a separate file belonging to the main package, called db.go. The full source code is listed below.
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/gbaeke/influx-rest/app"
"github.com/influxdata/influxdb/client/v2"
"github.com/kelseyhightower/envconfig"
)
// Specification for environment variables
type Specification struct {
InfluxHost string `envconfig:"DB_INFLUXDB_SERVICE_HOST" required:"true" default:"localhost"` // matches Kubernetes environment variable for service influx-influxdb
InfluxPort string `envconfig:"DB_INFLUXDB_SERVICE_PORT" required:"true" default:"8086"`
InfluxDatabase string `envconfig:"INFLUX_DATABASE" default:"telemetry"` // database will be created by this service if it does not exist
InfluxMeasurement string `envconfig:"INFLUX_MEASUREMENT" default:"iotdata"`
InfluxUser string `envconfig:"INFLUX_USER" default:""`
InfluxPassword string `envconfig:"INFLUX_PASSWORD" default:""`
}
// defined at package level; initialised in init(); used in db.go only
var s Specification
var c client.Client
func init() {
// get environment variables via Specification
err := envconfig.Process("", &s)
if err != nil {
log.Fatal(err.Error())
}
// Create a new HTTPClient
c, err = client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", s.InfluxHost, s.InfluxPort),
Username: s.InfluxUser,
Password: s.InfluxPassword,
})
if err != nil {
log.Fatal(err)
}
}
// readyData retrieves points from measurement iotdata
func readData(query string) []*app.Measurement {
measurements := []*app.Measurement{}
result, err := queryDB(c, query)
if err != nil {
log.Print(err)
return measurements // return empty array on error
}
// return empty array if query results in no data
if len(result[0].Series) == 0 {
return measurements
}
// we expect select time, device, room, humidity, temperature in the query
for i, row := range result[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
device := row[1].(string)
room := row[2].(string)
humidity, err := row[3].(json.Number).Float64()
temperature, err := row[4].(json.Number).Float64()
log.Printf("[%2d] %s: %s %s %f %f\n", i, t.Format(time.Stamp), device, room, temperature, humidity)
// &app.Measurement{"device", humidity, temperature, time}
measurements = append(measurements, &app.Measurement{Device: device, Room: room,
Humidity: humidity, Temperature: temperature, Time: t})
}
return measurements
}
// queryDB convenience function to query the database
func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
q := client.Query{
Command: cmd,
Database: s.InfluxDatabase,
}
if response, err := clnt.Query(q); err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
} else {
return res, err
}
return res, nil
}
A lot of the code is similar to the code we used in the MQTT-to-InfluxDB forwarder. The readData() function is new and returns an array of pointers to app.Measurement. app.Measurement is a struct created by goagen in the app/media_types.go file, based on the design:
type Measurement struct {
// Device name
Device string `form:"device" json:"device" xml:"device"`
// Humidity
Humidity float64 `form:"humidity" json:"humidity" xml:"humidity"`
// Room
Room string `form:"room" json:"room" xml:"room"`
// Temperature
Temperature float64 `form:"temperature" json:"temperature" xml:"temperature"`
// Measurement time
Time time.Time `form:"time" json:"time" xml:"time"`
}
In readData() we create a new local array of pointers to app.Measurement. We return this empty array when we cannot connect to InfluxDB or the query returns no data.
The InfluxDB query returns JSON that is formatted as follows:
{
"results": [
{
"series": [
{
"name": "iotdata",
"columns": [
"time",
"device",
"room",
"humidity",
"temperature"
],
"values": [
[
"2015-01-29T21:55:43.702900257Z",
other fields...
],
[
"2015-01-29T21:55:43.702900257Z",
other fields...
],
[
"2015-06-11T20:46:02Z",
other fields...
]
]
}
]
}
]
}
With the statement for i, row := range result[0].Series[0].Values we can iterate over the values and pick out each value using its index. The index depends on the order in the SELECT query. Type assertion is used to convert the value to a string or a json.Number. Since our struct values for humidity and temperature use Float64 types, we also need to convert json.Number to float64 using the Float64() method. With the append function, we add the retrieved values to the slice of pointers like so:
measurements = append(measurements, &app.Measurement{Device: device, Room: room,
Humidity: humidity, Temperature: temperature, Time: t})
Building the API
The API is built using GOOS=linux go build from the folder that contains main.go. This results in a Linux executable called influx-rest if that is the name of the parent folder. Use the -o flag with go build to specify another name if you wish.
Testing the API
To test the API on your local machine, use port forwarding to access InfluxDB in your Kubernetes cluster:
kubectl port-forward <your-pod-name> 8086:8086
Next, run your API with ./influx-rest. The result is:
2018/02/05 15:25:56 [INFO] mount ctrl=Measurement action=Last route=GET /measurements/last/:number
2018/02/05 15:25:56 [INFO] mount ctrl=Measurement action=Since route=GET /measurements/since/:seconds
2018/02/05 15:25:56 [INFO] listen transport=http addr=:80
Goa includes a tool to test the API although you can simply use curl as well. The tool can be found in tool/name-of-your-API-cli but you have to build it first. In that folder, type go build -o cli (or in Windows, cli.exe) and then run the tool to see the instructions:
CLI client for the Influx Rest API service
Usage:
Influx [command]
Available Commands:
help Help about any command
last Get last measurements
since Get measurements since current time minus a number of seconds
Flags:
--dump Dump HTTP request and response.
-h, --help help for Influx
-H, --host string API hostname (default "localhost:80")
-s, --scheme string Set the requests scheme
-t, --timeout duration Set the request timeout (default 20s)
Use "Influx [command] --help" for more information about a command.
To try it, type the following:
cli last measurement
You will get a response that you cannot call the API with a parameter of 0 because we set a minimum value of 1. To use another value, use the following:
cli last measurement meausurements/last/1
You should see the last measurement you sent to InfluxDB. Note that the tool defaults to localhost:8086. If you want to use another host use the -H parameter followed by the hostname/IP and port (e.g. 10.10.2.3:8086).
The cli uses the client package, from the client folder. The client package can be used in your own applications that need to consume the REST API.
Deploying the API
With the API fully tested, we can deploy the API to Kubernetes. But first, we need to build a Docker image and store it in a repository:
docker build -t gbaeke/influx-rest .
docker push gbaeke/influx-rest
To deploy the container to Kubernetes, we create a deployment and a service in one YAML file:
apiVersion: v1
kind: Service
metadata:
name: influx-rest
labels:
app: influx-rest
spec:
type: LoadBalancer
ports:
- port: 80
protocol: TCP
selector:
app: influx-rest
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: influx-rest
spec:
replicas: 2
template:
metadata:
labels:
app: influx-rest
spec:
containers:
- name: influx-rest
image: gbaeke/influx-rest
env:
- name: INFLUX_USER
value: "admin"
- name: INFLUX_PASSWORD
value: "test"
- name: INFLUX_DATABASE
value: "telemetry"
- name: INFLUX_MEASUREMENT
value: "iotdata"
We first create a service called influx-rest. It will listen on port 80 and forward traffic to containers with a label of app: influx-rest (via the selector). We use the LoadBalancer type which results in a cloud load balancer if you are running Kubernetes in a cloud environment such as Azure or Google Cloud.
Next, we create a deployment. The service and deployment is separated with three dashes (---). You have already seen a deployment in From MQTT to InfluxDB, so it is very similar. In this case, we have specified two replicas in order to load balance requests between the two pods. The ReplicaSet will make sure there are two replicas running at all times.
With the above file, deploy to Kubernetes with:
kubectl apply -f <your-YAML-file>
When you run kubectl get pods you should see a result like below:
NAME READY STATUS RESTARTS AGE
db-influxdb-154626948-85qzk 1/1 Running 1 1d
influx-rest-3950614142-0kvr5 1/1 Running 0 16h
influx-rest-3950614142-c8hdv 1/1 Running 0 16h
mqtt-client-2383379092-tjc38 1/1 Running 0 19h
mqtt-mosquitto-298995702-h14k8 1/1 Running 0 1d
Test the complete deployment by running the MQTT simulator created in Writing an MQTT client. Next, obtain the influx-rest external IP with kubectl get svc influx-rest and use curl <your-IP>/measurements/last/10 to see if you can get the last 10 measurements. The result should be something like:
[{"device":"simulator","humidity":40.33,"room":"asterix","temperature":37.36,"time":"2018-02-05T14:50:19Z"},
{"device":"simulator","humidity":46.01,"room":"asterix","temperature":20.72,"time":"2018-02-05T14:50:18Z"},
{"device":"simulator","humidity":46.22,"room":"asterix","temperature":38.77,"time":"2018-02-05T14:50:17Z"},
{"device":"simulator","humidity":43.25,"room":"asterix","temperature":37.68,"time":"2018-02-05T14:50:16Z"},
...