MQTT für Elixier | Komentor

Kürzlich habe ich ein ausgezeichnetes Buch gelesen Baue eine Wetterstation mit Elixir und Nerves. Es stellt Elixir als Werkzeug zum Erstellen eingebetteter Anwendungen vor.

Mit Nervenkönnen wir Elixir-Code auf netzwerkfähigen Geräten ausführen, die mit einer Steuersoftware und untereinander interagieren.

Das oben erwähnte Buch konzentriert sich hauptsächlich auf den Nerventeil und verwendet das HTTP-Protokoll für Netzwerkinteraktionen. Obwohl dies in vielen Situationen eine vernünftige Wahl ist, möchte ich eine weitere weit verbreitete Option für Produktions-IoT-Setups vorstellen: MQTT.

MQTT-Protokoll

MQTT ist ein Messaging-Protokoll, das speziell für die (IoT)-Gerätekommunikation entwickelt wurde. Es wird in vielen Bereichen eingesetzt, z. B. im Bankwesen, in der Öl- und Gasindustrie, in der Fertigung usw.

Das MQTT-Protokoll hat viele Vorteile, aber hier möchte ich einige davon erwähnen:

  • Es ist ein leichtgewichtiges Binärprotokoll, das im Allgemeinen über TCP/IP läuft.
  • Es wurde für unzuverlässige Netzwerke entwickelt und ist daher eine gute Wahl für Installationen im Freien.
  • Es folgt Kneipe/Sub Modell zur Vereinfachung der Client-Logik.

Wir werden einige der Vorteile von MQTT in unserem Setup demonstrieren.

MQTT-Broker

Eine wesentliche Sache bei MQTT ist, dass es die Logik des Clients vereinfacht, was für eingebettete Geräte entscheidend ist. Dies wird mit dem Pubsub-Modell erreicht: In MQTT gibt es kein Konzept eines „Servers“. Stattdessen sind alle teilnehmenden Einheiten Clients, die sich mit einem sogenannten verbinden Makler. Kunden Abonnieren zu Themen, veröffentlichen Nachrichten an sie, und der Broker erledigt das Routing (und viele andere Dinge).

Ein guter produktionsbereiter Broker, wie z EMQX bietet im Allgemeinen nicht nur MQTT-Routing-Funktionen, sondern viele andere interessante Funktionen, wie z

  • andere Arten von Verbindungsmethoden, wie WebSockets;
  • verschiedene Authentifizierungs- und Autorisierungsmodelle;
  • Streaming von Daten zu Datenbanken;
  • benutzerdefinierte Routing-Regeln basierend auf Nachrichten-Introspektion;
  • usw.

Sensor-Setup

Der Einfachheit halber wird unser Gerät durch eine gewöhnliche Mix-Anwendung dargestellt: Es kann leicht in eine Nerves-Anwendung umgewandelt werden.

Zuerst erstellen wir ein Mix-Projekt:

mix new --sup weather_sensor
cd weather_sensor

Um mit einem MQTT-Broker zu interagieren, benötigen wir eine MQTT-Client. Wir nehmen empf. Fügen Sie es als Abhängigkeit zu mix.exs hinzu:

defp deps do
  [
    {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
  ]
end

Wir werden unseren gesamten „Sensor“-Code in das Hauptmodul WeatherSensor einfügen, also müssen wir ihn dem Application Supervisor lib/weather_sensor/application.ex hinzufügen:

defmodule WeatherSensor.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      WeatherSensor
    ]

    opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Lassen Sie uns nun das Hauptmodul in lib/weather_sensor.ex implementieren:

defmodule WeatherSensor do
  @moduledoc false

  use GenServer

  def start_link([]) do
    GenServer.start_link( __MODULE__ , [])
  end

  def init([]) do
    interval = Application.get_env(:weather_sensor, :interval)
    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    st = %{
      interval: interval,
      timer: nil,
      report_topic: report_topic,
      pid: pid
    }

    {:ok, set_timer(st), {:continue, :start_emqtt}}
  end

  def handle_continue(:start_emqtt, %{pid: pid} = st) do
    {:ok, _} = :emqtt.connect(pid)

    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    clientid = emqtt_opts[:clientid]
    {:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1})
    {:noreply, st}
  end

  def handle_info(:tick, %{report_topic: topic, pid: pid} = st) do
    report_temperature(pid, topic)
    {:noreply, set_timer(st)}
  end

  def handle_info({:publish, publish}, st) do
    handle_publish(parse_topic(publish), publish, st)
  end

  defp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) do
    new_st = %{st | interval: String.to_integer(payload)}
    {:noreply, set_timer(new_st)}
  end

  defp handle_publish(_, _, st) do
    {:noreply, st}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp set_timer(st) do
    if st.timer do
      Process.cancel_timer(st.timer)
    end
    timer = Process.send_after(self(), :tick, st.interval)
    %{st | timer: timer}
  end

  defp report_temperature(pid, topic) do
    temperature = 10.0 + 2.0 * :rand.normal()
    message = {System.system_time(:millisecond), temperature}
    payload = :erlang.term_to_binary(message)
    :emqtt.publish(pid, topic, payload)
  end
end

Und fügen Sie einige Optionen in config/config.exs ein:

import Config

config :weather_sensor, :emqtt,
  host: '127.0.0.1',
  port: 1883,
  clientid: "weather_sensor",
  clean_start: false,
  name: :emqtt

config :weather_sensor, :interval, 1000

Lassen Sie uns kurz zusammenfassen, was in WeatherSensor passiert:

  • Es implementiert GenServer-Verhalten.
  • Beim Starten, es
    • öffnet eine MQTT-Verbindung;
    • abonniert das Thema commands/weather_sensor/set_interval zum Empfangen von Befehlen, empfangene Daten werden von :emqtt als {:publish, publish}-Nachrichten an den Prozess gesendet.
    • plant Timer mit einem vordefinierten Intervall.
  • Bei Zeitüberschreitung des Zeitgebers veröffentlicht es das {Timestamp, Temperature}-Tupel in “reports/weather_sensor/temperaturetopic”.
  • Beim Empfang einer Nachricht vom Thema commands/weather_sensor/set_interval wird das Timer-Intervall aktualisiert.

Da unsere Anwendung keine echte Nerves-Anwendung mit einem angeschlossenen Sensor wie dem BMP280 ist, generieren wir Temperaturdaten.

Hier sehen wir bereits einen Vorteil gegenüber der HTTP-Interaktion: Wir können nicht nur Daten senden, sondern auch einige Befehle in Echtzeit empfangen.

Wir brauchen einen funktionierenden Broker, um den Knoten erfolgreich auszuführen; Wir werden später damit beginnen.

Dashboard-Setup

Da es in MQTT keine „Server“ gibt, wird unser steuerndes Dashboard auch ein MQTT-Client sein. Aber es wird Abonnieren zum Thema Berichte/Wettersensor/Temperatur und veröffentlichen Kommandos ancommands/weather_sensor/set_interval.

Für ein Dashboard richten wir eine Phoenix LiveView-Anwendung ein.

Lassen Sie es uns erstellen:

mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard

Abhängigkeiten zu mix.exs hinzufügen

defp deps do
    [
      ...
      {:jason, "~> 1.2"},
      {:plug_cowboy, "~> 2.5"},

      {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},
      {:contex, github: "mindok/contex"} # We will need this for SVG charts
    ]
  end

Fügen Sie einige Einstellungen zu config/dev.exs hinzu:

config :weather_dashboard, :emqtt,
  host: '127.0.0.1',
  port: 1883

config :weather_dashboard, :sensor_id, "weather_sensor"

# Period for chart
config :weather_dashboard, :timespan, 60

Jetzt generieren wir einen LiveView-Controller:

mix phx.gen.live Measurements Temperature temperatures --no-schema --no-context

