[docs]defencode_string_in_base64_url(content_string):""" This method encodes a string in a base64 url format. This is required as the AAS and ontology identifiers need to be added in HTTP request paths. """content_bytes=content_string.encode('utf-8')encoded_content=base64.urlsafe_b64encode(content_bytes)returnencoded_content.decode('utf-8')
[docs]defdecode_base64_url_in_string(content_base64_url_string):""" This method encodes a string in a base64 url format. This is required as the AAS and ontology identifiers need to be added in HTTP request paths. """content_base64_url_bytes=content_base64_url_string.encode('utf-8')decoded_content=base64.urlsafe_b64decode(content_base64_url_bytes)returndecoded_content.decode('utf-8')
# HTTP METHODS# ------------
[docs]defsend_openapi_http_get_request(url,headers=None,timeout:int=5):""" This method sends an HTTP GET request to the AAS Repository and obtains the response JSON. """ifheadersisNone:# If headers are not set, the JSON headers are usedheaders=OPEN_API_JSON_HEADERStry:response=requests.get(url,headers=headers,timeout=timeout)# Try to parse JSON contenttry:content_json=response.json()if'result'incontent_json:returncontent_json['result']# In OpenAPI data can be returned in this fieldelse:returncontent_jsonexceptjson.JSONDecodeError:print(f"WARNING: Response claimed to be JSON but couldn't be parsed: {response.text[:100]}...")exceptrequests.exceptions.ConnectTimeout:_logger.error("ERROR: Connection timeout with {}".format(url))exceptrequests.exceptions.ConnectionError:_logger.error("ERROR: Connection error with {}".format(url))exceptExceptionase:_logger.error("ERROR: Unexpected error with {}".format(url))print(e)returnNone
[docs]defsend_openapi_http_post_request(url,headers=None,body=None,timeout:int=5):""" This method sends an HTTP GET request to the AAS Repository and obtains the response JSON. """ifheadersisNone:# If headers are not set, the JSON headers are usedheaders=OPEN_API_JSON_HEADERStry:ifisinstance(body,dict):response=requests.post(url,headers=headers,json=body,timeout=timeout)else:response=requests.post(url,headers=headers,data=body,timeout=timeout)# Try to parse JSON contenttry:content_json=response.json()# In OpenAPI data can be returned in 'result' fieldreturncontent_json['result']if'result'incontent_jsonelsecontent_jsonexceptjson.JSONDecodeError:print(f"WARNING: Response claimed to be JSON but couldn't be parsed: {response.text[:100]}...")exceptrequests.exceptions.ConnectTimeout:_logger.error("ERROR: Connection timeout with {}".format(url))exceptrequests.exceptions.ConnectionError:_logger.error("ERROR: Connection error with {}".format(url))exceptExceptionase:_logger.error("ERROR: Unexpected error with {}".format(url))print(e)returnNone
[docs]defcheck_and_get_response_error(response):""" This method checks if the response is valid adn extracts the errors message if it is not valid. Args: response: response object Returns: str: '' if it is valid and "ERROR..." if it is not. """ifresponseisNone:# TODO Analizar que devuelve cuando no hay instancias o cuando hay errorreturn"ERROR: The response is Null"ifisinstance(response,dict):if'code'inresponseand'message'inresponse:returnf"ERROR with code {response['code']}: {response}."# TODO Think and analyze more scenariosreturn''