Skip to content

Commit a2bc4fe

Browse files
committed
added publish overload and header fields
1 parent 1b7ec83 commit a2bc4fe

5 files changed

Lines changed: 141 additions & 63 deletions

File tree

src/Communication/DataStructures.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ public interface IMessage : ICloneable {
1414
byte[] Payload { get; set; }
1515

1616
QualityOfServiceLevel QOS { get; set;}
17-
long Timestamp { get; set; }
17+
long Timestamp { get; set; }
18+
int ResponseCount { get; set; }
19+
bool BulkResponse { get; set; }
1820

1921

2022
object Content { get; set; }
@@ -31,6 +33,8 @@ public class Message : IMessage {
3133
public object Content { get; set; }
3234
public QualityOfServiceLevel QOS { get; set; }
3335
public long Timestamp { get; set; }
36+
public int ResponseCount { get; set; }
37+
public bool BulkResponse { get; set; }
3438

3539

3640
public Message() { }
@@ -56,8 +60,21 @@ public Message(string clientId, string clientName, string topic, string response
5660
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
5761
}
5862

63+
public Message(string clientId, string clientName, string topic, string responseTopic, string contentType, byte[] payload, QualityOfServiceLevel qos = QualityOfServiceLevel.ExactlyOnce, int responseCount = 1, bool bulkResponse = false) {
64+
ClientId = clientId;
65+
ClientName = clientName;
66+
Topic = topic;
67+
ResponseTopic = responseTopic;
68+
ContentType = contentType;
69+
Payload = payload != null ? payload.ToArray() : payload;
70+
QOS = qos;
71+
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
72+
ResponseCount = responseCount;
73+
BulkResponse = bulkResponse;
74+
}
75+
5976
public object Clone() {
60-
return new Message(ClientId, ClientName, Topic, ResponseTopic, ContentType, Payload, QOS);
77+
return new Message(ClientId, ClientName, Topic, ResponseTopic, ContentType, Payload, QOS, ResponseCount, BulkResponse);
6178
}
6279
}
6380

@@ -80,8 +97,13 @@ public Message(string clientId, string clientName, string topic, string response
8097
Content = content;
8198
}
8299

100+
//public Message(string clientId, string clientName, string topic, string responseTopic, string contentType, byte[] payload, T content, QualityOfServiceLevel qos = QualityOfServiceLevel.ExactlyOnce, int responseCount = 1, bool bulkResponse = false)
101+
// : base(clientId, clientName, topic, responseTopic, contentType, payload, qos) {
102+
// Content = content;
103+
//}
104+
83105
public new object Clone() {
84-
return new Message<T>(ClientId, ClientName, Topic, ResponseTopic, ContentType, Payload, Content);
106+
return new Message<T>(ClientId, ClientName, Topic, ResponseTopic, ContentType, Payload, Content, QOS);
85107
}
86108
}
87109

src/Communication/Sockets/ApachekafkaSocket.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,14 @@ public Task<T1> RequestAsync<T1, T2>(T2 payload) {
521521
throw new NotImplementedException();
522522
}
523523

524+
public Task PublishAsync<T>(string topic, IMessage msg) {
525+
throw new NotImplementedException();
526+
}
527+
528+
public Task PublishAsync(PublicationOptions options, IMessage msg) {
529+
throw new NotImplementedException();
530+
}
531+
524532
#endregion helper
525533
}
526534
}

