Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release notes

## 0.23.1

### ‼️🚨 High vulnerability fixed – please upgrade ASAP

* In this version of ormar a high severity vulnerability (CVE-2026-27953) in model initialization was patched. The vulnerability allowed injection of `__pk_only__` and `__excluded__` parameters through user-supplied `**kwargs` (e.g. JSON request bodies). Passing `__pk_only__=True` bypassed all Pydantic validation, and `__excluded__` could nullify arbitrary fields. Thanks @Mistz1 for reporting!
* Affected versions:
* All versions prior to `0.23.1`

## 0.23.0

### ‼️🚨 Critical vulnerability fixed – please upgrade ASAP
Expand Down
14 changes: 9 additions & 5 deletions ormar/fields/foreign_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid
from dataclasses import dataclass
from random import choices
from typing import TYPE_CHECKING, Any, ForwardRef, Optional, Union, overload
from typing import TYPE_CHECKING, Any, ForwardRef, Optional, Union, cast, overload

import sqlalchemy
from pydantic import BaseModel, create_model
Expand Down Expand Up @@ -38,14 +38,14 @@ def create_dummy_instance(fk: type["T"], pk: Any = None) -> "T":
:rtype: Model
"""
init_dict = {
**{fk.ormar_config.pkname: pk or -1, "__pk_only__": True},
**{fk.ormar_config.pkname: pk or -1},
**{
k: create_dummy_instance(v.to)
for k, v in fk.ormar_config.model_fields.items()
if v.is_relation and not v.nullable and not v.virtual
},
}
return fk(**init_dict)
return cast("T", fk._internal_construct(_pk_only=True, _excluded=None, **init_dict))


def create_dummy_model(
Expand Down Expand Up @@ -524,9 +524,13 @@ def _construct_model_from_dict(
and value.get(self.to.ormar_config.pkname) is not None
and not self.is_through
):
value["__pk_only__"] = True
pk_only_model = self.to_pk_only(**value)
model = self.to(**value)
model = cast(
"Model",
self.to._internal_construct(_pk_only=True, _excluded=None, **value),
)
else:
model = self.to(**value)
if to_register:
self.register_relation(model=model, child=child)
return pk_only_model if pk_only_model is not None else model
Expand Down
8 changes: 4 additions & 4 deletions ormar/models/model_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ def from_row( # noqa: CFQ002

instance: Optional["Model"] = None
if item.get(cls.ormar_config.pkname, None) is not None:
item["__excluded__"] = cls.get_names_to_exclude(
excluded = cls.get_names_to_exclude(
excludable=excludable, alias=table_prefix
)
instance = cast("Model", cls(**item))
instance = cast("Model", cls._construct_with_excluded(excluded, **item))
instance.set_save_status(True)
return instance

Expand Down Expand Up @@ -326,10 +326,10 @@ def _create_through_instance(
child_dict = model_cls.extract_prefixed_table_columns(
item={}, row=row, excludable=excludable, table_prefix=table_prefix
)
child_dict["__excluded__"] = model_cls.get_names_to_exclude(
excluded = model_cls.get_names_to_exclude(
excludable=excludable, alias=table_prefix
)
child = model_cls(**child_dict) # type: ignore
child = model_cls._construct_with_excluded(excluded, **child_dict) # type: ignore
return child

@classmethod
Expand Down
111 changes: 82 additions & 29 deletions ormar/models/newbasemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # type: ignore

Models marked as abstract=True in internal OrmarConfig cannot be initialized.

Accepts also special __pk_only__ flag that indicates that Model is constructed
only with primary key value (so no other fields, it's a child model on other
Model), that causes skipping the validation, that's the only case when the
validation can be skipped.

Accepts also special __excluded__ parameter that contains a set of fields that
should be explicitly set to None, as otherwise pydantic will try to populate
them with their default values if default is set.

:raises ModelError: if abstract model is initialized, model has ForwardRefs
that has not been updated or unknown field is passed
:param args: ignored args
Expand All @@ -125,31 +116,101 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self._verify_model_can_be_initialized()
self._initialize_internal_attributes()

pk_only = kwargs.pop("__pk_only__", False)
object.__setattr__(self, "__pk_only__", pk_only)
object.__setattr__(self, "__pk_only__", False)

new_kwargs, through_tmp_dict = self._process_kwargs(kwargs)

if not pk_only:
new_kwargs = self.serialize_nested_models_json_fields(new_kwargs)
self.__pydantic_validator__.validate_python(
new_kwargs = self.serialize_nested_models_json_fields(new_kwargs)
self.__pydantic_validator__.validate_python(
new_kwargs,
self_instance=self, # type: ignore
)
self._register_related_models(new_kwargs, through_tmp_dict)

@classmethod
def _internal_construct(
cls,
_pk_only: bool = False,
_excluded: Optional[set[str]] = None,
**kwargs: Any,
) -> "NewBaseModel":
"""
Internal-only factory for constructing model instances with pk_only or
excluded support. Not reachable from user-supplied kwargs or JSON
deserialization.

