Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 69 additions & 25 deletions qubes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,45 @@ def validate_kernel(obj, property_name: str, kernel: str) -> None:
)


def get_qube_prop_deps(
qube,
system_properties: list | None = None,
qube_properties: list | None = None,
):
if not system_properties:
system_properties = []
if not qube_properties:
qube_properties = []
system_deps: list = []
qube_deps: list = []
for obj in itertools.chain(qube.app.domains, (qube.app,)):
if obj is qube:
continue
for prop in obj.property_list():
if not (
isinstance(prop, qubes.vm.VMProperty)
and getattr(obj, prop.__name__, None) == qube
):
continue
if getattr(obj, "is_preload", False) and (
prop.__name__ == "template"
or (
prop.__name__ in ["default_dispvm", "management_dispvm"]
and getattr(obj, "template", None) == qube
)
):
continue
if isinstance(obj, qubes.app.Qubes):
if system_properties and prop.__name__ not in system_properties:
continue
system_deps.append(prop.__name__)
elif not obj.property_is_default(prop):
if qube_properties and prop.__name__ not in qube_properties:
continue
qube_deps.append((obj.name, prop.__name__))
return system_deps, qube_deps


class Qubes(qubes.PropertyHolder):
"""Main Qubes application

Expand Down Expand Up @@ -890,16 +929,20 @@ class Qubes(qubes.PropertyHolder):
"default_dispvm",
load_stage=3,
default=None,
doc="Default DispVM base for service calls",
setter=qubes.vm.setter_disposable_template,
allow_none=True,
doc="""Default disposable template to be used for spawning disposable
qubes for service calls in this system.""",
)

management_dispvm = qubes.VMProperty(
"management_dispvm",
load_stage=3,
default=None,
doc="Default DispVM base for managing VMs",
allow_none=True,
setter=qubes.vm.setter_disposable_template,
doc="""Default disposable template to be used for spawning disposable
qubes for managing a qube in this system.""",
)

default_pool = qubes.property(
Expand Down Expand Up @@ -1046,7 +1089,10 @@ def store(self):
return self._store

def _migrate_global_properties(self):
"""Migrate renamed/dropped properties"""
"""Migrate renamed/dropped properties or properties that had weak or no
setter to a stricter setter, that would make current value invalid,
requiring setting it to None, to avoid failure to load XML and qubesd.
"""
if self.xml is None:
return

Expand Down Expand Up @@ -1091,6 +1137,21 @@ def _migrate_global_properties(self):
pass
node_default_fw_netvm.getparent().remove(node_default_fw_netvm)

# Drop invalid setting.
for prop in ["default_dispvm", "management_dispvm"]:
for obj in [self] + list(self.domains):
if obj.xml is None:
continue
node_dispvm = obj.xml.find(
f"./properties/property[@name='{prop}']"
)
if node_dispvm is None or node_dispvm.text is None:
continue
dispvm = self.domains[node_dispvm.text]
if getattr(dispvm, "template_for_dispvms", None):
continue
node_dispvm.text = ""

def _migrate_labels(self):
"""Migrate changed labels"""
if self.xml is None:
Expand Down Expand Up @@ -1649,28 +1710,11 @@ def on_domain_pre_deleted(self, event, vm):
"""
# pylint: disable=unused-argument
dependencies = []
for obj in itertools.chain(self.domains, (self,)):
if obj is vm:
# allow removed VM to reference itself
continue
for prop in obj.property_list():
with suppress(AttributeError):
if (
isinstance(prop, qubes.vm.VMProperty)
and getattr(obj, prop.__name__) == vm
):
if getattr(obj, "is_preload", False) and (
prop.__name__ == "template"
or (
prop.__name__ == "default_dispvm"
and getattr(obj, "template", None) == vm
)
):
continue
if isinstance(obj, qubes.app.Qubes):
dependencies.insert(0, ("@GLOBAL", prop.__name__))
elif not obj.property_is_default(prop):
dependencies.append((obj.name, prop.__name__))
system_deps, qube_deps = get_qube_prop_deps(qube=vm)
if system_deps:
dependencies = [("@GLOBAL", dep) for dep in system_deps]
if qube_deps:
dependencies.extend(qube_deps)
if dependencies:
self.log.error(
"Cannot remove %s as it is used by %s",
Expand Down
195 changes: 188 additions & 7 deletions qubes/tests/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,169 @@ def test_100_property_migrate_default_fw_netvm(self):
app.close()
del app

def test_101_property_migrate_label(self):
def test_101_property_migrate_disposable_template(self):
xml_template = """<?xml version="1.0" encoding="utf-8" ?>
<qubes version="3.0">
<properties>
<property name="default_template"></property>
<property name="default_dispvm">{app_default_dispvm}</property>
<property name="management_dispvm">{app_management_dispvm}</property>
</properties>
<labels>
<label id="label-1" color="#cc0000">red</label>
</labels>
<pools>
<pool driver="file" dir_path="/tmp/qubes-test" name="default"/>
</pools>
<domains>

<domain class="AdminVM" id="domain-0">
<properties>
<property name="default_dispvm">{adminvm_default_dispvm}</property>
</properties>
</domain>

<domain class="StandaloneVM" id="domain-1">
<properties>
<property name="qid">1</property>
<property name="name">work</property>
<property name="label" ref="label-1" />
<property name="uuid">2fcfc1f4-b2fe-4361-931a-c5294b35edfa</property>
<property name="management_dispvm">{work_management_dispvm}</property>
</properties>
<features/>
<devices class="pci"/>
</domain>

<domain class="StandaloneVM" id="domain-2">
<properties>
<property name="qid">3</property>
<property name="name">disp-template</property>
<property name="label" ref="label-1" />
<property name="uuid">2ccfc1f4-b2fe-4361-931a-c5294b35edfa</property>
<property name="template_for_dispvms">True</property>
</properties>
</domain>

</domains>
</qubes>
"""

with self.subTest("default"):
with open("/tmp/qubestest.xml", "w", encoding="ascii") as xml_file:
xml_file.write(
xml_template.format(
app_default_dispvm="disp-template",
app_management_dispvm="disp-template",
adminvm_default_dispvm="disp-template",
work_management_dispvm="disp-template",
)
)
app = qubes.Qubes("/tmp/qubestest.xml", offline_mode=True)
adminvm = app.domains["dom0"]
work = app.domains["work"]
disp_template = app.domains["disp-template"]

self.assertFalse(app.property_is_default("default_dispvm"))
self.assertEqual(app.default_dispvm, disp_template)
self.assertFalse(app.property_is_default("management_dispvm"))
self.assertEqual(app.management_dispvm, disp_template)

self.assertFalse(adminvm.property_is_default("default_dispvm"))
self.assertEqual(adminvm.default_dispvm, disp_template)

self.assertTrue(work.property_is_default("default_dispvm"))
self.assertEqual(work.default_dispvm, disp_template)
self.assertFalse(work.property_is_default("management_dispvm"))
self.assertEqual(work.management_dispvm, disp_template)

self.assertTrue(disp_template.property_is_default("default_dispvm"))
self.assertEqual(disp_template.default_dispvm, disp_template)
self.assertTrue(
disp_template.property_is_default("management_dispvm")
)
self.assertEqual(disp_template.management_dispvm, disp_template)

app.close()
del app

with self.subTest("invalid-system"):
with open("/tmp/qubestest.xml", "w", encoding="ascii") as xml_file:
xml_file.write(
xml_template.format(
app_default_dispvm="work",
app_management_dispvm="work",
adminvm_default_dispvm="disp-template",
work_management_dispvm="disp-template",
)
)
app = qubes.Qubes("/tmp/qubestest.xml", offline_mode=True)
adminvm = app.domains["dom0"]
work = app.domains["work"]
disp_template = app.domains["disp-template"]

self.assertFalse(app.property_is_default("default_dispvm"))
self.assertEqual(app.default_dispvm, None)
self.assertFalse(app.property_is_default("management_dispvm"))
self.assertEqual(app.management_dispvm, None)

self.assertFalse(adminvm.property_is_default("default_dispvm"))
self.assertEqual(adminvm.default_dispvm, disp_template)

self.assertTrue(work.property_is_default("default_dispvm"))
self.assertEqual(work.default_dispvm, None)
self.assertFalse(work.property_is_default("management_dispvm"))
self.assertEqual(work.management_dispvm, disp_template)

self.assertTrue(disp_template.property_is_default("default_dispvm"))
self.assertEqual(disp_template.default_dispvm, None)
self.assertTrue(
disp_template.property_is_default("management_dispvm")
)
self.assertEqual(disp_template.management_dispvm, None)

app.close()
del app

with self.subTest("invalid-per-qube"):
with open("/tmp/qubestest.xml", "w", encoding="ascii") as xml_file:
xml_file.write(
xml_template.format(
app_default_dispvm="disp-template",
app_management_dispvm="disp-template",
adminvm_default_dispvm="work",
work_management_dispvm="work",
)
)
app = qubes.Qubes("/tmp/qubestest.xml", offline_mode=True)
adminvm = app.domains["dom0"]
work = app.domains["work"]
disp_template = app.domains["disp-template"]

self.assertFalse(app.property_is_default("default_dispvm"))
self.assertEqual(app.default_dispvm, disp_template)
self.assertFalse(app.property_is_default("management_dispvm"))
self.assertEqual(app.management_dispvm, disp_template)

self.assertFalse(adminvm.property_is_default("default_dispvm"))
self.assertEqual(adminvm.default_dispvm, None)

self.assertTrue(work.property_is_default("default_dispvm"))
self.assertEqual(work.default_dispvm, disp_template)
self.assertFalse(work.property_is_default("management_dispvm"))
self.assertEqual(work.management_dispvm, None)

self.assertTrue(disp_template.property_is_default("default_dispvm"))
self.assertEqual(disp_template.default_dispvm, disp_template)
self.assertTrue(
disp_template.property_is_default("management_dispvm")
)
self.assertEqual(disp_template.management_dispvm, disp_template)

app.close()
del app

def test_102_property_migrate_label(self):
xml_template = """<?xml version="1.0" encoding="utf-8" ?>
<qubes version="3.0">
<labels>
Expand Down Expand Up @@ -1053,27 +1215,32 @@ def test_203_remove_default_dispvm(self):
appvm = self.app.add_new_vm(
"AppVM", name="test-appvm", template=self.template, label="red"
)
appvm.template_for_dispvms = True
self.app.default_dispvm = appvm
with mock.patch.object(self.app, "vmm"):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[appvm]

def test_204_remove_appvm_dispvm(self):
dispvm = self.app.add_new_vm(
"AppVM", name="test-appvm", template=self.template, label="red"
def test_204_remove_appvm_used_as_default_dispvm(self):
appvm = self.app.add_new_vm(
"AppVM",
name="test-appvm",
template=self.template,
label="red",
template_for_dispvms=True,
)
self.app.add_new_vm(
"AppVM",
name="test-appvm2",
template=self.template,
default_dispvm=dispvm,
default_dispvm=appvm,
label="red",
)
with mock.patch.object(self.app, "vmm"):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[dispvm]
del self.app.domains[appvm]

def test_205_remove_appvm_dispvm(self):
def test_205_remove_appvm_used_as_template_of_dispvm(self):
appvm = self.app.add_new_vm(
"AppVM",
name="test-appvm",
Expand Down Expand Up @@ -1257,6 +1424,20 @@ def test_303_preload_default_dispvm_change_partial_noop(self):
"domain-preload-dispvm-start", reason=mock.ANY
)

def test_304_event_default_dispvm(self):
self.appvm.template_for_dispvms = False
with self.assertRaises(qubes.exc.QubesPropertyValueError):
self.app.default_dispvm = self.appvm
self.appvm.template_for_dispvms = True
self.app.default_dispvm = self.appvm

def test_305_event_management_dispvm(self):
self.appvm.template_for_dispvms = False
with self.assertRaises(qubes.exc.QubesPropertyValueError):
self.app.management_dispvm = self.appvm
self.appvm.template_for_dispvms = True
self.app.management_dispvm = self.appvm

@qubes.tests.skipUnlessGit
def test_900_example_xml_in_doc(self):
path = os.path.join(qubes.tests.in_git, "doc/example.xml")
Expand Down
14 changes: 12 additions & 2 deletions qubes/tests/vm/adminvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_700_run_service(self, mock_subprocess):
"test.service",
"dom0",
"name",
"dom0"
"dom0",
)

mock_subprocess.reset_mock()
Expand Down Expand Up @@ -162,7 +162,7 @@ def test_700_run_service(self, mock_subprocess):
"test.service",
self.appvm.name,
"name",
"dom0"
"dom0",
)

@unittest.mock.patch("qubes.vm.adminvm.AdminVM.run_service")
Expand Down Expand Up @@ -297,3 +297,13 @@ def test_803_preload_set_delay(self):
for value in cases_valid:
with self.subTest(value=value):
self.vm.features["preload-dispvm-delay"] = value

def test_901_prop_event_disposable_template(self):
appvm = self.app.add_new_vm(
"AppVM",
name="test-1",
template=self.template,
label="red",
)
with self.assertRaises(qubes.exc.QubesPropertyValueError):
self.vm.default_dispvm = appvm
Loading