Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions transfer_queue/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,36 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
else:
raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {result}).")

raw_address = conf.backend.MooncakeStore.metadata_server
if "://" not in raw_address:
raw_address = "//" + raw_address
# process metadata_server
metadata_server_raw_address = conf.backend.MooncakeStore.metadata_server
if "://" not in metadata_server_raw_address:
metadata_server_raw_address = "//" + metadata_server_raw_address

parsed = urlparse(raw_address)
metadata_server_parsed = urlparse(metadata_server_raw_address)

if not parsed.hostname or parsed.port is None:
if not metadata_server_parsed.hostname or metadata_server_parsed.port is None:
raise ValueError(
f"Invalid metadata_server '{conf.backend.MooncakeStore.metadata_server}'. "
f"Host and port are required (e.g., host:port)."
)

metadata_server_host = parsed.hostname
metadata_server_port = str(parsed.port)
metadata_server_host = metadata_server_parsed.hostname
metadata_server_port = str(metadata_server_parsed.port)

# process master_server
master_server_raw_address = conf.backend.MooncakeStore.master_server_address
if "://" not in master_server_raw_address:
master_server_raw_address = "//" + master_server_raw_address

master_server_parsed = urlparse(master_server_raw_address)

if not master_server_parsed.hostname or master_server_parsed.port is None:
raise ValueError(
f"Invalid master_server_address '{conf.backend.MooncakeStore.master_server_address}'. "
f"Host and port are required (e.g., host:port)."
)

master_server_port = str(master_server_parsed.port)

cmd = [
"mooncake_master",
Expand All @@ -136,6 +152,7 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
"--allow_evict_soft_pinned_objects=false",
f"--http_metadata_server_host={metadata_server_host}",
f"--http_metadata_server_port={metadata_server_port}",
f"--rpc_port={master_server_port}",
]

log_file_path = "/tmp/mooncake_master.log"
Expand Down
Loading