diff --git a/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj b/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj index bca728b5e..5162985de 100644 --- a/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj +++ b/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj @@ -29,21 +29,21 @@ - - - - - - - + + + + + + + - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + diff --git a/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj b/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj index 173df4641..da6cd8ee7 100644 --- a/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj +++ b/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj @@ -11,11 +11,11 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + - + @@ -23,11 +23,11 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj b/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj index a9b95ee94..97083f219 100644 --- a/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj +++ b/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj @@ -28,19 +28,19 @@ - - - - - - - + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + diff --git a/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj b/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj index 19c802fb5..4a6110d93 100644 --- a/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj +++ b/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj @@ -24,11 +24,11 @@ - - - - - + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs index 7206d3555..16c97a957 100644 --- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs @@ -1,48 +1,216 @@ -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; +using MQTTnet.Packets; using NetDaemon.Extensions.MqttEntityManager; -using NetDaemon.Extensions.MqttEntityManager.Helpers; namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests; public class AssuredMqttConnectionTests { + private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(5); + private static readonly TimeSpan RetryWaitTimeout = TimeSpan.FromSeconds(10); + + [Fact] + public async Task QueuesPublishUntilConnected() + { + var setup = new AssuredMqttConnectionSetup(); + using var conn = setup.CreateConnection(); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("topic") + .WithPayload("payload") + .Build(); + + await conn.PublishAsync(message); + setup.PublishedMessages.Should().BeEmpty(); + + setup.CompleteConnect(); + + var publishedMessage = await setup.WaitForPublishedMessageAsync(); + publishedMessage.Should().Be(message); + } + + [Fact] + public async Task ReplaysSubscriptionsAfterConnect() + { + var setup = new AssuredMqttConnectionSetup(); + using var conn = setup.CreateConnection(); + + var topicFilter = new MqttTopicFilterBuilder() + .WithTopic("homeassistant/domain/sensor/set") + .Build(); + + await conn.SubscribeAsync(topicFilter); + setup.SubscribedTopics.Should().BeEmpty(); + + setup.CompleteConnect(); + + var subscribedTopic = await setup.WaitForSubscribedTopicAsync(); + subscribedTopic.Should().Be("homeassistant/domain/sensor/set"); + } + [Fact] - public async Task CanGetClient() + public async Task PublishesImmediatelyWhenAlreadyConnected() { - var logger = new Mock>(); + var setup = new AssuredMqttConnectionSetup(); + using var conn = setup.CreateConnection(); + setup.CompleteConnect(); + await setup.WaitForConnectedAsync(); + + var message = new MqttApplicationMessageBuilder() + .WithTopic("topic") + .WithPayload("payload") + .Build(); - var mqttClient = new Mock(); - var mqttFactory = new MqttFactoryWrapper(mqttClient.Object); - var mqttClientOptionsFactory = new Mock(); - var mqttConfigurationOptions = new Mock>(); + await conn.PublishAsync(message); - ConfigureMockOptions(mqttConfigurationOptions); + var publishedMessage = await setup.WaitForPublishedMessageAsync(); + publishedMessage.Should().Be(message); + } + + [Fact] + public async Task RetriesAfterTransientConnectionFailure() + { + var setup = new AssuredMqttConnectionSetup(); + setup.FailConnectAttempts(1); + using var conn = setup.CreateConnection(); + + var topicFilter = new MqttTopicFilterBuilder() + .WithTopic("homeassistant/domain/switch/set") + .Build(); - mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(It.Is(o => o.Host == "localhost" && o.UserName == "id"))) - .Returns(new ManagedMqttClientOptions()) - .Verifiable(Times.Once); + var message = new MqttApplicationMessageBuilder() + .WithTopic("homeassistant/domain/switch/state") + .WithPayload("ON") + .Build(); - var conn = new AssuredMqttConnection(logger.Object, mqttClientOptionsFactory.Object, mqttFactory, mqttConfigurationOptions.Object); - var returnedClient = await conn.GetClientAsync(); + await conn.SubscribeAsync(topicFilter); + await conn.PublishAsync(message); + setup.CompleteConnect(); - returnedClient.Should().Be(mqttClient.Object); + var subscribedTopic = await setup.WaitForSubscribedTopicAsync(RetryWaitTimeout); + var publishedMessage = await setup.WaitForPublishedMessageAsync(RetryWaitTimeout); - mqttClientOptionsFactory.VerifyAll(); - mqttConfigurationOptions.VerifyAll(); + setup.ConnectAttempts.Should().BeGreaterThan(1); + subscribedTopic.Should().Be("homeassistant/domain/switch/set"); + publishedMessage.Should().Be(message); } - private static void ConfigureMockOptions(Mock> mockOptions, Action? configuration = null) + private sealed class AssuredMqttConnectionSetup { - var mqttConfiguration = new MqttConfiguration + private readonly TaskCompletionSource _connectCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _connected = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _publishedMessage = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly TaskCompletionSource _subscribedTopic = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly Mock _mqttClient = new(); + private readonly Mock _mqttFactory = new(); + private readonly Mock _mqttClientOptionsFactory = new(); + private readonly Mock> _mqttConfigurationOptions = new(); + private int _connectAttempts; + private int _connectFailuresBeforeSuccess; + + public AssuredMqttConnectionSetup() { - Host = "localhost", - UserName = "id" - }; + var mqttConfiguration = new MqttConfiguration + { + Host = "localhost", + UserName = "id" + }; - configuration?.Invoke(mqttConfiguration); + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer(mqttConfiguration.Host, mqttConfiguration.Port) + .Build(); - mockOptions.SetupGet(o => o.Value) - .Returns(() => mqttConfiguration) - .Verifiable(Times.Once); + _mqttConfigurationOptions.SetupGet(o => o.Value) + .Returns(mqttConfiguration); + + _mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(mqttConfiguration)) + .Returns(clientOptions); + + _mqttFactory.Setup(f => f.CreateMqttClient()) + .Returns(_mqttClient.Object); + + _mqttClient.SetupGet(c => c.IsConnected) + .Returns(() => IsConnected); + + _mqttClient.Setup(c => c.ConnectAsync(clientOptions, It.IsAny())) + .Returns(async () => + { + Interlocked.Increment(ref _connectAttempts); + if (Interlocked.CompareExchange(ref _connectFailuresBeforeSuccess, 0, 0) > 0) + { + Interlocked.Decrement(ref _connectFailuresBeforeSuccess); + throw new InvalidOperationException("MQTT broker is not ready yet."); + } + + await _connectCompletion.Task; + IsConnected = true; + _connected.TrySetResult(); + return new MqttClientConnectResult + { + ResultCode = MqttClientConnectResultCode.Success + }; + }); + + _mqttClient.Setup(c => c.PublishAsync(It.IsAny(), It.IsAny())) + .Callback((message, _) => + { + PublishedMessages.Add(message); + _publishedMessage.TrySetResult(message); + }) + .ReturnsAsync(() => new MqttClientPublishResult(null, MqttClientPublishReasonCode.Success, string.Empty, [])); + + _mqttClient.Setup(c => c.SubscribeAsync(It.IsAny(), It.IsAny())) + .Callback((options, _) => + { + foreach (var topic in options.TopicFilters.Select(topicFilter => topicFilter.Topic)) + { + SubscribedTopics.Add(topic); + _subscribedTopic.TrySetResult(topic); + } + }) + .ReturnsAsync(() => new MqttClientSubscribeResult(1, [], string.Empty, [])); + } + + public bool IsConnected { get; private set; } + + public int ConnectAttempts => _connectAttempts; + + public List PublishedMessages { get; } = []; + + public List SubscribedTopics { get; } = []; + + public AssuredMqttConnection CreateConnection() + { + return new AssuredMqttConnection( + new Mock>().Object, + _mqttClientOptionsFactory.Object, + _mqttFactory.Object, + _mqttConfigurationOptions.Object); + } + + public void CompleteConnect() + { + _connectCompletion.TrySetResult(); + } + + public void FailConnectAttempts(int count) + { + _connectFailuresBeforeSuccess = count; + } + + public async Task WaitForConnectedAsync(TimeSpan? timeout = null) + { + await _connected.Task.WaitAsync(timeout ?? DefaultWaitTimeout); + } + + public async Task WaitForPublishedMessageAsync(TimeSpan? timeout = null) + { + return await _publishedMessage.Task.WaitAsync(timeout ?? DefaultWaitTimeout); + } + + public async Task WaitForSubscribedTopicAsync(TimeSpan? timeout = null) + { + return await _subscribedTopic.Task.WaitAsync(timeout ?? DefaultWaitTimeout); + } } } diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs index bff7bf427..84d4e2bca 100644 --- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs @@ -1,4 +1,5 @@ -using MQTTnet.Protocol; +using MQTTnet; +using MQTTnet.Protocol; using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers; namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests; @@ -13,7 +14,7 @@ public async Task TopicAndPayloadAreSet() await mqttSetup.MessageSender.SendMessageAsync("topic", "payload", true, MqttQualityOfServiceLevel.AtMostOnce); var publishedMessage = mqttSetup.LastPublishedMessage; - var payloadAsText = System.Text.Encoding.Default.GetString(publishedMessage.PayloadSegment.Array ?? []); + var payloadAsText = publishedMessage.ConvertPayloadToString(); publishedMessage.Topic.Should().Be("topic"); payloadAsText.Should().Be("payload"); @@ -73,4 +74,4 @@ public async Task CanUnsetPersist() publishedMessage.Retain.Should().BeFalse(); } -} \ No newline at end of file +} diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs new file mode 100644 index 000000000..0d1172afc --- /dev/null +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs @@ -0,0 +1,46 @@ +using NetDaemon.Extensions.MqttEntityManager; +using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers; + +namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests; + +public class MessageSubscriberTests +{ + [Fact] + public async Task SubscribeStoresTopicFilter() + { + var connection = new FakeAssuredMqttConnection(); + var subscriber = new MessageSubscriber(new Mock>().Object, connection); + + await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set"); + + connection.Subscriptions.Should().ContainSingle(); + connection.Subscriptions[0].Topic.Should().Be("homeassistant/domain/sensor/set"); + } + + [Fact] + public async Task ReceivedMessageIsForwardedToMatchingSubscriber() + { + var connection = new FakeAssuredMqttConnection(); + var subscriber = new MessageSubscriber(new Mock>().Object, connection); + var observable = await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set"); + var received = observable.FirstAsync().ToTask(); + + await connection.ReceiveAsync("homeassistant/domain/sensor/set", "on"); + + (await received.WaitAsync(TimeSpan.FromSeconds(1))).Should().Be("on"); + } + + [Fact] + public async Task ReceivedMessageIsIgnoredForUnmatchedTopic() + { + var connection = new FakeAssuredMqttConnection(); + var subscriber = new MessageSubscriber(new Mock>().Object, connection); + var observable = await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set"); + var received = new List(); + using var subscription = observable.Subscribe(received.Add); + + await connection.ReceiveAsync("homeassistant/domain/other/set", "on"); + + received.Should().BeEmpty(); + } +} diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs index 87734511d..c4e9dda33 100644 --- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs @@ -1,4 +1,5 @@ -using MQTTnet.Client; +using MQTTnet; +using MQTTnet.Formatter; using NetDaemon.Extensions.MqttEntityManager; namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests; @@ -21,19 +22,20 @@ public void CreatesDefaultConfiguration() mqttClientOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType(); + mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311); + mqttClientOptions.ChannelOptions.Should().NotBeNull(); + mqttClientOptions.ChannelOptions.Should().BeOfType(); - var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions; + var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions; var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint; ipEndpoint.Host.Should().Be("broker"); ipEndpoint.Port.Should().Be(1883); - mqttClientOptions.ClientOptions.Credentials.Should().BeNull(); + mqttClientOptions.Credentials.Should().BeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse(); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); + mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse(); + mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); } [Fact] @@ -49,21 +51,22 @@ public void CreatesDefaultConfigurationWithTls() mqttClientOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType(); + mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311); + mqttClientOptions.ChannelOptions.Should().NotBeNull(); + mqttClientOptions.ChannelOptions.Should().BeOfType(); - var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions; + var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions; var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint; ipEndpoint.Host.Should().Be("broker"); ipEndpoint.Port.Should().Be(1883); - mqttClientOptions.ClientOptions.Credentials.Should().BeNull(); + mqttClientOptions.Credentials.Should().BeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue(); + mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue(); // This would only get set to true if it and UseTls are both true - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); + mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); } [Fact] @@ -80,21 +83,22 @@ public void IgnoresTlsCustomizationIfTlsIsntEnabled() mqttClientOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType(); + mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311); + mqttClientOptions.ChannelOptions.Should().NotBeNull(); + mqttClientOptions.ChannelOptions.Should().BeOfType(); - var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions; + var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions; var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint; ipEndpoint.Host.Should().Be("broker"); ipEndpoint.Port.Should().Be(1883); - mqttClientOptions.ClientOptions.Credentials.Should().BeNull(); + mqttClientOptions.Credentials.Should().BeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse(); + mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse(); // This would only get set to true if it and UseTls are both true - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); + mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse(); } [Fact] @@ -114,23 +118,43 @@ public void CreatesFullyCustomizedConfiguration() mqttClientOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull(); - mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType(); + mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311); + mqttClientOptions.ChannelOptions.Should().NotBeNull(); + mqttClientOptions.ChannelOptions.Should().BeOfType(); - var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions; + var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions; var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint; ipEndpoint.Host.Should().Be("broker"); ipEndpoint.Port.Should().Be(1234); - mqttClientOptions.ClientOptions.Credentials.Should().NotBeNull(); - mqttClientOptions.ClientOptions.Credentials.Should().BeOfType(); + mqttClientOptions.Credentials.Should().NotBeNull(); + mqttClientOptions.Credentials.Should().BeOfType(); - mqttClientOptions.ClientOptions.Credentials.GetUserName(mqttClientOptions.ClientOptions).Should().Be("testuser"); - mqttClientOptions.ClientOptions.Credentials.GetPassword(mqttClientOptions.ClientOptions).Should().BeEquivalentTo(Encoding.UTF8.GetBytes("testpassword")); + mqttClientOptions.Credentials.GetUserName(mqttClientOptions).Should().Be("testuser"); + mqttClientOptions.Credentials.GetPassword(mqttClientOptions).Should().BeEquivalentTo(Encoding.UTF8.GetBytes("testpassword")); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue(); - mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeTrue(); + mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue(); + mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeTrue(); + } + + [Theory] + [InlineData("testuser", null)] + [InlineData(null, "testpassword")] + [InlineData("", "testpassword")] + [InlineData("testuser", "")] + public void DoesNotSetCredentialsUnlessUserNameAndPasswordAreProvided(string? userName, string? password) + { + var mqttConfiguration = new MqttConfiguration + { + Host = "broker", + UserName = userName, + Password = password + }; + + var mqttClientOptions = MqttClientOptionsFactory.CreateClientOptions(mqttConfiguration); + + mqttClientOptions.Credentials.Should().BeNull(); } [Fact] diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs index b3de546a3..9e99fbbd9 100644 --- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs @@ -1,4 +1,5 @@ -using NetDaemon.Extensions.MqttEntityManager; +using MQTTnet; +using NetDaemon.Extensions.MqttEntityManager; using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers; namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests; @@ -22,7 +23,7 @@ public async Task CreateWithNoOptionsSetsBaseConfig() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor"); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?.Count.Should().Be(6); payload?["name"].ToString().Should().Be("sensor"); @@ -40,7 +41,7 @@ public async Task CreateWithDefaultOptionsSetsBaseConfig() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions()); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?.Count.Should().Be(6); payload?["name"].ToString().Should().Be("sensor"); @@ -58,7 +59,7 @@ public async Task CreateCanSetUniqueId() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(UniqueId: "my_id")); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["unique_id"].ToString().Should().Be("my_id"); } @@ -70,7 +71,7 @@ public async Task CreateSetsDefaultEntityId() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.the_id"); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["default_entity_id"].ToString().Should().Be("domain.the_id"); } @@ -82,7 +83,7 @@ public async Task CreateCanSetDeviceClass() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(DeviceClass: "classy")); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["device_class"].ToString().Should().Be("classy"); } @@ -94,7 +95,7 @@ public async Task CreateCanSetName() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(Name: "george")); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["name"].ToString().Should().Be("george"); } @@ -130,7 +131,7 @@ public async Task CreateCanSetAdditionalOptions() var otherOptions = new { sub_class = "lights", up_state = "live" }; await entityManager.CreateAsync("domain.sensor", additionalConfig: otherOptions); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["sub_class"].ToString().Should().Be("lights"); payload?["up_state"].ToString().Should().Be("live"); @@ -145,7 +146,7 @@ public async Task CreateCanOverrideBaseConfig() var otherOptions = new { command_topic = "my/topic" }; await entityManager.CreateAsync("domain.sensor", additionalConfig: otherOptions); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?["command_topic"].ToString().Should().Be("my/topic"); } @@ -157,7 +158,7 @@ public async Task CreateAvailabilityTopicOffByDefault() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor"); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?.ContainsKey("availability_topic").Should().BeFalse(); } @@ -169,7 +170,7 @@ public async Task CreateAvailabilityTopicSetForAvailUp() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(PayloadAvailable: "up")); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?.ContainsKey("availability_topic").Should().BeTrue(); payload?["availability_topic"].ToString().Should().Be("homeassistant/domain/sensor/availability"); @@ -183,7 +184,7 @@ public async Task CreateAvailabilityTopicSetForAvailDown() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(PayloadNotAvailable: "down")); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); payload?.ContainsKey("availability_topic").Should().BeTrue(); payload?["availability_topic"].ToString().Should().Be("homeassistant/domain/sensor/availability"); @@ -198,7 +199,7 @@ public async Task CanRemove() await entityManager.RemoveAsync("domain.sensor"); - mqttSetup.LastPublishedMessage.PayloadSegment.Should().BeEmpty(); + mqttSetup.LastPublishedMessage.ConvertPayloadToString().Should().BeNullOrEmpty(); } [Fact] @@ -208,7 +209,7 @@ public async Task CanSetState() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.SetStateAsync("domain.sensor", "NewState"); - var payload = Encoding.Default.GetString(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = mqttSetup.LastPublishedMessage.ConvertPayloadToString(); mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/state"); payload.Should().Be("NewState"); @@ -223,7 +224,7 @@ public async Task CanSetStateToBlank() await entityManager.SetStateAsync("domain.sensor", ""); mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/state"); - mqttSetup.LastPublishedMessage.PayloadSegment.Should().BeEmpty(); + mqttSetup.LastPublishedMessage.ConvertPayloadToString().Should().BeNullOrEmpty(); } [Fact] @@ -234,7 +235,7 @@ public async Task CanSetAttributes() var attributes = new { colour = "purple", ziggy = "stardust" }; await entityManager.SetAttributesAsync("domain.sensor", attributes); - var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString()); mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/attributes"); payload?["colour"].ToString().Should().Be("purple"); @@ -248,18 +249,16 @@ public async Task CanSetAvailability() var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions()); await entityManager.SetAvailabilityAsync("domain.sensor", "up"); - var payload = Encoding.Default.GetString(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []); + var payload = mqttSetup.LastPublishedMessage.ConvertPayloadToString(); mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/availability"); payload.Should().Be("up"); } - private static Dictionary? PayloadToDictionary(byte[] payload) + private static Dictionary? PayloadToDictionary(string payload) { - return JsonSerializer.Deserialize>( - Encoding.Default.GetString(payload) - ); + return JsonSerializer.Deserialize>(payload); } private static IOptions GetOptions() diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs index 22fb686bf..eac30f1fc 100644 --- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs +++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs @@ -1,78 +1,71 @@ -using MQTTnet; -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; +using MQTTnet.Packets; using NetDaemon.Extensions.MqttEntityManager; -using NetDaemon.Extensions.MqttEntityManager.Helpers; namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers; /// -/// These are helpers that set up mock mqtt factories and connection, and allow a message to be sent through -/// the mock client and capture the message that would have been published +/// Test helper that captures MQTT messages sent by the entity manager. /// internal sealed class MockMqttMessageSenderSetup { - public AssuredMqttConnection Connection { get; private set; } = null!; - public Mock MqttClient { get; private set; } = null!; - public Mock MqttClientOptionsFactory { get; private set; } = null!; - - public MqttFactoryWrapper MqttFactory { get; private set; } = null!; + public FakeAssuredMqttConnection Connection { get; } = new(); public MessageSender MessageSender { get; private set; } = null!; - public MqttApplicationMessage LastPublishedMessage { get; set; } = null!; + public MqttApplicationMessage LastPublishedMessage => Connection.LastPublishedMessage; public MockMqttMessageSenderSetup() { - SetupMockMqtt(); SetupMessageSender(); - SetupMessageReceiver(); } - // ReSharper disable once MemberCanBePrivate.Global - public void SetupMessageReceiver() + private void SetupMessageSender() { - // Ensure that when the MQTT Client is called its published message is saved - MqttClient.Setup(m => m.EnqueueAsync(It.IsAny())) - .Callback(message => - { - LastPublishedMessage = message; - }); + var logger = new Mock>().Object; + MessageSender = new MessageSender(logger, Connection); } +} - /// - /// Get a mocked MQTT client, factor and connection - /// - /// - private void SetupMockMqtt() - { - var mqttConfiguration = new MqttConfiguration - { - Host = "localhost", - UserName = "id" - }; +internal sealed class FakeAssuredMqttConnection : IAssuredMqttConnection +{ + private readonly List _subscriptions = []; - var options = new Mock>(); + public event Func? ApplicationMessageReceivedAsync; - options.Setup(o => o.Value) - .Returns(() => mqttConfiguration); + public IReadOnlyList Subscriptions => _subscriptions; - MqttClient = new Mock(); - MqttClientOptionsFactory = new Mock(); - MqttFactory = new MqttFactoryWrapper(MqttClient.Object); + public MqttApplicationMessage LastPublishedMessage { get; private set; } = null!; - MqttClientOptionsFactory - .Setup(o => o.CreateClientOptions(mqttConfiguration)) - .Returns(new ManagedMqttClientOptions()); + public Task PublishAsync(MqttApplicationMessage message) + { + LastPublishedMessage = message; + return Task.CompletedTask; + } - Connection = new AssuredMqttConnection( - new Mock>().Object, - MqttClientOptionsFactory.Object, - MqttFactory, - options.Object); + public Task SubscribeAsync(MqttTopicFilter topicFilter) + { + _subscriptions.Add(topicFilter); + return Task.CompletedTask; } - private void SetupMessageSender() + public Task ReceiveAsync(string topic, string payload) { - var logger = new Mock>().Object; - MessageSender = new MessageSender(logger, Connection); + var message = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(Encoding.UTF8.GetBytes(payload)) + .Build(); + + var publishPacket = new MqttPublishPacket + { + Topic = topic, + PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(payload)) + }; + + var eventArgs = new MqttApplicationMessageReceivedEventArgs( + "test", + message, + publishPacket, + (_, _) => Task.CompletedTask); + return ApplicationMessageReceivedAsync?.Invoke(eventArgs) ?? Task.CompletedTask; } } diff --git a/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj b/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj index 0d4a98e16..54e4d2074 100644 --- a/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj +++ b/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj @@ -1,7 +1,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -9,18 +9,18 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj index ee209ab93..8d4ed78e5 100644 --- a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj +++ b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj @@ -38,11 +38,11 @@ - - - - - + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj b/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj index ce78d5c17..0cb16d7ac 100644 --- a/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj +++ b/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj @@ -30,9 +30,9 @@ - - - + + + diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs index 882ea9a68..90a9af9c2 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs @@ -1,24 +1,37 @@ -using System.Globalization; -using System.Text; +using System.Collections.Concurrent; +using System.Globalization; +using System.Threading.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using MQTTnet.Client; -using MQTTnet.Extensions.ManagedClient; -using NetDaemon.Extensions.MqttEntityManager.Exceptions; -using NetDaemon.Extensions.MqttEntityManager.Helpers; +using MQTTnet; +using MQTTnet.Packets; namespace NetDaemon.Extensions.MqttEntityManager; /// -/// Wrapper to assure an MQTT connection +/// Wrapper to assure an MQTT connection. /// internal class AssuredMqttConnection : IAssuredMqttConnection, IDisposable { + private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5); + private static readonly TimeSpan ConnectedPollDelay = TimeSpan.FromSeconds(1); + private readonly ILogger _logger; - private readonly IMqttClientOptionsFactory _mqttClientOptionsFactory; + private readonly IMqttClient _mqttClient; + private readonly MqttClientOptions _clientOptions; + private readonly Channel _publishQueue = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + private readonly ConcurrentDictionary _subscriptions = new(); + private readonly CancellationTokenSource _stopping = new(); + private readonly SemaphoreSlim _connectionSignal = new(0, 1); private readonly Task _connectionTask; - private IManagedMqttClient? _mqttClient; + private readonly Task _publishTask; private bool _disposed; + private volatile bool _hasConnected; /// /// Initializes a new instance of the class. @@ -30,61 +43,186 @@ internal class AssuredMqttConnection : IAssuredMqttConnection, IDisposable public AssuredMqttConnection( ILogger logger, IMqttClientOptionsFactory mqttClientOptionsFactory, - IMqttFactoryWrapper mqttFactory, + IMqttFactory mqttFactory, IOptions mqttConfig) { _logger = logger; - _mqttClientOptionsFactory = mqttClientOptionsFactory; + _logger.LogTrace("MQTT initiating connection"); - _connectionTask = Task.Run(() => ConnectAsync(mqttConfig.Value, mqttFactory)); + _clientOptions = mqttClientOptionsFactory.CreateClientOptions(mqttConfig.Value); + _mqttClient = mqttFactory.CreateMqttClient(); + + _mqttClient.ConnectedAsync += MqttClientOnConnectedAsync; + _mqttClient.DisconnectedAsync += MqttClientOnDisconnectedAsync; + _mqttClient.ApplicationMessageReceivedAsync += MqttClientOnApplicationMessageReceivedAsync; + + _connectionTask = Task.Run(() => MaintainConnectionAsync(_stopping.Token)); + _publishTask = Task.Run(() => PublishQueuedMessagesAsync(_stopping.Token)); } - /// - /// Ensures that the MQTT client is available - /// - /// - /// Timed out while waiting for connection - public async Task GetClientAsync() + /// + public event Func? ApplicationMessageReceivedAsync; + + /// + public async Task PublishAsync(MqttApplicationMessage message) { - await _connectionTask; + ObjectDisposedException.ThrowIf(_disposed, this); + await _publishQueue.Writer.WriteAsync(message, _stopping.Token).ConfigureAwait(false); + } - return _mqttClient ?? throw new MqttConnectionException("Unable to create MQTT connection"); + /// + public async Task SubscribeAsync(MqttTopicFilter topicFilter) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (string.IsNullOrEmpty(topicFilter.Topic)) + { + throw new ArgumentException("MQTT topic filter must specify a topic.", nameof(topicFilter)); + } + + _subscriptions[topicFilter.Topic] = topicFilter; + + if (_mqttClient.IsConnected) + { + await SubscribeOnClientAsync(topicFilter, _stopping.Token).ConfigureAwait(false); + } } - private async Task ConnectAsync(MqttConfiguration mqttConfig, IMqttFactoryWrapper mqttFactory) + private async Task MaintainConnectionAsync(CancellationToken cancellationToken) { - _logger.LogTrace("Connecting to MQTT broker at {Host}:{Port}/{UserName}", - mqttConfig.Host, mqttConfig.Port, mqttConfig.UserName); + while (!cancellationToken.IsCancellationRequested) + { + try + { + if (_mqttClient.IsConnected) + { + await Task.Delay(ConnectedPollDelay, cancellationToken).ConfigureAwait(false); + continue; + } - var clientOptions = _mqttClientOptionsFactory.CreateClientOptions(mqttConfig); + _logger.LogTrace("Connecting to MQTT broker at {Host}:{Port}/{UserName}", + GetConfiguredHost(), GetConfiguredPort(), _clientOptions.Credentials?.GetUserName(_clientOptions)); - _mqttClient = mqttFactory.CreateManagedMqttClient(); + var connectResult = await _mqttClient.ConnectAsync(_clientOptions, cancellationToken).ConfigureAwait(false); - _mqttClient.ConnectedAsync += MqttClientOnConnectedAsync; - _mqttClient.DisconnectedAsync += MqttClientOnDisconnectedAsync; + if (connectResult.ResultCode != MqttClientConnectResultCode.Success) + { + _logger.LogTrace("MQTT connection rejected: {ResultCode} {ReasonString}", + connectResult.ResultCode, connectResult.ReasonString); + await Task.Delay(ReconnectDelay, cancellationToken).ConfigureAwait(false); + continue; + } + + _logger.LogTrace("MQTT client is ready"); + _hasConnected = true; + SignalConnection(); + await ResubscribeAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogTrace(ex, "MQTT connection attempt failed"); + await DelayReconnectAsync(cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) + { + await foreach (var message in _publishQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + if (!_mqttClient.IsConnected) + { + await WaitForConnectionAsync(cancellationToken).ConfigureAwait(false); + continue; + } - await _mqttClient.StartAsync(clientOptions); + await _mqttClient.PublishAsync(message, cancellationToken).ConfigureAwait(false); + break; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Failed to publish MQTT message. The message will be retried after reconnect."); + await WaitForConnectionAsync(cancellationToken).ConfigureAwait(false); + } + } + } + } - _logger.LogTrace("MQTT client is ready"); + private async Task ResubscribeAsync(CancellationToken cancellationToken) + { + foreach (var topicFilter in _subscriptions.Values) + { + await SubscribeOnClientAsync(topicFilter, cancellationToken).ConfigureAwait(false); + } + } + + private async Task SubscribeOnClientAsync(MqttTopicFilter topicFilter, CancellationToken cancellationToken) + { + var subscribeOptions = new MqttClientSubscribeOptionsBuilder() + .WithTopicFilter(topicFilter) + .Build(); + + await _mqttClient.SubscribeAsync(subscribeOptions, cancellationToken).ConfigureAwait(false); } private Task MqttClientOnDisconnectedAsync(MqttClientDisconnectedEventArgs arg) { - _logger.LogDebug("MQTT disconnected: {Reason}", BuildErrorResponse(arg)); + if (_disposed || _stopping.IsCancellationRequested) + { + return Task.CompletedTask; + } + + if (_hasConnected) + { + _logger.LogDebug("MQTT disconnected: {Reason}", BuildErrorResponse(arg)); + } + else + { + _logger.LogTrace("MQTT disconnected before the initial connection completed: {Reason}", BuildErrorResponse(arg)); + } + return Task.CompletedTask; } private Task MqttClientOnConnectedAsync(MqttClientConnectedEventArgs arg) { + _hasConnected = true; _logger.LogDebug("MQTT connected: {ResultCode}", arg.ConnectResult.ResultCode); + SignalConnection(); return Task.CompletedTask; } + private async Task MqttClientOnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) + { + var handlers = ApplicationMessageReceivedAsync; + if (handlers is null) + { + return; + } + + foreach (Func handler in handlers.GetInvocationList()) + { + await handler(arg).ConfigureAwait(false); + } + } + private static string BuildErrorResponse(MqttClientDisconnectedEventArgs arg) { - var sb = new StringBuilder(); + var sb = new System.Text.StringBuilder(); - sb.AppendLine(CultureInfo.InvariantCulture, $"{arg.Exception?.Message} ({arg.Reason})"); // Note: arg.ReasonString is always null + sb.AppendLine(CultureInfo.InvariantCulture, $"{arg.Exception?.Message} ({arg.Reason})"); var ex = arg.Exception?.InnerException; while (ex != null) { @@ -95,14 +233,82 @@ private static string BuildErrorResponse(MqttClientDisconnectedEventArgs arg) return sb.ToString(); } + private void SignalConnection() + { + if (_connectionSignal.CurrentCount == 0) + { + _connectionSignal.Release(); + } + } + + private async Task WaitForConnectionAsync(CancellationToken cancellationToken) + { + if (_mqttClient.IsConnected) + { + return; + } + + await _connectionSignal.WaitAsync(cancellationToken).ConfigureAwait(false); + } + + private static async Task DelayReconnectAsync(CancellationToken cancellationToken) + { + try + { + await Task.Delay(ReconnectDelay, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + } + } + + private string? GetConfiguredHost() + { + return (_clientOptions.ChannelOptions as MqttClientTcpOptions)?.RemoteEndpoint switch + { + System.Net.DnsEndPoint dnsEndPoint => dnsEndPoint.Host, + System.Net.IPEndPoint ipEndPoint => ipEndPoint.Address.ToString(), + _ => null + }; + } + + private int? GetConfiguredPort() + { + return (_clientOptions.ChannelOptions as MqttClientTcpOptions)?.RemoteEndpoint switch + { + System.Net.DnsEndPoint dnsEndPoint => dnsEndPoint.Port, + System.Net.IPEndPoint ipEndPoint => ipEndPoint.Port, + _ => null + }; + } + + /// public void Dispose() { if (_disposed) + { return; + } _disposed = true; _logger.LogTrace("MQTT disconnecting"); - _connectionTask?.Dispose(); - _mqttClient?.Dispose(); + _stopping.Cancel(); + _publishQueue.Writer.TryComplete(); + + try + { + Task.WaitAll([_connectionTask, _publishTask], TimeSpan.FromSeconds(1)); + } + catch (AggregateException) + { + } + + _stopping.Dispose(); + _connectionSignal.Dispose(); + + if (_mqttClient is IDisposable disposable) + { + disposable.Dispose(); + } } } diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs index 0e7f95b11..2addf8dfe 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs @@ -2,8 +2,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using NetDaemon.Extensions.MqttEntityManager.Helpers; - #endregion namespace NetDaemon.Extensions.MqttEntityManager; @@ -16,8 +14,8 @@ public static class DependencyInjectionSetup /// /// Add support for managing entities via MQTT /// - /// - /// + /// The host builder. + /// The configured host builder. public static IHostBuilder UseNetDaemonMqttEntityManagement(this IHostBuilder hostBuilder) { return hostBuilder.ConfigureServices((_, services) => @@ -29,11 +27,12 @@ public static IHostBuilder UseNetDaemonMqttEntityManagement(this IHostBuilder ho /// /// Add support for managing entities via MQTT /// + /// The service collection. + /// The configured service collection. public static IServiceCollection AddNetDaemonMqttEntityManagement(this IServiceCollection services) { services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs index c8a5a3f64..68cd19de3 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs @@ -8,15 +8,15 @@ public class MqttConnectionException : Exception /// /// MQTT connection failed /// - /// + /// The exception message. public MqttConnectionException(string msg) : base(msg) {} /// /// MQTT connection failed /// - /// - /// + /// The exception message. + /// The inner exception. public MqttConnectionException(string msg, Exception innerException) : base(msg, innerException) {} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs index b5c9e62c3..b1a2cef4f 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs @@ -8,15 +8,15 @@ public class MqttPublishException : Exception /// /// Failed to publish a message to MQTT /// - /// + /// The exception message. public MqttPublishException(string msg) : base(msg) {} /// /// Failed to publish a message to MQTT /// - /// - /// + /// The exception message. + /// The inner exception. public MqttPublishException(string msg, Exception innerException) : base(msg, innerException) {} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs index cf9a79a76..78c6b126c 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs @@ -10,8 +10,8 @@ internal static class ByteArrayHelper /// /// Convert a byte array to a string, or to an empty string if the array is not valid UTF8 /// - /// - /// + /// The byte array to convert. + /// The UTF-8 string, or an empty string when the array is null, empty, or invalid. public static string SafeToString(byte[]? array) { try @@ -26,4 +26,4 @@ public static string SafeToString(byte[]? array) return ""; } } -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs index 167587430..167d0ab42 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs @@ -12,10 +12,10 @@ internal class EntityCreationPayloadHelper /// /// Merge an optional dynamic set of parameters with the concrete payload /// - /// - /// - /// - /// + /// The base MQTT discovery payload. + /// Optional additional MQTT discovery configuration. + /// The merged MQTT discovery payload JSON. + /// Thrown when the base payload cannot be converted to JSON. internal static string Merge(EntityCreationPayload concreteOptions, object? additionalOptions) { var concreteJson = JsonSerializer.SerializeToNode(concreteOptions)?.AsObject() @@ -29,4 +29,4 @@ internal static string Merge(EntityCreationPayload concreteOptions, object? addi return concreteJson.ToJsonString(); } -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs index cec33c174..800791e29 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs @@ -9,7 +9,7 @@ internal static class EntityIdParser /// Extract the domain and identifier from an entity ID string /// /// Entity ID in the format "domain.identifier" - /// + /// The parsed domain and identifier. /// If entityId is not supplied or is an invalid /// format public static (string domain, string identifier) Extract(string entityId) diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs deleted file mode 100644 index edc2753ed..000000000 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs +++ /dev/null @@ -1,15 +0,0 @@ -using MQTTnet.Extensions.ManagedClient; - -namespace NetDaemon.Extensions.MqttEntityManager.Helpers; - -/// -/// Testable wrapper around IMqttFactory -/// -internal interface IMqttFactoryWrapper -{ - /// - /// Return a managed MQTT client, either from the original factory or a pre-supplied one - /// - /// - IManagedMqttClient CreateManagedMqttClient(); -} \ No newline at end of file diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs index 7b3ad1464..401e63c89 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs @@ -12,8 +12,8 @@ internal static class JsonNodeExtensions /// For a given JsonObject, merge a second set of values, replacing any pre-existing properties in /// the target object /// - /// - /// + /// The target JSON object. + /// The JSON object to merge into the target. public static void AddRange(this JsonObject target, JsonObject? toMerge) { if (toMerge == null) diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs deleted file mode 100644 index 9adf6dad2..000000000 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs +++ /dev/null @@ -1,40 +0,0 @@ -using MQTTnet.Extensions.ManagedClient; - -namespace NetDaemon.Extensions.MqttEntityManager.Helpers; - -/// -/// Testable wrapper around IMqttFactory -/// -internal class MqttFactoryWrapper : IMqttFactoryWrapper -{ - private readonly IMqttFactory? _mqttFactory; - private readonly IManagedMqttClient? _client; - - /// - /// Standard functionality - set the IMqttFactory that will return a client - /// - /// - public MqttFactoryWrapper(IMqttFactory mqttFactory) - { - _mqttFactory = mqttFactory; - } - - /// - /// Testing functionality - specify a client that will be returned - /// - /// - public MqttFactoryWrapper(IManagedMqttClient client) - { - _client = client; - } - - /// - /// Return a managed MQTT client, either from the original factory or a pre-supplied one - /// - /// - public IManagedMqttClient CreateManagedMqttClient() - { - return _client ?? _mqttFactory?.CreateManagedMqttClient() - ?? throw new InvalidOperationException("No client or MqttFactory specified"); - } -} \ No newline at end of file diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs index f68b797c6..b8c5cbf61 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs @@ -1,14 +1,25 @@ -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; +using MQTTnet.Packets; namespace NetDaemon.Extensions.MqttEntityManager; /// -/// Wrapper to assure an MQTT connection +/// Wrapper to assure an MQTT connection. /// internal interface IAssuredMqttConnection { /// - /// Ensures that the MQTT client is available + /// Raised when an MQTT application message is received. /// - Task GetClientAsync(); -} \ No newline at end of file + event Func? ApplicationMessageReceivedAsync; + + /// + /// Queue a message to publish to MQTT. + /// + Task PublishAsync(MqttApplicationMessage message); + + /// + /// Subscribe to a topic, retaining the subscription across reconnects. + /// + Task SubscribeAsync(MqttTopicFilter topicFilter); +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs index b54eb7c1c..22ce79cb9 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs @@ -10,10 +10,10 @@ internal interface IMessageSender /// /// Send a message for the given payload to the MQTT topic /// - /// - /// - /// - /// - /// + /// The MQTT topic. + /// The message payload. + /// Whether the message should be retained. + /// The MQTT quality of service level. + /// A task that represents the asynchronous send operation. Task SendMessageAsync(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos); -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs index 7397ea0b9..f1d728e33 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs @@ -5,6 +5,7 @@ internal interface IMessageSubscriber /// /// Receive a message from the given topic /// - /// + /// The MQTT topic. + /// An observable that receives payloads for the topic. Task> SubscribeTopicAsync(string topic); -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs index 180d5e113..9fab990a2 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs @@ -1,4 +1,4 @@ -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; namespace NetDaemon.Extensions.MqttEntityManager; @@ -9,8 +9,8 @@ public interface IMqttClientOptionsFactory { /// /// Creates the client options for MQTT connection from the supplied configuration. - /// /// + /// /// The MQTT configuration. - /// The managed MQTT client options. - ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig); + /// The MQTT client options. + MqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig); } diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs index 93db6f010..36d2195ce 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs @@ -15,19 +15,25 @@ public interface IMqttEntityManager /// /// Create an entity in Home Assistant via MQTT /// + /// Distinct identifier, in the format "domain.id", such as "sensor.kitchen_temp". + /// Optional entity creation options. + /// Optional additional Home Assistant MQTT discovery configuration. + /// A task that represents the asynchronous create operation. Task CreateAsync(string entityId, EntityCreationOptions? options = null, object? additionalConfig = null); /// /// Remove an entity from Home Assistant /// + /// The entity id. + /// A task that represents the asynchronous remove operation. Task RemoveAsync(string entityId); /// /// Set attributes on an entity /// - /// - /// - /// + /// The entity id. + /// The attributes to set. + /// A task that represents the asynchronous attribute update operation. Task SetAttributesAsync(string entityId, object attributes); /// @@ -35,24 +41,24 @@ public interface IMqttEntityManager /// on creating the entity then the value should match one of these. /// If not, then use "online" and "offline" /// - /// - /// - /// + /// The entity id. + /// The availability payload. + /// A task that represents the asynchronous availability update operation. Task SetAvailabilityAsync(string entityId, string availability); /// /// Set the state of an entity /// - /// - /// - /// + /// The entity id. + /// The state payload. + /// A task that represents the asynchronous state update operation. Task SetStateAsync(string entityId, string state); /// /// Prepare a subscription to command topics for the given entity /// Be sure to chain this request with .Subscribe(...) /// - /// - /// + /// The entity id. + /// An observable that receives command payloads for the entity. Task> PrepareCommandSubscriptionAsync(string entityId); -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs index 55104237b..99f1c0e03 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs @@ -1,16 +1,15 @@ -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; namespace NetDaemon.Extensions.MqttEntityManager; /// -/// MqttNet removed the IMqttFactory interface at v4 which breaks our DI -/// So this is a factory for the MqttFactory to satisfy our use case +/// Factory abstraction for MQTT clients. /// internal interface IMqttFactory { /// - /// Create a Managed Mqtt Client + /// Create an MQTT client. /// - /// - IManagedMqttClient CreateManagedMqttClient(); -} \ No newline at end of file + /// The MQTT client. + IMqttClient CreateMqttClient(); +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs index 3c5a1a676..c6853afb9 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs @@ -2,9 +2,9 @@ using Microsoft.Extensions.Logging; using MQTTnet; -using MQTTnet.Extensions.ManagedClient; using MQTTnet.Protocol; using NetDaemon.Extensions.MqttEntityManager.Exceptions; +using System.Text; #endregion @@ -21,33 +21,24 @@ internal class MessageSender : IMessageSender /// /// Manage connections and message publishing to MQTT /// - /// - /// + /// The logger. + /// The assured MQTT connection. public MessageSender(ILogger logger, IAssuredMqttConnection assuredMqttConnection) { _logger = logger; _assuredMqttConnection = assuredMqttConnection; } - /// - /// Publish a message to the given topic - /// - /// - /// Json structure of payload - /// - /// + /// public async Task SendMessageAsync(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos) { - var mqttClient = await _assuredMqttConnection.GetClientAsync(); - - await PublishMessage(mqttClient, topic, payload, retain, qos); + await PublishMessage(topic, payload, retain, qos); } - private async Task PublishMessage(IManagedMqttClient mqttClient, string topic, string payload, bool retain, - MqttQualityOfServiceLevel qos) + private async Task PublishMessage(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos) { var message = new MqttApplicationMessageBuilder().WithTopic(topic) - .WithPayload(payload) + .WithPayload(Encoding.UTF8.GetBytes(payload)) .WithRetainFlag(retain) .WithQualityOfServiceLevel(qos) .Build(); @@ -56,7 +47,7 @@ private async Task PublishMessage(IManagedMqttClient mqttClient, string topic, s try { - await mqttClient.EnqueueAsync(message).ConfigureAwait(false); + await _assuredMqttConnection.PublishAsync(message).ConfigureAwait(false); } catch (Exception e) { diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs index dc5d16386..06b01d736 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs @@ -1,14 +1,10 @@ #region using System.Collections.Concurrent; -using System.Collections.ObjectModel; using System.Reactive.Subjects; using Microsoft.Extensions.Logging; using MQTTnet; -using MQTTnet.Client; -using MQTTnet.Extensions.ManagedClient; using MQTTnet.Packets; -using NetDaemon.Extensions.MqttEntityManager.Helpers; #endregion @@ -30,31 +26,24 @@ internal class MessageSubscriber : IMessageSubscriber, IDisposable /// /// Managed subscriptions to topics within MQTT /// - /// - /// + /// The logger. + /// The assured MQTT connection. public MessageSubscriber(ILogger logger, IAssuredMqttConnection assuredMqttConnection) { _logger = logger; _assuredMqttConnection = assuredMqttConnection; } - /// - /// Subscribe to the given topic - /// - /// + /// public async Task> SubscribeTopicAsync(string topic) { try { - var mqttClient = await _assuredMqttConnection.GetClientAsync(); - await EnsureSubscriptionAsync(mqttClient); + await EnsureSubscriptionAsync(); - var topicFilters = new Collection - { - new MqttTopicFilterBuilder().WithTopic(topic).Build() - }; + var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build(); - await mqttClient.SubscribeAsync(topicFilters); + await _assuredMqttConnection.SubscribeAsync(topicFilter); return _subscribers.GetOrAdd(topic, new Lazy>()).Value; } catch (Exception e) @@ -67,8 +56,7 @@ public async Task> SubscribeTopicAsync(string topic) /// /// If we are not already subscribed to receive messages, set up the handler /// - /// - private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient) + private async Task EnsureSubscriptionAsync() { await _subscriptionSetupLock.WaitAsync(); try @@ -76,7 +64,7 @@ private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient) if (!_subscriptionIsSetup) { _logger.LogInformation("Configuring message subscription"); - mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync; + _assuredMqttConnection.ApplicationMessageReceivedAsync += OnMessageReceivedAsync; _subscriptionIsSetup = true; } } @@ -94,13 +82,13 @@ private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient) /// /// Message received from MQTT, so find the subscription (if any) and notify them /// - /// - /// + /// The received MQTT application message. + /// A completed task. private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs msg) { try { - var payload = ByteArrayHelper.SafeToString(msg.ApplicationMessage.PayloadSegment.Array ?? []); + var payload = msg.ApplicationMessage.ConvertPayloadToString(); var topic = msg.ApplicationMessage.Topic; _logger.LogTrace("Subscription received {Payload} from {Topic}", payload, topic); @@ -119,6 +107,7 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs msg) return Task.CompletedTask; } + /// public void Dispose() { if (_isDisposed) return; diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs index 4c7ab2e3c..ee6761cba 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs @@ -7,47 +7,83 @@ namespace NetDaemon.Extensions.MqttEntityManager.Models; /// internal class EntityCreationPayload { + /// + /// Gets or sets the display name of the entity. + /// [JsonPropertyName("name")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? Name { get; set; } + /// + /// Gets or sets the Home Assistant device class. + /// [JsonPropertyName("device_class")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? DeviceClass { get; set; } + /// + /// Gets or sets the unique identifier for the entity. + /// [JsonPropertyName("unique_id")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? UniqueId { get; set; } + /// + /// Gets or sets the default entity id. + /// [JsonPropertyName("default_entity_id")] public string? DefaultEntityId { get; set; } + /// + /// Gets or sets the MQTT command topic. + /// [JsonPropertyName("command_topic")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? CommandTopic { get; set; } + /// + /// Gets or sets the MQTT state topic. + /// [JsonPropertyName("state_topic")] public string? StateTopic { get; set; } + /// + /// Gets or sets the MQTT JSON attributes topic. + /// [JsonPropertyName("json_attributes_topic")] public string? JsonAttributesTopic { get; set; } + /// + /// Gets or sets the MQTT availability topic. + /// [JsonPropertyName("availability_topic")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? AvailabilityTopic { get; set; } + /// + /// Gets or sets the payload that marks the entity as available. + /// [JsonPropertyName("payload_available")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? PayloadAvailable { get; set; } + /// + /// Gets or sets the payload that marks the entity as not available. + /// [JsonPropertyName("payload_not_available")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? PayloadNotAvailable { get; set; } + /// + /// Gets or sets the payload that represents the on state. + /// [JsonPropertyName("payload_on")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? PayloadOn { get; set; } + /// + /// Gets or sets the payload that represents the off state. + /// [JsonPropertyName("payload_off")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] public string? PayloadOff { get; set; } diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs index 6589ceaa8..b0f647987 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs @@ -1,4 +1,5 @@ -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; +using MQTTnet.Formatter; namespace NetDaemon.Extensions.MqttEntityManager; @@ -6,7 +7,7 @@ namespace NetDaemon.Extensions.MqttEntityManager; public class MqttClientOptionsFactory : IMqttClientOptionsFactory { /// - public ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig) + public MqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig) { ArgumentNullException.ThrowIfNull(mqttConfig); @@ -15,29 +16,25 @@ public ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig throw new ArgumentException("Explicit MQTT host configuration was not provided and no suitable broker addon was discovered", nameof(mqttConfig)); } - var clientOptions = new ManagedMqttClientOptionsBuilder() - .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) - .WithClientOptions(clientOptionsBuilder => - { - clientOptionsBuilder.WithTcpServer(mqttConfig.Host, mqttConfig.Port); + var clientOptionsBuilder = new MqttClientOptionsBuilder() + .WithProtocolVersion(MqttProtocolVersion.V311) + .WithTcpServer(mqttConfig.Host, mqttConfig.Port); - if (!string.IsNullOrEmpty(mqttConfig.UserName) && !string.IsNullOrEmpty(mqttConfig.Password)) - { - clientOptionsBuilder.WithCredentials(mqttConfig.UserName, mqttConfig.Password); - } + if (mqttConfig.UseTls) + { + clientOptionsBuilder.WithTlsOptions(tlsOptionsBuilder => + { + tlsOptionsBuilder + .UseTls() + .WithAllowUntrustedCertificates(mqttConfig.AllowUntrustedCertificates); + }); + } - if (mqttConfig.UseTls) - { - clientOptionsBuilder.WithTlsOptions(tlsOptionsBuilder => - { - tlsOptionsBuilder - .UseTls() - .WithAllowUntrustedCertificates(mqttConfig.AllowUntrustedCertificates); - }); - } - }) - .Build(); + if (!string.IsNullOrEmpty(mqttConfig.UserName) && !string.IsNullOrEmpty(mqttConfig.Password)) + { + clientOptionsBuilder.WithCredentials(mqttConfig.UserName, mqttConfig.Password); + } - return clientOptions; + return clientOptionsBuilder.Build(); } } diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs index 66058df34..17a21e0d8 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs @@ -23,14 +23,15 @@ internal class MqttEntityManager : IMqttEntityManager private readonly IMessageSender _messageSender; private readonly IMessageSubscriber _messageSubscriber; + /// public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtMostOnce; /// /// Manage entities via MQTT /// - /// - /// - /// + /// The MQTT message sender. + /// The MQTT message subscriber. + /// The MQTT configuration options. public MqttEntityManager(IMessageSender messageSender, IMessageSubscriber messageSubscriber, IOptions config) { _messageSender = messageSender; @@ -38,12 +39,7 @@ public MqttEntityManager(IMessageSender messageSender, IMessageSubscriber messag _config = config.Value; } - /// - /// Create an entity in Home Assistant via MQTT - /// - /// Distinct identifier, in the format "domain.id", such as "sensor.kitchen_temp" - /// Optional set of additional parameters - /// + /// public async Task CreateAsync(string entityId, EntityCreationOptions? options = null, object? additionalConfig = null) { @@ -57,10 +53,7 @@ await _messageSender .ConfigureAwait(false); } - /// - /// Remove an entity from Home Assistant - /// - /// + /// public async Task RemoveAsync(string entityId) { var (domain, identifier) = EntityIdParser.Extract(entityId); @@ -69,13 +62,7 @@ await _messageSender .ConfigureAwait(false); } - /// - /// Set the state of an entity - /// - /// - /// - /// - /// + /// public async Task SetStateAsync(string entityId, string state) { var (domain, identifier) = EntityIdParser.Extract(entityId); @@ -84,13 +71,7 @@ await _messageSender.SendMessageAsync(StatePath(domain, identifier), state, true .ConfigureAwait(false); } - /// - /// Set attributes on an entity - /// - /// - /// - /// - /// + /// public async Task SetAttributesAsync(string entityId, object attributes) { var (domain, identifier) = EntityIdParser.Extract(entityId); @@ -100,13 +81,7 @@ await _messageSender.SendMessageAsync(AttrsPath(domain, identifier), JsonSeriali .ConfigureAwait(false); } - /// - /// Set availability of the entity. If you specified "payload_available" and "payload_not_available" configuration - /// on creating the entity then the value should match one of these. - /// If not, then use "online" and "offline" - /// - /// - /// + /// public async Task SetAvailabilityAsync(string entityId, string availability) { var (domain, identifier) = EntityIdParser.Extract(entityId); @@ -116,12 +91,7 @@ await _messageSender .ConfigureAwait(false); } - /// - /// Prepare a subscription to command topics for the given entity - /// Be sure to chain this request with .Subscribe(...) - /// - /// - /// + /// public async Task> PrepareCommandSubscriptionAsync(string entityId) { var (domain, identifier) = EntityIdParser.Extract(entityId); diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs index 54bb190a5..e59c760e0 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs @@ -1,20 +1,15 @@ -using MQTTnet; -using MQTTnet.Extensions.ManagedClient; +using MQTTnet; namespace NetDaemon.Extensions.MqttEntityManager; /// -/// MqttNet removed the IMqttFactory interface at v4 which breaks our DI -/// So this is a factory for the MqttFactory to satisfy our use case +/// Factory wrapper for MQTTnet's client factory. /// internal class MqttFactoryFactory : IMqttFactory { - /// - /// Create a Managed Mqtt Client - /// - /// - public IManagedMqttClient CreateManagedMqttClient() + /// + public IMqttClient CreateMqttClient() { - return new MqttFactory().CreateManagedMqttClient(); + return new MqttClientFactory().CreateMqttClient(); } -} \ No newline at end of file +} diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj index 83fd0bac3..4b18c03a8 100644 --- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj +++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj @@ -31,13 +31,12 @@ - - - - - - - + + + + + + diff --git a/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj b/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj index 6b4f8c1ef..f88749ff3 100644 --- a/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj +++ b/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj @@ -1,20 +1,20 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj b/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj index acf145709..311bfb38b 100644 --- a/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj +++ b/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj @@ -31,9 +31,9 @@ - - - + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj b/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj index 43664eb89..f889ebad3 100644 --- a/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj +++ b/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj @@ -29,9 +29,9 @@ - - - + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj b/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj index 3ae790aa4..4fd08bc26 100644 --- a/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj +++ b/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj @@ -32,14 +32,14 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + diff --git a/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj b/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj index b7c7e998b..ccf16237e 100644 --- a/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj +++ b/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj @@ -30,7 +30,7 @@ - + diff --git a/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj b/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj index 20f9f9d38..49665fcfd 100644 --- a/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj +++ b/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj @@ -1,28 +1,28 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj index 9ed4b747d..a6a66a692 100644 --- a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj +++ b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj @@ -30,8 +30,8 @@ - - + + diff --git a/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj b/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj index 5507bb172..b28660517 100644 --- a/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj +++ b/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj @@ -5,7 +5,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj b/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj index 79c04949b..72e295e41 100644 --- a/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj +++ b/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj @@ -1,16 +1,16 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + - + @@ -18,11 +18,11 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj b/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj index 765431ec9..ac9ec276d 100644 --- a/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj +++ b/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj @@ -28,14 +28,14 @@ - - - - - - - - + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs index 9b17c60bd..550ccb66b 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs @@ -2,8 +2,11 @@ namespace NetDaemon.Tests.Integration.Helpers; +/// +/// Defines the shared Home Assistant integration test collection. +/// [CollectionDefinition("HomeAssistant collection")] public class HomeAssistantCollection : ICollectionFixture { -} \ No newline at end of file +} diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs index cd97ad7ab..01a8488f8 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs @@ -1,34 +1,158 @@ -using Microsoft.Extensions.Logging; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; +using DotNet.Testcontainers.Networks; +using Microsoft.Extensions.Logging; +using MQTTnet; +using MQTTnet.Formatter; using NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer; +using System.Text; using Xunit; namespace NetDaemon.Tests.Integration.Helpers; +/// +/// Manages the shared Home Assistant and MQTT broker containers for integration tests. +/// public class HomeAssistantLifetime : IAsyncLifetime { - private readonly HomeAssistantContainer _homeassistant = new HomeAssistantContainerBuilder() - .WithResourceMapping(new DirectoryInfo("./HA/config"), "/config") - .WithLogger(LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger()) - .WithVersion(Environment.GetEnvironmentVariable("HomeAssistantVersion") ?? HomeAssistantContainerBuilder.DefaultVersion) - .Build(); + private const int MqttContainerPort = 1883; + private const string MqttContainerAlias = "mqtt"; + private const string MosquittoConfiguration = """ + listener 1883 0.0.0.0 + allow_anonymous true + persistence false + log_dest stdout + """; + private readonly ILoggerFactory _loggerFactory = LoggerFactory.Create(builder => builder.AddConsole()); + private readonly INetwork _network = new NetworkBuilder().Build(); + private readonly IContainer _mqttBroker; + private readonly HomeAssistantContainer _homeassistant; + + /// + /// Initializes a new instance of the class. + /// + public HomeAssistantLifetime() + { + _mqttBroker = new ContainerBuilder() + .WithImage(Environment.GetEnvironmentVariable("MqttBrokerImage") ?? "eclipse-mosquitto:2") + .WithNetwork(_network) + .WithNetworkAliases(MqttContainerAlias) + .WithPortBinding(MqttContainerPort, true) + .WithResourceMapping(Encoding.UTF8.GetBytes(MosquittoConfiguration), "/mosquitto/config/mosquitto.conf") + .WithWaitStrategy( + Wait.ForUnixContainer() + .UntilInternalTcpPortIsAvailable(MqttContainerPort) + .UntilExternalTcpPortIsAvailable(MqttContainerPort)) + .WithLogger(_loggerFactory.CreateLogger("MqttBrokerContainer")) + .Build(); + + _homeassistant = new HomeAssistantContainerBuilder() + .WithNetwork(_network) + .WithResourceMapping(new DirectoryInfo("./HA/config"), "/config") + .WithLogger(_loggerFactory.CreateLogger()) + .WithVersion(Environment.GetEnvironmentVariable("HomeAssistantVersion") ?? HomeAssistantContainerBuilder.DefaultVersion) + .Build(); + } + + /// + /// Gets or sets the Home Assistant API access token. + /// public string? AccessToken { get; set; } + + /// + /// Gets the mapped Home Assistant HTTP port. + /// public ushort Port => _homeassistant.Port; + /// + /// Gets the host name used by the test host to connect to the MQTT broker. + /// + public string MqttHost { get; } = "localhost"; + + /// + /// Gets the mapped MQTT broker port. + /// + public ushort MqttPort => _mqttBroker.GetMappedPublicPort(MqttContainerPort); + + /// public async Task InitializeAsync() { + await _network.CreateAsync(); + await _mqttBroker.StartAsync(); + await WaitForMqttBrokerAsync(); await _homeassistant.StartAsync(); var authorizeResult = await _homeassistant.DoOnboarding(); AccessToken = (await _homeassistant.GenerateApiToken(authorizeResult.AuthCode)).AccessToken; - await _homeassistant.AddIntegrations(AccessToken); + await _homeassistant.AddIntegrations(AccessToken, new MqttBrokerSettings(MqttContainerAlias, MqttContainerPort)); } + /// public async Task DisposeAsync() { - var (_, stderr) = await _homeassistant.GetLogsAsync(); - Console.WriteLine($"Writing Homeassistant logs to console:{Environment.NewLine}{stderr}{Environment.NewLine}End of Homeassistant logs"); - await _homeassistant.StopAsync(); + var (_, homeAssistantStderr) = await _homeassistant.GetLogsAsync(); + Console.WriteLine($"Writing Homeassistant logs to console:{Environment.NewLine}{homeAssistantStderr}{Environment.NewLine}End of Homeassistant logs"); + + var (_, mqttStderr) = await _mqttBroker.GetLogsAsync(); + Console.WriteLine($"Writing MQTT broker logs to console:{Environment.NewLine}{mqttStderr}{Environment.NewLine}End of MQTT broker logs"); + + await _homeassistant.DisposeAsync(); + await _mqttBroker.DisposeAsync(); + await _network.DisposeAsync(); + _loggerFactory.Dispose(); } -} + private async Task WaitForMqttBrokerAsync() + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var logger = _loggerFactory.CreateLogger(); + Exception? lastException = null; + + while (!timeout.IsCancellationRequested) + { + try + { + using var client = new MqttClientFactory().CreateMqttClient(); + var clientOptions = new MqttClientOptionsBuilder() + .WithTcpServer(MqttHost, MqttPort) + .WithProtocolVersion(MqttProtocolVersion.V311) + .WithClientId($"netdaemon-test-readiness-{Guid.NewGuid():N}") + .Build(); + + var connectResult = await client.ConnectAsync(clientOptions, timeout.Token); + if (connectResult.ResultCode == MqttClientConnectResultCode.Success) + { + await client.DisconnectAsync(); + return; + } + + lastException = new InvalidOperationException($"MQTT broker rejected readiness connection: {connectResult.ResultCode} {connectResult.ReasonString}"); + } + catch (OperationCanceledException) when (timeout.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + lastException = ex; + } + + await DelayNextMqttBrokerAttempt(timeout.Token); + } + + logger.LogError(lastException, "MQTT broker endpoint {Host}:{Port} did not become ready within 30 seconds", MqttHost, MqttPort); + throw new TimeoutException($"MQTT broker endpoint {MqttHost}:{MqttPort} did not become ready within 30 seconds.", lastException); + } + + private static async Task DelayNextMqttBrokerAttempt(CancellationToken cancellationToken) + { + try + { + await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + } + } +} diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs index 2618daa30..1723b2e81 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs @@ -4,13 +4,38 @@ namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer; +/// +/// Container configuration for Home Assistant integration tests. +/// public class HomeAssistantConfiguration : ContainerConfiguration { + /// + /// Gets the Home Assistant onboarding username. + /// public string Username { get; } = null!; + + /// + /// Gets the Home Assistant onboarding password. + /// public string Password { get; } = null!; + + /// + /// Gets the Home Assistant OAuth client id. + /// public string ClientId { get; } = null!; + + /// + /// Gets the Home Assistant container image version. + /// public string Version { get; } = null!; + /// + /// Initializes a new instance of the class. + /// + /// The Home Assistant onboarding username. + /// The Home Assistant onboarding password. + /// The Home Assistant OAuth client id. + /// The Home Assistant container image version. public HomeAssistantConfiguration( string? username = null, string? password = null, @@ -23,16 +48,29 @@ public HomeAssistantConfiguration( Version = version!; } + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. public HomeAssistantConfiguration(IResourceConfiguration resourceConfiguration) : base(resourceConfiguration) { } + /// + /// Initializes a new instance of the class. + /// + /// The container configuration. public HomeAssistantConfiguration(IContainerConfiguration resourceConfiguration) : base(resourceConfiguration) { } - + + /// + /// Initializes a new instance of the class. + /// + /// The previous Home Assistant configuration. + /// The new Home Assistant configuration. public HomeAssistantConfiguration(HomeAssistantConfiguration oldValue, HomeAssistantConfiguration newValue) : base(oldValue, newValue) { @@ -41,4 +79,4 @@ public HomeAssistantConfiguration(HomeAssistantConfiguration oldValue, HomeAssis Password = BuildConfiguration.Combine(oldValue.Password, newValue.Password); Version = BuildConfiguration.Combine(oldValue.Version, newValue.Version); } -} \ No newline at end of file +} diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs index 1df0c4840..a8757c6a9 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs @@ -6,22 +6,41 @@ namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer; +/// +/// Testcontainers container wrapper for Home Assistant integration tests. +/// public class HomeAssistantContainer : DockerContainer { private readonly HomeAssistantConfiguration _configuration; + + /// + /// Gets the mapped Home Assistant HTTP port. + /// public ushort Port => GetMappedPublicPort(8123); private HttpClient? _client; + + /// + /// Gets the HTTP client configured for the Home Assistant container. + /// public HttpClient Client => _client ??= new HttpClient { BaseAddress = new Uri($"http://localhost:{Port}") }; + /// + /// Initializes a new instance of the class. + /// + /// The Home Assistant container configuration. public HomeAssistantContainer(HomeAssistantConfiguration configuration) : base(configuration) { _configuration = configuration; } + /// + /// Completes Home Assistant onboarding for the test user. + /// + /// The Home Assistant authorization result. public async Task DoOnboarding() { var onboardingResult = await Client.PostAsync("/api/onboarding/users", JsonContent.Create(new @@ -37,10 +56,16 @@ public async Task DoOnboarding() return (await onboardingResult.Content.ReadFromJsonAsync())!; } - public async Task AddIntegrations(string token) + /// + /// Adds integrations required by the integration tests. + /// + /// The Home Assistant API access token. + /// The MQTT broker settings used by Home Assistant. + public async Task AddIntegrations(string token, MqttBrokerSettings mqttBrokerSettings) { AddAuthorizationHeaders(token); await AddLocalCalendarIntegration(); + await AddMqttIntegration(mqttBrokerSettings); } private void AddAuthorizationHeaders(string token) @@ -69,6 +94,72 @@ private async Task AddLocalCalendarIntegration() } } + private async Task AddMqttIntegration(MqttBrokerSettings mqttBrokerSettings) + { + var submitFlow = await SubmitMqttIntegration(mqttBrokerSettings, includeProtocol: true); + var flowType = submitFlow.GetProperty("type").GetString(); + if (flowType != "create_entry") + { + throw new InvalidOperationException($"Home Assistant MQTT integration setup failed: {submitFlow}"); + } + } + + private async Task SubmitMqttIntegration(MqttBrokerSettings mqttBrokerSettings, bool includeProtocol) + { + var result = await Client.PostAsync("/api/config/config_entries/flow", JsonContent.Create(new + { + handler = "mqtt", + show_advanced_options = true + })); + + await EnsureSuccessStatusCode(result, "starting MQTT config flow"); + var startFlow = await result.Content.ReadFromJsonAsync(); + var flowId = startFlow.GetProperty("flow_id").GetString() + ?? throw new InvalidOperationException("Home Assistant did not return an MQTT config flow id."); + + var brokerSettings = new Dictionary + { + ["broker"] = mqttBrokerSettings.Host, + ["port"] = mqttBrokerSettings.Port + }; + + if (includeProtocol) + { + brokerSettings["protocol"] = "3.1.1"; + } + + var submitResult = await Client.PostAsync($"/api/config/config_entries/flow/{flowId}", JsonContent.Create(brokerSettings)); + + if (submitResult.IsSuccessStatusCode) + { + return await submitResult.Content.ReadFromJsonAsync(); + } + + var content = await submitResult.Content.ReadAsStringAsync(); + if (includeProtocol && content.Contains("data['protocol']", StringComparison.Ordinal)) + { + return await SubmitMqttIntegration(mqttBrokerSettings, includeProtocol: false); + } + + throw new HttpRequestException($"Home Assistant returned {(int)submitResult.StatusCode} ({submitResult.StatusCode}) while submitting MQTT config flow: {content}"); + } + + private static async Task EnsureSuccessStatusCode(HttpResponseMessage response, string operation) + { + if (response.IsSuccessStatusCode) + { + return; + } + + var content = await response.Content.ReadAsStringAsync(); + throw new HttpRequestException($"Home Assistant returned {(int)response.StatusCode} ({response.StatusCode}) while {operation}: {content}"); + } + + /// + /// Generates a Home Assistant API token from an authorization code. + /// + /// The Home Assistant authorization code. + /// The generated Home Assistant token result. public async Task GenerateApiToken(string authCode) { var tokenResponse = await Client.PostAsync("/auth/token", new FormUrlEncodedContent( @@ -84,12 +175,31 @@ public async Task GenerateApiToken(string authCode) } } +/// +/// Authorization result returned by Home Assistant onboarding. +/// +/// The authorization code. public record HomeAssistantAuthorizeResult( [property:JsonPropertyName("auth_code")][property:JsonRequired]string AuthCode); +/// +/// Token result returned by the Home Assistant token endpoint. +/// +/// The API access token. +/// The token type. +/// The refresh token. +/// The token lifetime in seconds. +/// The Home Assistant authentication provider. public record HomeAssistantTokenResult( [property:JsonPropertyName("access_token")][property:JsonRequired]string AccessToken, [property:JsonPropertyName("token_type")][property:JsonRequired]string TokenType, [property:JsonPropertyName("refresh_token")][property:JsonRequired]string RefreshToken, [property:JsonPropertyName("expires_in")][property:JsonRequired]int ExpiresIn, [property:JsonPropertyName("ha_auth_provider")][property:JsonRequired]string HaAuthProvider); + +/// +/// MQTT broker settings used to configure Home Assistant. +/// +/// The broker host name visible from Home Assistant. +/// The broker port visible from Home Assistant. +public record MqttBrokerSettings(string Host, int Port); diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs index 80320fb96..4e07b6d2e 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs @@ -5,18 +5,43 @@ namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer; +/// +/// Builds Home Assistant containers for integration tests. +/// public class HomeAssistantContainerBuilder : ContainerBuilder { + /// + /// The default Home Assistant container image version. + /// public const string DefaultVersion = "stable"; + + /// + /// The default OAuth client id used during Home Assistant onboarding. + /// public const string DefaultClientId = "http://dummyClientId"; + + /// + /// The default Home Assistant onboarding username. + /// public const string DefaultUsername = "username"; + + /// + /// The default Home Assistant onboarding password. + /// public const string DefaultPassword = "password"; + /// + /// Initializes a new instance of the class. + /// public HomeAssistantContainerBuilder() : this(new HomeAssistantConfiguration()) { DockerResourceConfiguration = Init().DockerResourceConfiguration; } + /// + /// Initializes a new instance of the class. + /// + /// The Home Assistant container configuration. public HomeAssistantContainerBuilder(HomeAssistantConfiguration dockerResourceConfiguration) : base(dockerResourceConfiguration) { DockerResourceConfiguration = dockerResourceConfiguration; @@ -34,14 +59,37 @@ protected override HomeAssistantContainerBuilder Init() => .WithClientId(DefaultClientId) .WithVersion(DefaultVersion); + /// + /// Sets the Home Assistant container image version. + /// + /// The Home Assistant container image version. + /// The configured builder. public HomeAssistantContainerBuilder WithVersion(string version) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(version: version)) .WithImage($"homeassistant/home-assistant:{version}"); + /// + /// Sets the Home Assistant onboarding username. + /// + /// The Home Assistant onboarding username. + /// The configured builder. public HomeAssistantContainerBuilder WithUsername(string username) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(username: username)); + + /// + /// Sets the Home Assistant onboarding password. + /// + /// The Home Assistant onboarding password. + /// The configured builder. public HomeAssistantContainerBuilder WithPassword(string password) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(password: password)); + + /// + /// Sets the Home Assistant OAuth client id. + /// + /// The Home Assistant OAuth client id. + /// The configured builder. public HomeAssistantContainerBuilder WithClientId(string clientId) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(clientId: clientId)); + /// public override HomeAssistantContainer Build() { Validate(); diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs index cf04718c9..a8a10854c 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs +++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs @@ -4,19 +4,30 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NetDaemon.AppModel; +using NetDaemon.Extensions.MqttEntityManager; using NetDaemon.Runtime; using Xunit; namespace NetDaemon.Tests.Integration.Helpers; +/// +/// Base class for integration tests that run against the shared Home Assistant container. +/// [Collection("HomeAssistant collection")] public class NetDaemonIntegrationBase : IAsyncDisposable { + /// + /// Gets the scoped services for the running NetDaemon host. + /// public IServiceProvider Services => _scope.ServiceProvider; private readonly HomeAssistantLifetime _homeAssistantLifetime; private readonly IHost _netDaemon; private readonly AsyncServiceScope _scope; + /// + /// Initializes a new instance of the class. + /// + /// The shared Home Assistant test lifetime. public NetDaemonIntegrationBase(HomeAssistantLifetime homeAssistantLifetime) { _homeAssistantLifetime = homeAssistantLifetime; @@ -36,13 +47,17 @@ private IHost StartNetDaemon() { { "HomeAssistant:Port", _homeAssistantLifetime.Port.ToString(CultureInfo.InvariantCulture) }, { "HomeAssistant:Token", _homeAssistantLifetime.AccessToken }, - { "HomeAssistant:Host", "localhost" } + { "HomeAssistant:Host", "localhost" }, + { "Mqtt:Host", _homeAssistantLifetime.MqttHost }, + { "Mqtt:Port", _homeAssistantLifetime.MqttPort.ToString(CultureInfo.InvariantCulture) }, + { "Mqtt:DiscoveryPrefix", "homeassistant" } }); }) .ConfigureServices((_, services) => services .AddAppsFromAssembly(Assembly.GetExecutingAssembly()) .AddNetDaemonStateManager() + .AddNetDaemonMqttEntityManagement() ).Build(); netDaemon.Start(); @@ -61,9 +76,9 @@ private static async Task WaitForRuntimeToBeInitialized(IHost host) /// /// Runs the specified function without a synchronization context and restores the synchronization context afterwards. /// - /// + /// The function to run. /// - /// + /// The result of the function. private static T RunWithoutSynchronizationContext(Func func) { // Capture the current synchronization context so we can restore it later. @@ -80,6 +95,7 @@ private static T RunWithoutSynchronizationContext(Func func) } } + /// public async ValueTask DisposeAsync() { await _scope.DisposeAsync(); diff --git a/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs b/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs new file mode 100644 index 000000000..cf7537409 --- /dev/null +++ b/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs @@ -0,0 +1,138 @@ +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using NetDaemon.AppModel; +using NetDaemon.Extensions.MqttEntityManager; +using NetDaemon.HassModel; +using NetDaemon.HassModel.Entities; +using NetDaemon.Tests.Integration.Helpers; +using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; +using Xunit; + +namespace NetDaemon.Tests.Integration; + +/// +/// Test app that exposes a switch through the MQTT entity manager. +/// +[NetDaemonApp] +public sealed class MqttIntegrationSwitchApp : IAsyncInitializable, IDisposable +{ + /// + /// The Home Assistant entity id created by the MQTT integration test app. + /// + public const string EntityId = "switch.netdaemon_mqtt_test_switch"; + private const string PayloadOn = "ON"; + private const string PayloadOff = "OFF"; + + private readonly IMqttEntityManager _entityManager; + private IDisposable? _commandSubscription; + + /// + /// Initializes a new instance of the class. + /// + /// The MQTT entity manager. + public MqttIntegrationSwitchApp(IMqttEntityManager entityManager) + { + _entityManager = entityManager; + } + + /// + public async Task InitializeAsync(CancellationToken cancellationToken) + { + await _entityManager.CreateAsync( + EntityId, + new EntityCreationOptions( + UniqueId: "netdaemon_mqtt_test_switch", + Name: "NetDaemon MQTT Test Switch", + PayloadOn: PayloadOn, + PayloadOff: PayloadOff)).ConfigureAwait(false); + + await _entityManager.SetStateAsync(EntityId, PayloadOff).ConfigureAwait(false); + + var commands = await _entityManager.PrepareCommandSubscriptionAsync(EntityId).ConfigureAwait(false); + _commandSubscription = commands.Subscribe(command => _ = HandleCommandAsync(command)); + } + + /// + public void Dispose() + { + _commandSubscription?.Dispose(); + } + + private async Task HandleCommandAsync(string command) + { + if (command is PayloadOn or PayloadOff) + { + await _entityManager.SetStateAsync(EntityId, command).ConfigureAwait(false); + } + } +} + +/// +/// Integration tests for MQTT-backed Home Assistant entities. +/// +public class MqttIntegrationTests : NetDaemonIntegrationBase +{ + /// + /// Initializes a new instance of the class. + /// + /// The shared Home Assistant test lifetime. + public MqttIntegrationTests(HomeAssistantLifetime homeAssistantLifetime) : base(homeAssistantLifetime) + { + } + + /// + /// Verifies that an MQTT switch can be turned on and off through Home Assistant. + /// + [Fact] + public async Task MqttSwitch_ShouldTurnOnAndOffThroughHomeAssistant() + { + var haContext = Services.GetRequiredService(); + + await WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "off"); + + var waitForOn = WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "on"); + haContext.CallService( + "switch", + "turn_on", + ServiceTarget.FromEntities(MqttIntegrationSwitchApp.EntityId)); + + await waitForOn; + + var waitForOff = WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "off"); + haContext.CallService( + "switch", + "turn_off", + ServiceTarget.FromEntities(MqttIntegrationSwitchApp.EntityId)); + + await waitForOff; + } + + private static async Task WaitForStateAsync(IHaContext haContext, string entityId, string expectedState) + { + if (haContext.GetState(entityId)?.State == expectedState) + { + return; + } + + var waitForState = haContext.StateChanges() + .Where(change => change.Entity.EntityId == entityId && change.New?.State == expectedState) + .FirstAsync() + .ToTask(); + + if (haContext.GetState(entityId)?.State == expectedState) + { + return; + } + + try + { + await waitForState.WaitAsync(TimeSpan.FromSeconds(30)); + } + catch (TimeoutException) + { + haContext.GetState(entityId)?.State.Should().Be(expectedState, + $"entity {entityId} should reach state {expectedState} within 30 seconds"); + } + } +} diff --git a/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj b/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj index 04c86cd40..7e06348d5 100644 --- a/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj +++ b/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj @@ -1,26 +1,26 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -28,6 +28,7 @@ +