Skip to content

Latest commit

 

History

History
246 lines (182 loc) · 7.73 KB

howto-mqtt-pubsub-among-mqtt-clients.md

File metadata and controls

246 lines (182 loc) · 7.73 KB
title titleSuffix description author ms.author ms.service ms.topic ms.date
PubSub among MQTT web clients
Azure Web PubSub
A how-to guide that shows to how to subscribe to messages on a topic and send messages to a topic without the involvement of a typical application server
Y-Sindo
zityang
azure-web-pubsub
how-to
10/17/2024

Publish/subscribe among MQTT web clients

:::image type="content" source="media/quickstarts-pubsub-among-clients/among-clients.gif" alt-text="GIF of pub/sub among clients without an application server.":::

This quickstart guide demonstrates how to

[!div class="checklist"]

  • connect to your Web PubSub resource
  • subscribe to messages on a specific topic
  • publish messages to a topic

Prerequisites

  • A Web PubSub resource. To created one, you can follow the guidance: Create a Web PubSub resource
  • A code editor, such as Visual Studio Code
  • Dependencies for the language you plan to use

Note

Except for the MQTT client libraries mentioned belows, you can choose any standard MQTT client libraries that meet the following requirements to connect to Web PubSub:

  • Support WebSocket transport.
  • Support MQTT protocol 3.1.1 or 5.0.
mkdir pubsub_among_clients
cd pubsub_among_clients

npm install mqtt
mkdir pubsub_among_clients
cd pubsub_among_clients

# Create a new .net console project
dotnet new console

dotnet add package MqttNet
mkdir pubsub_among_clients
cd pubsub_among_clients

pip install paho-mqtt

Connect to Web PubSub

An MQTT uses a Client Access URL to connect and authenticate with your resource. This URL follows a pattern of wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token>.

A client can have a few ways to obtain the Client Access URL. It's best practice to not hard code the Client Access URL in your code. In the production world, we usually set up an app server to return this URL on demand. Generate Client Access URL describes the practice in detail.

For this quick start, you can copy and paste one from Azure portal shown in the following diagram.

The diagram shows how to get MQTT client access url.

As shown in the preceding code, the client has the permissions to send messages to topic group1 and to subscribe to topic group2.

[!INCLUDE MQTT-Connection-Parameters]

The following code shows how to connect MQTT clients to WebPubSub with MQTT protocol version 5.0, clean start, 30-seconds session expiry interval.

Create a file with name index.js and add following code

const mqtt = require('mqtt');
var client = mqtt.connect(`wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token>`,
        {
            clientId: "client1",
            protocolVersion: 5, // Use MQTT 5.0 protocol
            clean: true,
            properties: {
                sessionExpiryInterval: 30,
            },
        });

Edit the Program.cs file and add following code

using MQTTnet;
using MQTTnet.Client;

var mqttFactory = new MqttFactory();
var client = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
    .WithWebSocketServer((MqttClientWebSocketOptionsBuilder b) =>
        b.WithUri("wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token>"))
    .WithClientId("client1")
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
    .WithCleanStart()
    .WithSessionExpiryInterval(30)
    .Build();
await client.ConnectAsync(mqttClientOptions, CancellationToken.None);
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes

def on_connect(client, userdata, flags, reasonCode, properties):
    print("Connected with result code "+str(reasonCode))

def on_connect_fail(client, userData):
    print("Connection failed")
    print(userData)

def on_log(client, userdata, level, buf):
    print("log: ", buf)

host = "<service_name>.webpubsub.azure.com"
port = 443
client = mqtt.Client(client_id= client_id, transport="websockets", protocol= mqtt.MQTTv5)
client.ws_set_options(path="/clients/mqtt/hubs/<hub_name>?access_token=<token>")
client.tls_set()
client.on_connect = on_connect
client.on_connect_fail = on_connect_fail
client.on_log = on_log
connect_properties.SessionExpiryInterval = 30
client.connect(host, port, clean_start = True, properties=connect_properties)

Troubleshooting

If your client failed to connect, you could use the Azure Monitor for troubleshooting. See Monitor Azure Web PubSub for more details.

You can check the connection parameters and get more detailed error messages from the Azure Monitor. For example, the following screenshot of Azure Log Analytics shows that the connection was rejected because it set an invalid keep alive interval. Screenshot of Azure Log Analytics.

Subscribe to a topic

To receive messages from topics, the client

  • must subscribe to the topic it wishes to receive messages from
  • has a callback to handle message event

The following code shows a client subscribes to topics named group2.

// ...code from the last step

// Provide callback to the message event.
client.on("message", async (topic, payload, packet) => {
    console.log(topic, payload)
});

// Subscribe to a topic.
client.subscribe("group2", { qos: 1 }, (err, granted) => { console.log("subscribe", granted); })
// ...code from the last step

// Provide callback to the message event.
client.ApplicationMessageReceivedAsync += (args) =>
{
    Console.WriteLine($"Received message on topic '{args.ApplicationMessage.Topic}': {System.Text.Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment)}");
    return Task.CompletedTask;
};
// Subscribe to a topic "topic".
await client.SubscribeAsync("group2", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
# ...code from the last step

# Provide callback to the message event.
def subscriber_on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
client.on_message = subscriber_on_message

# Subscribe to a topic "topic".
client.subscribe("group2")

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

Publish a message to a group

In the previous step, we set up everything needed to receive messages from group1, now we send messages to that group.

// ...code from the last step

// Send message "Hello World" in the "text" format to "group1".
client.publish("group1", "Hello World!")
// ...code from the last step

// Send message "Hello World" in the "text" format to "group1".
await client.PublishStringAsync("group1", "Hello World!");
# ...code from the last step

# Send message "Hello World" in the "text" format to "group1".
client.publish("group1", "Hello World!")

By using the client SDK, you now know how to

[!div class="checklist"]

  • connect to your Web PubSub resource
  • subscribe to topics
  • publish messages to topics