Source code for smia.behaviours.specific_handle_behaviours.handle_aas_related_svc_behaviour
import json
import logging
from basyx.aas.adapter.json import AASToJsonEncoder
from spade.behaviour import OneShotBehaviour
from smia import GeneralUtils
from smia.aas_model.aas_model_utils import AASModelUtils
from smia.logic import inter_smia_interactions_utils, acl_smia_messages_utils
from smia.logic.exceptions import RequestDataError, ServiceRequestExecutionError, AASModelReadingError, \
AssetConnectionError
from smia.logic.services_utils import AgentServiceUtils
from smia.utilities import smia_archive_utils
from smia.utilities.aas_related_services_info import AASRelatedServicesInfo
from smia.utilities.fipa_acl_info import FIPAACLInfo, ACLSMIAOntologyInfo
from smia.utilities.smia_info import AssetInterfacesInfo
_logger = logging.getLogger(__name__)
[docs]
class HandleAASRelatedSvcBehaviour(OneShotBehaviour):
"""
This class implements the behaviour that handles all the service requests that the SMIA has received. This
request can arrive from an FIPA-ACL message as a :term:`Inter AAS Interaction` or from the AAS Core as an
:term:`Intra AAS Interaction` message. This is a OneShotBehaviour because it handles an individual service request
and then kills itself.
"""
# TODO PENSAR SI AGRUPAR EN ESTA CLASE TANTO Requests como Responses ('HandleSvcBehaviour'), ya que para CSS solo
# hay CapabilityBehaviour. Dentro de esta se podria analizar la performativa (como ya se hace), para ver si es una
# peticion o una respuesta
def __init__(self, agent_object, received_acl_msg):
"""
The constructor method is rewritten to add the object of the agent.
Args:
agent_object (spade.Agent): the SPADE agent object of the SMIA agent.
received_acl_msg (spade.message.Message): the received ACL-SMIA message object
"""
# The constructor of the inherited class is executed.
super().__init__()
# The SPADE agent object is stored as a variable of the behaviour class
self.myagent = agent_object
self.received_acl_msg = received_acl_msg
self.received_body_json = acl_smia_messages_utils.get_parsed_body_from_acl_msg(self.received_acl_msg)
self.requested_timestamp = GeneralUtils.get_current_timestamp()
[docs]
async def on_start(self):
"""
This method implements the initialization process of this behaviour.
"""
_logger.info("HandleAASRelatedSvcBehaviour starting to handle the service related to the message with thread"
" [{}]...".format(self.received_acl_msg.thread))
[docs]
async def run(self):
"""
This method implements the logic of the behaviour.
"""
# Depending on the type of service, the associated method will be launched
match self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB):
case (ACLSMIAOntologyInfo.ACL_ONTOLOGY_ASSET_RELATED_SERVICE |
ACLSMIAOntologyInfo.ACL_ONTOLOGY_AGENT_RELATED_SERVICE):
await self.handle_asset_agent_related_service()
case ACLSMIAOntologyInfo.ACL_ONTOLOGY_AAS_SERVICE:
await self.handle_aas_service()
case ACLSMIAOntologyInfo.ACL_ONTOLOGY_AAS_INFRASTRUCTURE_SERVICE:
await self.handle_aas_infrastructure_service()
_logger.info("Management of the AAS-related service with thread {} finished.".format(self.received_acl_msg.thread))
# -------------------------------------
# Asset-/agent-related services methods
# -------------------------------------
# --------------------
# AAS services methods
# --------------------
[docs]
async def handle_aas_service(self):
"""
This method implements the logic to handle the AAS type services.
"""
try:
match self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_PERFORMATIVE_ATTRIB):
case FIPAACLInfo.FIPA_ACL_PERFORMATIVE_QUERY_REF:
result = await self.handle_aas_svc_query_ref()
case FIPAACLInfo.FIPA_ACL_PERFORMATIVE_INFORM:
_logger.aclinfo(f"The SMIA has been informed about the AAS service related to the thread"
f" [{self.received_acl_msg.thread}] with the content:{self.received_acl_msg.body}.")
# TODO
return
case _:
unsupported_performative_msg = ("Cannot handle the asset-/agent-related service of the ACL "
"interaction with thread [{}] because the performative [{}] is not"
" yet supported.".format(
self.received_acl_msg.thread,
self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_PERFORMATIVE_ATTRIB)))
_logger.warning(unsupported_performative_msg)
raise RequestDataError(unsupported_performative_msg)
# When the service is successfully performed the result will be sent to the requester
await inter_smia_interactions_utils.send_response_msg_from_received(
self, self.received_acl_msg, FIPAACLInfo.FIPA_ACL_PERFORMATIVE_INFORM, response_body=result)
# The information will be stored in the log
# TODO MODIFICAR ESTO CON LAS NUEVAS ESTRUCTURAS DE MENSAJES JSON
smia_archive_utils.save_completed_svc_log_info(
self.requested_timestamp, GeneralUtils.get_current_timestamp(),
await inter_smia_interactions_utils.acl_message_to_json(self.received_acl_msg), str(result),
self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB))
except (RequestDataError, ServiceRequestExecutionError, AASModelReadingError) as svc_request_error:
if isinstance(svc_request_error, RequestDataError):
svc_request_error = ServiceRequestExecutionError(
self.received_acl_msg.thread, svc_request_error.message,
self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB), self)
if isinstance(svc_request_error, AASModelReadingError):
svc_request_error = ServiceRequestExecutionError(
self.received_acl_msg.thread, "{}. Reason: {}".format(svc_request_error.message,
svc_request_error.reason),
self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB), self)
await svc_request_error.handle_service_execution_error()
return # killing a behaviour does not cancel its current run loop
[docs]
async def handle_aas_svc_query_ref(self):
"""
This method implements the logic to handle the Query-Ref performatives of AAS type services.
"""
match self.received_body_json['serviceType']:
case AASRelatedServicesInfo.AAS_SERVICE_TYPE_DISCOVERY:
return await self.handle_aas_discovery_svc()
case _:
unsupported_service_type = ("Cannot handle the AAS service of the ACL interaction with thread [{}] "
"because the service type [{}] is not yet supported.".format(
self.received_acl_msg.thread, self.received_body_json['serviceType']))
_logger.warning(unsupported_service_type)
raise RequestDataError(unsupported_service_type)
[docs]
async def handle_aas_discovery_svc(self):
"""
This method implements the logic to handle the AAS type services of Discovery ServiceTypes.
"""
match self.received_body_json['serviceID']:
case (AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_REF |
AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_VALUE_BY_REF):
if isinstance(self.received_body_json['serviceParams'], str):
self.received_body_json['serviceParams'] = await AASModelUtils.aas_model_reference_string_to_dict(
self.received_body_json['serviceParams'])
self.received_body_json['serviceParams'] = await self.myagent.aas_model.check_and_adapt_for_templates(
self.received_body_json['serviceParams'])
aas_object_ref = await AASModelUtils.create_aas_reference_object(
'ModelReference', self.received_body_json['serviceParams'])
# When the appropriate Reference object is created, the requested SubmodelElement can be obtained
requested_sme = await self.myagent.aas_model.get_object_by_reference(aas_object_ref)
# When the AAS object has been obtained, the result is returned in JSON format
if self.received_body_json['serviceID'] == AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_REF:
return json.dumps(requested_sme, cls=AASToJsonEncoder)
else:
if not hasattr(requested_sme, 'value'):
raise RequestDataError(f"The SubmodelElement queried with the reference ["
f"{self.received_body_json['serviceParams']}] has not value, so it cannot"
f" be returned")
if not isinstance(requested_sme.value, str):
return json.dumps(requested_sme.value, cls=AASToJsonEncoder)
return str(requested_sme.value)
case AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_AAS_INFO:
#TODO
pass
case AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_ID:
# TODO
pass
case _:
unsupported_service_id = ("Cannot handle the AAS service of the ACL interaction with thread [{}] "
"because the discovery service id [{}] is not yet supported.".format(
self.received_acl_msg.thread, self.received_body_json['serviceID']))
_logger.warning(unsupported_service_id)
raise RequestDataError(unsupported_service_id)
# -----------------------------------
# AAS infrastructure services methods
# -----------------------------------
[docs]
async def handle_aas_infrastructure_service(self):
"""
This method implements the logic to handle the Infrastructure services of AAS type services.
"""
# The AAS Infrastructure Services are offered by the platform (in SMIA approach by SMIA ISM), not by SMIA
# instances, so it will not realize any task
pass
[docs]
async def send_response_msg_to_sender(self, performative, service_params):
"""
This method creates and sends a FIPA-ACL message with the given serviceParams and performative.
Args:
performative (str): performative according to FIPA-ACL standard.
service_params (dict): JSON with the serviceParams to be sent in the message.
"""
acl_msg = inter_smia_interactions_utils.create_inter_smia_response_msg(
receiver=self.svc_req_data['sender'],
thread=self.svc_req_data['thread'],
performative=performative,
ontology='SvcResponse',
service_id=self.svc_req_data['serviceID'],
service_type=self.svc_req_data['serviceType'],
service_params=json.dumps(service_params)
)
await self.send(acl_msg)