:param _pk_only: if True, skip validation and set only pk field
:type _pk_only: bool
:param _excluded: set of field names to explicitly set to None
:type _excluded: Optional[set[str]]
:param kwargs: field values for the model
:type kwargs: Any
:return: constructed model instance
:rtype: NewBaseModel
"""
instance = cls.__new__(cls)
instance._verify_model_can_be_initialized()
instance._initialize_internal_attributes()
object.__setattr__(instance, "__pk_only__", _pk_only)

new_kwargs, through_tmp_dict = instance._process_kwargs(kwargs)

if _excluded:
for field_to_nullify in _excluded:
new_kwargs[field_to_nullify] = None

if not _pk_only:
new_kwargs = instance.serialize_nested_models_json_fields(new_kwargs)
instance.__pydantic_validator__.validate_python(
new_kwargs,
self_instance=self, # type: ignore
self_instance=instance, # type: ignore
)
else:
fields_set = {self.ormar_config.pkname}
values = new_kwargs
object.__setattr__(self, "__dict__", values)
object.__setattr__(self, "__pydantic_fields_set__", fields_set)
# add back through fields
fields_set = {instance.ormar_config.pkname}
object.__setattr__(instance, "__dict__", new_kwargs)
object.__setattr__(instance, "__pydantic_fields_set__", fields_set)

instance._register_related_models(new_kwargs, through_tmp_dict)
return instance

def _register_related_models(
self, new_kwargs: dict[str, Any], through_tmp_dict: dict[str, Any]
) -> None:
"""
Adds back through fields and registers related models after initialization.

:param new_kwargs: processed keyword arguments with field values
:type new_kwargs: dict[str, Any]
:param through_tmp_dict: through model fields extracted during processing
:type through_tmp_dict: dict[str, Any]
"""
new_kwargs.update(through_tmp_dict)
model_fields = object.__getattribute__(self, "ormar_config").model_fields
# register the columns models after initialization
for related in self.extract_related_names().union(self.extract_through_names()):
model_fields[related].expand_relationship(
new_kwargs.get(related), self, to_register=True
)

@classmethod
def _construct_with_excluded(
cls, excluded: set[str], **kwargs: Any
) -> typing_extensions.Self:
"""
Constructs model instance and nullifies excluded fields post-construction.
Used when loading partial results from the database.

:param excluded: set of field names to nullify after construction
:type excluded: set[str]
:param kwargs: field values for the model
:type kwargs: Any
:return: constructed model instance
:rtype: Self
"""
instance = cls(**kwargs)
for field_to_nullify in excluded:
instance.__dict__[field_to_nullify] = None
return instance

def __setattr__(self, name: str, value: Any) -> None: # noqa CCR001
"""
Overwrites setattr in pydantic parent as otherwise descriptors are not called.
Expand Down Expand Up @@ -273,8 +334,6 @@ def _process_kwargs(self, kwargs: dict) -> tuple[dict, dict]: # noqa: CCR001

Checks if field is in the model fields or pydantic fields.

Nullifies fields that should be excluded.

Extracts through models from kwargs into temporary dict.

:param kwargs: passed to init keyword arguments
Expand All @@ -290,7 +349,6 @@ def _process_kwargs(self, kwargs: dict) -> tuple[dict, dict]: # noqa: CCR001
for prop_filed in property_fields:
kwargs.pop(prop_filed, None)

excluded: set[str] = kwargs.pop("__excluded__", set())
if "pk" in kwargs:
kwargs[self.ormar_config.pkname] = kwargs.pop("pk")

Expand Down Expand Up @@ -324,11 +382,6 @@ def _process_kwargs(self, kwargs: dict) -> tuple[dict, dict]: # noqa: CCR001
f"Unknown field '{e.args[0]}' for model {self.get_name(lower=False)}"
)

# explicitly set None to excluded fields
# as pydantic populates them with default if set
for field_to_nullify in excluded:
new_kwargs[field_to_nullify] = None

return new_kwargs, through_tmp_dict

