diff --git a/dart/lib/leancode_pipe/pipe_client.dart b/dart/lib/leancode_pipe/pipe_client.dart index 001b003..fedb420 100644 --- a/dart/lib/leancode_pipe/pipe_client.dart +++ b/dart/lib/leancode_pipe/pipe_client.dart @@ -64,6 +64,10 @@ class PipeClient { _hubConnection.state, ); + Stream get connectionStateStream => + _hubConnection.connectionStateStream + .map(PipeConnectionStateMapper.fromHubConnectionState); + Future connect() async { if (connectionState != PipeConnectionState.disconnected) { _logger.warning( @@ -558,7 +562,7 @@ class PipeClient { Future dispose() async { await Future.wait(_registeredTopicSubscriptions.map((e) => e.close())); - await _hubConnection.stop(); + await _hubConnection.dispose(); } Future _sendPipeServiceMethod({ diff --git a/dart/lib/signalr_core/src/hub_connection.dart b/dart/lib/signalr_core/src/hub_connection.dart index 7bcbb0a..8bf9abe 100644 --- a/dart/lib/signalr_core/src/hub_connection.dart +++ b/dart/lib/signalr_core/src/hub_connection.dart @@ -6,6 +6,7 @@ import 'package:leancode_pipe/signalr_core/src/hub_protocol.dart'; import 'package:leancode_pipe/signalr_core/src/logger.dart'; import 'package:leancode_pipe/signalr_core/src/retry_policy.dart'; import 'package:leancode_pipe/signalr_core/src/utils.dart'; +import 'package:rxdart/rxdart.dart'; typedef InvocationEventCallback = void Function( HubMessage? invocationEvent, Exception? exception); @@ -55,8 +56,8 @@ class HubConnection { late Completer _handshakeCompleter; Exception? _stopDuringStartError; - HubConnectionState? _connectionState; late bool _connectionStarted; + late BehaviorSubject _connectionStateSubject; Future? _startFuture; Future? _stopFuture; @@ -92,7 +93,8 @@ class HubConnection { _reconnectedCallbacks = []; _invocationId = 0; _receivedHandshakeResponse = false; - _connectionState = HubConnectionState.disconnected; + _connectionStateSubject = + BehaviorSubject.seeded(HubConnectionState.disconnected); _connectionStarted = false; } @@ -111,7 +113,11 @@ class HubConnection { late int keepAliveIntervalInMilliseconds; /// Indicates the state of the {@link HubConnection} to the server. - HubConnectionState? get state => _connectionState; + HubConnectionState? get state => _connectionStateSubject.value; + + /// Stream of the state changes of the {@link HubConnection} to the server. + Stream get connectionStateStream => + _connectionStateSubject.stream; /// Represents the connection id of the [HubConnection] on the server. The /// connection id will be null when the connection is either @@ -125,8 +131,8 @@ class HubConnection { /// changed when the connection is in either the Disconnected or /// Reconnecting states. set baseUrl(String url) { - if ((_connectionState != HubConnectionState.disconnected) && - (_connectionState != HubConnectionState.reconnecting)) { + if ((state != HubConnectionState.disconnected) && + (state != HubConnectionState.reconnecting)) { throw Exception( 'The HubConnection must be in the Disconnected or Reconnecting ' 'state to change the url.', @@ -147,24 +153,24 @@ class HubConnection { } Future _startWithStateTransitions() async { - if (_connectionState != HubConnectionState.disconnected) { + if (state != HubConnectionState.disconnected) { return Future.error(Exception( 'Cannot start a HubConnection that is not in the ' '\'Disconnected\' state.', )); } - _connectionState = HubConnectionState.connecting; + _connectionStateSubject.add(HubConnectionState.connecting); _logger!(LogLevel.debug, 'Starting HubConnection.'); try { await _startInternal(); - _connectionState = HubConnectionState.connected; + _connectionStateSubject.add(HubConnectionState.connected); _connectionStarted = true; _logger(LogLevel.debug, 'HubConnection connected successfully.'); } catch (e) { - _connectionState = HubConnectionState.disconnected; + _connectionStateSubject.add(HubConnectionState.disconnected); _logger( LogLevel.debug, 'HubConnection failed to start successfully because of error ' @@ -241,6 +247,11 @@ class HubConnection { } } + Future dispose() async { + await _connectionStateSubject.close(); + await stop(); + } + /// Stops the connection. Future stop() async { // Capture the start future before the connection might be restarted in an @@ -260,7 +271,7 @@ class HubConnection { } Future? _stopInternal({Exception? exception}) async { - if (_connectionState == HubConnectionState.disconnected) { + if (state == HubConnectionState.disconnected) { _logger!( LogLevel.debug, 'Call to HubConnection.stop(${exception.toString()}) ignored ' @@ -269,7 +280,7 @@ class HubConnection { return Future.value(null); } - if (_connectionState == HubConnectionState.disconnecting) { + if (state == HubConnectionState.disconnecting) { _logger!( LogLevel.debug, 'Call to HttpConnection.stop(${exception.toString()}) ignored ' @@ -278,7 +289,7 @@ class HubConnection { return _stopFuture; } - _connectionState = HubConnectionState.disconnecting; + _connectionStateSubject.add(HubConnectionState.disconnecting); _logger!(LogLevel.debug, 'Stopping HubConnection'); @@ -359,7 +370,7 @@ class HubConnection { _pingServerHandle = Timer.periodic(Duration(milliseconds: keepAliveIntervalInMilliseconds), (Timer timer) async { - if (_connectionState == HubConnectionState.connected) { + if (state == HubConnectionState.connected) { try { await _sendMessage(_cachedPingMessage); } catch (e) { @@ -395,7 +406,7 @@ class HubConnection { void _completeClose({Exception? exception}) { if (_connectionStarted) { - _connectionState = HubConnectionState.disconnected; + _connectionStateSubject.add(HubConnectionState.disconnected); _connectionStarted = false; try { @@ -435,7 +446,7 @@ class HubConnection { return; } - _connectionState = HubConnectionState.reconnecting; + _connectionStateSubject.add(HubConnectionState.reconnecting); if (exception != null) { _logger!( @@ -460,7 +471,7 @@ class HubConnection { } // Exit early if an onreconnecting callback called connection.stop(). - if (_connectionState != HubConnectionState.reconnecting) { + if (state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state in onreconnecting ' @@ -484,8 +495,7 @@ class HubConnection { }); _reconnectDelayHandle = null; - if (_connectionState == null || - _connectionState != HubConnectionState.reconnecting) { + if (state == null || state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state during reconnect delay. ' @@ -497,7 +507,7 @@ class HubConnection { try { await _startInternal(); - _connectionState = HubConnectionState.connected; + _connectionStateSubject.add(HubConnectionState.connected); _logger( LogLevel.information, 'HubConnection reconnected successfully.'); @@ -518,7 +528,7 @@ class HubConnection { _logger(LogLevel.information, 'Reconnect attempt failed because of error \'${e.toString()}\'.'); - if (_connectionState != HubConnectionState.reconnecting) { + if (state != HubConnectionState.reconnecting) { _logger( LogLevel.debug, 'Connection left the reconnecting state during reconnect ' @@ -863,7 +873,7 @@ class HubConnection { _logger!( LogLevel.debug, 'HubConnection.connectionClosed(${exception.toString()}) called while ' - 'in state ${_connectionState.toString()}.', + 'in state ${state.toString()}.', ); // Triggering this.handshakeRejecter is insufficient because it could @@ -894,12 +904,12 @@ class HubConnection { _cleanupTimeout(); _cleanupPingTimer(); - if (_connectionState == HubConnectionState.disconnecting) { + if (state == HubConnectionState.disconnecting) { _completeClose(exception: exception); - } else if ((_connectionState == HubConnectionState.connected) && + } else if ((state == HubConnectionState.connected) && _reconnectPolicy != null) { _reconnect(exception: exception); - } else if (_connectionState == HubConnectionState.connected) { + } else if (state == HubConnectionState.connected) { _completeClose(exception: exception); }