Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@
<Import Project="$(ProjectDir)../../Targets/Sourcelink.targets" />

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.8" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="5.3.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="10.0.203" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="10.0.300" PrivateAssets="all" />
<PackageReference Include="System.Reactive" Version="6.1.0" />
<PackageReference Include="Roslynator.Analyzers" Version="4.15.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="YamlDotNet" Version="17.1.0" />
<PackageReference Include="YamlDotNet" Version="18.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.8" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="10.0.8" />
<PackageReference Include="System.Reactive" Version="6.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.5.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.6.0" />
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="5.3.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="xunit" Version="2.9.3" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="10.0.0">
<PackageReference Include="coverlet.collector" Version="10.0.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.msbuild" Version="10.0.0">
<PackageReference Include="coverlet.msbuild" Version="10.0.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
16 changes: 8 additions & 8 deletions src/AppModel/NetDaemon.AppModel/NetDaemon.AppModel.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
<Import Project="$(ProjectDir)../../Targets/Sourcelink.targets" />

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.8" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="10.0.300" PrivateAssets="all" />
<PackageReference Include="System.Reactive" Version="6.1.0" />
<PackageReference Include="Roslynator.Analyzers" Version="4.15.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="YamlDotNet" Version="17.1.0" />
<PackageReference Include="YamlDotNet" Version="18.0.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.8" />
<PackageReference Include="System.Reactive" Version="6.1.0" />
<PackageReference Include="Roslynator.Analyzers" Version="4.15.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,216 @@
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using MQTTnet.Packets;
using NetDaemon.Extensions.MqttEntityManager;
using NetDaemon.Extensions.MqttEntityManager.Helpers;

namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;

public class AssuredMqttConnectionTests
{
private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(5);
private static readonly TimeSpan RetryWaitTimeout = TimeSpan.FromSeconds(10);

[Fact]
public async Task QueuesPublishUntilConnected()
{
var setup = new AssuredMqttConnectionSetup();
using var conn = setup.CreateConnection();

var message = new MqttApplicationMessageBuilder()
.WithTopic("topic")
.WithPayload("payload")
.Build();

await conn.PublishAsync(message);
setup.PublishedMessages.Should().BeEmpty();

setup.CompleteConnect();

var publishedMessage = await setup.WaitForPublishedMessageAsync();
publishedMessage.Should().Be(message);
}

[Fact]
public async Task ReplaysSubscriptionsAfterConnect()
{
var setup = new AssuredMqttConnectionSetup();
using var conn = setup.CreateConnection();

var topicFilter = new MqttTopicFilterBuilder()
.WithTopic("homeassistant/domain/sensor/set")
.Build();

await conn.SubscribeAsync(topicFilter);
setup.SubscribedTopics.Should().BeEmpty();

setup.CompleteConnect();

var subscribedTopic = await setup.WaitForSubscribedTopicAsync();
subscribedTopic.Should().Be("homeassistant/domain/sensor/set");
}

[Fact]
public async Task CanGetClient()
public async Task PublishesImmediatelyWhenAlreadyConnected()
{
var logger = new Mock<ILogger<AssuredMqttConnection>>();
var setup = new AssuredMqttConnectionSetup();
using var conn = setup.CreateConnection();
setup.CompleteConnect();
await setup.WaitForConnectedAsync();

var message = new MqttApplicationMessageBuilder()
.WithTopic("topic")
.WithPayload("payload")
.Build();

var mqttClient = new Mock<IManagedMqttClient>();
var mqttFactory = new MqttFactoryWrapper(mqttClient.Object);
var mqttClientOptionsFactory = new Mock<IMqttClientOptionsFactory>();
var mqttConfigurationOptions = new Mock<IOptions<MqttConfiguration>>();
await conn.PublishAsync(message);

ConfigureMockOptions(mqttConfigurationOptions);
var publishedMessage = await setup.WaitForPublishedMessageAsync();
publishedMessage.Should().Be(message);
}

[Fact]
public async Task RetriesAfterTransientConnectionFailure()
{
var setup = new AssuredMqttConnectionSetup();
setup.FailConnectAttempts(1);
using var conn = setup.CreateConnection();

var topicFilter = new MqttTopicFilterBuilder()
.WithTopic("homeassistant/domain/switch/set")
.Build();

mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(It.Is<MqttConfiguration>(o => o.Host == "localhost" && o.UserName == "id")))
.Returns(new ManagedMqttClientOptions())
.Verifiable(Times.Once);
var message = new MqttApplicationMessageBuilder()
.WithTopic("homeassistant/domain/switch/state")
.WithPayload("ON")
.Build();

var conn = new AssuredMqttConnection(logger.Object, mqttClientOptionsFactory.Object, mqttFactory, mqttConfigurationOptions.Object);
var returnedClient = await conn.GetClientAsync();
await conn.SubscribeAsync(topicFilter);
await conn.PublishAsync(message);
setup.CompleteConnect();

returnedClient.Should().Be(mqttClient.Object);
var subscribedTopic = await setup.WaitForSubscribedTopicAsync(RetryWaitTimeout);
var publishedMessage = await setup.WaitForPublishedMessageAsync(RetryWaitTimeout);

mqttClientOptionsFactory.VerifyAll();
mqttConfigurationOptions.VerifyAll();
setup.ConnectAttempts.Should().BeGreaterThan(1);
subscribedTopic.Should().Be("homeassistant/domain/switch/set");
publishedMessage.Should().Be(message);
}

