[docs]classMQTTAssetConnection(AssetConnection):""" This class implements the asset connection for MQTT protocol. It inherits from the valid official class defined by SMIA. """def__init__(self):super().__init__()self.architecture_style=AssetConnection.ArchitectureStyle.PUBSUB# Common dataself.interface_title=Noneself.base=Noneself.endpoint_metadata_elem=Noneself.security_scheme_elem=None# Data of each requestself.broker=Noneself.port=1883self.topic=Noneself.qos=0self.retain=Falseself.control_packet=Noneself.request_body=None# TODO analizar si son necesarias mas variables globales
[docs]asyncdefconfigure_connection_by_aas_model(self,interface_aas_elem):# The Interface element is first checkedawaitself.check_interface_element(interface_aas_elem)# Let's retrieve the necessary data from the AAS model to configure the HTTP connectionself.interface_title=interface_aas_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_TITLE)# General information about the connection to the asset is defined in the SMC 'EndpointMetadata'self.endpoint_metadata_elem=interface_aas_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_ENDPOINT_METADATA)# The endpointMetadata element need to be checkedawaitself.check_endpoint_metadata()self.base=self.endpoint_metadata_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_BASE)content_type_elem=self.endpoint_metadata_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_CONTENT_TYPE)ifcontent_type_elemisnotNone:self.request_content_type=content_type_elem.value# TODO check (it is not used)security_definitions_elem=self.endpoint_metadata_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_SECURITY_DEFINITIONS)ifsecurity_definitions_elemisnotNone:self.security_scheme_elem=security_definitions_elem.value# TODO: pensar como añadir el resto , p.e. tema de seguridad o autentificacion (bearer).# De momento se ha dejado sin seguridad (nosec_sc)# The InteractionMetadata elements also need to be checkedinteraction_metadata_elem=interface_aas_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERACTION_METADATA)forinteraction_metadata_typeininteraction_metadata_elem:# Interaction metadata can be properties, actions or eventsforinteraction_elementininteraction_metadata_type:awaitself.check_interaction_metadata(interaction_element)# We extract the broker and port from the baseifself.baseisnotNoneandself.base.value:base_str=self.base.valueparsed_url=urlparse(self.base.value)ifnotparsed_url.netloc:parsed_url=urlparse(f"//{base_str}",scheme='mqtt')# If 'netloc' is empty, the '//' is missingelifnotparsed_url.scheme:parsed_url=parsed_url._replace(scheme='mqtt')# If there is no scheme, e.g., '//broker.com:1883'self.broker=parsed_url.hostnameifparsed_url.port:self.port=parsed_url.portelifparsed_url.scheme=='mqtts':self.port=8883else:self.port=1883
[docs]asyncdefexecute_asset_service(self,interaction_metadata,service_input_data=None):ifinteraction_metadataisNone:raiseAssetConnectionError("The skill cannot be executed by asset service because the given ""InteractionMetadata object is None","invalid method parameter","InteractionMetadata object is None")awaitself.extract_general_interaction_metadata(interaction_metadata)# Then, the data of the skill is added in the required field. To do that, the 'SkillParameterExposedThrough'# relationship should be obtained, which indicates where the parameter data should be addedifservice_input_dataisnotNoneandlen(service_input_data)>0:awaitself.add_asset_service_data(interaction_metadata,service_input_data)# At this point, the MQTT request is performedmqtt_response=awaitself.send_mqtt_request()ifmqtt_response:_logger.assetinfo("MQTT communication successfully completed.")returnTruereturnNone
# ---------------------# MQTT specific methods# ---------------------
[docs]asyncdefextract_general_interaction_metadata(self,interaction_metadata):""" This method extracts the general interaction information from the interaction metadata object. Since this is an MQTT Asset Connection, information about the topic, QoS, retain and control packet is obtained. All information is saved in the global variables of the class. Args: interaction_metadata (basyx.aas.model.SubmodelElementCollection): SubmodelElement of interactionMetadata. """# The interaction_metada element will be an SMC of the MQTT interface.awaitself.check_interaction_metadata(interaction_metadata)# First, the full valid topic of the MQTT request is obtained.forms_elem=interaction_metadata.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_FORMS)awaitself.get_topic(forms_elem)# Then, MQTT parameters are obtainedawaitself.get_mqtt_parameters(forms_elem)
[docs]asyncdefget_topic(self,forms_elem):""" This method gets the complete request Topic from the forms element within the InteractionMetadata element. The information is saved in the global variables of the class. Args: forms_elem (basyx.aas.model.SubmodelElementCollection): SubmodelElement of forms within InteractionMetadata. """href_elem=forms_elem.get_sm_element_by_semantic_id(AssetInterfacesInfo.SEMANTICID_INTERFACE_HREF)ifhref_elemisnotNoneandhref_elem.value:parsed_topic=urlparse(href_elem.value)path=parsed_topic.pathifparsed_topic.schemein('mqtt','mqtts'):self.topic=path.lstrip('/')# We remove the leading space to extract the exact topicelifpath.startswith('.//'):self.topic=f"/{path[3:]}"else:self.topic=path
[docs]asyncdefget_mqtt_parameters(self,forms_elem):""" This method gets the MQTT specific parameters from the forms element within the InteractionMetadata element. The information is saved in the global variables of the class. Args: forms_elem (basyx.aas.model.SubmodelElementCollection): SubmodelElement of forms within InteractionMetadata. """retain_elem=forms_elem.get_sm_element_by_semantic_id(MQTTAssetInterfaceSemantics.SEMANTICID_MQTT_INTERFACE_RETAIN)ifretain_elemisnotNoneandretain_elem.valueisnotNone:self.retain=str(retain_elem.value).lower()in['true','1']else:self.retain=Falsecontrol_packet_elem=forms_elem.get_sm_element_by_semantic_id(MQTTAssetInterfaceSemantics.SEMANTICID_MQTT_INTERFACE_CONTROL_PACKET)ifcontrol_packet_elemisnotNoneandcontrol_packet_elem.valueisnotNone:self.control_packet=control_packet_elem.valueqos_elem=forms_elem.get_sm_element_by_semantic_id(MQTTAssetInterfaceSemantics.SEMANTICID_MQTT_INTERFACE_QOS)ifqos_elemisnotNoneandqos_elem.valueisnotNone:try:self.qos=int(qos_elem.value)exceptValueError:self.qos=0else:self.qos=0
[docs]asyncdefadd_asset_service_data(self,interaction_metadata,service_input_data):""" This method adds the required data of the asset service, using the skill params information (exposure element and skill input data). The information is saved in the global variables of the class. Args: interaction_metadata (basyx.aas.model.SubmodelElementCollection): SubmodelElement of interactionMetadata. service_input_data (dict): dictionary containing the input data of the asset service. """# TODO revisar si habria que añadir los parametros en el modelo AAS, de momento simplemente se añaden# dependiendo el tipoself.request_body=awaitself.serialize_data_by_content_type(interaction_metadata,service_input_data)
[docs]asyncdefserialize_data_by_content_type(self,interaction_metadata,service_data):""" This method serializes the data for the given InteractionMetadata. Args: interaction_metadata(basyx.aas.model.SubmodelElementCollection): interactionMetadata Python object. service_data (dict): the data to be serialized in JSON format. Returns: str: service data in the content-type format. """content_type=awaitself.get_interaction_metadata_content_type(interaction_metadata)ifcontent_typeisNoneorcontent_type.valueisNone:returnjson.dumps(service_data)matchcontent_type.value:case'application/json':returnjson.dumps(service_data)# Data should be sent in string formatcase'text/plain':returnjson.dumps(service_data)case'application/xml':pass# Add the method to convert a JSON into XMLcase_:returnstr(service_data)
[docs]asyncdefsend_mqtt_request(self):""" This method sends the required MQTT message to the asset. All the required information is obtained from the global variables of the class. Returns: bool: True if communication succeeded. """ifnotself.broker:raiseAssetConnectionError("MQTT broker is not defined.","BrokerNotDefined","Missing Broker")ifself.topicisNone:raiseAssetConnectionError("MQTT topic is not defined.","TopicNotDefined","Missing Topic")try:asyncwithaiomqtt.Client(hostname=self.broker,port=self.port)asclient:ifself.control_packetin['publish',None]:payload=self.request_bodyifself.request_bodyisnotNoneelse""awaitclient.publish(self.topic,payload=payload,qos=self.qos,retain=self.retain)returnTrueelse:_logger.warning("Control packet {} is not supported for execute_asset_""service.".format(self.control_packet))returnFalseexceptaiomqtt.MqttErroraserror:raiseAssetConnectionError("The connection with the asset has raised an MQTT exception.",error.__class__.__name__,str(error))exceptExceptionase:raiseAssetConnectionError("The connection with the asset has raised an exception.",e.__class__.__name__,str(e))
[docs]classMQTTAssetInterfaceSemantics:""" This class contains the specific semanticIDs of HTTP interfaces. """SEMANTICID_MQTT_INTERFACE_RETAIN='https://www.w3.org/2019/wot/mqtt#hasRetainFlag'SEMANTICID_MQTT_INTERFACE_CONTROL_PACKET='https://www.w3.org/2019/wot/mqtt#ControlPacket'SEMANTICID_MQTT_INTERFACE_QOS='https://www.w3.org/2019/wot/mqtt#hasQoSFlag'