So verwenden Sie MQTT in Java

MQTT ist ein OASIS-Standard-Messaging-Protokoll für das Internet der Dinge (IoT). Es ist als extrem leichter Publish/Subscribe-Messaging-Transport konzipiert, der sich ideal für die Verbindung von Remote-Geräten mit geringem Code-Footprint und minimaler Netzwerkbandbreite eignet. MQTT wird heute in einer Vielzahl von Branchen eingesetzt, wie z. B. Automobil, Fertigung, Telekommunikation, Öl und Gas usw.

Dieser Artikel stellt vor, wie MQTT im Java-Projekt verwendet wird, um die Funktionen zum Verbinden, Abonnieren, Abbestellen, Veröffentlichen und Empfangen von Nachrichten zwischen dem Client und dem Broker zu realisieren.

Abhängigkeit hinzufügen

Die Entwicklungsumgebung für diesen Artikel ist:

Wir werden verwenden Eclipse Paho Java-Client als Client, die am weitesten verbreitete MQTT-Client-Bibliothek in der Java-Sprache.

Fügen Sie die folgenden Abhängigkeiten zu der hinzu pom.xml Datei.

<dependencies>
   <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.5</version>
   </dependency>
</dependencies>

Erstellen Sie eine MQTT-Verbindung

MQTT-Broker

Dieser Artikel verwendet die öffentlicher MQTT-Broker erstellt basierend auf EMQX-Cloud. Die Serverzugriffsinformationen lauten wie folgt:

  • Makler: broker.emqx.io
  • TCP-Port: 1883
  • SSL/TLS-Port: 8883

Verbinden

Legen Sie die grundlegenden Verbindungsparameter von MQTT fest. Benutzername und Passwort sind optional.

String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";

Erstellen Sie dann einen MQTT-Client und verbinden Sie sich mit dem Broker.

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);

Anweisungen

  • MqttClient: MqttClient stellt eine Reihe von Methoden bereit, die die Steuerung blockieren und an das Anwendungsprogramm zurückgeben, sobald die MQTT-Aktion abgeschlossen ist.
  • MqttClientPersistence: Stellt einen persistenten Datenspeicher dar, der verwendet wird, um ausgehende und eingehende Nachrichten während der Übertragung zu speichern und die Zustellung an die angegebene QoS zu ermöglichen.
  • MqttConnectOptions: Enthält den Satz von Optionen, die steuern, wie der Client eine Verbindung zu einem Server herstellt. Hier sind einige gängige Methoden:
    • setUserName: Legt den für die Verbindung zu verwendenden Benutzernamen fest.
    • setPassword: Legt das für die Verbindung zu verwendende Passwort fest.
    • setCleanSession: Legt fest, ob Client und Server den Status über Neustarts und Wiederverbindungen hinweg speichern sollen.
    • setKeepAliveInterval: Legt das „Keep Alive“-Intervall fest.
    • setConnectionTimeout: Legt den Verbindungs-Timeout-Wert fest.
    • setAutomaticReconnect: Legt fest, ob der Client automatisch versucht, sich wieder mit dem Server zu verbinden, wenn die Verbindung unterbrochen wird.

Verbindung mit TLS/SSL

Wenn Sie ein selbstsigniertes Zertifikat für TLS/SSL-Verbindungen verwenden möchten, fügen Sie hinzu bcpkix-jdk15on zum pom.xml Datei.

<!--  -->
<dependency>
   <groupId>org.bouncycastle</groupId>
   <artifactId>bcpkix-jdk15on</artifactId>
   <version>1.70</version>
</dependency>

Erstellen Sie dann die SSLUtils.java Datei mit folgendem Code.