Dadurch werden viele Dateien generiert, aber wir benötigen nur einige davon, da wir eine Single-Page-Anwendung mit einem Diagramm benötigen.

rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex

Entfernen Sie auch import WeatherDashboardWeb.LiveHelpers aus lib/weather_dashboard_web.ex.

Update-Template für unsere Seite (lib/weather_dashboard_web/live/temperature_live/index.html.heex):

<div>
  <%= if @plot do %>
    <%= @plot %>
  <% end %>
</div>

<div>
  <form phx-submit="set-interval">
    <label for="interval">Interval</label>
    <input type="text" name="interval" value={@interval}/>
    <input type="submit" value="Set interval"/>
  </form>
</div>

Auf dieser Seite haben wir ein Diagramm und eine Eingabesteuerung, um Befehle an unser “Gerät” zu senden.

Aktualisieren Sie nun den Hauptteil, den LiveView-Controller (lib/weather_dashboard_web/live/temperature_live/index.ex):

defmodule WeatherDashboardWeb.TemperatureLive.Index do
  use WeatherDashboardWeb, :live_view

  require Logger

  @impl true
  def mount(_params, _session, socket) do
    reports = []
    emqtt_opts = Application.get_env(:weather_dashboard, :emqtt)
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    {:ok, _} = :emqtt.connect(pid)
    # Listen reports
    {:ok, _, _} = :emqtt.subscribe(pid, "reports/#")
    {:ok, assign(socket,
      reports: reports,
      pid: pid,
      plot: nil,
      interval: nil
    )}
  end

  @impl true
  def handle_params(_params, _url, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_event("set-interval", %{"interval" => interval_s}, socket) do
    case Integer.parse(interval_s) do
      {interval, ""} ->
        id = Application.get_env(:weather_dashboard, :sensor_id)
        # Send command to device
        topic = "commands/#{id}/set_interval"
        :ok = :emqtt.publish(
          socket.assigns[:pid],
          topic,
          interval_s,
          retain: true
        )
        {:noreply, assign(socket, interval: interval)}
      _ ->
        {:noreply, socket}
    end
  end

  def handle_event(name, data, socket) do
    Logger.info("handle_event: #{inspect([name, data])}")
    {:noreply, socket}
  end

  @impl true
  def handle_info({:publish, packet}, socket) do
    handle_publish(parse_topic(packet), packet, socket)
  end

  defp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) do
    if id == Application.get_env(:weather_dashboard, :sensor_id) do
      report = :erlang.binary_to_term(payload)
      {reports, plot} = update_reports(report, socket)
      {:noreply, assign(socket, reports: reports, plot: plot)}
    else
      {:noreply, socket}
    end
  end

  defp update_reports({ts, val}, socket) do
    new_report = {DateTime.from_unix!(ts, :millisecond), val}
    now = DateTime.utc_now()
    deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)
    reports =
      [new_report | socket.assigns[:reports]]
      |> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)
      |> Enum.sort()

    {reports, plot(reports, deadline, now)}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp plot(reports, deadline, now) do
    x_scale =
      Contex.TimeScale.new()
      |> Contex.TimeScale.domain(deadline, now)
      |> Contex.TimeScale.interval_count(10)

    y_scale =
      Contex.ContinuousLinearScale.new()
      |> Contex.ContinuousLinearScale.domain(0, 30)

    options = [
      smoothed: false,
      custom_x_scale: x_scale,
      custom_y_scale: y_scale,
      custom_x_formatter: &x_formatter/1,
      axis_label_rotation: 45
    ]

    reports
    |> Enum.map(fn {dt, val} -> [dt, val] end)
    |> Contex.Dataset.new()
    |> Contex.Plot.new(Contex.LinePlot, 600, 250, options)
    |> Contex.Plot.to_svg()
  end

  defp x_formatter(datetime) do
    datetime
    |> Calendar.strftime("%H:%M:%S")
  end

end

