@@ -86,6 +86,11 @@ public void start() {
8686 * configured.
8787 */
8888 public void close () {
89+ this .closeSideCarChannel ();
90+ this .shutDownWorkerPool ();
91+ }
92+
93+ private void closeSideCarChannel () {
8994 if (this .managedSidecarChannel != null ) {
9095 try {
9196 this .managedSidecarChannel .shutdownNow ().awaitTermination (5 , TimeUnit .SECONDS );
@@ -97,10 +102,6 @@ public void close() {
97102 }
98103 }
99104
100- private String getSidecarAddress () {
101- return this .sidecarClient .getChannel ().authority ();
102- }
103-
104105 /**
105106 * Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
106107 * This method call blocks indefinitely, or until the current thread is interrupted.
@@ -125,7 +126,7 @@ public void startAndBlock() {
125126 logger );
126127
127128 // TODO: How do we interrupt manually?
128- while (true ) {
129+ while (! this . workerPool . isShutdown () ) {
129130 try {
130131 GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest .newBuilder ().build ();
131132 Iterator <WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
@@ -211,4 +212,19 @@ public void startAndBlock() {
211212 public void stop () {
212213 this .close ();
213214 }
215+
216+ private void shutDownWorkerPool () {
217+ this .workerPool .shutdown ();
218+ try {
219+ if (!this .workerPool .awaitTermination (60 , TimeUnit .SECONDS )) {
220+ this .workerPool .shutdownNow ();
221+ }
222+ } catch (InterruptedException ex ) {
223+ Thread .currentThread ().interrupt ();
224+ }
225+ }
226+
227+ private String getSidecarAddress () {
228+ return this .sidecarClient .getChannel ().authority ();
229+ }
214230}
0 commit comments