def _remove_extra_parameters_if_they_should_be_ignored(
Expand Down
4 changes: 3 additions & 1 deletion ormar/queryset/queries/prefetch_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,9 @@ def _instantiate_models(self) -> None:
hashable_item = self._hash_item(item)
instance = parsed_rows.setdefault(
hashable_item,
self.relation_field.to(**item, **{"__excluded__": fields_to_exclude}),
self.relation_field.to._construct_with_excluded(
fields_to_exclude, **item
),
)
self.models.append(instance)

Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ name = "ormar"

[tool.poetry]
name = "ormar"
version = "0.23.0"
version = "0.23.1"
description = "An async ORM with fastapi in mind and pydantic validation."
authors = ["Radosław Drążkiewicz <collerek@gmail.com>"]
license = "MIT"
readme = "README.md"
homepage = "https://github.com/collerek/ormar"
repository = "https://github.com/collerek/ormar"
documentation = "https://collerek.github.io/ormar/"
homepage = "https://github.com/ormar-orm/ormar"
repository = "https://github.com/ormar-orm/ormar"
documentation = "https://ormar-orm.github.io/ormar/"
packages = [
{ include="ormar" }
]
Expand Down
123 changes: 123 additions & 0 deletions tests/test_vulnerabilities/test_kwargs_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# type: ignore
"""
Tests that __pk_only__ and __excluded__ cannot be injected via kwargs.
"""

import pytest

import ormar
from ormar.exceptions import ModelError
from ormar.models import Model
from tests.lifespan import init_tests
from tests.settings import create_config

base_ormar_config = create_config()


class Item(Model):
ormar_config = base_ormar_config.copy(tablename="security_items")

id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
secret: str = ormar.String(max_length=100, default="default_secret")


class Category(Model):
ormar_config = base_ormar_config.copy(tablename="security_categories")

id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)


class Product(Model):
ormar_config = base_ormar_config.copy(tablename="security_products")

id: int = ormar.Integer(primary_key=True)
name: str = ormar.String(max_length=100)
category = ormar.ForeignKey(Category)


create_test_database = init_tests(base_ormar_config)


@pytest.mark.asyncio
async def test_pk_only_injection_rejected():
"""__pk_only__ in kwargs must not bypass validation."""
with pytest.raises(ModelError, match="Unknown field '__pk_only__'"):
Item(**{"__pk_only__": True, "id": 1, "name": "test"})


@pytest.mark.asyncio
async def test_pk_only_invalid_data_still_validated():
"""Even with __pk_only__=True in kwargs, validation still runs."""
with pytest.raises(ModelError, match="Unknown field '__pk_only__'"):
Item(**{"__pk_only__": True, "id": 1, "name": 123456})


@pytest.mark.asyncio
async def test_excluded_injection_ignored():
"""__excluded__ in kwargs must not nullify fields."""
with pytest.raises(ModelError, match="Unknown field '__excluded__'"):
Item(**{"__excluded__": {"secret"}, "id": 1, "name": "test"})


@pytest.mark.asyncio
async def test_create_with_pk_only_kwarg():
"""objects.create with __pk_only__ must not bypass validation."""
with pytest.raises(ModelError, match="Unknown field '__pk_only__'"):
await Item.objects.create(**{"__pk_only__": True, "id": 1, "name": "injected"})


@pytest.mark.asyncio
async def test_fk_relations_still_work():
"""Internal pk_only path via _internal_construct still works for FK."""
async with base_ormar_config.database:
async with base_ormar_config.database.transaction(force_rollback=True):
cat = await Category.objects.create(name="Electronics")
prod = await Product.objects.create(name="Phone", category=cat)

loaded = await Product.objects.select_related("category").get(id=prod.id)
assert loaded.category.name == "Electronics"


@pytest.mark.asyncio
async def test_query_exclusions_still_work():
"""Internal excluded path via _internal_construct still works for queries."""
async with base_ormar_config.database:
async with base_ormar_config.database.transaction(force_rollback=True):
await Item.objects.create(name="widget", secret="hidden")
loaded = await Item.objects.fields(["id", "name"]).get(name="widget")
assert loaded.name == "widget"
assert loaded.secret is None


@pytest.mark.asyncio
async def test_single_underscore_pk_only_not_a_threat():
"""_pk_only in kwargs is rejected as unknown field, not treated specially."""
with pytest.raises(ModelError, match="Unknown field '_pk_only'"):
Item(**{"_pk_only": True, "id": 1, "name": "test"})


@pytest.mark.asyncio
async def test_single_underscore_excluded_not_a_threat():
"""_excluded in kwargs is rejected as unknown field, not treated specially."""
with pytest.raises(ModelError, match="Unknown field '_excluded'"):
Item(**{"_excluded": {"secret"}, "id": 1, "name": "test"})


@pytest.mark.asyncio
async def test_internal_construct_pk_only():
"""_internal_construct with _pk_only=True skips validation."""
instance = Item._internal_construct(_pk_only=True, id=42)
assert instance.pk == 42
assert instance.__pk_only__ is True


@pytest.mark.asyncio
async def test_internal_construct_excluded():
"""_internal_construct with _excluded nullifies fields."""
instance = Item._internal_construct(
_excluded={"secret"}, id=1, name="test", secret="value"
)
assert instance.secret is None
assert instance.name == "test"
Loading