924 lines
30 KiB
Python
Raw Normal View History

2025-04-10 17:31:33 +08:00
"""
*******************************************************************
Copyright (c) 2013, 2014 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Ian Craggs - initial implementation and/or documentation
*******************************************************************
"""
from __future__ import print_function
"""
Assertions are used to validate incoming data, but are omitted from outgoing packets. This is
so that the tests that use this package can send invalid data for error testing.
"""
import logging
logger = logging.getLogger("mqttsas")
# Low-level protocol interface
class MQTTException(Exception):
pass
# Message types
CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
PINGREQ, PINGRESP, DISCONNECT = range(1, 15)
packetNames = [ "reserved", \
"Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
"Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
"Pingreq", "Pingresp", "Disconnect"]
classNames = [ "reserved", \
"Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \
"Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \
"Pingreqs", "Pingresps", "Disconnects"]
def MessageType(byte):
if byte != None:
rc = ord(byte[0]) >> 4
else:
rc = None
return rc
def getPacket(aSocket):
"receive the next packet"
buf = aSocket.recv(1) # get the first byte fixed header
if buf == b"":
return None
if str(aSocket).find("[closed]") != -1:
closed = True
else:
closed = False
if closed:
return None
# now get the remaining length
multiplier = 1
remlength = 0
while 1:
next = aSocket.recv(1)
while len(next) == 0:
next = aSocket.recv(1)
buf += next
digit = ord(buf[-1])
remlength += (digit & 127) * multiplier
if digit & 128 == 0:
break
multiplier *= 128
# receive the remaining length if there is any
rest = ''
if remlength > 0:
while len(rest) < remlength:
rest += aSocket.recv(remlength-len(rest))
assert len(rest) == remlength
return buf + rest
class FixedHeaders:
def __init__(self, aMessageType):
self.MessageType = aMessageType
self.DUP = False
self.QoS = 0
self.RETAIN = False
self.remainingLength = 0
def __eq__(self, fh):
return self.MessageType == fh.MessageType and \
self.DUP == fh.DUP and \
self.QoS == fh.QoS and \
self.RETAIN == fh.RETAIN # and \
# self.remainingLength == fh.remainingLength
def __repr__(self):
"return printable representation of our data"
return classNames[self.MessageType]+'(DUP='+repr(self.DUP)+ \
", QoS="+repr(self.QoS)+", Retain="+repr(self.RETAIN)
def pack(self, length):
"pack data into string buffer ready for transmission down socket"
buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\
(self.QoS << 1) | self.RETAIN])
self.remainingLength = length
buffer += self.encode(length)
return buffer
def encode(self, x):
assert 0 <= x <= 268435455
buffer = b''
while 1:
digit = x % 128
x //= 128
if x > 0:
digit |= 0x80
buffer += bytes([digit])
if x == 0:
break
return buffer
def unpack(self, buffer):
"unpack data from string buffer into separate fields"
b0 = ord(buffer[0])
self.MessageType = b0 >> 4
self.DUP = ((b0 >> 3) & 0x01) == 1
self.QoS = (b0 >> 1) & 0x03
self.RETAIN = (b0 & 0x01) == 1
(self.remainingLength, bytes) = self.decode(buffer[1:])
return bytes + 1 # length of fixed header
def decode(self, buffer):
multiplier = 1
value = 0
bytes = 0
while 1:
bytes += 1
digit = ord(buffer[0])
buffer = buffer[1:]
value += (digit & 127) * multiplier
if digit & 128 == 0:
break
multiplier *= 128
return (value, bytes)
def writeInt16(length):
return bytes([length // 256, length % 256])
def readInt16(buf):
return ord(buf[0])*256 + ord(buf[1])
def writeUTF(data):
# data could be a string, or bytes. If string, encode into bytes with utf-8
return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8"))
def readUTF(buffer, maxlen):
if maxlen >= 2:
length = readInt16(buffer)
else:
raise MQTTException("Not enough data to read string length")
maxlen -= 2
if length > maxlen:
raise MQTTException("Length delimited string too long")
buf = buffer[2:2+length].decode("utf-8")
logger.info("[MQTT-4.7.3-2] topic names and filters not include null")
zz = buf.find("\x00") # look for null in the UTF string
if zz != -1:
raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf)
"""for c in range (0xD800, 0xDFFF):
zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string
if zz != -1:
raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf)
"""
if buf.find("\uFEFF") != -1:
logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string")
return buf
def writeBytes(buffer):
return writeInt16(len(buffer)) + buffer
def readBytes(buffer):
length = readInt16(buffer)
return buffer[2:2+length]
class Packets:
def pack(self):
buffer = self.fh.pack(0)
return buffer
def __repr__(self):
return repr(self.fh)
def __eq__(self, packet):
return self.fh == packet.fh if packet else False
class Connects(Packets):
def __init__(self, buffer = None):
self.fh = FixedHeaders(CONNECT)
# variable header
self.ProtocolName = "MQTT"
self.ProtocolVersion = 4
self.CleanSession = True
self.WillFlag = False
self.WillQoS = 0
self.WillRETAIN = 0
self.KeepAliveTimer = 30
self.usernameFlag = False
self.passwordFlag = False
# Payload
self.ClientIdentifier = "" # UTF-8
self.WillTopic = None # UTF-8
self.WillMessage = None # binary
self.username = None # UTF-8
self.password = None # binary
if buffer != None:
self.unpack(buffer)
def pack(self):
connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \
(self.WillQoS << 3) | (self.WillRETAIN << 5) | \
(self.usernameFlag << 6) | (self.passwordFlag << 7)])
buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \
connectFlags + writeInt16(self.KeepAliveTimer)
buffer += writeUTF(self.ClientIdentifier)
if self.WillFlag:
buffer += writeUTF(self.WillTopic)
buffer += writeBytes(self.WillMessage)
if self.usernameFlag:
buffer += writeUTF(self.username)
if self.passwordFlag:
buffer += writeBytes(self.password)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == CONNECT
try:
fhlen = self.fh.unpack(buffer)
packlen = fhlen + self.fh.remainingLength
assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen)
curlen = fhlen # points to after header + remaining length
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.ProtocolName) + 2
assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName
self.ProtocolVersion = ord(buffer[curlen])
curlen += 1
connectFlags = ord(buffer[curlen])
assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0"
self.CleanSession = ((connectFlags >> 1) & 0x01) == 1
self.WillFlag = ((connectFlags >> 2) & 0x01) == 1
self.WillQoS = (connectFlags >> 3) & 0x03
self.WillRETAIN = (connectFlags >> 5) & 0x01
self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1
self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1
curlen +=1
if self.WillFlag:
assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3"
else:
assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false"
assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false"
self.KeepAliveTimer = readInt16(buffer[curlen:])
curlen += 2
logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field")
logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long")
self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.ClientIdentifier) + 2
if self.WillFlag:
self.WillTopic = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.WillTopic) + 2
self.WillMessage = readBytes(buffer[curlen:])
curlen += len(self.WillMessage) + 2
logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present")
else:
self.WillTopic = self.WillMessage = None
if self.usernameFlag:
assert len(buffer) > curlen+2, "Buffer too short to read username length"
self.username = readUTF(buffer[curlen:], packlen - curlen)
curlen += len(self.username) + 2
logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1")
else:
logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0")
assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0"
if self.passwordFlag:
assert len(buffer) > curlen+2, "Buffer too short to read password length"
self.password = readBytes(buffer[curlen:])
curlen += len(self.password) + 2
logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0")
else:
logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0")
if self.WillFlag and self.usernameFlag and self.passwordFlag:
logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present")
assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d"
except:
logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform")
raise
def __repr__(self):
buf = repr(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\
repr(self.ProtocolVersion)+", CleanSession="+repr(self.CleanSession) +\
", WillFlag="+repr(self.WillFlag)+", KeepAliveTimer=" +\
repr(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\
", usernameFlag="+repr(self.usernameFlag)+", passwordFlag="+repr(self.passwordFlag)
if self.WillFlag:
buf += ", WillQoS=" + repr(self.WillQoS) +\
", WillRETAIN=" + repr(self.WillRETAIN) +\
", WillTopic='"+ self.WillTopic +\
"', WillMessage='"+str(self.WillMessage)+"'"
if self.username:
buf += ", username="+self.username
if self.password:
buf += ", password="+str(self.password)
return buf+")"
def __eq__(self, packet):
rc = Packets.__eq__(self, packet) and \
self.ProtocolName == packet.ProtocolName and \
self.ProtocolVersion == packet.ProtocolVersion and \
self.CleanSession == packet.CleanSession and \
self.WillFlag == packet.WillFlag and \
self.KeepAliveTimer == packet.KeepAliveTimer and \
self.ClientIdentifier == packet.ClientIdentifier and \
self.WillFlag == packet.WillFlag
if rc and self.WillFlag:
rc = self.WillQoS == packet.WillQoS and \
self.WillRETAIN == packet.WillRETAIN and \
self.WillTopic == packet.WillTopic and \
self.WillMessage == packet.WillMessage
return rc
class Connacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0):
self.fh = FixedHeaders(CONNACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
self.flags = 0
self.returnCode = ReturnCode
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = bytes([self.flags, self.returnCode])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 4
assert MessageType(buffer) == CONNACK
self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength
assert ord(buffer[2]) in [0, 1], "Connect Acknowledge Flags"
self.returnCode = ord(buffer[3])
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
def __repr__(self):
return repr(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+repr(self.returnCode)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.returnCode == packet.returnCode
class Disconnects(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(DISCONNECT)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == DISCONNECT
self.fh.unpack(buffer)
assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength
logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0")
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
def __repr__(self):
return repr(self.fh)+")"
class Publishes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""):
self.fh = FixedHeaders(PUBLISH)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.topicName = TopicName
self.messageIdentifier = MsgId
# payload
self.data = Payload
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeUTF(self.topicName)
if self.fh.QoS != 0:
buffer += writeInt16(self.messageIdentifier)
buffer += self.data
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBLISH
fhlen = self.fh.unpack(buffer)
assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2"
packlen = fhlen + self.fh.remainingLength
assert len(buffer) >= packlen
curlen = fhlen
try:
self.topicName = readUTF(buffer[fhlen:], packlen - curlen)
except UnicodeDecodeError:
logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8")
raise
curlen += len(self.topicName) + 2
if self.fh.QoS != 0:
self.messageIdentifier = readInt16(buffer[curlen:])
logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2")
curlen += 2
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
else:
logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0")
self.messageIdentifier = 0
self.data = buffer[curlen:fhlen + self.fh.remainingLength]
if self.fh.QoS == 0:
assert self.fh.DUP == False, "[MQTT-2.1.2-4]"
return fhlen + self.fh.remainingLength
def __repr__(self):
rc = repr(self.fh)
if self.fh.QoS != 0:
rc += ", MsgId="+repr(self.messageIdentifier)
rc += ", TopicName="+repr(self.topicName)+", Payload="+repr(self.data)+")"
return rc
def __eq__(self, packet):
rc = Packets.__eq__(self, packet) and \
self.topicName == packet.topicName and \
self.data == packet.data
if rc and self.fh.QoS != 0:
rc = self.messageIdentifier == packet.messageIdentifier
return rc
class Pubacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBACK
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId "+repr(self.messageIdentifier)
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubrecs(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBREC)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBREC
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubrels(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBREL)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBREL
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL"
logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok")
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pubcomps(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(PUBCOMP)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PUBCOMP
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp"
return fhlen + 2
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Subscribes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(SUBSCRIBE)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of topic, qos pairs
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += writeUTF(d[0]) + bytes([d[1]])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == SUBSCRIBE
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe")
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
topic = readUTF(buffer[-leftlen:], leftlen)
leftlen -= len(topic) + 2
qos = ord(buffer[-leftlen])
assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero"
leftlen -= 1
self.data.append((topic, qos))
assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe"
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Subacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(SUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of qos
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += bytes([d])
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == SUBACK
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
qos = buffer[-leftlen]
assert ord(qos) in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80, was "+ord(qos)
leftlen -= 1
self.data.append(qos)
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Unsubscribes(Packets):
def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
self.fh = FixedHeaders(UNSUBSCRIBE)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
# payload - list of topics
self.data = Data[:]
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
for d in self.data:
buffer += writeUTF(d)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == UNSUBSCRIBE
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe")
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
leftlen = self.fh.remainingLength - 2
self.data = []
while leftlen > 0:
topic = readUTF(buffer[-leftlen:], leftlen)
leftlen -= len(topic) + 2
self.data.append(topic)
assert leftlen == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 1, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0")
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
", Data="+repr(self.data)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier and \
self.data == packet.data
class Unsubacks(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
self.fh = FixedHeaders(UNSUBACK)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
# variable header
self.messageIdentifier = MsgId
if buffer != None:
self.unpack(buffer)
def pack(self):
buffer = writeInt16(self.messageIdentifier)
buffer = self.fh.pack(len(buffer)) + buffer
return buffer
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == UNSUBACK
fhlen = self.fh.unpack(buffer)
assert len(buffer) >= fhlen + self.fh.remainingLength
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
self.messageIdentifier = readInt16(buffer[fhlen:])
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen + self.fh.remainingLength
def __repr__(self):
return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
def __eq__(self, packet):
return Packets.__eq__(self, packet) and \
self.messageIdentifier == packet.messageIdentifier
class Pingreqs(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(PINGREQ)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PINGREQ
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen
def __repr__(self):
return repr(self.fh)+")"
class Pingresps(Packets):
def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
self.fh = FixedHeaders(PINGRESP)
self.fh.DUP = DUP
self.fh.QoS = QoS
self.fh.Retain = Retain
if buffer != None:
self.unpack(buffer)
def unpack(self, buffer):
assert len(buffer) >= 2
assert MessageType(buffer) == PINGRESP
fhlen = self.fh.unpack(buffer)
assert self.fh.remainingLength == 0
assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
return fhlen
def __repr__(self):
return repr(self.fh)+")"
classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs,
Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes,
Unsubacks, Pingreqs, Pingresps, Disconnects]
def unpackPacket(buffer):
if MessageType(buffer) != None:
packet = classes[MessageType(buffer)]()
packet.unpack(buffer)
else:
packet = None
return packet
if __name__ == "__main__":
fh = FixedHeaders(CONNECT)
tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152,
20555666, 268435454, 268435455]
for x in tests:
try:
assert x == fh.decode(fh.encode(x))[0]
except AssertionError:
print("Test failed for x =", x, fh.decode(fh.encode(x)))
try:
fh.decode(fh.encode(268435456))
print("Error")
except AssertionError:
pass
for packet in classes[1:]:
before = str(packet())
after = str(unpackPacket(packet().pack()))
try:
assert before == after
except:
print("before:", before, "\nafter:", after)
print("End")