Source code for smia.behaviours.specific_handle_behaviours.handle_aas_related_svc_behaviour

import json
import logging

from basyx.aas.adapter.json import AASToJsonEncoder
from spade.behaviour import OneShotBehaviour

from smia import GeneralUtils
from smia.aas_model.aas_model_utils import AASModelUtils
from smia.logic import inter_smia_interactions_utils, acl_smia_messages_utils
from smia.logic.exceptions import RequestDataError, ServiceRequestExecutionError, AASModelReadingError, \
    AssetConnectionError
from smia.logic.services_utils import AgentServiceUtils
from smia.utilities import smia_archive_utils
from smia.utilities.aas_related_services_info import AASRelatedServicesInfo
from smia.utilities.fipa_acl_info import FIPAACLInfo, ACLSMIAOntologyInfo
from smia.utilities.smia_info import AssetInterfacesInfo

# Module-level logger named after this module; the behaviour below uses it for progress and warning messages.
_logger = logging.getLogger(__name__)


[docs] class HandleAASRelatedSvcBehaviour(OneShotBehaviour): """ This class implements the behaviour that handles all the service requests that the SMIA has received. This request can arrive from an FIPA-ACL message as a :term:`Inter AAS Interaction` or from the AAS Core as an :term:`Intra AAS Interaction` message. This is a OneShotBehaviour because it handles an individual service request and then kills itself. """ # TODO PENSAR SI AGRUPAR EN ESTA CLASE TANTO Requests como Responses ('HandleSvcBehaviour'), ya que para CSS solo # hay CapabilityBehaviour. Dentro de esta se podria analizar la performativa (como ya se hace), para ver si es una # peticion o una respuesta def __init__(self, agent_object, received_acl_msg): """ The constructor method is rewritten to add the object of the agent. Args: agent_object (spade.Agent): the SPADE agent object of the SMIA agent. received_acl_msg (spade.message.Message): the received ACL-SMIA message object """ # The constructor of the inherited class is executed. super().__init__() # The SPADE agent object is stored as a variable of the behaviour class self.myagent = agent_object self.received_acl_msg = received_acl_msg self.received_body_json = acl_smia_messages_utils.get_parsed_body_from_acl_msg(self.received_acl_msg) self.requested_timestamp = GeneralUtils.get_current_timestamp()
[docs] async def on_start(self): """ This method implements the initialization process of this behaviour. """ _logger.info("HandleAASRelatedSvcBehaviour starting to handle the service related to the message with thread" " [{}]...".format(self.received_acl_msg.thread))
[docs] async def run(self): """ This method implements the logic of the behaviour. """ # Depending on the type of service, the associated method will be launched match self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB): case (ACLSMIAOntologyInfo.ACL_ONTOLOGY_ASSET_RELATED_SERVICE | ACLSMIAOntologyInfo.ACL_ONTOLOGY_AGENT_RELATED_SERVICE): await self.handle_asset_agent_related_service() case ACLSMIAOntologyInfo.ACL_ONTOLOGY_AAS_SERVICE: await self.handle_aas_service() case ACLSMIAOntologyInfo.ACL_ONTOLOGY_AAS_INFRASTRUCTURE_SERVICE: await self.handle_aas_infrastructure_service() _logger.info("Management of the AAS-related service with thread {} finished.".format(self.received_acl_msg.thread))
# ------------------------------------- # Asset-/agent-related services methods # ------------------------------------- # -------------------- # AAS services methods # --------------------
[docs] async def handle_aas_service(self): """ This method implements the logic to handle the AAS type services. """ try: match self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_PERFORMATIVE_ATTRIB): case FIPAACLInfo.FIPA_ACL_PERFORMATIVE_QUERY_REF: result = await self.handle_aas_svc_query_ref() case FIPAACLInfo.FIPA_ACL_PERFORMATIVE_INFORM: _logger.aclinfo(f"The SMIA has been informed about the AAS service related to the thread" f" [{self.received_acl_msg.thread}] with the content:{self.received_acl_msg.body}.") # TODO return case _: unsupported_performative_msg = ("Cannot handle the asset-/agent-related service of the ACL " "interaction with thread [{}] because the performative [{}] is not" " yet supported.".format( self.received_acl_msg.thread, self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_PERFORMATIVE_ATTRIB))) _logger.warning(unsupported_performative_msg) raise RequestDataError(unsupported_performative_msg) # When the service is successfully performed the result will be sent to the requester await inter_smia_interactions_utils.send_response_msg_from_received( self, self.received_acl_msg, FIPAACLInfo.FIPA_ACL_PERFORMATIVE_INFORM, response_body=result) # The information will be stored in the log # TODO MODIFICAR ESTO CON LAS NUEVAS ESTRUCTURAS DE MENSAJES JSON smia_archive_utils.save_completed_svc_log_info( self.requested_timestamp, GeneralUtils.get_current_timestamp(), await inter_smia_interactions_utils.acl_message_to_json(self.received_acl_msg), str(result), self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB)) except (RequestDataError, ServiceRequestExecutionError, AASModelReadingError) as svc_request_error: if isinstance(svc_request_error, RequestDataError): svc_request_error = ServiceRequestExecutionError( self.received_acl_msg.thread, svc_request_error.message, self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB), self) if isinstance(svc_request_error, AASModelReadingError): svc_request_error = 
ServiceRequestExecutionError( self.received_acl_msg.thread, "{}. Reason: {}".format(svc_request_error.message, svc_request_error.reason), self.received_acl_msg.get_metadata(FIPAACLInfo.FIPA_ACL_ONTOLOGY_ATTRIB), self) await svc_request_error.handle_service_execution_error() return # killing a behaviour does not cancel its current run loop
[docs] async def handle_aas_svc_query_ref(self): """ This method implements the logic to handle the Query-Ref performatives of AAS type services. """ match self.received_body_json['serviceType']: case AASRelatedServicesInfo.AAS_SERVICE_TYPE_DISCOVERY: return await self.handle_aas_discovery_svc() case _: unsupported_service_type = ("Cannot handle the AAS service of the ACL interaction with thread [{}] " "because the service type [{}] is not yet supported.".format( self.received_acl_msg.thread, self.received_body_json['serviceType'])) _logger.warning(unsupported_service_type) raise RequestDataError(unsupported_service_type)
[docs] async def handle_aas_discovery_svc(self): """ This method implements the logic to handle the AAS type services of Discovery ServiceTypes. """ match self.received_body_json['serviceID']: case (AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_REF | AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_VALUE_BY_REF): if isinstance(self.received_body_json['serviceParams'], str): self.received_body_json['serviceParams'] = await AASModelUtils.aas_model_reference_string_to_dict( self.received_body_json['serviceParams']) self.received_body_json['serviceParams'] = await self.myagent.aas_model.check_and_adapt_for_templates( self.received_body_json['serviceParams']) aas_object_ref = await AASModelUtils.create_aas_reference_object( 'ModelReference', self.received_body_json['serviceParams']) # When the appropriate Reference object is created, the requested SubmodelElement can be obtained requested_sme = await self.myagent.aas_model.get_object_by_reference(aas_object_ref) # When the AAS object has been obtained, the result is returned in JSON format if self.received_body_json['serviceID'] == AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_REF: return json.dumps(requested_sme, cls=AASToJsonEncoder) else: if not hasattr(requested_sme, 'value'): raise RequestDataError(f"The SubmodelElement queried with the reference [" f"{self.received_body_json['serviceParams']}] has not value, so it cannot" f" be returned") if not isinstance(requested_sme.value, str): return json.dumps(requested_sme.value, cls=AASToJsonEncoder) return str(requested_sme.value) case AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_AAS_INFO: #TODO pass case AASRelatedServicesInfo.AAS_DISCOVERY_SERVICE_GET_SM_BY_ID: # TODO pass case _: unsupported_service_id = ("Cannot handle the AAS service of the ACL interaction with thread [{}] " "because the discovery service id [{}] is not yet supported.".format( self.received_acl_msg.thread, self.received_body_json['serviceID'])) 
_logger.warning(unsupported_service_id) raise RequestDataError(unsupported_service_id)
# ----------------------------------- # AAS infrastructure services methods # -----------------------------------
[docs] async def handle_aas_infrastructure_service(self): """ This method implements the logic to handle the Infrastructure services of AAS type services. """ # The AAS Infrastructure Services are offered by the platform (in SMIA approach by SMIA ISM), not by SMIA # instances, so it will not realize any task pass
[docs] async def send_response_msg_to_sender(self, performative, service_params): """ This method creates and sends a FIPA-ACL message with the given serviceParams and performative. Args: performative (str): performative according to FIPA-ACL standard. service_params (dict): JSON with the serviceParams to be sent in the message. """ acl_msg = inter_smia_interactions_utils.create_inter_smia_response_msg( receiver=self.svc_req_data['sender'], thread=self.svc_req_data['thread'], performative=performative, ontology='SvcResponse', service_id=self.svc_req_data['serviceID'], service_type=self.svc_req_data['serviceType'], service_params=json.dumps(service_params) ) await self.send(acl_msg)