diff --git a/examples/ask_interaction.py b/examples/ask_interaction.py new file mode 100644 index 0000000..a597e4d --- /dev/null +++ b/examples/ask_interaction.py @@ -0,0 +1,48 @@ +from shared import get_example_logger + +from src import KnowledgeBase +from src.ke.models import BindingModel, Literal, Uri + +EXAMPLE_NAME = "ask-interaction" +logger = get_example_logger(EXAMPLE_NAME) + +kb = KnowledgeBase( + id="http://example.org/knowledge-mapper/ask-interaction#kb", + name="ask-interaction-kb", + description="An example KB that demonstrates handling an ASK KI.", + ke_url="http://localhost:8280/rest", +) + + +class PersonBinding(BindingModel): + person: Uri + name: Literal[str] + age: Literal[int] + + +kb.ask_ki( + name="ask-ki", + graph_pattern=""" + ?person a ex:Person ; + ex:hasName ?name ; + ex:hasAge ?age . + """, + binding_model=PersonBinding, + prefixes={"ex": "http://example.org/knowledge-mapper/ask-interaction#"}, +) + +if __name__ == "__main__": + kb.register() + logger.info("KB registered.") + result = kb.ask( + [ + { + "person": "http://example.org/knowledge-mapper/ask-interaction#person1", + } + ], + "ask-ki", + ) + logger.info(f"Received result from ASK KI: {result}") + + kb.unregister() + logger.info("KB unregistered.") diff --git a/examples/compose.yaml b/examples/compose.yaml index 4b42504..ac66abf 100644 --- a/examples/compose.yaml +++ b/examples/compose.yaml @@ -1,9 +1,21 @@ services: + knowledge-directory: + image: ghcr.io/tno/knowledge-engine/knowledge-directory:1.3.2 + ker-examples: image: ghcr.io/tno/knowledge-engine/smart-connector:1.3.2 ports: - - 8081:8081 - 8280:8280 environment: KE_RUNTIME_EXPOSED_URL: http://ker-examples:8081 KE_RUNTIME_PORT: 8081 + KD_URL: http://knowledge-directory:8282 + + ker-testing: + image: ghcr.io/tno/knowledge-engine/smart-connector:1.3.2 + ports: + - 8281:8280 + environment: + KE_RUNTIME_EXPOSED_URL: http://ker-testing:8081 + KE_RUNTIME_PORT: 8081 + KD_URL: http://knowledge-directory:8282 diff --git a/examples/post_measurement.py b/examples/post_measurement.py new file mode 100644 index 0000000..38f98e4 --- /dev/null +++ b/examples/post_measurement.py @@ -0,0 +1,81 @@ +import time +from datetime import datetime +from uuid import uuid4 + +from rdflib import URIRef +from shared import get_example_logger + +from src.ke.models import ( + BindingModel, + Literal, + Uri, +) +from src.knowledge_base import KnowledgeBase + +EXAMPLE_NAME = "post-measurement" +logger = get_example_logger(EXAMPLE_NAME) + +kb = KnowledgeBase( + id="http://example.org/knowledge-mapper/post-measurement#kb", + name="post-measurement-kb", + description="An example KB that demonstrates handling a POST KI for posting a new " + "measurement.", + ke_url="http://localhost:8280/rest", +) + + +class MeasurementBinding(BindingModel): + measurement: Uri + value: Literal[float] + unit: Uri + time: Literal[datetime] + + +class ResultBinding(BindingModel): + measurement: Uri + kb: Uri + + +kb.post_ki( + name="post-measurement-ki", + argument_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:hasValue ?value ; + ex:hasUnit ?unit ; + ex:hasTime ?time . + """, + result_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:storedBy ?kb ; + """, + prefixes={"ex": "http://example.org/knowledge-mapper/post-measurement#"}, + result_binding_model=ResultBinding, + argument_binding_model=MeasurementBinding, +) + + +if __name__ == "__main__": + kb.register() + logger.info("KB registered.") + time.sleep( + 5 + ) # Sleep for a bit to allow time for testing the POST KI with an external client + logger.info("Posting...") + result_bindings = kb.post( + [ + MeasurementBinding( + measurement=URIRef( + f"http://example.org/knowledge-mapper/post-measurement#measurement-{uuid4()}" + ), + value=99.9, + unit=URIRef( + "http://example.org/knowledge-mapper/post-measurement#Percent" + ), + time=datetime.now(), + ) + ], + "post-measurement-ki", + ) + logger.info(f"Received result bindings: {result_bindings}") + kb.unregister() + logger.info("KB unregistered.") diff --git a/examples/testing/kb.py b/examples/testing/kb.py new file mode 100644 index 0000000..1d52378 --- /dev/null +++ b/examples/testing/kb.py @@ -0,0 +1,113 @@ +import sys +from pathlib import Path +from time import sleep +from typing import cast + +from rdflib import URIRef + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from shared import get_example_logger + +from src import KnowledgeBase +from src.ke.models import BindingModel, Literal, Uri + +EXAMPLE_NAME = "testing" +logger = get_example_logger(EXAMPLE_NAME) + +kb = KnowledgeBase( + id="http://example.org/knowledge-mapper/testing#kb", + name="testing-kb", + description="An example KB that demonstrates testing the KB.", + ke_url="http://localhost:8280/rest", +) + +kb.ask_ki( + name="ask-ki-no-binding-model", + graph_pattern=""" + ?s a ex:TestSubject ; + ex:hasValue ?value . + """, + prefixes={"ex": "http://example.org/knowledge-mapper/testing#"}, +) + + +class TestBinding(BindingModel): + s: Uri + value: Literal[str] + + +kb.ask_ki( + name="ask-ki-with-binding-model", + graph_pattern=""" + ?s a ex:TestSubject ; + ex:hasValue ?value . + """, + binding_model=TestBinding, + prefixes={"ex": "http://example.org/knowledge-mapper/testing#"}, +) + + +def ask_for_values_of_subject(subject_name: str) -> list[str]: + result_binding_set: list[TestBinding] = kb.ask( + [ + TestBinding( + s=URIRef(f"http://example.org/knowledge-mapper/testing#{subject_name}"), + value=None, + ) + ], + "ask-ki-with-binding-model", + ) # pyright: ignore[reportAssignmentType] + return ( + [str(binding.value) for binding in result_binding_set] + if result_binding_set + else [] + ) + + +class ResultBinding(BindingModel): + s: Uri + other: Uri + + +kb.post_ki( + name="post-ki", + argument_graph_pattern=""" + ?s a ex:TestSubject ; + ex:hasValue ?value . + """, + result_graph_pattern=""" + ?s a ex:TestSubject ; + ex:storedBy ?other . + """, + argument_binding_model=TestBinding, + result_binding_model=ResultBinding, + prefixes={"ex": "http://example.org/knowledge-mapper/testing#"}, +) + + +def repeat_value_post(value: str, iterations: int) -> list[URIRef]: + result_binding_set: list[ResultBinding] = [] + for i in range(iterations): + result_binding_set.extend( + kb.post( + [ + TestBinding( + s=URIRef( + f"http://example.org/knowledge-mapper/testing#Subject-{i}" + ), + value=value, + ) + ], + "post-ki", + ) # type: ignore + ) + sleep(1) + return [cast(URIRef, binding.other) for binding in result_binding_set] + + +if __name__ == "__main__": + logger.info( + "This KB demonstrates testing, and is not meant to be run as a standalone " + "example." + ) diff --git a/examples/testing/test_kb.py b/examples/testing/test_kb.py new file mode 100644 index 0000000..67c0a9d --- /dev/null +++ b/examples/testing/test_kb.py @@ -0,0 +1,118 @@ +import pytest +from rdflib import URIRef + +from src.ke.testing import TestClient + +# Import the Knowledge Base that you would like to test, along with any relevant binding +# models. +from .kb import TestBinding, ask_for_values_of_subject, kb, repeat_value_post + +# In your tests you likely want to use the TestClient to mock results from the KE. +# A Knowledge Base instance is initialized with a real Client that makes HTTP requests +# to the KE, so its important to replace it with a TestClient +test_client = TestClient(fake_url="http://fake-ke") +kb.client = test_client +# Here the KB and its interactions are registered with the TestClient, which always +# succeeds. This registration is necessary for the KB to be able to execute +# interactions in the tests. +kb.register() + + +@pytest.fixture() +def client(): + return test_client + + +# In a test you can do any ASK interaction that is registered. +# The TestClient will return an empty result binding set by default, disregarding the +# input. +def test_ask_ki_no_resuls(): + result_binding_set = kb.ask([], "ask-ki-no-binding-model") + assert result_binding_set == [] + + +# You likely want to mock result binding sets, which can be done using the TestClient as +# in this test. The mocked result is returned when the ASK interaction is executed, +# disregarding the input. +def test_ask_ki_with_result(client: TestClient): + client.mock_result_binding_set( + "ask-ki-no-binding-model", + [ + { + "s": "", + "value": "test value", + } + ], + ) + result_binding_set = kb.ask([], "ask-ki-no-binding-model") + assert result_binding_set == [ + { + "s": "", + "value": "test value", + } + ] + + +# This is a little more useful when you have a binding model, testing the correctness of +# the binding model according to the graph pattern. One test per interaction like this +# per interaction is probably a good idea, to isolate issues with the binding model from +# other issues. +def test_ask_ki_with_binding_model(client: TestClient): + client.mock_result_binding_set( + "ask-ki-with-binding-model", + [ + { + "s": "", + "value": "test value", + } + ], + ) + + result_binding_set = kb.ask( + [ + TestBinding( + s=URIRef("http://example.org/knowledge-mapper/testing#Subject"), + value=None, + ) + ], + "ask-ki-with-binding-model", + ) + assert result_binding_set == [ + TestBinding( + s=URIRef("http://example.org/knowledge-mapper/testing#Subject"), + value="test value", + ) + ] + + +# However, most likely you will want to test the logic around interactions, where you +# might want to mock different results for different inputs. +def test_function_containing_ask(client: TestClient): + client.mock_result_binding_set( + ki_name="ask-ki-with-binding-model", + binding_set=[ + TestBinding( + s=URIRef("http://example.org/knowledge-mapper/testing#Subject"), + value="test value", + ).model_dump(), + ], + ) + + result = ask_for_values_of_subject("Subject") + assert result == ["test value"] + + +# Similar approaches can be taken for POST interactions. +def test_function_containing_post(client: TestClient): + client.mock_result_binding_set( + ki_name="post-ki", + binding_set=[ + { + "s": "", + "other": "", + } + ], + ) + + result = repeat_value_post("test value", 1) + assert result == [URIRef("http://example.org/knowledge-mapper/testing#Other")] diff --git a/src/ke/client.py b/src/ke/client.py index 58d6dc0..4b85fff 100644 --- a/src/ke/client.py +++ b/src/ke/client.py @@ -9,11 +9,13 @@ from .errors import SmartConnectorNotFoundError, UnexpectedHttpResponseError from .models import ( AskAnswerInteractionInfo, + AskResult, BindingSet, KiTypes, KnowledgeBaseInfo, KnowledgeInteractionInfo, PostReactInteractionInfo, + PostResult, ) logger = logging.getLogger(__name__) @@ -129,6 +131,42 @@ def poll_ki_call(self, kb_id: str) -> tuple[PollResult, HandleRequest | None]: """ ... + def ask( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> AskResult: + """Execute an ASK interaction by sending the given binding set as the + response to the KI call and returning the resulting binding set from the KE. + + Raises: + SmartConnectorNotFoundError: If no smart connector exists for the given KB + ID. + UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP + response. + """ + ... + + def post( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> PostResult: + """Execute a POST interaction by sending the given binding set as the + response to the KI call. + + Raises: + SmartConnectorNotFoundError: If no smart connector exists for the given KB + ID. + UnexpectedHttpResponseError: If the KE runtime returns an unexpected HTTP + response. + """ + ... + @property def ke_url(self) -> str: """Return the base URL of the KE runtime this client is communicating with.""" @@ -288,6 +326,68 @@ def post_handle_response( if not response.ok: raise UnexpectedHttpResponseError(response) + def post( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> PostResult: + if recipient_ids is not None: + payload = { + "bindingSet": binding_set, + "recipientSelector": { + "knowledgeBases": recipient_ids, + }, + } + else: + payload = binding_set + + response = requests.post( + f"{self.ke_url}/sc/post", + json=payload, + headers={ + "Knowledge-Base-Id": kb_id, + "Knowledge-Interaction-Id": ki_id, + }, + ) + + if not response.ok: + raise UnexpectedHttpResponseError(response) + + return PostResult.model_validate(response.json()) + + def ask( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> AskResult: + if recipient_ids is not None: + payload = { + "bindingSet": binding_set, + "recipientSelector": { + "knowledgeBases": recipient_ids, + }, + } + else: + payload = binding_set + + response = requests.post( + f"{self.ke_url}/sc/ask", + json=payload, + headers={ + "Knowledge-Base-Id": kb_id, + "Knowledge-Interaction-Id": ki_id, + }, + ) + + if not response.ok: + raise UnexpectedHttpResponseError(response) + + return AskResult.model_validate(response.json()) + @property def ke_url(self) -> str: return self._ke_url diff --git a/src/ke/models.py b/src/ke/models.py index 9260b47..87e21b9 100644 --- a/src/ke/models.py +++ b/src/ke/models.py @@ -1,4 +1,5 @@ from collections.abc import Sequence +from datetime import datetime from enum import StrEnum from typing import Annotated, Any, Self, TypeVar @@ -140,3 +141,40 @@ class AskAnswerInteractionInfo(KnowledgeInteractionInfo): class PostReactInteractionInfo(KnowledgeInteractionInfo): argument_graph_pattern: str result_graph_pattern: str + + +class Initiator(StrEnum): + KNOWLEDGE_BASE = "knowledgeBase" + REASONER = "reasoner" + + +class ExchangeInfo(BaseModel): + model_config = ConfigDict( + alias_generator=to_camel, frozen=True, populate_by_name=True + ) + + initiator: Initiator + knowledge_base_id: str + knowledge_interaction_id: str + exchange_start: datetime + exchange_end: datetime + status: str + failed_message: str | None = None + + +class AskResult(BaseModel): + model_config = ConfigDict( + alias_generator=to_camel, frozen=True, populate_by_name=True + ) + + binding_set: BindingSet + exchange_info: list[ExchangeInfo] + + +class PostResult(BaseModel): + model_config = ConfigDict( + alias_generator=to_camel, frozen=True, populate_by_name=True + ) + + result_binding_set: BindingSet + exchange_info: list[ExchangeInfo] diff --git a/src/ke/testing/__init__.py b/src/ke/testing/__init__.py new file mode 100644 index 0000000..24e57bb --- /dev/null +++ b/src/ke/testing/__init__.py @@ -0,0 +1 @@ +from .fake_client import TestClient diff --git a/src/ke/testing/fake_client.py b/src/ke/testing/fake_client.py new file mode 100644 index 0000000..5a778c1 --- /dev/null +++ b/src/ke/testing/fake_client.py @@ -0,0 +1,155 @@ +"""In-memory FakeClient that satisfies ClientProtocol for use in tests.""" + +from datetime import UTC, datetime + +from src.ke.client import ClientProtocol, PollResult +from src.ke.models import ( + AskResult, + BindingSet, + ExchangeInfo, + Initiator, + KnowledgeBaseInfo, + KnowledgeInteractionInfo, + PostResult, +) + + +class TestClient(ClientProtocol): + """A lightweight in-memory stand-in for Client. Always succeeds.""" + + def __init__(self, fake_url) -> None: + self._knowledge_bases: dict[str, KnowledgeBaseInfo] = {} + # Maps kb_id -> list of registered KIs + self._knowledge_interactions: dict[str, list[KnowledgeInteractionInfo]] = {} + self._next_ki_id: int = 1 + self._ke_url = fake_url + # Maps ki_name -> BindingSet to return from execute_post_interaction + self._mock_interaction_results: dict[str, BindingSet] = {} + + def ke_is_available(self) -> bool: + return True + + def ke_version(self) -> str: + return "0.0.0-fake" + + def get_knowledge_base(self, id: str) -> KnowledgeBaseInfo | None: + return self._knowledge_bases.get(id) + + def get_all_knowledge_bases(self) -> list[KnowledgeBaseInfo]: + return list(self._knowledge_bases.values()) + + def register_kb(self, info: KnowledgeBaseInfo, reregister: bool = True) -> None: + if info.id in self._knowledge_bases: + if reregister: + self.unregister_kb(info.id) + else: + return + self._knowledge_bases[info.id] = info + self._knowledge_interactions[info.id] = [] + + def unregister_kb(self, id: str) -> None: + self._knowledge_bases.pop(id) + self._knowledge_interactions.pop(id, None) + + def get_all_knowledge_interactions( + self, kb_id: str + ) -> list[KnowledgeInteractionInfo]: + return list(self._knowledge_interactions.get(kb_id, [])) + + def register_ki( + self, kb_id: str, ki: KnowledgeInteractionInfo + ) -> KnowledgeInteractionInfo: + registered = ki.model_copy(update={"id": f"fake-ki-{self._next_ki_id}"}) + self._next_ki_id += 1 + self._knowledge_interactions.setdefault(kb_id, []).append(registered) + return registered + + def poll_ki_call(self, kb_id: str) -> tuple[PollResult, None]: + # This fake client never returns any KI calls to handle, but always asks to + # repoll. + return (PollResult.REPOLL, None) + + def mock_result_binding_set(self, ki_name: str, binding_set: BindingSet) -> None: + """Store a result binding set to be returned when execute_post_interaction + is called for the KI with the given name.""" + self._mock_interaction_results[ki_name] = binding_set + + def ask( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> AskResult: + # Look up KI by ID to find its name, then check for a mocked result. + ki = next( + ( + ki + for kis in self._knowledge_interactions.values() + for ki in kis + if ki.id == ki_id + ), + None, + ) + ki_name = ki.name if ki is not None else None + binding_set = ( + self._mock_interaction_results[ki_name] + if ki_name is not None and ki_name in self._mock_interaction_results + else [] + ) + now = datetime.now(tz=UTC) + return AskResult( + binding_set=binding_set, + exchange_info=[ + ExchangeInfo( + initiator=Initiator.KNOWLEDGE_BASE, + knowledge_base_id=kb_id, + knowledge_interaction_id=ki_id, + exchange_start=now, + exchange_end=now, + status="OK", + ) + ], + ) + + def post( + self, + kb_id: str, + ki_id: str, + binding_set: BindingSet, + recipient_ids: list[str] | None = None, + ) -> PostResult: + # Look up KI by ID to find its name, then check for a mocked result. + ki = next( + ( + ki + for kis in self._knowledge_interactions.values() + for ki in kis + if ki.id == ki_id + ), + None, + ) + ki_name = ki.name if ki is not None else None + result_binding_set = ( + self._mock_interaction_results[ki_name] + if ki_name is not None and ki_name in self._mock_interaction_results + else [] + ) + now = datetime.now(tz=UTC) + return PostResult( + result_binding_set=result_binding_set, + exchange_info=[ + ExchangeInfo( + initiator=Initiator.KNOWLEDGE_BASE, + knowledge_base_id=kb_id, + knowledge_interaction_id=ki_id, + exchange_start=now, + exchange_end=now, + status="OK", + ) + ], + ) + + @property + def ke_url(self) -> str: + return self._ke_url diff --git a/src/knowledge_base.py b/src/knowledge_base.py index 5e4ae36..f7d4ce5 100644 --- a/src/knowledge_base.py +++ b/src/knowledge_base.py @@ -20,8 +20,6 @@ Handler, KnowledgeInteractionContext, KnowledgeInteractionStatus, - default_ask_handler, - default_post_handler, ) from .settings import KnowledgeBaseSettings @@ -248,9 +246,10 @@ def ask_ki( self, name: str, graph_pattern: str, + binding_model: type[BindingModel] | None = None, prefixes: dict | None = None, defer_ke_registration: bool = True, - ) -> Callable[[Handler], Handler]: + ) -> None: """Return a decorator that registers the decorated function as an ASK KI handler. @@ -262,15 +261,22 @@ def ask_ki( UnexpectedHttpResponseError: Propagated from ``register_ki`` when contacting the KE runtime. """ - return self._register_ki_decorator( - info=AskAnswerInteractionInfo( - type=KiTypes.ASK, - name=name, - prefixes=prefixes or dict(), - graph_pattern=graph_pattern, + self.register_ki( + KnowledgeInteractionContext( + info=AskAnswerInteractionInfo( + type=KiTypes.ASK, + name=name, + prefixes=prefixes or dict(), + graph_pattern=graph_pattern, + ), + handler=None, + status=KnowledgeInteractionStatus.UNREGISTERED, + validation_model=binding_model, + serialization_model=binding_model, ), defer_ke_registration=defer_ke_registration, ) + return def answer_ki( self, @@ -305,11 +311,13 @@ def post_ki( name: str, argument_graph_pattern: str, result_graph_pattern: str, + argument_binding_model: type[BindingModel] | None = None, + result_binding_model: type[BindingModel] | None = None, prefixes: dict | None = None, defer_ke_registration: bool = True, - ) -> Callable[[Handler], Handler]: - """Return a decorator that registers the decorated function as a POST KI - handler. + ) -> None: + """Register a POST KI at the KE runtime with optional argument and result + binding models. Raises: ValueError: Propagated from ``register_ki`` if registration constraints are @@ -319,16 +327,23 @@ def post_ki( UnexpectedHttpResponseError: Propagated from ``register_ki`` when contacting the KE runtime. """ - return self._register_ki_decorator( - info=PostReactInteractionInfo( - type=KiTypes.POST, - name=name, - prefixes=prefixes or dict(), - argument_graph_pattern=argument_graph_pattern, - result_graph_pattern=result_graph_pattern, + self.register_ki( + KnowledgeInteractionContext( + info=PostReactInteractionInfo( + type=KiTypes.POST, + name=name, + prefixes=prefixes or dict(), + argument_graph_pattern=argument_graph_pattern, + result_graph_pattern=result_graph_pattern, + ), + handler=None, + status=KnowledgeInteractionStatus.UNREGISTERED, + validation_model=result_binding_model, + serialization_model=argument_binding_model, ), defer_ke_registration=defer_ke_registration, ) + return def react_ki( self, @@ -439,9 +454,7 @@ def ki_from_settings_with_default_handler( self.register_ki( KnowledgeInteractionContext( info=info, - handler=default_ask_handler - if info.type == KiTypes.ASK - else default_post_handler, + handler=None, ), defer_ke_registration=defer_ke_registration, ) @@ -454,6 +467,8 @@ def call(self, binding_set: BindingSet, ki_name: str) -> BindingSet: KeyError: If ``ki_name`` is not found in the local KI registry. """ ki_ctx = self.ki_registry[ki_name] + assert ki_ctx.handler is not None # Should always be set for ANSWER/REACT KI's + if ki_ctx.validation_model: binding_models = [ ki_ctx.validation_model.model_validate(b) for b in binding_set @@ -467,6 +482,98 @@ def call(self, binding_set: BindingSet, ki_name: str) -> BindingSet: result_bindings = [b.model_dump() for b in result_bindings] # pyright: ignore[reportAttributeAccessIssue], return result_bindings # pyright: ignore[reportReturnType] + def post( + self, binding_set: Sequence[BindingModel] | BindingSet, ki_name: str + ) -> Sequence[BindingModel] | BindingSet: + """... + + Raises: + KeyError: If ``ki_name`` is not found in the local KI registry. + ValueError: If the KI is not registered at the KE runtime. + """ + ki_ctx = self.ki_registry[ki_name] + if ki_ctx.info.type != KiTypes.POST: + raise ValueError( + f"KI named '{ki_name}' is of type {ki_ctx.info.type}, not POST, and " + f"cannot be called with the post() method." + ) + if ki_ctx.status != KnowledgeInteractionStatus.REGISTERED: + raise ValueError( + f"Cannot call KI '{ki_name}' because it is not registered. Please " + f"register the KB and sync KIs first." + ) + assert ki_ctx.info.id is not None # Should always be set for registered KIs + + if ki_ctx.serialization_model: + # We can assume the result bindings are BindingModels, so we can model_dump + binding_models = [ + b.model_dump() # pyright: ignore[reportAttributeAccessIssue] + for b in binding_set + ] + post_result = self.client.post( + kb_id=self.info.id, + ki_id=ki_ctx.info.id, + binding_set=binding_models, + ) + else: + post_result = self.client.post( + kb_id=self.info.id, + ki_id=ki_ctx.info.id, + binding_set=binding_set, # pyright: ignore[reportArgumentType] + ) + + if ki_ctx.validation_model and post_result.result_binding_set: + result_bindings = [ + ki_ctx.validation_model.model_validate(b) + for b in post_result.result_binding_set + ] + return result_bindings + else: + return post_result.result_binding_set + + def ask( + self, binding_set: Sequence[BindingModel] | BindingSet, ki_name: str + ) -> Sequence[BindingModel] | BindingSet: + """...""" + ki_ctx = self.ki_registry[ki_name] + if ki_ctx.info.type != KiTypes.ASK: + raise ValueError( + f"KI named '{ki_name}' is of type {ki_ctx.info.type}, not ASK, and " + f"cannot be called with the ask() method." + ) + if ki_ctx.status != KnowledgeInteractionStatus.REGISTERED: + raise ValueError( + f"Cannot call KI '{ki_name}' because it is not registered. Please " + f"register the KB and sync KIs first." + ) + assert ki_ctx.info.id is not None # Should always be set for registered KIs + if ki_ctx.serialization_model: + # We can assume the result bindings are BindingModels, so we can model_dump + binding_models = [ + b.model_dump() # pyright: ignore[reportAttributeAccessIssue] + for b in binding_set + ] + ask_result = self.client.ask( + kb_id=self.info.id, + ki_id=ki_ctx.info.id, + binding_set=binding_models, + ) + else: + ask_result = self.client.ask( + kb_id=self.info.id, + ki_id=ki_ctx.info.id, + binding_set=binding_set, # pyright: ignore[reportArgumentType] + ) + + if ki_ctx.validation_model and ask_result.binding_set: + result_bindings = [ + ki_ctx.validation_model.model_validate(b) + for b in ask_result.binding_set + ] + return result_bindings + else: + return ask_result.binding_set + def start_handling_loop(self, loops: int | None = None) -> None: """Poll the KE runtime for incoming KI calls and dispatch them to handlers. diff --git a/src/knowledge_interaction.py b/src/knowledge_interaction.py index 1141ebc..c9ce89b 100644 --- a/src/knowledge_interaction.py +++ b/src/knowledge_interaction.py @@ -1,10 +1,10 @@ import inspect from collections.abc import Callable, Sequence -from dataclasses import dataclass, field +from dataclasses import dataclass from enum import StrEnum from typing import Any, Concatenate, get_args -from src.ke.models import BindingModel, BindingSet, KnowledgeInteractionInfo +from src.ke.models import BindingModel, BindingSet, KiTypes, KnowledgeInteractionInfo type Handler[B, **P] = Callable[ Concatenate[B, KnowledgeInteractionInfo, P], @@ -12,28 +12,6 @@ ] -def default_ask_handler( - binding_set: BindingSet, info: KnowledgeInteractionInfo, keyword: str -) -> BindingSet: - # TODO: Implement a default ASK handler when implementing serialization and - # validation of binding sets - raise NotImplementedError( - "default_ask_handler is not yet implemented. " - "Provide a custom handler via ki_from_settings instead." - ) - - -def default_post_handler( - binding_set: BindingSet, info: KnowledgeInteractionInfo, keyword: str -) -> BindingSet: - # TODO: Implement a default POST handler when implementing serialization and - # validation of binding sets - raise NotImplementedError( - "default_post_handler is not yet implemented. " - "Provide a custom handler via ki_from_settings instead." - ) - - class KnowledgeInteractionStatus(StrEnum): REGISTERED = "registered" UNREGISTERED = "unregistered" @@ -42,17 +20,20 @@ class KnowledgeInteractionStatus(StrEnum): @dataclass class KnowledgeInteractionContext[B, **P]: info: KnowledgeInteractionInfo - handler: Handler[B, P] + handler: Handler[B, P] | None status: KnowledgeInteractionStatus = KnowledgeInteractionStatus.UNREGISTERED - validation_model: type[BindingModel] | None = field(init=False, default=None) - serialization_model: type[BindingModel] | None = field(init=False, default=None) + validation_model: type[BindingModel] | None = None + serialization_model: type[BindingModel] | None = None def __post_init__(self): - if not callable(self.handler): - raise ValueError("Handler must be a callable.") - - self.validation_model = self._inspect_incoming_binding_model(self.handler) - self.serialization_model = self._inspect_outgoing_binding_model(self.handler) + if self.info.type == KiTypes.ANSWER or self.info.type == KiTypes.REACT: + if not callable(self.handler): + raise ValueError("Handler must be a callable.") + + self.validation_model = self._inspect_incoming_binding_model(self.handler) + self.serialization_model = self._inspect_outgoing_binding_model( + self.handler + ) def _inspect_incoming_binding_model( self, handler: Callable[..., Any] diff --git a/tests/fake_client.py b/tests/fake_client.py deleted file mode 100644 index 3c7aeee..0000000 --- a/tests/fake_client.py +++ /dev/null @@ -1,62 +0,0 @@ -"""In-memory FakeClient that satisfies ClientProtocol for use in tests.""" - -from src.ke.client import PollResult -from src.ke.models import KnowledgeBaseInfo, KnowledgeInteractionInfo - - -class FakeClient: - """A lightweight in-memory stand-in for Client. Always succeeds.""" - - def __init__(self, fake_url) -> None: - self._knowledge_bases: dict[str, KnowledgeBaseInfo] = {} - # Maps kb_id -> list of registered KIs - self._knowledge_interactions: dict[str, list[KnowledgeInteractionInfo]] = {} - self._next_ki_id: int = 1 - self._ke_url = fake_url - - def ke_is_available(self) -> bool: - return True - - def ke_version(self) -> str: - return "0.0.0-fake" - - def get_knowledge_base(self, id: str) -> KnowledgeBaseInfo | None: - return self._knowledge_bases.get(id) - - def get_all_knowledge_bases(self) -> list[KnowledgeBaseInfo]: - return list(self._knowledge_bases.values()) - - def register_kb(self, info: KnowledgeBaseInfo, reregister: bool = True) -> None: - if info.id in self._knowledge_bases: - if reregister: - self.unregister_kb(info.id) - else: - return - self._knowledge_bases[info.id] = info - self._knowledge_interactions[info.id] = [] - - def unregister_kb(self, id: str) -> None: - self._knowledge_bases.pop(id) - self._knowledge_interactions.pop(id, None) - - def get_all_knowledge_interactions( - self, kb_id: str - ) -> list[KnowledgeInteractionInfo]: - return list(self._knowledge_interactions.get(kb_id, [])) - - def register_ki( - self, kb_id: str, ki: KnowledgeInteractionInfo - ) -> KnowledgeInteractionInfo: - registered = ki.model_copy(update={"id": f"fake-ki-{self._next_ki_id}"}) - self._next_ki_id += 1 - self._knowledge_interactions.setdefault(kb_id, []).append(registered) - return registered - - def poll_ki_call(self, kb_id: str) -> tuple[PollResult, None]: - # This fake client never returns any KI calls to handle, but always asks to - # repoll. - return (PollResult.REPOLL, None) - - @property - def ke_url(self) -> str: - return self._ke_url diff --git a/tests/test_ask_and_post.py b/tests/test_ask_and_post.py new file mode 100644 index 0000000..abc078b --- /dev/null +++ b/tests/test_ask_and_post.py @@ -0,0 +1,220 @@ +import pytest +from rdflib import URIRef + +from src import KnowledgeBase +from src.ke.models import BindingModel, Literal, Uri +from src.ke.testing import TestClient + + +@pytest.fixture +def client(): + return TestClient(fake_url="http://fake-ke") + + +@pytest.fixture +def kb(client: TestClient): + kb = KnowledgeBase( + id="http://example.org/test#kb", + name="test-kb", + description="A KB for testing.", + ke_url="http://fake-ke", + ) + kb.client = client + kb.register() + return kb + + +def test_ask_interaction_no_binding_models(kb: KnowledgeBase, client: TestClient): + kb.ask_ki( + name="ask-ki", + graph_pattern=""" + ?person a ex:Person . + ex:hasName ?name . + ex:hasAge ?age . + """, + prefixes={"ex": "http://example.org/test#"}, + defer_ke_registration=False, + ) + + client.mock_result_binding_set( + ki_name="ask-ki", + binding_set=[ + { + "person": "http://example.org/test#person1", + "name": "'Alice'^^xsd:string", + "age": "'30'^^xsd:integer", + } + ], + ) + + result = kb.ask( + [ + { + "person": "http://example.org/test#person1", + } + ], + "ask-ki", + ) + + assert result == [ + { + "person": "http://example.org/test#person1", + "name": "'Alice'^^xsd:string", + "age": "'30'^^xsd:integer", + } + ] + + +def test_ask_interaction_with_binding_models(kb: KnowledgeBase, client: TestClient): + class PersonBinding(BindingModel): + person: Uri + name: Literal[str] + age: Literal[int] + + kb.ask_ki( + name="ask-ki", + graph_pattern=""" + ?person a ex:Person . + ex:hasName ?name . + ex:hasAge ?age . + """, + binding_model=PersonBinding, + prefixes={"ex": "http://example.org/test#"}, + defer_ke_registration=False, + ) + + client.mock_result_binding_set( + ki_name="ask-ki", + binding_set=[ + { + "person": "", + "name": '"Alice"^^xsd:string', + "age": '"30"^^xsd:integer', + } + ], + ) + + result = kb.ask( + [ + PersonBinding( + person=URIRef("http://example.org/test#person1"), + name=None, + age=None, + ) + ], + "ask-ki", + ) + + assert result == [ + PersonBinding( + person=URIRef("http://example.org/test#person1"), + name="Alice", + age=30, + ) + ] + + +def test_post_measurement_no_binding_models(kb: KnowledgeBase, client: TestClient): + kb.post_ki( + name="post-ki", + argument_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:hasValue ?value ; + ex:hasUnit ?unit ; + ex:hasTime ?time . + """, + result_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:storedBy ?kb ; + """, + prefixes={"ex": "http://example.org/test#"}, + defer_ke_registration=False, + ) + + client.mock_result_binding_set( + ki_name="post-ki", + binding_set=[ + { + "measurement": "", + "kb": "", + } + ], + ) + + result = kb.post( + [ + { + "measurement": "", + "value": "'42.0'^^xsd:float", + "unit": "", + "time": "'2024-01-01T12:00:00Z'^^xsd:dateTime", + } + ], + "post-ki", + ) + + assert result == [ + { + "measurement": "", + "kb": "", + } + ] + + +def test_post_measurement_with_binding_models(kb: KnowledgeBase, client: TestClient): + class MeasurementBinding(BindingModel): + measurement: Uri + value: Literal[float] + unit: Uri + time: Literal[str] + + class ResultBinding(BindingModel): + measurement: Uri + kb: Uri + + kb.post_ki( + name="post-ki", + argument_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:hasValue ?value ; + ex:hasUnit ?unit ; + ex:hasTime ?time . + """, + result_graph_pattern=""" + ?measurement a ex:Measurement ; + ex:storedBy ?kb ; + """, + prefixes={"ex": "http://example.org/test#"}, + argument_binding_model=MeasurementBinding, + result_binding_model=ResultBinding, + defer_ke_registration=False, + ) + + client.mock_result_binding_set( + ki_name="post-ki", + binding_set=[ + { + "measurement": "", + "kb": "", + } + ], + ) + + result = kb.post( + [ + MeasurementBinding( + measurement=URIRef("http://example.org/test#measurement1"), + value=42.0, + unit=URIRef("http://example.org/test#unit1"), + time="2024-01-01T12:00:00Z", + ) + ], + "post-ki", + ) + + assert result == [ + ResultBinding( + measurement=URIRef("http://example.org/test#measurement1"), + kb=URIRef("http://example.org/test#kb"), + ) + ] diff --git a/tests/test_kb_lifespan.py b/tests/test_kb_lifespan.py index 4a10203..8fade58 100644 --- a/tests/test_kb_lifespan.py +++ b/tests/test_kb_lifespan.py @@ -4,17 +4,17 @@ from src import KnowledgeBase from src.ke.errors import KnowledgeEngineNotAvailableError +from src.ke.testing import TestClient from src.knowledge_base import KnowledgeBaseState -from tests.fake_client import FakeClient @pytest.fixture -def client() -> FakeClient: - return FakeClient(fake_url="http://fake-ke") +def client() -> TestClient: + return TestClient(fake_url="http://fake-ke") @pytest.fixture -def kb(client: FakeClient) -> KnowledgeBase: +def kb(client: TestClient) -> KnowledgeBase: kb = KnowledgeBase( id="http://example.org/test#kb", name="test-kb", @@ -37,7 +37,7 @@ def test_connect_raises_if_ke_unavailable(kb: KnowledgeBase): kb.connect() -def test_register_unregister_cycle(kb: KnowledgeBase, client: FakeClient): +def test_register_unregister_cycle(kb: KnowledgeBase, client: TestClient): kb.connect() kb.register() assert kb.state == KnowledgeBaseState.REGISTERED diff --git a/tests/test_ki_registration.py b/tests/test_ki_registration.py index 7782389..7edfb94 100644 --- a/tests/test_ki_registration.py +++ b/tests/test_ki_registration.py @@ -7,11 +7,11 @@ KiTypes, KnowledgeInteractionInfo, ) +from src.ke.testing import TestClient from src.knowledge_interaction import ( KnowledgeInteractionContext, KnowledgeInteractionStatus, ) -from tests.fake_client import FakeClient # Not a fixture as a fresh KB instance is needed for each test. @@ -22,7 +22,7 @@ def kb_setup() -> KnowledgeBase: description="A KB for testing.", ke_url="http://fake-ke", ) - kb.client = FakeClient(fake_url="http://fake-ke") + kb.client = TestClient(fake_url="http://fake-ke") return kb