diff --git a/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj b/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj
index bca728b5e..5162985de 100644
--- a/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj
+++ b/src/AppModel/NetDaemon.AppModel.SourceDeployedApps/NetDaemon.AppModel.SourceDeployedApps.csproj
@@ -29,21 +29,21 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
diff --git a/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj b/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj
index 173df4641..da6cd8ee7 100644
--- a/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj
+++ b/src/AppModel/NetDaemon.AppModel.Tests/NetDaemon.AppModel.Tests.csproj
@@ -11,11 +11,11 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
-
+
@@ -23,11 +23,11 @@
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj b/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj
index a9b95ee94..97083f219 100644
--- a/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj
+++ b/src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj
@@ -28,19 +28,19 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
diff --git a/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj b/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj
index 19c802fb5..4a6110d93 100644
--- a/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj
+++ b/src/Client/NetDaemon.HassClient.Debug/NetDaemon.HassClient.Debug.csproj
@@ -24,11 +24,11 @@
-
-
-
-
-
+
+
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs
index 7206d3555..16c97a957 100644
--- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/AssuredMqttConnectionTests.cs
@@ -1,48 +1,216 @@
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
+using MQTTnet.Packets;
using NetDaemon.Extensions.MqttEntityManager;
-using NetDaemon.Extensions.MqttEntityManager.Helpers;
namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
public class AssuredMqttConnectionTests
{
+ private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(5);
+ private static readonly TimeSpan RetryWaitTimeout = TimeSpan.FromSeconds(10);
+
+ [Fact]
+ public async Task QueuesPublishUntilConnected()
+ {
+ var setup = new AssuredMqttConnectionSetup();
+ using var conn = setup.CreateConnection();
+
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic("topic")
+ .WithPayload("payload")
+ .Build();
+
+ await conn.PublishAsync(message);
+ setup.PublishedMessages.Should().BeEmpty();
+
+ setup.CompleteConnect();
+
+ var publishedMessage = await setup.WaitForPublishedMessageAsync();
+ publishedMessage.Should().Be(message);
+ }
+
+ [Fact]
+ public async Task ReplaysSubscriptionsAfterConnect()
+ {
+ var setup = new AssuredMqttConnectionSetup();
+ using var conn = setup.CreateConnection();
+
+ var topicFilter = new MqttTopicFilterBuilder()
+ .WithTopic("homeassistant/domain/sensor/set")
+ .Build();
+
+ await conn.SubscribeAsync(topicFilter);
+ setup.SubscribedTopics.Should().BeEmpty();
+
+ setup.CompleteConnect();
+
+ var subscribedTopic = await setup.WaitForSubscribedTopicAsync();
+ subscribedTopic.Should().Be("homeassistant/domain/sensor/set");
+ }
+
[Fact]
- public async Task CanGetClient()
+ public async Task PublishesImmediatelyWhenAlreadyConnected()
{
- var logger = new Mock>();
+ var setup = new AssuredMqttConnectionSetup();
+ using var conn = setup.CreateConnection();
+ setup.CompleteConnect();
+ await setup.WaitForConnectedAsync();
+
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic("topic")
+ .WithPayload("payload")
+ .Build();
- var mqttClient = new Mock();
- var mqttFactory = new MqttFactoryWrapper(mqttClient.Object);
- var mqttClientOptionsFactory = new Mock();
- var mqttConfigurationOptions = new Mock>();
+ await conn.PublishAsync(message);
- ConfigureMockOptions(mqttConfigurationOptions);
+ var publishedMessage = await setup.WaitForPublishedMessageAsync();
+ publishedMessage.Should().Be(message);
+ }
+
+ [Fact]
+ public async Task RetriesAfterTransientConnectionFailure()
+ {
+ var setup = new AssuredMqttConnectionSetup();
+ setup.FailConnectAttempts(1);
+ using var conn = setup.CreateConnection();
+
+ var topicFilter = new MqttTopicFilterBuilder()
+ .WithTopic("homeassistant/domain/switch/set")
+ .Build();
- mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(It.Is(o => o.Host == "localhost" && o.UserName == "id")))
- .Returns(new ManagedMqttClientOptions())
- .Verifiable(Times.Once);
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic("homeassistant/domain/switch/state")
+ .WithPayload("ON")
+ .Build();
- var conn = new AssuredMqttConnection(logger.Object, mqttClientOptionsFactory.Object, mqttFactory, mqttConfigurationOptions.Object);
- var returnedClient = await conn.GetClientAsync();
+ await conn.SubscribeAsync(topicFilter);
+ await conn.PublishAsync(message);
+ setup.CompleteConnect();
- returnedClient.Should().Be(mqttClient.Object);
+ var subscribedTopic = await setup.WaitForSubscribedTopicAsync(RetryWaitTimeout);
+ var publishedMessage = await setup.WaitForPublishedMessageAsync(RetryWaitTimeout);
- mqttClientOptionsFactory.VerifyAll();
- mqttConfigurationOptions.VerifyAll();
+ setup.ConnectAttempts.Should().BeGreaterThan(1);
+ subscribedTopic.Should().Be("homeassistant/domain/switch/set");
+ publishedMessage.Should().Be(message);
}
- private static void ConfigureMockOptions(Mock> mockOptions, Action? configuration = null)
+ private sealed class AssuredMqttConnectionSetup
{
- var mqttConfiguration = new MqttConfiguration
+ private readonly TaskCompletionSource _connectCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly TaskCompletionSource _connected = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly TaskCompletionSource _publishedMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly TaskCompletionSource _subscribedTopic = new(TaskCreationOptions.RunContinuationsAsynchronously);
+ private readonly Mock _mqttClient = new();
+ private readonly Mock _mqttFactory = new();
+ private readonly Mock _mqttClientOptionsFactory = new();
+ private readonly Mock> _mqttConfigurationOptions = new();
+ private int _connectAttempts;
+ private int _connectFailuresBeforeSuccess;
+
+ public AssuredMqttConnectionSetup()
{
- Host = "localhost",
- UserName = "id"
- };
+ var mqttConfiguration = new MqttConfiguration
+ {
+ Host = "localhost",
+ UserName = "id"
+ };
- configuration?.Invoke(mqttConfiguration);
+ var clientOptions = new MqttClientOptionsBuilder()
+ .WithTcpServer(mqttConfiguration.Host, mqttConfiguration.Port)
+ .Build();
- mockOptions.SetupGet(o => o.Value)
- .Returns(() => mqttConfiguration)
- .Verifiable(Times.Once);
+ _mqttConfigurationOptions.SetupGet(o => o.Value)
+ .Returns(mqttConfiguration);
+
+ _mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(mqttConfiguration))
+ .Returns(clientOptions);
+
+ _mqttFactory.Setup(f => f.CreateMqttClient())
+ .Returns(_mqttClient.Object);
+
+ _mqttClient.SetupGet(c => c.IsConnected)
+ .Returns(() => IsConnected);
+
+ _mqttClient.Setup(c => c.ConnectAsync(clientOptions, It.IsAny()))
+ .Returns(async () =>
+ {
+ Interlocked.Increment(ref _connectAttempts);
+ if (Interlocked.CompareExchange(ref _connectFailuresBeforeSuccess, 0, 0) > 0)
+ {
+ Interlocked.Decrement(ref _connectFailuresBeforeSuccess);
+ throw new InvalidOperationException("MQTT broker is not ready yet.");
+ }
+
+ await _connectCompletion.Task;
+ IsConnected = true;
+ _connected.TrySetResult();
+ return new MqttClientConnectResult
+ {
+ ResultCode = MqttClientConnectResultCode.Success
+ };
+ });
+
+ _mqttClient.Setup(c => c.PublishAsync(It.IsAny(), It.IsAny()))
+ .Callback((message, _) =>
+ {
+ PublishedMessages.Add(message);
+ _publishedMessage.TrySetResult(message);
+ })
+ .ReturnsAsync(() => new MqttClientPublishResult(null, MqttClientPublishReasonCode.Success, string.Empty, []));
+
+ _mqttClient.Setup(c => c.SubscribeAsync(It.IsAny(), It.IsAny()))
+ .Callback((options, _) =>
+ {
+ foreach (var topic in options.TopicFilters.Select(topicFilter => topicFilter.Topic))
+ {
+ SubscribedTopics.Add(topic);
+ _subscribedTopic.TrySetResult(topic);
+ }
+ })
+ .ReturnsAsync(() => new MqttClientSubscribeResult(1, [], string.Empty, []));
+ }
+
+ public bool IsConnected { get; private set; }
+
+ public int ConnectAttempts => _connectAttempts;
+
+ public List PublishedMessages { get; } = [];
+
+ public List SubscribedTopics { get; } = [];
+
+ public AssuredMqttConnection CreateConnection()
+ {
+ return new AssuredMqttConnection(
+ new Mock>().Object,
+ _mqttClientOptionsFactory.Object,
+ _mqttFactory.Object,
+ _mqttConfigurationOptions.Object);
+ }
+
+ public void CompleteConnect()
+ {
+ _connectCompletion.TrySetResult();
+ }
+
+ public void FailConnectAttempts(int count)
+ {
+ _connectFailuresBeforeSuccess = count;
+ }
+
+ public async Task WaitForConnectedAsync(TimeSpan? timeout = null)
+ {
+ await _connected.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
+ }
+
+ public async Task WaitForPublishedMessageAsync(TimeSpan? timeout = null)
+ {
+ return await _publishedMessage.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
+ }
+
+ public async Task WaitForSubscribedTopicAsync(TimeSpan? timeout = null)
+ {
+ return await _subscribedTopic.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
+ }
}
}
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs
index bff7bf427..84d4e2bca 100644
--- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSenderTests.cs
@@ -1,4 +1,5 @@
-using MQTTnet.Protocol;
+using MQTTnet;
+using MQTTnet.Protocol;
using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers;
namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
@@ -13,7 +14,7 @@ public async Task TopicAndPayloadAreSet()
await mqttSetup.MessageSender.SendMessageAsync("topic", "payload", true, MqttQualityOfServiceLevel.AtMostOnce);
var publishedMessage = mqttSetup.LastPublishedMessage;
- var payloadAsText = System.Text.Encoding.Default.GetString(publishedMessage.PayloadSegment.Array ?? []);
+ var payloadAsText = publishedMessage.ConvertPayloadToString();
publishedMessage.Topic.Should().Be("topic");
payloadAsText.Should().Be("payload");
@@ -73,4 +74,4 @@ public async Task CanUnsetPersist()
publishedMessage.Retain.Should().BeFalse();
}
-}
\ No newline at end of file
+}
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs
new file mode 100644
index 000000000..0d1172afc
--- /dev/null
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MessageSubscriberTests.cs
@@ -0,0 +1,46 @@
+using NetDaemon.Extensions.MqttEntityManager;
+using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers;
+
+namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
+
+public class MessageSubscriberTests
+{
+ [Fact]
+ public async Task SubscribeStoresTopicFilter()
+ {
+ var connection = new FakeAssuredMqttConnection();
+ var subscriber = new MessageSubscriber(new Mock>().Object, connection);
+
+ await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set");
+
+ connection.Subscriptions.Should().ContainSingle();
+ connection.Subscriptions[0].Topic.Should().Be("homeassistant/domain/sensor/set");
+ }
+
+ [Fact]
+ public async Task ReceivedMessageIsForwardedToMatchingSubscriber()
+ {
+ var connection = new FakeAssuredMqttConnection();
+ var subscriber = new MessageSubscriber(new Mock>().Object, connection);
+ var observable = await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set");
+ var received = observable.FirstAsync().ToTask();
+
+ await connection.ReceiveAsync("homeassistant/domain/sensor/set", "on");
+
+ (await received.WaitAsync(TimeSpan.FromSeconds(1))).Should().Be("on");
+ }
+
+ [Fact]
+ public async Task ReceivedMessageIsIgnoredForUnmatchedTopic()
+ {
+ var connection = new FakeAssuredMqttConnection();
+ var subscriber = new MessageSubscriber(new Mock>().Object, connection);
+ var observable = await subscriber.SubscribeTopicAsync("homeassistant/domain/sensor/set");
+ var received = new List();
+ using var subscription = observable.Subscribe(received.Add);
+
+ await connection.ReceiveAsync("homeassistant/domain/other/set", "on");
+
+ received.Should().BeEmpty();
+ }
+}
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs
index 87734511d..c4e9dda33 100644
--- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttClientOptionsFactoryTests.cs
@@ -1,4 +1,5 @@
-using MQTTnet.Client;
+using MQTTnet;
+using MQTTnet.Formatter;
using NetDaemon.Extensions.MqttEntityManager;
namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
@@ -21,19 +22,20 @@ public void CreatesDefaultConfiguration()
mqttClientOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType();
+ mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311);
+ mqttClientOptions.ChannelOptions.Should().NotBeNull();
+ mqttClientOptions.ChannelOptions.Should().BeOfType();
- var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions;
+ var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions;
var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint;
ipEndpoint.Host.Should().Be("broker");
ipEndpoint.Port.Should().Be(1883);
- mqttClientOptions.ClientOptions.Credentials.Should().BeNull();
+ mqttClientOptions.Credentials.Should().BeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse();
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
+ mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse();
+ mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
}
[Fact]
@@ -49,21 +51,22 @@ public void CreatesDefaultConfigurationWithTls()
mqttClientOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType();
+ mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311);
+ mqttClientOptions.ChannelOptions.Should().NotBeNull();
+ mqttClientOptions.ChannelOptions.Should().BeOfType();
- var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions;
+ var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions;
var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint;
ipEndpoint.Host.Should().Be("broker");
ipEndpoint.Port.Should().Be(1883);
- mqttClientOptions.ClientOptions.Credentials.Should().BeNull();
+ mqttClientOptions.Credentials.Should().BeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue();
+ mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue();
// This would only get set to true if it and UseTls are both true
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
+ mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
}
[Fact]
@@ -80,21 +83,22 @@ public void IgnoresTlsCustomizationIfTlsIsntEnabled()
mqttClientOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType();
+ mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311);
+ mqttClientOptions.ChannelOptions.Should().NotBeNull();
+ mqttClientOptions.ChannelOptions.Should().BeOfType();
- var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions;
+ var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions;
var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint;
ipEndpoint.Host.Should().Be("broker");
ipEndpoint.Port.Should().Be(1883);
- mqttClientOptions.ClientOptions.Credentials.Should().BeNull();
+ mqttClientOptions.Credentials.Should().BeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse();
+ mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeFalse();
// This would only get set to true if it and UseTls are both true
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
+ mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeFalse();
}
[Fact]
@@ -114,23 +118,43 @@ public void CreatesFullyCustomizedConfiguration()
mqttClientOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().NotBeNull();
- mqttClientOptions.ClientOptions.ChannelOptions.Should().BeOfType();
+ mqttClientOptions.ProtocolVersion.Should().Be(MqttProtocolVersion.V311);
+ mqttClientOptions.ChannelOptions.Should().NotBeNull();
+ mqttClientOptions.ChannelOptions.Should().BeOfType();
- var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ClientOptions.ChannelOptions;
+ var mqttClientChannelOptions = (MqttClientTcpOptions)mqttClientOptions.ChannelOptions;
var ipEndpoint = (System.Net.DnsEndPoint)mqttClientChannelOptions.RemoteEndpoint;
ipEndpoint.Host.Should().Be("broker");
ipEndpoint.Port.Should().Be(1234);
- mqttClientOptions.ClientOptions.Credentials.Should().NotBeNull();
- mqttClientOptions.ClientOptions.Credentials.Should().BeOfType();
+ mqttClientOptions.Credentials.Should().NotBeNull();
+ mqttClientOptions.Credentials.Should().BeOfType();
- mqttClientOptions.ClientOptions.Credentials.GetUserName(mqttClientOptions.ClientOptions).Should().Be("testuser");
- mqttClientOptions.ClientOptions.Credentials.GetPassword(mqttClientOptions.ClientOptions).Should().BeEquivalentTo(Encoding.UTF8.GetBytes("testpassword"));
+ mqttClientOptions.Credentials.GetUserName(mqttClientOptions).Should().Be("testuser");
+ mqttClientOptions.Credentials.GetPassword(mqttClientOptions).Should().BeEquivalentTo(Encoding.UTF8.GetBytes("testpassword"));
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue();
- mqttClientOptions.ClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeTrue();
+ mqttClientOptions.ChannelOptions.TlsOptions.UseTls.Should().BeTrue();
+ mqttClientOptions.ChannelOptions.TlsOptions.AllowUntrustedCertificates.Should().BeTrue();
+ }
+
+ [Theory]
+ [InlineData("testuser", null)]
+ [InlineData(null, "testpassword")]
+ [InlineData("", "testpassword")]
+ [InlineData("testuser", "")]
+ public void DoesNotSetCredentialsUnlessUserNameAndPasswordAreProvided(string? userName, string? password)
+ {
+ var mqttConfiguration = new MqttConfiguration
+ {
+ Host = "broker",
+ UserName = userName,
+ Password = password
+ };
+
+ var mqttClientOptions = MqttClientOptionsFactory.CreateClientOptions(mqttConfiguration);
+
+ mqttClientOptions.Credentials.Should().BeNull();
}
[Fact]
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs
index b3de546a3..9e99fbbd9 100644
--- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/MqttEntityManagerTester.cs
@@ -1,4 +1,5 @@
-using NetDaemon.Extensions.MqttEntityManager;
+using MQTTnet;
+using NetDaemon.Extensions.MqttEntityManager;
using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers;
namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
@@ -22,7 +23,7 @@ public async Task CreateWithNoOptionsSetsBaseConfig()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor");
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?.Count.Should().Be(6);
payload?["name"].ToString().Should().Be("sensor");
@@ -40,7 +41,7 @@ public async Task CreateWithDefaultOptionsSetsBaseConfig()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions());
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?.Count.Should().Be(6);
payload?["name"].ToString().Should().Be("sensor");
@@ -58,7 +59,7 @@ public async Task CreateCanSetUniqueId()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(UniqueId: "my_id"));
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["unique_id"].ToString().Should().Be("my_id");
}
@@ -70,7 +71,7 @@ public async Task CreateSetsDefaultEntityId()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.the_id");
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["default_entity_id"].ToString().Should().Be("domain.the_id");
}
@@ -82,7 +83,7 @@ public async Task CreateCanSetDeviceClass()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(DeviceClass: "classy"));
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["device_class"].ToString().Should().Be("classy");
}
@@ -94,7 +95,7 @@ public async Task CreateCanSetName()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(Name: "george"));
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["name"].ToString().Should().Be("george");
}
@@ -130,7 +131,7 @@ public async Task CreateCanSetAdditionalOptions()
var otherOptions = new { sub_class = "lights", up_state = "live" };
await entityManager.CreateAsync("domain.sensor", additionalConfig: otherOptions);
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["sub_class"].ToString().Should().Be("lights");
payload?["up_state"].ToString().Should().Be("live");
@@ -145,7 +146,7 @@ public async Task CreateCanOverrideBaseConfig()
var otherOptions = new { command_topic = "my/topic" };
await entityManager.CreateAsync("domain.sensor", additionalConfig: otherOptions);
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?["command_topic"].ToString().Should().Be("my/topic");
}
@@ -157,7 +158,7 @@ public async Task CreateAvailabilityTopicOffByDefault()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor");
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?.ContainsKey("availability_topic").Should().BeFalse();
}
@@ -169,7 +170,7 @@ public async Task CreateAvailabilityTopicSetForAvailUp()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(PayloadAvailable: "up"));
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?.ContainsKey("availability_topic").Should().BeTrue();
payload?["availability_topic"].ToString().Should().Be("homeassistant/domain/sensor/availability");
@@ -183,7 +184,7 @@ public async Task CreateAvailabilityTopicSetForAvailDown()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.CreateAsync("domain.sensor", new EntityCreationOptions(PayloadNotAvailable: "down"));
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
payload?.ContainsKey("availability_topic").Should().BeTrue();
payload?["availability_topic"].ToString().Should().Be("homeassistant/domain/sensor/availability");
@@ -198,7 +199,7 @@ public async Task CanRemove()
await entityManager.RemoveAsync("domain.sensor");
- mqttSetup.LastPublishedMessage.PayloadSegment.Should().BeEmpty();
+ mqttSetup.LastPublishedMessage.ConvertPayloadToString().Should().BeNullOrEmpty();
}
[Fact]
@@ -208,7 +209,7 @@ public async Task CanSetState()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.SetStateAsync("domain.sensor", "NewState");
- var payload = Encoding.Default.GetString(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = mqttSetup.LastPublishedMessage.ConvertPayloadToString();
mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/state");
payload.Should().Be("NewState");
@@ -223,7 +224,7 @@ public async Task CanSetStateToBlank()
await entityManager.SetStateAsync("domain.sensor", "");
mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/state");
- mqttSetup.LastPublishedMessage.PayloadSegment.Should().BeEmpty();
+ mqttSetup.LastPublishedMessage.ConvertPayloadToString().Should().BeNullOrEmpty();
}
[Fact]
@@ -234,7 +235,7 @@ public async Task CanSetAttributes()
var attributes = new { colour = "purple", ziggy = "stardust" };
await entityManager.SetAttributesAsync("domain.sensor", attributes);
- var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = PayloadToDictionary(mqttSetup.LastPublishedMessage.ConvertPayloadToString());
mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/attributes");
payload?["colour"].ToString().Should().Be("purple");
@@ -248,18 +249,16 @@ public async Task CanSetAvailability()
var entityManager = new MqttEntityManager(mqttSetup.MessageSender, null!, GetOptions());
await entityManager.SetAvailabilityAsync("domain.sensor", "up");
- var payload = Encoding.Default.GetString(mqttSetup.LastPublishedMessage.PayloadSegment.Array ?? []);
+ var payload = mqttSetup.LastPublishedMessage.ConvertPayloadToString();
mqttSetup.LastPublishedMessage.Topic.Should().Be("homeassistant/domain/sensor/availability");
payload.Should().Be("up");
}
- private static Dictionary? PayloadToDictionary(byte[] payload)
+ private static Dictionary? PayloadToDictionary(string payload)
{
- return JsonSerializer.Deserialize>(
- Encoding.Default.GetString(payload)
- );
+ return JsonSerializer.Deserialize>(payload);
}
private static IOptions GetOptions()
diff --git a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs
index 22fb686bf..eac30f1fc 100644
--- a/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs
+++ b/src/Client/NetDaemon.HassClient.Tests/ExtensionsTest/MqttEntityManagerTests/TestHelpers/MockMqttMessageSenderSetup.cs
@@ -1,78 +1,71 @@
-using MQTTnet;
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
+using MQTTnet.Packets;
using NetDaemon.Extensions.MqttEntityManager;
-using NetDaemon.Extensions.MqttEntityManager.Helpers;
namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers;
///
-/// These are helpers that set up mock mqtt factories and connection, and allow a message to be sent through
-/// the mock client and capture the message that would have been published
+/// Test helper that captures MQTT messages sent by the entity manager.
///
internal sealed class MockMqttMessageSenderSetup
{
- public AssuredMqttConnection Connection { get; private set; } = null!;
- public Mock MqttClient { get; private set; } = null!;
- public Mock MqttClientOptionsFactory { get; private set; } = null!;
-
- public MqttFactoryWrapper MqttFactory { get; private set; } = null!;
+ public FakeAssuredMqttConnection Connection { get; } = new();
public MessageSender MessageSender { get; private set; } = null!;
- public MqttApplicationMessage LastPublishedMessage { get; set; } = null!;
+ public MqttApplicationMessage LastPublishedMessage => Connection.LastPublishedMessage;
public MockMqttMessageSenderSetup()
{
- SetupMockMqtt();
SetupMessageSender();
- SetupMessageReceiver();
}
- // ReSharper disable once MemberCanBePrivate.Global
- public void SetupMessageReceiver()
+ private void SetupMessageSender()
{
- // Ensure that when the MQTT Client is called its published message is saved
- MqttClient.Setup(m => m.EnqueueAsync(It.IsAny()))
- .Callback(message =>
- {
- LastPublishedMessage = message;
- });
+ var logger = new Mock>().Object;
+ MessageSender = new MessageSender(logger, Connection);
}
+}
- ///
- /// Get a mocked MQTT client, factor and connection
- ///
- ///
- private void SetupMockMqtt()
- {
- var mqttConfiguration = new MqttConfiguration
- {
- Host = "localhost",
- UserName = "id"
- };
+internal sealed class FakeAssuredMqttConnection : IAssuredMqttConnection
+{
+ private readonly List _subscriptions = [];
- var options = new Mock>();
+ public event Func? ApplicationMessageReceivedAsync;
- options.Setup(o => o.Value)
- .Returns(() => mqttConfiguration);
+ public IReadOnlyList Subscriptions => _subscriptions;
- MqttClient = new Mock();
- MqttClientOptionsFactory = new Mock();
- MqttFactory = new MqttFactoryWrapper(MqttClient.Object);
+ public MqttApplicationMessage LastPublishedMessage { get; private set; } = null!;
- MqttClientOptionsFactory
- .Setup(o => o.CreateClientOptions(mqttConfiguration))
- .Returns(new ManagedMqttClientOptions());
+ public Task PublishAsync(MqttApplicationMessage message)
+ {
+ LastPublishedMessage = message;
+ return Task.CompletedTask;
+ }
- Connection = new AssuredMqttConnection(
- new Mock>().Object,
- MqttClientOptionsFactory.Object,
- MqttFactory,
- options.Object);
+ public Task SubscribeAsync(MqttTopicFilter topicFilter)
+ {
+ _subscriptions.Add(topicFilter);
+ return Task.CompletedTask;
}
- private void SetupMessageSender()
+ public Task ReceiveAsync(string topic, string payload)
{
- var logger = new Mock>().Object;
- MessageSender = new MessageSender(logger, Connection);
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic(topic)
+ .WithPayload(Encoding.UTF8.GetBytes(payload))
+ .Build();
+
+ var publishPacket = new MqttPublishPacket
+ {
+ Topic = topic,
+ PayloadSegment = new ArraySegment(Encoding.UTF8.GetBytes(payload))
+ };
+
+ var eventArgs = new MqttApplicationMessageReceivedEventArgs(
+ "test",
+ message,
+ publishPacket,
+ (_, _) => Task.CompletedTask);
+ return ApplicationMessageReceivedAsync?.Invoke(eventArgs) ?? Task.CompletedTask;
}
}
diff --git a/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj b/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj
index 0d4a98e16..54e4d2074 100644
--- a/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj
+++ b/src/Client/NetDaemon.HassClient.Tests/NetDaemon.HassClient.Tests.csproj
@@ -1,7 +1,7 @@
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
@@ -9,18 +9,18 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj
index ee209ab93..8d4ed78e5 100644
--- a/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj
+++ b/src/Client/NetDaemon.HassClient/NetDaemon.Client.csproj
@@ -38,11 +38,11 @@
-
-
-
-
-
+
+
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj b/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj
index ce78d5c17..0cb16d7ac 100644
--- a/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj
+++ b/src/Extensions/NetDaemon.Extensions.Logging/NetDaemon.Extensions.Logging.csproj
@@ -30,9 +30,9 @@
-
-
-
+
+
+
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs
index 882ea9a68..90a9af9c2 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/AssuredMqttConnection.cs
@@ -1,24 +1,37 @@
-using System.Globalization;
-using System.Text;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using MQTTnet.Client;
-using MQTTnet.Extensions.ManagedClient;
-using NetDaemon.Extensions.MqttEntityManager.Exceptions;
-using NetDaemon.Extensions.MqttEntityManager.Helpers;
+using MQTTnet;
+using MQTTnet.Packets;
namespace NetDaemon.Extensions.MqttEntityManager;
///
-/// Wrapper to assure an MQTT connection
+/// Wrapper to assure an MQTT connection.
///
internal class AssuredMqttConnection : IAssuredMqttConnection, IDisposable
{
+ private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);
+ private static readonly TimeSpan ConnectedPollDelay = TimeSpan.FromSeconds(1);
+
private readonly ILogger _logger;
- private readonly IMqttClientOptionsFactory _mqttClientOptionsFactory;
+ private readonly IMqttClient _mqttClient;
+ private readonly MqttClientOptions _clientOptions;
+ private readonly Channel _publishQueue = Channel.CreateUnbounded(
+ new UnboundedChannelOptions
+ {
+ SingleReader = true,
+ SingleWriter = false
+ });
+ private readonly ConcurrentDictionary _subscriptions = new();
+ private readonly CancellationTokenSource _stopping = new();
+ private readonly SemaphoreSlim _connectionSignal = new(0, 1);
private readonly Task _connectionTask;
- private IManagedMqttClient? _mqttClient;
+ private readonly Task _publishTask;
private bool _disposed;
+ private volatile bool _hasConnected;
///
/// Initializes a new instance of the class.
@@ -30,61 +43,186 @@ internal class AssuredMqttConnection : IAssuredMqttConnection, IDisposable
public AssuredMqttConnection(
ILogger logger,
IMqttClientOptionsFactory mqttClientOptionsFactory,
- IMqttFactoryWrapper mqttFactory,
+ IMqttFactory mqttFactory,
IOptions mqttConfig)
{
_logger = logger;
- _mqttClientOptionsFactory = mqttClientOptionsFactory;
+
_logger.LogTrace("MQTT initiating connection");
- _connectionTask = Task.Run(() => ConnectAsync(mqttConfig.Value, mqttFactory));
+ _clientOptions = mqttClientOptionsFactory.CreateClientOptions(mqttConfig.Value);
+ _mqttClient = mqttFactory.CreateMqttClient();
+
+ _mqttClient.ConnectedAsync += MqttClientOnConnectedAsync;
+ _mqttClient.DisconnectedAsync += MqttClientOnDisconnectedAsync;
+ _mqttClient.ApplicationMessageReceivedAsync += MqttClientOnApplicationMessageReceivedAsync;
+
+ _connectionTask = Task.Run(() => MaintainConnectionAsync(_stopping.Token));
+ _publishTask = Task.Run(() => PublishQueuedMessagesAsync(_stopping.Token));
}
- ///
- /// Ensures that the MQTT client is available
- ///
- ///
- /// Timed out while waiting for connection
- public async Task GetClientAsync()
+ ///
+ public event Func? ApplicationMessageReceivedAsync;
+
+ ///
+ public async Task PublishAsync(MqttApplicationMessage message)
{
- await _connectionTask;
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ await _publishQueue.Writer.WriteAsync(message, _stopping.Token).ConfigureAwait(false);
+ }
- return _mqttClient ?? throw new MqttConnectionException("Unable to create MQTT connection");
+ ///
+ public async Task SubscribeAsync(MqttTopicFilter topicFilter)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ if (string.IsNullOrEmpty(topicFilter.Topic))
+ {
+ throw new ArgumentException("MQTT topic filter must specify a topic.", nameof(topicFilter));
+ }
+
+ _subscriptions[topicFilter.Topic] = topicFilter;
+
+ if (_mqttClient.IsConnected)
+ {
+ await SubscribeOnClientAsync(topicFilter, _stopping.Token).ConfigureAwait(false);
+ }
}
- private async Task ConnectAsync(MqttConfiguration mqttConfig, IMqttFactoryWrapper mqttFactory)
+ private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{
- _logger.LogTrace("Connecting to MQTT broker at {Host}:{Port}/{UserName}",
- mqttConfig.Host, mqttConfig.Port, mqttConfig.UserName);
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ if (_mqttClient.IsConnected)
+ {
+ await Task.Delay(ConnectedPollDelay, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
- var clientOptions = _mqttClientOptionsFactory.CreateClientOptions(mqttConfig);
+ _logger.LogTrace("Connecting to MQTT broker at {Host}:{Port}/{UserName}",
+ GetConfiguredHost(), GetConfiguredPort(), _clientOptions.Credentials?.GetUserName(_clientOptions));
- _mqttClient = mqttFactory.CreateManagedMqttClient();
+ var connectResult = await _mqttClient.ConnectAsync(_clientOptions, cancellationToken).ConfigureAwait(false);
- _mqttClient.ConnectedAsync += MqttClientOnConnectedAsync;
- _mqttClient.DisconnectedAsync += MqttClientOnDisconnectedAsync;
+ if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
+ {
+ _logger.LogTrace("MQTT connection rejected: {ResultCode} {ReasonString}",
+ connectResult.ResultCode, connectResult.ReasonString);
+ await Task.Delay(ReconnectDelay, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+
+ _logger.LogTrace("MQTT client is ready");
+ _hasConnected = true;
+ SignalConnection();
+ await ResubscribeAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogTrace(ex, "MQTT connection attempt failed");
+ await DelayReconnectAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
+ {
+ await foreach (var message in _publishQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ if (!_mqttClient.IsConnected)
+ {
+ await WaitForConnectionAsync(cancellationToken).ConfigureAwait(false);
+ continue;
+ }
- await _mqttClient.StartAsync(clientOptions);
+ await _mqttClient.PublishAsync(message, cancellationToken).ConfigureAwait(false);
+ break;
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogDebug(ex, "Failed to publish MQTT message. The message will be retried after reconnect.");
+ await WaitForConnectionAsync(cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+ }
- _logger.LogTrace("MQTT client is ready");
+ private async Task ResubscribeAsync(CancellationToken cancellationToken)
+ {
+ foreach (var topicFilter in _subscriptions.Values)
+ {
+ await SubscribeOnClientAsync(topicFilter, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ private async Task SubscribeOnClientAsync(MqttTopicFilter topicFilter, CancellationToken cancellationToken)
+ {
+ var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
+ .WithTopicFilter(topicFilter)
+ .Build();
+
+ await _mqttClient.SubscribeAsync(subscribeOptions, cancellationToken).ConfigureAwait(false);
}
private Task MqttClientOnDisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
- _logger.LogDebug("MQTT disconnected: {Reason}", BuildErrorResponse(arg));
+ if (_disposed || _stopping.IsCancellationRequested)
+ {
+ return Task.CompletedTask;
+ }
+
+ if (_hasConnected)
+ {
+ _logger.LogDebug("MQTT disconnected: {Reason}", BuildErrorResponse(arg));
+ }
+ else
+ {
+ _logger.LogTrace("MQTT disconnected before the initial connection completed: {Reason}", BuildErrorResponse(arg));
+ }
+
return Task.CompletedTask;
}
private Task MqttClientOnConnectedAsync(MqttClientConnectedEventArgs arg)
{
+ _hasConnected = true;
_logger.LogDebug("MQTT connected: {ResultCode}", arg.ConnectResult.ResultCode);
+ SignalConnection();
return Task.CompletedTask;
}
+ private async Task MqttClientOnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
+ {
+ var handlers = ApplicationMessageReceivedAsync;
+ if (handlers is null)
+ {
+ return;
+ }
+
+ foreach (Func handler in handlers.GetInvocationList())
+ {
+ await handler(arg).ConfigureAwait(false);
+ }
+ }
+
private static string BuildErrorResponse(MqttClientDisconnectedEventArgs arg)
{
- var sb = new StringBuilder();
+ var sb = new System.Text.StringBuilder();
- sb.AppendLine(CultureInfo.InvariantCulture, $"{arg.Exception?.Message} ({arg.Reason})"); // Note: arg.ReasonString is always null
+ sb.AppendLine(CultureInfo.InvariantCulture, $"{arg.Exception?.Message} ({arg.Reason})");
var ex = arg.Exception?.InnerException;
while (ex != null)
{
@@ -95,14 +233,82 @@ private static string BuildErrorResponse(MqttClientDisconnectedEventArgs arg)
return sb.ToString();
}
+ private void SignalConnection()
+ {
+ if (_connectionSignal.CurrentCount == 0)
+ {
+ _connectionSignal.Release();
+ }
+ }
+
+ private async Task WaitForConnectionAsync(CancellationToken cancellationToken)
+ {
+ if (_mqttClient.IsConnected)
+ {
+ return;
+ }
+
+ await _connectionSignal.WaitAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ private static async Task DelayReconnectAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ await Task.Delay(ReconnectDelay, cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ }
+ }
+
+ private string? GetConfiguredHost()
+ {
+ return (_clientOptions.ChannelOptions as MqttClientTcpOptions)?.RemoteEndpoint switch
+ {
+ System.Net.DnsEndPoint dnsEndPoint => dnsEndPoint.Host,
+ System.Net.IPEndPoint ipEndPoint => ipEndPoint.Address.ToString(),
+ _ => null
+ };
+ }
+
+ private int? GetConfiguredPort()
+ {
+ return (_clientOptions.ChannelOptions as MqttClientTcpOptions)?.RemoteEndpoint switch
+ {
+ System.Net.DnsEndPoint dnsEndPoint => dnsEndPoint.Port,
+ System.Net.IPEndPoint ipEndPoint => ipEndPoint.Port,
+ _ => null
+ };
+ }
+
+ ///
public void Dispose()
{
if (_disposed)
+ {
return;
+ }
_disposed = true;
_logger.LogTrace("MQTT disconnecting");
- _connectionTask?.Dispose();
- _mqttClient?.Dispose();
+ _stopping.Cancel();
+ _publishQueue.Writer.TryComplete();
+
+ try
+ {
+ Task.WaitAll([_connectionTask, _publishTask], TimeSpan.FromSeconds(1));
+ }
+ catch (AggregateException)
+ {
+ }
+
+ _stopping.Dispose();
+ _connectionSignal.Dispose();
+
+ if (_mqttClient is IDisposable disposable)
+ {
+ disposable.Dispose();
+ }
}
}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs
index 0e7f95b11..2addf8dfe 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/DependencyInjectionSetup.cs
@@ -2,8 +2,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
-using NetDaemon.Extensions.MqttEntityManager.Helpers;
-
#endregion
namespace NetDaemon.Extensions.MqttEntityManager;
@@ -16,8 +14,8 @@ public static class DependencyInjectionSetup
///
/// Add support for managing entities via MQTT
///
- ///
- ///
+ /// The host builder.
+ /// The configured host builder.
public static IHostBuilder UseNetDaemonMqttEntityManagement(this IHostBuilder hostBuilder)
{
return hostBuilder.ConfigureServices((_, services) =>
@@ -29,11 +27,12 @@ public static IHostBuilder UseNetDaemonMqttEntityManagement(this IHostBuilder ho
///
/// Add support for managing entities via MQTT
///
+ /// The service collection.
+ /// The configured service collection.
public static IServiceCollection AddNetDaemonMqttEntityManagement(this IServiceCollection services)
{
services.AddSingleton();
services.AddSingleton();
- services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs
index c8a5a3f64..68cd19de3 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttConnectionException.cs
@@ -8,15 +8,15 @@ public class MqttConnectionException : Exception
///
/// MQTT connection failed
///
- ///
+ /// The exception message.
public MqttConnectionException(string msg) : base(msg)
{}
///
/// MQTT connection failed
///
- ///
- ///
+ /// The exception message.
+ /// The inner exception.
public MqttConnectionException(string msg, Exception innerException) : base(msg, innerException)
{}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs
index b5c9e62c3..b1a2cef4f 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Exceptions/MqttPublishException.cs
@@ -8,15 +8,15 @@ public class MqttPublishException : Exception
///
/// Failed to publish a message to MQTT
///
- ///
+ /// The exception message.
public MqttPublishException(string msg) : base(msg)
{}
///
/// Failed to publish a message to MQTT
///
- ///
- ///
+ /// The exception message.
+ /// The inner exception.
public MqttPublishException(string msg, Exception innerException) : base(msg, innerException)
{}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs
index cf9a79a76..78c6b126c 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/ByteArrayHelper.cs
@@ -10,8 +10,8 @@ internal static class ByteArrayHelper
///
/// Convert a byte array to a string, or to an empty string if the array is not valid UTF8
///
- ///
- ///
+ /// The byte array to convert.
+ /// The UTF-8 string, or an empty string when the array is null, empty, or invalid.
public static string SafeToString(byte[]? array)
{
try
@@ -26,4 +26,4 @@ public static string SafeToString(byte[]? array)
return "";
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs
index 167587430..167d0ab42 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityCreationPayloadHelper.cs
@@ -12,10 +12,10 @@ internal class EntityCreationPayloadHelper
///
/// Merge an optional dynamic set of parameters with the concrete payload
///
- ///
- ///
- ///
- ///
+ /// The base MQTT discovery payload.
+ /// Optional additional MQTT discovery configuration.
+ /// The merged MQTT discovery payload JSON.
+ /// Thrown when the base payload cannot be converted to JSON.
internal static string Merge(EntityCreationPayload concreteOptions, object? additionalOptions)
{
var concreteJson = JsonSerializer.SerializeToNode(concreteOptions)?.AsObject()
@@ -29,4 +29,4 @@ internal static string Merge(EntityCreationPayload concreteOptions, object? addi
return concreteJson.ToJsonString();
}
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs
index cec33c174..800791e29 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/EntityIdParser.cs
@@ -9,7 +9,7 @@ internal static class EntityIdParser
/// Extract the domain and identifier from an entity ID string
///
/// Entity ID in the format "domain.identifier"
- ///
+ /// The parsed domain and identifier.
/// If entityId is not supplied or is an invalid
/// format
public static (string domain, string identifier) Extract(string entityId)
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs
deleted file mode 100644
index edc2753ed..000000000
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/IMqttFactoryWrapper.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-using MQTTnet.Extensions.ManagedClient;
-
-namespace NetDaemon.Extensions.MqttEntityManager.Helpers;
-
-///
-/// Testable wrapper around IMqttFactory
-///
-internal interface IMqttFactoryWrapper
-{
- ///
- /// Return a managed MQTT client, either from the original factory or a pre-supplied one
- ///
- ///
- IManagedMqttClient CreateManagedMqttClient();
-}
\ No newline at end of file
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs
index 7b3ad1464..401e63c89 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/JsonNodeExtensions.cs
@@ -12,8 +12,8 @@ internal static class JsonNodeExtensions
/// For a given JsonObject, merge a second set of values, replacing any pre-existing properties in
/// the target object
///
- ///
- ///
+ /// The target JSON object.
+ /// The JSON object to merge into the target.
public static void AddRange(this JsonObject target, JsonObject? toMerge)
{
if (toMerge == null)
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs
deleted file mode 100644
index 9adf6dad2..000000000
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Helpers/MqttFactoryWrapper.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-using MQTTnet.Extensions.ManagedClient;
-
-namespace NetDaemon.Extensions.MqttEntityManager.Helpers;
-
-///
-/// Testable wrapper around IMqttFactory
-///
-internal class MqttFactoryWrapper : IMqttFactoryWrapper
-{
- private readonly IMqttFactory? _mqttFactory;
- private readonly IManagedMqttClient? _client;
-
- ///
- /// Standard functionality - set the IMqttFactory that will return a client
- ///
- ///
- public MqttFactoryWrapper(IMqttFactory mqttFactory)
- {
- _mqttFactory = mqttFactory;
- }
-
- ///
- /// Testing functionality - specify a client that will be returned
- ///
- ///
- public MqttFactoryWrapper(IManagedMqttClient client)
- {
- _client = client;
- }
-
- ///
- /// Return a managed MQTT client, either from the original factory or a pre-supplied one
- ///
- ///
- public IManagedMqttClient CreateManagedMqttClient()
- {
- return _client ?? _mqttFactory?.CreateManagedMqttClient()
- ?? throw new InvalidOperationException("No client or MqttFactory specified");
- }
-}
\ No newline at end of file
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs
index f68b797c6..b8c5cbf61 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IAssuredMqttConnection.cs
@@ -1,14 +1,25 @@
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
+using MQTTnet.Packets;
namespace NetDaemon.Extensions.MqttEntityManager;
///
-/// Wrapper to assure an MQTT connection
+/// Wrapper to assure an MQTT connection.
///
internal interface IAssuredMqttConnection
{
///
- /// Ensures that the MQTT client is available
+ /// Raised when an MQTT application message is received.
///
- Task GetClientAsync();
-}
\ No newline at end of file
+ event Func? ApplicationMessageReceivedAsync;
+
+ ///
+ /// Queue a message to publish to MQTT.
+ ///
+ Task PublishAsync(MqttApplicationMessage message);
+
+ ///
+ /// Subscribe to a topic, retaining the subscription across reconnects.
+ ///
+ Task SubscribeAsync(MqttTopicFilter topicFilter);
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs
index b54eb7c1c..22ce79cb9 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSender.cs
@@ -10,10 +10,10 @@ internal interface IMessageSender
///
/// Send a message for the given payload to the MQTT topic
///
- ///
- ///
- ///
- ///
- ///
+ /// The MQTT topic.
+ /// The message payload.
+ /// Whether the message should be retained.
+ /// The MQTT quality of service level.
+ /// A task that represents the asynchronous send operation.
Task SendMessageAsync(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos);
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs
index 7397ea0b9..f1d728e33 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMessageSubscriber.cs
@@ -5,6 +5,7 @@ internal interface IMessageSubscriber
///
/// Receive a message from the given topic
///
- ///
+ /// The MQTT topic.
+ /// An observable that receives payloads for the topic.
Task> SubscribeTopicAsync(string topic);
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs
index 180d5e113..9fab990a2 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttClientOptionsFactory.cs
@@ -1,4 +1,4 @@
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
namespace NetDaemon.Extensions.MqttEntityManager;
@@ -9,8 +9,8 @@ public interface IMqttClientOptionsFactory
{
///
/// Creates the client options for MQTT connection from the supplied configuration.
- /// ///
+ ///
/// The MQTT configuration.
- /// The managed MQTT client options.
- ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig);
+ /// The MQTT client options.
+ MqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig);
}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs
index 93db6f010..36d2195ce 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttEntityManager.cs
@@ -15,19 +15,25 @@ public interface IMqttEntityManager
///
/// Create an entity in Home Assistant via MQTT
///
+ /// Distinct identifier, in the format "domain.id", such as "sensor.kitchen_temp".
+ /// Optional entity creation options.
+ /// Optional additional Home Assistant MQTT discovery configuration.
+ /// A task that represents the asynchronous create operation.
Task CreateAsync(string entityId, EntityCreationOptions? options = null, object? additionalConfig = null);
///
/// Remove an entity from Home Assistant
///
+ /// The entity id.
+ /// A task that represents the asynchronous remove operation.
Task RemoveAsync(string entityId);
///
/// Set attributes on an entity
///
- ///
- ///
- ///
+ /// The entity id.
+ /// The attributes to set.
+ /// A task that represents the asynchronous attribute update operation.
Task SetAttributesAsync(string entityId, object attributes);
///
@@ -35,24 +41,24 @@ public interface IMqttEntityManager
/// on creating the entity then the value should match one of these.
/// If not, then use "online" and "offline"
///
- ///
- ///
- ///
+ /// The entity id.
+ /// The availability payload.
+ /// A task that represents the asynchronous availability update operation.
Task SetAvailabilityAsync(string entityId, string availability);
///
/// Set the state of an entity
///
- ///
- ///
- ///
+ /// The entity id.
+ /// The state payload.
+ /// A task that represents the asynchronous state update operation.
Task SetStateAsync(string entityId, string state);
///
/// Prepare a subscription to command topics for the given entity
/// Be sure to chain this request with .Subscribe(...)
///
- ///
- ///
+ /// The entity id.
+ /// An observable that receives command payloads for the entity.
Task> PrepareCommandSubscriptionAsync(string entityId);
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs
index 55104237b..99f1c0e03 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/IMqttFactory.cs
@@ -1,16 +1,15 @@
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
namespace NetDaemon.Extensions.MqttEntityManager;
///
-/// MqttNet removed the IMqttFactory interface at v4 which breaks our DI
-/// So this is a factory for the MqttFactory to satisfy our use case
+/// Factory abstraction for MQTT clients.
///
internal interface IMqttFactory
{
///
- /// Create a Managed Mqtt Client
+ /// Create an MQTT client.
///
- ///
- IManagedMqttClient CreateManagedMqttClient();
-}
\ No newline at end of file
+ /// The MQTT client.
+ IMqttClient CreateMqttClient();
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs
index 3c5a1a676..c6853afb9 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSender.cs
@@ -2,9 +2,9 @@
using Microsoft.Extensions.Logging;
using MQTTnet;
-using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using NetDaemon.Extensions.MqttEntityManager.Exceptions;
+using System.Text;
#endregion
@@ -21,33 +21,24 @@ internal class MessageSender : IMessageSender
///
/// Manage connections and message publishing to MQTT
///
- ///
- ///
+ /// The logger.
+ /// The assured MQTT connection.
public MessageSender(ILogger logger, IAssuredMqttConnection assuredMqttConnection)
{
_logger = logger;
_assuredMqttConnection = assuredMqttConnection;
}
- ///
- /// Publish a message to the given topic
- ///
- ///
- /// Json structure of payload
- ///
- ///
+ ///
public async Task SendMessageAsync(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos)
{
- var mqttClient = await _assuredMqttConnection.GetClientAsync();
-
- await PublishMessage(mqttClient, topic, payload, retain, qos);
+ await PublishMessage(topic, payload, retain, qos);
}
- private async Task PublishMessage(IManagedMqttClient mqttClient, string topic, string payload, bool retain,
- MqttQualityOfServiceLevel qos)
+ private async Task PublishMessage(string topic, string payload, bool retain, MqttQualityOfServiceLevel qos)
{
var message = new MqttApplicationMessageBuilder().WithTopic(topic)
- .WithPayload(payload)
+ .WithPayload(Encoding.UTF8.GetBytes(payload))
.WithRetainFlag(retain)
.WithQualityOfServiceLevel(qos)
.Build();
@@ -56,7 +47,7 @@ private async Task PublishMessage(IManagedMqttClient mqttClient, string topic, s
try
{
- await mqttClient.EnqueueAsync(message).ConfigureAwait(false);
+ await _assuredMqttConnection.PublishAsync(message).ConfigureAwait(false);
}
catch (Exception e)
{
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs
index dc5d16386..06b01d736 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs
@@ -1,14 +1,10 @@
#region
using System.Collections.Concurrent;
-using System.Collections.ObjectModel;
using System.Reactive.Subjects;
using Microsoft.Extensions.Logging;
using MQTTnet;
-using MQTTnet.Client;
-using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Packets;
-using NetDaemon.Extensions.MqttEntityManager.Helpers;
#endregion
@@ -30,31 +26,24 @@ internal class MessageSubscriber : IMessageSubscriber, IDisposable
///
/// Managed subscriptions to topics within MQTT
///
- ///
- ///
+ /// The logger.
+ /// The assured MQTT connection.
public MessageSubscriber(ILogger logger, IAssuredMqttConnection assuredMqttConnection)
{
_logger = logger;
_assuredMqttConnection = assuredMqttConnection;
}
- ///
- /// Subscribe to the given topic
- ///
- ///
+ ///
public async Task> SubscribeTopicAsync(string topic)
{
try
{
- var mqttClient = await _assuredMqttConnection.GetClientAsync();
- await EnsureSubscriptionAsync(mqttClient);
+ await EnsureSubscriptionAsync();
- var topicFilters = new Collection
- {
- new MqttTopicFilterBuilder().WithTopic(topic).Build()
- };
+ var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
- await mqttClient.SubscribeAsync(topicFilters);
+ await _assuredMqttConnection.SubscribeAsync(topicFilter);
return _subscribers.GetOrAdd(topic, new Lazy>()).Value;
}
catch (Exception e)
@@ -67,8 +56,7 @@ public async Task> SubscribeTopicAsync(string topic)
///
/// If we are not already subscribed to receive messages, set up the handler
///
- ///
- private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient)
+ private async Task EnsureSubscriptionAsync()
{
await _subscriptionSetupLock.WaitAsync();
try
@@ -76,7 +64,7 @@ private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient)
if (!_subscriptionIsSetup)
{
_logger.LogInformation("Configuring message subscription");
- mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
+ _assuredMqttConnection.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
_subscriptionIsSetup = true;
}
}
@@ -94,13 +82,13 @@ private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient)
///
/// Message received from MQTT, so find the subscription (if any) and notify them
///
- ///
- ///
+ /// The received MQTT application message.
+ /// A completed task.
private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs msg)
{
try
{
- var payload = ByteArrayHelper.SafeToString(msg.ApplicationMessage.PayloadSegment.Array ?? []);
+ var payload = msg.ApplicationMessage.ConvertPayloadToString();
var topic = msg.ApplicationMessage.Topic;
_logger.LogTrace("Subscription received {Payload} from {Topic}", payload, topic);
@@ -119,6 +107,7 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs msg)
return Task.CompletedTask;
}
+ ///
public void Dispose()
{
if (_isDisposed) return;
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs
index 4c7ab2e3c..ee6761cba 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/Models/EntityCreationPayload.cs
@@ -7,47 +7,83 @@ namespace NetDaemon.Extensions.MqttEntityManager.Models;
///
internal class EntityCreationPayload
{
+ ///
+ /// Gets or sets the display name of the entity.
+ ///
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Name { get; set; }
+ ///
+ /// Gets or sets the Home Assistant device class.
+ ///
[JsonPropertyName("device_class")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? DeviceClass { get; set; }
+ ///
+ /// Gets or sets the unique identifier for the entity.
+ ///
[JsonPropertyName("unique_id")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? UniqueId { get; set; }
+ ///
+ /// Gets or sets the default entity id.
+ ///
[JsonPropertyName("default_entity_id")]
public string? DefaultEntityId { get; set; }
+ ///
+ /// Gets or sets the MQTT command topic.
+ ///
[JsonPropertyName("command_topic")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? CommandTopic { get; set; }
+ ///
+ /// Gets or sets the MQTT state topic.
+ ///
[JsonPropertyName("state_topic")]
public string? StateTopic { get; set; }
+ ///
+ /// Gets or sets the MQTT JSON attributes topic.
+ ///
[JsonPropertyName("json_attributes_topic")]
public string? JsonAttributesTopic { get; set; }
+ ///
+ /// Gets or sets the MQTT availability topic.
+ ///
[JsonPropertyName("availability_topic")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? AvailabilityTopic { get; set; }
+ ///
+ /// Gets or sets the payload that marks the entity as available.
+ ///
[JsonPropertyName("payload_available")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? PayloadAvailable { get; set; }
+ ///
+ /// Gets or sets the payload that marks the entity as not available.
+ ///
[JsonPropertyName("payload_not_available")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? PayloadNotAvailable { get; set; }
+ ///
+ /// Gets or sets the payload that represents the on state.
+ ///
[JsonPropertyName("payload_on")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? PayloadOn { get; set; }
+ ///
+ /// Gets or sets the payload that represents the off state.
+ ///
[JsonPropertyName("payload_off")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? PayloadOff { get; set; }
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs
index 6589ceaa8..b0f647987 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttClientOptionsFactory.cs
@@ -1,4 +1,5 @@
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
+using MQTTnet.Formatter;
namespace NetDaemon.Extensions.MqttEntityManager;
@@ -6,7 +7,7 @@ namespace NetDaemon.Extensions.MqttEntityManager;
public class MqttClientOptionsFactory : IMqttClientOptionsFactory
{
///
- public ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig)
+ public MqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig)
{
ArgumentNullException.ThrowIfNull(mqttConfig);
@@ -15,29 +16,25 @@ public ManagedMqttClientOptions CreateClientOptions(MqttConfiguration mqttConfig
throw new ArgumentException("Explicit MQTT host configuration was not provided and no suitable broker addon was discovered", nameof(mqttConfig));
}
- var clientOptions = new ManagedMqttClientOptionsBuilder()
- .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
- .WithClientOptions(clientOptionsBuilder =>
- {
- clientOptionsBuilder.WithTcpServer(mqttConfig.Host, mqttConfig.Port);
+ var clientOptionsBuilder = new MqttClientOptionsBuilder()
+ .WithProtocolVersion(MqttProtocolVersion.V311)
+ .WithTcpServer(mqttConfig.Host, mqttConfig.Port);
- if (!string.IsNullOrEmpty(mqttConfig.UserName) && !string.IsNullOrEmpty(mqttConfig.Password))
- {
- clientOptionsBuilder.WithCredentials(mqttConfig.UserName, mqttConfig.Password);
- }
+ if (mqttConfig.UseTls)
+ {
+ clientOptionsBuilder.WithTlsOptions(tlsOptionsBuilder =>
+ {
+ tlsOptionsBuilder
+ .UseTls()
+ .WithAllowUntrustedCertificates(mqttConfig.AllowUntrustedCertificates);
+ });
+ }
- if (mqttConfig.UseTls)
- {
- clientOptionsBuilder.WithTlsOptions(tlsOptionsBuilder =>
- {
- tlsOptionsBuilder
- .UseTls()
- .WithAllowUntrustedCertificates(mqttConfig.AllowUntrustedCertificates);
- });
- }
- })
- .Build();
+ if (!string.IsNullOrEmpty(mqttConfig.UserName) && !string.IsNullOrEmpty(mqttConfig.Password))
+ {
+ clientOptionsBuilder.WithCredentials(mqttConfig.UserName, mqttConfig.Password);
+ }
- return clientOptions;
+ return clientOptionsBuilder.Build();
}
}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs
index 66058df34..17a21e0d8 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttEntityManager.cs
@@ -23,14 +23,15 @@ internal class MqttEntityManager : IMqttEntityManager
private readonly IMessageSender _messageSender;
private readonly IMessageSubscriber _messageSubscriber;
+ ///
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } = MqttQualityOfServiceLevel.AtMostOnce;
///
/// Manage entities via MQTT
///
- ///
- ///
- ///
+ /// The MQTT message sender.
+ /// The MQTT message subscriber.
+ /// The MQTT configuration options.
public MqttEntityManager(IMessageSender messageSender, IMessageSubscriber messageSubscriber, IOptions config)
{
_messageSender = messageSender;
@@ -38,12 +39,7 @@ public MqttEntityManager(IMessageSender messageSender, IMessageSubscriber messag
_config = config.Value;
}
- ///
- /// Create an entity in Home Assistant via MQTT
- ///
- /// Distinct identifier, in the format "domain.id", such as "sensor.kitchen_temp"
- /// Optional set of additional parameters
- ///
+ ///
public async Task CreateAsync(string entityId, EntityCreationOptions? options = null,
object? additionalConfig = null)
{
@@ -57,10 +53,7 @@ await _messageSender
.ConfigureAwait(false);
}
- ///
- /// Remove an entity from Home Assistant
- ///
- ///
+ ///
public async Task RemoveAsync(string entityId)
{
var (domain, identifier) = EntityIdParser.Extract(entityId);
@@ -69,13 +62,7 @@ await _messageSender
.ConfigureAwait(false);
}
- ///
- /// Set the state of an entity
- ///
- ///
- ///
- ///
- ///
+ ///
public async Task SetStateAsync(string entityId, string state)
{
var (domain, identifier) = EntityIdParser.Extract(entityId);
@@ -84,13 +71,7 @@ await _messageSender.SendMessageAsync(StatePath(domain, identifier), state, true
.ConfigureAwait(false);
}
- ///
- /// Set attributes on an entity
- ///
- ///
- ///
- ///
- ///
+ ///
public async Task SetAttributesAsync(string entityId, object attributes)
{
var (domain, identifier) = EntityIdParser.Extract(entityId);
@@ -100,13 +81,7 @@ await _messageSender.SendMessageAsync(AttrsPath(domain, identifier), JsonSeriali
.ConfigureAwait(false);
}
- ///
- /// Set availability of the entity. If you specified "payload_available" and "payload_not_available" configuration
- /// on creating the entity then the value should match one of these.
- /// If not, then use "online" and "offline"
- ///
- ///
- ///
+ ///
public async Task SetAvailabilityAsync(string entityId, string availability)
{
var (domain, identifier) = EntityIdParser.Extract(entityId);
@@ -116,12 +91,7 @@ await _messageSender
.ConfigureAwait(false);
}
- ///
- /// Prepare a subscription to command topics for the given entity
- /// Be sure to chain this request with .Subscribe(...)
- ///
- ///
- ///
+ ///
public async Task> PrepareCommandSubscriptionAsync(string entityId)
{
var (domain, identifier) = EntityIdParser.Extract(entityId);
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs
index 54bb190a5..e59c760e0 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MqttFactoryFactory.cs
@@ -1,20 +1,15 @@
-using MQTTnet;
-using MQTTnet.Extensions.ManagedClient;
+using MQTTnet;
namespace NetDaemon.Extensions.MqttEntityManager;
///
-/// MqttNet removed the IMqttFactory interface at v4 which breaks our DI
-/// So this is a factory for the MqttFactory to satisfy our use case
+/// Factory wrapper for MQTTnet's client factory.
///
internal class MqttFactoryFactory : IMqttFactory
{
- ///
- /// Create a Managed Mqtt Client
- ///
- ///
- public IManagedMqttClient CreateManagedMqttClient()
+ ///
+ public IMqttClient CreateMqttClient()
{
- return new MqttFactory().CreateManagedMqttClient();
+ return new MqttClientFactory().CreateMqttClient();
}
-}
\ No newline at end of file
+}
diff --git a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj
index 83fd0bac3..4b18c03a8 100644
--- a/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj
+++ b/src/Extensions/NetDaemon.Extensions.MqttEntityManager/NetDaemon.Extensions.MqttEntityManager.csproj
@@ -31,13 +31,12 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj b/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj
index 6b4f8c1ef..f88749ff3 100644
--- a/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj
+++ b/src/Extensions/NetDaemon.Extensions.Scheduling.Tests/NetDaemon.Extensions.Scheduling.Tests.csproj
@@ -1,20 +1,20 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
diff --git a/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj b/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj
index acf145709..311bfb38b 100644
--- a/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj
+++ b/src/Extensions/NetDaemon.Extensions.Scheduling/NetDaemon.Extensions.Scheduling.csproj
@@ -31,9 +31,9 @@
-
-
-
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj b/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj
index 43664eb89..f889ebad3 100644
--- a/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj
+++ b/src/Extensions/NetDaemon.Extensions.Tts/NetDaemon.Extensions.Tts.csproj
@@ -29,9 +29,9 @@
-
-
-
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj b/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj
index 3ae790aa4..4fd08bc26 100644
--- a/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj
+++ b/src/HassModel/NetDaemon.HassModel.CodeGenerator/NetDaemon.HassModel.CodeGenerator.csproj
@@ -32,14 +32,14 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
diff --git a/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj b/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj
index b7c7e998b..ccf16237e 100644
--- a/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj
+++ b/src/HassModel/NetDaemon.HassModel.Integration/NetDaemon.HassModel.Integration.csproj
@@ -30,7 +30,7 @@
-
+
diff --git a/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj b/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj
index 20f9f9d38..49665fcfd 100644
--- a/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj
+++ b/src/HassModel/NetDaemon.HassModel.Tests/NetDaemon.HassModel.Tests.csproj
@@ -1,28 +1,28 @@
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
+
+
+
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj
index 9ed4b747d..a6a66a692 100644
--- a/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj
+++ b/src/HassModel/NetDaemon.HassModel/NetDaemon.HassModel.csproj
@@ -30,8 +30,8 @@
-
-
+
+
diff --git a/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj b/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj
index 5507bb172..b28660517 100644
--- a/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj
+++ b/src/Host/NetDaemon.Host.Default/NetDaemon.Host.Default.csproj
@@ -5,7 +5,7 @@
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj b/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj
index 79c04949b..72e295e41 100644
--- a/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj
+++ b/src/Runtime/NetDaemon.Runtime.Tests/NetDaemon.Runtime.Tests.csproj
@@ -1,16 +1,16 @@
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
-
+
@@ -18,11 +18,11 @@
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
diff --git a/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj b/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj
index 765431ec9..ac9ec276d 100644
--- a/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj
+++ b/src/Runtime/NetDaemon.Runtime/NetDaemon.Runtime.csproj
@@ -28,14 +28,14 @@
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs
index 9b17c60bd..550ccb66b 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantCollection.cs
@@ -2,8 +2,11 @@
namespace NetDaemon.Tests.Integration.Helpers;
+///
+/// Defines the shared Home Assistant integration test collection.
+///
[CollectionDefinition("HomeAssistant collection")]
public class HomeAssistantCollection : ICollectionFixture
{
-}
\ No newline at end of file
+}
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs
index cd97ad7ab..01a8488f8 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantLifetime.cs
@@ -1,34 +1,158 @@
-using Microsoft.Extensions.Logging;
+using DotNet.Testcontainers.Builders;
+using DotNet.Testcontainers.Containers;
+using DotNet.Testcontainers.Networks;
+using Microsoft.Extensions.Logging;
+using MQTTnet;
+using MQTTnet.Formatter;
using NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer;
+using System.Text;
using Xunit;
namespace NetDaemon.Tests.Integration.Helpers;
+///
+/// Manages the shared Home Assistant and MQTT broker containers for integration tests.
+///
public class HomeAssistantLifetime : IAsyncLifetime
{
- private readonly HomeAssistantContainer _homeassistant = new HomeAssistantContainerBuilder()
- .WithResourceMapping(new DirectoryInfo("./HA/config"), "/config")
- .WithLogger(LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger())
- .WithVersion(Environment.GetEnvironmentVariable("HomeAssistantVersion") ?? HomeAssistantContainerBuilder.DefaultVersion)
- .Build();
+ private const int MqttContainerPort = 1883;
+ private const string MqttContainerAlias = "mqtt";
+ private const string MosquittoConfiguration = """
+ listener 1883 0.0.0.0
+ allow_anonymous true
+ persistence false
+ log_dest stdout
+ """;
+ private readonly ILoggerFactory _loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
+ private readonly INetwork _network = new NetworkBuilder().Build();
+ private readonly IContainer _mqttBroker;
+ private readonly HomeAssistantContainer _homeassistant;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public HomeAssistantLifetime()
+ {
+ _mqttBroker = new ContainerBuilder()
+ .WithImage(Environment.GetEnvironmentVariable("MqttBrokerImage") ?? "eclipse-mosquitto:2")
+ .WithNetwork(_network)
+ .WithNetworkAliases(MqttContainerAlias)
+ .WithPortBinding(MqttContainerPort, true)
+ .WithResourceMapping(Encoding.UTF8.GetBytes(MosquittoConfiguration), "/mosquitto/config/mosquitto.conf")
+ .WithWaitStrategy(
+ Wait.ForUnixContainer()
+ .UntilInternalTcpPortIsAvailable(MqttContainerPort)
+ .UntilExternalTcpPortIsAvailable(MqttContainerPort))
+ .WithLogger(_loggerFactory.CreateLogger("MqttBrokerContainer"))
+ .Build();
+
+ _homeassistant = new HomeAssistantContainerBuilder()
+ .WithNetwork(_network)
+ .WithResourceMapping(new DirectoryInfo("./HA/config"), "/config")
+ .WithLogger(_loggerFactory.CreateLogger())
+ .WithVersion(Environment.GetEnvironmentVariable("HomeAssistantVersion") ?? HomeAssistantContainerBuilder.DefaultVersion)
+ .Build();
+ }
+
+ ///
+ /// Gets or sets the Home Assistant API access token.
+ ///
public string? AccessToken { get; set; }
+
+ ///
+ /// Gets the mapped Home Assistant HTTP port.
+ ///
public ushort Port => _homeassistant.Port;
+ ///
+ /// Gets the host name used by the test host to connect to the MQTT broker.
+ ///
+ public string MqttHost { get; } = "localhost";
+
+ ///
+ /// Gets the mapped MQTT broker port.
+ ///
+ public ushort MqttPort => _mqttBroker.GetMappedPublicPort(MqttContainerPort);
+
+ ///
public async Task InitializeAsync()
{
+ await _network.CreateAsync();
+ await _mqttBroker.StartAsync();
+ await WaitForMqttBrokerAsync();
await _homeassistant.StartAsync();
var authorizeResult = await _homeassistant.DoOnboarding();
AccessToken = (await _homeassistant.GenerateApiToken(authorizeResult.AuthCode)).AccessToken;
- await _homeassistant.AddIntegrations(AccessToken);
+ await _homeassistant.AddIntegrations(AccessToken, new MqttBrokerSettings(MqttContainerAlias, MqttContainerPort));
}
+ ///
public async Task DisposeAsync()
{
- var (_, stderr) = await _homeassistant.GetLogsAsync();
- Console.WriteLine($"Writing Homeassistant logs to console:{Environment.NewLine}{stderr}{Environment.NewLine}End of Homeassistant logs");
- await _homeassistant.StopAsync();
+ var (_, homeAssistantStderr) = await _homeassistant.GetLogsAsync();
+ Console.WriteLine($"Writing Homeassistant logs to console:{Environment.NewLine}{homeAssistantStderr}{Environment.NewLine}End of Homeassistant logs");
+
+ var (_, mqttStderr) = await _mqttBroker.GetLogsAsync();
+ Console.WriteLine($"Writing MQTT broker logs to console:{Environment.NewLine}{mqttStderr}{Environment.NewLine}End of MQTT broker logs");
+
+ await _homeassistant.DisposeAsync();
+ await _mqttBroker.DisposeAsync();
+ await _network.DisposeAsync();
+ _loggerFactory.Dispose();
}
-}
+ private async Task WaitForMqttBrokerAsync()
+ {
+ using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var logger = _loggerFactory.CreateLogger();
+ Exception? lastException = null;
+
+ while (!timeout.IsCancellationRequested)
+ {
+ try
+ {
+ using var client = new MqttClientFactory().CreateMqttClient();
+ var clientOptions = new MqttClientOptionsBuilder()
+ .WithTcpServer(MqttHost, MqttPort)
+ .WithProtocolVersion(MqttProtocolVersion.V311)
+ .WithClientId($"netdaemon-test-readiness-{Guid.NewGuid():N}")
+ .Build();
+
+ var connectResult = await client.ConnectAsync(clientOptions, timeout.Token);
+ if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
+ {
+ await client.DisconnectAsync();
+ return;
+ }
+
+ lastException = new InvalidOperationException($"MQTT broker rejected readiness connection: {connectResult.ResultCode} {connectResult.ReasonString}");
+ }
+ catch (OperationCanceledException) when (timeout.IsCancellationRequested)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ lastException = ex;
+ }
+
+ await DelayNextMqttBrokerAttempt(timeout.Token);
+ }
+
+ logger.LogError(lastException, "MQTT broker endpoint {Host}:{Port} did not become ready within 30 seconds", MqttHost, MqttPort);
+ throw new TimeoutException($"MQTT broker endpoint {MqttHost}:{MqttPort} did not become ready within 30 seconds.", lastException);
+ }
+
+ private static async Task DelayNextMqttBrokerAttempt(CancellationToken cancellationToken)
+ {
+ try
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ }
+ }
+}
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs
index 2618daa30..1723b2e81 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantConfiguration.cs
@@ -4,13 +4,38 @@
namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer;
+///
+/// Container configuration for Home Assistant integration tests.
+///
public class HomeAssistantConfiguration : ContainerConfiguration
{
+ ///
+ /// Gets the Home Assistant onboarding username.
+ ///
public string Username { get; } = null!;
+
+ ///
+ /// Gets the Home Assistant onboarding password.
+ ///
public string Password { get; } = null!;
+
+ ///
+ /// Gets the Home Assistant OAuth client id.
+ ///
public string ClientId { get; } = null!;
+
+ ///
+ /// Gets the Home Assistant container image version.
+ ///
public string Version { get; } = null!;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Home Assistant onboarding username.
+ /// The Home Assistant onboarding password.
+ /// The Home Assistant OAuth client id.
+ /// The Home Assistant container image version.
public HomeAssistantConfiguration(
string? username = null,
string? password = null,
@@ -23,16 +48,29 @@ public HomeAssistantConfiguration(
Version = version!;
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
public HomeAssistantConfiguration(IResourceConfiguration resourceConfiguration)
: base(resourceConfiguration)
{
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The container configuration.
public HomeAssistantConfiguration(IContainerConfiguration resourceConfiguration)
: base(resourceConfiguration)
{
}
-
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The previous Home Assistant configuration.
+ /// The new Home Assistant configuration.
public HomeAssistantConfiguration(HomeAssistantConfiguration oldValue, HomeAssistantConfiguration newValue)
: base(oldValue, newValue)
{
@@ -41,4 +79,4 @@ public HomeAssistantConfiguration(HomeAssistantConfiguration oldValue, HomeAssis
Password = BuildConfiguration.Combine(oldValue.Password, newValue.Password);
Version = BuildConfiguration.Combine(oldValue.Version, newValue.Version);
}
-}
\ No newline at end of file
+}
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs
index 1df0c4840..a8757c6a9 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainer.cs
@@ -6,22 +6,41 @@
namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer;
+///
+/// Testcontainers container wrapper for Home Assistant integration tests.
+///
public class HomeAssistantContainer : DockerContainer
{
private readonly HomeAssistantConfiguration _configuration;
+
+ ///
+ /// Gets the mapped Home Assistant HTTP port.
+ ///
public ushort Port => GetMappedPublicPort(8123);
private HttpClient? _client;
+
+ ///
+ /// Gets the HTTP client configured for the Home Assistant container.
+ ///
public HttpClient Client => _client ??= new HttpClient
{
BaseAddress = new Uri($"http://localhost:{Port}")
};
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Home Assistant container configuration.
public HomeAssistantContainer(HomeAssistantConfiguration configuration) : base(configuration)
{
_configuration = configuration;
}
+ ///
+ /// Completes Home Assistant onboarding for the test user.
+ ///
+ /// The Home Assistant authorization result.
public async Task DoOnboarding()
{
var onboardingResult = await Client.PostAsync("/api/onboarding/users", JsonContent.Create(new
@@ -37,10 +56,16 @@ public async Task DoOnboarding()
return (await onboardingResult.Content.ReadFromJsonAsync())!;
}
- public async Task AddIntegrations(string token)
+ ///
+ /// Adds integrations required by the integration tests.
+ ///
+ /// The Home Assistant API access token.
+ /// The MQTT broker settings used by Home Assistant.
+ public async Task AddIntegrations(string token, MqttBrokerSettings mqttBrokerSettings)
{
AddAuthorizationHeaders(token);
await AddLocalCalendarIntegration();
+ await AddMqttIntegration(mqttBrokerSettings);
}
private void AddAuthorizationHeaders(string token)
@@ -69,6 +94,72 @@ private async Task AddLocalCalendarIntegration()
}
}
+ private async Task AddMqttIntegration(MqttBrokerSettings mqttBrokerSettings)
+ {
+ var submitFlow = await SubmitMqttIntegration(mqttBrokerSettings, includeProtocol: true);
+ var flowType = submitFlow.GetProperty("type").GetString();
+ if (flowType != "create_entry")
+ {
+ throw new InvalidOperationException($"Home Assistant MQTT integration setup failed: {submitFlow}");
+ }
+ }
+
+ private async Task SubmitMqttIntegration(MqttBrokerSettings mqttBrokerSettings, bool includeProtocol)
+ {
+ var result = await Client.PostAsync("/api/config/config_entries/flow", JsonContent.Create(new
+ {
+ handler = "mqtt",
+ show_advanced_options = true
+ }));
+
+ await EnsureSuccessStatusCode(result, "starting MQTT config flow");
+ var startFlow = await result.Content.ReadFromJsonAsync();
+ var flowId = startFlow.GetProperty("flow_id").GetString()
+ ?? throw new InvalidOperationException("Home Assistant did not return an MQTT config flow id.");
+
+ var brokerSettings = new Dictionary
+ {
+ ["broker"] = mqttBrokerSettings.Host,
+ ["port"] = mqttBrokerSettings.Port
+ };
+
+ if (includeProtocol)
+ {
+ brokerSettings["protocol"] = "3.1.1";
+ }
+
+ var submitResult = await Client.PostAsync($"/api/config/config_entries/flow/{flowId}", JsonContent.Create(brokerSettings));
+
+ if (submitResult.IsSuccessStatusCode)
+ {
+ return await submitResult.Content.ReadFromJsonAsync();
+ }
+
+ var content = await submitResult.Content.ReadAsStringAsync();
+ if (includeProtocol && content.Contains("data['protocol']", StringComparison.Ordinal))
+ {
+ return await SubmitMqttIntegration(mqttBrokerSettings, includeProtocol: false);
+ }
+
+ throw new HttpRequestException($"Home Assistant returned {(int)submitResult.StatusCode} ({submitResult.StatusCode}) while submitting MQTT config flow: {content}");
+ }
+
+ private static async Task EnsureSuccessStatusCode(HttpResponseMessage response, string operation)
+ {
+ if (response.IsSuccessStatusCode)
+ {
+ return;
+ }
+
+ var content = await response.Content.ReadAsStringAsync();
+ throw new HttpRequestException($"Home Assistant returned {(int)response.StatusCode} ({response.StatusCode}) while {operation}: {content}");
+ }
+
+ ///
+ /// Generates a Home Assistant API token from an authorization code.
+ ///
+ /// The Home Assistant authorization code.
+ /// The generated Home Assistant token result.
public async Task GenerateApiToken(string authCode)
{
var tokenResponse = await Client.PostAsync("/auth/token", new FormUrlEncodedContent(
@@ -84,12 +175,31 @@ public async Task GenerateApiToken(string authCode)
}
}
+///
+/// Authorization result returned by Home Assistant onboarding.
+///
+/// The authorization code.
public record HomeAssistantAuthorizeResult(
[property:JsonPropertyName("auth_code")][property:JsonRequired]string AuthCode);
+///
+/// Token result returned by the Home Assistant token endpoint.
+///
+/// The API access token.
+/// The token type.
+/// The refresh token.
+/// The token lifetime in seconds.
+/// The Home Assistant authentication provider.
public record HomeAssistantTokenResult(
[property:JsonPropertyName("access_token")][property:JsonRequired]string AccessToken,
[property:JsonPropertyName("token_type")][property:JsonRequired]string TokenType,
[property:JsonPropertyName("refresh_token")][property:JsonRequired]string RefreshToken,
[property:JsonPropertyName("expires_in")][property:JsonRequired]int ExpiresIn,
[property:JsonPropertyName("ha_auth_provider")][property:JsonRequired]string HaAuthProvider);
+
+///
+/// MQTT broker settings used to configure Home Assistant.
+///
+/// The broker host name visible from Home Assistant.
+/// The broker port visible from Home Assistant.
+public record MqttBrokerSettings(string Host, int Port);
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs
index 80320fb96..4e07b6d2e 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/HomeAssistantTestContainer/HomeAssistantContainerBuilder.cs
@@ -5,18 +5,43 @@
namespace NetDaemon.Tests.Integration.Helpers.HomeAssistantTestContainer;
+///
+/// Builds Home Assistant containers for integration tests.
+///
public class HomeAssistantContainerBuilder : ContainerBuilder
{
+ ///
+ /// The default Home Assistant container image version.
+ ///
public const string DefaultVersion = "stable";
+
+ ///
+ /// The default OAuth client id used during Home Assistant onboarding.
+ ///
public const string DefaultClientId = "http://dummyClientId";
+
+ ///
+ /// The default Home Assistant onboarding username.
+ ///
public const string DefaultUsername = "username";
+
+ ///
+ /// The default Home Assistant onboarding password.
+ ///
public const string DefaultPassword = "password";
+ ///
+ /// Initializes a new instance of the class.
+ ///
public HomeAssistantContainerBuilder() : this(new HomeAssistantConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Home Assistant container configuration.
public HomeAssistantContainerBuilder(HomeAssistantConfiguration dockerResourceConfiguration) : base(dockerResourceConfiguration)
{
DockerResourceConfiguration = dockerResourceConfiguration;
@@ -34,14 +59,37 @@ protected override HomeAssistantContainerBuilder Init() =>
.WithClientId(DefaultClientId)
.WithVersion(DefaultVersion);
+ ///
+ /// Sets the Home Assistant container image version.
+ ///
+ /// The Home Assistant container image version.
+ /// The configured builder.
public HomeAssistantContainerBuilder WithVersion(string version) =>
Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(version: version))
.WithImage($"homeassistant/home-assistant:{version}");
+ ///
+ /// Sets the Home Assistant onboarding username.
+ ///
+ /// The Home Assistant onboarding username.
+ /// The configured builder.
public HomeAssistantContainerBuilder WithUsername(string username) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(username: username));
+
+ ///
+ /// Sets the Home Assistant onboarding password.
+ ///
+ /// The Home Assistant onboarding password.
+ /// The configured builder.
public HomeAssistantContainerBuilder WithPassword(string password) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(password: password));
+
+ ///
+ /// Sets the Home Assistant OAuth client id.
+ ///
+ /// The Home Assistant OAuth client id.
+ /// The configured builder.
public HomeAssistantContainerBuilder WithClientId(string clientId) => Merge(DockerResourceConfiguration, new HomeAssistantConfiguration(clientId: clientId));
+ ///
public override HomeAssistantContainer Build()
{
Validate();
diff --git a/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs b/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs
index cf04718c9..a8a10854c 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs
+++ b/tests/Integration/NetDaemon.Tests.Integration/Helpers/NetDaemonIntegrationBase.cs
@@ -4,19 +4,30 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NetDaemon.AppModel;
+using NetDaemon.Extensions.MqttEntityManager;
using NetDaemon.Runtime;
using Xunit;
namespace NetDaemon.Tests.Integration.Helpers;
+///
+/// Base class for integration tests that run against the shared Home Assistant container.
+///
[Collection("HomeAssistant collection")]
public class NetDaemonIntegrationBase : IAsyncDisposable
{
+ ///
+ /// Gets the scoped services for the running NetDaemon host.
+ ///
public IServiceProvider Services => _scope.ServiceProvider;
private readonly HomeAssistantLifetime _homeAssistantLifetime;
private readonly IHost _netDaemon;
private readonly AsyncServiceScope _scope;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The shared Home Assistant test lifetime.
public NetDaemonIntegrationBase(HomeAssistantLifetime homeAssistantLifetime)
{
_homeAssistantLifetime = homeAssistantLifetime;
@@ -36,13 +47,17 @@ private IHost StartNetDaemon()
{
{ "HomeAssistant:Port", _homeAssistantLifetime.Port.ToString(CultureInfo.InvariantCulture) },
{ "HomeAssistant:Token", _homeAssistantLifetime.AccessToken },
- { "HomeAssistant:Host", "localhost" }
+ { "HomeAssistant:Host", "localhost" },
+ { "Mqtt:Host", _homeAssistantLifetime.MqttHost },
+ { "Mqtt:Port", _homeAssistantLifetime.MqttPort.ToString(CultureInfo.InvariantCulture) },
+ { "Mqtt:DiscoveryPrefix", "homeassistant" }
});
})
.ConfigureServices((_, services) =>
services
.AddAppsFromAssembly(Assembly.GetExecutingAssembly())
.AddNetDaemonStateManager()
+ .AddNetDaemonMqttEntityManagement()
).Build();
netDaemon.Start();
@@ -61,9 +76,9 @@ private static async Task WaitForRuntimeToBeInitialized(IHost host)
///
/// Runs the specified function without a synchronization context and restores the synchronization context afterwards.
///
- ///
+ /// The function to run.
///
- ///
+ /// The result of the function.
private static T RunWithoutSynchronizationContext(Func func)
{
// Capture the current synchronization context so we can restore it later.
@@ -80,6 +95,7 @@ private static T RunWithoutSynchronizationContext(Func func)
}
}
+ ///
public async ValueTask DisposeAsync()
{
await _scope.DisposeAsync();
diff --git a/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs b/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs
new file mode 100644
index 000000000..cf7537409
--- /dev/null
+++ b/tests/Integration/NetDaemon.Tests.Integration/MqttIntegrationTests.cs
@@ -0,0 +1,138 @@
+using FluentAssertions;
+using Microsoft.Extensions.DependencyInjection;
+using NetDaemon.AppModel;
+using NetDaemon.Extensions.MqttEntityManager;
+using NetDaemon.HassModel;
+using NetDaemon.HassModel.Entities;
+using NetDaemon.Tests.Integration.Helpers;
+using System.Reactive.Linq;
+using System.Reactive.Threading.Tasks;
+using Xunit;
+
+namespace NetDaemon.Tests.Integration;
+
+///
+/// Test app that exposes a switch through the MQTT entity manager.
+///
+[NetDaemonApp]
+public sealed class MqttIntegrationSwitchApp : IAsyncInitializable, IDisposable
+{
+ ///
+ /// The Home Assistant entity id created by the MQTT integration test app.
+ ///
+ public const string EntityId = "switch.netdaemon_mqtt_test_switch";
+ private const string PayloadOn = "ON";
+ private const string PayloadOff = "OFF";
+
+ private readonly IMqttEntityManager _entityManager;
+ private IDisposable? _commandSubscription;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The MQTT entity manager.
+ public MqttIntegrationSwitchApp(IMqttEntityManager entityManager)
+ {
+ _entityManager = entityManager;
+ }
+
+ ///
+ public async Task InitializeAsync(CancellationToken cancellationToken)
+ {
+ await _entityManager.CreateAsync(
+ EntityId,
+ new EntityCreationOptions(
+ UniqueId: "netdaemon_mqtt_test_switch",
+ Name: "NetDaemon MQTT Test Switch",
+ PayloadOn: PayloadOn,
+ PayloadOff: PayloadOff)).ConfigureAwait(false);
+
+ await _entityManager.SetStateAsync(EntityId, PayloadOff).ConfigureAwait(false);
+
+ var commands = await _entityManager.PrepareCommandSubscriptionAsync(EntityId).ConfigureAwait(false);
+ _commandSubscription = commands.Subscribe(command => _ = HandleCommandAsync(command));
+ }
+
+ ///
+ public void Dispose()
+ {
+ _commandSubscription?.Dispose();
+ }
+
+ private async Task HandleCommandAsync(string command)
+ {
+ if (command is PayloadOn or PayloadOff)
+ {
+ await _entityManager.SetStateAsync(EntityId, command).ConfigureAwait(false);
+ }
+ }
+}
+
+///
+/// Integration tests for MQTT-backed Home Assistant entities.
+///
+public class MqttIntegrationTests : NetDaemonIntegrationBase
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The shared Home Assistant test lifetime.
+ public MqttIntegrationTests(HomeAssistantLifetime homeAssistantLifetime) : base(homeAssistantLifetime)
+ {
+ }
+
+ ///
+ /// Verifies that an MQTT switch can be turned on and off through Home Assistant.
+ ///
+ [Fact]
+ public async Task MqttSwitch_ShouldTurnOnAndOffThroughHomeAssistant()
+ {
+ var haContext = Services.GetRequiredService();
+
+ await WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "off");
+
+ var waitForOn = WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "on");
+ haContext.CallService(
+ "switch",
+ "turn_on",
+ ServiceTarget.FromEntities(MqttIntegrationSwitchApp.EntityId));
+
+ await waitForOn;
+
+ var waitForOff = WaitForStateAsync(haContext, MqttIntegrationSwitchApp.EntityId, "off");
+ haContext.CallService(
+ "switch",
+ "turn_off",
+ ServiceTarget.FromEntities(MqttIntegrationSwitchApp.EntityId));
+
+ await waitForOff;
+ }
+
+ private static async Task WaitForStateAsync(IHaContext haContext, string entityId, string expectedState)
+ {
+ if (haContext.GetState(entityId)?.State == expectedState)
+ {
+ return;
+ }
+
+ var waitForState = haContext.StateChanges()
+ .Where(change => change.Entity.EntityId == entityId && change.New?.State == expectedState)
+ .FirstAsync()
+ .ToTask();
+
+ if (haContext.GetState(entityId)?.State == expectedState)
+ {
+ return;
+ }
+
+ try
+ {
+ await waitForState.WaitAsync(TimeSpan.FromSeconds(30));
+ }
+ catch (TimeoutException)
+ {
+ haContext.GetState(entityId)?.State.Should().Be(expectedState,
+ $"entity {entityId} should reach state {expectedState} within 30 seconds");
+ }
+ }
+}
diff --git a/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj b/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj
index 04c86cd40..7e06348d5 100644
--- a/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj
+++ b/tests/Integration/NetDaemon.Tests.Integration/NetDaemon.Tests.Integration.csproj
@@ -1,26 +1,26 @@
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
+
+
+
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
-
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
@@ -28,6 +28,7 @@
+