src/Communication/Sockets/ISocket.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,12 @@ public interface ISocket : ICloneable, IDisposable {
6262

6363
Task PublishAsync<T>(string topic, T payload);
6464

65+
Task PublishAsync<T>(string topic, IMessage msg);
66+
6567
Task PublishAsync<T>(PublicationOptions options, T payload);
6668

69+
Task PublishAsync(PublicationOptions options, IMessage msg);
70+
6771
T Request<T>();
6872

6973
T Request<T>(string topic, string responseTopic, bool generateResponseTopicPostfix = true);

src/Communication/Sockets/MqttSocket.cs

Lines changed: 97 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
using Confluent.Kafka;
2-
using Ai.Hgb.Dat.Configuration;
1+
using Ai.Hgb.Dat.Configuration;
32
using Ai.Hgb.Dat.Utils;
43
using MQTTnet;
54
using MQTTnet.Client;
65
using MQTTnet.Extensions.ManagedClient;
7-
using MQTTnet.Internal;
86
using MQTTnet.Protocol;
9-
using System;
10-
using System.Net.Mime;
117

128
namespace Ai.Hgb.Dat.Communication {
139
public class MqttSocket : ISocket {
@@ -55,7 +51,8 @@ public bool BlockingActionExecution {
5551

5652
private Dictionary<SubscriptionOptions, List<ActionItem>> actions;
5753
private Dictionary<RequestOptions, TaskCompletionSource<IMessage>> promises;
58-
54+
55+
#region setup
5956

6057
public MqttSocket(SocketConfiguration configuration) {
6158
this.configuration = configuration;
@@ -130,7 +127,6 @@ public MqttSocket(string id, string name, HostAddress address, IPayloadConverter
130127
if (connect) Connect();
131128
}
132129

133-
134130
public object Clone() {
135131
return new MqttSocket(Configuration.Id, Configuration.Name, Configuration.Broker, Converter,
136132
(SubscriptionOptions)Configuration.DefaultSubscriptionOptions?.Clone(),
@@ -139,6 +135,63 @@ public object Clone() {
139135
BlockingActionExecution);
140136
}
141137

138+
public ISocket Connect() {
139+
if (IsConnected()) return this;
140+
141+
var options = new MqttClientOptionsBuilder()
142+
.WithClientId(Configuration.Name)
143+
.WithWebSocketServer("ws://127.0.0.1:5000/mqtt")
144+
.WithTcpServer(configuration.Broker.Name, configuration.Broker.Port)
145+
.WithCleanSession(true);
146+
var mgOptions = new ManagedMqttClientOptionsBuilder()
147+
.WithAutoReconnectDelay(TimeSpan.FromSeconds(10))
148+
.WithClientOptions(options.Build())
149+
.Build();
150+
151+
client.StartAsync(mgOptions).Wait(cts.Token);
152+
client.ConnectingFailedAsync += OnClient_ConnectingFailedAsync;
153+
connected.WaitOne();
154+
155+
if (configuration.DefaultSubscriptionOptions != null && configuration.DefaultSubscriptionOptions.Topic != null)
156+
client.SubscribeAsync(configuration.DefaultSubscriptionOptions.Topic, GetQosLevel(configuration.DefaultSubscriptionOptions.QosLevel)).Wait(cts.Token);
157+
158+
foreach (var subscription in pendingSubscriptions) {
159+
client.SubscribeAsync(subscription.Topic, GetQosLevel(subscription.QosLevel)).Wait(cts.Token);
160+
}
161+
pendingSubscriptions.Clear();
162+
163+
return this;
164+
}
165+
166+
public ISocket Disconnect() {
167+
client.StopAsync().Wait(cts.Token);
168+
cts.Cancel();
169+
disconnected.WaitOne();
170+
client.Dispose();
171+
client = null;
172+
173+
return this;
174+
}
175+
176+
public void Abort() {
177+
cts.Cancel();
178+
client.StopAsync();
179+
client.Dispose();
180+
client = null;
181+
}
182+
183+
public void Dispose() {
184+
Disconnect();
185+
}
186+
187+
public bool IsConnected() {
188+
return client != null && client.IsConnected;
189+
}
190+
191+
#endregion setup
192+
193+
#region event handling
194+
142195
private void Configuration_ConfigurationChanged(object sender, EventArgs<Ai.Hgb.Dat.Configuration.IConfiguration> e) {
143196
Console.WriteLine("Udating socket now...");
144197
var newConfiguration = e.Value as SocketConfiguration;
@@ -297,63 +350,14 @@ private void OnMessageReceived_AfterRegisteredHandlers(IMessage message) {
297350
if (handler != null) handler(this, new EventArgs<IMessage>(message));
298351
}
299352

300-
public ISocket Connect() {
301-
if (IsConnected()) return this;
302-
303-
var options = new MqttClientOptionsBuilder()
304-
.WithClientId(Configuration.Name)
305-
.WithWebSocketServer("ws://127.0.0.1:5000/mqtt")
306-
.WithTcpServer(configuration.Broker.Name, configuration.Broker.Port)
307-
.WithCleanSession(true);
308-
var mgOptions = new ManagedMqttClientOptionsBuilder()
309-
.WithAutoReconnectDelay(TimeSpan.FromSeconds(10))
310-
.WithClientOptions(options.Build())
311-
.Build();
312-
313-
client.StartAsync(mgOptions).Wait(cts.Token);
314-
client.ConnectingFailedAsync += OnClient_ConnectingFailedAsync;
315-
connected.WaitOne();
316-
317-
if (configuration.DefaultSubscriptionOptions != null && configuration.DefaultSubscriptionOptions.Topic != null)
318-
client.SubscribeAsync(configuration.DefaultSubscriptionOptions.Topic, GetQosLevel(configuration.DefaultSubscriptionOptions.QosLevel)).Wait(cts.Token);
319-
320-
foreach (var subscription in pendingSubscriptions) {
321-
client.SubscribeAsync(subscription.Topic, GetQosLevel(subscription.QosLevel)).Wait(cts.Token);
322-
}
323-
pendingSubscriptions.Clear();
324-
325-
return this;
326-
}
327-
328353
private Task OnClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg) {
329354
Console.WriteLine(arg.Exception.InnerException.ToString());
330355
return Task.CompletedTask;
331356
}
332357

333-
public ISocket Disconnect() {
334-
client.StopAsync().Wait(cts.Token);
335-
cts.Cancel();
336-
disconnected.WaitOne();
337-
client.Dispose();
338-
client = null;
358+
#endregion event handling
339359

340-
return this;
341-
}
342-
343-
public void Abort() {
344-
cts.Cancel();
345-
client.StopAsync();
346-
client.Dispose();
347-
client = null;
348-
}
349-
350-
public void Dispose() {
351-
Disconnect();
352-
}
353-
354-
public bool IsConnected() {
355-
return client != null && client.IsConnected;
356-
}
360+
#region subscribe
357361

358362
private void Subscribe(SubscriptionOptions options) {
359363
var o = options != null ? options : configuration.DefaultSubscriptionOptions;
@@ -439,6 +443,9 @@ public void Unsubscribe(string topic = null) {
439443
}
440444
}
441445

446+
#endregion subscribe
447+
448+
#region publish
442449
public void Publish<T>(T payload) {
443450
var o = (PublicationOptions)configuration.DefaultPublicationOptions.Clone();
444451
Publish(o, payload);
@@ -466,6 +473,12 @@ public async Task PublishAsync<T>(string topic, T payload) {
466473
await PublishAsync(o, payload);
467474
}
468475

476+
public async Task PublishAsync<T>(string topic, IMessage msg) {
477+
var o = (PublicationOptions)configuration.DefaultPublicationOptions.Clone();
478+
o.Topic = topic;
479+
await PublishAsync(o, msg);
480+
}
481+
469482
public async Task PublishAsync<T>(PublicationOptions options, T payload) {
470483
var o = options != null ? options : configuration.DefaultPublicationOptions;
471484

@@ -487,6 +500,28 @@ public async Task PublishAsync<T>(PublicationOptions options, T payload) {
487500
await client.EnqueueAsync(mappMessage);
488501
}
489502

503+
public async Task PublishAsync(PublicationOptions options, IMessage msg) {
504+
var o = options != null ? options : configuration.DefaultPublicationOptions;
505+
506+
msg.Topic = o.Topic; // use old msg, update topic only
507+
508+
var appMessage = new MqttApplicationMessageBuilder()
509+
.WithTopic(msg.Topic)
510+
.WithPayload(converter.Serialize(msg))
511+
.WithQualityOfServiceLevel(GetQosLevel(o.QosLevel))
512+
.Build();
513+
514+
var mappMessage = new ManagedMqttApplicationMessageBuilder()
515+
.WithApplicationMessage(appMessage)
516+
.Build();
517+
518+
await client.EnqueueAsync(mappMessage);
519+
}
520+
521+
#endregion publish
522+
523+
#region request
524+
490525
public T Request<T>() {
491526
var o = (RequestOptions)configuration.DefaultRequestOptions.Clone();
492527

@@ -582,6 +617,8 @@ public async Task<T1> RequestAsync<T1, T2>(RequestOptions options, T2 payload) {
582617
string contentType = payload != null ? typeof(T2).FullName : "";
583618
//IMessage msg = new Message<T2>(Configuration.Id, Configuration.Name, o.Topic, o.ResponseTopic, contentType, payload);
584619
var msg = new Message<T2>(Configuration.Id, Configuration.Name, o.Topic, o.ResponseTopic, contentType, converter.Serialize<T2>(payload), payload);
620+
msg.ResponseCount = o.ResponseCount;
621+
msg.BulkResponse = o.BulkResponse;
585622
var serMsg = converter.Serialize(msg);
586623

587624
appMessageBuilder = msg != null
@@ -606,6 +643,8 @@ public async Task<T1> RequestAsync<T1, T2>(RequestOptions options, T2 payload) {
606643
return converter.Deserialize<T1>(response.Payload);
607644
}
608645

646+
#endregion request
647+
609648
#region helper
610649

611650
private MqttQualityOfServiceLevel GetQosLevel(QualityOfServiceLevel qosl) {

src/Configuration/DataStructures.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ public class RequestOptions : ICloneable {
113113

114114
public string ContentTypeFullName { get; set; }
115115

116+
public int ResponseCount { get; set; }
117+
public bool BulkResponse { get; set; }
118+
116119
public RequestOptions() {
117120
Topic = null;
118121
ResponseTopic = null;
@@ -121,16 +124,18 @@ public RequestOptions() {
121124
ContentTypeFullName = "";
122125
}
123126

124-
public RequestOptions(string topic, string responseTopic, bool generateResponseTopicPostfix = true, Type contentType = null, string contentTypeFullName = "") {
127+
public RequestOptions(string topic, string responseTopic, bool generateResponseTopicPostfix = true, Type contentType = null, string contentTypeFullName = "", int responseCount = 1, bool bulkResponse = false) {
125128
Topic = topic;
126129
ResponseTopic = responseTopic;
127130
GenerateResponseTopicPostfix = generateResponseTopicPostfix;
128131
ContentType = contentType;
129132
ContentTypeFullName = contentTypeFullName;
133+
ResponseCount = responseCount;
134+
BulkResponse = bulkResponse;
130135
}
131136

132137
public object Clone() {
133-
return new RequestOptions(Topic, ResponseTopic, GenerateResponseTopicPostfix, ContentType, ContentTypeFullName);
138+
return new RequestOptions(Topic, ResponseTopic, GenerateResponseTopicPostfix, ContentType, ContentTypeFullName, ResponseCount, BulkResponse);
134139
}
135140

136141
public SubscriptionOptions GetRequestSubscriptionOptions() {

0 commit comments

Comments
 (0)