import ast
import asyncio
import copy
import json
import logging
import re
from collections import OrderedDict
from datetime import datetime

import smia
from smia import GeneralUtils
from smia.css_ontology.css_ontology_utils import CapabilitySkillACLInfo
from smia.logic import acl_smia_messages_utils
from smia.utilities.fipa_acl_info import ServiceTypes, FIPAACLInfo, ACLSMIAOntologyInfo, ACLSMIAJSONSchemas
from smia.utilities.smia_info import SMIAInteractionInfo
from spade.behaviour import OneShotBehaviour, CyclicBehaviour
from spade.message import Message

from operator_gui_logic import GUIFeatures, GUIControllers
_logger = logging.getLogger(__name__)
[docs]
class OperatorGUIBehaviour(OneShotBehaviour):
    """One-shot behaviour that sets up the operator web interface of the SMIA SPADE agent.

    It initializes the agent attributes used by the GUI, registers the operator web pages (HTML templates and their
    controllers) and the menu entry, adds the behaviour that receives FIPA-ACL messages and starts the web server.
    """

    async def run(self) -> None:
        """Prepare the GUI resources of the agent and start its web interface."""
        agent = self.agent

        # Runtime menu entries, SMIA version and avatar builder, to be used in the added HTML templates
        agent.web_menu_entries = OrderedDict()
        agent.smia_version = smia.__version__
        agent.build_avatar_url = GUIFeatures.build_avatar_url

        # Data holders used to build the operator HTML web page
        agent.loaded_statistics = dict.fromkeys(('AASmodels', 'AvailableSMIAs', 'Capabilities', 'Skills'), 0)
        agent.css_elems_info = {}
        agent.skills_info = {}
        agent.available_smia_selection = []
        agent.request_exec_info = {}
        _logger.info("SMIA SPADE web interface required resources initialized.")

        # The SMIA icon is set as the favicon of the GUI
        await GUIFeatures.add_custom_favicon(agent)
        _logger.info("Added SMIA Favicon to the web interface.")

        # The controllers object is created with the agent and kept in the behaviour
        self.operator_gui_controllers = GUIControllers(agent)
        controllers = self.operator_gui_controllers

        # Operator routes: GET/POST handlers, with their HTML template when there is one
        web = agent.web
        web.add_get('/smia_operator', controllers.hello_controller, '/htmls/smia_operator.html')
        web.add_get("/smia_operator/load", controllers.operator_load_controller, None)
        web.add_post("/smia_operator/select", controllers.operator_select_controller, None)
        web.add_post('/smia_operator/submit', controllers.operator_request_controller,
                     '/htmls/smia_operator_submit.html')

        # The new web page is also listed in the menu of the web interface
        web.add_menu_entry("SMIA operator", "/smia_operator", "fa fa-user-cog")
        _logger.info("Added new web pages to the web interface.")

        # TODO: the Send ACL of the GUIAgent was added for testing; the HTMLs for the operator still have to be
        #  developed and added
        # The behaviour that receives all FIPA-ACL messages is added to the SMIA SPADE agent
        agent.add_behaviour(OperatorReceiveBehaviour())

        # With the whole configuration done, the web interface is enabled
        web.start(hostname="0.0.0.0", port="10000")
        _logger.info("Started SMIA SPADE web interface.")
