diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections.py index 34cfe47334893..3c5ad947749f3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections.py @@ -191,7 +191,8 @@ class ConnectionBody(StrictBaseModel): host: str | None = Field(default=None) login: str | None = Field(default=None) schema_: str | None = Field(None, alias="schema") - port: int | None = Field(default=None) + port: int | None = Field(default=None, ge=0, le=65535) + password: str | None = Field(default=None) extra: str | None = Field(default=None) team_name: str | None = Field(max_length=50, default=None) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/connection_test.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/connection_test.py index eaeafcf5ab355..8208025f87d91 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/connection_test.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/connection_test.py @@ -46,5 +46,5 @@ class ConnectionTestConnectionResponse(BaseModel): login: str | None = None password: str | None = None schema_: str | None = Field(None, alias="schema") - port: int | None = None + port: int | None = Field(default=None, ge=0, le=65535) extra: str | None = None diff --git a/airflow-core/src/airflow/cli/commands/connection_command.py b/airflow-core/src/airflow/cli/commands/connection_command.py index 8911bc80b99e2..aac87ab2f5e8b 100644 --- a/airflow-core/src/airflow/cli/commands/connection_command.py +++ b/airflow-core/src/airflow/cli/commands/connection_command.py @@ -314,6 +314,11 @@ def connections_add(args): ) args.conn_type = "generic" + # Validate port if provided + if args.conn_port is not None: + if not isinstance(args.conn_port, int) or not (0 <= args.conn_port <= 65535): + raise SystemExit(f"Port must be between 0 and 65535, got {args.conn_port}") + if has_uri or has_json: invalid_args = [] if has_uri and not _valid_uri(args.conn_uri): diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 1b4b0f8f86768..943a5cc3ce4c3 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -202,6 +202,13 @@ def __init__( mask_secret(quote(self.password)) self.team_name = team_name + @staticmethod + def _validate_port(port: int | None, conn_id: str | None = None) -> None: + """Validate that port is within the valid TCP/UDP range (0-65535).""" + if port is not None and not (0 <= port <= 65535): + conn_msg = f" for connection {conn_id!r}" if conn_id else "" + raise ValueError(f"Port must be between 0 and 65535{conn_msg}, got {port}") + @staticmethod def _validate_extra(extra, conn_id) -> None: """Verify that ``extra`` is a JSON-encoded Python dict.""" diff --git a/airflow-core/src/airflow/models/connection_test.py b/airflow-core/src/airflow/models/connection_test.py index f3d79a2a34c3f..b760d4c7e8ebd 100644 --- a/airflow-core/src/airflow/models/connection_test.py +++ b/airflow-core/src/airflow/models/connection_test.py @@ -107,6 +107,7 @@ class ConnectionTestRequest(Base, FernetFieldsMixin): login: Mapped[str | None] = mapped_column(Text, nullable=True) schema: Mapped[str | None] = mapped_column("schema", String(500), nullable=True) port: Mapped[int | None] = mapped_column(Integer, nullable=True) + commit_on_success: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0" ) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py index 74920b11d51fd..75ab7015d1397 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py @@ -2062,3 +2062,30 @@ def test_post_should_fail_with_non_json_object_as_extra( "method": "POST", }, ) + + +class TestConnectionBodyPortValidation: + """Test port validation in ConnectionBody model.""" + + @pytest.mark.parametrize( + "port", + [0, 1, 80, 443, 3306, 5432, 8080, 65535], + ) + def test_valid_ports(self, port): + """Test that valid port numbers (0-65535) are accepted.""" + body = ConnectionBody(connection_id="test", conn_type="test", port=port) + assert body.port == port + + def test_none_port_allowed(self): + """Test that None port is allowed (optional field).""" + body = ConnectionBody(connection_id="test", conn_type="test", port=None) + assert body.port is None + + @pytest.mark.parametrize( + "port", + [-1, 65536, 99999, 99999999], + ) + def test_invalid_ports(self, port): + """Test that invalid port numbers are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + ConnectionBody(connection_id="test", conn_type="test", port=port) diff --git a/airflow-core/tests/unit/models/test_connection.py b/airflow-core/tests/unit/models/test_connection.py index 94cabe5e4daf4..918a7e6fb309e 100644 --- a/airflow-core/tests/unit/models/test_connection.py +++ b/airflow-core/tests/unit/models/test_connection.py @@ -540,3 +540,54 @@ def test_get_conn_id_to_team_name_mapping(self, testing_team: Team, session: Ses "test_conn2": None, } clear_db_connections() + + def test_port_validation_valid_ports(self): + """Test that valid port numbers (0-65535) are accepted.""" + for port in [0, 1, 80, 443, 3306, 5432, 8080, 65535]: + conn = Connection(conn_id=f"test_{port}", conn_type="test", port=port) + assert conn.port == port + + def test_port_validation_invalid_negative_port(self): + """Test that negative port numbers are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_neg", conn_type="test", port=-1) + + def test_port_validation_invalid_port_too_large(self): + """Test that port numbers > 65535 are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_large", conn_type="test", port=65536) + + def test_port_validation_invalid_port_very_large(self): + """Test that very large port numbers are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_very_large", conn_type="test", port=99999999) + + def test_port_validation_none_allowed(self): + """Test that None port is allowed (optional field).""" + conn = Connection(conn_id="test_none", conn_type="test", port=None) + assert conn.port is None + + def test_port_validation_from_uri_valid_port(self): + """Test that valid ports from URI are accepted.""" + conn = Connection(uri="postgres://user:pass@host:5432/db", conn_id="test_uri") + assert conn.port == 5432 + + def test_port_validation_from_uri_invalid_port(self): + """Test that invalid ports from URI are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(uri="postgres://user:pass@host:99999/db", conn_id="test_uri_invalid") + + def test_port_validation_from_json_valid_port(self): + """Test that valid ports from JSON are accepted.""" + conn = Connection.from_json('{"conn_type": "postgres", "port": "5432"}', conn_id="test_json") + assert conn.port == 5432 + + def test_port_validation_from_json_invalid_port(self): + """Test that invalid ports from JSON are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection.from_json('{"conn_type": "postgres", "port": "99999"}', conn_id="test_json_invalid") + + def test_port_validation_from_json_negative_port(self): + """Test that negative ports from JSON are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection.from_json('{"conn_type": "postgres", "port": "-1"}', conn_id="test_json_neg") diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py index 06a95a3b868b8..717484eff9b50 100644 --- a/task-sdk/src/airflow/sdk/definitions/connection.py +++ b/task-sdk/src/airflow/sdk/definitions/connection.py @@ -123,6 +123,11 @@ class Connection: EXTRA_KEY = "__extra__" + def __attrs_post_init__(self) -> None: + """Validate port after initialization.""" + if self.port is not None and not (0 <= self.port <= 65535): + raise ValueError(f"Port must be between 0 and 65535, got {self.port}") + @overload def __init__(self, *, conn_id: str, uri: str) -> None: ... diff --git a/task-sdk/tests/task_sdk/definitions/test_connection.py b/task-sdk/tests/task_sdk/definitions/test_connection.py index 5746b1b14f75c..fbd4468d2dd84 100644 --- a/task-sdk/tests/task_sdk/definitions/test_connection.py +++ b/task-sdk/tests/task_sdk/definitions/test_connection.py @@ -421,3 +421,58 @@ def test_from_uri_roundtrip(self): original_extra = json.loads(conn_from_original.extra) roundtrip_extra = json.loads(conn_from_roundtrip.extra) assert original_extra == roundtrip_extra + + +class TestConnectionPortValidation: + """Test port validation in Connection model.""" + + def test_port_validation_valid_ports(self): + """Test that valid port numbers (0-65535) are accepted.""" + for port in [0, 1, 80, 443, 3306, 5432, 8080, 65535]: + conn = Connection(conn_id=f"test_{port}", conn_type="test", port=port) + assert conn.port == port + + def test_port_validation_none_allowed(self): + """Test that None port is allowed (optional field).""" + conn = Connection(conn_id="test_none", conn_type="test", port=None) + assert conn.port is None + + def test_port_validation_invalid_negative_port(self): + """Test that negative port numbers are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_neg", conn_type="test", port=-1) + + def test_port_validation_invalid_port_too_large(self): + """Test that port numbers > 65535 are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_large", conn_type="test", port=65536) + + def test_port_validation_invalid_port_very_large(self): + """Test that very large port numbers are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection(conn_id="test_very_large", conn_type="test", port=99999999) + + def test_port_validation_from_uri_valid_port(self): + """Test that valid ports from URI are accepted.""" + conn = Connection.from_uri("postgres://user:***@host:5432/db", conn_id="test_uri") + assert conn.port == 5432 + + def test_port_validation_from_uri_invalid_port(self): + """Test that invalid ports from URI are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection.from_uri("postgres://user:***@host:99999/db", conn_id="test_uri_invalid") + + def test_port_validation_from_json_valid_port(self): + """Test that valid ports from JSON are accepted.""" + conn = Connection.from_json('{"conn_type": "postgres", "port": "5432"}', conn_id="test_json") + assert conn.port == 5432 + + def test_port_validation_from_json_invalid_port(self): + """Test that invalid ports from JSON are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection.from_json('{"conn_type": "postgres", "port": "99999"}', conn_id="test_json_invalid") + + def test_port_validation_from_json_negative_port(self): + """Test that negative ports from JSON are rejected.""" + with pytest.raises(ValueError, match="Port must be between 0 and 65535"): + Connection.from_json('{"conn_type": "postgres", "port": "-1"}', conn_id="test_json_neg")