Paho-MQTT is an open-source Python MQTT client developed by the Eclipse Foundation. The Paho project has clients in many programming languages, but for this tutorial I will use Python. Paho is a popular Python MQTT client that can run on any device with multi-threading capabilities. In this tutorial, we will build an MQTT client program by adding & understanding one function at a time.

If you are new to MQTT please read this first to understand the basics:
Fundamentals of MQTT
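
The walkthrough that follows refers to a short starting program. Here is a minimal sketch of what such a program might look like, assuming the paho-mqtt 1.x API used in this tutorial (newer 2.x releases changed the Client() constructor, so adjust accordingly). The function name connect_and_report is my own, and the broker address is simply the one named in the walkthrough; swap in any broker you can reach.

```python
BROKER_ADDRESS = "iot.eclipse.org"  # broker named in this tutorial; replace if unreachable
BROKER_PORT = 1883                  # default port for unencrypted MQTT


def connect_and_report(address=BROKER_ADDRESS, port=BROKER_PORT):
    """Connect to the broker, print what connect() returned, then disconnect."""
    # Imported here so the file still loads where paho-mqtt is not installed.
    import paho.mqtt.client as mqtt  # third-party: pip install paho-mqtt

    client = mqtt.Client()                  # every constructor argument is optional
    result = client.connect(address, port)  # 0 means success
    print(result)
    client.disconnect()
    return result
```

Calling connect_and_report() should print 0 when the broker is reachable.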

What’s going on?

1. We import the required objects and set the broker address as iot.eclipse.org and the port number as 1883.

1883 is the default port number in MQTT for all unencrypted connections.

2. We create an MQTT client object called client using the Client constructor. We call the connect() function with the address & port number of the broker.

If the connection is successful, the program prints 0, which is the value returned by connect().

0 is the result code for a successful connection. Let us break down the client object:

1.1 The Client Constructor

The Client constructor creates an MQTT client. It takes 5 parameters, all of which are optional:

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

client_id is a unique string given by the client when connecting to a broker.

Each client must have a unique client id. If 2 clients with the same client id connect to the broker, the one that connected first will get disconnected from the broker. If a client id is not declared, the broker will assign its own client id; this only works with clean_session set to True.

clean_session is a Boolean value set to True by default. If set to True, the broker removes all the information about the client when it disconnects. If set to False, the broker retains the client’s subscription information & queued messages across disconnects.

See: Clean Sessions In MQTT Explained

userdata is data that you can send as a parameter to callbacks. More about callbacks in section 4.

protocol can be either MQTTv31 or MQTTv311 depending on which version you want to use.

transport defaults to tcp. If you want to send messages over WebSockets, set it to websockets.

To disconnect from the broker cleanly, we can use the disconnect() function.
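
To make the constructor parameters concrete, here is a small sketch that spells out every one of them. It assumes paho-mqtt 1.x; make_client_id and make_client are hypothetical helper names of mine, and the uuid-based id is just one way to keep client ids unique.

```python
import uuid


def make_client_id(prefix="paho-tutorial"):
    """Return a client id that is very unlikely to clash with another client."""
    return "{}-{}".format(prefix, uuid.uuid4().hex[:8])


def make_client(client_id=None, clean_session=True, userdata=None):
    """Build a client with every constructor parameter written out."""
    import paho.mqtt.client as mqtt  # third-party, paho-mqtt 1.x assumed

    return mqtt.Client(
        client_id=client_id or make_client_id(),
        clean_session=clean_session,  # False keeps subscriptions between connections
        userdata=userdata,            # handed to every callback
        protocol=mqtt.MQTTv311,
        transport="tcp",              # or "websockets"
    )
```

After connecting and doing its work, a client built this way is closed cleanly with client.disconnect().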

2. Publishing A Message

To publish a message we use the publish() function. The function takes 4 parameters:

publish(topic, payload=None, qos=0, retain=False)

This is the only necessary parameter:

topic is a string variable containing the topic name.

The topic name is case sensitive.

These are the optional parameters:

payload is the message that will be published to the topic, usually a string. It can also be bytes, an int or a float; if it is left as None, a zero-length message is sent. Any client subscribed to the topic will see the payload message.

qos is either 0, 1 or 2. It defaults to 0.
To understand the different MQTT Quality of Service Levels see this post:
MQTT QoS Levels Explained

retain is a Boolean value which defaults to False. See more about retained messages:
MQTT Retained Messages

Adding the publish function to our code:
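
A sketch of how the publishing step might look, assuming paho-mqtt 1.x. The names publish_hello and run_publisher and the message text are my own; the topic name TestingTopic is the one used later in this tutorial. The loop calls are explained in section 4.1.

```python
TOPIC = "TestingTopic"


def publish_hello(client, topic=TOPIC):
    """Publish one message on an already connected client."""
    # publish() returns an object carrying the result code (rc) and message id (mid).
    return client.publish(topic, payload="Hello from paho", qos=0, retain=False)


def run_publisher(address="iot.eclipse.org", port=1883):
    import paho.mqtt.client as mqtt  # third-party, paho-mqtt 1.x assumed

    client = mqtt.Client()
    client.connect(address, port)
    client.loop_start()            # network loop in the background
    info = publish_hello(client)
    info.wait_for_publish()        # block until the message has left the client
    print(info.rc, info.mid)
    client.loop_stop()
    client.disconnect()
```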

To check if the message has been successfully published to a topic we need a client subscribed to that topic.

3. Subscribing To Topics

To subscribe to a topic we need the subscribe() function. The function takes 2 parameters:

subscribe(topic, qos=0)

topic is a string variable containing the topic name. This is a necessary parameter.

Note: The topic name is case sensitive.

qos is either 0, 1 or 2. Defaults to 0 if not entered.

Let us modify our program for subscribing to the topic that we are publishing to:
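
One way the modified program might look, as a sketch. The client argument is assumed to be an already connected paho client (1.x API), and subscribe_then_publish is a name I made up for illustration.

```python
TOPIC = "TestingTopic"


def subscribe_then_publish(client, topic=TOPIC):
    """Subscribe to a topic, then publish to that same topic."""
    result, mid = client.subscribe(topic, qos=0)  # returns (result, mid)
    client.publish(topic, payload="Hello from paho")
    return result  # 0 means the subscribe request was sent
```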

Note: If you want the client to subscribe to multiple topics then you can put them in a list of tuples.

Example: client.subscribe([('topicName1', 1), ('topicName2', 1)])

Each tuple in the list has the format (Topic Name, QoS Level).
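
If all your topics share one QoS level, the list of tuples can be built with a small helper. This is only a sketch; subscription_list is a hypothetical name and not part of paho.

```python
def subscription_list(topics, qos=0):
    """Pair every topic name with the same QoS level."""
    return [(topic, qos) for topic in topics]


# With a connected client:
# client.subscribe(subscription_list(["topicName1", "topicName2"], qos=1))
```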

When you execute this program you will notice that the published message still does not get displayed on the console/terminal.

Subscribing to a topic means that the broker needs to send you the messages that are published to that topic. In our program, we have subscribed to the topic but we don’t have a function to process those messages.

To process the messages, we need to use callback functions.

4. Callbacks

Callbacks in paho are functions that the library calls for you, asynchronously, when an event occurs.

A normal Python program is synchronous, which means each line is executed sequentially and the next line cannot run until the current one has finished.

An asynchronous function is one that gets executed without blocking other parts of the program from executing.

For example, when paho runs its network loop in a background thread, your callbacks are called from that thread while the main program carries on with its own work.

You don’t need to learn asynchronous programming in Python to use paho, as it is built into the library.

In paho the events that have callbacks are connect, disconnect, subscribe, unsubscribe, publish, message received, logging. Callbacks are what makes paho a very powerful library.

Before we implement callbacks to our program, we need to first understand how these callbacks are called asynchronously. For this we will use the different loop functions available in paho.

4.1 The Loop Functions

loop is a method of the MQTT client object. When a message is received by the paho MQTT client, the message is stored in the receive buffer. When a message is to be sent from the client to the broker, it is stored in the send buffer. The loop functions process any messages in these buffers and call the respective callback function. Some of them also automatically reconnect the client to the broker if it disconnects for some reason.

There are 3 types of loop functions in the paho MQTT client:

4.1.1 loop_forever(): This is a blocking loop function, which means the call does not return until the client disconnects, so no line after it is executed while the loop is running. Use this function if you want to run the program indefinitely and if you are using only a single client object.

4.1.2 loop_start()/loop_stop(): This is a non-blocking loop function, which means you can call it and still continue executing code after the function call. loop_start() starts the loop in a background thread and loop_stop() stops it. loop_start() can be used if you need to create more than 1 client object in the same program.

4.1.3 loop(): This is a blocking function that processes network events once, for at most the given timeout, and then returns, so it has to be called regularly. The difference between loop() and loop_forever() is that with loop() you have to handle reconnecting manually. The loop_forever() & loop_start() functions will automatically try to reconnect with the broker when the client disconnects. It is not recommended to use loop() except in special circumstances.
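
The difference between the blocking and non-blocking loops can be sketched like this. The client argument is assumed to be a connected paho client; with_background_loop and run_until_disconnected are names of my own choosing.

```python
def with_background_loop(client, work):
    """Run work() while paho's network loop runs in a background thread."""
    client.loop_start()        # non-blocking: returns immediately
    try:
        return work()          # the main thread is free to do other things
    finally:
        client.loop_stop()     # stop the background thread again


def run_until_disconnected(client):
    """Hand the main thread to paho; returns only after the client disconnects."""
    client.loop_forever()
```

For example, with_background_loop(client, lambda: time.sleep(10)) would keep the client processing messages for ten seconds (after importing time).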

Now getting back to callbacks. In the next section we will implement each callback into our program.

4.2 on_connect

The on_connect() callback is called each time the client connects to the broker. This means that if the client disconnects from and reconnects to the broker, this callback is executed again. Let’s add the callback to our program.
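
A sketch of the program with the callback added, assuming paho-mqtt 1.x and the broker address used earlier in this tutorial. The function name run is my own.

```python
def on_connect(client, userdata, flags, rc):
    """Called by paho every time the broker answers a connection request."""
    print("Connected With Result Code {}".format(rc))


def run(address="iot.eclipse.org", port=1883):
    import paho.mqtt.client as mqtt  # third-party, paho-mqtt 1.x assumed

    client = mqtt.Client()
    client.on_connect = on_connect   # assign the callback before connecting
    client.connect(address, port)
    client.loop_forever()            # blocks; stop with Ctrl+C
```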

What’s Going on?

  1. We create the on_connect callback function. It takes 4 parameters: the client object, userdata, flags, rc.
  2. The client object is the instance of the Client class that triggered the callback.
  3. userdata is custom data declared in the client constructor. You will need this if you want to pass custom data into the callback.
  4. flags is a dictionary object containing the response flags sent by the broker. Its 'session present' entry tells you whether the broker still holds a previous session for a client that connected with clean session set to False.
  5. rc stands for result code, used to check whether the connection happened successfully or not. The different result codes are:

    0: Connection successful
    1: Connection refused – incorrect protocol version
    2: Connection refused – invalid client identifier
    3: Connection refused – server unavailable
    4: Connection refused – bad username or password
    5: Connection refused – not authorised
    6-255: Currently unused.

The output of the program is:
Connected With Result Code 0
Which means the connection is successful.
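
If you would rather print a readable message than a bare number, the result codes listed above can be kept in a dictionary. This is a sketch; CONNECT_RESULTS and describe_result_code are hypothetical names, not part of paho.

```python
CONNECT_RESULTS = {
    0: "Connection successful",
    1: "Connection refused - incorrect protocol version",
    2: "Connection refused - invalid client identifier",
    3: "Connection refused - server unavailable",
    4: "Connection refused - bad username or password",
    5: "Connection refused - not authorised",
}


def describe_result_code(rc):
    """Translate the rc passed to on_connect into text."""
    return CONNECT_RESULTS.get(rc, "Currently unused")
```

Inside on_connect you could then call print(describe_result_code(rc)).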

4.3 on_disconnect

The on_disconnect() callback is called when the client disconnects from the broker.

Add this function below the on_connect() callback.
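
A possible version of the callback, assuming the paho-mqtt 1.x signature on_disconnect(client, userdata, rc). In that API an rc of 0 means the disconnect was requested with disconnect(), and any other value means it was unexpected. The helper disconnect_reason is my own addition.

```python
def disconnect_reason(rc):
    """Translate the rc passed to on_disconnect into a short explanation."""
    return "clean disconnect" if rc == 0 else "unexpected disconnect"


def on_disconnect(client, userdata, rc):
    print("Disconnected: {} (rc={})".format(disconnect_reason(rc), rc))
```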

Don’t forget to assign the callback function to the client object!
client.on_disconnect = on_disconnect

Run the program, disconnect your internet and see if the message gets printed. Also, reconnect to the internet to see if the client reconnects to the broker.

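If you are using multiple clients in a single program and you want to attach the same callback function to all of them, you need a way to tell the clients apart inside the callback. The client id is one option, but paho 1.x keeps it in the private _client_id attribute, so a cleaner approach is to give each client its own userdata value, which is passed to every callback.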

4.4 on_message

Getting back to the problem of messages not being displayed despite subscribing to the topic: The on_message() callback is used to process messages that are published to a subscribed topic.

In our program, we need to do 3 things:
1. Subscribe to the topic that we are publishing to.
2. Process the published message using the callback. In our program, we will simply print the message.
3. Assign the callback function to the on_message attribute of the client object.

Let us create & run 2 client programs: one that publishes a message to a topic (pub.py) and another that prints that message (sub.py).
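
Here is a sketch of what the two programs might contain, shown together for brevity and assuming paho-mqtt 1.x. The helper describe and the function names run_subscriber and run_publisher are mine; in practice each run_ function would live in its own file.

```python
TOPIC = "TestingTopic"
BROKER = "iot.eclipse.org"  # broker used in this tutorial; replace if unreachable


def describe(message):
    """Format a received message; the payload arrives as bytes."""
    return "{}: {}".format(message.topic, message.payload.decode("utf-8"))


# --- sub.py ---
def on_connect(client, userdata, flags, rc):
    client.subscribe(TOPIC)  # subscribing here also covers reconnects


def on_message(client, userdata, message):
    print(describe(message))


def run_subscriber():
    import paho.mqtt.client as mqtt  # third-party, paho-mqtt 1.x assumed

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, 1883)
    client.loop_forever()


# --- pub.py ---
def run_publisher():
    import paho.mqtt.client as mqtt  # third-party, paho-mqtt 1.x assumed

    client = mqtt.Client()
    client.connect(BROKER, 1883)
    client.loop_start()
    client.publish(TOPIC, "Hello from pub.py").wait_for_publish()
    client.loop_stop()
    client.disconnect()
```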

The on_message() callback has 3 parameters: client, userdata & message. I already explained the first two in the on_connect section.
The message object has 4 attributes, namely: topic, payload, qos, retain.

Notice that each attribute corresponds to a parameter of the publish() function.

First run sub.py and then run pub.py. The output will show the message and the topic.
If the message doesn’t get printed, check that:

  • You have assigned the on_message() function to the client object’s on_message attribute.
  • You have subscribed to the topic in the on_connect() callback in sub.py.
  • You have entered the topic names correctly in the subscribe() and publish() functions in sub.py and pub.py respectively.
  • You are printing the message in the on_message function.

How to organize & sort the messages?

If messages are being received from several topics and each topic’s messages need to be processed in a different way, we have to sort the messages by topic.

One way to do this is by using the message.topic attribute and creating many if conditions.

The better way to do this is to use the message_callback_add(sub, callback) function of the client object. This creates multiple callbacks to process messages from different topics.
The sub parameter takes the topic name and the callback parameter takes the name of the callback that will process messages of that topic.
Example:
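
A sketch of per-topic callbacks, assuming paho-mqtt 1.x and a client that is already connected (for instance, call register_handlers from on_connect). The handler names and the HANDLERS dictionary are my own.

```python
def on_testing_topic(client, userdata, message):
    print("TestingTopic handler:", message.payload.decode("utf-8"))


def on_testing_topic1(client, userdata, message):
    print("TestingTopic1 handler:", message.payload.decode("utf-8"))


HANDLERS = {
    "TestingTopic": on_testing_topic,
    "TestingTopic1": on_testing_topic1,
}


def register_handlers(client):
    """Subscribe to every topic and attach its dedicated callback."""
    for topic, handler in HANDLERS.items():
        client.subscribe(topic)
        client.message_callback_add(topic, handler)
```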

message_callback_add() does not subscribe to anything by itself, so make sure you have also subscribed to both topics.

You can try the above program by publishing to “TestingTopic” & “TestingTopic1” and seeing if it processes them separately.

4.5 on_publish

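The on_publish() callback gets called when a message sent with the publish() function has been sent to the broker (for QoS 1 and 2, once the broker has acknowledged it). The publish() function itself returns a tuple (result, mid).

The result is the error code. An error code of 0 means the message was accepted for sending successfully.

The mid stands for message id. It is an integer that is a unique message identifier assigned by the client. The same mid is passed to the on_publish() callback, so you can match each callback to the publish() call that caused it. If you use QoS levels 1 or 2 then the client loop will use the mid to identify messages that have not been sent.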
You can add this snippet to your code if you want to try:
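
Something along these lines, assuming the paho-mqtt 1.x signature on_publish(client, userdata, mid). The published_mids list is my own addition, used to remember which messages have been confirmed.

```python
published_mids = []  # message ids confirmed so far


def on_publish(client, userdata, mid):
    published_mids.append(mid)
    print("Message {} published".format(mid))


# Wiring, with a connected client object:
# client.on_publish = on_publish
# result, mid = client.publish("TestingTopic", "hello", qos=1)
```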

4.6 on_subscribe()/on_unsubscribe()

The on_subscribe() callback is called when the broker acknowledges a subscription request. Example: on_subscribe(client, userdata, mid, granted_qos)

The on_unsubscribe() callback is called when the broker acknowledges an unsubscribe request. Example: on_unsubscribe(client, userdata, mid)

mid is the message id as discussed in the on_publish() function.

granted_qos is a list of the QoS levels that the broker granted, one for each topic in the subscription request. You can try it in your program if you want.
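
If you do want to try them, a minimal pair of callbacks could look like this (paho-mqtt 1.x signatures assumed; the printed wording is mine).

```python
def on_subscribe(client, userdata, mid, granted_qos):
    print("Subscribe {} acknowledged, granted QoS: {}".format(mid, list(granted_qos)))


def on_unsubscribe(client, userdata, mid):
    print("Unsubscribe {} acknowledged".format(mid))


# Wiring, with a client object:
# client.on_subscribe = on_subscribe
# client.on_unsubscribe = on_unsubscribe
```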

5. Other Useful Paho MQTT Functions

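The paho package also has some helper functions, in the paho.mqtt.publish and paho.mqtt.subscribe modules, that can be used to publish or receive messages without having to create a client object and go through the hassle of creating callbacks.

5.1 single() / multiple() Publish

The single and multiple functions of the publish module are used to publish a single message or multiple messages to a broker without having to create a client object.

single(topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")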

topic is the only necessary parameter. This is a string with the topic name.

auth is a dictionary with the username and password if the broker requires it. (The eclipse broker does not require authentication).
Example: auth = {'username':"username", 'password':""}

tls is required if we are using TLS/SSL encryption.
Example: tls = {'ca_certs':"", 'certfile':"", 'keyfile':"", 'tls_version':"", 'ciphers':"<ciphers>"}

transport accepts 2 values: websockets and tcp. Set it to tcp if you don’t want to use websockets.

The other parameters are similar to the ones in the client object.

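multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

msgs is a necessary parameter. It contains a list of messages to publish. Each message can be a dictionary or a tuple.

The dictionary must be in this format:
msg = {'topic':"< topicname >", 'payload':"", 'qos':0, 'retain':False}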

The tuple must be in this format:
("< topicname >", "< payload >", qos, retain)

In both these formats a topic name has to be present.
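
As a sketch, here are both message formats in one list, sent with the helpers from paho.mqtt.publish (1.x API assumed). The payload texts and the function name publish_all are made up for illustration.

```python
MESSAGES = [
    {"topic": "TestingTopic", "payload": "sent as a dictionary", "qos": 0, "retain": False},
    ("TestingTopic1", "sent as a tuple", 0, False),
]


def publish_all(hostname="iot.eclipse.org"):
    import paho.mqtt.publish as publish  # third-party, paho-mqtt 1.x assumed

    # One message through single(), then the whole list through multiple().
    publish.single("TestingTopic", payload="sent with single()", hostname=hostname)
    publish.multiple(MESSAGES, hostname=hostname)
```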

5.2 simple() / callback()

simple is a blocking function of the subscribe module that subscribes to a topic or a list of topics and returns the messages published to them.

simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

topics is the only necessary parameter. This can be a string for a single topic or a list for multiple topics.

msg_count is the number of messages that must be received before the function disconnects from the broker and returns. For msg_count > 1, the function will return a list of messages.

The other parameters are already explained in the single() / multiple() section.
Example Code:
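
A sketch using paho.mqtt.subscribe.simple (1.x API assumed). The helpers as_list and read_messages are my own; as_list only smooths over the fact that simple() returns a single message for msg_count=1 and a list otherwise.

```python
def as_list(received):
    """Wrap a single message in a list; leave a list untouched."""
    return received if isinstance(received, list) else [received]


def read_messages(topic="TestingTopic", count=1, hostname="iot.eclipse.org"):
    import paho.mqtt.subscribe as subscribe  # third-party, paho-mqtt 1.x assumed

    # Blocks until `count` messages have arrived on the topic.
    received = subscribe.simple(topic, msg_count=count, hostname=hostname)
    return [(m.topic, m.payload) for m in as_list(received)]
```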

callback is similar to simple; the only difference is that it takes an extra parameter, namely callback. Whereas the simple function returns the messages from the topic, the callback function passes each received message to a function of your choice for processing.

callback(callback, topics, qos=0, userdata=None, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

Example:
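
A sketch using paho.mqtt.subscribe.callback (1.x API assumed). The processing function takes the same three parameters as on_message; print_message and listen are names I chose.

```python
def print_message(client, userdata, message):
    print("{} -> {}".format(message.topic, message.payload.decode("utf-8")))


def listen(topic="TestingTopic", hostname="iot.eclipse.org"):
    import paho.mqtt.subscribe as subscribe  # third-party, paho-mqtt 1.x assumed

    # Blocks and calls print_message for every message received on the topic.
    subscribe.callback(print_message, topic, hostname=hostname)
```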

6. Additional Client Functions

These functions are called on the client object to modify its behaviour. Almost all of them must be called before connecting to the broker.

6.1 username_pw_set()

Sets a username & password if the broker requires authentication. The password is optional.

username_pw_set(username, password=None)

6.2 will_set()

This is a very useful function. When the client connects to the broker, it tells the broker to publish a message to a topic on its behalf if it disconnects unexpectedly, i.e. without calling disconnect().

will_set(topic, payload=None, qos=0, retain=False)

To understand how & when to use Last Will & Testament in MQTT see this post:
MQTT Last Will And Testament (Explained with Example)
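
A sketch of setting a will before connecting. The status topic and the "offline" payload are hypothetical values of mine, and connect_with_will is not a paho function; the client argument is assumed to be a paho 1.x client that has not connected yet.

```python
WILL = {
    "topic": "TestingTopic/status",  # hypothetical status topic
    "payload": "offline",
    "qos": 1,
    "retain": True,
}


def connect_with_will(client, address="iot.eclipse.org", port=1883):
    """Register the will first, then connect; the order matters."""
    client.will_set(**WILL)
    client.connect(address, port)
```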

6.3 tls_set()

If you want to encrypt your messages with TLS you can use this function.
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)

6.4 max_inflight_messages_set()

Sets the maximum number of messages with QoS 1 or 2 that can be part way through their network flow at once. The default is 20.

max_inflight_messages_set(inflight)

6.5 max_queued_messages_set()

Sets the maximum number of outgoing messages with QoS 1 or 2 that can be pending in the outgoing message queue. The default is 0, which means unlimited.

max_queued_messages_set(queue_size)

6.6 message_retry_set()

Sets the time in seconds before the client retries publishing a message. This is valid for qos 1 or 2.

message_retry_set(retry)

6.7 reconnect_delay_set()

This sets the delay in seconds before the client automatically tries to reconnect to the broker if it disconnects. The delay keeps doubling each time the client fails to reconnect, up to a maximum of max_delay.

reconnect_delay_set(min_delay=1, max_delay=120)
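
To see what the doubling means in practice, here is a small illustration of the delay schedule described above. It is not paho’s own code, only a sketch of the arithmetic; reconnect_delays is a hypothetical name.

```python
def reconnect_delays(min_delay=1, max_delay=120, attempts=9):
    """List the wait before each reconnect attempt: doubling, capped at max_delay."""
    delays = []
    delay = min_delay
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_delay)
    return delays


print(reconnect_delays())  # [1, 2, 4, 8, 16, 32, 64, 120, 120]
```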

6.8 enable_logger()/disable_logger()

Enables logging with the standard logging package.

enable_logger(logger=None)
disable_logger()
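
A sketch of hooking paho up to Python’s logging package. The logger name and the DEBUG level are my own choices, and attach_logger is a hypothetical helper; the client argument is assumed to be a paho 1.x client.

```python
import logging


def attach_logger(client, name="paho-tutorial"):
    """Send paho's log messages to a standard logging.Logger."""
    logging.basicConfig(level=logging.DEBUG)  # print log records to the console
    logger = logging.getLogger(name)
    client.enable_logger(logger)
    return logger
```

Calling client.disable_logger() later switches the logging off again.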

6.9 ws_set_options()

This is used if you are sending MQTT messages over a WebSocket and have set transport="websockets" in the client constructor.

ws_set_options(path="/mqtt", headers=None)