[docs]
class OperatorReceiveBehaviour(CyclicBehaviour):
    """Cyclic behaviour that records every received FIPA-ACL message and wakes up the behaviour waiting for it."""

    async def on_start(self) -> None:
        """Create the agent-wide containers: the received messages and the behaviours waiting per thread."""
        self.agent.received_msgs = []
        self.agent.waiting_behavs = {}

    async def run(self) -> None:
        """Wait up to 10 seconds for a message, store it and unlock the behaviour registered for its thread."""
        msg = await self.receive(timeout=10)
        if not msg:
            return

        _logger.aclinfo("FIPA-ACL Message received from {} with performative {} and content: {}".format(
            msg.sender, msg.get_metadata('performative'), msg.body))
        self.agent.received_msgs.append(msg)

        waiting = self.agent.waiting_behavs
        if msg.thread in waiting:
            _logger.info("There is a behaviour waiting for this message.")
            await self.unlock_behaviour(msg.thread, waiting[msg.thread], msg)

    async def unlock_behaviour(self, thread, behav_name, msg):
        """
        Hand the message over to the first agent behaviour matching both the class name and the thread.

        Args:
            thread: thread of the received message.
            behav_name: class name of the behaviour registered as waiting for that thread.
            msg: the received message offered to the behaviour.
        """
        for candidate in self.agent.behaviours:
            class_name = str(candidate.__class__.__name__)
            # The thread is only read from behaviours of the expected class
            is_target = class_name == behav_name and candidate.thread == thread
            if not is_target:
                continue
            # The message is offered and the event is signalled; it is cleared right away so it can be reused
            candidate.receive_msg = msg
            candidate.receive_msg_event.set()
            candidate.receive_msg_event.clear()
            # The behaviour is no longer waiting, so it is removed from the global dictionary
            self.agent.waiting_behavs.pop(thread)
            break
