MQTT Lua Agent

mqttagent
Login

Casual Maintenance Intended

This is a daemon MQTT client which runs Lua callbacks on received messages.

It started with a backup graph that did not match the connectivity graph, so some data had to be pushed while other had to be pulled. So I needed machine-to-machine signalling to pull data from a source after it has been pushed completely.

I thought of MQTT for the signalling, and a kind of cron-but-for-MQTT-messages-instead-of-time to run actions. But then I imagined writing a parser for a crontab-like configuration file, and then writing such a configuration file, so I reconsidered my life choices.

It turns out that a Lua script is much easier for development (thanks to existing interpreters), for setup (thanks to a friendlier language), and for maintenance (thanks to basic logic not being scattered across a lot of small shell scripts).

Manual

Commands

The provided commands provide varying levels of extra primitives bound to the Lua environment.

For now, the generated commands run mqttagent.lua in the current directory and stay in the foreground.

Once the script is initially run, the program runs until all connections and all timers are finished (or the script calls os.exit).

MQTT client creation

client = mqttclient.new(config)

It creates a new MQTT client, using a configuration table which is deserialized into a go-mqtt/mqtt.Config structure, with the extra field connection used to make the Dialer.

MQTT reception callbacks

The client has a table-like API to set or reset callbacks:

-- Set a callback
client[topic_filer_string] = callback_function
-- Call or query an existing callback
client[topic_filter_string](self, message, topic, t)
local f = client[topic_filter_string]
-- Reset a callback
client[topic_filter_string] = nil

The callbacks are called with four parameters:

Setting a new callback automatically subscribes to the filter, and resetting the callback automaticall unsubscribes.

Note that when subscribing to overlapping-but-different filters, messages may be duplicated (i.e. when the overlap makes the broker send the message twice, each callback will be called twice). Since QoS is not supported yet, callbacks should always be ready to handle multiple and missing messages anyway.

MQTT message sending

The client has a function-like API to send messages:

-- Send a ping to the server
client()
-- Send a message to the given topic
client(message, topic)

Timers

Timers are created using timer.new with a time and a callback:

timer_obj = timer.new(t, callback)

Timers are NOT automatically repeated, the callback is called only once, after t, and then the timer is destroyed unless explicitly rescheduled:

timer_obj:schedule(next_t)

To make repeating timers, explicitly call self:schedule within the callback with the next time.

When the time given in timer.new or :schedule is in the past, the callback is called as soon as possible.

The :cancel method is also available to de-schedule a timer without destroying it.

Examples

Simplest client prints one message and leaves

-- Create an anonymous client
local c = mqttclient.new({ connection = "127.0.0.1:1883" })

-- Subscribe to all topics under `test`
c["test/#"] = function(self, message, topic)
	-- Print the received message
	print(message)
	-- Unsubscribe
	c["test/#"] = nil
end

Client with LWT, keepalive and timer

-- Keep alive in seconds
local keepalive = 60
-- Create the client
local c = mqttclient.new{
	connection = "127.0.0.1:1883",
	user_name = "mqttagent",
	password = "1234",
	will = {
		topic = "test/status/mqttagent",
		message = "Offline",
	},
	keep_alive = keepalive,
}

-- Setup a ping timer for the keepalive
if keepalive > 0 then
	-- Create a new timer, dropping the variable (`self` is enough)
	timer.new(os.time() + keepalive, function(self, t)
		-- Send a ping
		c()
		-- Schedule next keepalive
		self:schedule(t + keepalive)
	end)
end

-- Print the next 10 messages send to `test` topic
local count = 10
c["test"] = function(self, message, topic)
	print(message)
	count = count - 1
	if count <= 0 then
		os.exit(0)
	end
end

-- Announce start
c("Online", "test/status/mqttagent")

One-way MQTT Bridge

-- Create the source client
local source = mqttclient.new{
	connection = "mqtt.example.com:1883",
	user_name = "mqttagent",
	password = "1234",
}
-- Create the destination client
local dest = mqttclient.new{ "127.0.0.1:1883" }
-- Make the one-way bridge
source["#"] = function(self, message, topic)
	dest(message, topic)
end

RRD Update Using JSON Payload Form Tasmota Energy Sensor

local json = require("json")
local c = mqttclient.new{ connection = "127.0.0.1:1883" }

function millis(n)
	if n then
		return math.floor(n * 1000 + 0.5)
	else
		return "U"
	end
end

c["tele/+/SENSOR"] = function(self, message, topic)
	-- Deduce file name from middle part of the topic
	local name = string.sub(topic, 6, -8)
	-- Sanity check
	if string.find(name, "[^_%w%-]") then return end
	-- Decode JSON payload
	local obj, err = json.decode(message)
	if err then
		print(err)
		return
	end
	if not obj.ENERGY then return end

	os.execute("rrdtool update \"" .. dbdir .. "/energy-sensor-" ..
	            name .. ".rrd\" N:" ..
	            millis(obj.ENERGY.Total) ..
	            ":" .. (obj.ENERGY.Period or "U") ..
	            ":" .. (obj.ENERGY.Power or "U") ..
	            ":" .. millis(obj.ENERGY.Factor) ..
	            ":" .. (obj.ENERGY.Voltage or "U") ..
	            ":" .. millis(obj.ENERGY.Current))
end

Backup Monitoring

local c = mqttclient.new{ connection = "127.0.0.1:1883" }
-- List of topic suffix to check
local backups = {
	["dest1/src1"] = 0,
	["dest1/src2"] = 0,
	["dest2/src2"] = 0,
	["dest3/src1"] = 0,
}

-- Check every day at 5:02:42 UTC that all backups have been done
function next_check()
	local tbl = os.date("!*t")
	tbl.hour = 5
	tbl.min = 2
	tbl.sec = 42
	return os.time(tbl) + 86400
end

timer.new(next_check(), function(self, t)
	local msg = ""
	-- Accumulate missing backup names in msg and reset the table
	for k, v in pairs(backups) do
		if v == 0 then
			msg = msg .. (#msg > 0 and ", " or "") .. k
		end
		backups[k] = 0
	end
	-- Send an alert when a backup is missing
	if #msg > 0 then
		c("Missing: " .. msg, "alert/backup")
	end
	-- Check again tomorrow
	self:schedule(t + 86400)
end)

-- Mark backups as seen when notified
c["backup/#"] = function(self, _, topic, t)
	local name = string.sub(topic, 8)
	if backups[name] then
		backups[name] = t
	end
end