Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dart/lib/leancode_pipe/pipe_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class PipeClient {
_hubConnection.state,
);

Stream<PipeConnectionState> get connectionStateStream =>
_hubConnection.connectionStateStream
.map(PipeConnectionStateMapper.fromHubConnectionState);

Future<void> connect() async {
if (connectionState != PipeConnectionState.disconnected) {
_logger.warning(
Expand Down Expand Up @@ -558,7 +562,7 @@ class PipeClient {

Future<void> dispose() async {
await Future.wait(_registeredTopicSubscriptions.map((e) => e.close()));
await _hubConnection.stop();
await _hubConnection.dispose();
}

Future<R> _sendPipeServiceMethod<R extends Object>({
Expand Down
58 changes: 34 additions & 24 deletions dart/lib/signalr_core/src/hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:leancode_pipe/signalr_core/src/hub_protocol.dart';
import 'package:leancode_pipe/signalr_core/src/logger.dart';
import 'package:leancode_pipe/signalr_core/src/retry_policy.dart';
import 'package:leancode_pipe/signalr_core/src/utils.dart';
import 'package:rxdart/rxdart.dart';

typedef InvocationEventCallback = void Function(
HubMessage? invocationEvent, Exception? exception);
Expand Down Expand Up @@ -55,8 +56,8 @@ class HubConnection {
late Completer _handshakeCompleter;
Exception? _stopDuringStartError;

HubConnectionState? _connectionState;
late bool _connectionStarted;
late BehaviorSubject<HubConnectionState> _connectionStateSubject;
Future<void>? _startFuture;
Future<void>? _stopFuture;

Expand Down Expand Up @@ -92,7 +93,8 @@ class HubConnection {
_reconnectedCallbacks = [];
_invocationId = 0;
_receivedHandshakeResponse = false;
_connectionState = HubConnectionState.disconnected;
_connectionStateSubject =
BehaviorSubject.seeded(HubConnectionState.disconnected);
_connectionStarted = false;
}

Expand All @@ -111,7 +113,11 @@ class HubConnection {
late int keepAliveIntervalInMilliseconds;

/// Indicates the state of the {@link HubConnection} to the server.
HubConnectionState? get state => _connectionState;
HubConnectionState? get state => _connectionStateSubject.value;

/// Stream of the state changes of the {@link HubConnection} to the server.
Stream<HubConnectionState> get connectionStateStream =>
_connectionStateSubject.stream;

/// Represents the connection id of the [HubConnection] on the server. The
/// connection id will be null when the connection is either
Expand All @@ -125,8 +131,8 @@ class HubConnection {
/// changed when the connection is in either the Disconnected or
/// Reconnecting states.
set baseUrl(String url) {
if ((_connectionState != HubConnectionState.disconnected) &&
(_connectionState != HubConnectionState.reconnecting)) {
if ((state != HubConnectionState.disconnected) &&
(state != HubConnectionState.reconnecting)) {
throw Exception(
'The HubConnection must be in the Disconnected or Reconnecting '
'state to change the url.',
Expand All @@ -147,24 +153,24 @@ class HubConnection {
}

Future<void> _startWithStateTransitions() async {
if (_connectionState != HubConnectionState.disconnected) {
if (state != HubConnectionState.disconnected) {
return Future.error(Exception(
'Cannot start a HubConnection that is not in the '
'\'Disconnected\' state.',
));
}

_connectionState = HubConnectionState.connecting;
_connectionStateSubject.add(HubConnectionState.connecting);
_logger!(LogLevel.debug, 'Starting HubConnection.');

try {
await _startInternal();

_connectionState = HubConnectionState.connected;
_connectionStateSubject.add(HubConnectionState.connected);
_connectionStarted = true;
_logger(LogLevel.debug, 'HubConnection connected successfully.');
} catch (e) {
_connectionState = HubConnectionState.disconnected;
_connectionStateSubject.add(HubConnectionState.disconnected);
_logger(
LogLevel.debug,
'HubConnection failed to start successfully because of error '
Expand Down Expand Up @@ -241,6 +247,11 @@ class HubConnection {
}
}

Future<void> dispose() async {
await _connectionStateSubject.close();
await stop();
}

/// Stops the connection.
Future<void> stop() async {
// Capture the start future before the connection might be restarted in an
Expand All @@ -260,7 +271,7 @@ class HubConnection {
}

Future<void>? _stopInternal({Exception? exception}) async {
if (_connectionState == HubConnectionState.disconnected) {
if (state == HubConnectionState.disconnected) {
_logger!(
LogLevel.debug,
'Call to HubConnection.stop(${exception.toString()}) ignored '
Expand All @@ -269,7 +280,7 @@ class HubConnection {
return Future.value(null);
}

if (_connectionState == HubConnectionState.disconnecting) {
if (state == HubConnectionState.disconnecting) {
_logger!(
LogLevel.debug,
'Call to HttpConnection.stop(${exception.toString()}) ignored '
Expand All @@ -278,7 +289,7 @@ class HubConnection {
return _stopFuture;
}

_connectionState = HubConnectionState.disconnecting;
_connectionStateSubject.add(HubConnectionState.disconnecting);

_logger!(LogLevel.debug, 'Stopping HubConnection');

Expand Down Expand Up @@ -359,7 +370,7 @@ class HubConnection {
_pingServerHandle =
Timer.periodic(Duration(milliseconds: keepAliveIntervalInMilliseconds),
(Timer timer) async {
if (_connectionState == HubConnectionState.connected) {
if (state == HubConnectionState.connected) {
try {
await _sendMessage(_cachedPingMessage);
} catch (e) {
Expand Down Expand Up @@ -395,7 +406,7 @@ class HubConnection {

void _completeClose({Exception? exception}) {
if (_connectionStarted) {
_connectionState = HubConnectionState.disconnected;
_connectionStateSubject.add(HubConnectionState.disconnected);
_connectionStarted = false;

try {
Expand Down Expand Up @@ -435,7 +446,7 @@ class HubConnection {
return;
}

_connectionState = HubConnectionState.reconnecting;
_connectionStateSubject.add(HubConnectionState.reconnecting);

if (exception != null) {
_logger!(
Expand All @@ -460,7 +471,7 @@ class HubConnection {
}

// Exit early if an onreconnecting callback called connection.stop().
if (_connectionState != HubConnectionState.reconnecting) {
if (state != HubConnectionState.reconnecting) {
_logger(
LogLevel.debug,
'Connection left the reconnecting state in onreconnecting '
Expand All @@ -484,8 +495,7 @@ class HubConnection {
});
_reconnectDelayHandle = null;

if (_connectionState == null ||
_connectionState != HubConnectionState.reconnecting) {
if (state == null || state != HubConnectionState.reconnecting) {
_logger(
LogLevel.debug,
'Connection left the reconnecting state during reconnect delay. '
Expand All @@ -497,7 +507,7 @@ class HubConnection {
try {
await _startInternal();

_connectionState = HubConnectionState.connected;
_connectionStateSubject.add(HubConnectionState.connected);
_logger(
LogLevel.information, 'HubConnection reconnected successfully.');

Expand All @@ -518,7 +528,7 @@ class HubConnection {
_logger(LogLevel.information,
'Reconnect attempt failed because of error \'${e.toString()}\'.');

if (_connectionState != HubConnectionState.reconnecting) {
if (state != HubConnectionState.reconnecting) {
_logger(
LogLevel.debug,
'Connection left the reconnecting state during reconnect '
Expand Down Expand Up @@ -863,7 +873,7 @@ class HubConnection {
_logger!(
LogLevel.debug,
'HubConnection.connectionClosed(${exception.toString()}) called while '
'in state ${_connectionState.toString()}.',
'in state ${state.toString()}.',
);

// Triggering this.handshakeRejecter is insufficient because it could
Expand Down Expand Up @@ -894,12 +904,12 @@ class HubConnection {
_cleanupTimeout();
_cleanupPingTimer();

if (_connectionState == HubConnectionState.disconnecting) {
if (state == HubConnectionState.disconnecting) {
_completeClose(exception: exception);
} else if ((_connectionState == HubConnectionState.connected) &&
} else if ((state == HubConnectionState.connected) &&
_reconnectPolicy != null) {
_reconnect(exception: exception);
} else if (_connectionState == HubConnectionState.connected) {
} else if (state == HubConnectionState.connected) {
_completeClose(exception: exception);
}

Expand Down