Source code for spade.container

import asyncio
import logging
import sys
from contextlib import suppress
from typing import Coroutine, Awaitable

from singletonify import singleton

from .behaviour import BehaviourType
from .message import Message

logger = logging.getLogger("SPADE")

# check if python is 3.6 or higher
if sys.version_info >= (3, 7) and sys.platform == "win32":
    asyncio.set_event_loop_policy(
        asyncio.WindowsSelectorEventLoopPolicy()
    )  # pragma: no cover


[docs]def get_or_create_eventloop(): # pragma: no cover if sys.version_info < (3, 10): loop = asyncio.get_event_loop() else: try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop
[docs]@singleton() class Container(object): """ The container class allows agents to exchange messages without using the XMPP socket if they are in the same process. The container is a singleton. """ def __init__(self): self.__agents = {} self.loop = get_or_create_eventloop() self.is_running = True def reset(self) -> None: """Empty the container by unregistering all the agents.""" self.__init__() def register(self, agent) -> None: """ Register a new agent. Args: agent (spade.agent.Agent): the agent to be registered """ self.__agents[str(agent.jid)] = agent agent.set_container(self) agent.set_loop(self.loop) def unregister(self, jid: str) -> None: if str(jid) in self.__agents: del self.__agents[str(jid)] def has_agent(self, jid: str) -> bool: """ Check if an agent is registered in the container. Args: jid (str): the jid of the agent to be checked. Returns: bool: wether the agent is or is not registered. """ return jid in self.__agents def get_agent(self, jid: str): """ Returns a registered agent Args: jid (str): the identifier of the agent Returns: spade.agent.Agent: the agent you were looking for Raises: KeyError: if the agent is not found """ return self.__agents[jid] async def send(self, msg: Message, behaviour: BehaviourType) -> None: """ This method sends the message using the container mechanism when the receiver is also registered in the container. Otherwise, it uses the XMPP send method from the original behaviour. Args: msg (spade.message.Message): the message to be sent behaviour: the behaviour that is sending the message """ to = str(msg.to) if to in self.__agents: self.__agents[to].dispatch(msg) else: await behaviour._xmpp_send(msg=msg) def run(self, coro: Awaitable) -> None: # pragma: no cover self.loop.run_until_complete(coro)
[docs]def run_container(main_func: Coroutine) -> None: # pragma: no cover container = Container() try: container.run(main_func) except KeyboardInterrupt: logger.warning("Keyboard interrupt received. Stopping SPADE...") except Exception as e: # pragma: no cover logger.error("Exception in the event loop: {}".format(e)) if sys.version_info >= (3, 7): # pragma: no cover tasks = asyncio.all_tasks(loop=container.loop) # pragma: no cover else: tasks = asyncio.Task.all_tasks(loop=container.loop) # pragma: no cover for task in tasks: task.cancel() with suppress(asyncio.CancelledError): container.run(task) container.loop.close() logger.debug("Loop closed")