diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java index fb720dfeb2..59b8b190de 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java @@ -43,6 +43,7 @@ /** * @author Christoph Strobl * @author Mark Paluch + * @author sywu14 * @since 2.0 */ @NullUnmarked @@ -324,7 +325,9 @@ static class AsyncConnect this.connectionPublisher = defer.doOnNext(it -> { if (isClosing(STATE.get(this))) { - it.closeAsync(); + // Connection arrived after close() was initiated. Release it back to the connection provider so a + // pooled connection is returned to the pool instead of being closed without releasing the pool slot. + connectionProvider.releaseAsync(it); } else { connection = it; } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java index 54e9d02635..6765e58ef2 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java @@ -50,6 +50,7 @@ * Unit tests for {@link LettuceReactiveRedisConnection}. * * @author Mark Paluch + * @author sywu14 */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -196,6 +197,32 @@ void shouldPropagateConnectionFailures() { connection.getConnection().as(StepVerifier::create).expectError(RedisConnectionFailureException.class).verify(); } + @Test // GH-3371 + @SuppressWarnings("unchecked") + void shouldReleaseConnectionArrivingAfterClose() throws Exception { + + CompletableFuture> connectionFuture = new CompletableFuture<>(); + reset(connectionProvider); + when(connectionProvider.getConnectionAsync(any())).thenReturn(connectionFuture); + when(connectionProvider.releaseAsync(any())).thenReturn(CompletableFuture.completedFuture(null)); + + LettuceReactiveRedisConnection connection = new LettuceReactiveRedisConnection(connectionProvider); + + // connection request is in-flight, the connection has not arrived yet + CompletableFuture> inFlight = (CompletableFuture) connection + .getConnection().toFuture(); + assertThat(inFlight).isNotDone(); + + // the connection gets closed while the acquisition is still in progress + connection.close(); + + // the connection finally arrives from the provider + connectionFuture.complete(sharedConnection); + + // the late-arriving connection must be released back to the provider (returned to the pool), not leaked + verify(connectionProvider, times(1)).releaseAsync(sharedConnection); + } + @Test // DATAREDIS-720, DATAREDIS-721 void shouldRejectCommandsAfterClose() {