Es gibt einige Dinge zu beachten.

  • Wir haben einen LiveView-Handler erstellt, um die Hauptseite unserer App bereitzustellen.
  • Normalerweise wird Phoenix.PubSub verwendet, um einen LiveView-Prozessstatus zu aktualisieren. Aber stattdessen machen wir einen Trick: seit an MQTT-Broker bereits ein Pubsub-Modell bereitstellt, stellen wir von unserem LiveView-Prozess aus direkt eine Verbindung dazu her.
  • Beim Empfang neuer Temperaturdaten aktualisiert der Server das Temperaturdiagramm.
  • Beim Erhalt einer Formularaktualisierung von einem Benutzer senden wir ein aktualisiertes Intervall an das Befehlsthema.

Richten Sie schließlich das Routing in lib/weather_dashboard_web/router.ex ein, damit unser Controller die Stammseite verarbeitet:

scope "/", WeatherDashboardWeb do
    pipe_through :browser

    live "/", TemperatureLive.Index
  end

Teile zusammenbinden

Jetzt sind wir bereit, alles einzurichten und in Betrieb zu nehmen.

Wir betreiben einen MQTT-Broker. Da wir keine spezifischen Einstellungen wünschen, ist der einfachste Weg, den Broker mit Docker auszuführen.

docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10

Jetzt führen wir unser “Gerät” aus:

cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

....

13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}

13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>

13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}

13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...

Wir sehen, dass unser Sensor sofort begann, Berichte zu senden.

Führen Sie nun unser Dashboard aus:

cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

[info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at 
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...

Navigieren wir zu .

Wir sehen, dass ein entsprechender LiveView-Prozess gemountet, mit dem Broker verbunden und begonnen hat, Temperaturdaten zu empfangen:

[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129µs
  Transport: :websocket
  Serializer: Phoenix.Socket.V2.JSONSerializer
  Parameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "/assets/app.css", "1" => "/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,
  104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,
  114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...

Außerdem wurde die Seite sofort aktualisiert:

Phönix 1

Wenn wir das Intervall aktualisieren, sehen wir, dass der Geräteknoten den Befehl sofort erhält und häufiger mit der Aktualisierung beginnt:

Phönix 2

Jetzt demonstrieren wir eine wichtige Sache: Stoppen wir unseren “device”-Knoten, warten Sie ein wenig und starten Sie ihn erneut. Wir sehen, dass der Knoten weiterhin Daten mit der aktualisierten Frequenz sendet.

Phönix 3

Wie konnte das passieren? Das Geheimnis ist einfach: das Retain-Flag von Befehlsnachrichten, das wir an das Befehlsthema senden.

:ok = :emqtt.publish(
  socket.assigns[:pid],
  topic,
  interval_s,
  retain: true
)

Wenn wir eine Nachricht mit Retain-Flag an ein Thema senden, wird diese Nachricht auch zu einer “Standard”-Nachricht. Der Broker behält sie, und jeder Abonnent des Themas erhält diese Nachricht beim Abonnieren.

Diese Funktion ist für eingebettete Geräte von Bedeutung, die häufig offline gehen und keinen einfach zu verwendenden lokalen Speicher haben, um ihren Zustand beizubehalten. Auf diese Weise werden sie beim Verbinden korrekt konfiguriert.

Fazit

In diesem Artikel wir

  • demonstrierte eine beliebte Art der Interaktion mit eingebetteten Geräten – das MQTT-Protokoll;
  • wir haben seine Verwendung in Elixir eingeführt;
  • Wir haben auch einige Vorteile von MQTT demonstriert, wie z. B. das Pubsub-Modell und die Beibehaltung von Nachrichten.

Andere leistungsstarke Funktionen, die wir vielleicht sogar in einem einfachen Setup verwenden möchten, sind:

  • Streamen von Themendaten in eine Datenbank, damit wir den Verlauf beim Verbinden ohne „manuelles“ Speichern anzeigen können;
  • verwenden MQTT.js um direkt vom Frontend über WebSockets eine Verbindung zum Broker herzustellen.

Der gesamte Code ist unter verfügbar .

Ursprünglich erschienen bei

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *