MQTTブローカー(mosquitto)とInfluxDBを連携させる

はやし
2025-03-19
2025-03-19

 

はじめに

オープンソースのMQTTブローカーであるMosquiitoと、MQTTのデータを格納するためのDBとしてInfluxDB1をAlmaLinux9上で簡易動作検証してみたので備忘録として残しておきます。

MQTT(Message Queuing Telemetry Transport : エムキューティーティー)は、軽量なメッセージングプロトコルで、IoTデバイス(例えば冷蔵庫や洗濯機といった家電)などのデータを収集したりするのによく利用されるようです。

MQTTではあまり耳慣れない用語が使われます。

1.ブローカー

MQTTクライアントからデータを仲介するためのサーバ。

2.パブリッシュ/パブリッシャー

MQTTブローカーにデータを送ること。送信者(パブリッシャー)は特定のトピックに対してメッセージをブローカーに送信する。

3.サブスクライブ/サブスクライバー

MQTTブローカーからデータを受け取ること。受信者(サブスクライバー)は、特定のトピックを購読(サブスクライブ)しておくと、ブローカーからそのトピックのメッセージが届く。

これだけ読んでも意味がよく分からないので、実際に試してみます。

MQTTブローカー(サーバ)には、オープンソースのmosquiitoを利用します。

また、Mosquitto(ブローカー)にデータを送信(パブリッシュ)するためと、ブローカーが受信したデータを受信(サブスクライブ)する検証を行うために、ブローカーと同じサーバ上に、PythonでMQTTクライアントを実装してみます。

さらに、受信したデータをMQTTのデータを相性が良いInfluxDBに格納してみます。

環境は以下の通りです。

  • OS:AlmaLinux9
  • MQTTブローカー:mosquitto2.0.18
  • DB:InfluxDB1.11.8

Mosquittoのインストール

AlmaLinux9でMosquiitoを簡単に利用するには、EPELのレポジトリを利用します。

# dnf install epel-release
# dnf install mosquitto mosquitto-devel

インストールできたら、Mosquittoを起動します。

# systemctl start mosquitto
# systemctl enable mosquitto

ついでに、後にPythonでMQTTクライアントを実装するので、必要そうなライブラリをインストールしておきます。

# dnf install python3-pip
# pip install paho-mqtt influxdb

Mosquittoの動作確認

起動したMosquittoが動作しているかを簡単に確認してみます。

確認には二つのターミナル端末(TeraTermなど)を用意します。

一つ目のターミナルでは、サブスクライバーとしてコマンドを実行します。

# mosquitto_sub -h localhost -t test ←待ち受け画面になる
※MQTTクライアントのサブスクライバーとして、localhostのブローカーに接続してtestという名前のトピックを購読(サブスクライブ)するという意味

もう一つのターミナルでは、パブリッシャーとしてコマンドを実行します。

# mosquitto_pub -h localhost -t test -m "test message"
※MTQQクライアントのパブリッシャーとして、localhostのブローカーのtestというトピックへ「test message」というメッセージを送信(パブリッシュ)するという意味

メッセージをパブリッシュすると、一つ目の待ち受け画面になっているターミナルに、「test message」と表示されれば、正常に動作していることが分かります。

InfluxDB1のインストール

次に、サブスクライブしたデータを格納するためのDBとしてInfluxDB1をインストールします。

現在InfluxDBにはバージョン1と2がありますが、今回は1を利用します。(バージョン2はGUIが実装されていたり、DBの仕様自体も大きく変わります。)

RedHat向けのレポジトリが用意されているので利用します。

# vi /etc/yum.repos.d/influxdb.repo
----------------------------------------------------------------------
[influxdb]
name = InfluxData Repository - Stable
baseurl = https://repos.influxdata.com/rhel/9/$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive_compat.key
----------------------------------------------------------------------
# dnf install influxdb

InfluxDBを起動します。

# systemctl start influxdb

InfluxDBにデータベースを作成する

InfluxDBが起動したら、MQTTデータを格納するためのデータベースを作成しておきます。

InfluxDB1はMySQL等のRDBMSと似たようなSQLが利用できます。

# influx ←SQL入力モードになる
Connected to http://localhost:8086 version v1.11.8
InfluxDB shell version: v1.11.8

> create database test; ←testという名前のデータベースを作成
> show databases; ←確認
name: databases
name
----
_internal
test

> create user admin with password 'ADMIN_PASS' with all privileges; ←admminという名前のユーザを作成

admin : ユーザ名
ADMIN_PASS:任意のパスワード

