Source code for spade.xmpp_client

import ssl

from slixmpp import ClientXMPP
import logging

from slixmpp.exceptions import IqError, IqTimeout


[docs]class RegistrationException(Exception): pass
[docs]class XMPPClient(ClientXMPP): def __init__(self, jid, password, verify_security, auto_register): ClientXMPP.__init__(self, jid, password) self.logger = logging.getLogger("spade.Agent") if not verify_security: self.ssl_context.check_hostname = False self.ssl_context.verify_mode = ssl.CERT_NONE self.add_event_handler("session_start", self.session_start) self.register_plugin("xep_0199") # XMPP Ping if auto_register: self.add_event_handler("register", self.register) self.register_plugin("xep_0077") # In-band-registration self.register_plugin("xep_0199") # Ping / Keepalive connection self["xep_0199"].enable_keepalive(interval=55)
[docs] def session_start(self, event): self.send_presence() self.get_roster()
[docs] async def register(self, event): resp = self.Iq() resp["type"] = "set" resp["register"]["username"] = self.boundjid.user resp["register"]["password"] = self.password try: await resp.send() except IqError: """ If the user is already registered, it will return an IQ error We can safely ignore it. The client will try the auth process right after the ibr process """ pass except IqTimeout: raise RegistrationException("Timeout error during the register process.")