From d3a37ddc19066f7de49e80d16cb024770751913c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Thu, 18 Dec 2025 11:50:08 +0100 Subject: [PATCH 1/6] Export connectionStateStream in pipe_client --- dart/lib/leancode_pipe/pipe_client.dart | 4 ++++ dart/lib/signalr_core/src/hub_connection.dart | 24 +++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/dart/lib/leancode_pipe/pipe_client.dart b/dart/lib/leancode_pipe/pipe_client.dart index 001b003..c42bf8c 100644 --- a/dart/lib/leancode_pipe/pipe_client.dart +++ b/dart/lib/leancode_pipe/pipe_client.dart @@ -64,6 +64,10 @@ class PipeClient { _hubConnection.state, ); + Stream get connectionStateStream => + _hubConnection.connectionStateStream + .map(PipeConnectionStateMapper.fromHubConnectionState); + Future connect() async { if (connectionState != PipeConnectionState.disconnected) { _logger.warning( diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index 7bcbb0a..6de779e 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -57,6 +57,7 @@ class HubConnection { HubConnectionState? _connectionState; late bool _connectionStarted; + late StreamController _connectionStateStreamController; Future? _startFuture; Future? _stopFuture; @@ -93,6 +94,7 @@ class HubConnection { _invocationId = 0; _receivedHandshakeResponse = false; _connectionState = HubConnectionState.disconnected; + _connectionStateStreamController = StreamController(); _connectionStarted = false; } @@ -113,6 +115,14 @@ class HubConnection { /// Indicates the state of the {@link HubConnection} to the server. HubConnectionState? get state => _connectionState; + Stream get connectionStateStream => + _connectionStateStreamController.stream; + + void updateConnectionState(HubConnectionState state) { + _connectionState = state; + _connectionStateStreamController.add(state); + } + /// Represents the connection id of the [HubConnection] on the server. The /// connection id will be null when the connection is either /// in the disconnected state or if the negotiation step was skipped. @@ -154,17 +164,17 @@ class HubConnection { )); } - _connectionState = HubConnectionState.connecting; + updateConnectionState(HubConnectionState.connecting); _logger!(LogLevel.debug, 'Starting HubConnection.'); try { await _startInternal(); - _connectionState = HubConnectionState.connected; + updateConnectionState(HubConnectionState.connected); _connectionStarted = true; _logger(LogLevel.debug, 'HubConnection connected successfully.'); } catch (e) { - _connectionState = HubConnectionState.disconnected; + updateConnectionState(HubConnectionState.disconnected); _logger( LogLevel.debug, 'HubConnection failed to start successfully because of error ' @@ -278,7 +288,7 @@ class HubConnection { return _stopFuture; } - _connectionState = HubConnectionState.disconnecting; + updateConnectionState(HubConnectionState.disconnecting); _logger!(LogLevel.debug, 'Stopping HubConnection'); @@ -395,7 +405,7 @@ class HubConnection { void _completeClose({Exception? exception}) { if (_connectionStarted) { - _connectionState = HubConnectionState.disconnected; + updateConnectionState(HubConnectionState.disconnected); _connectionStarted = false; try { @@ -435,7 +445,7 @@ class HubConnection { return; } - _connectionState = HubConnectionState.reconnecting; + updateConnectionState(HubConnectionState.reconnecting); if (exception != null) { _logger!( @@ -497,7 +507,7 @@ class HubConnection { try { await _startInternal(); - _connectionState = HubConnectionState.connected; + updateConnectionState(HubConnectionState.connected); _logger( LogLevel.information, 'HubConnection reconnected successfully.'); From 5078a50bd319b51cb41f61378e75fc0a93f04999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Thu, 18 Dec 2025 12:42:54 +0100 Subject: [PATCH 2/6] Make updateConnectionState private --- dart/lib/signalr_core/src/hub_connection.dart | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index 6de779e..ff15bb6 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -118,7 +118,7 @@ class HubConnection { Stream get connectionStateStream => _connectionStateStreamController.stream; - void updateConnectionState(HubConnectionState state) { + void _updateConnectionState(HubConnectionState state) { _connectionState = state; _connectionStateStreamController.add(state); } @@ -164,17 +164,17 @@ class HubConnection { )); } - updateConnectionState(HubConnectionState.connecting); + _updateConnectionState(HubConnectionState.connecting); _logger!(LogLevel.debug, 'Starting HubConnection.'); try { await _startInternal(); - updateConnectionState(HubConnectionState.connected); + _updateConnectionState(HubConnectionState.connected); _connectionStarted = true; _logger(LogLevel.debug, 'HubConnection connected successfully.'); } catch (e) { - updateConnectionState(HubConnectionState.disconnected); + _updateConnectionState(HubConnectionState.disconnected); _logger( LogLevel.debug, 'HubConnection failed to start successfully because of error ' @@ -288,7 +288,7 @@ class HubConnection { return _stopFuture; } - updateConnectionState(HubConnectionState.disconnecting); + _updateConnectionState(HubConnectionState.disconnecting); _logger!(LogLevel.debug, 'Stopping HubConnection'); @@ -405,7 +405,7 @@ class HubConnection { void _completeClose({Exception? exception}) { if (_connectionStarted) { - updateConnectionState(HubConnectionState.disconnected); + _updateConnectionState(HubConnectionState.disconnected); _connectionStarted = false; try { @@ -445,7 +445,7 @@ class HubConnection { return; } - updateConnectionState(HubConnectionState.reconnecting); + _updateConnectionState(HubConnectionState.reconnecting); if (exception != null) { _logger!( @@ -507,7 +507,7 @@ class HubConnection { try { await _startInternal(); - updateConnectionState(HubConnectionState.connected); + _updateConnectionState(HubConnectionState.connected); _logger( LogLevel.information, 'HubConnection reconnected successfully.'); From 11c8cca2c04773c6957049a67e3d5b97946ca888 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Thu, 18 Dec 2025 12:44:06 +0100 Subject: [PATCH 3/6] Add broadcast to _connectionStateStreamController --- dart/lib/signalr_core/src/hub_connection.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index ff15bb6..f09b37f 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -94,7 +94,7 @@ class HubConnection { _invocationId = 0; _receivedHandshakeResponse = false; _connectionState = HubConnectionState.disconnected; - _connectionStateStreamController = StreamController(); + _connectionStateStreamController = StreamController.broadcast(); _connectionStarted = false; } From 1982071f7410ae2fcdcc498eb125eb4da0e1e181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Thu, 18 Dec 2025 12:45:24 +0100 Subject: [PATCH 4/6] Document connectionStateStream --- dart/lib/signalr_core/src/hub_connection.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index f09b37f..854fb16 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -115,6 +115,7 @@ class HubConnection { /// Indicates the state of the {@link HubConnection} to the server. HubConnectionState? get state => _connectionState; + /// Stream of the state changes of the {@link HubConnection} to the server. Stream get connectionStateStream => _connectionStateStreamController.stream; From 00a913225071733035216c8ff16a4b3036f3679b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Thu, 18 Dec 2025 13:13:34 +0100 Subject: [PATCH 5/6] Expose dispose method from _hubConnection --- dart/lib/leancode_pipe/pipe_client.dart | 2 +- dart/lib/signalr_core/src/hub_connection.dart | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dart/lib/leancode_pipe/pipe_client.dart b/dart/lib/leancode_pipe/pipe_client.dart index c42bf8c..fedb420 100644 --- a/dart/lib/leancode_pipe/pipe_client.dart +++ b/dart/lib/leancode_pipe/pipe_client.dart @@ -562,7 +562,7 @@ class PipeClient { Future dispose() async { await Future.wait(_registeredTopicSubscriptions.map((e) => e.close())); - await _hubConnection.stop(); + await _hubConnection.dispose(); } Future _sendPipeServiceMethod({ diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index 854fb16..88559e8 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -252,6 +252,11 @@ class HubConnection { } } + Future dispose() async { + await _connectionStateStreamController.close(); + await stop(); + } + /// Stops the connection. Future stop() async { // Capture the start future before the connection might be restarted in an From 5a3e2eaebb52e22210b735848f59297e55ef90c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Komoszy=C5=84ski?= Date: Fri, 19 Dec 2025 16:25:53 +0100 Subject: [PATCH 6/6] Use BehaviorSubject for state instead of separate state and stream controller --- dart/lib/signalr_core/src/hub_connection.dart | 60 +++++++++---------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index 88559e8..8bf9abe 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -6,6 +6,7 @@ import 'package:leancode_pipe/signalr_core/src/hub_protocol.dart'; import 'package:leancode_pipe/signalr_core/src/logger.dart'; import 'package:leancode_pipe/signalr_core/src/retry_policy.dart'; import 'package:leancode_pipe/signalr_core/src/utils.dart'; +import 'package:rxdart/rxdart.dart'; typedef InvocationEventCallback = void Function( HubMessage? invocationEvent, Exception? exception); @@ -55,9 +56,8 @@ class HubConnection { late Completer _handshakeCompleter; Exception? _stopDuringStartError; - HubConnectionState? _connectionState; late bool _connectionStarted; - late StreamController _connectionStateStreamController; + late BehaviorSubject _connectionStateSubject; Future? _startFuture; Future? _stopFuture; @@ -93,8 +93,8 @@ class HubConnection { _reconnectedCallbacks = []; _invocationId = 0; _receivedHandshakeResponse = false; - _connectionState = HubConnectionState.disconnected; - _connectionStateStreamController = StreamController.broadcast(); + _connectionStateSubject = + BehaviorSubject.seeded(HubConnectionState.disconnected); _connectionStarted = false; } @@ -113,16 +113,11 @@ class HubConnection { late int keepAliveIntervalInMilliseconds; /// Indicates the state of the {@link HubConnection} to the server. - HubConnectionState? get state => _connectionState; + HubConnectionState? get state => _connectionStateSubject.value; /// Stream of the state changes of the {@link HubConnection} to the server. Stream get connectionStateStream => - _connectionStateStreamController.stream; - - void _updateConnectionState(HubConnectionState state) { - _connectionState = state; - _connectionStateStreamController.add(state); - } + _connectionStateSubject.stream; /// Represents the connection id of the [HubConnection] on the server. The /// connection id will be null when the connection is either @@ -136,8 +131,8 @@ class HubConnection { /// changed when the connection is in either the Disconnected or /// Reconnecting states. set baseUrl(String url) { - if ((_connectionState != HubConnectionState.disconnected) && - (_connectionState != HubConnectionState.reconnecting)) { + if ((state != HubConnectionState.disconnected) && + (state != HubConnectionState.reconnecting)) { throw Exception( 'The HubConnection must be in the Disconnected or Reconnecting ' 'state to change the url.', @@ -158,24 +153,24 @@ class HubConnection { } Future _startWithStateTransitions() async { - if (_connectionState != HubConnectionState.disconnected) { + if (state != HubConnectionState.disconnected) { return Future.error(Exception( 'Cannot start a HubConnection that is not in the ' '\'Disconnected\' state.', )); } - _updateConnectionState(HubConnectionState.connecting); + _connectionStateSubject.add(HubConnectionState.connecting); _logger!(LogLevel.debug, 'Starting HubConnection.'); try { await _startInternal(); - _updateConnectionState(HubConnectionState.connected); + _connectionStateSubject.add(HubConnectionState.connected); _connectionStarted = true; _logger(LogLevel.debug, 'HubConnection connected successfully.'); } catch (e) { - _updateConnectionState(HubConnectionState.disconnected); + _connectionStateSubject.add(HubConnectionState.disconnected); _logger( LogLevel.debug, 'HubConnection failed to start successfully because of error ' @@ -253,7 +248,7 @@ class HubConnection { } Future dispose() async { - await _connectionStateStreamController.close(); + await _connectionStateSubject.close(); await stop(); } @@ -276,7 +271,7 @@ class HubConnection { } Future? _stopInternal({Exception? exception}) async { - if (_connectionState == HubConnectionState.disconnected) { + if (state == HubConnectionState.disconnected) { _logger!( LogLevel.debug, 'Call to HubConnection.stop(${exception.toString()}) ignored ' @@ -285,7 +280,7 @@ class HubConnection { return Future.value(null); } - if (_connectionState == HubConnectionState.disconnecting) { + if (state == HubConnectionState.disconnecting) { _logger!( LogLevel.debug, 'Call to HttpConnection.stop(${exception.toString()}) ignored ' @@ -294,7 +289,7 @@ class HubConnection { return _stopFuture; } - _updateConnectionState(HubConnectionState.disconnecting); + _connectionStateSubject.add(HubConnectionState.disconnecting); _logger!(LogLevel.debug, 'Stopping HubConnection'); @@ -375,7 +370,7 @@ class HubConnection { _pingServerHandle = Timer.periodic(Duration(milliseconds: keepAliveIntervalInMilliseconds), (Timer timer) async { - if (_connectionState == HubConnectionState.connected) { + if (state == HubConnectionState.connected) { try { await _sendMessage(_cachedPingMessage); } catch (e) { @@ -411,7 +406,7 @@ class HubConnection { void _completeClose({Exception? exception}) { if (_connectionStarted) { - _updateConnectionState(HubConnectionState.disconnected); + _connectionStateSubject.add(HubConnectionState.disconnected); _connectionStarted = false; try { @@ -451,7 +446,7 @@ class HubConnection { return; } - _updateConnectionState(HubConnectionState.reconnecting); + _connectionStateSubject.add(HubConnectionState.reconnecting); if (exception != null) { _logger!( @@ -476,7 +471,7 @@ class HubConnection { } // Exit early if an onreconnecting callback called connection.stop(). - if (_connectionState != HubConnectionState.reconnecting) { + if (state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state in onreconnecting ' @@ -500,8 +495,7 @@ class HubConnection { }); _reconnectDelayHandle = null; - if (_connectionState == null || - _connectionState != HubConnectionState.reconnecting) { + if (state == null || state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state during reconnect delay. ' @@ -513,7 +507,7 @@ class HubConnection { try { await _startInternal(); - _updateConnectionState(HubConnectionState.connected); + _connectionStateSubject.add(HubConnectionState.connected); _logger( LogLevel.information, 'HubConnection reconnected successfully.'); @@ -534,7 +528,7 @@ class HubConnection { _logger(LogLevel.information, 'Reconnect attempt failed because of error \'${e.toString()}\'.'); - if (_connectionState != HubConnectionState.reconnecting) { + if (state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state during reconnect ' @@ -879,7 +873,7 @@ class HubConnection { _logger!( LogLevel.debug, 'HubConnection.connectionClosed(${exception.toString()}) called while ' - 'in state ${_connectionState.toString()}.', + 'in state ${state.toString()}.', ); // Triggering this.handshakeRejecter is insufficient because it could @@ -910,12 +904,12 @@ class HubConnection { _cleanupTimeout(); _cleanupPingTimer(); - if (_connectionState == HubConnectionState.disconnecting) { + if (state == HubConnectionState.disconnecting) { _completeClose(exception: exception); - } else if ((_connectionState == HubConnectionState.connected) && + } else if ((state == HubConnectionState.connected) && _reconnectPolicy != null) { _reconnect(exception: exception); - } else if (_connectionState == HubConnectionState.connected) { + } else if (state == HubConnectionState.connected) { _completeClose(exception: exception); }