[docs]
class OperatorRequestBehaviour(OneShotBehaviour):
    """
    This behaviour handles the CSS-related requests through FIPA-ACL messages.

    When several SMIAs are selected, a negotiation is requested first and the capability is then requested from the
    winner. Every step is recorded in ``request_exec_info`` of the agent for the HTML result page.
    """

    # First SMIA version compared against to decide whether the FIPA-SMIACL message format is used
    _FIPA_SMIACL_MIN_VERSION = '0.2.4'
    # Namespaces prepended to names without '#' when a message is adapted to FIPA-SMIACL
    _CSS_SMIA_NAMESPACE = 'http://www.w3id.org/upv-ehu/gcis/css-smia#'
    _CSS_NAMESPACE = 'http://www.w3id.org/hsu-aut/css#'

    def __init__(self, agent_object, req_data):
        """
        The constructor method is rewritten to add the object of the agent.

        Args:
            agent_object (spade.Agent): the SPADE agent object of the SMIA agent.
            req_data (dict): all the information about the CSS-related request. The keys 'thread' and 'formData'
                are read; 'formData' is accessed through ``get`` and ``getall``.
        """
        # The constructor of the inherited class is executed.
        super().__init__()

        # The SPADE agent object is stored as a variable of the behaviour class
        self.myagent = agent_object
        self.request_data = req_data
        self.receive_msg_event = asyncio.Event()  # In order to wait for specific incoming messages
        self.receive_msg = None
        self.thread = req_data['thread']

        form_data = req_data['formData']
        self.smia_id_list = form_data.getall('smia_id[]', [])
        self.smia_version_list = form_data.getall('smia_version[]', [])
        self.asset_id_list = form_data.getall('asset_id[]', [])
        self.selected = form_data.getall('checkbox[]', [])
        self.capability = form_data.get('capability', None)  # None when missing
        self.constraints = form_data.get('constraints', None)
        self.skill = form_data.get('skill', None)
        self.skill_params = form_data.get('skillParams', None)
        self.form_data = form_data

        # The rows are grouped by index, keeping only the SMIAs that have been selected
        self.processed_data = []
        self.selected_smia_ids = []
        for idx, smia_id in enumerate(self.smia_id_list):
            if smia_id in self.selected:
                self.processed_data.append({
                    "smiaID": smia_id,
                    "smiaVersion": self.smia_version_list[idx],
                    "assetID": self.asset_id_list[idx],
                })
                self.selected_smia_ids.append(smia_id + '@' + str(self.myagent.jid.domain))

    async def run(self) -> None:
        """
        Perform the CSS-related request: build the message body, negotiate when several SMIAs are selected,
        request the capability when applicable and store the timing information.

        Any exception is logged and recorded in the interactions of the agent instead of being propagated.
        """
        try:
            # The decision is taken before any FIPA-SMIACL adaptation: adapt_msg_to_fipa_smiacl rewrites
            # self.capability into an IRI, which would make later comparisons with 'Negotiation' fail.
            # TODO: careful if the name of the Negotiation capability changes
            negotiation_requested = self.capability == 'Negotiation'

            # The body of the ACL message is created
            service_params = {'capabilityName': self.capability, 'skillName': self.skill}
            if self.skill_params is not None:
                service_params['skillParameterValues'] = self._collect_skill_param_values()
            if self.constraints is not None:
                service_params[CapabilitySkillACLInfo.REQUIRED_CAPABILITY_CONSTRAINTS] = self._parse_literal(
                    self.constraints)
            msg_body_json = {'serviceID': 'capabilityRequest',
                             'serviceType': ServiceTypes.ASSET_RELATED_SERVICE,
                             'serviceData': {'serviceCategory': 'service-request',
                                             'serviceParams': service_params}}

            # The information of the CSS-related request is added in the agent dictionary for the HTML result page
            self.myagent.request_exec_info['InteractionsDict'].append(
                {'type': 'analysis', 'title': 'Analyzing operator selection ...', 'capability': self.capability,
                 'skill': self.skill, 'constraints': self.constraints, 'smia_ids': self.selected_smia_ids})

            if not self.selected_smia_ids:
                # Explicit error instead of an opaque IndexError in the result page
                raise ValueError("No SMIA has been selected to perform the CSS-related request.")
            smia_id = self.selected_smia_ids[0]

            if len(self.processed_data) > 1:
                # With several SMIAs the request is addressed to the winner of a negotiation between them
                smia_id = await self._negotiate_winner(msg_body_json, smia_id, negotiation_requested)

            if not negotiation_requested or len(self.processed_data) == 1:
                # If the capability is not Negotiation and there are several SMIAs, the negotiation has been made
                # and the capability is requested from the winner. If there is only one SMIA, the capability is
                # requested directly.
                await self._request_capability(smia_id, msg_body_json, negotiation_requested)

            # As the CSS-related request has finished, the time information is added in the agent dictionary
            end_time = GeneralUtils.get_current_date_time()
            duration = (datetime.fromisoformat(str(end_time)) -
                        datetime.fromisoformat(str(self.myagent.request_exec_info['StartTime']))).total_seconds()
            self.myagent.request_exec_info.update({'EndTime': end_time, 'Duration': duration})
        except Exception as e:
            _logger.error("An exception occurred during an CSS-related request!")
            _logger.error(e)
            # The information about the error is added in the dictionary for the HTML result page
            self.myagent.request_exec_info['InteractionsDict'].append(
                {'type': 'exception', 'title': 'An error ocurred during the CSS-related request.', 'message': str(e)})

    async def _negotiate_winner(self, msg_body_json, reference_smia_id, negotiation_requested):
        """Send the negotiation request (CFP) to every selected SMIA, wait for the answer and return the winner."""
        # TODO: for multiple SMIAs the ACL structures still have to be adapted from SMIA v0.2.4 onwards
        _logger.info("There are multiple SMIAs to be requested: negotiation is required")

        # The thread is updated in order to receive later the associated response message
        request_thread = self.thread
        self.thread = request_thread + '-neg'

        # The negotiation request body is prepared on a copy, so the capability request body stays untouched
        neg_body_json = copy.deepcopy(msg_body_json)
        neg_params = neg_body_json['serviceData']['serviceParams']
        neg_params.update({'neg_requester_jid': str(self.myagent.jid),
                           'targets': ','.join(self.selected_smia_ids)})
        if not negotiation_requested:
            # If the capability requested is not Negotiation, the skill will be using RAM that every SMIA has
            neg_params.update({'capabilityName': 'Negotiation', 'skillName': 'NegotiationBasedOnRAM'})

        if self._requires_fipa_smiacl(reference_smia_id):
            _logger.info("It is a version higher than 0.2.4, so it requires a specific FIPA-SMIA-ACL "
                         "message format.")
            neg_msg_metadata = {'performative': FIPAACLInfo.FIPA_ACL_PERFORMATIVE_CFP,
                                'ontology': ACLSMIAOntologyInfo.ACL_ONTOLOGY_CSS_SERVICE,
                                'protocol': FIPAACLInfo.FIPA_ACL_CONTRACT_NET_PROTOCOL}
            neg_body_json = await self.adapt_msg_to_fipa_smiacl(neg_body_json)
            neg_body_json = await acl_smia_messages_utils.generate_json_from_schema(
                ACLSMIAJSONSchemas.JSON_SCHEMA_CSS_SERVICE, capabilityIRI=self.capability,
                skillIRI=self.skill, constraints=neg_body_json.get('constraints'),  # Adapted constraints
                skillParams=neg_body_json.get('skillParams'),  # Adapted skill params
                negCriterion='http://www.w3id.org/hsu-aut/css#NegotiationBasedOnRAM',
                negRequester=str(self.myagent.jid), negTargets=self.selected_smia_ids)
        else:
            neg_msg_metadata = SMIAInteractionInfo.NEG_STANDARD_ACL_TEMPLATE_CFP.metadata

        for target_jid in self.selected_smia_ids:
            # The CFP message is sent to each SMIA participant of the negotiation
            _logger.aclinfo("Sending Negotiation request to {}...".format(target_jid))
            neg_msg = OperatorRequestBehaviour.create_acl_msg(
                target_jid, self.thread, neg_msg_metadata, neg_body_json)
            await self.send(neg_msg)
            _logger.aclinfo("Message sent to {}!".format(neg_msg.to))
            self.myagent.request_exec_info['Interactions'] += 1

        # The information about the negotiation request is added in the dictionary for the HTML result page
        self.myagent.request_exec_info['InteractionsDict'].append(
            {'type': 'acl_send', 'title': 'Requesting negotiation betweeen selected SMIAs ...',
             'message': 'As several SMIAs have been selected, all of them {} have been asked to negotiate with '
                        'each other in order to obtain the best option.'.format(self.selected_smia_ids)})

        # The behaviour needs to wait for the response message of the negotiation winner
        self.myagent.waiting_behavs[self.thread] = self.__class__.__name__
        _logger.info('The behaviour will wait for the winner of the negotiation...')
        await self.receive_msg_event.wait()

        # The request for negotiation has been answered, so the original thread is restored
        self.thread = request_thread

        # The SMIA id to request the capability is obtained to create correctly the next ACL message
        response_body = acl_smia_messages_utils.get_parsed_body_from_acl_msg(self.receive_msg)
        if 'serviceData' in response_body:
            winner_params = json.loads(self.receive_msg.body)['serviceData']['serviceParams']
            if isinstance(winner_params, str):
                winner_params = self._parse_literal(winner_params)
            smia_id = winner_params['winner']
        elif 'winner' in response_body and response_body['winner']:
            smia_id = acl_smia_messages_utils.get_sender_from_acl_msg(self.receive_msg)
        else:
            smia_id = str(response_body)

        # The information about the negotiation response is added in the dictionary for the HTML result page
        response_info = {'type': 'acl_recv', 'title': 'Obtaining negotiation winner ...'}
        if self.receive_msg.get_metadata('performative') == FIPAACLInfo.FIPA_ACL_PERFORMATIVE_FAILURE:
            response_info.update({'response_type': 'failure', 'response_title':
                                  'The negotiation process has not been completed.', 'response_msg':
                                  str(json.loads(self.receive_msg.body)['serviceData']['serviceParams'])})
        else:
            response_info.update({'response_type': 'success', 'response_title':
                                  'The negotiation process has been completed.', 'response_msg':
                                  'The SMIA winner of the negotiation is: {}.'.format(smia_id)})
        self.myagent.request_exec_info['InteractionsDict'].append(response_info)
        return smia_id

    async def _request_capability(self, smia_id, msg_body_json, negotiation_requested):
        """Send the capability request to the given SMIA, wait for its response and record both interactions."""
        if self._requires_fipa_smiacl(smia_id):
            _logger.info("It is a version higher than 0.2.4, so it requires a specific FIPA-SMIA-ACL "
                         "message format.")
            msg_metadata = {'performative': 'request', 'ontology': 'css-service', 'protocol': 'fipa-request'}
            msg_body_json = await self.adapt_msg_to_fipa_smiacl(msg_body_json)
        elif negotiation_requested:
            # In this particular case, the negotiation request is made via the performative CallForProposal
            msg_metadata = SMIAInteractionInfo.NEG_STANDARD_ACL_TEMPLATE_CFP.metadata
            msg_body_json['serviceData']['serviceParams'].update({'neg_requester_jid': str(self.myagent.jid),
                                                                  'targets': smia_id})
        else:
            msg_metadata = SMIAInteractionInfo.CAP_STANDARD_ACL_TEMPLATE_REQUEST.metadata
        msg = OperatorRequestBehaviour.create_acl_msg(smia_id, self.thread, msg_metadata, msg_body_json)

        _logger.info("The selected capability to be executed is [{}].".format(self.capability))
        # The information about the CSS-related request is added in the dictionary for the HTML result page
        self.myagent.request_exec_info['InteractionsDict'].append(
            {'type': 'acl_send', 'title': 'Requesting CSS-related capability execution ...',
             'message': 'The SMIA with ID [{}] has been requested to execute the capability [{}].'.format(
                 smia_id, self.capability)})

        _logger.aclinfo("Sending {} capability request to {}...".format(self.capability, smia_id))
        await self.send(msg)
        _logger.aclinfo("Message sent to {}!".format(msg.to))
        self.myagent.request_exec_info['Interactions'] += 1

        # The behaviour needs to wait for the response message
        self.myagent.waiting_behavs[self.thread] = self.__class__.__name__
        _logger.info('The behaviour will wait for the response of the CSS-related request...')
        await self.receive_msg_event.wait()

        # The information about the CSS-related response is added in the agent dictionary for HTML result page
        if self._requires_fipa_smiacl(smia_id):
            try:
                response_msg_body = json.loads(self.receive_msg.body)
            except (TypeError, json.JSONDecodeError):
                # The body is shown as received when it is not valid JSON
                response_msg_body = self.receive_msg.body
            response_info = {'type': 'acl_recv',
                             'title': 'Obtaining CSS-related capability execution result ...',
                             'response_msg': str(response_msg_body)}
        else:
            response_info = {'type': 'acl_recv', 'title': 'Obtaining CSS-related capability execution result ...',
                             'response_msg': str(json.loads(self.receive_msg.body)['serviceData']['serviceParams'])}
        if self.receive_msg.get_metadata('performative') == FIPAACLInfo.FIPA_ACL_PERFORMATIVE_FAILURE:
            response_info.update({'response_type': 'failure', 'response_title':
                                  'The CSS-related execution has not been completed.'})
        else:
            response_info.update({'response_type': 'success', 'response_title':
                                  'The CSS-related execution has been completed.'})
        self.myagent.request_exec_info['InteractionsDict'].append(response_info)

    def _requires_fipa_smiacl(self, smia_id):
        """Return True when the version of the given SMIA is 0.2.4 or later (FIPA-SMIACL message format)."""
        version = self.get_smia_version_by_id(smia_id)
        if version is None:
            # Explicit error instead of the AttributeError raised when parsing a None version
            raise ValueError("The version of the SMIA [{}] is unknown, so the format of its ACL messages cannot "
                             "be determined.".format(smia_id))
        return (OperatorRequestBehaviour.version_str_to_tuple(version) >=
                OperatorRequestBehaviour.version_str_to_tuple(self._FIPA_SMIACL_MIN_VERSION))

    def _collect_skill_param_values(self):
        """Return a dict with the form value of every skill parameter, warning about the missing ones."""
        values = {}
        for param in set(self._parse_literal(self.skill_params)):
            param_value = self.form_data.get(param, None)
            if param_value is None:
                _logger.warning("The value of the {} parameter is missing, it is possible that the capability "
                                "cannot be executed.".format(param))
            values[param] = param_value
        return values

    @staticmethod
    def _parse_literal(text):
        """Parse the textual representation of a Python literal (list, dict, set...) into its value."""
        # NOTE(review): this data comes from the web form and from remote ACL messages and was previously
        # passed to eval(), which executes arbitrary code. ast.literal_eval only accepts literals; confirm that
        # no producer relies on non-literal expressions.
        if isinstance(text, str):
            text = text.strip()  # eval() tolerated surrounding whitespace
        return ast.literal_eval(text)

    @staticmethod
    def _to_iri(name, namespace):
        """Prepend the namespace to a name without '#'; None and names that already contain '#' are unchanged."""
        if name is None or '#' in name:
            return name
        return namespace + name

    async def adapt_msg_to_fipa_smiacl(self, msg_body_json: dict) -> dict:
        """
        This method adapts a FIPA-ACL message into one that is compatible with the FIPA-SMIACL normalized language
        (used in SMIA versions later than 0.2.4).

        The adapted body is rebuilt from the attributes of the behaviour; the received body is not read. As a
        side effect, ``self.capability`` and ``self.skill`` are replaced by their IRIs.

        Args:
            msg_body_json: dict with body of the message.

        Returns:
            msg_body_json: dict with adapted body of the message.
        """
        self.capability = self._to_iri(self.capability, self._CSS_SMIA_NAMESPACE)
        self.skill = self._to_iri(self.skill, self._CSS_NAMESPACE)
        adapted_body = {'capabilityIRI': self.capability, 'skillIRI': self.skill}

        if self.skill_params is not None:
            adapted_body['skillParams'] = {
                self._to_iri(param, self._CSS_NAMESPACE): param_value
                for param, param_value in self._collect_skill_param_values().items()}

        if self.constraints is not None:
            adapted_body['constraints'] = {
                self._to_iri(const_name, self._CSS_NAMESPACE): const_value
                for const_name, const_value in self._parse_literal(self.constraints).items()}
        return adapted_body

    @staticmethod
    def create_acl_msg(receiver_jid, thread, metadata, body_json):
        """
        This method creates an FIPA-ACL SPADE message.

        Args:
            receiver_jid (str): the JID of the SMIA agent receiver.
            thread (str): thread of the message.
            metadata: metadata of the message.
            body_json (dict): the body of the message in JSON format.

        Returns:
            spade.message.Message: the SPADE message object.
        """
        msg = Message(to=receiver_jid, thread=thread)
        msg.metadata = metadata
        msg.body = json.dumps(body_json)
        return msg

    def get_smia_version_by_id(self, smia_id):
        """
        This method gets the SMIA version by its identifier.

        Args:
            smia_id (str): identifier of the SMIA instance. If it contains '@', only the part before it is used.

        Returns:
            str: version in string format, or None if the SMIA is not among the selected ones.
        """
        if '@' in smia_id:
            smia_id = smia_id.split('@')[0]
        for data in self.processed_data:
            if data['smiaID'] == smia_id:
                return data['smiaVersion']
        return None

    @staticmethod
    def version_str_to_tuple(version_str):
        """
        This method converts a version string to tuple in order to make comparisons.

        A component with a non-numeric suffix (e.g. '4rc1') contributes its leading digits and ends the parsing,
        so '0.2.4rc1' and '0.2.4.dev1' are both converted to (0, 2, 4).

        Args:
            version_str (str): string version to convert.

        Returns:
            tuple: version in tuple format.

        Raises:
            ValueError: if the string does not start with a numeric component.
        """
        components = []
        for component in version_str.split('.'):
            try:
                components.append(int(component))
                continue
            except ValueError:
                leading_digits = re.match(r'\s*([0-9]+)', component)
                if leading_digits is None and not components:
                    # Nothing numeric at all: the original error is kept
                    raise
            if leading_digits is not None:
                components.append(int(leading_digits.group(1)))
            # Whatever follows a suffixed component is pre-release/build information
            break
        return tuple(components)