private static void ConfigureMockOptions(Mock<IOptions<MqttConfiguration>> mockOptions, Action<MqttConfiguration>? configuration = null)
private sealed class AssuredMqttConnectionSetup
{
var mqttConfiguration = new MqttConfiguration
private readonly TaskCompletionSource _connectCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _connected = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<MqttApplicationMessage> _publishedMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<string> _subscribedTopic = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly Mock<IMqttClient> _mqttClient = new();
private readonly Mock<IMqttFactory> _mqttFactory = new();
private readonly Mock<IMqttClientOptionsFactory> _mqttClientOptionsFactory = new();
private readonly Mock<IOptions<MqttConfiguration>> _mqttConfigurationOptions = new();
private int _connectAttempts;
private int _connectFailuresBeforeSuccess;

public AssuredMqttConnectionSetup()
{
Host = "localhost",
UserName = "id"
};
var mqttConfiguration = new MqttConfiguration
{
Host = "localhost",
UserName = "id"
};

configuration?.Invoke(mqttConfiguration);
var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(mqttConfiguration.Host, mqttConfiguration.Port)
.Build();

mockOptions.SetupGet(o => o.Value)
.Returns(() => mqttConfiguration)
.Verifiable(Times.Once);
_mqttConfigurationOptions.SetupGet(o => o.Value)
.Returns(mqttConfiguration);

_mqttClientOptionsFactory.Setup(f => f.CreateClientOptions(mqttConfiguration))
.Returns(clientOptions);

_mqttFactory.Setup(f => f.CreateMqttClient())
.Returns(_mqttClient.Object);

_mqttClient.SetupGet(c => c.IsConnected)
.Returns(() => IsConnected);

_mqttClient.Setup(c => c.ConnectAsync(clientOptions, It.IsAny<CancellationToken>()))
.Returns(async () =>
{
Interlocked.Increment(ref _connectAttempts);
if (Interlocked.CompareExchange(ref _connectFailuresBeforeSuccess, 0, 0) > 0)
{
Interlocked.Decrement(ref _connectFailuresBeforeSuccess);
throw new InvalidOperationException("MQTT broker is not ready yet.");
}

await _connectCompletion.Task;
IsConnected = true;
_connected.TrySetResult();
return new MqttClientConnectResult
{
ResultCode = MqttClientConnectResultCode.Success
};
});

_mqttClient.Setup(c => c.PublishAsync(It.IsAny<MqttApplicationMessage>(), It.IsAny<CancellationToken>()))
.Callback<MqttApplicationMessage, CancellationToken>((message, _) =>
{
PublishedMessages.Add(message);
_publishedMessage.TrySetResult(message);
})
.ReturnsAsync(() => new MqttClientPublishResult(null, MqttClientPublishReasonCode.Success, string.Empty, []));

_mqttClient.Setup(c => c.SubscribeAsync(It.IsAny<MqttClientSubscribeOptions>(), It.IsAny<CancellationToken>()))
.Callback<MqttClientSubscribeOptions, CancellationToken>((options, _) =>
{
foreach (var topic in options.TopicFilters.Select(topicFilter => topicFilter.Topic))
{
SubscribedTopics.Add(topic);
_subscribedTopic.TrySetResult(topic);
}
})
.ReturnsAsync(() => new MqttClientSubscribeResult(1, [], string.Empty, []));
}

public bool IsConnected { get; private set; }

public int ConnectAttempts => _connectAttempts;

public List<MqttApplicationMessage> PublishedMessages { get; } = [];

public List<string> SubscribedTopics { get; } = [];

public AssuredMqttConnection CreateConnection()
{
return new AssuredMqttConnection(
new Mock<ILogger<AssuredMqttConnection>>().Object,
_mqttClientOptionsFactory.Object,
_mqttFactory.Object,
_mqttConfigurationOptions.Object);
}

public void CompleteConnect()
{
_connectCompletion.TrySetResult();
}

public void FailConnectAttempts(int count)
{
_connectFailuresBeforeSuccess = count;
}

public async Task WaitForConnectedAsync(TimeSpan? timeout = null)
{
await _connected.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
}

public async Task<MqttApplicationMessage> WaitForPublishedMessageAsync(TimeSpan? timeout = null)
{
return await _publishedMessage.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
}

public async Task<string> WaitForSubscribedTopicAsync(TimeSpan? timeout = null)
{
return await _subscribedTopic.Task.WaitAsync(timeout ?? DefaultWaitTimeout);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MQTTnet.Protocol;
using MQTTnet;
using MQTTnet.Protocol;
using NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests.TestHelpers;

namespace NetDaemon.HassClient.Tests.ExtensionsTest.MqttEntityManagerTests;
Expand All @@ -13,7 +14,7 @@ public async Task TopicAndPayloadAreSet()
await mqttSetup.MessageSender.SendMessageAsync("topic", "payload", true, MqttQualityOfServiceLevel.AtMostOnce);
var publishedMessage = mqttSetup.LastPublishedMessage;

var payloadAsText = System.Text.Encoding.Default.GetString(publishedMessage.PayloadSegment.Array ?? []);
var payloadAsText = publishedMessage.ConvertPayloadToString();

publishedMessage.Topic.Should().Be("topic");
payloadAsText.Should().Be("payload");
Expand Down Expand Up @@ -73,4 +74,4 @@ public async Task CanUnsetPersist()

publishedMessage.Retain.Should().BeFalse();
}
}
}
Loading
Loading