> show users; ←作成したユーザを確認
user  admin
----  -----
admin true

> quit ←SQLモードを抜ける

これでMQTTデータを格納するためのデータベースの準備ができました。

PythonでMQTTクライアントを作る

ここまでで、MQTTブローカー(Mosquitto)と、データを格納するためのデータベース(InfluxDB)の準備が出来たので、MQTTクライアント(パブリッシュ&サブスクライブをするためのもの)を作成してみます。

言語は何でも良いですが、今回はPythonで作ってみます。

mqtt_influx.pyという名前で、ブローカーと同じサーバの任意の場所に保存します。

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import time
import random

# InfluxDB の設定
INFLUXDB_HOST = "localhost"
INFLUXDB_PORT = 8086
INFLUXDB_DATABASE = "test"
INFLUXDB_USER = "admin"
INFLUXDB_PASSWORD = "msfhips2"

# MQTT の設定
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensor/data"

# InfluxDB クライアントを作成
influx_client = InfluxDBClient(
    host=INFLUXDB_HOST,
    port=INFLUXDB_PORT,
    username=INFLUXDB_USER,
    password=INFLUXDB_PASSWORD,
    database=INFLUXDB_DATABASE
)

# 初回のみ、データベースを作成
influx_client.create_database(INFLUXDB_DATABASE)

# MQTT 受信時のコールバック関数
def on_message(client, userdata, msg):
    payload = msg.payload.decode("utf-8")
    timestamp = int(time.time() * 1000000000)  # ナノ秒精度のタイムスタンプ
    print(f"Received message: {payload}")

    # InfluxDB に保存するデータ形式
    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "device": "sensor1"
            },
            "fields": {
                "value": float(payload)
            },
            "time": timestamp
        }
    ]

    # InfluxDB にデータを書き込む
    influx_client.write_points(json_body)

# MQTT クライアントを作成
mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)

# サブスクライブ開始
mqtt_client.subscribe(MQTT_TOPIC)
print(f"Subscribed to MQTT topic: {MQTT_TOPIC}")

# 別スレッドでループ開始
mqtt_client.loop_start()

# 送信側(パブリッシャー)
while True:
    value = round(random.uniform(10, 30), 2)  # 10~30のランダム値
    mqtt_client.publish(MQTT_TOPIC, str(value))
    print(f"Published: {value}")
    time.sleep(5)

このスクリプトは、MQTTクライアントとしてパブリッシャー(送信者)とサブスクライバー(受信者)の機能と、受信したデータをInfluxDBに保存するようなっています。

ざっくりした流れは以下の通り。

1. [パブリッシャー] → MQTTブローカーの「sensor/data」トピックにランダム生成した数値データを送信
2. [ブローカー] → データを中継
3. [サブスクライバー] → ブローカーからデータを受信
4. [InfluxDB] → 受信データを保存
5. [次のデータ] → 5秒ごとに送信を繰り返す

MQTTクライアントを実行する

作成したMQTTクライアント(Pythonスクリプト)を実行してみます。

# python3 mqtt_influx.py
Subscribed to MQTT topic: sensor/data
Published: 22.72
Received message: 22.72
Published: 23.6
Received message: 23.6
Published: 27.69
Received message: 27.69
Published: 29.41
Received message: 29.41
Published: 17.08
Received message: 17.08
Published: 23.78
Received message: 23.78
Published: 29.4
Received message: 29.4

上記のように5秒おきにランダム数値がパブリッシュ(Published)、サブスクライブ(Received)されていればOKです。

受信したデータがInfluxDBに保存されているかも確認します。

# influx
> use test ←データベース名testを指定
> SELECT * FROM sensor_data; ←テーブル名sedsour_dataをselect
name: sensor_data
time                device  value
----                ------  -----
1742175950679701504 sensor1 22.72
1742175955684963072 sensor1 23.6
1742175960690174976 sensor1 27.69
1742175965695668992 sensor1 29.41
1742175970702315520 sensor1 17.08
1742175975707922688 sensor1 23.78
1742175980709915136 sensor1 29.4

データベース名「test」の「sensor_data」テーブルにデータが保存されていればOKです。

まとめ

ここまでで、MTQQブローカー(Mosquitto)と、データを保存するためのInfluxDB1の動作検証が完了です。

IoT関連のデータ収集の実装としてMQTTはよく利用されるため、良い勉強になりました。

なお、InfluxDBに保存したデータをGrafana等の可視化ツールに連携することもできるので、興味がある方は試してみてください。