From 99c58c85169ba9ae584cb24d8a3cb4ce68dbefbe Mon Sep 17 00:00:00 2001 From: Eugene Nesvetaev Date: Mon, 29 Jun 2026 00:52:11 +0400 Subject: [PATCH 1/2] Stale child cleanup and non-blocking KVM WebSocket connect Spawn connect work via asyncio.create_task so the WebSocket receive loop can process control frames during long child startup (tornado#2941). Schedule log writes on the IOLoop; cancel in-flight connect on close. Remove explicit websocket_ping_interval. Add connect unittest. --- kvm_handler.py | 280 ++++++++++++++++++------------ main.py | 28 ++- tests/__init__.py | 0 tests/test_kvm_handler_connect.py | 83 +++++++++ 4 files changed, 278 insertions(+), 113 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_kvm_handler_connect.py diff --git a/kvm_handler.py b/kvm_handler.py index b15272e..0ea4369 100644 --- a/kvm_handler.py +++ b/kvm_handler.py @@ -1,10 +1,16 @@ -import json, logging, os +import asyncio +import json +import logging +import os try: - from typing import List + from typing import List, Optional except ImportError: pass +from tornado import ioloop +from tornado.websocket import WebSocketClosedError + from nojava_ipmi_kvm.kvm import ( start_kvm_container, WebserverNotReachableError, @@ -40,137 +46,189 @@ def open(self): self._current_user = self.get_current_user() self._connecting = False self._is_closed = False + self._connect_task = None if self._current_user is None or not self._current_user["is_admin"]: return self.close(code=401, reason="Unauthorized") logging.info("Websocket opened by %s", self._current_user["name"]) + def _safe_write(self, message): + if self._is_closed: + return False + try: + self.write_message(message) + return True + except WebSocketClosedError: + self._is_closed = True + return False + + def _schedule_write(self, message): + if self._is_closed: + return + ioloop.IOLoop.current().add_callback(self._safe_write, message) + + def _send_log(self, msg, *args, **kwargs): + self._schedule_write({"action": "log", "message": msg if len(args) == 0 else msg % args}) + + def _release_web_port(self, web_port): + if web_port in used_ports: + used_ports.remove(web_port) + async def on_message(self, msg): logging.info("Websocket from %s said %s", self._current_user["name"], msg) try: msg = json.loads(msg) except json.decoder.JSONDecodeError: - return self.write_message({"action": "notice", "message": "Invalid json received"}) + return self._safe_write({"action": "notice", "message": "Invalid json received"}) if "action" in msg: if msg["action"] == "connect": - if self._connecting: - return self.write_message({"action": "notice", "message": "Already connected to a kvm!"}) - self._connecting = True - - server = msg["server"] - password = msg["password"] - resolution = msg["resolution"] if "resolution" in msg else None - logging.info("%s wants to connect to %s with res %s", self._current_user["name"], server, resolution) - - if server not in config.get_servers(): - return self.write_message( - {"action": "notice", "message": "The specified hostname is not valid.", "refresh": True} - ) - host_config = config[server] - - web_port = 1 - for p in range(WEB_PORT_START, WEB_PORT_END): - if p not in used_ports: - self._web_port = p - web_port = p - used_ports.append(p) - break - else: - return self.write_message( - { - "action": "notice", - "message": "No unused port available. Please notify admins.", - "refresh": True, - } - ) - - def send_log_message(msg, *args, **kwargs): - if self._is_closed: - return - self.write_message({"action": "log", "message": msg if len(args) == 0 else msg % args}) - - try: - authorization_key = None - authorization_value = None - if isinstance(host_config, HTML5HostConfig): - if HTML5_AUTHORIZATION == "generate": - authorization_key = "kvm_auth_" + self._web_port - authorization_value = utils.generate_temp_password(20) - elif HTML5_AUTHORIZATION == "use_server": - authorization_key = "is_admin" - authorization_value = self.get_cookie("is_admin") - elif ":" in HTML5_AUTHORIZATION: - authorization_key = HTML5_AUTHORIZATION.split(":")[0] - authorization_value = HTML5_AUTHORIZATION.split(":", 1)[1] - - sess = self._current_session = await start_kvm_container( - host_config=host_config, - login_password=password, - external_vnc_dns=external_web_dns, - docker_port=self._web_port, - additional_logging=send_log_message, - selected_resolution=resolution, - authorization_key=authorization_key, - authorization_value=authorization_value, - subdir=HTML5_SUBDIR_FORMAT.format( - external_web_dns=external_web_dns, port=self._web_port, hostname=host_config.full_hostname - ), - ) - except ( - WebserverNotReachableError, - DockerNotInstalledError, - DockerNotCallableError, - IOError, - DockerTerminatedError, - DockerPortNotReadableError, - ) as ex: - logging.exception("Could not start KVM container") - used_ports.remove(web_port) - return self.write_message({"action": "error", "message": str(ex)}) - - if isinstance(sess, HTML5KvmViewer): - return self.write_message( - { - "action": "connected", - "url": HTML5_IFRAME_PATH_FORMAT.format( - url=sess.url, - external_web_dns=external_web_dns, - port=sess.web_port, - subdir=sess.subdir, - authorization_key=sess.authorization_key, - authorization_value=sess.authorization_value, - html5_endpoint=sess.html5_endpoint, - ), - "authorization_key": sess.authorization_key, - "authorization_value": sess.authorization_value, - } - ) - else: - return self.write_message( - { - "action": "connected", - "url": JAVA_IFRAME_PATH_FORMAT.format( - url=sess.url, - external_web_dns=external_web_dns, - port=sess.web_port, - password=sess.vnc_password, - ), - } - ) - - self.write_message( + if self._connecting or ( + self._connect_task is not None and not self._connect_task.done() + ): + return self._safe_write({"action": "notice", "message": "Already connected to a kvm!"}) + self._connect_task = asyncio.create_task(self._handle_connect(msg)) + return + + self._safe_write( {"action": "notice", "message": "Invalid msg received", "source": msg, "user": self.get_current_user()} ) + async def _handle_connect(self, msg): + web_port = None + self._connecting = True + try: + server = msg["server"] + password = msg["password"] + resolution = msg["resolution"] if "resolution" in msg else None + logging.info("%s wants to connect to %s with res %s", self._current_user["name"], server, resolution) + + if server not in config.get_servers(): + return self._safe_write( + {"action": "notice", "message": "The specified hostname is not valid.", "refresh": True} + ) + host_config = config[server] + + for p in range(WEB_PORT_START, WEB_PORT_END): + if p not in used_ports: + self._web_port = p + web_port = p + used_ports.append(p) + break + else: + return self._safe_write( + { + "action": "notice", + "message": "No unused port available. Please notify admins.", + "refresh": True, + } + ) + + authorization_key = None + authorization_value = None + if isinstance(host_config, HTML5HostConfig): + if HTML5_AUTHORIZATION == "generate": + authorization_key = "kvm_auth_" + str(self._web_port) + authorization_value = utils.generate_temp_password(20) + elif HTML5_AUTHORIZATION == "use_server": + authorization_key = "is_admin" + authorization_value = self.get_cookie("is_admin") + elif ":" in HTML5_AUTHORIZATION: + authorization_key = HTML5_AUTHORIZATION.split(":")[0] + authorization_value = HTML5_AUTHORIZATION.split(":", 1)[1] + + sess = self._current_session = await start_kvm_container( + host_config=host_config, + login_password=password, + external_vnc_dns=external_web_dns, + docker_port=self._web_port, + additional_logging=self._send_log, + selected_resolution=resolution, + authorization_key=authorization_key, + authorization_value=authorization_value, + subdir=HTML5_SUBDIR_FORMAT.format( + external_web_dns=external_web_dns, port=self._web_port, hostname=host_config.full_hostname + ), + ) + except asyncio.CancelledError: + if self._current_session is not None: + self._release_web_port(self._web_port) + self._current_session.kill_process() + self._current_session = None + elif web_port is not None: + self._release_web_port(web_port) + raise + except ( + WebserverNotReachableError, + DockerNotInstalledError, + DockerNotCallableError, + IOError, + DockerTerminatedError, + DockerPortNotReadableError, + ) as ex: + logging.exception("Could not start KVM container") + if web_port is not None: + self._release_web_port(web_port) + return self._safe_write({"action": "error", "message": str(ex)}) + finally: + self._connecting = False + + if self._is_closed: + logging.warning("WebSocket closed before KVM connect completed; skipping connected message") + return + + if isinstance(sess, HTML5KvmViewer): + if not self._safe_write( + { + "action": "connected", + "url": HTML5_IFRAME_PATH_FORMAT.format( + url=sess.url, + external_web_dns=external_web_dns, + port=sess.web_port, + subdir=sess.subdir, + authorization_key=sess.authorization_key, + authorization_value=sess.authorization_value, + html5_endpoint=sess.html5_endpoint, + ), + "authorization_key": sess.authorization_key, + "authorization_value": sess.authorization_value, + } + ): + logging.info("WebSocket closed while sending connected message") + return + + if not self._safe_write( + { + "action": "connected", + "url": JAVA_IFRAME_PATH_FORMAT.format( + url=sess.url, + external_web_dns=external_web_dns, + port=sess.web_port, + password=sess.vnc_password, + ), + } + ): + logging.info("WebSocket closed while sending connected message") + def on_close(self): - logging.info("WS from %s closed", None if self._current_user is None else self._current_user["name"]) + logging.info( + "WS from %s closed code=%s reason=%r", + None if self._current_user is None else self._current_user["name"], + self.close_code, + self.close_reason, + ) self._is_closed = True + if self._connect_task is not None and not self._connect_task.done(): + self._connect_task.cancel() + self._connecting = False if self._current_session is not None: - used_ports.remove(self._web_port) + self._release_web_port(self._web_port) self._current_session.kill_process() + self._current_session = None + elif self._web_port and self._web_port in used_ports: + self._release_web_port(self._web_port) __all__ = ["KVMHandler"] diff --git a/main.py b/main.py index 5c3ea7e..ea3219e 100644 --- a/main.py +++ b/main.py @@ -3,14 +3,17 @@ __version__ = "0.2.2" __author__ = "M. Heuwes " -import os +import atexit import json import logging +import os +import signal from tornado.web import authenticated from tornado import web, ioloop from nojava_ipmi_kvm.config import config, DEFAULT_CONFIG_FILEPATH +from nojava_ipmi_kvm.stale_children import cleanup_stale_kvm_children from nojava_ipmi_kvm import utils from login_handler import OAuth2LoginHandler @@ -26,6 +29,28 @@ config.read_config(CONFIG_PATH) +def _stale_port_range(): + return ( + int(os.environ.get("WEB_PORT_START", 8800)), + int(os.environ.get("WEB_PORT_END", 8900)), + ) + + +def _cleanup_stale_children(): + port_start, port_end = _stale_port_range() + cleanup_stale_kvm_children(port_start, port_end) + + +def _shutdown_handler(signum, frame): + _cleanup_stale_children() + ioloop.IOLoop.current().stop() + + +_cleanup_stale_children() +signal.signal(signal.SIGTERM, _shutdown_handler) +atexit.register(_cleanup_stale_children) + + class MainHandler(BaseHandler): @authenticated @authorized @@ -67,7 +92,6 @@ def make_app(): "login_url": "/oauth/login", "xsrf_cookies": True, "default_handler_class": MainHandler, - "websocket_ping_interval": 10, } return web.Application( [web.url(r"/oauth/login", OAuth2LoginHandler), web.url(r"/", MainHandler), web.url(r"/kvm", KVMHandler)], diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_kvm_handler_connect.py b/tests/test_kvm_handler_connect.py new file mode 100644 index 0000000..9a9a8d2 --- /dev/null +++ b/tests/test_kvm_handler_connect.py @@ -0,0 +1,83 @@ +import asyncio +import json +import unittest +from unittest import mock + +import kvm_handler +from kvm_handler import KVMHandler, used_ports +from nojava_ipmi_kvm.kvm import JavaKvmViewer + + +class _FakeHostConfig: + full_hostname = "bmc.example" + skip_login = False + + +class ConnectSpawnsTaskTest(unittest.IsolatedAsyncioTestCase): + def setUp(self): + used_ports.clear() + + def _make_handler(self): + handler = KVMHandler.__new__(KVMHandler) + handler._current_user = {"name": "tester", "is_admin": True} + handler._connecting = False + handler._is_closed = False + handler._connect_task = None + handler._current_session = None + handler._web_port = 0 + handler.write_message = mock.Mock() + handler.get_cookie = mock.Mock(return_value=None) + return handler + + async def test_on_message_returns_before_connect_finishes(self): + connect_started = asyncio.Event() + allow_finish = asyncio.Event() + + async def slow_start(*args, **kwargs): + connect_started.set() + await allow_finish.wait() + return JavaKvmViewer( + "http://localhost:8800/vnc.html", + "localhost", + 8800, + lambda: None, + "secret", + ) + + handler = self._make_handler() + host_config = _FakeHostConfig() + + with mock.patch.object(kvm_handler, "start_kvm_container", slow_start): + with mock.patch.object(kvm_handler.config, "get_servers", return_value=["host1"]): + with mock.patch.object(kvm_handler.config, "__getitem__", return_value=host_config): + with mock.patch.object(kvm_handler, "WEB_PORT_START", 8800): + with mock.patch.object(kvm_handler, "WEB_PORT_END", 8802): + await handler.on_message( + json.dumps( + { + "action": "connect", + "server": "host1", + "password": "secret", + "resolution": "1280x960", + } + ) + ) + + self.assertIsNotNone(handler._connect_task) + await asyncio.wait_for(connect_started.wait(), timeout=1) + self.assertFalse(handler._connect_task.done()) + + allow_finish.set() + await handler._connect_task + + handler.write_message.assert_called() + connected_calls = [ + call + for call in handler.write_message.call_args_list + if call.args and call.args[0].get("action") == "connected" + ] + self.assertEqual(len(connected_calls), 1) + + +if __name__ == "__main__": + unittest.main() From 7f1f31769ab1ef5c52f8a8316de2a6401cee9b0a Mon Sep 17 00:00:00 2001 From: Eugene Nesvetaev Date: Mon, 29 Jun 2026 02:50:22 +0400 Subject: [PATCH 2/2] Fix connect unittest config mock --- tests/test_kvm_handler_connect.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_kvm_handler_connect.py b/tests/test_kvm_handler_connect.py index 9a9a8d2..f7b437d 100644 --- a/tests/test_kvm_handler_connect.py +++ b/tests/test_kvm_handler_connect.py @@ -46,10 +46,12 @@ async def slow_start(*args, **kwargs): handler = self._make_handler() host_config = _FakeHostConfig() + mock_config = mock.Mock() + mock_config.get_servers.return_value = ["host1"] + mock_config.__getitem__ = mock.Mock(return_value=host_config) with mock.patch.object(kvm_handler, "start_kvm_container", slow_start): - with mock.patch.object(kvm_handler.config, "get_servers", return_value=["host1"]): - with mock.patch.object(kvm_handler.config, "__getitem__", return_value=host_config): + with mock.patch.object(kvm_handler, "config", mock_config): with mock.patch.object(kvm_handler, "WEB_PORT_START", 8800): with mock.patch.object(kvm_handler, "WEB_PORT_END", 8802): await handler.on_message(