import datetime
import logging
import socket
from typing import Optional, Coroutine, Type
import aiohttp_jinja2
import jinja2
import timeago
from aiohttp import web as aioweb
from aiohttp.web_runner import AppRunner
from aioxmpp import PresenceType, JID
from .behaviour import CyclicBehaviour
from .message import Message
logger = logging.getLogger("spade.Web")
def unused_port(hostname: str) -> int:
    """Return a port that is unused on the current host.

    A temporary TCP socket is bound to port 0 on ``hostname`` so that the
    operating system picks a free port. The socket is closed when the
    ``with`` block exits, and the port number it was bound to is returned.

    Args:
      hostname (str): host name or address to bind the probe socket to.

    Returns:
      int: the port number the operating system assigned to the probe socket.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Port 0 asks the OS to choose any currently free port.
        s.bind((hostname, 0))
        # NOTE: the socket is released on exit, so another process could
        # grab the port before the caller binds to it.
        return s.getsockname()[1]
async def start_server_in_loop(runner: AppRunner, hostname: str, port: int, agent):
    """
    Set up the runner and start a TCP site that serves the agent's web app.

    The created site is stored in ``agent.web.server`` before it is started,
    and the serving URL is logged once the site is up.

    Args:
      runner (AppRunner): AppRunner to process the http requests
      hostname (str): host name to listen from.
      port (int): port to listen from.
      agent (spade.agent.Agent): agent that owns the web app.
    """
    await runner.setup()

    site = aioweb.TCPSite(runner, hostname, port)
    agent.web.server = site
    await site.start()

    logger.info(f"Serving on http://{hostname}:{port}/")
class WebApp(object):
    """Module to handle agent's web interface.

    Wraps an ``aiohttp`` application whose templates are rendered through
    ``aiohttp_jinja2``, registers the default ``/spade`` routes and lets
    callers add their own GET/POST controllers.
    """

    def __init__(self, agent):
        """
        Create the aiohttp application and configure the template loaders.

        Args:
          agent: the agent that owns this web interface.
        """
        self.agent = agent
        # Set by start_server_in_loop once the TCP site has been created.
        self.server = None
        self.hostname = None
        self.port = None
        # Stays None until start() is called; see is_started().
        self.runner = None
        self.app = aioweb.Application()

        internal_loader = jinja2.PackageLoader(
            "spade", package_path="templates", encoding="utf-8"
        )
        cwd_loader = jinja2.FileSystemLoader(".")
        self.loaders = [internal_loader, cwd_loader]
        self._set_loaders()

    def start(
        self,
        hostname: Optional[str] = None,
        port: Optional[int] = None,
        templates_path: Optional[str] = None,
    ):
        """
        Starts the web interface.

        Args:
          hostname (str, optional): host name to listen from. (Default value = None)
          port (int, optional): port to listen from. (Default value = None)
          templates_path (str, optional): path to look for templates. (Default value = None)

        Returns:
          whatever ``agent.submit`` returns for the server start-up coroutine.
        """
        self.hostname = hostname if hostname else "localhost"

        if port:
            self.port = port
        elif not self.port:
            # No port requested now or earlier: ask the OS for a free one.
            self.port = unused_port(self.hostname)

        if templates_path:
            # Inserted first so this path is consulted before the other loaders.
            self.loaders.insert(0, jinja2.FileSystemLoader(templates_path))
        self._set_loaders()

        self.setup_routes()
        self.runner = aioweb.AppRunner(self.app)
        return self.agent.submit(
            start_server_in_loop(self.runner, self.hostname, self.port, self.agent)
        )

    def is_started(self) -> bool:
        """Return True once start() has created the application runner."""
        return self.runner is not None

    def _set_loaders(self) -> None:
        """Configure aiohttp_jinja2 on the app with a ChoiceLoader over ``self.loaders``."""
        loader = jinja2.ChoiceLoader(self.loaders)
        aiohttp_jinja2.setup(
            self.app,
            loader=loader,
            extensions=["jinja2_time.TimeExtension"],
            context_processors=[self.agent_processor, aiohttp_jinja2.request_processor],
        )

    def setup_routes(self) -> None:
        """Register the default ``/spade`` routes on the application router."""
        self.app.router.add_get("/spade", self.index)
        self.app.router.add_get("/spade/stop", self.stop_agent)
        self.app.router.add_get("/spade/stop/now/", self.stop_now)
        self.app.router.add_get("/spade/messages/", self.get_messages)
        self.app.router.add_get(
            "/spade/behaviour/{behaviour_type}/{behaviour_class}/", self.get_behaviour
        )
        self.app.router.add_get(
            "/spade/behaviour/{behaviour_type}/{behaviour_class}/kill/",
            self.kill_behaviour,
        )
        self.app.router.add_get("/spade/agent/{agentjid}/", self.get_agent)
        self.app.router.add_get(
            "/spade/agent/{agentjid}/unsubscribe/", self.unsubscribe_agent
        )
        self.app.router.add_post("/spade/agent/{agentjid}/send/", self.send_agent)

    def add_get(
        self,
        path: str,
        controller: Coroutine,
        template: Optional[str],
        raw: Optional[bool] = False,
    ) -> None:
        """
        Setup a route of type GET

        Args:
          path (str): URL to listen to
          controller (coroutine): the coroutine to handle the request
          template (str): the template to render the response or None if it is a JSON response
          raw (bool): indicates if post-processing (jinja, json, etc) is needed or not
        """
        fn = controller if raw else self._prepare_controller(controller, template)
        self.app.router.add_get(path, fn)

    def add_post(
        self,
        path: str,
        controller: Coroutine,
        template: Optional[str],
        raw: Optional[bool] = False,
    ) -> None:
        """
        Setup a route of type POST

        Args:
          path (str): URL to listen to
          controller (coroutine): the coroutine to handle the request
          template (str): the template to render the response or None if it is a JSON response
          raw (bool): indicates if post-processing (jinja, json, etc) is needed or not
        """
        fn = controller if raw else self._prepare_controller(controller, template)
        self.app.router.add_post(path, fn)

    def _prepare_controller(
        self, controller: Coroutine, template: Optional[str]
    ) -> Coroutine:
        """
        Wraps the controller whether to render a jinja template or to return a json response (if template is None)

        Args:
          controller (coroutine): the coroutine to be wrapped
          template (str): the name of the template or None

        Returns:
          coroutine: a wrapped coroutine of the controller
        """
        if template:
            return aiohttp_jinja2.template(template_name=template)(controller)
        # Any falsy template (None or "") selects the JSON wrapper.
        return self._parse_json_response(controller)

    @staticmethod
    def _parse_json_response(func):
        """Wrap ``func`` so that its awaited result is returned as a JSON response."""

        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            return aioweb.json_response(data=result, content_type="application/json")

        return wrapper

    @staticmethod
    def timeago(date):
        """Format ``date`` relative to the current local time using the ``timeago`` package."""
        return timeago.format(date, datetime.datetime.now())

    def _with_timeago(self, traces):
        """Pair the timeago-formatted first element of each trace with its second element."""
        return [(self.timeago(trace[0]), trace[1]) for trace in traces]

    @staticmethod
    def _presence_show(contact):
        """
        Return the text after the first dot of ``str(contact["presence"].show)``.

        Returns None when the contact has no "presence" entry.
        """
        if "presence" not in contact:
            return None
        # presumably ``show`` is an enum whose str() is "Class.MEMBER" -- TODO confirm
        return str(contact["presence"].show).split(".")[1]

    async def agent_processor(self, request):
        """Context processor adding the agent and ``traces.received(limit=5)`` to every template."""
        messages = self._with_timeago(self.agent.traces.received(limit=5))
        return {"agent": self.agent, "messages": messages}

    # Default controllers for agent

    @aiohttp_jinja2.template("internal_tpl_index.html")
    async def index(self, request):
        """Render the index page with one entry per contact of the agent."""
        contacts = [
            {
                "jid": jid,
                "avatar": self.agent.build_avatar_url(jid.bare()),
                # False when the contact has no presence information.
                "available": (
                    "presence" in c
                    and c["presence"].type_ == PresenceType.AVAILABLE
                ),
                "show": self._presence_show(c),
            }
            for jid, c in self.agent.presence.get_contacts().items()
        ]
        return {"contacts": contacts}

    @aiohttp_jinja2.template("internal_tpl_index.html")
    async def stop_agent(self, request):
        """Render the index page with the ``stopping`` flag set."""
        return {"stopping": True}

    async def stop_now(self, request):
        """Stop the agent and answer with an empty JSON object."""
        logger.warning("Stopping agent from web interface.")
        await self.agent.stop()
        return aioweb.json_response({})  # pragma: no cover

    @aiohttp_jinja2.template("internal_tpl_messages.html")
    async def get_messages(self, request):
        """Render the messages page with everything returned by ``traces.received()``."""
        messages = self._with_timeago(self.agent.traces.received())
        return {"messages": messages}

    @aiohttp_jinja2.template("internal_tpl_behaviour.html")
    async def get_behaviour(self, request):
        """Render the page of the behaviour named in the URL, with its traces."""
        behaviour_str = f"{request.match_info['behaviour_type']}/{request.match_info['behaviour_class']}"
        behaviour = self.find_behaviour(behaviour_str)
        messages = self._with_timeago(
            self.agent.traces.filter(category=behaviour_str)
        )
        return {"behaviour": behaviour, "bmessages": messages}

    async def kill_behaviour(self, request):
        """
        Kill the behaviour named in the URL and redirect to ``/spade``.

        Raises:
          aiohttp.web.HTTPNotFound: if no behaviour of the agent matches the URL.
          aiohttp.web.HTTPFound: redirect issued after the behaviour is killed.
        """
        behaviour_str = f"{request.match_info['behaviour_type']}/{request.match_info['behaviour_class']}"
        behaviour = self.find_behaviour(behaviour_str)
        if behaviour is None:
            # Unknown behaviour: answer 404 instead of failing on None.kill().
            raise aioweb.HTTPNotFound()
        behaviour.kill()
        raise aioweb.HTTPFound("/spade")

    @aiohttp_jinja2.template("internal_tpl_agent.html")
    async def get_agent(self, request):
        """Render the page of the contact named in the URL, with the traces sent to it."""
        agent_jid = request.match_info["agentjid"]
        agent_messages = self._with_timeago(self.agent.traces.filter(to=agent_jid))
        c = self.agent.presence.get_contact(JID.fromstr(agent_jid))
        contact = {"show": self._presence_show(c)}
        return {"amessages": agent_messages, "ajid": agent_jid, "contact": contact}

    async def unsubscribe_agent(self, request):
        """Unsubscribe from the contact named in the URL and redirect to its page."""
        agent_jid = request.match_info["agentjid"]
        self.agent.presence.unsubscribe(agent_jid)
        raise aioweb.HTTPFound(f"/spade/agent/{agent_jid}/")

    async def send_agent(self, request):
        """Send the posted ``message`` form field to the contact named in the URL.

        The message is sent through the agent's stream, marked as sent,
        appended to the agent's traces, and the client is redirected to the
        contact page.
        """
        agent_jid = request.match_info["agentjid"]
        form = await request.post()
        body = form["message"]
        logger.info(f"Sending message to {agent_jid}: {body}")
        msg = Message(to=agent_jid, sender=str(self.agent.jid), body=body)
        aioxmpp_msg = msg.prepare()
        await self.agent.stream.send(aioxmpp_msg)
        msg.sent = True
        self.agent.traces.append(msg)
        raise aioweb.HTTPFound(f"/spade/agent/{agent_jid}/")

    def find_behaviour(self, behaviour_str: str) -> Optional[CyclicBehaviour]:
        """
        Find the first behaviour of the agent whose ``str()`` equals ``behaviour_str``.

        Args:
          behaviour_str (str): string representation of the behaviour to look for.

        Returns:
          the matching behaviour, or None if there is no match.
        """
        return next(
            (
                behaviour
                for behaviour in self.agent.behaviours
                if str(behaviour) == behaviour_str
            ),
            None,
        )