package io.emqx.mqtt;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class SSLUtils {
   public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                   final String crtFile, final String keyFile, final String password)
           throws Exception {
       Security.addProvider(new BouncyCastleProvider());

       // load CA certificate
       X509Certificate caCert = null;

       FileInputStream fis = new FileInputStream(caCrtFile);
       BufferedInputStream bis = new BufferedInputStream(fis);
       CertificateFactory cf = CertificateFactory.getInstance("X.509");

       while (bis.available() > 0) {
           caCert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client certificate
       bis = new BufferedInputStream(new FileInputStream(crtFile));
       X509Certificate cert = null;
       while (bis.available() > 0) {
           cert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client private key
       PEMParser pemParser = new PEMParser(new FileReader(keyFile));
       Object object = pemParser.readObject();
       JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
       KeyPair key = converter.getKeyPair((PEMKeyPair) object);
       pemParser.close();

       // CA certificate is used to authenticate server
       KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
       caKs.load(null, null);
       caKs.setCertificateEntry("ca-certificate", caCert);
       TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
       tmf.init(caKs);

       // client key and certificates are sent to server so it can authenticate
       KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
       ks.load(null, null);
       ks.setCertificateEntry("certificate", cert);
       ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
               new java.security.cert.Certificate[]{cert});
       KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
              .getDefaultAlgorithm());
       kmf.init(ks, password.toCharArray());

       // finally, create SSL socket factory
       SSLContext context = SSLContext.getInstance("TLSv1.2");
       context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

       return context.getSocketFactory();
  }
}

Satz options folgendermaßen.

String broker = "ssl://broker.emqx.io:8883";
// Set socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);

Veröffentlichen von MQTT-Nachrichten

Erstellen Sie eine Klasse PublishSample das wird a veröffentlichen Hello MQTT Nachricht zum Thema mqtt/test.

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {

   public static void main(String[] args) {

       String broker = "tcp://broker.emqx.io:1883";
       String topic = "mqtt/test";
       String username = "emqx";
       String password = "public";
       String clientid = "publish_client";
       String content = "Hello MQTT";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           MqttConnectOptions options = new MqttConnectOptions();
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // connect
           client.connect(options);
           // create message and setup QoS
           MqttMessage message = new MqttMessage(content.getBytes());
           message.setQos(qos);
           // publish message
           client.publish(topic, message);
           System.out.println("Message published");
           System.out.println("topic: " + topic);
           System.out.println("message content: " + content);
           // disconnect
           client.disconnect();
           // close client
           client.close();
      } catch (MqttException e) {
           throw new RuntimeException(e);
      }
  }
}

Erstellen Sie eine Klasse SubscribeSample die das Thema abonnieren mqtt/test.

package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubscribeSample {
   public static void main(String[] args) {
       String broker = "tcp://broker.emqx.io:1883";
       String topic = "mqtt/test";
       String username = "emqx";
       String password = "public";
       String clientid = "subscribe_client";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           // connect options
           MqttConnectOptions options = new MqttConnectOptions();
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // setup callback
           client.setCallback(new MqttCallback() {

               public void connectionLost(Throwable cause) {
                   System.out.println("connectionLost: " + cause.getMessage());
              }

               public void messageArrived(String topic, MqttMessage message) {
                   System.out.println("topic: " + topic);
                   System.out.println("Qos: " + message.getQos());
                   System.out.println("message content: " + new String(message.getPayload()));

              }

               public void deliveryComplete(IMqttDeliveryToken token) {
                   System.out.println("deliveryComplete---------" + token.isComplete());
              }

          });
           client.connect(options);
           client.subscribe(topic, qos);
      } catch (Exception e) {
           e.printStackTrace();
      }
  }
}

MqttCallback:

connectionLost(Throwable cause): Diese Methode wird aufgerufen, wenn die Verbindung zum Server unterbrochen wird.

messageArrived(String topic, MqttMessage message): Diese Methode wird aufgerufen, wenn eine Nachricht vom Server eintrifft.

deliveryComplete(IMqttDeliveryToken token): Wird aufgerufen, wenn die Zustellung einer Nachricht abgeschlossen ist und alle Bestätigungen eingegangen sind.

Prüfen

Als nächstes laufen SubscribeSample zu abonnieren mqtt/test Thema. Dann renne PublishSample um die Nachricht an die zu veröffentlichen mqtt/test Thema. Wir werden sehen, dass der Herausgeber die Nachricht erfolgreich veröffentlicht und der Abonnent sie erhält.

Java-MQTT

Jetzt sind wir damit fertig, den Paho Java Client als MQTT-Client zu verwenden, um uns mit dem zu verbinden öffentlicher MQTT-Server und Nachrichtenveröffentlichung und -abonnement implementieren.

Den vollständigen Code finden Sie unter: https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Java.

Ursprünglich veröffentlicht unter:https://www.emqx.com.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *