diff --git a/.gitignore b/.gitignore index fe9b6f9c..4f2f9979 100644 --- a/.gitignore +++ b/.gitignore @@ -32,5 +32,9 @@ compliance_tool/aas_compliance_tool/version.py server/app/version.py # Ignore the content of the server storage -server/input/ +server/example_configurations/repository_standalone/input/ +server/example_configurations/repository_standalone/storage/ +server/example_configurations/registry_standalone/input/ +server/example_configurations/registry_standalone/storage/ +/storage/ server/storage/ diff --git a/server/app/adapter/__init__.py b/server/app/adapter/__init__.py new file mode 100644 index 00000000..10c99aa6 --- /dev/null +++ b/server/app/adapter/__init__.py @@ -0,0 +1 @@ +from .jsonization import * diff --git a/server/app/adapter/jsonization.py b/server/app/adapter/jsonization.py new file mode 100644 index 00000000..a8ee3471 --- /dev/null +++ b/server/app/adapter/jsonization.py @@ -0,0 +1,340 @@ +import logging +from typing import Callable, Dict, Optional, Set, Type + +from basyx.aas import model +from basyx.aas.adapter._generic import ASSET_KIND, ASSET_KIND_INVERSE, JSON_AAS_TOP_LEVEL_KEYS_TO_TYPES, PathOrIO +from basyx.aas.adapter.json import AASToJsonEncoder +from basyx.aas.adapter.json.json_deserialization import AASFromJsonDecoder, _get_ts, read_aas_json_file_into + +import app.model as server_model + +logger = logging.getLogger(__name__) + +JSON_SERVER_AAS_TOP_LEVEL_KEYS_TO_TYPES = JSON_AAS_TOP_LEVEL_KEYS_TO_TYPES + ( + ("assetAdministrationShellDescriptors", server_model.AssetAdministrationShellDescriptor), + ("submodelDescriptors", server_model.SubmodelDescriptor), +) + + +class ServerAASFromJsonDecoder(AASFromJsonDecoder): + @classmethod + def _get_aas_class_parsers(cls) -> Dict[str, Callable[[Dict[str, object]], object]]: + aas_class_parsers = super()._get_aas_class_parsers() + aas_class_parsers.update( + { + "AssetAdministrationShellDescriptor": cls._construct_asset_administration_shell_descriptor, + "SubmodelDescriptor": cls._construct_submodel_descriptor, + "AssetLink": cls._construct_asset_link, + "ProtocolInformation": cls._construct_protocol_information, + "Endpoint": cls._construct_endpoint, + } + ) + return aas_class_parsers + + # ################################################################################################## + # Utility Methods used in constructor methods to add general attributes (from abstract base classes) + # ################################################################################################## + + @classmethod + def _amend_abstract_attributes(cls, obj: object, dct: Dict[str, object]) -> None: + super()._amend_abstract_attributes(obj, dct) + + if isinstance(obj, server_model.Descriptor): + if "description" in dct: + obj.description = cls._construct_lang_string_set( + _get_ts(dct, "description", list), model.MultiLanguageTextType + ) + if "displayName" in dct: + obj.display_name = cls._construct_lang_string_set( + _get_ts(dct, "displayName", list), model.MultiLanguageNameType + ) + if "extensions" in dct: + for extension in _get_ts(dct, "extensions", list): + obj.extension.add(cls._construct_extension(extension)) + + @classmethod + def _construct_asset_administration_shell_descriptor( + cls, dct: Dict[str, object], object_class=server_model.AssetAdministrationShellDescriptor + ) -> server_model.AssetAdministrationShellDescriptor: + ret = object_class(id_=_get_ts(dct, "id", str)) + cls._amend_abstract_attributes(ret, dct) + if "administration" in dct: + ret.administration = cls._construct_administrative_information(_get_ts(dct, "administration", dict)) + if "assetKind" in dct: + ret.asset_kind = ASSET_KIND_INVERSE[_get_ts(dct, "assetKind", str)] + if "assetType" in dct: + ret.asset_type = _get_ts(dct, "assetType", str) + global_asset_id = None + if "globalAssetId" in dct: + ret.global_asset_id = _get_ts(dct, "globalAssetId", str) + specific_asset_id = set() + if "specificAssetIds" in dct: + for desc_data in _get_ts(dct, "specificAssetIds", list): + specific_asset_id.add(cls._construct_specific_asset_id(desc_data, model.SpecificAssetId)) + if "endpoints" in dct: + for endpoint_dct in _get_ts(dct, "endpoints", list): + if "protocolInformation" in endpoint_dct: + ret.endpoints.append(cls._construct_endpoint(endpoint_dct, server_model.Endpoint)) + elif "href" in endpoint_dct: + protocol_info = server_model.ProtocolInformation( + href=_get_ts(endpoint_dct["href"], "href", str), + endpoint_protocol=( + _get_ts(endpoint_dct["href"], "endpointProtocol", str) + if "endpointProtocol" in endpoint_dct["href"] + else None + ), + endpoint_protocol_version=( + _get_ts(endpoint_dct["href"], "endpointProtocolVersion", list) + if "endpointProtocolVersion" in endpoint_dct["href"] + else None + ), + ) + ret.endpoints.append( + server_model.Endpoint( + protocol_information=protocol_info, interface=_get_ts(endpoint_dct, "interface", str) + ) + ) + if "idShort" in dct: + ret.id_short = _get_ts(dct, "idShort", str) + if "submodelDescriptors" in dct: + for sm_dct in _get_ts(dct, "submodelDescriptors", list): + ret.submodel_descriptors.append( + cls._construct_submodel_descriptor(sm_dct, server_model.SubmodelDescriptor) + ) + return ret + + @classmethod + def _construct_protocol_information( + cls, dct: Dict[str, object], object_class=server_model.ProtocolInformation + ) -> server_model.ProtocolInformation: + ret = object_class( + href=_get_ts(dct, "href", str), + endpoint_protocol=_get_ts(dct, "endpointProtocol", str) if "endpointProtocol" in dct else None, + endpoint_protocol_version=( + _get_ts(dct, "endpointProtocolVersion", list) if "endpointProtocolVersion" in dct else None + ), + subprotocol=_get_ts(dct, "subprotocol", str) if "subprotocol" in dct else None, + subprotocol_body=_get_ts(dct, "subprotocolBody", str) if "subprotocolBody" in dct else None, + subprotocol_body_encoding=( + _get_ts(dct, "subprotocolBodyEncoding", str) if "subprotocolBodyEncoding" in dct else None + ), + ) + return ret + + @classmethod + def _construct_endpoint(cls, dct: Dict[str, object], object_class=server_model.Endpoint) -> server_model.Endpoint: + ret = object_class( + protocol_information=cls._construct_protocol_information( + _get_ts(dct, "protocolInformation", dict), server_model.ProtocolInformation + ), + interface=_get_ts(dct, "interface", str), + ) + cls._amend_abstract_attributes(ret, dct) + return ret + + @classmethod + def _construct_submodel_descriptor( + cls, dct: Dict[str, object], object_class=server_model.SubmodelDescriptor + ) -> server_model.SubmodelDescriptor: + ret = object_class(id_=_get_ts(dct, "id", str), endpoints=[]) + cls._amend_abstract_attributes(ret, dct) + for endpoint_dct in _get_ts(dct, "endpoints", list): + if "protocolInformation" in endpoint_dct: + ret.endpoints.append(cls._construct_endpoint(endpoint_dct, server_model.Endpoint)) + elif "href" in endpoint_dct: + protocol_info = server_model.ProtocolInformation( + href=_get_ts(endpoint_dct["href"], "href", str), + endpoint_protocol=( + _get_ts(endpoint_dct["href"], "endpointProtocol", str) + if "endpointProtocol" in endpoint_dct["href"] + else None + ), + endpoint_protocol_version=( + _get_ts(endpoint_dct["href"], "endpointProtocolVersion", list) + if "endpointProtocolVersion" in endpoint_dct["href"] + else None + ), + ) + ret.endpoints.append( + server_model.Endpoint( + protocol_information=protocol_info, interface=_get_ts(endpoint_dct, "interface", str) + ) + ) + if "administration" in dct: + ret.administration = cls._construct_administrative_information(_get_ts(dct, "administration", dict)) + if "idShort" in dct: + ret.id_short = _get_ts(dct, "idShort", str) + if "semanticId" in dct: + ret.semantic_id = cls._construct_reference(_get_ts(dct, "semanticId", dict)) + if "supplementalSemanticIds" in dct: + for ref in _get_ts(dct, "supplementalSemanticIds", list): + ret.supplemental_semantic_id.append(cls._construct_reference(ref)) + return ret + + @classmethod + def _construct_asset_link( + cls, dct: Dict[str, object], object_class=server_model.AssetLink + ) -> server_model.AssetLink: + ret = object_class(name=_get_ts(dct, "name", str), value=_get_ts(dct, "value", str)) + return ret + + +class ServerStrictAASFromJsonDecoder(ServerAASFromJsonDecoder): + """ + A strict version of the AASFromJsonDecoder class for deserializing Asset Administration Shell data from the + official JSON format + + This version has set ``failsafe = False``, which will lead to Exceptions raised for every missing attribute or wrong + object type. + """ + + failsafe = False + + +class ServerStrippedAASFromJsonDecoder(ServerAASFromJsonDecoder): + """ + Decoder for stripped JSON objects. Used in the HTTP adapter. + """ + + stripped = True + + +class ServerStrictStrippedAASFromJsonDecoder(ServerStrictAASFromJsonDecoder, ServerStrippedAASFromJsonDecoder): + """ + Non-failsafe decoder for stripped JSON objects. + """ + + pass + + +def read_server_aas_json_file_into( + object_store: model.AbstractObjectStore, + file: PathOrIO, + replace_existing: bool = False, + ignore_existing: bool = False, + failsafe: bool = True, + stripped: bool = False, + decoder: Optional[Type[AASFromJsonDecoder]] = None, +) -> Set[model.Identifier]: + return read_aas_json_file_into( + object_store=object_store, + file=file, + replace_existing=replace_existing, + ignore_existing=ignore_existing, + failsafe=failsafe, + stripped=stripped, + decoder=decoder, + keys_to_types=JSON_SERVER_AAS_TOP_LEVEL_KEYS_TO_TYPES, + ) + + +class ServerAASToJsonEncoder(AASToJsonEncoder): + + @classmethod + def _get_aas_class_serializers(cls) -> Dict[Type, Callable]: + serializers = super()._get_aas_class_serializers() + serializers.update( + { + server_model.AssetAdministrationShellDescriptor: cls._asset_administration_shell_descriptor_to_json, + server_model.SubmodelDescriptor: cls._submodel_descriptor_to_json, + server_model.Endpoint: cls._endpoint_to_json, + server_model.ProtocolInformation: cls._protocol_information_to_json, + server_model.AssetLink: cls._asset_link_to_json, + } + ) + return serializers + + @classmethod + def _abstract_classes_to_json(cls, obj: object) -> Dict[str, object]: + data: Dict[str, object] = super()._abstract_classes_to_json(obj) + if isinstance(obj, server_model.Descriptor): + if obj.description: + data["description"] = obj.description + if obj.display_name: + data["displayName"] = obj.display_name + if obj.extension: + data["extensions"] = list(obj.extension) + return data + + @classmethod + def _asset_administration_shell_descriptor_to_json( + cls, obj: server_model.AssetAdministrationShellDescriptor + ) -> Dict[str, object]: + """ + serialization of an object from class AssetAdministrationShell to json + + :param obj: object of class AssetAdministrationShell + :return: dict with the serialized attributes of this object + """ + data = cls._abstract_classes_to_json(obj) + data.update(cls._namespace_to_json(obj)) + data["id"] = obj.id + if obj.administration: + data["administration"] = obj.administration + if obj.asset_kind: + data["assetKind"] = ASSET_KIND[obj.asset_kind] + if obj.asset_type: + data["assetType"] = obj.asset_type + if obj.global_asset_id: + data["globalAssetId"] = obj.global_asset_id + if obj.specific_asset_id: + data["specificAssetIds"] = list(obj.specific_asset_id) + if obj.endpoints: + data["endpoints"] = list(obj.endpoints) + if obj.id_short: + data["idShort"] = obj.id_short + if obj.submodel_descriptors: + data["submodelDescriptors"] = list(obj.submodel_descriptors) + return data + + @classmethod + def _protocol_information_to_json(cls, obj: server_model.ProtocolInformation) -> Dict[str, object]: + data = cls._abstract_classes_to_json(obj) + + data["href"] = obj.href + if obj.endpoint_protocol: + data["endpointProtocol"] = obj.endpoint_protocol + if obj.endpoint_protocol_version: + data["endpointProtocolVersion"] = obj.endpoint_protocol_version + if obj.subprotocol: + data["subprotocol"] = obj.subprotocol + if obj.subprotocol_body: + data["subprotocolBody"] = obj.subprotocol_body + if obj.subprotocol_body_encoding: + data["subprotocolBodyEncoding"] = obj.subprotocol_body_encoding + return data + + @classmethod + def _endpoint_to_json(cls, obj: server_model.Endpoint) -> Dict[str, object]: + data = cls._abstract_classes_to_json(obj) + data["protocolInformation"] = cls._protocol_information_to_json(obj.protocol_information) + data["interface"] = obj.interface + return data + + @classmethod + def _submodel_descriptor_to_json(cls, obj: server_model.SubmodelDescriptor) -> Dict[str, object]: + """ + serialization of an object from class Submodel to json + + :param obj: object of class Submodel + :return: dict with the serialized attributes of this object + """ + data = cls._abstract_classes_to_json(obj) + data["id"] = obj.id + data["endpoints"] = [cls._endpoint_to_json(ep) for ep in obj.endpoints] + if obj.id_short: + data["idShort"] = obj.id_short + if obj.administration: + data["administration"] = obj.administration + if obj.semantic_id: + data["semanticId"] = obj.semantic_id + if obj.supplemental_semantic_id: + data["supplementalSemanticIds"] = list(obj.supplemental_semantic_id) + return data + + @classmethod + def _asset_link_to_json(cls, obj: server_model.AssetLink) -> Dict[str, object]: + data = cls._abstract_classes_to_json(obj) + data["name"] = obj.name + data["value"] = obj.value + return data diff --git a/server/app/backend/__init__.py b/server/app/backend/__init__.py new file mode 100644 index 00000000..a58825fb --- /dev/null +++ b/server/app/backend/__init__.py @@ -0,0 +1 @@ +from .local_file import * diff --git a/server/app/backend/local_file.py b/server/app/backend/local_file.py new file mode 100644 index 00000000..e55c08e6 --- /dev/null +++ b/server/app/backend/local_file.py @@ -0,0 +1,174 @@ +import hashlib +import json +import logging +import os +import threading +import weakref +from typing import Dict, Iterator, Type, Union + +from basyx.aas import model +from basyx.aas.model import provider as sdk_provider + +from app.adapter import jsonization +from app.model import AssetAdministrationShellDescriptor, SubmodelDescriptor, descriptor + +logger = logging.getLogger(__name__) + +_DESCRIPTOR_TYPE = Union[descriptor.AssetAdministrationShellDescriptor, descriptor.SubmodelDescriptor] +_DESCRIPTOR_CLASSES = (descriptor.AssetAdministrationShellDescriptor, descriptor.SubmodelDescriptor) + +# We need to resolve the Descriptor type in order to deserialize it again from JSON +DESCRIPTOR_TYPE_TO_STRING: Dict[Type[Union[AssetAdministrationShellDescriptor, SubmodelDescriptor]], str] = { + AssetAdministrationShellDescriptor: "AssetAdministrationShellDescriptor", + SubmodelDescriptor: "SubmodelDescriptor", +} + + +class LocalFileDescriptorStore(sdk_provider.AbstractObjectStore[model.Identifier, _DESCRIPTOR_TYPE]): + """ + An ObjectStore implementation for :class:`~app.model.descriptor.Descriptor` BaSyx Python SDK objects backed + by a local file based local backend + """ + + def __init__(self, directory_path: str): + """ + Initializer of class LocalFileDescriptorStore + + :param directory_path: Path to the local file backend (the path where you want to store your AAS JSON files) + """ + self.directory_path: str = directory_path.rstrip("/") + + # A dictionary of weak references to local replications of stored objects. Objects are kept in this cache as + # long as there is any other reference in the Python application to them. We use this to make sure that only one + # local replication of each object is kept in the application and retrieving an object from the store always + # returns the **same** (not only equal) object. Still, objects are forgotten, when they are not referenced + # anywhere else to save memory. + self._object_cache: weakref.WeakValueDictionary[model.Identifier, _DESCRIPTOR_TYPE] = ( + weakref.WeakValueDictionary() + ) + self._object_cache_lock = threading.Lock() + + def check_directory(self, create=False): + """ + Check if the directory exists and created it if not (and requested to do so) + + :param create: If True and the database does not exist, try to create it + """ + if not os.path.exists(self.directory_path): + if not create: + raise FileNotFoundError("The given directory ({}) does not exist".format(self.directory_path)) + # Create directory + os.mkdir(self.directory_path) + logger.info("Creating directory {}".format(self.directory_path)) + + def get_descriptor_by_hash(self, hash_: str) -> _DESCRIPTOR_TYPE: + """ + Retrieve an AAS Descriptor object from the local file by its identifier hash + + :raises KeyError: If the respective file could not be found + """ + # Try to get the correct file + try: + with open("{}/{}.json".format(self.directory_path, hash_), "r") as file: + obj = json.load(file, cls=jsonization.ServerAASFromJsonDecoder) + except FileNotFoundError as e: + raise KeyError("No Descriptor with hash {} found in local file database".format(hash_)) from e + # If we still have a local replication of that object (since it is referenced from anywhere else), update that + # replication and return it. + with self._object_cache_lock: + if obj.id in self._object_cache: + old_obj = self._object_cache[obj.id] + old_obj.update_from(obj) + return old_obj + self._object_cache[obj.id] = obj + return obj + + def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE: + """ + Retrieve an AAS Descriptor object from the local file by its :class:`~basyx.aas.model.base.Identifier` + + :raises KeyError: If the respective file could not be found + """ + try: + return self.get_descriptor_by_hash(self._transform_id(identifier)) + except KeyError as e: + raise KeyError("No Identifiable with id {} found in local file database".format(identifier)) from e + + def add(self, x: _DESCRIPTOR_TYPE) -> None: + """ + Add a Descriptor object to the store + + :raises KeyError: If an object with the same id exists already in the object store + """ + logger.debug("Adding object %s to Local File Store ...", repr(x)) + if os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))): + raise KeyError("Descriptor with id {} already exists in local file database".format(x.id)) + with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file: + # Usually, we don't need to serialize the modelType, since during HTTP requests, we know exactly if this + # is an AASDescriptor or SubmodelDescriptor. However, here we cannot distinguish them, so to deserialize + # them successfully, we hack the `modelType` into the JSON. + serialized = json.loads(json.dumps(x, cls=jsonization.ServerAASToJsonEncoder)) + serialized["modelType"] = DESCRIPTOR_TYPE_TO_STRING[type(x)] + json.dump(serialized, file, indent=4) + with self._object_cache_lock: + self._object_cache[x.id] = x + + def discard(self, x: _DESCRIPTOR_TYPE) -> None: + """ + Delete an :class:`~app.model.descriptor.Descriptor` AAS object from the local file store + + :param x: The object to be deleted + :raises KeyError: If the object does not exist in the database + """ + logger.debug("Deleting object %s from Local File Store database ...", repr(x)) + try: + os.remove("{}/{}.json".format(self.directory_path, self._transform_id(x.id))) + except FileNotFoundError as e: + raise KeyError("No AAS Descriptor object with id {} exists in local file database".format(x.id)) from e + with self._object_cache_lock: + self._object_cache.pop(x.id, None) + + def __contains__(self, x: object) -> bool: + """ + Check if an object with the given :class:`~basyx.aas.model.base.Identifier` or the same + :class:`~basyx.aas.model.base.Identifier` as the given object is contained in the local file database + + :param x: AAS object :class:`~basyx.aas.model.base.Identifier` or :class:`~app.model.descriptor.Descriptor` + AAS object + :return: ``True`` if such an object exists in the database, ``False`` otherwise + """ + if isinstance(x, model.Identifier): + identifier = x + elif isinstance(x, _DESCRIPTOR_CLASSES): + identifier = x.id + else: + return False + logger.debug("Checking existence of Descriptor object with id %s in database ...", repr(x)) + return os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(identifier))) + + def __len__(self) -> int: + """ + Retrieve the number of objects in the local file database + + :return: The number of objects (determined from the number of documents) + """ + logger.debug("Fetching number of documents from database ...") + return len(os.listdir(self.directory_path)) + + def __iter__(self) -> Iterator[_DESCRIPTOR_TYPE]: + """ + Iterate all :class:`~app.model.descriptor.Descriptor` objects in the local folder. + + This method returns an iterator, containing only a list of all identifiers in the database and retrieving + the identifiable objects on the fly. + """ + logger.debug("Iterating over objects in database ...") + for name in os.listdir(self.directory_path): + yield self.get_descriptor_by_hash(name.rstrip(".json")) + + @staticmethod + def _transform_id(identifier: model.Identifier) -> str: + """ + Helper method to represent an ASS Identifier as a string to be used as Local file document id + """ + return hashlib.sha256(identifier.encode("utf-8")).hexdigest() diff --git a/server/app/interfaces/__init__.py b/server/app/interfaces/__init__.py index e69de29b..a7d6a55d 100644 --- a/server/app/interfaces/__init__.py +++ b/server/app/interfaces/__init__.py @@ -0,0 +1,4 @@ +from .base import * +from .discovery import * +from .registry import * +from .repository import * diff --git a/server/app/interfaces/base.py b/server/app/interfaces/base.py index 8234eddc..8868dd7f 100644 --- a/server/app/interfaces/base.py +++ b/server/app/interfaces/base.py @@ -10,23 +10,24 @@ import io import itertools import json -from typing import Iterable, Type, Iterator, Tuple, Optional, List, Union, Dict, Callable, TypeVar, Any +from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union import werkzeug.exceptions import werkzeug.routing import werkzeug.utils -from lxml import etree -from werkzeug import Response, Request -from werkzeug.exceptions import NotFound, BadRequest -from werkzeug.routing import MapAdapter - from basyx.aas import model from basyx.aas.adapter._generic import XML_NS_MAP -from basyx.aas.adapter.json import StrictStrippedAASFromJsonDecoder, StrictAASFromJsonDecoder, AASToJsonEncoder -from basyx.aas.adapter.xml import xml_serialization, XMLConstructables, read_aas_xml_element +from basyx.aas.adapter.xml import XMLConstructables, read_aas_xml_element, xml_serialization from basyx.aas.model import AbstractObjectStore -from app.util.converters import base64url_decode +from lxml import etree +from werkzeug import Request, Response +from werkzeug.exceptions import BadRequest, NotFound +from werkzeug.routing import MapAdapter +import app.model +from app.adapter import ServerAASToJsonEncoder, ServerStrictAASFromJsonDecoder, ServerStrictStrippedAASFromJsonDecoder +from app.model import AssetAdministrationShellDescriptor, AssetLink, SubmodelDescriptor +from app.util.converters import base64url_decode T = TypeVar("T") @@ -44,13 +45,19 @@ def __str__(self): class Message: - def __init__(self, code: str, text: str, message_type: MessageType = MessageType.UNDEFINED, - timestamp: Optional[datetime.datetime] = None): + def __init__( + self, + code: str, + text: str, + message_type: MessageType = MessageType.UNDEFINED, + timestamp: Optional[datetime.datetime] = None, + ): self.code: str = code self.text: str = text self.message_type: MessageType = message_type - self.timestamp: datetime.datetime = timestamp if timestamp is not None \ - else datetime.datetime.now(datetime.timezone.utc) + self.timestamp: datetime.datetime = ( + timestamp if timestamp is not None else datetime.datetime.now(datetime.timezone.utc) + ) class Result: @@ -66,8 +73,9 @@ def __init__(self, success: bool, messages: Optional[List[Message]] = None): class APIResponse(abc.ABC, Response): @abc.abstractmethod - def __init__(self, obj: Optional[ResponseData] = None, cursor: Optional[int] = None, - stripped: bool = False, *args, **kwargs): + def __init__( + self, obj: Optional[ResponseData] = None, cursor: Optional[int] = None, stripped: bool = False, *args, **kwargs + ): super().__init__(*args, **kwargs) if obj is None: self.status_code = 204 @@ -87,14 +95,9 @@ def serialize(self, obj: ResponseData, cursor: Optional[int], stripped: bool) -> if cursor is None: data = obj else: - data = { - "paging_metadata": {"cursor": str(cursor)}, - "result": obj - } + data = {"paging_metadata": {"cursor": str(cursor)}, "result": obj} return json.dumps( - data, - cls=StrippedResultToJsonEncoder if stripped else ResultToJsonEncoder, - separators=(",", ":") + data, cls=StrippedResultToJsonEncoder if stripped else ResultToJsonEncoder, separators=(",", ":") ) @@ -159,13 +162,10 @@ def __init__(self, *args, content_type="text/xml", **kwargs): super().__init__(*args, **kwargs, content_type=content_type) -class ResultToJsonEncoder(AASToJsonEncoder): +class ResultToJsonEncoder(ServerAASToJsonEncoder): @classmethod def _result_to_json(cls, result: Result) -> Dict[str, object]: - return { - "success": result.success, - "messages": result.messages - } + return {"success": result.success, "messages": result.messages} @classmethod def _message_to_json(cls, message: Message) -> Dict[str, object]: @@ -173,7 +173,7 @@ def _message_to_json(cls, message: Message) -> Dict[str, object]: "messageType": message.message_type, "text": message.text, "code": message.code, - "timestamp": message.timestamp.isoformat() + "timestamp": message.timestamp.isoformat(), } def default(self, obj: object) -> object: @@ -200,8 +200,8 @@ def __call__(self, environ, start_response) -> Iterable[bytes]: @classmethod def _get_slice(cls, request: Request, iterator: Iterable[T]) -> Tuple[Iterator[T], int]: - limit_str = request.args.get('limit', default="10") - cursor_str = request.args.get('cursor', default="1") + limit_str = request.args.get("limit", default="10") + cursor_str = request.args.get("cursor", default="1") try: limit, cursor = int(limit_str), int(cursor_str) - 1 # cursor is 1-indexed if limit < 0 or cursor < 0: @@ -234,27 +234,31 @@ def get_response_type(request: Request) -> Type[APIResponse]: response_types: Dict[str, Type[APIResponse]] = { "application/json": JsonResponse, "application/xml": XmlResponse, - "text/xml": XmlResponseAlt + "text/xml": XmlResponseAlt, } if len(request.accept_mimetypes) == 0 or request.accept_mimetypes.best in (None, "*/*"): return JsonResponse mime_type = request.accept_mimetypes.best_match(response_types) if mime_type is None: - raise werkzeug.exceptions.NotAcceptable("This server supports the following content types: " - + ", ".join(response_types.keys())) + raise werkzeug.exceptions.NotAcceptable( + "This server supports the following content types: " + ", ".join(response_types.keys()) + ) return response_types[mime_type] @staticmethod - def http_exception_to_response(exception: werkzeug.exceptions.HTTPException, response_type: Type[APIResponse]) \ - -> APIResponse: + def http_exception_to_response( + exception: werkzeug.exceptions.HTTPException, response_type: Type[APIResponse] + ) -> APIResponse: headers = exception.get_headers() location = exception.get_response().location if location is not None: headers.append(("Location", location)) if exception.code and exception.code >= 400: - message = Message(type(exception).__name__, - exception.description if exception.description is not None else "", - MessageType.ERROR) + message = Message( + type(exception).__name__, + exception.description if exception.description is not None else "", + MessageType.ERROR, + ) result = Result(False, [message]) else: result = Result(False) @@ -264,13 +268,12 @@ def http_exception_to_response(exception: werkzeug.exceptions.HTTPException, res class ObjectStoreWSGIApp(BaseWSGIApp): object_store: AbstractObjectStore - def _get_all_obj_of_type(self, type_: Type[model.provider._IDENTIFIABLE]) -> Iterator[model.provider._IDENTIFIABLE]: + def _get_all_obj_of_type(self, type_: Type[T]) -> Iterator[T]: for obj in self.object_store: if isinstance(obj, type_): yield obj - def _get_obj_ts(self, identifier: model.Identifier, type_: Type[model.provider._IDENTIFIABLE]) \ - -> model.provider._IDENTIFIABLE: + def _get_obj_ts(self, identifier: model.Identifier, type_: Type[T]) -> T: identifiable = self.object_store.get(identifier) if not isinstance(identifiable, type_): raise NotFound(f"No {type_.__name__} with {identifier} found!") @@ -293,7 +296,12 @@ class HTTPApiDecoder: @classmethod def check_type_support(cls, type_: type): - if type_ not in cls.type_constructables_map: + tolerated_types = ( + AssetAdministrationShellDescriptor, + SubmodelDescriptor, + AssetLink, + ) + if type_ not in cls.type_constructables_map and type_ not in tolerated_types: raise TypeError(f"Parsing {type_} is not supported!") @classmethod @@ -305,8 +313,9 @@ def assert_type(cls, obj: object, type_: Type[T]) -> T: @classmethod def json_list(cls, data: Union[str, bytes], expect_type: Type[T], stripped: bool, expect_single: bool) -> List[T]: cls.check_type_support(expect_type) - decoder: Type[StrictAASFromJsonDecoder] = StrictStrippedAASFromJsonDecoder if stripped \ - else StrictAASFromJsonDecoder + decoder: Type[ServerStrictAASFromJsonDecoder] = ( + ServerStrictStrippedAASFromJsonDecoder if stripped else ServerStrictAASFromJsonDecoder + ) try: parsed = json.loads(data, cls=decoder) if isinstance(parsed, list) and expect_single: @@ -325,6 +334,9 @@ def json_list(cls, data: Union[str, bytes], expect_type: Type[T], stripped: bool model.SpecificAssetId: decoder._construct_specific_asset_id, model.Reference: decoder._construct_reference, model.Qualifier: decoder._construct_qualifier, + app.model.AssetAdministrationShellDescriptor: decoder._construct_asset_administration_shell_descriptor, + app.model.SubmodelDescriptor: decoder._construct_submodel_descriptor, + app.model.AssetLink: decoder._construct_asset_link, } constructor: Optional[Callable[..., T]] = mapping.get(expect_type) # type: ignore[assignment] @@ -360,8 +372,9 @@ def xml(cls, data: bytes, expect_type: Type[T], stripped: bool) -> T: cls.check_type_support(expect_type) try: xml_data = io.BytesIO(data) - rv = read_aas_xml_element(xml_data, cls.type_constructables_map[expect_type], - stripped=stripped, failsafe=False) + rv = read_aas_xml_element( + xml_data, cls.type_constructables_map[expect_type], stripped=stripped, failsafe=False + ) except (KeyError, ValueError) as e: # xml deserialization creates an error chain. since we only return one error, return the root cause f: BaseException = e @@ -386,8 +399,8 @@ def request_body(cls, request: Request, expect_type: Type[T], stripped: bool) -> if request.mimetype not in valid_content_types: raise werkzeug.exceptions.UnsupportedMediaType( - f"Invalid content-type: {request.mimetype}! Supported types: " - + ", ".join(valid_content_types)) + f"Invalid content-type: {request.mimetype}! Supported types: " + ", ".join(valid_content_types) + ) if request.mimetype == "application/json": return cls.json(request.get_data(), expect_type, stripped) diff --git a/server/app/interfaces/discovery.py b/server/app/interfaces/discovery.py new file mode 100644 index 00000000..0506c73c --- /dev/null +++ b/server/app/interfaces/discovery.py @@ -0,0 +1,158 @@ +""" +This module implements the Discovery interface defined in the 'Specification of the Asset Administration Shell Part 2 – Application Programming Interface'. +""" + +import json +from typing import Dict, List, Set + +import werkzeug.exceptions +from basyx.aas import model +from werkzeug.routing import Rule, Submount +from werkzeug.wrappers import Request, Response + +from app import model as server_model +from app.adapter import jsonization +from app.interfaces.base import BaseWSGIApp, HTTPApiDecoder +from app.util.converters import IdentifierToBase64URLConverter + + +class DiscoveryStore: + def __init__(self) -> None: + self.aas_id_to_asset_ids: Dict[model.Identifier, Set[model.SpecificAssetId]] = {} + self.asset_id_to_aas_ids: Dict[model.SpecificAssetId, Set[model.Identifier]] = {} + + def get_all_specific_asset_ids_by_aas_id(self, aas_id: model.Identifier) -> List[model.SpecificAssetId]: + return list(self.aas_id_to_asset_ids.get(aas_id, set())) + + def add_specific_asset_ids_to_aas(self, aas_id: model.Identifier, asset_ids: List[model.SpecificAssetId]) -> None: + + if aas_id not in self.aas_id_to_asset_ids: + self.aas_id_to_asset_ids[aas_id] = set() + + for asset in asset_ids: + self.aas_id_to_asset_ids[aas_id].add(asset) + + def delete_specific_asset_ids_by_aas_id(self, aas_id: model.Identifier) -> None: + key = aas_id + if key in self.aas_id_to_asset_ids: + del self.aas_id_to_asset_ids[key] + + def search_aas_ids_by_asset_link(self, asset_link: server_model.AssetLink) -> List[model.Identifier]: + result = [] + for asset_key, aas_ids in self.asset_id_to_aas_ids.items(): + expected_key = f"{asset_link.name}:{asset_link.value}" + if asset_key == expected_key: + result.extend(list(aas_ids)) + return result + + def _add_aas_id_to_specific_asset_id(self, asset_id: model.SpecificAssetId, aas_id: model.Identifier) -> None: + if asset_id in self.asset_id_to_aas_ids: + self.asset_id_to_aas_ids[asset_id].add(aas_id) + else: + self.asset_id_to_aas_ids[asset_id] = {aas_id} + + def _delete_aas_id_from_specific_asset_ids(self, asset_id: model.SpecificAssetId, aas_id: model.Identifier) -> None: + if asset_id in self.asset_id_to_aas_ids: + self.asset_id_to_aas_ids[asset_id].discard(aas_id) + + @classmethod + def from_file(cls, filename: str) -> "DiscoveryStore": + """ + Load the state of the `DiscoveryStore` from a local file. + Safely handles files that are missing expected keys. + + """ + with open(filename, "r") as file: + data = json.load(file, cls=jsonization.ServerAASFromJsonDecoder) + discovery_store = DiscoveryStore() + discovery_store.aas_id_to_asset_ids = data.get("aas_id_to_asset_ids", {}) + discovery_store.asset_id_to_aas_ids = data.get("asset_id_to_aas_ids", {}) + return discovery_store + + def to_file(self, filename: str) -> None: + """ + Write the current state of the `DiscoveryStore` to a local JSON file for persistence. + """ + with open(filename, "w") as file: + data = { + "aas_id_to_asset_ids": self.aas_id_to_asset_ids, + "asset_id_to_aas_ids": self.asset_id_to_aas_ids, + } + json.dump(data, file, cls=jsonization.ServerAASToJsonEncoder, indent=4) + + +class DiscoveryAPI(BaseWSGIApp): + def __init__(self, persistent_store: DiscoveryStore, base_path: str = "/api/v3.1.1"): + self.persistent_store: DiscoveryStore = persistent_store + self.url_map = werkzeug.routing.Map( + [ + Submount( + base_path, + [ + Rule( + "/lookup/shellsByAssetLink", + methods=["POST"], + endpoint=self.search_all_aas_ids_by_asset_link, + ), + Submount( + "/lookup/shells", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.get_all_specific_asset_ids_by_aas_id, + ), + Rule("/", methods=["POST"], endpoint=self.post_all_asset_links_by_id), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_all_asset_links_by_id, + ), + ], + ), + ], + ) + ], + converters={"base64url": IdentifierToBase64URLConverter}, + strict_slashes=False, + ) + + def search_all_aas_ids_by_asset_link( + self, request: Request, url_args: dict, response_t: type, **_kwargs + ) -> Response: + asset_links = HTTPApiDecoder.request_body_list(request, server_model.AssetLink, False) + matching_aas_keys = set() + for asset_link in asset_links: + aas_keys = self.persistent_store.search_aas_ids_by_asset_link(asset_link) + matching_aas_keys.update(aas_keys) + paginated_slice, cursor = self._get_slice(request, list(matching_aas_keys)) + return response_t(list(paginated_slice), cursor=cursor) + + def get_all_specific_asset_ids_by_aas_id( + self, request: Request, url_args: dict, response_t: type, **_kwargs + ) -> Response: + aas_identifier = str(url_args["aas_id"]) + asset_ids = self.persistent_store.get_all_specific_asset_ids_by_aas_id(aas_identifier) + return response_t(asset_ids) + + def post_all_asset_links_by_id(self, request: Request, url_args: dict, response_t: type, **_kwargs) -> Response: + aas_identifier = str(url_args["aas_id"]) + specific_asset_ids = HTTPApiDecoder.request_body_list(request, model.SpecificAssetId, False) + self.persistent_store.add_specific_asset_ids_to_aas(aas_identifier, specific_asset_ids) + for asset_id in specific_asset_ids: + self.persistent_store._add_aas_id_to_specific_asset_id(asset_id, aas_identifier) + updated = {aas_identifier: self.persistent_store.get_all_specific_asset_ids_by_aas_id(aas_identifier)} + return response_t(updated) + + def delete_all_asset_links_by_id(self, request: Request, url_args: dict, response_t: type, **_kwargs) -> Response: + aas_identifier = str(url_args["aas_id"]) + self.persistent_store.delete_specific_asset_ids_by_aas_id(aas_identifier) + for key in list(self.persistent_store.asset_id_to_aas_ids.keys()): + self.persistent_store.asset_id_to_aas_ids[key].discard(aas_identifier) + return response_t() + + +if __name__ == "__main__": + from werkzeug.serving import run_simple + + run_simple("localhost", 8084, DiscoveryAPI(DiscoveryStore()), use_debugger=True, use_reloader=True) diff --git a/server/app/interfaces/registry.py b/server/app/interfaces/registry.py new file mode 100644 index 00000000..19dbc323 --- /dev/null +++ b/server/app/interfaces/registry.py @@ -0,0 +1,356 @@ +""" +This module implements the Registry interface defined in the 'Specification of the Asset Administration Shell Part 2 – Application Programming Interface'. +""" + +from typing import Dict, Iterator, Tuple, Type + +import werkzeug.exceptions +import werkzeug.routing +import werkzeug.urls +import werkzeug.utils +from basyx.aas import model +from werkzeug.exceptions import BadRequest, Conflict, NotFound +from werkzeug.routing import MapAdapter, Rule, Submount +from werkzeug.wrappers import Request, Response + +import app.model as server_model +from app.interfaces.base import APIResponse, HTTPApiDecoder, ObjectStoreWSGIApp, is_stripped_request +from app.model import DictDescriptorStore +from app.util.converters import IdentifierToBase64URLConverter, base64url_decode + + +class RegistryAPI(ObjectStoreWSGIApp): + def __init__(self, object_store: model.AbstractObjectStore, base_path: str = "/api/v3.1.1"): + self.object_store: model.AbstractObjectStore = object_store + self.url_map = werkzeug.routing.Map( + [ + Submount( + base_path, + [ + Rule("/description", methods=["GET"], endpoint=self.get_self_description), + Rule("/shell-descriptors", methods=["GET"], endpoint=self.get_all_aas_descriptors), + Rule("/shell-descriptors", methods=["POST"], endpoint=self.post_aas_descriptor), + Submount( + "/shell-descriptors", + [ + Rule("/", methods=["GET"], endpoint=self.get_aas_descriptor_by_id), + Rule("/", methods=["PUT"], endpoint=self.put_aas_descriptor_by_id), + Rule( + "/", methods=["DELETE"], endpoint=self.delete_aas_descriptor_by_id + ), + Submount( + "/", + [ + Rule( + "/submodel-descriptors", + methods=["GET"], + endpoint=self.get_all_submodel_descriptors_through_superpath, + ), + Rule( + "/submodel-descriptors", + methods=["POST"], + endpoint=self.post_submodel_descriptor_through_superpath, + ), + Submount( + "/submodel-descriptors", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.get_submodel_descriptor_by_id_through_superpath, + ), + Rule( + "/", + methods=["PUT"], + endpoint=self.put_submodel_descriptor_by_id_through_superpath, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_submodel_descriptor_by_id_through_superpath, + ), + ], + ), + ], + ), + ], + ), + Rule("/submodel-descriptors", methods=["GET"], endpoint=self.get_all_submodel_descriptors), + Rule("/submodel-descriptors", methods=["POST"], endpoint=self.post_submodel_descriptor), + Submount( + "/submodel-descriptors", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.get_submodel_descriptor_by_id, + ), + Rule( + "/", + methods=["PUT"], + endpoint=self.put_submodel_descriptor_by_id, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_submodel_descriptor_by_id, + ), + ], + ), + ], + ) + ], + converters={"base64url": IdentifierToBase64URLConverter}, + strict_slashes=False, + ) + + def _get_all_aas_descriptors( + self, request: "Request" + ) -> Tuple[Iterator[server_model.AssetAdministrationShellDescriptor], int]: + + descriptors: Iterator[server_model.AssetAdministrationShellDescriptor] = self._get_all_obj_of_type( + server_model.AssetAdministrationShellDescriptor + ) + + asset_kind_str = request.args.get("assetKind") + if asset_kind_str is not None: + try: + asset_kind = model.AssetKind[asset_kind_str] + except KeyError: + raise BadRequest( + f"Invalid assetKind '{asset_kind_str}', " + f"must be one of {list(model.AssetKind.__members__)}" + ) + descriptors = filter(lambda desc: desc.asset_kind == asset_kind, descriptors) + + asset_type = request.args.get("assetType") + if asset_type is not None: + asset_type = base64url_decode(asset_type) + try: + asset_type = model.Identifier(asset_type) + except Exception: + raise BadRequest(f"Invalid assetType: '{asset_type}'") + descriptors = filter(lambda desc: desc.asset_type == asset_type, descriptors) + + paginated_descriptors, end_index = self._get_slice(request, descriptors) + return paginated_descriptors, end_index + + def _get_aas_descriptor(self, url_args: Dict) -> server_model.AssetAdministrationShellDescriptor: + return self._get_obj_ts(url_args["aas_id"], server_model.AssetAdministrationShellDescriptor) + + def _get_all_submodel_descriptors(self, request: Request) -> Tuple[Iterator[server_model.SubmodelDescriptor], int]: + submodel_descriptors: Iterator[server_model.SubmodelDescriptor] = self._get_all_obj_of_type( + server_model.SubmodelDescriptor + ) + paginated_submodel_descriptors, end_index = self._get_slice(request, submodel_descriptors) + return paginated_submodel_descriptors, end_index + + def _get_submodel_descriptor(self, url_args: Dict) -> server_model.SubmodelDescriptor: + return self._get_obj_ts(url_args["submodel_id"], server_model.SubmodelDescriptor) + + # ------ COMMON ROUTES ------- + def get_self_description( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + service_description = server_model.ServiceDescription( + profiles=[ + server_model.ServiceSpecificationProfileEnum.AAS_REGISTRY_FULL, + server_model.ServiceSpecificationProfileEnum.AAS_REGISTRY_READ, + server_model.ServiceSpecificationProfileEnum.SUBMODEL_REGISTRY_FULL, + server_model.ServiceSpecificationProfileEnum.SUBMODEL_REGISTRY_READ, + ] + ) + return response_t(service_description.to_dict()) + + # ------ AAS REGISTRY ROUTES ------- + def get_all_aas_descriptors( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + aas_descriptors, cursor = self._get_all_aas_descriptors(request) + return response_t(list(aas_descriptors), cursor=cursor) + + def post_aas_descriptor( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: + descriptor = HTTPApiDecoder.request_body(request, server_model.AssetAdministrationShellDescriptor, False) + try: + self.object_store.add(descriptor) + except KeyError as e: + raise Conflict(f"AssetAdministrationShellDescriptor with Identifier {descriptor.id} already exists!") from e + descriptor.commit() + created_resource_url = map_adapter.build( + self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True + ) + return response_t(descriptor, status=201, headers={"Location": created_resource_url}) + + def get_aas_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + descriptor = self._get_aas_descriptor(url_args) + return response_t(descriptor) + + def put_aas_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter, **_kwargs + ) -> Response: + try: + descriptor = self._get_aas_descriptor(url_args) + descriptor.update_from( + HTTPApiDecoder.request_body( + request, server_model.AssetAdministrationShellDescriptor, is_stripped_request(request) + ) + ) + descriptor.commit() + return response_t() + except NotFound: + descriptor = HTTPApiDecoder.request_body(request, server_model.AssetAdministrationShellDescriptor, False) + self.object_store.add(descriptor) + descriptor.commit() + created_resource_url = map_adapter.build( + self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True + ) + return response_t(descriptor, status=201, headers={"Location": created_resource_url}) + + def delete_aas_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + descriptor = self._get_aas_descriptor(url_args) + self.object_store.remove(descriptor) + return response_t() + + def get_all_submodel_descriptors_through_superpath( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + aas_descriptor = self._get_aas_descriptor(url_args) + submodel_descriptors, cursor = self._get_slice(request, aas_descriptor.submodel_descriptors) + return response_t(list(submodel_descriptors), cursor=cursor) + + def get_submodel_descriptor_by_id_through_superpath( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + aas_descriptor = self._get_aas_descriptor(url_args) + submodel_id = url_args["submodel_id"] + submodel_descriptor = next((sd for sd in aas_descriptor.submodel_descriptors if sd.id == submodel_id), None) + if submodel_descriptor is None: + raise NotFound(f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!") + return response_t(submodel_descriptor) + + def post_submodel_descriptor_through_superpath( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: + aas_descriptor = self._get_aas_descriptor(url_args) + submodel_descriptor = HTTPApiDecoder.request_body( + request, server_model.SubmodelDescriptor, is_stripped_request(request) + ) + if any(sd.id == submodel_descriptor.id for sd in aas_descriptor.submodel_descriptors): + raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") + aas_descriptor.submodel_descriptors.append(submodel_descriptor) + aas_descriptor.commit() + created_resource_url = map_adapter.build( + self.get_submodel_descriptor_by_id_through_superpath, + {"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id}, + force_external=True, + ) + return response_t(submodel_descriptor, status=201, headers={"Location": created_resource_url}) + + def put_submodel_descriptor_by_id_through_superpath( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter, **_kwargs + ) -> Response: + aas_descriptor = self._get_aas_descriptor(url_args) + try: + submodel_id = url_args["submodel_id"] + submodel_descriptor = next((sd for sd in aas_descriptor.submodel_descriptors if sd.id == submodel_id), None) + if submodel_descriptor is None: + raise NotFound( + f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!" + ) + submodel_descriptor.update_from( + HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request)) + ) + aas_descriptor.commit() + return response_t() + except NotFound: + submodel_descriptor = HTTPApiDecoder.request_body( + request, server_model.SubmodelDescriptor, is_stripped_request(request) + ) + aas_descriptor.submodel_descriptors.append(submodel_descriptor) + aas_descriptor.commit() + created_resource_url = map_adapter.build( + self.get_submodel_descriptor_by_id_through_superpath, + {"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id}, + force_external=True, + ) + return response_t(submodel_descriptor, status=201, headers={"Location": created_resource_url}) + + def delete_submodel_descriptor_by_id_through_superpath( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + aas_descriptor = self._get_aas_descriptor(url_args) + submodel_id = url_args["submodel_id"] + submodel_descriptor = next((sd for sd in aas_descriptor.submodel_descriptors if sd.id == submodel_id), None) + if submodel_descriptor is None: + raise NotFound(f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!") + aas_descriptor.submodel_descriptors.remove(submodel_descriptor) + aas_descriptor.commit() + return response_t() + + # ------ Submodel REGISTRY ROUTES ------- + def get_all_submodel_descriptors( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + submodel_descriptors, cursor = self._get_all_submodel_descriptors(request) + return response_t(list(submodel_descriptors), cursor=cursor, stripped=is_stripped_request(request)) + + def get_submodel_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + submodel_descriptor = self._get_submodel_descriptor(url_args) + return response_t(submodel_descriptor, stripped=is_stripped_request(request)) + + def post_submodel_descriptor( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: + submodel_descriptor = HTTPApiDecoder.request_body( + request, server_model.SubmodelDescriptor, is_stripped_request(request) + ) + try: + self.object_store.add(submodel_descriptor) + except KeyError as e: + raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") from e + submodel_descriptor.commit() + created_resource_url = map_adapter.build( + self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True + ) + return response_t(submodel_descriptor, status=201, headers={"Location": created_resource_url}) + + def put_submodel_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter, **_kwargs + ) -> Response: + try: + submodel_descriptor = self._get_submodel_descriptor(url_args) + submodel_descriptor.update_from( + HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request)) + ) + submodel_descriptor.commit() + return response_t() + except NotFound: + submodel_descriptor = HTTPApiDecoder.request_body( + request, server_model.SubmodelDescriptor, is_stripped_request(request) + ) + self.object_store.add(submodel_descriptor) + submodel_descriptor.commit() + created_resource_url = map_adapter.build( + self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True + ) + return response_t(submodel_descriptor, status=201, headers={"Location": created_resource_url}) + + def delete_submodel_descriptor_by_id( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: + self.object_store.remove(self._get_obj_ts(url_args["submodel_id"], server_model.SubmodelDescriptor)) + return response_t() + + +if __name__ == "__main__": + from werkzeug.serving import run_simple + + run_simple("localhost", 8083, RegistryAPI(DictDescriptorStore()), use_debugger=True, use_reloader=True) diff --git a/server/app/interfaces/repository.py b/server/app/interfaces/repository.py index c1ee513e..51d7b15e 100644 --- a/server/app/interfaces/repository.py +++ b/server/app/interfaces/repository.py @@ -10,171 +10,348 @@ import io import json -from typing import Type, Iterator, List, Dict, Union, Callable, Tuple, Optional, Iterable +from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union import werkzeug.exceptions import werkzeug.routing import werkzeug.utils -from werkzeug import Response, Request -from werkzeug.datastructures import FileStorage -from werkzeug.exceptions import NotFound, BadRequest, Conflict -from werkzeug.routing import Submount, Rule, MapAdapter - from basyx.aas import model from basyx.aas.adapter import aasx +from werkzeug import Request, Response +from werkzeug.datastructures import FileStorage +from werkzeug.exceptions import BadRequest, Conflict, NotFound +from werkzeug.routing import MapAdapter, Rule, Submount + +from app.interfaces.base import APIResponse, HTTPApiDecoder, ObjectStoreWSGIApp, T, is_stripped_request from app.util.converters import IdentifierToBase64URLConverter, IdShortPathConverter, base64url_decode -from .base import ObjectStoreWSGIApp, APIResponse, is_stripped_request, HTTPApiDecoder, T class WSGIApp(ObjectStoreWSGIApp): - def __init__(self, object_store: model.AbstractObjectStore, file_store: aasx.AbstractSupplementaryFileContainer, - base_path: str = "/api/v3.0"): + def __init__( + self, + object_store: model.AbstractObjectStore, + file_store: aasx.AbstractSupplementaryFileContainer, + base_path: str = "/api/v3.0", + ): self.object_store: model.AbstractObjectStore = object_store self.file_store: aasx.AbstractSupplementaryFileContainer = file_store - self.url_map = werkzeug.routing.Map([ - Submount(base_path, [ - Rule("/serialization", methods=["GET"], endpoint=self.not_implemented), - Rule("/description", methods=["GET"], endpoint=self.not_implemented), - Rule("/shells", methods=["GET"], endpoint=self.get_aas_all), - Rule("/shells", methods=["POST"], endpoint=self.post_aas), - Submount("/shells", [ - Rule("/$reference", methods=["GET"], endpoint=self.get_aas_all_reference), - Rule("/", methods=["GET"], endpoint=self.get_aas), - Rule("/", methods=["PUT"], endpoint=self.put_aas), - Rule("/", methods=["DELETE"], endpoint=self.delete_aas), - Submount("/", [ - Rule("/$reference", methods=["GET"], endpoint=self.get_aas_reference), - Rule("/asset-information", methods=["GET"], endpoint=self.get_aas_asset_information), - Rule("/asset-information", methods=["PUT"], endpoint=self.put_aas_asset_information), - Rule("/asset-information/thumbnail", methods=["GET", "PUT", "DELETE"], - endpoint=self.not_implemented), - Rule("/submodel-refs", methods=["GET"], endpoint=self.get_aas_submodel_refs), - Rule("/submodel-refs", methods=["POST"], endpoint=self.post_aas_submodel_refs), - Rule("/submodel-refs/", methods=["DELETE"], - endpoint=self.delete_aas_submodel_refs_specific), - Submount("/submodels", [ - Rule("/", methods=["PUT"], - endpoint=self.put_aas_submodel_refs_submodel), - Rule("/", methods=["DELETE"], - endpoint=self.delete_aas_submodel_refs_submodel), - Rule("/", endpoint=self.aas_submodel_refs_redirect), - Rule("//", endpoint=self.aas_submodel_refs_redirect) - ]) - ]) - ]), - Rule("/submodels", methods=["GET"], endpoint=self.get_submodel_all), - Rule("/submodels", methods=["POST"], endpoint=self.post_submodel), - Submount("/submodels", [ - Rule("/$metadata", methods=["GET"], endpoint=self.get_submodel_all_metadata), - Rule("/$reference", methods=["GET"], endpoint=self.get_submodel_all_reference), - Rule("/$value", methods=["GET"], endpoint=self.not_implemented), - Rule("/$path", methods=["GET"], endpoint=self.not_implemented), - Rule("/", methods=["GET"], endpoint=self.get_submodel), - Rule("/", methods=["PUT"], endpoint=self.put_submodel), - Rule("/", methods=["DELETE"], endpoint=self.delete_submodel), - Rule("/", methods=["PATCH"], endpoint=self.not_implemented), - Submount("/", [ - Rule("/$metadata", methods=["GET"], endpoint=self.get_submodels_metadata), - Rule("/$metadata", methods=["PATCH"], endpoint=self.not_implemented), - Rule("/$value", methods=["GET"], endpoint=self.not_implemented), - Rule("/$value", methods=["PATCH"], endpoint=self.not_implemented), - Rule("/$reference", methods=["GET"], endpoint=self.get_submodels_reference), - Rule("/$path", methods=["GET"], endpoint=self.not_implemented), - Rule("/submodel-elements", methods=["GET"], endpoint=self.get_submodel_submodel_elements), - Rule("/submodel-elements", methods=["POST"], - endpoint=self.post_submodel_submodel_elements_id_short_path), - Submount("/submodel-elements", [ - Rule("/$metadata", methods=["GET"], endpoint=self.get_submodel_submodel_elements_metadata), - Rule("/$reference", methods=["GET"], - endpoint=self.get_submodel_submodel_elements_reference), - Rule("/$value", methods=["GET"], endpoint=self.not_implemented), - Rule("/$path", methods=["GET"], endpoint=self.not_implemented), - Rule("/", methods=["GET"], - endpoint=self.get_submodel_submodel_elements_id_short_path), - Rule("/", methods=["POST"], - endpoint=self.post_submodel_submodel_elements_id_short_path), - Rule("/", methods=["PUT"], - endpoint=self.put_submodel_submodel_elements_id_short_path), - Rule("/", methods=["DELETE"], - endpoint=self.delete_submodel_submodel_elements_id_short_path), - Rule("/", methods=["PATCH"], endpoint=self.not_implemented), - Submount("/", [ - Rule("/$metadata", methods=["GET"], - endpoint=self.get_submodel_submodel_elements_id_short_path_metadata), - Rule("/$metadata", methods=["PATCH"], endpoint=self.not_implemented), - Rule("/$reference", methods=["GET"], - endpoint=self.get_submodel_submodel_elements_id_short_path_reference), + self.url_map = werkzeug.routing.Map( + [ + Submount( + base_path, + [ + Rule("/serialization", methods=["GET"], endpoint=self.not_implemented), + Rule("/description", methods=["GET"], endpoint=self.not_implemented), + Rule("/shells", methods=["GET"], endpoint=self.get_aas_all), + Rule("/shells", methods=["POST"], endpoint=self.post_aas), + Submount( + "/shells", + [ + Rule("/$reference", methods=["GET"], endpoint=self.get_aas_all_reference), + Rule("/", methods=["GET"], endpoint=self.get_aas), + Rule("/", methods=["PUT"], endpoint=self.put_aas), + Rule("/", methods=["DELETE"], endpoint=self.delete_aas), + Submount( + "/", + [ + Rule("/$reference", methods=["GET"], endpoint=self.get_aas_reference), + Rule( + "/asset-information", + methods=["GET"], + endpoint=self.get_aas_asset_information, + ), + Rule( + "/asset-information", + methods=["PUT"], + endpoint=self.put_aas_asset_information, + ), + Rule( + "/asset-information/thumbnail", + methods=["GET", "PUT", "DELETE"], + endpoint=self.not_implemented, + ), + Rule("/submodel-refs", methods=["GET"], endpoint=self.get_aas_submodel_refs), + Rule("/submodel-refs", methods=["POST"], endpoint=self.post_aas_submodel_refs), + Rule( + "/submodel-refs/", + methods=["DELETE"], + endpoint=self.delete_aas_submodel_refs_specific, + ), + Submount( + "/submodels", + [ + Rule( + "/", + methods=["PUT"], + endpoint=self.put_aas_submodel_refs_submodel, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_aas_submodel_refs_submodel, + ), + Rule( + "/", endpoint=self.aas_submodel_refs_redirect + ), + Rule( + "//", + endpoint=self.aas_submodel_refs_redirect, + ), + ], + ), + ], + ), + ], + ), + Rule("/submodels", methods=["GET"], endpoint=self.get_submodel_all), + Rule("/submodels", methods=["POST"], endpoint=self.post_submodel), + Submount( + "/submodels", + [ + Rule("/$metadata", methods=["GET"], endpoint=self.get_submodel_all_metadata), + Rule("/$reference", methods=["GET"], endpoint=self.get_submodel_all_reference), Rule("/$value", methods=["GET"], endpoint=self.not_implemented), - Rule("/$value", methods=["PATCH"], endpoint=self.not_implemented), Rule("/$path", methods=["GET"], endpoint=self.not_implemented), - Rule("/attachment", methods=["GET"], - endpoint=self.get_submodel_submodel_element_attachment), - Rule("/attachment", methods=["PUT"], - endpoint=self.put_submodel_submodel_element_attachment), - Rule("/attachment", methods=["DELETE"], - endpoint=self.delete_submodel_submodel_element_attachment), - Rule("/invoke", methods=["POST"], endpoint=self.not_implemented), - Rule("/invoke/$value", methods=["POST"], endpoint=self.not_implemented), - Rule("/invoke-async", methods=["POST"], endpoint=self.not_implemented), - Rule("/invoke-async/$value", methods=["POST"], endpoint=self.not_implemented), - Rule("/operation-status/", methods=["GET"], - endpoint=self.not_implemented), - Submount("/operation-results", [ - Rule("/", methods=["GET"], endpoint=self.not_implemented), - Rule("//$value", methods=["GET"], endpoint=self.not_implemented) - ]), - Rule("/qualifiers", methods=["GET"], - endpoint=self.get_submodel_submodel_element_qualifiers), - Rule("/qualifiers", methods=["POST"], - endpoint=self.post_submodel_submodel_element_qualifiers), - Submount("/qualifiers", [ - Rule("/", methods=["GET"], - endpoint=self.get_submodel_submodel_element_qualifiers), - Rule("/", methods=["PUT"], - endpoint=self.put_submodel_submodel_element_qualifiers), - Rule("/", methods=["DELETE"], - endpoint=self.delete_submodel_submodel_element_qualifiers) - ]) - ]) - ]), - Rule("/qualifiers", methods=["GET"], endpoint=self.get_submodel_submodel_element_qualifiers), - Rule("/qualifiers", methods=["POST"], endpoint=self.post_submodel_submodel_element_qualifiers), - Submount("/qualifiers", [ - Rule("/", methods=["GET"], - endpoint=self.get_submodel_submodel_element_qualifiers), - Rule("/", methods=["PUT"], - endpoint=self.put_submodel_submodel_element_qualifiers), - Rule("/", methods=["DELETE"], - endpoint=self.delete_submodel_submodel_element_qualifiers) - ]) - ]) - ]), - Rule("/concept-descriptions", methods=["GET"], endpoint=self.get_concept_description_all), - Rule("/concept-descriptions", methods=["POST"], endpoint=self.post_concept_description), - Submount("/concept-descriptions", [ - Rule("/", methods=["GET"], endpoint=self.get_concept_description), - Rule("/", methods=["PUT"], endpoint=self.put_concept_description), - Rule("/", methods=["DELETE"], endpoint=self.delete_concept_description), - ]), - ]) - ], converters={ - "base64url": IdentifierToBase64URLConverter, - "id_short_path": IdShortPathConverter - }, strict_slashes=False) + Rule("/", methods=["GET"], endpoint=self.get_submodel), + Rule("/", methods=["PUT"], endpoint=self.put_submodel), + Rule("/", methods=["DELETE"], endpoint=self.delete_submodel), + Rule("/", methods=["PATCH"], endpoint=self.not_implemented), + Submount( + "/", + [ + Rule("/$metadata", methods=["GET"], endpoint=self.get_submodels_metadata), + Rule("/$metadata", methods=["PATCH"], endpoint=self.not_implemented), + Rule("/$value", methods=["GET"], endpoint=self.not_implemented), + Rule("/$value", methods=["PATCH"], endpoint=self.not_implemented), + Rule("/$reference", methods=["GET"], endpoint=self.get_submodels_reference), + Rule("/$path", methods=["GET"], endpoint=self.not_implemented), + Rule( + "/submodel-elements", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements, + ), + Rule( + "/submodel-elements", + methods=["POST"], + endpoint=self.post_submodel_submodel_elements_id_short_path, + ), + Submount( + "/submodel-elements", + [ + Rule( + "/$metadata", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements_metadata, + ), + Rule( + "/$reference", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements_reference, + ), + Rule("/$value", methods=["GET"], endpoint=self.not_implemented), + Rule("/$path", methods=["GET"], endpoint=self.not_implemented), + Rule( + "/", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements_id_short_path, + ), + Rule( + "/", + methods=["POST"], + endpoint=self.post_submodel_submodel_elements_id_short_path, + ), + Rule( + "/", + methods=["PUT"], + endpoint=self.put_submodel_submodel_elements_id_short_path, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_submodel_submodel_elements_id_short_path, + ), + Rule( + "/", + methods=["PATCH"], + endpoint=self.not_implemented, + ), + Submount( + "/", + [ + Rule( + "/$metadata", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements_id_short_path_metadata, + ), + Rule( + "/$metadata", + methods=["PATCH"], + endpoint=self.not_implemented, + ), + Rule( + "/$reference", + methods=["GET"], + endpoint=self.get_submodel_submodel_elements_id_short_path_reference, + ), + Rule("/$value", methods=["GET"], endpoint=self.not_implemented), + Rule( + "/$value", methods=["PATCH"], endpoint=self.not_implemented + ), + Rule("/$path", methods=["GET"], endpoint=self.not_implemented), + Rule( + "/attachment", + methods=["GET"], + endpoint=self.get_submodel_submodel_element_attachment, + ), + Rule( + "/attachment", + methods=["PUT"], + endpoint=self.put_submodel_submodel_element_attachment, + ), + Rule( + "/attachment", + methods=["DELETE"], + endpoint=self.delete_submodel_submodel_element_attachment, + ), + Rule( + "/invoke", methods=["POST"], endpoint=self.not_implemented + ), + Rule( + "/invoke/$value", + methods=["POST"], + endpoint=self.not_implemented, + ), + Rule( + "/invoke-async", + methods=["POST"], + endpoint=self.not_implemented, + ), + Rule( + "/invoke-async/$value", + methods=["POST"], + endpoint=self.not_implemented, + ), + Rule( + "/operation-status/", + methods=["GET"], + endpoint=self.not_implemented, + ), + Submount( + "/operation-results", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.not_implemented, + ), + Rule( + "//$value", + methods=["GET"], + endpoint=self.not_implemented, + ), + ], + ), + Rule( + "/qualifiers", + methods=["GET"], + endpoint=self.get_submodel_submodel_element_qualifiers, + ), + Rule( + "/qualifiers", + methods=["POST"], + endpoint=self.post_submodel_submodel_element_qualifiers, + ), + Submount( + "/qualifiers", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.get_submodel_submodel_element_qualifiers, + ), + Rule( + "/", + methods=["PUT"], + endpoint=self.put_submodel_submodel_element_qualifiers, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_submodel_submodel_element_qualifiers, + ), + ], + ), + ], + ), + ], + ), + Rule( + "/qualifiers", + methods=["GET"], + endpoint=self.get_submodel_submodel_element_qualifiers, + ), + Rule( + "/qualifiers", + methods=["POST"], + endpoint=self.post_submodel_submodel_element_qualifiers, + ), + Submount( + "/qualifiers", + [ + Rule( + "/", + methods=["GET"], + endpoint=self.get_submodel_submodel_element_qualifiers, + ), + Rule( + "/", + methods=["PUT"], + endpoint=self.put_submodel_submodel_element_qualifiers, + ), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_submodel_submodel_element_qualifiers, + ), + ], + ), + ], + ), + ], + ), + Rule("/concept-descriptions", methods=["GET"], endpoint=self.get_concept_description_all), + Rule("/concept-descriptions", methods=["POST"], endpoint=self.post_concept_description), + Submount( + "/concept-descriptions", + [ + Rule("/", methods=["GET"], endpoint=self.get_concept_description), + Rule("/", methods=["PUT"], endpoint=self.put_concept_description), + Rule( + "/", + methods=["DELETE"], + endpoint=self.delete_concept_description, + ), + ], + ), + ], + ) + ], + converters={"base64url": IdentifierToBase64URLConverter, "id_short_path": IdShortPathConverter}, + strict_slashes=False, + ) # TODO: the parameters can be typed via builtin wsgiref with Python 3.11+ def __call__(self, environ, start_response) -> Iterable[bytes]: response: Response = self.handle_request(Request(environ)) return response(environ, start_response) - def _get_obj_ts(self, identifier: model.Identifier, type_: Type[model.provider._IDENTIFIABLE]) \ - -> model.provider._IDENTIFIABLE: + def _get_obj_ts(self, identifier: model.Identifier, type_: Type[T]) -> T: identifiable = self.object_store.get(identifier) if not isinstance(identifiable, type_): raise NotFound(f"No {type_.__name__} with {identifier} found!") return identifiable - def _get_all_obj_of_type(self, type_: Type[model.provider._IDENTIFIABLE]) -> Iterator[model.provider._IDENTIFIABLE]: + def _get_all_obj_of_type(self, type_: Type[T]) -> Iterator[T]: for obj in self.object_store: if isinstance(obj, type_): yield obj @@ -186,8 +363,9 @@ def _resolve_reference(self, reference: model.ModelReference[model.base._RT]) -> raise werkzeug.exceptions.InternalServerError(str(e)) from e @classmethod - def _get_nested_submodel_element(cls, namespace: model.UniqueIdShortNamespace, id_shorts: List[str]) \ - -> model.SubmodelElement: + def _get_nested_submodel_element( + cls, namespace: model.UniqueIdShortNamespace, id_shorts: List[str] + ) -> model.SubmodelElement: if not id_shorts: raise ValueError("No id_shorts specified!") @@ -217,8 +395,9 @@ def _expect_namespace(cls, obj: object, needle: str) -> model.UniqueIdShortNames return obj @classmethod - def _namespace_submodel_element_op(cls, namespace: model.UniqueIdShortNamespace, op: Callable[[str], T], arg: str) \ - -> T: + def _namespace_submodel_element_op( + cls, namespace: model.UniqueIdShortNamespace, op: Callable[[str], T], arg: str + ) -> T: try: return op(arg) except KeyError as e: @@ -232,8 +411,9 @@ def _qualifiable_qualifier_op(cls, qualifiable: model.Qualifiable, op: Callable[ raise NotFound(f"Qualifier with type {arg!r} not found in {qualifiable!r}") from e @classmethod - def _get_submodel_reference(cls, aas: model.AssetAdministrationShell, submodel_id: model.NameType) \ - -> model.ModelReference[model.Submodel]: + def _get_submodel_reference( + cls, aas: model.AssetAdministrationShell, submodel_id: model.NameType + ) -> model.ModelReference[model.Submodel]: # TODO: this is currently O(n), could be O(1) as aas.submodel, but keys would have to precisely match, as they # are hashed including their KeyType for ref in aas.submodel: @@ -261,19 +441,28 @@ def _get_shells(self, request: Request) -> Tuple[Iterator[model.AssetAdministrat value = asset_dict["value"] if name == "specificAssetId": - decoded_specific_id = HTTPApiDecoder.json_list(value, model.SpecificAssetId, - False, True)[0] + decoded_specific_id = HTTPApiDecoder.json_list(value, model.SpecificAssetId, False, True)[0] specific_asset_ids.append(decoded_specific_id) elif name == "globalAssetId": global_asset_ids.append(value) # Filter AAS based on both SpecificAssetIds and globalAssetIds - aas = filter(lambda shell: ( - (not specific_asset_ids or all(specific_asset_id in shell.asset_information.specific_asset_id - for specific_asset_id in specific_asset_ids)) and - (len(global_asset_ids) <= 1 and - (not global_asset_ids or shell.asset_information.global_asset_id in global_asset_ids)) - ), aas) + aas = filter( + lambda shell: ( + ( + not specific_asset_ids + or all( + specific_asset_id in shell.asset_information.specific_asset_id + for specific_asset_id in specific_asset_ids + ) + ) + and ( + len(global_asset_ids) <= 1 + and (not global_asset_ids or shell.asset_information.global_asset_id in global_asset_ids) + ) + ), + aas, + ) paginated_aas, end_index = self._get_slice(request, aas) return paginated_aas, end_index @@ -289,7 +478,8 @@ def _get_submodels(self, request: Request) -> Tuple[Iterator[model.Submodel], in semantic_id = request.args.get("semanticId") if semantic_id is not None: spec_semantic_id = HTTPApiDecoder.base64url_json( - semantic_id, model.Reference, False) # type: ignore[type-abstract] + semantic_id, model.Reference, False # type: ignore[type-abstract] + ) submodels = filter(lambda sm: sm.semantic_id == spec_semantic_id, submodels) paginated_submodels, end_index = self._get_slice(request, submodels) return paginated_submodels, end_index @@ -297,8 +487,9 @@ def _get_submodels(self, request: Request) -> Tuple[Iterator[model.Submodel], in def _get_submodel(self, url_args: Dict) -> model.Submodel: return self._get_obj_ts(url_args["submodel_id"], model.Submodel) - def _get_submodel_submodel_elements(self, request: Request, url_args: Dict) -> \ - Tuple[Iterator[model.SubmodelElement], int]: + def _get_submodel_submodel_elements( + self, request: Request, url_args: Dict + ) -> Tuple[Iterator[model.SubmodelElement], int]: submodel = self._get_submodel(url_args) paginated_submodel_elements: Iterator[model.SubmodelElement] paginated_submodel_elements, end_index = self._get_slice(request, submodel.submodel_element) @@ -321,23 +512,22 @@ def get_aas_all(self, request: Request, url_args: Dict, response_t: Type[APIResp aashells, cursor = self._get_shells(request) return response_t(list(aashells), cursor=cursor) - def post_aas(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - map_adapter: MapAdapter) -> Response: + def post_aas( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: aas = HTTPApiDecoder.request_body(request, model.AssetAdministrationShell, False) try: self.object_store.add(aas) except KeyError as e: raise Conflict(f"AssetAdministrationShell with Identifier {aas.id} already exists!") from e - created_resource_url = map_adapter.build(self.get_aas, { - "aas_id": aas.id - }, force_external=True) + created_resource_url = map_adapter.build(self.get_aas, {"aas_id": aas.id}, force_external=True) return response_t(aas, status=201, headers={"Location": created_resource_url}) - def get_aas_all_reference(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_aas_all_reference( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aashells, cursor = self._get_shells(request) - references: list[model.ModelReference] = [model.ModelReference.from_referable(aas) - for aas in aashells] + references: list[model.ModelReference] = [model.ModelReference.from_referable(aas) for aas in aashells] return response_t(references, cursor=cursor) # --------- AAS ROUTES --------- @@ -352,8 +542,9 @@ def get_aas_reference(self, request: Request, url_args: Dict, response_t: Type[A def put_aas(self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs) -> Response: aas = self._get_shell(url_args) - aas.update_from(HTTPApiDecoder.request_body(request, model.AssetAdministrationShell, - is_stripped_request(request))) + aas.update_from( + HTTPApiDecoder.request_body(request, model.AssetAdministrationShell, is_stripped_request(request)) + ) return response_t() def delete_aas(self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs) -> Response: @@ -361,26 +552,30 @@ def delete_aas(self, request: Request, url_args: Dict, response_t: Type[APIRespo self.object_store.remove(aas) return response_t() - def get_aas_asset_information(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_aas_asset_information( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) return response_t(aas.asset_information) - def put_aas_asset_information(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def put_aas_asset_information( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) aas.asset_information = HTTPApiDecoder.request_body(request, model.AssetInformation, False) return response_t() - def get_aas_submodel_refs(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_aas_submodel_refs( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) submodel_refs: Iterator[model.ModelReference[model.Submodel]] submodel_refs, cursor = self._get_slice(request, aas.submodel) return response_t(list(submodel_refs), cursor=cursor) - def post_aas_submodel_refs(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def post_aas_submodel_refs( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) sm_ref = HTTPApiDecoder.request_body(request, model.ModelReference, False) if sm_ref in aas.submodel: @@ -388,14 +583,16 @@ def post_aas_submodel_refs(self, request: Request, url_args: Dict, response_t: T aas.submodel.add(sm_ref) return response_t(sm_ref, status=201) - def delete_aas_submodel_refs_specific(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_aas_submodel_refs_specific( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) aas.submodel.remove(self._get_submodel_reference(aas, url_args["submodel_id"])) return response_t() - def put_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def put_aas_submodel_refs_submodel( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) sm_ref = self._get_submodel_reference(aas, url_args["submodel_id"]) submodel = self._resolve_reference(sm_ref) @@ -409,8 +606,9 @@ def put_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, respo aas.submodel.add(model.ModelReference.from_referable(submodel)) return response_t() - def delete_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_aas_submodel_refs_submodel( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: aas = self._get_shell(url_args) sm_ref = self._get_submodel_reference(aas, url_args["submodel_id"]) submodel = self._resolve_reference(sm_ref) @@ -418,14 +616,15 @@ def delete_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, re aas.submodel.remove(sm_ref) return response_t() - def aas_submodel_refs_redirect(self, request: Request, url_args: Dict, map_adapter: MapAdapter, response_t=None, - **_kwargs) -> Response: + def aas_submodel_refs_redirect( + self, request: Request, url_args: Dict, map_adapter: MapAdapter, response_t=None, **_kwargs + ) -> Response: aas = self._get_shell(url_args) # the following makes sure the reference exists self._get_submodel_reference(aas, url_args["submodel_id"]) - redirect_url = map_adapter.build(self.get_submodel, { - "submodel_id": url_args["submodel_id"] - }, force_external=True) + redirect_url = map_adapter.build( + self.get_submodel, {"submodel_id": url_args["submodel_id"]}, force_external=True + ) if "path" in url_args: redirect_url += "/" + url_args["path"] if request.query_string: @@ -437,28 +636,30 @@ def get_submodel_all(self, request: Request, url_args: Dict, response_t: Type[AP submodels, cursor = self._get_submodels(request) return response_t(list(submodels), cursor=cursor, stripped=is_stripped_request(request)) - def post_submodel(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - map_adapter: MapAdapter) -> Response: + def post_submodel( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: submodel = HTTPApiDecoder.request_body(request, model.Submodel, is_stripped_request(request)) try: self.object_store.add(submodel) except KeyError as e: raise Conflict(f"Submodel with Identifier {submodel.id} already exists!") from e - created_resource_url = map_adapter.build(self.get_submodel, { - "submodel_id": submodel.id - }, force_external=True) + created_resource_url = map_adapter.build(self.get_submodel, {"submodel_id": submodel.id}, force_external=True) return response_t(submodel, status=201, headers={"Location": created_resource_url}) - def get_submodel_all_metadata(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_all_metadata( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodels, cursor = self._get_submodels(request) return response_t(list(submodels), cursor=cursor, stripped=True) - def get_submodel_all_reference(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_all_reference( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodels, cursor = self._get_submodels(request) - references: list[model.ModelReference] = [model.ModelReference.from_referable(submodel) - for submodel in submodels] + references: list[model.ModelReference] = [ + model.ModelReference.from_referable(submodel) for submodel in submodels + ] return response_t(references, cursor=cursor, stripped=is_stripped_request(request)) # --------- SUBMODEL ROUTES --------- @@ -471,13 +672,15 @@ def get_submodel(self, request: Request, url_args: Dict, response_t: Type[APIRes submodel = self._get_submodel(url_args) return response_t(submodel, stripped=is_stripped_request(request)) - def get_submodels_metadata(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodels_metadata( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel = self._get_submodel(url_args) return response_t(submodel, stripped=True) - def get_submodels_reference(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodels_reference( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel = self._get_submodel(url_args) reference = model.ModelReference.from_referable(submodel) return response_t(reference, stripped=is_stripped_request(request)) @@ -487,83 +690,91 @@ def put_submodel(self, request: Request, url_args: Dict, response_t: Type[APIRes submodel.update_from(HTTPApiDecoder.request_body(request, model.Submodel, is_stripped_request(request))) return response_t() - def get_submodel_submodel_elements(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_submodel_elements( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_elements, cursor = self._get_submodel_submodel_elements(request, url_args) return response_t(list(submodel_elements), cursor=cursor, stripped=is_stripped_request(request)) - def get_submodel_submodel_elements_metadata(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_submodel_elements_metadata( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_elements, cursor = self._get_submodel_submodel_elements(request, url_args) return response_t(list(submodel_elements), cursor=cursor, stripped=True) - def get_submodel_submodel_elements_reference(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_submodel_elements_reference( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_elements, cursor = self._get_submodel_submodel_elements(request, url_args) - references: list[model.ModelReference] = [model.ModelReference.from_referable(element) for element in - list(submodel_elements)] + references: list[model.ModelReference] = [ + model.ModelReference.from_referable(element) for element in list(submodel_elements) + ] return response_t(references, cursor=cursor, stripped=is_stripped_request(request)) - def get_submodel_submodel_elements_id_short_path(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_submodel_elements_id_short_path( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) return response_t(submodel_element, stripped=is_stripped_request(request)) - def get_submodel_submodel_elements_id_short_path_metadata(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], **_kwargs) -> Response: + def get_submodel_submodel_elements_id_short_path_metadata( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) if isinstance(submodel_element, model.Capability) or isinstance(submodel_element, model.Operation): raise BadRequest(f"{submodel_element.id_short} does not allow the content modifier metadata!") return response_t(submodel_element, stripped=True) - def get_submodel_submodel_elements_id_short_path_reference(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], **_kwargs) -> Response: + def get_submodel_submodel_elements_id_short_path_reference( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) reference = model.ModelReference.from_referable(submodel_element) return response_t(reference, stripped=is_stripped_request(request)) - def post_submodel_submodel_elements_id_short_path(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - map_adapter: MapAdapter): + def post_submodel_submodel_elements_id_short_path( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ): parent = self._get_submodel_or_nested_submodel_element(url_args) if not isinstance(parent, model.UniqueIdShortNamespace): raise BadRequest(f"{parent!r} is not a namespace, can't add child submodel element!") # TODO: remove the following type: ignore comment when mypy supports abstract types for Type[T] # see https://github.com/python/mypy/issues/5374 - new_submodel_element = HTTPApiDecoder.request_body(request, - model.SubmodelElement, # type: ignore[type-abstract] - is_stripped_request(request)) + new_submodel_element = HTTPApiDecoder.request_body( + request, model.SubmodelElement, is_stripped_request(request) # type: ignore[type-abstract] + ) try: parent.add_referable(new_submodel_element) except model.AASConstraintViolation as e: if e.constraint_id != 22: raise - raise Conflict(f"SubmodelElement with idShort {new_submodel_element.id_short} already exists " - f"within {parent}!") + raise Conflict( + f"SubmodelElement with idShort {new_submodel_element.id_short} already exists " f"within {parent}!" + ) submodel = self._get_submodel(url_args) id_short_path = url_args.get("id_shorts", []) - created_resource_url = map_adapter.build(self.get_submodel_submodel_elements_id_short_path, { - "submodel_id": submodel.id, - "id_shorts": id_short_path + [new_submodel_element.id_short] - }, force_external=True) + created_resource_url = map_adapter.build( + self.get_submodel_submodel_elements_id_short_path, + {"submodel_id": submodel.id, "id_shorts": id_short_path + [new_submodel_element.id_short]}, + force_external=True, + ) return response_t(new_submodel_element, status=201, headers={"Location": created_resource_url}) - def put_submodel_submodel_elements_id_short_path(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - **_kwargs) -> Response: + def put_submodel_submodel_elements_id_short_path( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) # TODO: remove the following type: ignore comment when mypy supports abstract types for Type[T] # see https://github.com/python/mypy/issues/5374 - new_submodel_element = HTTPApiDecoder.request_body(request, - model.SubmodelElement, # type: ignore[type-abstract] - is_stripped_request(request)) + new_submodel_element = HTTPApiDecoder.request_body( + request, model.SubmodelElement, is_stripped_request(request) # type: ignore[type-abstract] + ) submodel_element.update_from(new_submodel_element) return response_t() - def delete_submodel_submodel_elements_id_short_path(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_submodel_submodel_elements_id_short_path( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) parent: model.UniqueIdShortNamespace = self._expect_namespace(sm_or_se.parent, sm_or_se.id_short) self._namespace_submodel_element_op(parent, parent.remove_referable, sm_or_se.id_short) @@ -592,8 +803,9 @@ def get_submodel_submodel_element_attachment(self, request: Request, url_args: D # Blob and File both have the content_type attribute return Response(value, content_type=submodel_element.content_type) # type: ignore[attr-defined] - def put_submodel_submodel_element_attachment(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def put_submodel_submodel_element_attachment( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) # spec allows PUT only for File, not for Blob @@ -602,26 +814,27 @@ def put_submodel_submodel_element_attachment(self, request: Request, url_args: D elif submodel_element.value is not None: raise Conflict(f"{submodel_element!r} already references a file!") - filename = request.form.get('fileName') + filename = request.form.get("fileName") if filename is None: raise BadRequest("No 'fileName' specified!") elif not filename.startswith("/"): raise BadRequest(f"Given 'fileName' doesn't start with a slash (/): {filename}") - file_storage: Optional[FileStorage] = request.files.get('file') + file_storage: Optional[FileStorage] = request.files.get("file") if file_storage is None: raise BadRequest("Missing file to upload") elif file_storage.mimetype != submodel_element.content_type: raise werkzeug.exceptions.UnsupportedMediaType( f"Request body is of type {file_storage.mimetype!r}, " - f"while {submodel_element!r} has content_type {submodel_element.content_type!r}!") + f"while {submodel_element!r} has content_type {submodel_element.content_type!r}!" + ) submodel_element.value = self.file_store.add_file(filename, file_storage.stream, submodel_element.content_type) return response_t() - def delete_submodel_submodel_element_attachment(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_submodel_submodel_element_attachment( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: submodel_element = self._get_submodel_submodel_elements_id_short_path(url_args) if not isinstance(submodel_element, (model.Blob, model.File)): raise BadRequest(f"{submodel_element!r} is not a Blob or File, no file content to delete!") @@ -641,30 +854,37 @@ def delete_submodel_submodel_element_attachment(self, request: Request, url_args return response_t() - def get_submodel_submodel_element_qualifiers(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_submodel_submodel_element_qualifiers( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) qualifier_type = url_args.get("qualifier_type") if qualifier_type is None: return response_t(list(sm_or_se.qualifier)) return response_t(self._qualifiable_qualifier_op(sm_or_se, sm_or_se.get_qualifier_by_type, qualifier_type)) - def post_submodel_submodel_element_qualifiers(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - map_adapter: MapAdapter) -> Response: + def post_submodel_submodel_element_qualifiers( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) qualifier = HTTPApiDecoder.request_body(request, model.Qualifier, is_stripped_request(request)) if sm_or_se.qualifier.contains_id("type", qualifier.type): raise Conflict(f"Qualifier with type {qualifier.type} already exists!") sm_or_se.qualifier.add(qualifier) - created_resource_url = map_adapter.build(self.get_submodel_submodel_element_qualifiers, { - "submodel_id": url_args["submodel_id"], - "id_shorts": url_args.get("id_shorts") or None, - "qualifier_type": qualifier.type - }, force_external=True) + created_resource_url = map_adapter.build( + self.get_submodel_submodel_element_qualifiers, + { + "submodel_id": url_args["submodel_id"], + "id_shorts": url_args.get("id_shorts") or None, + "qualifier_type": qualifier.type, + }, + force_external=True, + ) return response_t(qualifier, status=201, headers={"Location": created_resource_url}) - def put_submodel_submodel_element_qualifiers(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - map_adapter: MapAdapter) -> Response: + def put_submodel_submodel_element_qualifiers( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) new_qualifier = HTTPApiDecoder.request_body(request, model.Qualifier, is_stripped_request(request)) qualifier_type = url_args["qualifier_type"] @@ -675,63 +895,79 @@ def put_submodel_submodel_element_qualifiers(self, request: Request, url_args: D sm_or_se.remove_qualifier_by_type(qualifier.type) sm_or_se.qualifier.add(new_qualifier) if qualifier_type_changed: - created_resource_url = map_adapter.build(self.get_submodel_submodel_element_qualifiers, { - "submodel_id": url_args["submodel_id"], - "id_shorts": url_args.get("id_shorts") or None, - "qualifier_type": new_qualifier.type - }, force_external=True) + created_resource_url = map_adapter.build( + self.get_submodel_submodel_element_qualifiers, + { + "submodel_id": url_args["submodel_id"], + "id_shorts": url_args.get("id_shorts") or None, + "qualifier_type": new_qualifier.type, + }, + force_external=True, + ) return response_t(new_qualifier, status=201, headers={"Location": created_resource_url}) return response_t(new_qualifier) - def delete_submodel_submodel_element_qualifiers(self, request: Request, url_args: Dict, - response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_submodel_submodel_element_qualifiers( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: sm_or_se = self._get_submodel_or_nested_submodel_element(url_args) qualifier_type = url_args["qualifier_type"] self._qualifiable_qualifier_op(sm_or_se, sm_or_se.remove_qualifier_by_type, qualifier_type) return response_t() # --------- CONCEPT DESCRIPTION ROUTES --------- - def get_concept_description_all(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_concept_description_all( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: concept_descriptions: Iterator[model.ConceptDescription] = self._get_all_obj_of_type(model.ConceptDescription) concept_descriptions, cursor = self._get_slice(request, concept_descriptions) return response_t(list(concept_descriptions), cursor=cursor, stripped=is_stripped_request(request)) - def post_concept_description(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - map_adapter: MapAdapter) -> Response: - concept_description = HTTPApiDecoder.request_body(request, model.ConceptDescription, - is_stripped_request(request)) + def post_concept_description( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], map_adapter: MapAdapter + ) -> Response: + concept_description = HTTPApiDecoder.request_body( + request, model.ConceptDescription, is_stripped_request(request) + ) try: self.object_store.add(concept_description) except KeyError as e: raise Conflict(f"ConceptDescription with Identifier {concept_description.id} already exists!") from e - created_resource_url = map_adapter.build(self.get_concept_description, { - "concept_id": concept_description.id - }, force_external=True) + created_resource_url = map_adapter.build( + self.get_concept_description, {"concept_id": concept_description.id}, force_external=True + ) return response_t(concept_description, status=201, headers={"Location": created_resource_url}) - def get_concept_description(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def get_concept_description( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: concept_description = self._get_concept_description(url_args) return response_t(concept_description, stripped=is_stripped_request(request)) - def put_concept_description(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def put_concept_description( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: concept_description = self._get_concept_description(url_args) - concept_description.update_from(HTTPApiDecoder.request_body(request, model.ConceptDescription, - is_stripped_request(request))) + concept_description.update_from( + HTTPApiDecoder.request_body(request, model.ConceptDescription, is_stripped_request(request)) + ) return response_t() - def delete_concept_description(self, request: Request, url_args: Dict, response_t: Type[APIResponse], - **_kwargs) -> Response: + def delete_concept_description( + self, request: Request, url_args: Dict, response_t: Type[APIResponse], **_kwargs + ) -> Response: self.object_store.remove(self._get_concept_description(url_args)) return response_t() if __name__ == "__main__": - from werkzeug.serving import run_simple from basyx.aas.examples.data.example_aas import create_full_example + from werkzeug.serving import run_simple - run_simple("localhost", 8080, WSGIApp(create_full_example(), aasx.DictSupplementaryFileContainer()), - use_debugger=True, use_reloader=True) + run_simple( + "localhost", + 8080, + WSGIApp(create_full_example(), aasx.DictSupplementaryFileContainer()), + use_debugger=True, + use_reloader=True, + ) diff --git a/server/app/model/__init__.py b/server/app/model/__init__.py new file mode 100644 index 00000000..684379e2 --- /dev/null +++ b/server/app/model/__init__.py @@ -0,0 +1,4 @@ +from .descriptor import * +from .endpoint import * +from .provider import * +from .service_specification import * diff --git a/server/app/model/descriptor.py b/server/app/model/descriptor.py new file mode 100644 index 00000000..13fdf8d0 --- /dev/null +++ b/server/app/model/descriptor.py @@ -0,0 +1,128 @@ +from __future__ import absolute_import + +import abc +from typing import Iterable, List, Optional + +from basyx.aas import model + +from app.model.endpoint import Endpoint + + +class Descriptor(model.HasExtension, metaclass=abc.ABCMeta): + @abc.abstractmethod + def __init__( + self, + description: Optional[model.MultiLanguageTextType] = None, + display_name: Optional[model.MultiLanguageNameType] = None, + extension: Iterable[model.Extension] = (), + ): + super().__init__() + self.description: Optional[model.MultiLanguageTextType] = description + self.display_name: Optional[model.MultiLanguageNameType] = display_name + self.extension = model.NamespaceSet(self, [("name", True)], extension) + + def commit(self): + pass + + def update(self): + pass + + def update_from(self, other: "Descriptor", update_source: bool = False): + """ + Updates the descriptor's attributes from another descriptor. + + :param other: The descriptor to update from. + :param update_source: Placeholder for compatibility; not used in this context. + """ + for attr in vars(other): + if attr == "id": + continue # Skip updating the unique identifier of the AAS + setattr(self, attr, getattr(other, attr)) + + +class SubmodelDescriptor(Descriptor): + + def __init__( + self, + id_: model.Identifier, + endpoints: List[Endpoint], + administration: Optional[model.AdministrativeInformation] = None, + id_short: Optional[model.NameType] = None, + semantic_id: Optional[model.Reference] = None, + supplemental_semantic_id: Iterable[model.Reference] = (), + ): + super().__init__() + self.id: model.Identifier = id_ + self.endpoints: List[Endpoint] = endpoints + self.administration: Optional[model.AdministrativeInformation] = administration + self.id_short: Optional[model.NameType] = id_short + self.semantic_id: Optional[model.Reference] = semantic_id + self.supplemental_semantic_id: model.ConstrainedList[model.Reference] = model.ConstrainedList( + supplemental_semantic_id + ) + + +class AssetAdministrationShellDescriptor(Descriptor): + + def __init__( + self, + id_: model.Identifier, + administration: Optional[model.AdministrativeInformation] = None, + asset_kind: Optional[model.AssetKind] = None, + asset_type: Optional[model.Identifier] = None, + endpoints: Optional[List[Endpoint]] = None, + global_asset_id: Optional[model.Identifier] = None, + id_short: Optional[model.NameType] = None, + specific_asset_id: Iterable[model.SpecificAssetId] = (), + submodel_descriptors: Optional[List[SubmodelDescriptor]] = None, + description: Optional[model.MultiLanguageTextType] = None, + display_name: Optional[model.MultiLanguageNameType] = None, + extension: Iterable[model.Extension] = (), + ): + """AssetAdministrationShellDescriptor - + + Nur das 'id'-Feld (id_) ist zwingend erforderlich. Alle anderen Felder erhalten Defaultwerte. + """ + super().__init__() + self.administration: Optional[model.AdministrativeInformation] = administration + self.asset_kind: Optional[model.AssetKind] = asset_kind + self.asset_type: Optional[model.Identifier] = asset_type + self.endpoints: Optional[List[Endpoint]] = ( + endpoints if endpoints is not None else [] + ) # leere Liste, falls nicht gesetzt + self.global_asset_id: Optional[model.Identifier] = global_asset_id + self.id_short: Optional[model.NameType] = id_short + self.id: model.Identifier = id_ + self._specific_asset_id: model.ConstrainedList[model.SpecificAssetId] = model.ConstrainedList( + specific_asset_id, + item_set_hook=self._check_constraint_set_spec_asset_id, + item_del_hook=self._check_constraint_del_spec_asset_id, + ) + self.submodel_descriptors = submodel_descriptors if submodel_descriptors is not None else [] + self.description: Optional[model.MultiLanguageTextType] = description + self.display_name: Optional[model.MultiLanguageNameType] = display_name + self.extension = model.NamespaceSet(self, [("name", True)], extension) + + @property + def specific_asset_id(self) -> model.ConstrainedList[model.SpecificAssetId]: + return self._specific_asset_id + + @specific_asset_id.setter + def specific_asset_id(self, specific_asset_id: Iterable[model.SpecificAssetId]) -> None: + # constraints are checked via _check_constraint_set_spec_asset_id() in this case + self._specific_asset_id[:] = specific_asset_id + + def _check_constraint_set_spec_asset_id( + self, + items_to_replace: List[model.SpecificAssetId], + new_items: List[model.SpecificAssetId], + old_list: List[model.SpecificAssetId], + ) -> None: + model.AssetInformation._validate_aasd_131( + self.global_asset_id, len(old_list) - len(items_to_replace) + len(new_items) > 0 + ) + + def _check_constraint_del_spec_asset_id( + self, _item_to_del: model.SpecificAssetId, old_list: List[model.SpecificAssetId] + ) -> None: + model.AssetInformation._validate_aasd_131(self.global_asset_id, len(old_list) > 1) diff --git a/server/app/model/endpoint.py b/server/app/model/endpoint.py new file mode 100644 index 00000000..06301e9a --- /dev/null +++ b/server/app/model/endpoint.py @@ -0,0 +1,117 @@ +from __future__ import absolute_import + +import re +from enum import Enum +from typing import List, Optional + +from basyx.aas.model import base + + +class AssetLink: + def __init__(self, name: base.LabelType, value: base.Identifier): + if not name: + raise ValueError("AssetLink 'name' must be a non-empty string.") + if not value: + raise ValueError("AssetLink 'value' must be a non-empty string.") + self.name = name + self.value = value + + +class SecurityTypeEnum(Enum): + NONE = "NONE" + RFC_TLSA = "RFC_TLSA" + W3C_DID = "W3C_DID" + + +class SecurityAttributeObject: + def __init__(self, type_: SecurityTypeEnum, key: str, value: str): + + if not isinstance(type_, SecurityTypeEnum): + raise ValueError(f"Invalid security type: {type_}. Must be one of {list(SecurityTypeEnum)}") + if not key or not isinstance(key, str): + raise ValueError("Key must be a non-empty string.") + if not value or not isinstance(value, str): + raise ValueError("Value must be a non-empty string.") + self.type = type_ + self.key = key + self.value = value + + +class ProtocolInformation: + + def __init__( + self, + href: str, + endpoint_protocol: Optional[str] = None, + endpoint_protocol_version: Optional[List[str]] = None, + subprotocol: Optional[str] = None, + subprotocol_body: Optional[str] = None, + subprotocol_body_encoding: Optional[str] = None, + security_attributes: Optional[List[SecurityAttributeObject]] = None, + ): + if not href or not isinstance(href, str): + raise ValueError("href must be a non-empty string representing a valid URL.") + + self.href = href + self.endpoint_protocol = endpoint_protocol + self.endpoint_protocol_version = endpoint_protocol_version or [] + self.subprotocol = subprotocol + self.subprotocol_body = subprotocol_body + self.subprotocol_body_encoding = subprotocol_body_encoding + self.security_attributes = security_attributes or [] + + +class Endpoint: + INTERFACE_SHORTNAMES = { + "AAS", + "SUBMODEL", + "SERIALIZE", + "AASX-FILE", + "AAS-REGISTRY", + "SUBMODEL-REGISTRY", + "AAS-REPOSITORY", + "SUBMODEL-REPOSITORY", + "CD-REPOSITORY", + "AAS-DISCOVERY", + } + VERSION_PATTERN = re.compile(r"^\d+(\.\d+)*$") + + def __init__(self, interface: base.NameType, protocol_information: ProtocolInformation): # noqa: E501 + + self.interface = interface + self.protocol_information = protocol_information + + @property + def interface(self) -> str: + return self._interface + + @interface.setter + def interface(self, interface: base.NameType): + if interface is None: + raise ValueError("Invalid value for `interface`, must not be `None`") + if not self.is_valid_interface(interface): + raise ValueError(f"Invalid interface format: {interface}. Expected format: '-', ") + + self._interface = interface + + @classmethod + def is_valid_interface(cls, interface: base.NameType) -> bool: + parts = interface.split("-", 1) + if len(parts) != 2: + return False + short_name, version = parts + if short_name in cls.INTERFACE_SHORTNAMES and cls.VERSION_PATTERN.match(version): + return True + else: + return False + + @property + def protocol_information(self) -> ProtocolInformation: + return self._protocol_information + + @protocol_information.setter + def protocol_information(self, protocol_information: ProtocolInformation): + if protocol_information is None: + raise ValueError("Invalid value for `protocol_information`, must not be `None`") # noqa: E501 + + self._protocol_information = protocol_information diff --git a/server/app/model/provider.py b/server/app/model/provider.py new file mode 100644 index 00000000..62c53462 --- /dev/null +++ b/server/app/model/provider.py @@ -0,0 +1,79 @@ +from pathlib import Path +from typing import IO, Dict, Iterable, Iterator, Union + +from basyx.aas import model +from basyx.aas.model import provider as sdk_provider + +from app.adapter import read_server_aas_json_file_into +from app.model import descriptor + +PathOrIO = Union[Path, IO] + + +_DESCRIPTOR_TYPE = Union[descriptor.AssetAdministrationShellDescriptor, descriptor.SubmodelDescriptor] +_DESCRIPTOR_CLASSES = (descriptor.AssetAdministrationShellDescriptor, descriptor.SubmodelDescriptor) + + +class DictDescriptorStore(sdk_provider.AbstractObjectStore[model.Identifier, _DESCRIPTOR_TYPE]): + """ + A local in-memory object store for :class:`~app.model.descriptor.Descriptor` objects, backed by a dict, mapping + :class:`~basyx.aas.model.base.Identifier` → :class:`~app.model.descriptor.Descriptor` + """ + + def __init__(self, descriptors: Iterable[_DESCRIPTOR_TYPE] = ()) -> None: + self._backend: Dict[model.Identifier, _DESCRIPTOR_TYPE] = {} + for x in descriptors: + self.add(x) + + def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE: + return self._backend[identifier] + + def add(self, x: _DESCRIPTOR_TYPE) -> None: + if x.id in self._backend and self._backend.get(x.id) is not x: + raise KeyError("Descriptor object with same id {} is already stored in this store".format(x.id)) + self._backend[x.id] = x + + def discard(self, x: _DESCRIPTOR_TYPE) -> None: + if self._backend.get(x.id) is x: + del self._backend[x.id] + + def __contains__(self, x: object) -> bool: + if isinstance(x, model.Identifier): + return x in self._backend + if not isinstance(x, _DESCRIPTOR_CLASSES): + return False + return self._backend.get(x.id) is x + + def __len__(self) -> int: + return len(self._backend) + + def __iter__(self) -> Iterator[_DESCRIPTOR_TYPE]: + return iter(self._backend.values()) + + +def load_directory(directory: Union[Path, str]) -> DictDescriptorStore: + """ + Create a new :class:`~basyx.aas.model.provider.DictIdentifiableStore` and use it to load Asset Administration Shell + and Submodel files in ``AASX``, ``JSON`` and ``XML`` format from a given directory into memory. Additionally, load + all embedded supplementary files into a new :class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer`. + + :param directory: :class:`~pathlib.Path` or ``str`` pointing to the directory containing all Asset Administration + Shell and Submodel files to load + :return: Tuple consisting of a :class:`~basyx.aas.model.provider.DictIdentifiableStore` and a + :class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer` containing all loaded data + """ + + dict_descriptor_store: DictDescriptorStore = DictDescriptorStore() + + directory = Path(directory) + + for file in directory.iterdir(): + if not file.is_file(): + continue + + suffix = file.suffix.lower() + if suffix == ".json": + with open(file) as f: + read_server_aas_json_file_into(dict_descriptor_store, f) + + return dict_descriptor_store diff --git a/server/app/model/service_specification.py b/server/app/model/service_specification.py new file mode 100644 index 00000000..ff042bbc --- /dev/null +++ b/server/app/model/service_specification.py @@ -0,0 +1,20 @@ +from enum import Enum +from typing import List + + +class ServiceSpecificationProfileEnum(str, Enum): + AAS_REGISTRY_FULL = "https://adminshell.io/aas/API/3/1/AssetAdministrationShellRegistryServiceSpecification/SSP-001" + AAS_REGISTRY_READ = "https://adminshell.io/aas/API/3/1/AssetAdministrationShellRegistryServiceSpecification/SSP-002" + SUBMODEL_REGISTRY_FULL = "https://adminshell.io/aas/API/3/1/SubmodelRegistryServiceSpecification/SSP-001" + SUBMODEL_REGISTRY_READ = "https://adminshell.io/aas/API/3/1/SubmodelRegistryServiceSpecification/SSP-002" + # TODO add other profiles + + +class ServiceDescription: + def __init__(self, profiles: List[ServiceSpecificationProfileEnum]): + if not profiles: + raise ValueError("At least one profile must be specified") + self.profiles = profiles + + def to_dict(self): + return {"profiles": [p.value for p in self.profiles]} diff --git a/server/app/services/run_discovery.py b/server/app/services/run_discovery.py new file mode 100644 index 00000000..7c47124c --- /dev/null +++ b/server/app/services/run_discovery.py @@ -0,0 +1,29 @@ +import atexit +import os + +from app.interfaces.discovery import DiscoveryAPI, DiscoveryStore + +storage_path = os.getenv("storage_path", None) +base_path = os.getenv("API_BASE_PATH") + +wsgi_optparams = {} + +if base_path is not None: + wsgi_optparams["base_path"] = base_path + + +# Load DiscoveryStore from disk, if `storage_path` is set +if storage_path: + discovery_store: DiscoveryStore = DiscoveryStore.from_file(storage_path) +else: + discovery_store = DiscoveryStore() + + +def persist_store(): + if storage_path: + discovery_store.to_file(storage_path) + + +atexit.register(persist_store) + +application = DiscoveryAPI(discovery_store, **wsgi_optparams) diff --git a/server/app/services/run_registry.py b/server/app/services/run_registry.py new file mode 100644 index 00000000..666109d7 --- /dev/null +++ b/server/app/services/run_registry.py @@ -0,0 +1,113 @@ +# Copyright (c) 2026 the Eclipse BaSyx Authors +# +# This program and the accompanying materials are made available under the terms of the MIT License, available in +# the LICENSE file of this project. +# +# SPDX-License-Identifier: MIT +""" +This module provides the WSGI entry point for the Asset Administration Shell Registry Server. +""" + +import logging +import os +from typing import Union + +from app.backend import LocalFileDescriptorStore +from app.interfaces.registry import RegistryAPI +from app.model import DictDescriptorStore, load_directory + +# -------- Helper methods -------- + + +def setup_logger() -> logging.Logger: + """ + Configure a custom :class:`~logging.Logger` for the start-up sequence of the server. + + :return: Configured :class:`~logging.Logger` + """ + + logger = logging.getLogger(__name__) + if not logger.handlers: + logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + handler.setFormatter(logging.Formatter("%(levelname)s [Server Start-up] %(message)s")) + logger.addHandler(handler) + logger.propagate = False + return logger + + +def build_storage( + env_input: str, env_storage: str, env_storage_persistency: bool, env_storage_overwrite: bool, logger: logging.Logger +) -> Union[DictDescriptorStore, LocalFileDescriptorStore]: + """ + Configure the server's storage according to the given start-up settings. + + :param env_input: ``str`` pointing to the input directory of the server + :param env_storage: ``str`` pointing to the :class:`~basyx.aas.backend.local_file.LocalFileIdentifiableStore` + storage directory of the server if persistent storage is enabled + :param env_storage_persistency: Flag to enable persistent storage + :param env_storage_overwrite: Flag to overwrite existing :class:`Identifiables ` + in the :class:`~basyx.aas.backend.local_file.LocalFileIdentifiableStore` if persistent storage is enabled + :param logger: :class:`~logging.Logger` used for start-up diagnostics + :return: Tuple consisting of a :class:`~basyx.aas.model.provider.DictIdentifiableStore` if persistent storage is + disabled or a :class:`~basyx.aas.backend.local_file.LocalFileIdentifiableStore` if persistent storage is + enabled and a :class:`~basyx.aas.adapter.aasx.DictSupplementaryFileContainer` as storage for + :class:`~interfaces.repository.WSGIApp` + """ + + if env_storage_persistency: + storage_files = LocalFileDescriptorStore(env_storage) + storage_files.check_directory(create=True) + if os.path.isdir(env_input): + input_files = load_directory(env_input) + added, overwritten, skipped = storage_files.sync(input_files, env_storage_overwrite) + logger.info('Loaded %d descriptors(s) from "%s"', len(input_files), env_input) + logger.info( + "Synced INPUT to STORAGE with %d added and %d %s", + added, + overwritten if env_storage_overwrite else skipped, + "overwritten" if env_storage_overwrite else "skipped", + ) + return storage_files + else: + logger.warning('INPUT directory "%s" not found, starting empty', env_input) + return storage_files + + if os.path.isdir(env_input): + input_files = load_directory(env_input) + logger.info('Loaded %d descriptors(s) from "%s"', len(input_files), env_input) + return input_files + else: + logger.warning('INPUT directory "%s" not found, starting empty', env_input) + return DictDescriptorStore() + + +# -------- WSGI entrypoint -------- + +logger = setup_logger() + +env_input = os.getenv("INPUT", "/input") +env_storage = os.getenv("STORAGE", "/storage") +env_storage_persistency = os.getenv("STORAGE_PERSISTENCY", "false").lower() in {"1", "true", "yes"} +env_storage_overwrite = os.getenv("STORAGE_OVERWRITE", "false").lower() in {"1", "true", "yes"} +env_api_base_path = os.getenv("API_BASE_PATH") + +wsgi_optparams = {"base_path": env_api_base_path} if env_api_base_path else {} + +logger.info( + 'Loaded settings API_BASE_PATH="%s", INPUT="%s", STORAGE="%s", PERSISTENCY=%s, OVERWRITE=%s', + env_api_base_path or "", + env_input, + env_storage, + env_storage_persistency, + env_storage_overwrite, +) + +storage_files = build_storage(env_input, env_storage, env_storage_persistency, env_storage_overwrite, logger) + +application = RegistryAPI(storage_files, **wsgi_optparams) + + +if __name__ == "__main__": + logger.info("WSGI entrypoint created. Serve this module with uWSGI/Gunicorn/etc.") diff --git a/server/app/services/run_repository.py b/server/app/services/run_repository.py index 478e4d21..418e12dd 100644 --- a/server/app/services/run_repository.py +++ b/server/app/services/run_repository.py @@ -10,16 +10,18 @@ import logging import os +from typing import Tuple, Union + from basyx.aas.adapter import load_directory from basyx.aas.adapter.aasx import DictSupplementaryFileContainer from basyx.aas.backend.local_file import LocalFileIdentifiableStore from basyx.aas.model.provider import DictIdentifiableStore -from app.interfaces.repository import WSGIApp -from typing import Tuple, Union +from app.interfaces.repository import WSGIApp # -------- Helper methods -------- + def setup_logger() -> logging.Logger: """ Configure a custom :class:`~logging.Logger` for the start-up sequence of the server. @@ -39,11 +41,7 @@ def setup_logger() -> logging.Logger: def build_storage( - env_input: str, - env_storage: str, - env_storage_persistency: bool, - env_storage_overwrite: bool, - logger: logging.Logger + env_input: str, env_storage: str, env_storage_persistency: bool, env_storage_overwrite: bool, logger: logging.Logger ) -> Tuple[Union[DictIdentifiableStore, LocalFileIdentifiableStore], DictSupplementaryFileContainer]: """ Configure the server's storage according to the given start-up settings. @@ -68,29 +66,33 @@ def build_storage( input_files, input_supp_files = load_directory(env_input) added, overwritten, skipped = storage_files.sync(input_files, env_storage_overwrite) logger.info( - "Loaded %d identifiable(s) and %d supplementary file(s) from \"%s\"", - len(input_files), len(input_supp_files), env_input + 'Loaded %d identifiable(s) and %d supplementary file(s) from "%s"', + len(input_files), + len(input_supp_files), + env_input, ) logger.info( "Synced INPUT to STORAGE with %d added and %d %s", added, overwritten if env_storage_overwrite else skipped, - "overwritten" if env_storage_overwrite else "skipped" + "overwritten" if env_storage_overwrite else "skipped", ) return storage_files, input_supp_files else: - logger.warning("INPUT directory \"%s\" not found, starting empty", env_input) + logger.warning('INPUT directory "%s" not found, starting empty', env_input) return storage_files, DictSupplementaryFileContainer() if os.path.isdir(env_input): input_files, input_supp_files = load_directory(env_input) logger.info( - "Loaded %d identifiable(s) and %d supplementary file(s) from \"%s\"", - len(input_files), len(input_supp_files), env_input + 'Loaded %d identifiable(s) and %d supplementary file(s) from "%s"', + len(input_files), + len(input_supp_files), + env_input, ) return input_files, input_supp_files else: - logger.warning("INPUT directory \"%s\" not found, starting empty", env_input) + logger.warning('INPUT directory "%s" not found, starting empty', env_input) return DictIdentifiableStore(), DictSupplementaryFileContainer() @@ -107,16 +109,16 @@ def build_storage( wsgi_optparams = {"base_path": env_api_base_path} if env_api_base_path else {} logger.info( - "Loaded settings API_BASE_PATH=\"%s\", INPUT=\"%s\", STORAGE=\"%s\", PERSISTENCY=%s, OVERWRITE=%s", - env_api_base_path or "", env_input, env_storage, env_storage_persistency, env_storage_overwrite -) - -storage_files, supp_files = build_storage( + 'Loaded settings API_BASE_PATH="%s", INPUT="%s", STORAGE="%s", PERSISTENCY=%s, OVERWRITE=%s', + env_api_base_path or "", env_input, env_storage, env_storage_persistency, env_storage_overwrite, - logger +) + +storage_files, supp_files = build_storage( + env_input, env_storage, env_storage_persistency, env_storage_overwrite, logger ) application = WSGIApp(storage_files, supp_files, **wsgi_optparams) diff --git a/server/app/util/converters.py b/server/app/util/converters.py index 4e37c470..3b98cd5d 100644 --- a/server/app/util/converters.py +++ b/server/app/util/converters.py @@ -1,4 +1,4 @@ -# Copyright (c) 2025 the Eclipse BaSyx Authors +# Copyright (c) 2026 the Eclipse BaSyx Authors # # This program and the accompanying materials are made available under the terms of the MIT License, available in # the LICENSE file of this project. @@ -13,14 +13,12 @@ import base64 import binascii +from typing import List import werkzeug.routing import werkzeug.utils -from werkzeug.exceptions import BadRequest - from basyx.aas import model - -from typing import List +from werkzeug.exceptions import BadRequest BASE64URL_ENCODING = "utf-8" @@ -46,9 +44,10 @@ def base64url_encode(data: str) -> str: class IdentifierToBase64URLConverter(werkzeug.routing.UnicodeConverter): """ - A custom URL converter for Werkzeug routing that encodes and decodes - Identifiers using Base64 URL-safe encoding. + A custom URL converter for Werkzeug routing that encodes and decodes + Identifiers using Base64 URL-safe encoding. """ + def to_url(self, value: model.Identifier) -> str: return super().to_url(base64url_encode(value)) @@ -60,12 +59,12 @@ def to_python(self, value: str) -> model.Identifier: class IdShortPathConverter(werkzeug.routing.UnicodeConverter): """ - A custom Werkzeug URL converter for handling dot-separated idShort paths and indexes. + A custom Werkzeug URL converter for handling dot-separated idShort paths and indexes. - This converter joins a list of idShort strings into an id_short_sep-separated path for URLs - (e.g., ["submodel", "element", "1"] -> "submodel.element[1]") and parses incoming URL paths - back into a list, validating each idShort. - """ + This converter joins a list of idShort strings into an id_short_sep-separated path for URLs + (e.g., ["submodel", "element", "1"] -> "submodel.element[1]") and parses incoming URL paths + back into a list, validating each idShort. + """ def to_url(self, value: List[str]) -> str: id_short_path = model.Referable.build_id_short_path(value) diff --git a/server/docker/repository/entrypoint.sh b/server/docker/common/entrypoint.sh similarity index 100% rename from server/docker/repository/entrypoint.sh rename to server/docker/common/entrypoint.sh diff --git a/server/docker/discovery/Dockerfile b/server/docker/discovery/Dockerfile new file mode 100644 index 00000000..8bc377e1 --- /dev/null +++ b/server/docker/discovery/Dockerfile @@ -0,0 +1,50 @@ +FROM python:3.11-alpine + +LABEL org.label-schema.name="Eclipse BaSyx" \ + org.label-schema.version="1.0" \ + org.label-schema.description="Docker image for the basyx-python-sdk discovery server application" \ + org.label-schema.maintainer="Eclipse BaSyx" + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# If we have more dependencies for the server it would make sense +# to refactor uswgi to the pyproject.toml +RUN apk update && \ + apk add --no-cache nginx supervisor gcc musl-dev linux-headers python3-dev git bash && \ + pip install uwsgi && \ + apk del git bash + +COPY ./sdk /sdk +COPY ./server/app /server/app +COPY ./server/pyproject.toml /server/pyproject.toml +COPY ./server/docker/discovery/uwsgi.ini /etc/uwsgi/ +COPY ./server/docker/common/supervisord.ini /etc/supervisor/conf.d/supervisord.ini +COPY ./server/docker/common/stop-supervisor.sh /etc/supervisor/stop-supervisor.sh +RUN chmod +x /etc/supervisor/stop-supervisor.sh + +# Makes it possible to use a different configuration +ENV UWSGI_INI=/etc/uwsgi/uwsgi.ini +# object stores aren't thread-safe yet +# https://github.com/eclipse-basyx/basyx-python-sdk/issues/205 +ENV UWSGI_CHEAPER=0 +ENV UWSGI_PROCESSES=1 +ENV NGINX_MAX_UPLOAD=1M +ENV NGINX_WORKER_PROCESSES=1 +ENV LISTEN_PORT=80 +ENV CLIENT_BODY_BUFFER_SIZE=1M +ENV API_BASE_PATH=/api/v3.1.1/ + +# Copy the entrypoint that will generate Nginx additional configs +COPY server/docker/common/entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] + +ENV SETUPTOOLS_SCM_PRETEND_VERSION=1.0.0 + +WORKDIR /server/app +RUN pip install ../../sdk +RUN pip install .. + +CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.ini"] \ No newline at end of file diff --git a/server/docker/discovery/uwsgi.ini b/server/docker/discovery/uwsgi.ini new file mode 100644 index 00000000..250ea4b2 --- /dev/null +++ b/server/docker/discovery/uwsgi.ini @@ -0,0 +1,9 @@ +[uwsgi] +wsgi-file = /server/app/services/run_discovery.py +socket = /tmp/uwsgi.sock +chown-socket = nginx:nginx +chmod-socket = 664 +hook-master-start = unix_signal:15 gracefully_kill_them_all +need-app = true +die-on-term = true +show-config = false diff --git a/server/docker/registry/Dockerfile b/server/docker/registry/Dockerfile new file mode 100644 index 00000000..e9d71600 --- /dev/null +++ b/server/docker/registry/Dockerfile @@ -0,0 +1,58 @@ +FROM python:3.11-alpine + +LABEL org.label-schema.name="Eclipse BaSyx" \ + org.label-schema.version="1.0" \ + org.label-schema.description="Docker image for the basyx-python-sdk registry server application" \ + org.label-schema.maintainer="Eclipse BaSyx" + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +# If we have more dependencies for the server it would make sense +# to refactor uswgi to the pyproject.toml +RUN apk update && \ + apk add --no-cache nginx supervisor gcc musl-dev linux-headers python3-dev git bash && \ + pip install uwsgi && \ + apk del git bash + + +COPY ./sdk /sdk +COPY ./server/app /server/app +COPY ./server/pyproject.toml /server/pyproject.toml +COPY ./server/docker/registry/uwsgi.ini /etc/uwsgi/ +COPY ./server/docker/common/supervisord.ini /etc/supervisor/conf.d/supervisord.ini +COPY ./server/docker/common/stop-supervisor.sh /etc/supervisor/stop-supervisor.sh +RUN chmod +x /etc/supervisor/stop-supervisor.sh + +# Makes it possible to use a different configuration +ENV UWSGI_INI=/etc/uwsgi/uwsgi.ini +# object stores aren't thread-safe yet +# https://github.com/eclipse-basyx/basyx-python-sdk/issues/205 +ENV UWSGI_CHEAPER=0 +ENV UWSGI_PROCESSES=1 +ENV NGINX_MAX_UPLOAD=1M +ENV NGINX_WORKER_PROCESSES=1 +ENV LISTEN_PORT=80 +ENV CLIENT_BODY_BUFFER_SIZE=1M +ENV API_BASE_PATH=/api/v3.1.1/ + +# Default values for the storage envs +ENV INPUT=/input +ENV STORAGE=/storage +ENV STORAGE_PERSISTENCY=False +ENV STORAGE_OVERWRITE=False +VOLUME ["/input", "/storage"] + +# Copy the entrypoint that will generate Nginx additional configs +COPY server/docker/common/entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] + +ENV SETUPTOOLS_SCM_PRETEND_VERSION=1.0.0 + +WORKDIR /server/app +RUN pip install ../../sdk +RUN pip install .. + +CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.ini"] \ No newline at end of file diff --git a/server/docker/registry/uwsgi.ini b/server/docker/registry/uwsgi.ini new file mode 100644 index 00000000..1ede39c3 --- /dev/null +++ b/server/docker/registry/uwsgi.ini @@ -0,0 +1,10 @@ +[uwsgi] +wsgi-file = /server/app/services/run_registry.py +socket = /tmp/uwsgi.sock +chown-socket = nginx:nginx +chmod-socket = 664 +hook-master-start = unix_signal:15 gracefully_kill_them_all +need-app = true +die-on-term = true +show-config = false +logto = /tmp/uwsgi.log diff --git a/server/docker/repository/Dockerfile b/server/docker/repository/Dockerfile index eed1c1ab..bf701c80 100644 --- a/server/docker/repository/Dockerfile +++ b/server/docker/repository/Dockerfile @@ -45,7 +45,7 @@ ENV STORAGE_OVERWRITE=False VOLUME ["/input", "/storage"] # Copy the entrypoint that will generate Nginx additional configs -COPY server/docker/repository/entrypoint.sh /entrypoint.sh +COPY server/docker/common/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/server/example_configurations/discovery_standalone/README.md b/server/example_configurations/discovery_standalone/README.md new file mode 100644 index 00000000..45dfde79 --- /dev/null +++ b/server/example_configurations/discovery_standalone/README.md @@ -0,0 +1,48 @@ +# Eclipse BaSyx Python SDK - Discovery Service + +This is a Python-based implementation of the **BaSyx Asset Administration Shell (AAS) Discovery Service**. +It provides basic discovery functionality for AAS IDs and their corresponding assets, as specified in the official [Discovery Service Specification v3.1.0_SSP-001](https://app.swaggerhub.com/apis/Plattform_i40/DiscoveryServiceSpecification/V3.1.0_SSP-001). + +## Overview + +The Discovery Service stores and retrieves relations between AAS identifiers and asset identifiers. It acts as a lookup service for resolving asset-related queries to corresponding AAS. + +## Features + +| Function | Description | Example URL | +|------------------------------------------|----------------------------------------------------------|-----------------------------------------------------------------------| +| **search_all_aas_ids_by_asset_link** | Find AAS identifiers by providing asset link values | `POST http://localhost:8084/api/v3.0/lookup/shellsByAssetLink` | +| **get_all_specific_asset_ids_by_aas_id** | Return specific asset ids associated with an AAS ID | `GET http://localhost:8084/api/v3.0/lookup/shells/{aasIdentifier}` | +| **post_all_asset_links_by_id** | Register specific asset ids linked to an AAS | `POST http://localhost:8084/api/v3.0/lookup/shells/{aasIdentifier}` | +| **delete_all_asset_links_by_id** | Delete all asset links associated with a specific AAS ID | `DELETE http://localhost:8084/api/v3.0/lookup/shells/{aasIdentifier}` | +| + +## Configuration +Add discovery_store as directory +The service can be configured to use either: + +- **In-memory storage** (default): Temporary data storage that resets on service restart. +- **MongoDB storage**: Persistent backend storage using MongoDB. + +### Configuration via Environment Variables + +| Variable | Description | Default | +|------------------|--------------------------------------------|-----------------------------| +| `STORAGE_TYPE` | `inmemory` or `mongodb` | `inmemory` | +| `MONGODB_URI` | MongoDB connection URI | `mongodb://localhost:27017` | +| `MONGODB_DBNAME` | Name of the MongoDB database | `basyx_registry` | + +## Deployment via Docker + +A `Dockerfile` and `docker-compose.yml` are provided for simple deployment. +The container image can be built and run via: +```bash +docker compose up --build +``` +## Test + +Examples of asset links and specific asset IDs for testing purposes are provided as JSON files in the [storage](./storage) folder. + +## Acknowledgments + +This Dockerfile is inspired by the [tiangolo/uwsgi-nginx-docker](https://github.com/tiangolo/uwsgi-nginx-docker) repository. diff --git a/server/example_configurations/discovery_standalone/compose.yml b/server/example_configurations/discovery_standalone/compose.yml new file mode 100644 index 00000000..27b9309e --- /dev/null +++ b/server/example_configurations/discovery_standalone/compose.yml @@ -0,0 +1,12 @@ +name: basyx-python-server +services: + app: + build: + context: ../../.. + dockerfile: server/docker/discovery/Dockerfile + ports: + - "8084:80" + #environment: + #- storage_path=/discovery_store.json + #volumes: + # - ./discovery_store.json:/discovery_store.json diff --git a/server/example_configurations/registry_standalone/README.md b/server/example_configurations/registry_standalone/README.md new file mode 100644 index 00000000..63d6dd8e --- /dev/null +++ b/server/example_configurations/registry_standalone/README.md @@ -0,0 +1,54 @@ +# Eclipse BaSyx Python SDK - Registry Service + +This is a Python-based implementation of the **Asset Administration Shell (AAS) Registry Service**. +It provides all registry functionality for AAS and submodels descriptors, as specified in the official [Asset Administration Shell Registry Service Specification v3.1.1_SSP-001](https://app.swaggerhub.com/apis/Plattform_i40/AssetAdministrationShellRegistryServiceSpecification/V3.1.1_SSP-001) and [Submodel Registry Service Specification v3.1.1_SSP-001](https://app.swaggerhub.com/apis/Plattform_i40/SubmodelRegistryServiceSpecification/V3.1.1_SSP-001). + +## Overview + +The Registry Service provides the endpoint for a given AAS-ID or Submodel-ID. Such an endpoint for an AAS and the related Submodel-IDs make the AAS and the submodels with their submodelElements accessible. + + + +## Features +# AAS Registry: +| Function | Description | Example URL | +|--------------------------------------------------|----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------| +| **GetAllAssetAdministrationShellDescriptors** | Return all AAS descriptor | `GET http://localhost:8083/api/v3.1.1/shell-descriptors` | +| **GetAssetAdministrationShellDescriptorById** | Return a specific AAS descriptor | `GET http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}` | +| **PostAssetAdministrationShellDescriptor** | Register/create a new AAS descriptor | `POST http://localhost:8083/api/v3.1.1/shell-descriptors` | +| **PutAssetAdministrationShellDescriptorById** | Create or update an existing AAS descriptor | `PUT http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}` | +| **DeleteAssetAdministrationShellDescriptorById** | Delete an AAS descriptor by ID | `DELETE http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}` | +| **GetSubmodelDescriptorsThroughSuperPath** | Return all submodel descriptors under AAS descriptor | `GET http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}/submodel-descriptors` | +| **PostSubmodelDescriptorThroughSuperPath** | Register/create a new submodel descriptor under AAS descriptor | `POST http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}/submodel-descriptors` | +| **GetSubmodelDescriptorThroughSuperPath** | Return a specific submodel descriptor under AAS descriptor | `GET http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}/submodel-descriptors/{submodelIdentifier}` | +| **PutSubmodelDescriptorThroughSuperPath** | Create or update a specific submodel descriptor under AAS descriptor | `PUT http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}/submodel-descriptors/{submodelIdentifier}` | +| **DeleteSubmodelDescriptorThroughSuperPath** | Delete a specific submodel descriptor under AAS descriptor | `DELETE http://localhost:8083/api/v3.1.1/shell-descriptors/{aasIdentifier}/submodel-descriptors/{submodelIdentifier}` | +| **GetDescription** | Return the self‑description of the AAS registry service | `GET http://localhost:8083/api/v3.1.1/description` | + +# Submodel Registry: +| Function | Description | Example URL | +|----------------------------------|--------------------------------------------------------------|-----------------------------------------------------------------------------------| +| **GetAllSubmodelDescriptors** | Return all submodel descriptors | `GET http://localhost:8083/api/v3.0/submodel-descriptors` | +| **PostSubmodelDescriptor** | Register/create a new submodel descriptor | `POST http://localhost:8083/api/v3.0/submodel-descriptors` | +| **GetSubmodelDescriptorById** | Return a specific submodel descriptor | `GET http://localhost:8083/api/v3.0/submodel-descriptors/{submodelIdentifier}` | +| **PutSubmodelDescriptorById** | Create or update a specific submodel descriptor | `PUT http://localhost:8083/api/v3.0/submodel-descriptors/{submodelIdentifier}` | +| **DeleteSubmodelDescriptorById** | Delete a specific submodel descriptor | `DELETE http://localhost:8083/api/v3.0/submodel-descriptors/{submodelIdentifier}` | +| **GetDescription** | Return the self‑description of the submodel registry service | `GET http://localhost:8083/api/v3.0/description` | + + + +## Configuration + +This example Docker compose configuration starts a registry server. + +The container image can also be built and run via: +``` +$ docker compose up +``` + +Input files are read from `./input` and stored persistently under `./storage` on your host system. +The server can be accessed at http://localhost:8083/api/v3.1.1/ from your host system. +To get a different setup, the `compose.yaml` file can be adapted using the options described in the main server [README.md](../../README.md#options). + +Note that the `Dockerfile` has to be specified explicitly via `dockerfile: server/docker/repository/Dockerfile`, as the build context must be set to the repository root to allow access to the local `/sdk`. + diff --git a/server/example_configurations/registry_standalone/compose.yml b/server/example_configurations/registry_standalone/compose.yml new file mode 100644 index 00000000..f7ac9223 --- /dev/null +++ b/server/example_configurations/registry_standalone/compose.yml @@ -0,0 +1,12 @@ +services: + app: + build: + context: ../../.. + dockerfile: server/docker/registry/Dockerfile + ports: + - "8083:80" + volumes: + - ./input:/input + - ./storage:/storage + environment: + STORAGE_PERSISTENCY: True \ No newline at end of file diff --git a/server/test/backend/__init__.py b/server/test/backend/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/test/backend/test_local_file.py b/server/test/backend/test_local_file.py new file mode 100644 index 00000000..aeafb85c --- /dev/null +++ b/server/test/backend/test_local_file.py @@ -0,0 +1,96 @@ +# Copyright (c) 2026 the Eclipse BaSyx Authors +# +# This program and the accompanying materials are made available under the terms of the MIT License, available in +# the LICENSE file of this project. +# +# SPDX-License-Identifier: MIT +import os.path +import shutil +from unittest import TestCase + +from app import model +from app.backend import local_file +from app.model import provider + +store_path: str = os.path.dirname(__file__) + "/local_file_test_folder" +source_core: str = "file://localhost/{}/".format(store_path) + + +class LocalFileBackendTest(TestCase): + def setUp(self) -> None: + self.descriptor_store = local_file.LocalFileDescriptorStore(store_path) + self.descriptor_store.check_directory(create=True) + self.mock_endpoint = model.Endpoint( + interface="AAS-3.0", protocol_information=model.ProtocolInformation(href="https://example.org/") + ) + self.aasd1 = model.AssetAdministrationShellDescriptor( + id_="https://example.org/AASDescriptor/1", endpoints=[self.mock_endpoint] + ) + self.aasd2 = model.AssetAdministrationShellDescriptor( + id_="https://example.org/AASDescriptor/2", endpoints=[self.mock_endpoint] + ) + self.sd1 = model.SubmodelDescriptor( + id_="https://example.org/SubmodelDescriptor/1", endpoints=[self.mock_endpoint] + ) + self.sd2 = model.SubmodelDescriptor( + id_="https://example.org/SubmodelDescriptor/2", endpoints=[self.mock_endpoint] + ) + + def tearDown(self) -> None: + try: + self.descriptor_store.clear() + finally: + shutil.rmtree(store_path) + + def test_add(self) -> None: + self.descriptor_store.add(self.aasd1) + # Note that this test is only checking that there are no errors during adding. + # The actual logic is tested together with retrieval in `test_retrieval`. + + def test_retrieval(self) -> None: + self.descriptor_store.add(self.sd1) + + # When retrieving the object, we should get the *same* instance as we added + retrieved_descriptor = self.descriptor_store.get_item("https://example.org/SubmodelDescriptor/1") + self.assertIs(self.sd1, retrieved_descriptor) + + def test_iterating(self) -> None: + self.descriptor_store.add(self.sd1) + self.descriptor_store.add(self.sd2) + self.descriptor_store.add(self.aasd1) + self.descriptor_store.add(self.aasd2) + self.assertEqual(4, len(self.descriptor_store)) + + # Iterate objects, add them to a DictDescriptorStore and check them + retrieved_descriptor_store = provider.DictDescriptorStore() + for item in self.descriptor_store: + retrieved_descriptor_store.add(item) + self.assertEqual(4, len(retrieved_descriptor_store)) + self.assertIn(self.sd1, retrieved_descriptor_store) + self.assertIn(self.sd2, retrieved_descriptor_store) + self.assertIn(self.aasd1, retrieved_descriptor_store) + self.assertIn(self.aasd2, retrieved_descriptor_store) + + def test_key_errors(self) -> None: + self.descriptor_store.add(self.aasd1) + with self.assertRaises(KeyError) as cm: + self.descriptor_store.add(self.aasd1) + self.assertEqual( + "'Descriptor with id https://example.org/AASDescriptor/1 already exists in " "local file database'", + str(cm.exception), + ) + + self.descriptor_store.discard(self.aasd1) + with self.assertRaises(KeyError) as cm: + self.descriptor_store.get_item("https://example.org/AASDescriptor/1") + self.assertIsNone(self.descriptor_store.get("https://example.org/AASDescriptor/1")) + self.assertEqual( + "'No Identifiable with id https://example.org/AASDescriptor/1 found in local " "file database'", + str(cm.exception), + ) + + def test_reload_discard(self) -> None: + self.descriptor_store.add(self.sd1) + self.descriptor_store = local_file.LocalFileDescriptorStore(store_path) + self.descriptor_store.discard(self.sd1) + self.assertNotIn(self.sd1, self.descriptor_store) diff --git a/server/test/interfaces/test_repository.py b/server/test/interfaces/test_repository.py index 5cf421a5..01f3bd61 100644 --- a/server/test/interfaces/test_repository.py +++ b/server/test/interfaces/test_repository.py @@ -25,19 +25,18 @@ # TODO: add id_short format to schemata import os -import random import pathlib +import random import urllib.parse +from typing import Set -import schemathesis import hypothesis.strategies - +import schemathesis from basyx.aas import model from basyx.aas.adapter.aasx import DictSupplementaryFileContainer -from app.interfaces.repository import WSGIApp from basyx.aas.examples.data.example_aas import create_full_example -from typing import Set +from app.interfaces.repository import WSGIApp def _encode_and_quote(identifier: model.Identifier) -> str: @@ -63,7 +62,7 @@ def _check_transformed(response, case): # disable the filter_too_much health check, which triggers if a strategy filters too much data, raising an error suppress_health_check=[hypothesis.HealthCheck.filter_too_much], # disable data generation deadlines, which would result in an error if data generation takes too much time - deadline=None + deadline=None, ) BASE_URL = "/api/v1" @@ -82,11 +81,15 @@ def _check_transformed(response, case): IDENTIFIER_SUBMODEL.add(_encode_and_quote(obj.id)) # load aas and submodel api specs -AAS_SCHEMA = schemathesis.from_path(pathlib.Path(__file__).parent / "http-api-oas-aas.yaml", - app=WSGIApp(create_full_example(), DictSupplementaryFileContainer())) +AAS_SCHEMA = schemathesis.from_path( + pathlib.Path(__file__).parent / "http-api-oas-aas.yaml", + app=WSGIApp(create_full_example(), DictSupplementaryFileContainer()), +) -SUBMODEL_SCHEMA = schemathesis.from_path(pathlib.Path(__file__).parent / "http-api-oas-submodel.yaml", - app=WSGIApp(create_full_example(), DictSupplementaryFileContainer())) +SUBMODEL_SCHEMA = schemathesis.from_path( + pathlib.Path(__file__).parent / "http-api-oas-submodel.yaml", + app=WSGIApp(create_full_example(), DictSupplementaryFileContainer()), +) class APIWorkflowAAS(AAS_SCHEMA.as_state_machine()): # type: ignore diff --git a/server/test/model/__init__.py b/server/test/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/test/model/test_provider.py b/server/test/model/test_provider.py new file mode 100644 index 00000000..ee3b810d --- /dev/null +++ b/server/test/model/test_provider.py @@ -0,0 +1,59 @@ +import unittest + +from app import model +from app.model.provider import DictDescriptorStore + + +class DictDescriptorStoreTest(unittest.TestCase): + def setUp(self) -> None: + self.mock_endpoint = model.Endpoint( + interface="AAS-3.0", protocol_information=model.ProtocolInformation(href="https://example.org/") + ) + self.aasd1 = model.AssetAdministrationShellDescriptor( + id_="https://example.org/AASDescriptor/1", endpoints=[self.mock_endpoint] + ) + self.aasd2 = model.AssetAdministrationShellDescriptor( + id_="https://example.org/AASDescriptor/2", endpoints=[self.mock_endpoint] + ) + self.sd1 = model.SubmodelDescriptor( + id_="https://example.org/SubmodelDescriptor/1", endpoints=[self.mock_endpoint] + ) + self.sd2 = model.SubmodelDescriptor( + id_="https://example.org/SubmodelDescriptor/2", endpoints=[self.mock_endpoint] + ) + + def test_store_retrieve(self) -> None: + descriptor_store: DictDescriptorStore = DictDescriptorStore() + descriptor_store.add(self.aasd1) + descriptor_store.add(self.aasd2) + self.assertIn(self.aasd1, descriptor_store) + self.assertFalse(self.sd1 in descriptor_store) + + aasd3 = model.AssetAdministrationShellDescriptor( + id_="https://example.org/AASDescriptor/1", endpoints=[self.mock_endpoint] + ) + with self.assertRaises(KeyError) as cm: + descriptor_store.add(aasd3) + self.assertEqual( + "'Descriptor object with same id https://example.org/AASDescriptor/1 is already " "stored in this store'", + str(cm.exception), + ) + self.assertEqual(2, len(descriptor_store)) + self.assertIs(self.aasd1, descriptor_store.get("https://example.org/AASDescriptor/1")) + + descriptor_store.discard(self.aasd1) + with self.assertRaises(KeyError) as cm: + descriptor_store.get_item("https://example.org/AASDescriptor/1") + self.assertIsNone(descriptor_store.get("https://example.org/AASDescriptor/1")) + self.assertEqual("'https://example.org/AASDescriptor/1'", str(cm.exception)) + self.assertIs(self.aasd2, descriptor_store.pop()) + self.assertEqual(0, len(descriptor_store)) + + def test_store_update(self) -> None: + descriptor_store1: DictDescriptorStore = DictDescriptorStore() + descriptor_store2: DictDescriptorStore = DictDescriptorStore() + descriptor_store1.add(self.sd1) + descriptor_store2.add(self.sd2) + descriptor_store1.update(descriptor_store2) + self.assertIsInstance(descriptor_store1, DictDescriptorStore) + self.assertIn(self.sd2, descriptor_store1)