From 633d9ed9895f761046176570474f3a552423dfec Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 29 Jun 2021 15:52:18 +0100 Subject: [PATCH 01/10] Add zeroconf support --- distributed/scheduler.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index df7a43510b..352726fcae 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -9,6 +9,7 @@ import operator import os import random +import socket import sys import warnings import weakref @@ -35,6 +36,7 @@ valmap, ) from tornado.ioloop import IOLoop, PeriodicCallback +from zeroconf import IPVersion, ServiceInfo, Zeroconf import dask from dask.highlevelgraph import HighLevelGraph @@ -49,7 +51,11 @@ resolve_address, unparse_host_port, ) -from .comm.addressing import addresses_from_user_args +from .comm.addressing import ( + addresses_from_user_args, + get_address_host_port, + parse_address, +) from .core import CommClosedError, Status, clean_exception, rpc, send_recv from .diagnostics.plugin import SchedulerPlugin from .event import EventExtension @@ -3351,6 +3357,8 @@ def __init__( self._lock = asyncio.Lock() self.bandwidth_workers = defaultdict(float) self.bandwidth_types = defaultdict(float) + self._zeroconf = Zeroconf(ip_version=IPVersion.V4Only) + self._zeroconf_services = [] if not preload: preload = dask.config.get("distributed.scheduler.preload") @@ -3722,6 +3730,21 @@ async def start(self): for listener in self.listeners: logger.info(" Scheduler at: %25s", listener.contact_address) + # Advertise service via mdns service discovery + host, port = get_address_host_port(listener.contact_address) + protocol, _ = parse_address(listener.contact_address) + short_id = self.id.split("-")[1] + info = ServiceInfo( + "_dask._tcp.local.", + f"_sched-{short_id}._dask._tcp.local.", + addresses=[socket.inet_aton(host)], + port=port, + properties={"protocol": protocol}, + server=f"sched-{short_id}.dask.local.", + ) + self._zeroconf_services.append(info) + self._zeroconf.register_service(info) + logger.info(" Advertising as: %25s", info.server) for k, v in self.services.items(): logger.info("%11s at: %25s", k, "%s:%d" % (listen_ip, v.port)) @@ -3786,6 +3809,10 @@ async def close(self, comm=None, fast=False, close_workers=False): self.stop_services() + for info in self._zeroconf_services: + self._zeroconf.unregister_service(info) + self._zeroconf.close() + for ext in parent._extensions.values(): with suppress(AttributeError): ext.teardown() From e5f02f09a8aa9c0f7361b29144a219faa8dfdc88 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 1 Jul 2021 11:07:07 +0100 Subject: [PATCH 02/10] Add optional config --- distributed/distributed-schema.yaml | 111 ++++++++++++++-------------- distributed/distributed.yaml | 2 + distributed/scheduler.py | 51 +++++++------ distributed/tests/test_scheduler.py | 14 ++++ 4 files changed, 100 insertions(+), 78 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 80f7adce25..3d2238609b 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,14 +2,12 @@ properties: distributed: type: object properties: - version: type: integer scheduler: type: object properties: - allowed-failures: type: integer minimum: 0 @@ -22,8 +20,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -45,8 +43,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -60,8 +58,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -108,8 +106,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -162,6 +160,11 @@ properties: Whether or not to run consistency checks during execution. This is typically only used for debugging. + zeroconf: + type: boolean + description: | + Whether or not to advertise the scheduler via zeroconf. + dashboard: type: object description: | @@ -194,16 +197,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -242,7 +245,6 @@ properties: A list of trusted root modules the schedular is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules. - worker: type: object description: | @@ -338,8 +340,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -359,7 +361,6 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? - profile: type: object description: | @@ -446,8 +447,8 @@ properties: target: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -455,24 +456,24 @@ properties: spill: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. terminate: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -494,7 +495,6 @@ properties: description: | Configuration settings for Dask Nannies properties: - preload: type: array description: | @@ -518,8 +518,7 @@ properties: properties: heartbeat: type: string - description: - This value is the time between heartbeats + description: This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -588,13 +587,11 @@ properties: type: object description: Configuration settings for Dask communications properties: - retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: - count: type: integer minimum: 0 @@ -620,8 +617,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -664,8 +661,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -683,14 +680,14 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" descsription: Allowed ciphers, specified as an OpenSSL cipher string. ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -699,13 +696,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -718,13 +715,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -737,13 +734,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -809,7 +806,7 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit : + limit: type: string description: The time allowed before triggering a warning @@ -864,7 +861,7 @@ properties: Configuration options for the RAPIDS Memory Manager. properties: pool-size: - type: [integer, 'null'] + type: [integer, "null"] description: | The size of the memory pool in bytes. ucx: @@ -896,7 +893,7 @@ properties: Set environment variables to enable UCX RDMA connection manager support, requires ``ucx.infiniband=True``. net-devices: - type: [string, 'null'] + type: [string, "null"] description: | Interface(s) used by workers for UCX communication. Can be a string (like ``"eth0"`` for NVLink or ``"mlx5_0:1"``/``"ib0"`` for InfiniBand), ``"auto"`` @@ -906,7 +903,7 @@ properties: and compiled with hwloc support. Unexpected errors can occur when using ``"auto"`` if any interfaces are disconnected or improperly configured. reuse-endpoints: - type: [boolean, 'null'] + type: [boolean, "null"] description: | Enable UCX-Py reuse endpoints mechanism if ``True`` or if it's not specified and UCX < 1.11 is installed, otherwise disable reuse endpoints. This was primarily diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index f948c45fd9..c9a7abee9c 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -30,6 +30,7 @@ distributed: rechunk-split: 1us shuffle-split: 1us validate: False # Check scheduler state at every step for debugging + zeroconf: true dashboard: status: task-stream-length: 1000 @@ -60,6 +61,7 @@ distributed: - dask - distributed + worker: blocked-handlers: [] multiprocessing-method: spawn diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 352726fcae..863a56236c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -36,7 +36,6 @@ valmap, ) from tornado.ioloop import IOLoop, PeriodicCallback -from zeroconf import IPVersion, ServiceInfo, Zeroconf import dask from dask.highlevelgraph import HighLevelGraph @@ -94,6 +93,12 @@ except ImportError: compiled = False +try: + import zeroconf + from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf +except ImportError: + zeroconf = False + if compiled: from cython import ( Py_hash_t, @@ -164,6 +169,7 @@ def nogil(func): LOG_PDB = dask.config.get("distributed.admin.pdb-on-err") +ZEROCONF = dask.config.get("distributed.scheduler.zeroconf") DEFAULT_DATA_SIZE = declare( Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size")) ) @@ -3357,8 +3363,9 @@ def __init__( self._lock = asyncio.Lock() self.bandwidth_workers = defaultdict(float) self.bandwidth_types = defaultdict(float) - self._zeroconf = Zeroconf(ip_version=IPVersion.V4Only) - self._zeroconf_services = [] + if zeroconf and ZEROCONF: + self._zeroconf = AsyncZeroconf(ip_version=zeroconf.IPVersion.V4Only) + self._zeroconf_services = [] if not preload: preload = dask.config.get("distributed.scheduler.preload") @@ -3730,21 +3737,22 @@ async def start(self): for listener in self.listeners: logger.info(" Scheduler at: %25s", listener.contact_address) - # Advertise service via mdns service discovery - host, port = get_address_host_port(listener.contact_address) - protocol, _ = parse_address(listener.contact_address) - short_id = self.id.split("-")[1] - info = ServiceInfo( - "_dask._tcp.local.", - f"_sched-{short_id}._dask._tcp.local.", - addresses=[socket.inet_aton(host)], - port=port, - properties={"protocol": protocol}, - server=f"sched-{short_id}.dask.local.", - ) - self._zeroconf_services.append(info) - self._zeroconf.register_service(info) - logger.info(" Advertising as: %25s", info.server) + if zeroconf and ZEROCONF: + # Advertise service via mdns service discovery + host, port = get_address_host_port(listener.contact_address) + protocol, _ = parse_address(listener.contact_address) + short_id = self.id.split("-")[1] + info = AsyncServiceInfo( + "_dask._tcp.local.", + f"_sched-{short_id}._dask._tcp.local.", + addresses=[socket.inet_aton(host)], + port=port, + properties={"protocol": protocol}, + server=f"sched-{short_id}.dask.local.", + ) + self._zeroconf_services.append(info) + await self._zeroconf.async_register_service(info) + logger.info(" Advertising as: %25s", info.server) for k, v in self.services.items(): logger.info("%11s at: %25s", k, "%s:%d" % (listen_ip, v.port)) @@ -3809,9 +3817,10 @@ async def close(self, comm=None, fast=False, close_workers=False): self.stop_services() - for info in self._zeroconf_services: - self._zeroconf.unregister_service(info) - self._zeroconf.close() + if zeroconf and ZEROCONF: + for info in self._zeroconf_services: + await self._zeroconf.async_unregister_service(info) + await self._zeroconf.async_close() for ext in parent._extensions.values(): with suppress(AttributeError): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 418ab7e7f5..ff2dc3a336 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2800,3 +2800,17 @@ async def test_transition_counter(c, s, a, b): assert s.transition_counter == 0 await c.submit(inc, 1) assert s.transition_counter > 1 + + +@gen_cluster( + config={"distributed.scheduler.zeroconf": True}, +) +async def test_zeroconf(s, *_): + zeroconf = pytest.importorskip("zeroconf") + assert len(s._zeroconf_services) == 1 + async with zeroconf.asyncio.AsyncZeroconf(interfaces=["127.0.0.1"]) as aiozc: + service = s._zeroconf_services[0] + service = await aiozc.async_get_service_info("_dask._tcp.local.", service.name) + [address] = service.parsed_addresses() + assert str(address) in s.address + assert str(service.port) in s.address From f509c96f4e5731827c06fa7e291f6b6ab160dd1b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 1 Jul 2021 11:08:17 +0100 Subject: [PATCH 03/10] Undo formatting --- distributed/distributed-schema.yaml | 106 +++++++++++++++------------- 1 file changed, 57 insertions(+), 49 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 3d2238609b..93647fa639 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,12 +2,14 @@ properties: distributed: type: object properties: + version: type: integer scheduler: type: object properties: + allowed-failures: type: integer minimum: 0 @@ -20,8 +22,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -43,8 +45,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -58,8 +60,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -106,8 +108,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -197,16 +199,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -245,6 +247,7 @@ properties: A list of trusted root modules the schedular is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules. + worker: type: object description: | @@ -340,8 +343,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -361,6 +364,7 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? + profile: type: object description: | @@ -447,8 +451,8 @@ properties: target: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -456,24 +460,24 @@ properties: spill: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. terminate: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -495,6 +499,7 @@ properties: description: | Configuration settings for Dask Nannies properties: + preload: type: array description: | @@ -518,7 +523,8 @@ properties: properties: heartbeat: type: string - description: This value is the time between heartbeats + description: + This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -587,11 +593,13 @@ properties: type: object description: Configuration settings for Dask communications properties: + retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: + count: type: integer minimum: 0 @@ -617,8 +625,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -661,8 +669,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -680,14 +688,14 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" descsription: Allowed ciphers, specified as an OpenSSL cipher string. ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -696,13 +704,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -715,13 +723,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -734,13 +742,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -806,7 +814,7 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit: + limit : type: string description: The time allowed before triggering a warning @@ -861,7 +869,7 @@ properties: Configuration options for the RAPIDS Memory Manager. properties: pool-size: - type: [integer, "null"] + type: [integer, 'null'] description: | The size of the memory pool in bytes. ucx: @@ -893,7 +901,7 @@ properties: Set environment variables to enable UCX RDMA connection manager support, requires ``ucx.infiniband=True``. net-devices: - type: [string, "null"] + type: [string, 'null'] description: | Interface(s) used by workers for UCX communication. Can be a string (like ``"eth0"`` for NVLink or ``"mlx5_0:1"``/``"ib0"`` for InfiniBand), ``"auto"`` @@ -903,7 +911,7 @@ properties: and compiled with hwloc support. Unexpected errors can occur when using ``"auto"`` if any interfaces are disconnected or improperly configured. reuse-endpoints: - type: [boolean, "null"] + type: [boolean, 'null'] description: | Enable UCX-Py reuse endpoints mechanism if ``True`` or if it's not specified and UCX < 1.11 is installed, otherwise disable reuse endpoints. This was primarily From ea161dc4dd716e16dffcfe8cbc076d32481852dc Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 1 Jul 2021 11:08:56 +0100 Subject: [PATCH 04/10] Remove newline --- distributed/distributed.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index c9a7abee9c..e516766f89 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -61,7 +61,6 @@ distributed: - dask - distributed - worker: blocked-handlers: [] multiprocessing-method: spawn From bdd4038524289a0c7e3353f56d6539a07e1a7bb1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 12 Jul 2021 10:29:43 +0100 Subject: [PATCH 05/10] Run zeroconf in CI --- continuous_integration/environment-3.9.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/continuous_integration/environment-3.9.yaml b/continuous_integration/environment-3.9.yaml index 0dcd972db5..fd5f4ce52c 100644 --- a/continuous_integration/environment-3.9.yaml +++ b/continuous_integration/environment-3.9.yaml @@ -42,6 +42,7 @@ dependencies: - toolz - torchvision # Only tested here - tornado=6 + - zeroconf # Only tested here - zict # overridden by git tip below - zstandard - pip: From 09ea5f98d84366aa6fcd0319f9e0c4112161ef35 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 12 Jul 2021 10:30:00 +0100 Subject: [PATCH 06/10] Remove zeroconf config constant --- distributed/scheduler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 863a56236c..ebc8bb2bfd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -169,7 +169,6 @@ def nogil(func): LOG_PDB = dask.config.get("distributed.admin.pdb-on-err") -ZEROCONF = dask.config.get("distributed.scheduler.zeroconf") DEFAULT_DATA_SIZE = declare( Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size")) ) @@ -3363,7 +3362,7 @@ def __init__( self._lock = asyncio.Lock() self.bandwidth_workers = defaultdict(float) self.bandwidth_types = defaultdict(float) - if zeroconf and ZEROCONF: + if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): self._zeroconf = AsyncZeroconf(ip_version=zeroconf.IPVersion.V4Only) self._zeroconf_services = [] @@ -3737,7 +3736,7 @@ async def start(self): for listener in self.listeners: logger.info(" Scheduler at: %25s", listener.contact_address) - if zeroconf and ZEROCONF: + if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): # Advertise service via mdns service discovery host, port = get_address_host_port(listener.contact_address) protocol, _ = parse_address(listener.contact_address) @@ -3817,7 +3816,7 @@ async def close(self, comm=None, fast=False, close_workers=False): self.stop_services() - if zeroconf and ZEROCONF: + if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): for info in self._zeroconf_services: await self._zeroconf.async_unregister_service(info) await self._zeroconf.async_close() From f7ed37920e00e1e43de46bea9457595116c45bd1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 19 Jul 2021 13:43:40 +0100 Subject: [PATCH 07/10] Skip zeroconf for non IP based addresses (like inproc) --- distributed/scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 455cc01a0f..cb35e305e0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3798,7 +3798,11 @@ async def start(self): logger.info(" Scheduler at: %25s", listener.contact_address) if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): # Advertise service via mdns service discovery - host, port = get_address_host_port(listener.contact_address) + try: + host, port = get_address_host_port(listener.contact_address) + except NotImplementedError: + # If address is not IP based continue + continue protocol, _ = parse_address(listener.contact_address) short_id = self.id.split("-")[1] info = AsyncServiceInfo( From d6c2b9b053b3748ea21d34a692723ed28ffc8439 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 25 Aug 2021 11:26:38 +0100 Subject: [PATCH 08/10] Disable zeroconf on inproc comms --- distributed/scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8ca62308ff..bbe398b97f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3943,7 +3943,11 @@ async def start(self): for listener in self.listeners: logger.info(" Scheduler at: %25s", listener.contact_address) - if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): + if ( + zeroconf + and dask.config.get("distributed.scheduler.zeroconf") + and not self.address.startswith("inproc://") + ): # Advertise service via mdns service discovery try: host, port = get_address_host_port(listener.contact_address) From 79f78a36760dd4ec95ddb1f69992f5f6ac73ca44 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 25 Aug 2021 12:06:09 +0100 Subject: [PATCH 09/10] Fix dangling asyncio tasks --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bbe398b97f..b1d033f182 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3568,7 +3568,9 @@ def __init__( self.bandwidth_types = defaultdict(float) if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): self._zeroconf = AsyncZeroconf(ip_version=zeroconf.IPVersion.V4Only) - self._zeroconf_services = [] + else: + self._zeroconf = None + self._zeroconf_services = [] if not preload: preload = dask.config.get("distributed.scheduler.preload") @@ -4034,9 +4036,7 @@ async def close(self, comm=None, fast=False, close_workers=False): self.stop_services() - if zeroconf and dask.config.get("distributed.scheduler.zeroconf"): - for info in self._zeroconf_services: - await self._zeroconf.async_unregister_service(info) + if self._zeroconf: await self._zeroconf.async_close() for ext in parent._extensions.values(): From 3a43d38ecde070d57c3e1ce82cedae578c81e477 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 25 Aug 2021 14:24:29 +0100 Subject: [PATCH 10/10] If zeroconf is still registering at close cancel the tasks --- distributed/scheduler.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b1d033f182..1e4eb682dd 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3571,6 +3571,7 @@ def __init__( else: self._zeroconf = None self._zeroconf_services = [] + self._zeroconf_registration_tasks = [] if not preload: preload = dask.config.get("distributed.scheduler.preload") @@ -3967,7 +3968,9 @@ async def start(self): server=f"sched-{short_id}.dask.local.", ) self._zeroconf_services.append(info) - await self._zeroconf.async_register_service(info) + self._zeroconf_registration_tasks.append( + await self._zeroconf.async_register_service(info) + ) logger.info(" Advertising as: %25s", info.server) for k, v in self.services.items(): logger.info("%11s at: %25s", k, "%s:%d" % (listen_ip, v.port)) @@ -4038,6 +4041,9 @@ async def close(self, comm=None, fast=False, close_workers=False): if self._zeroconf: await self._zeroconf.async_close() + for task in self._zeroconf_registration_tasks: + with suppress(asyncio.CancelledError): + task.cancel() for ext in parent._extensions.values(): with suppress(AttributeError):