diff --git a/.github/workflows/docker_image.yaml b/.github/workflows/docker_image.yaml index c74b1309..61885920 100644 --- a/.github/workflows/docker_image.yaml +++ b/.github/workflows/docker_image.yaml @@ -20,7 +20,17 @@ jobs: uses: docker/metadata-action@v5 with: images: crate/crate-operator - tags: type=semver,pattern={{major}}.{{minor}}.{{patch}} + tags: | + type=semver,pattern={{major}}.{{minor}}.{{patch}} + type=raw,value={{tag}} + - name: Docker meta (sidecar) + id: meta_sidecar + uses: docker/metadata-action@v5 + with: + images: crate/crate-control + tags: | + type=semver,pattern={{major}}.{{minor}}.{{patch}} + type=raw,value={{tag}} - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Login to DockerHub @@ -36,3 +46,11 @@ jobs: platforms: linux/amd64,linux/arm64 push: true tags: ${{ steps.meta.outputs.tags }} + - name: Build and publish sidecar + uses: docker/build-push-action@v6 + with: + context: ./sidecars/cratecontrol + file: ./sidecars/cratecontrol/Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.meta_sidecar.outputs.tags }} diff --git a/CHANGES.rst b/CHANGES.rst index a781dc57..e453d7dd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,24 @@ Changelog Unreleased ---------- +* Added support for running CrateDB on Red Hat OpenShift Container Platform. + When ``CLOUD_PROVIDER`` is set to ``openshift``, the operator will: + + - Use a lightweight ``crate-control`` sidecar for SQL execution instead of + ``pod_exec`` to comply with OpenShift's restricted security policies. + - Create OpenShift-specific SecurityContextConstraints (SCC) and ServiceAccounts + to allow CrateDB to run with the required ``SYS_CHROOT`` capability. + - Skip privileged init containers and adjust security contexts for compatibility + with OpenShift's security model. + - Disable ``blockOwnerDeletion`` on PVC owner references to work with OpenShift's + restricted RBAC permissions. + +* Refactored SQL execution logic to support both traditional ``pod_exec`` and + sidecar-based approaches, with automatic fallback based on available resources. + +* Updated RBAC permissions to include ``serviceaccounts`` and OpenShift + ``securitycontextconstraints`` resources. + 2.58.0 (2026-02-16) ------------------- diff --git a/README.rst b/README.rst index 6243ba49..637ab0c4 100644 --- a/README.rst +++ b/README.rst @@ -79,7 +79,7 @@ Previous versions might work, but the operator will not attempt to set a license 🎉 Features -========== +=========== - "all equal nodes" cluster setup - "master + data nodes" cluster setup @@ -90,6 +90,7 @@ Previous versions might work, but the operator will not attempt to set a license - custom cluster settings - custom storage classes - region/zone awareness for AWS and Azure +- OpenShift support (Red Hat OpenShift Container Platform 4.x) 💽 Installation =============== @@ -110,6 +111,23 @@ dependency of the `Operator Helm Chart`_. To override the environment variables from values.yaml, please refer to the `configuration documentation`_. +Installation on OpenShift +------------------------- + +When installing on Red Hat OpenShift Container Platform, additional configuration +is required, after adding the Helm repo: + +.. code-block:: console + + helm install crate-operator crate-operator/crate-operator \ + --set env.CRATEDB_OPERATOR_CLOUD_PROVIDER=openshift \ + --set env.CRATEDB_OPERATOR_CRATE_CONTROL_IMAGE=your-registry/crate-control:latest \ + --namespace crate-operator \ + --create-namespace + +Replace ``your-registry/crate-control:latest`` with the location of your built +crate-control sidecar image. See the `OpenShift documentation`_ for details. + Installation with kubectl ------------------------- @@ -151,3 +169,4 @@ Please refer to the `Working on the operator`_ section of the documentation. .. _Working on the operator: ./docs/source/development.rst .. _CRD Helm Chart: ./deploy/charts/crate-operator-crds/README.md .. _Operator Helm Chart: ./deploy/charts/crate-operator/README.md +.. _OpenShift documentation: ./docs/source/openshift.rst diff --git a/crate/operator/bootstrap.py b/crate/operator/bootstrap.py index 94f0548f..a6bdaf14 100644 --- a/crate/operator/bootstrap.py +++ b/crate/operator/bootstrap.py @@ -34,8 +34,10 @@ GC_USER_SECRET_NAME, GC_USERNAME, SYSTEM_USERNAME, + CloudProvider, ) from crate.operator.cratedb import create_user, get_connection +from crate.operator.sql import execute_sql_via_crate_control from crate.operator.utils import crate from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kopf import StateBasedSubHandler @@ -78,9 +80,143 @@ async def bootstrap_system_user( when SSL/TLS is enabled, and encrypted connections aren't possible when no SSL/TLS is configured. """ - scheme = "https" if has_ssl else "http" password = await get_system_user_password(core, namespace, name) + command_create_user: Dict[str, Any] = { + "stmt": f'CREATE USER "{SYSTEM_USERNAME}" WITH (password = ?)', + "args": [password], + } + + command_alter_user: Dict[str, Any] = { + "stmt": f'ALTER USER "{SYSTEM_USERNAME}" SET (password = ?)', + "args": [password], + } + + command_grant: Dict[str, Any] = { + "stmt": f'GRANT ALL PRIVILEGES TO "{SYSTEM_USERNAME}"', + "args": [], + } + + exception_logger = logger.exception if config.TESTING else logger.error + + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + await _bootstrap_user_via_sidecar( + namespace, + name, + command_create_user, + command_alter_user, + command_grant, + logger, + exception_logger, + ) + else: + scheme = "https" if has_ssl else "http" + await _bootstrap_user_via_pod_exec( + namespace, + master_node_pod, + scheme, + command_create_user, + command_alter_user, + command_grant, + logger, + exception_logger, + ) + + +async def _bootstrap_user_via_sidecar( + namespace: str, + name: str, + command_create_user: Dict[str, Any], + command_alter_user: Dict[str, Any], + command_grant: Dict[str, Any], + logger: logging.Logger, + exception_logger, +) -> None: + """ + Bootstrap system user using the crate-control sidecar. + """ + + needs_update = False + try: + logger.info("Trying to create system user via sidecar...") + result = await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=command_create_user["stmt"], + args=command_create_user["args"], + logger=logger, + ) + except Exception as e: + exception_logger("... failed. %s", str(e)) + raise _temporary_error() + else: + logger.info("Create user result: %s", result) + if "rowcount" in result: + logger.info("... success") + elif ( + "error" in result + and "RoleAlreadyExistsException" in result["error"]["message"] + ): + needs_update = True + logger.info("... success. Already present") + else: + logger.info("... error. %s", result) + raise _temporary_error() + + if needs_update: + try: + logger.info("Trying to update system user password via sidecar...") + result = await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=command_alter_user["stmt"], + args=command_alter_user["args"], + logger=logger, + ) + except Exception as e: + exception_logger("... failed: %s", str(e)) + raise _temporary_error() + else: + if "rowcount" in result: + logger.info("... success") + else: + logger.info("... error. %s", result) + raise _temporary_error() + + try: + logger.info("Trying to grant system user all privileges via sidecar...") + result = await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=command_grant["stmt"], + args=command_grant["args"], + logger=logger, + ) + except Exception as e: + exception_logger("... failed. %s", str(e)) + raise _temporary_error() + else: + if "rowcount" in result: + logger.info("... success") + else: + logger.info("... error. %s", result) + raise _temporary_error() + + +async def _bootstrap_user_via_pod_exec( + namespace: str, + master_node_pod: str, + scheme: str, + command_create_user: Dict[str, Any], + command_alter_user: Dict[str, Any], + command_grant: Dict[str, Any], + logger: logging.Logger, + exception_logger, +) -> None: + """ + Bootstrap system user using pod_exec (legacy approach). + """ + def get_curl_command(payload: dict) -> List[str]: return [ "curl", @@ -97,95 +233,62 @@ def get_curl_command(payload: dict) -> List[str]: "\\n", ] - command_create_user = get_curl_command( + command_create = get_curl_command( { - "stmt": 'CREATE USER "{}" WITH (password = $1)'.format(SYSTEM_USERNAME), - "args": [password], + "stmt": command_create_user["stmt"].replace("?", "$1"), + "args": command_create_user["args"], } ) - command_alter_user = get_curl_command( + command_alter = get_curl_command( { - "stmt": 'ALTER USER "{}" SET (password = $1)'.format(SYSTEM_USERNAME), - "args": [password], + "stmt": command_alter_user["stmt"].replace("?", "$1"), + "args": command_alter_user["args"], } ) - command_grant = get_curl_command( - {"stmt": 'GRANT ALL PRIVILEGES TO "{}" '.format(SYSTEM_USERNAME)} + command_grant_curl = get_curl_command( + { + "stmt": command_grant["stmt"], + "args": command_grant["args"], + } ) - exception_logger = logger.exception if config.TESTING else logger.error async def pod_exec(cmd): - return await core_ws.connect_get_namespaced_pod_exec( - namespace=namespace, - name=master_node_pod, - command=cmd, - container="crate", - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) + async with WsApiClient() as ws_api_client: + core_ws = CoreV1Api(ws_api_client) + return await core_ws.connect_get_namespaced_pod_exec( + namespace=namespace, + name=master_node_pod, + command=cmd, + container="crate", + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) needs_update = False - async with WsApiClient() as ws_api_client: - core_ws = CoreV1Api(ws_api_client) - try: - logger.info("Trying to create system user ...") - result = await pod_exec(command_create_user) - except ApiException as e: - # We don't use `logger.exception()` to not accidentally include the - # password in the log messages which might be part of the string - # representation of the exception. - exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) - raise _temporary_error() - except WSServerHandshakeError as e: - # We don't use `logger.exception()` to not accidentally include the - # password in the log messages which might be part of the string - # representation of the exception. - exception_logger("... failed. Status: %s Message: %s", e.status, e.message) - raise _temporary_error() + try: + logger.info("Trying to create system user via pod_exec...") + result = await pod_exec(command_create) + except (ApiException, WSServerHandshakeError) as e: + exception_logger("... failed. %s", str(e)) + raise _temporary_error() + else: + if "rowcount" in result: + logger.info("... success") + elif "AlreadyExistsException" in result: + needs_update = True + logger.info("... success. Already present") else: - if "rowcount" in result: - logger.info("... success") - elif "AlreadyExistsException" in result: - needs_update = True - logger.info("... success. Already present") - else: - logger.info("... error. %s", result) - raise _temporary_error() - - if needs_update: - try: - logger.info("Trying to update system user password ...") - result = await pod_exec(command_alter_user) - except ApiException as e: - # We don't use `logger.exception()` to not accidentally include the - # password in the log messages which might be part of the string - # representation of the exception. - exception_logger( - "... failed. Status: %s Reason: %s", e.status, e.reason - ) - raise _temporary_error() - except WSServerHandshakeError as e: - # We don't use `logger.exception()` to not accidentally include the - # password in the log messages which might be part of the string - # representation of the exception. - exception_logger( - "... failed. Status: %s Message: %s", e.status, e.message - ) - raise _temporary_error() - else: - if "rowcount" in result: - logger.info("... success") - else: - logger.info("... error. %s", result) - raise _temporary_error() + logger.info("... error. %s", result) + raise _temporary_error() + if needs_update: try: - logger.info("Trying to grant system user all privileges ...") - result = await pod_exec(command_grant) - except (ApiException, WSServerHandshakeError): - logger.exception("... failed") + logger.info("Trying to update system user password via pod_exec...") + result = await pod_exec(command_alter) + except (ApiException, WSServerHandshakeError) as e: + exception_logger("... failed. %s", str(e)) raise _temporary_error() else: if "rowcount" in result: @@ -194,6 +297,19 @@ async def pod_exec(cmd): logger.info("... error. %s", result) raise _temporary_error() + try: + logger.info("Trying to grant system user all privileges via pod_exec...") + result = await pod_exec(command_grant_curl) + except (ApiException, WSServerHandshakeError): + logger.exception("... failed") + raise _temporary_error() + else: + if "rowcount" in result: + logger.info("... success") + else: + logger.info("... error. %s", result) + raise _temporary_error() + async def bootstrap_gc_admin_user(core: CoreV1Api, namespace: str, name: str): """ diff --git a/crate/operator/config.py b/crate/operator/config.py index caca70cf..927b0eea 100644 --- a/crate/operator/config.py +++ b/crate/operator/config.py @@ -62,6 +62,9 @@ class Config: #: The Docker image that contains scripts to run cluster backups. CLUSTER_BACKUP_IMAGE: Optional[str] = None + #: The Docker image that contains CrateControl sidecar. + CRATE_CONTROL_IMAGE: Optional[str] = None + #: The volume size for the ``PersistentVolume`` that is used as a storage #: location for Java heap dumps. DEBUG_VOLUME_SIZE: bitmath.Byte = bitmath.GiB(64) @@ -233,6 +236,10 @@ def load(self): "CLUSTER_BACKUP_IMAGE", default=self.CLUSTER_BACKUP_IMAGE ) + self.CRATE_CONTROL_IMAGE = self.env( + "CRATE_CONTROL_IMAGE", default=self.CRATE_CONTROL_IMAGE + ) + debug_volume_size = self.env( "DEBUG_VOLUME_SIZE", default=str(self.DEBUG_VOLUME_SIZE) ) diff --git a/crate/operator/constants.py b/crate/operator/constants.py index bc93490d..5e80b0fd 100644 --- a/crate/operator/constants.py +++ b/crate/operator/constants.py @@ -61,6 +61,9 @@ TERMINATION_GRACE_PERIOD_SECONDS = 1200 DECOMMISSION_TIMEOUT = "720s" +# Port on which the crate-control sidecar listens. +CRATE_CONTROL_PORT = 5050 + # dcutil fileserver URL for dynamic architecture detection DCUTIL_FILESERVER_URL = "http://dc-util-fileserver.dcutil.svc.cluster.local/latest" @@ -83,6 +86,7 @@ class CloudProvider(str, enum.Enum): AWS = "aws" AZURE = "azure" GCP = "gcp" + OPENSHIFT = "openshift" class Port(enum.Enum): diff --git a/crate/operator/create.py b/crate/operator/create.py index 5d9ad591..72c083f5 100644 --- a/crate/operator/create.py +++ b/crate/operator/create.py @@ -29,8 +29,10 @@ import kopf import yaml from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, CoreV1Api, + CustomObjectsApi, PolicyV1Api, RbacAuthorizationV1Api, RbacV1Subject, @@ -63,6 +65,7 @@ V1PodAntiAffinity, V1PodDisruptionBudget, V1PodDisruptionBudgetSpec, + V1PodSecurityContext, V1PodSpec, V1PodTemplateSpec, V1PolicyRule, @@ -76,6 +79,7 @@ V1SecretVolumeSource, V1SecurityContext, V1Service, + V1ServiceAccount, V1ServicePort, V1ServiceSpec, V1StatefulSet, @@ -90,6 +94,7 @@ from crate.operator.config import config from crate.operator.constants import ( API_GROUP, + CRATE_CONTROL_PORT, DATA_PVC_NAME_PREFIX, DCUTIL_FILESERVER_URL, DECOMMISSION_TIMEOUT, @@ -312,9 +317,104 @@ def get_statefulset_containers( crate_command: List[str], crate_env: List[V1EnvVar], crate_volume_mounts: List[V1VolumeMount], + name: str, + has_ssl: bool, ) -> List[V1Container]: sql_exporter_image = config.SQL_EXPORTER_IMAGE - return [ + crate_control_image = config.CRATE_CONTROL_IMAGE + + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT and not crate_control_image: + raise kopf.PermanentError( + "CRATEDB_OPERATOR_CRATE_CONTROL_IMAGE must be set when " + "CRATEDB_OPERATOR_CLOUD_PROVIDER=openshift" + ) + + crate_container = V1Container( + command=crate_command, + env=crate_env, + image=crate_image, + name="crate", + ports=[ + V1ContainerPort(container_port=http_port, name="http"), + V1ContainerPort(container_port=jmx_port, name="jmx"), + V1ContainerPort(container_port=postgres_port, name="postgres"), + V1ContainerPort(container_port=prometheus_port, name="prometheus"), + V1ContainerPort(container_port=transport_port, name="transport"), + ], + readiness_probe=V1Probe( + http_get=V1HTTPGetAction(path="/ready", port=prometheus_port), + initial_delay_seconds=10 if config.TESTING else 30, + period_seconds=5 if config.TESTING else 10, + ), + resources=V1ResourceRequirements( + limits={ + "cpu": str( + get_cluster_resource_limits( + node_spec, resource_type="cpu", fallback_key="cpus" + ) + ), + "memory": format_bitmath( + bitmath.parse_string_unsafe( + get_cluster_resource_limits(node_spec, resource_type="memory") + ) + ), + }, + requests={ + "cpu": str( + get_cluster_resource_requests( + node_spec, resource_type="cpu", fallback_key="cpus" + ) + ), + "memory": format_bitmath( + bitmath.parse_string_unsafe( + get_cluster_resource_requests(node_spec, resource_type="memory") + ) + ), + }, + ), + volume_mounts=crate_volume_mounts, + security_context=V1SecurityContext( + capabilities=V1Capabilities(add=["SYS_CHROOT"]) + ), + ) + + if config.CLOUD_PROVIDER != CloudProvider.OPENSHIFT: + crate_container.lifecycle = V1Lifecycle( + post_start=V1LifecycleHandler( + _exec=V1ExecAction( + command=[ + "/bin/sh", + "-c", + ( + "ARCH=$(uname -m) &&\n" + f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}} &&\n" # noqa + f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}}.sha256 &&\n" # noqa + "sha256sum -c dc_util-linux-${ARCH}.sha256 &&\n" + "chmod +x dc_util-linux-${ARCH} &&\n" + "./dc_util-linux-${ARCH} --reset-routing || true" + ), + ] + ) + ), + pre_stop=V1LifecycleHandler( + _exec=V1ExecAction( + command=[ + "/bin/sh", + "-c", + ( + "ARCH=$(uname -m) &&\n" + f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}} &&\n" # noqa + f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}}.sha256 &&\n" # noqa + "sha256sum -c dc_util-linux-${ARCH}.sha256 &&\n" + "chmod +x dc_util-linux-${ARCH} &&\n" + f"./dc_util-linux-${{ARCH}} --min-availability PRIMARIES --timeout {DECOMMISSION_TIMEOUT} || true" # noqa + ), + ] + ) + ), + ) + + containers = [ V1Container( command=[ "/bin/sql_exporter", @@ -331,93 +431,53 @@ def get_statefulset_containers( ), ], ), - V1Container( - command=crate_command, - env=crate_env, - image=crate_image, - name="crate", - ports=[ - V1ContainerPort(container_port=http_port, name="http"), - V1ContainerPort(container_port=jmx_port, name="jmx"), - V1ContainerPort(container_port=postgres_port, name="postgres"), - V1ContainerPort(container_port=prometheus_port, name="prometheus"), - V1ContainerPort(container_port=transport_port, name="transport"), - ], - readiness_probe=V1Probe( - http_get=V1HTTPGetAction(path="/ready", port=prometheus_port), - initial_delay_seconds=10 if config.TESTING else 30, - period_seconds=5 if config.TESTING else 10, - ), - resources=V1ResourceRequirements( - limits={ - "cpu": str( - get_cluster_resource_limits( - node_spec, resource_type="cpu", fallback_key="cpus" - ) - ), - "memory": format_bitmath( - bitmath.parse_string_unsafe( - get_cluster_resource_limits( - node_spec, resource_type="memory" - ) - ) - ), - }, - requests={ - "cpu": str( - get_cluster_resource_requests( - node_spec, resource_type="cpu", fallback_key="cpus" - ) + crate_container, + ] + + # Only add crate-control sidecar for OpenShift + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + containers.append( + V1Container( + name="crate-control", + image=crate_control_image, + ports=[ + V1ContainerPort(container_port=CRATE_CONTROL_PORT, name="control"), + ], + env=[ + V1EnvVar( + name="CRATE_HTTP_URL", + value=f"{'https' if has_ssl else 'http'}://localhost:4200/_sql", ), - "memory": format_bitmath( - bitmath.parse_string_unsafe( - get_cluster_resource_requests( - node_spec, resource_type="memory" + V1EnvVar( + name="BOOTSTRAP_TOKEN", + value_from=V1EnvVarSource( + secret_key_ref=V1SecretKeySelector( + name=f"crate-control-{name}", + key="token", ) - ) + ), ), - }, - ), - volume_mounts=crate_volume_mounts, - security_context=V1SecurityContext( - capabilities=V1Capabilities(add=["SYS_CHROOT"]) - ), - lifecycle=V1Lifecycle( - post_start=V1LifecycleHandler( - _exec=V1ExecAction( - command=[ - "/bin/sh", - "-c", - ( - "ARCH=$(uname -m) &&\n" - f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}} &&\n" # noqa - f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}}.sha256 &&\n" # noqa - "sha256sum -c dc_util-linux-${ARCH}.sha256 &&\n" - "chmod +x dc_util-linux-${ARCH} &&\n" - "./dc_util-linux-${ARCH} --reset-routing || true" - ), - ] - ) + ], + readiness_probe=V1Probe( + http_get=V1HTTPGetAction(path="/health", port=CRATE_CONTROL_PORT), + initial_delay_seconds=20, + period_seconds=5, + failure_threshold=6, ), - pre_stop=V1LifecycleHandler( - _exec=V1ExecAction( - command=[ - "/bin/sh", - "-c", - ( - "ARCH=$(uname -m) &&\n" - f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}} &&\n" # noqa - f"curl -sLO {DCUTIL_FILESERVER_URL}/dc_util-linux-${{ARCH}}.sha256 &&\n" # noqa - "sha256sum -c dc_util-linux-${ARCH}.sha256 &&\n" - "chmod +x dc_util-linux-${ARCH} &&\n" - f"./dc_util-linux-${{ARCH}} --min-availability PRIMARIES --timeout {DECOMMISSION_TIMEOUT} || true" # noqa - ), - ] - ) + resources=V1ResourceRequirements( + limits={ + "cpu": "100m", + "memory": "64Mi", + }, + requests={ + "cpu": "50m", + "memory": "32Mi", + }, ), - ), - ), - ] + ) + ) + + return containers def get_statefulset_crate_command( @@ -656,34 +716,31 @@ def get_statefulset_crate_volume_mounts( def get_statefulset_init_containers(crate_image: str) -> List[V1Container]: - heapdump_cmd = ( - "mkdir -pv /resource/heapdump ; chown -R crate:crate /resource" # noqa - ) - # Ignore failures on AWS, where we are likely using EFS (which do not permit chown) - if config.CLOUD_PROVIDER == CloudProvider.AWS: - heapdump_cmd = f"{heapdump_cmd} || true" + containers = [] + + # Only add the privileged sysctl init container if NOT on OpenShift + if config.CLOUD_PROVIDER != CloudProvider.OPENSHIFT: + containers.append( + V1Container( + command=[ + "sysctl", + "-w", + # CrateDB requirement due to the number of open file descriptors + "vm.max_map_count=2566080", + # Certain load balancers (i.e. AWS NLB) terminate idle connections. + # We set explicit TCP keepalives so that this does not happen. + "net.ipv4.tcp_keepalive_time=120", + "net.ipv4.tcp_keepalive_intvl=30", + "net.ipv4.tcp_keepalive_probes=6", + ], + image="busybox:1.35.0", + image_pull_policy="IfNotPresent", + name="init-sysctl", + security_context=V1SecurityContext(privileged=True), + ) + ) - return [ - V1Container( - # We need to do this in an init container because of the required - # security context. We don't want to run CrateDB with that context, - # thus doing it before. - command=[ - "sysctl", - "-w", - # CrateDB requirement due to the number of open file descriptors - "vm.max_map_count=2566080", - # Certain load balancers (i.e. AWS NLB) terminate idle connections. - # We set explicit TCP keepalives so that this does not happen. - "net.ipv4.tcp_keepalive_time=120", - "net.ipv4.tcp_keepalive_intvl=30", - "net.ipv4.tcp_keepalive_probes=6", - ], - image="busybox:1.35.0", - image_pull_policy="IfNotPresent", - name="init-sysctl", - security_context=V1SecurityContext(privileged=True), - ), + containers.append( V1Container( command=[ "wget", @@ -696,6 +753,14 @@ def get_statefulset_init_containers(crate_image: str) -> List[V1Container]: name="fetch-jmx-exporter", volume_mounts=[V1VolumeMount(name="jmxdir", mount_path="/jmxdir")], ), + ) + + heapdump_cmd = "mkdir -pv /resource/heapdump ; chown -R crate:crate /resource" + # Ignore failures on AWS EFS which doesn't permit chown + if config.CLOUD_PROVIDER == CloudProvider.AWS: + heapdump_cmd = f"{heapdump_cmd} || true" + + containers.append( V1Container( command=[ "sh", @@ -706,8 +771,10 @@ def get_statefulset_init_containers(crate_image: str) -> List[V1Container]: image_pull_policy="IfNotPresent", name="mkdir-heapdump", volume_mounts=[V1VolumeMount(name="debug", mount_path="/resource")], - ), - ] + ) + ) + + return containers def get_statefulset_pvc( @@ -840,8 +907,23 @@ def get_statefulset( ), get_statefulset_crate_env(node_spec, jmx_port, prometheus_port, ssl), get_statefulset_crate_volume_mounts(node_spec, ssl), + name, + has_ssl=bool(ssl), ) + service_account_name = None + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + service_account_name = f"crate-{name}" + + # On OpenShift, set security context to run as root (UID 0) + # This allows the entrypoint's chroot to work + pod_security_context = None + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + pod_security_context = V1PodSecurityContext( + run_as_user=0, + fs_group=0, + ) + return V1StatefulSet( metadata=V1ObjectMeta( annotations=node_spec.get("annotations"), @@ -866,6 +948,8 @@ def get_statefulset( labels=node_labels, ), spec=V1PodSpec( + service_account_name=service_account_name, + security_context=pod_security_context, affinity=get_statefulset_affinity(name, logger, node_spec), topology_spread_constraints=get_topology_spread(name, logger), containers=containers, @@ -882,6 +966,121 @@ def get_statefulset( ) +async def create_crate_scc( + namespace: str, + name: str, + logger: logging.Logger, +) -> None: + """ + Create a SecurityContextConstraint for CrateDB on OpenShift. + + This SCC allows running as any UID (including root) and the SYS_CHROOT capability. + + Security Note: While this allows starting as root, the CrateDB entrypoint + immediately uses chroot to drop privileges to UID 1000 (crate user). This + maintains security while being compatible with OpenShift's restricted environment. + """ + scc_name = f"crate-anyuid-{namespace}-{name}" + + scc = { + "apiVersion": "security.openshift.io/v1", + "kind": "SecurityContextConstraints", + "metadata": { + "name": scc_name, + }, + "allowPrivilegedContainer": False, + "allowedCapabilities": ["SYS_CHROOT"], + "defaultAddCapabilities": None, + "requiredDropCapabilities": ["KILL", "MKNOD"], + "runAsUser": {"type": "RunAsAny"}, + "seLinuxContext": {"type": "MustRunAs"}, + "fsGroup": {"type": "RunAsAny"}, + "supplementalGroups": {"type": "RunAsAny"}, + "volumes": [ + "configMap", + "downwardAPI", + "emptyDir", + "persistentVolumeClaim", + "projected", + "secret", + ], + "users": [f"system:serviceaccount:{namespace}:crate-{name}"], + "groups": [], + } + + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + try: + await custom_api.create_cluster_custom_object( + group="security.openshift.io", + version="v1", + plural="securitycontextconstraints", + body=scc, + ) + logger.info("Created SCC %s", scc_name) + except ApiException as e: + if e.status == 409: # Already exists + logger.info("SCC %s already exists", scc_name) + else: + raise + + +async def create_crate_service_account( + owner_references: Optional[List[V1OwnerReference]], + namespace: str, + name: str, + labels: LabelType, + logger: logging.Logger, +) -> None: + """Create a ServiceAccount for CrateDB pods.""" + sa = V1ServiceAccount( + metadata=V1ObjectMeta( + name=f"crate-{name}", + namespace=namespace, + labels=labels, + owner_references=owner_references, + ), + ) + + async with GlobalApiClient() as api_client: + core = CoreV1Api(api_client) + await call_kubeapi( + core.create_namespaced_service_account, + logger, + continue_on_conflict=True, + namespace=namespace, + body=sa, + ) + + +class CreateCrateSCCSubHandler(StateBasedSubHandler): + @crate.on.error(error_handler=crate.send_create_failed_notification) + async def handle( + self, + namespace: str, + name: str, + logger: logging.Logger, + **kwargs: Any, + ): + await create_crate_scc(namespace, name, logger) + + +class CreateCrateServiceAccountSubHandler(StateBasedSubHandler): + @crate.on.error(error_handler=crate.send_create_failed_notification) + async def handle( + self, + namespace: str, + name: str, + owner_references: Optional[List[V1OwnerReference]], + cratedb_labels: LabelType, + logger: logging.Logger, + **kwargs: Any, + ): + await create_crate_service_account( + owner_references, namespace, name, cratedb_labels, logger + ) + + async def create_statefulset( owner_references: Optional[List[V1OwnerReference]], namespace: str, @@ -1117,6 +1316,44 @@ def get_discovery_service( ) +def get_crate_control_service( + owner_references: Optional[List[V1OwnerReference]], + name: str, + namespace: str, + labels: Dict[str, str], + port: int = CRATE_CONTROL_PORT, +) -> V1Service: + """ + Create a headless service that exposes the crate-control sidecar of a + single CrateDB pod. + """ + return V1Service( + metadata=V1ObjectMeta( + name=f"crate-control-{name}", + namespace=namespace, + labels=labels, + owner_references=owner_references, + ), + spec=V1ServiceSpec( + cluster_ip="None", + publish_not_ready_addresses=True, + ports=[ + V1ServicePort( + name="control", + port=port, + target_port="control", + protocol="TCP", + ) + ], + selector={ + LABEL_COMPONENT: "cratedb", + LABEL_NAME: name, + }, + type="ClusterIP", + ), + ) + + async def create_services( owner_references: Optional[List[V1OwnerReference]], namespace: str, @@ -1163,6 +1400,17 @@ async def create_services( ), ) + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + await call_kubeapi( + core.create_namespaced_service, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_crate_control_service( + owner_references, name, namespace, labels + ), + ) + async def recreate_services( namespace: str, @@ -1178,16 +1426,7 @@ async def recreate_services( cratedb_labels = build_cratedb_labels(name, meta) - owner_references = [ - V1OwnerReference( - api_version=f"{API_GROUP}/v1", - block_owner_deletion=True, - controller=True, - kind="CrateDB", - name=name, - uid=meta["uid"], - ) - ] + owner_references = get_owner_references(name, meta) source_ranges = spec["cluster"].get("allowedCIDRs", None) additional_annotations = ( @@ -1238,6 +1477,24 @@ def get_gc_user_secret( ) +def get_crate_control_secret( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, +) -> V1Secret: + return V1Secret( + metadata=V1ObjectMeta( + name=f"crate-control-{name}", + labels=labels, + owner_references=owner_references, + ), + data={ + "token": b64encode(gen_password(50)), + }, + type="Opaque", + ) + + async def create_system_user( owner_references: Optional[List[V1OwnerReference]], namespace: str, @@ -1261,6 +1518,24 @@ async def create_system_user( ) +async def create_crate_control_secret( + owner_references: Optional[List[V1OwnerReference]], + namespace: str, + name: str, + labels: LabelType, + logger: logging.Logger, +) -> None: + async with GlobalApiClient() as api_client: + core = CoreV1Api(api_client) + await call_kubeapi( + core.create_namespaced_secret, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_crate_control_secret(owner_references, name, labels), + ) + + def is_shared_resources_cluster(node_spec: Dict[str, Any]) -> bool: return node_spec.get("nodepool") == Nodepool.SHARED @@ -1293,10 +1568,16 @@ def get_cluster_resource_limits( def get_owner_references(name: str, meta: kopf.Meta) -> List[V1OwnerReference]: + """ + Create owner references with blockOwnerDeletion set based on cloud provider. + + OpenShift's StatefulSet controller doesn't have permission to set finalizers + on PVCs, so we disable blockOwnerDeletion in that environment. + """ return [ V1OwnerReference( api_version=f"{API_GROUP}/v1", - block_owner_deletion=True, + block_owner_deletion=(config.CLOUD_PROVIDER != CloudProvider.OPENSHIFT), controller=True, kind="CrateDB", name=name, @@ -1364,6 +1645,22 @@ async def handle( # type: ignore ) +class CreateCrateControlSubHandler(StateBasedSubHandler): + @crate.on.error(error_handler=crate.send_create_failed_notification) + async def handle( # type: ignore + self, + namespace: str, + name: str, + owner_references: Optional[List[V1OwnerReference]], + cratedb_labels: LabelType, + logger: logging.Logger, + **kwargs: Any, + ): + await create_crate_control_secret( + owner_references, namespace, name, cratedb_labels, logger + ) + + class CreateServicesSubHandler(StateBasedSubHandler): @crate.on.error(error_handler=crate.send_create_failed_notification) async def handle( # type: ignore diff --git a/crate/operator/handlers/handle_create_cratedb.py b/crate/operator/handlers/handle_create_cratedb.py index 30cc4b9e..f44885da 100644 --- a/crate/operator/handlers/handle_create_cratedb.py +++ b/crate/operator/handlers/handle_create_cratedb.py @@ -23,18 +23,21 @@ import logging import kopf -from kubernetes_asyncio.client import V1OwnerReference from crate.operator.backup import CreateBackupsSubHandler from crate.operator.bootstrap import CreateUsersSubHandler from crate.operator.config import config -from crate.operator.constants import API_GROUP, CLUSTER_CREATE_ID, Port +from crate.operator.constants import CLUSTER_CREATE_ID, CloudProvider, Port from crate.operator.create import ( + CreateCrateControlSubHandler, + CreateCrateSCCSubHandler, + CreateCrateServiceAccountSubHandler, CreateServicesSubHandler, CreateSqlExporterConfigSubHandler, CreateStatefulsetSubHandler, CreateSystemUserSubHandler, build_cratedb_labels, + get_owner_references, ) from crate.operator.operations import get_master_nodes_names, get_total_nodes_count from crate.operator.utils.secrets import get_image_pull_secrets @@ -53,16 +56,7 @@ async def create_cratedb( name = meta["name"] cratedb_labels = build_cratedb_labels(name, meta) - owner_references = [ - V1OwnerReference( - api_version=f"{API_GROUP}/v1", - block_owner_deletion=True, - controller=True, - kind="CrateDB", - name=name, - uid=meta["uid"], - ) - ] + owner_references = get_owner_references(name, meta) image_pull_secrets = get_image_pull_secrets() @@ -103,6 +97,26 @@ async def create_cratedb( id="system_user", ) + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + kopf.register( + fn=CreateCrateSCCSubHandler(namespace, name, hash, context)(), + id="crate_scc", + ) + + kopf.register( + fn=CreateCrateServiceAccountSubHandler(namespace, name, hash, context)( + cratedb_labels=cratedb_labels, owner_references=owner_references + ), + id="crate_service_account", + ) + + kopf.register( + fn=CreateCrateControlSubHandler(namespace, name, hash, context)( + cratedb_labels=cratedb_labels, owner_references=owner_references + ), + id="crate_control", + ) + kopf.register( fn=CreateServicesSubHandler(namespace, name, hash, context)( owner_references=owner_references, diff --git a/crate/operator/handlers/handle_delete_cratedb.py b/crate/operator/handlers/handle_delete_cratedb.py new file mode 100644 index 00000000..8fc78b8c --- /dev/null +++ b/crate/operator/handlers/handle_delete_cratedb.py @@ -0,0 +1,81 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +import logging + +from kubernetes_asyncio.client import ApiException, CustomObjectsApi + +from crate.operator.config import config +from crate.operator.constants import CloudProvider +from crate.operator.utils.k8s_api_client import GlobalApiClient + + +async def delete_cratedb( + namespace: str, + name: str, + logger: logging.Logger, +) -> None: + """ + Clean up cluster-scoped resources that cannot be garbage-collected + via Kubernetes owner references. + + Namespace-scoped resources (Services, Secrets, StatefulSets, etc.) + are cleaned up automatically via owner references. However, + SecurityContextConstraints are cluster-scoped and require explicit + deletion here. + """ + await _delete_crate_scc(namespace, name, logger) + + +async def _delete_crate_scc( + namespace: str, + name: str, + logger: logging.Logger, +) -> None: + """ + Delete the OpenShift SecurityContextConstraint for a CrateDB cluster. + """ + if config.CLOUD_PROVIDER != CloudProvider.OPENSHIFT: + return + + scc_name = f"crate-anyuid-{namespace}-{name}" + + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + try: + await custom_api.delete_cluster_custom_object( + group="security.openshift.io", + version="v1", + plural="securitycontextconstraints", + name=scc_name, + ) + logger.info("Deleted SCC %s", scc_name) + except ApiException as e: + if e.status == 404: + logger.info("SCC %s already deleted", scc_name) + else: + logger.warning( + "Could not delete SCC %s (status=%s reason=%s) — " + "it may need to be removed manually", + scc_name, + e.status, + e.reason, + ) diff --git a/crate/operator/main.py b/crate/operator/main.py index a317c137..d3c95fbe 100644 --- a/crate/operator/main.py +++ b/crate/operator/main.py @@ -38,6 +38,7 @@ ) from crate.operator.handlers.handle_create_cratedb import create_cratedb from crate.operator.handlers.handle_create_grand_central import create_grand_central +from crate.operator.handlers.handle_delete_cratedb import delete_cratedb from crate.operator.handlers.handle_notify_external_ip_changed import ( external_ip_changed, ) @@ -149,6 +150,22 @@ async def cluster_create( await create_cratedb(namespace, meta, spec, patch, status, logger) +@kopf.on.delete(API_GROUP, "v1", RESOURCE_CRATEDB, annotations=annotation_filter()) +async def cluster_delete( + namespace: str, + name: str, + logger: logging.Logger, + **_kwargs, +): + """ + Handles deletion of CrateDB Clusters. + + Cleans up cluster-scoped resources that cannot be garbage-collected + via owner references (e.g. OpenShift SecurityContextConstraints). + """ + await delete_cratedb(namespace, name, logger) + + @kopf.on.update( API_GROUP, "v1", diff --git a/crate/operator/operations.py b/crate/operator/operations.py index f7b47196..33608ba6 100644 --- a/crate/operator/operations.py +++ b/crate/operator/operations.py @@ -27,11 +27,9 @@ import kopf import yaml -from aiohttp.client_exceptions import WSServerHandshakeError from aiopg import Cursor from kopf import TemporaryError from kubernetes_asyncio.client import ( - ApiException, AppsV1Api, BatchV1Api, CoreV1Api, @@ -48,7 +46,6 @@ V1StatefulSetList, ) from kubernetes_asyncio.client.models.v1_delete_options import V1DeleteOptions -from kubernetes_asyncio.stream import WsApiClient from psycopg2 import DatabaseError, OperationalError from psycopg2.extensions import quote_ident @@ -73,6 +70,7 @@ ) from crate.operator.create import recreate_services from crate.operator.grand_central import read_grand_central_deployment +from crate.operator.sql import execute_sql from crate.operator.utils import crate from crate.operator.utils.jwt import crate_version_supports_jwt from crate.operator.utils.k8s_api_client import GlobalApiClient @@ -882,63 +880,6 @@ def get_crash_scheme(spec: dict) -> str: return "https" if "ssl" in spec["spec"]["cluster"] else "http" -async def run_crash_command( - namespace: str, - pod_name: str, - scheme: str, - command: str, - logger, - delay: int = 30, -): - """ - This connects to a CrateDB pod and executes a crash command in the - ``crate`` container. It returns the result of the execution. - - :param namespace: The Kubernetes namespace of the CrateDB cluster. - :param pod_name: The pod name where the command should be run. - :param scheme: The host scheme for running the command. - :param command: The SQL query that should be run. - :param logger: the logger on which we're logging - :param delay: Time in seconds between the retries when executing - the query. - """ - async with WsApiClient() as ws_api_client: - core_ws = CoreV1Api(ws_api_client) - try: - exception_logger = logger.exception if config.TESTING else logger.error - crash_command = [ - "crash", - "--verify-ssl=false", - f"--host={scheme}://localhost:4200", - "-c", - command, - ] - result = await core_ws.connect_get_namespaced_pod_exec( - namespace=namespace, - name=pod_name, - command=crash_command, - container="crate", - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - except ApiException as e: - # We don't use `logger.exception()` to not accidentally include sensitive - # data in the log messages which might be part of the string - # representation of the exception. - exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) - raise kopf.TemporaryError(delay=delay) - except WSServerHandshakeError as e: - # We don't use `logger.exception()` to not accidentally include sensitive - # data in the log messages which might be part of the string - # representation of the exception. - exception_logger("... failed. Status: %s Message: %s", e.status, e.message) - raise kopf.TemporaryError(delay=delay) - else: - return result - - class RestartSubHandler(StateBasedSubHandler): @crate.on.error(error_handler=crate.send_update_failed_notification) @crate.timeout(timeout=float(config.ROLLING_RESTART_TIMEOUT)) @@ -1210,7 +1151,11 @@ async def set_cronjob_delay(patch): async def set_user_jwt( - cursor: Cursor, namespace: str, name: str, username: str, logger: logging.Logger + cursor: Cursor, + namespace: str, + name: str, + username: str, + logger: logging.Logger, ) -> None: """ Set JWT auth properties for a given username @@ -1220,6 +1165,7 @@ async def set_user_jwt( :param namespace: The Kubernetes namespace of the CrateDB cluster. :param name: The CrateDB custom resource name defining the CrateDB cluster. :param username: The name of the user the JWT properties should be set for. + :param logger: Logger for operation tracking. """ cratedb = await get_cratedb_resource(namespace, name) await cursor.execute( @@ -1230,22 +1176,67 @@ async def set_user_jwt( username_ident = quote_ident(username, cursor._impl) iss = cratedb["spec"].get("grandCentral", {}).get("jwkUrl") - if user_exists and iss: - query = ( - f"ALTER USER {username_ident} SET " - f"""(jwt = {{"iss" = '{iss}', "username" = '{username}', """ - f""""aud" = '{name}'}})""" + + if not (user_exists and iss): + return + + pod_name = get_crash_pod_name(cratedb, name) + scheme = get_crash_scheme(cratedb) + + # Step 1: Reset JWT to NULL first + # This prevents RoleAlreadyExistsException when restoring snapshots + # where the user might have JWT properties with different aud/iss + logger.info("Resetting JWT auth properties for user '%s'", username) + reset_query = f"ALTER USER {username_ident} SET (jwt = NULL)" + logger.info("... executing query: %s", reset_query) + + reset_result = await execute_sql( + namespace=namespace, + name=name, + pod_name=pod_name, + scheme=scheme, + sql=reset_query, + args=None, + logger=logger, + ) + logger.info("... result: %s", reset_result) + + if (reset_result.rowcount or 0) > 0: + logger.info("... JWT reset successful") + else: + logger.info( + "... JWT reset had no effect (might not have been set). %s", reset_result ) - pod_name = get_crash_pod_name(cratedb, name) - scheme = get_crash_scheme(cratedb) - result = await run_crash_command(namespace, pod_name, scheme, query, logger) - if "ALTER OK" in result: - logger.info("... success") - else: - logger.info("... error. %s", result) - # Continue if the same JWT properties are already set - if "RoleAlreadyExistsException" not in result: - raise kopf.TemporaryError(delay=config.BOOTSTRAP_RETRY_DELAY) + + # Step 2: Set new JWT properties with current cluster's aud + logger.info("Setting JWT auth properties for user '%s'", username) + set_query = ( + f"ALTER USER {username_ident} SET " + f"""(jwt = {{"iss" = '{iss}', "username" = '{username}', """ + f""""aud" = '{name}'}})""" + ) + logger.info("... executing query: %s", set_query) + + result = await execute_sql( + namespace=namespace, + name=name, + pod_name=pod_name, + scheme=scheme, + sql=set_query, + args=None, + logger=logger, + ) + logger.info("... result: %s", result) + + if (result.rowcount or 0) > 0: + logger.info("... success") + else: + logger.info("... error. %s", result) + if ( + not result.error_message + or "RoleAlreadyExistsException" not in result.error_message + ): + raise kopf.TemporaryError(delay=config.BOOTSTRAP_RETRY_DELAY) class StartClusterSubHandler(StateBasedSubHandler): diff --git a/crate/operator/restore_backup.py b/crate/operator/restore_backup.py index ab99fc72..d8807781 100644 --- a/crate/operator/restore_backup.py +++ b/crate/operator/restore_backup.py @@ -22,7 +22,6 @@ import abc import asyncio import logging -import re from contextlib import asynccontextmanager from dataclasses import fields from typing import Any, Dict, List, Optional, Tuple @@ -55,9 +54,9 @@ set_cluster_setting, ) from crate.operator.operations import ( + execute_sql, get_crash_pod_name, get_crash_scheme, - run_crash_command, scale_backup_metrics_deployment, suspend_or_start_grand_central, ) @@ -150,19 +149,36 @@ async def ensure_no_restore_in_progress( """ command = ( - "SELECT * FROM sys.snapshot_restore WHERE " - f"name='{snapshot}' AND state NOT IN ('SUCCESS', 'FAILURE')" + "SELECT 1 FROM sys.snapshot_restore WHERE " + f"name='{snapshot}' AND state NOT IN ('SUCCESS', 'FAILURE') LIMIT 1;" ) - result = await run_crash_command(namespace, pod_name, scheme, command, logger) - if snapshot in result: + result = await execute_sql( + namespace=namespace, + name=name, + pod_name=pod_name, + scheme=scheme, + sql=command, + args=[], + logger=logger, + ) + if (result.rowcount or 0) > 0: progress_command = ( "SELECT min(recovery['size']['percent']) FROM sys.shards " "where state='RECOVERING' and recovery['type']='SNAPSHOT';" ) - result = await run_crash_command( - namespace, pod_name, scheme, progress_command, logger + progress = await execute_sql( + namespace=namespace, + name=name, + pod_name=pod_name, + scheme=scheme, + sql=progress_command, + args=[], + logger=logger, ) - pct = int(re.findall(r"(\d+)", result)[0]) or 0 + if not progress.rows or not progress.rows[0]: + pct = 0 + else: + pct = int(progress.rows[0][0] or 0) await send_operation_progress_notification( namespace=namespace, name=name, @@ -798,7 +814,7 @@ async def handle( # type: ignore # Reset the system user with the password from the CRD await self._reset_user_password( - SYSTEM_USERNAME, password, namespace, pod_name, scheme, logger + SYSTEM_USERNAME, password, namespace, name, pod_name, scheme, logger ) await self._restore_gc_admin_password( @@ -817,7 +833,13 @@ async def _restore_gc_admin_password( try: gc_admin_password = await get_gc_user_password(core, namespace, name) await self._reset_user_password( - GC_USERNAME, gc_admin_password, namespace, pod_name, scheme, logger + GC_USERNAME, + gc_admin_password, + namespace, + name, + pod_name, + scheme, + logger, ) except kopf.TemporaryError as e: logger.warning("GC admin password reset failed; will retry: %s", e) @@ -832,14 +854,23 @@ async def _reset_user_password( username: str, password: str, namespace: str, + name: str, pod_name: str, scheme: str, logger: logging.Logger, ): password_quoted = QuotedString(password).getquoted().decode() command = f'ALTER USER "{username}" SET (password={password_quoted});' - result = await run_crash_command(namespace, pod_name, scheme, command, logger) - if "ALTER OK" in result: + result = await execute_sql( + namespace=namespace, + name=name, + pod_name=pod_name, + scheme=scheme, + sql=command, + args=[], + logger=logger, + ) + if (result.rowcount or 0) > 0: logger.info("... %s password reset success", username) else: logger.info("... %s password reset error. %s", username, result) diff --git a/crate/operator/sql.py b/crate/operator/sql.py new file mode 100644 index 00000000..38cad499 --- /dev/null +++ b/crate/operator/sql.py @@ -0,0 +1,261 @@ +import json +import logging +import re +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + +import httpx +import kopf +from aiohttp.client_exceptions import WSServerHandshakeError +from kubernetes_asyncio.client import ApiException, CoreV1Api +from kubernetes_asyncio.stream import WsApiClient + +from crate.operator.config import config +from crate.operator.constants import CRATE_CONTROL_PORT, CloudProvider +from crate.operator.utils.k8s_api_client import GlobalApiClient +from crate.operator.utils.kubeapi import resolve_secret_key_ref + + +@dataclass +class SQLResult: + rowcount: int | None + rows: list | None + error_code: int | None + error_message: str | None + + @property + def ok(self) -> bool: + return self.error_code is None and self.error_message is None + + +def normalize_crate_control(resp: dict) -> SQLResult: + err = None + + if "error" in resp: + err = resp["error"] + elif "detail" in resp: + detail = resp["detail"] + + if isinstance(detail, str): + try: + parsed = json.loads(detail) + if isinstance(parsed, dict) and "error" in parsed: + err = parsed["error"] + else: + err = {"message": detail} + except Exception: + err = {"message": detail} + + if err is not None: + if isinstance(err, str): + try: + err = json.loads(err) + except Exception: + return SQLResult( + rowcount=None, + rows=None, + error_code=None, + error_message=err, + ) + + return SQLResult( + rowcount=None, + rows=None, + error_code=err.get("code"), + error_message=err.get("message"), + ) + + return SQLResult( + rowcount=resp.get("rowcount"), + rows=resp.get("rows"), + error_code=None, + error_message=None, + ) + + +def _convert_cell(value: str) -> Any: + """ + Convert crash cell value to int/float/bool/None if possible. + """ + if value == "": + return None + + if re.fullmatch(r"-?\d+", value): + return int(value) + + if re.fullmatch(r"-?\d+\.\d+", value): + return float(value) + + if value.lower() == "true": + return True + if value.lower() == "false": + return False + + return value + + +def parse_crash_table(output: str) -> List[Tuple[Any, ...]]: + lines = [line.rstrip() for line in output.splitlines() if line.strip()] + + data_lines = [line for line in lines if line.startswith("|") and line.endswith("|")] + + if len(data_lines) < 2: + return [] + + _ = data_lines[0] + rows = data_lines[1:] + + parsed_rows = [] + for row in rows: + cells = [cell.strip() for cell in row[1:-1].split("|")] + parsed_rows.append(tuple(_convert_cell(cell) for cell in cells)) + + return parsed_rows + + +def normalize_crash(output: str) -> SQLResult: + if "Exception" in output: + return SQLResult( + rowcount=None, + rows=None, + error_code=None, + error_message=output, + ) + + if "ALTER OK" in output or "CREATE OK" in output or "GRANT OK" in output: + return SQLResult(rowcount=1, rows=None, error_code=None, error_message=None) + + # SELECT output + rows = parse_crash_table(output) + return SQLResult(rowcount=len(rows), rows=rows, error_code=None, error_message=None) + + +async def run_crash_command( + namespace: str, + pod_name: str, + scheme: str, + command: str, + logger, + delay: int = 30, +): + """ + This connects to a CrateDB pod and executes a crash command in the + ``crate`` container. It returns the result of the execution. + + :param namespace: The Kubernetes namespace of the CrateDB cluster. + :param pod_name: The pod name where the command should be run. + :param scheme: The host scheme for running the command. + :param command: The SQL query that should be run. + :param logger: the logger on which we're logging + :param delay: Time in seconds between the retries when executing + the query. + """ + async with WsApiClient() as ws_api_client: + core_ws = CoreV1Api(ws_api_client) + try: + exception_logger = logger.exception if config.TESTING else logger.error + crash_command = [ + "crash", + "--verify-ssl=false", + f"--host={scheme}://localhost:4200", + "-c", + command, + ] + result = await core_ws.connect_get_namespaced_pod_exec( + namespace=namespace, + name=pod_name, + command=crash_command, + container="crate", + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + except ApiException as e: + # We don't use `logger.exception()` to not accidentally include sensitive + # data in the log messages which might be part of the string + # representation of the exception. + exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) + raise kopf.TemporaryError(delay=delay) + except WSServerHandshakeError as e: + # We don't use `logger.exception()` to not accidentally include sensitive + # data in the log messages which might be part of the string + # representation of the exception. + exception_logger("... failed. Status: %s Message: %s", e.status, e.message) + raise kopf.TemporaryError(delay=delay) + else: + return result + + +async def execute_sql_via_crate_control( + *, + namespace: str, + name: str, + sql: str, + args: Optional[List[Any]], + logger: logging.Logger, +): + control_host = f"crate-control-{name}.{namespace}.svc.cluster.local" + url = f"http://{control_host}:{CRATE_CONTROL_PORT}/exec" + + async with GlobalApiClient() as api_client: + core = CoreV1Api(api_client) + bootstrap_token = await resolve_secret_key_ref( + core, + namespace, + {"name": f"crate-control-{name}", "key": "token"}, + ) + logger.info("Resolved crate-control bootstrap token") + + payload: Dict[str, Any] = {"stmt": sql} + if args is not None: + payload["args"] = args + logger.info("Sending SQL to crate-control sidecar: %s", sql[:80]) + + async with httpx.AsyncClient(timeout=5.0) as client: + resp = await client.post( + url, + headers={"token": bootstrap_token}, + json=payload, + ) + + if resp.status_code >= 500: + raise kopf.TemporaryError("Sidecar unavailable", delay=5) + + data = resp.json() + if resp.status_code >= 400: + logger.info("Sidecar returned %s: %s", resp.status_code, data) + + return data + + +async def execute_sql( + *, + namespace: str, + name: str, + pod_name: str, + scheme: str, + sql: str, + args: list | None, + logger: logging.Logger, +) -> SQLResult: + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + logger.info("Using sidecar SQL execution") + resp = await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=sql, + args=args, + logger=logger, + ) + return normalize_crate_control(resp) + else: + logger.info("Using legacy pod_exec SQL execution") + raw = await run_crash_command( + namespace=namespace, + pod_name=pod_name, + scheme=scheme, + command=sql, + logger=logger, + ) + return normalize_crash(raw) diff --git a/crate/operator/update_user_password.py b/crate/operator/update_user_password.py index 33bc69d8..a0c2a614 100644 --- a/crate/operator/update_user_password.py +++ b/crate/operator/update_user_password.py @@ -29,6 +29,8 @@ from kubernetes_asyncio.stream import WsApiClient from crate.operator.config import config +from crate.operator.constants import CloudProvider +from crate.operator.sql import execute_sql_via_crate_control, normalize_crate_control from crate.operator.utils.formatting import b64decode from crate.operator.utils.jwt import crate_version_supports_jwt from crate.operator.utils.kubeapi import get_cratedb_resource @@ -36,7 +38,6 @@ from crate.operator.webhooks import WebhookAction, WebhookOperation, WebhookStatus -# update_user_password(host, username, old_password, new_password) async def update_user_password( namespace: str, cluster_id: str, @@ -66,6 +67,132 @@ async def update_user_password( password = b64decode(new_password) cratedb = await get_cratedb_resource(namespace, cluster_id) crate_version = cratedb["spec"]["cluster"]["version"] + exception_logger = logger.exception if config.TESTING else logger.error + + if config.CLOUD_PROVIDER == CloudProvider.OPENSHIFT: + await _update_user_password_via_sidecar( + namespace=namespace, + name=cluster_id, + cluster_id=cluster_id, + username=username, + password=password, + cratedb=cratedb, + crate_version=crate_version, + logger=logger, + exception_logger=exception_logger, + ) + else: + await _update_user_password_via_pod_exec( + namespace=namespace, + pod_name=pod_name, + cluster_id=cluster_id, + username=username, + password=password, + cratedb=cratedb, + crate_version=crate_version, + scheme=scheme, + logger=logger, + exception_logger=exception_logger, + ) + + await send_operation_progress_notification( + namespace=namespace, + name=cluster_id, + message="Password updated successfully.", + logger=logger, + status=WebhookStatus.SUCCESS, + operation=WebhookOperation.UPDATE, + action=WebhookAction.PASSWORD_UPDATE, + ) + + +async def _update_user_password_via_sidecar( + namespace: str, + name: str, + cluster_id: str, + username: str, + password: str, + cratedb: dict, + crate_version: str, + logger: logging.Logger, + exception_logger, +) -> None: + """ + Update user password using the crate-control sidecar (OpenShift path). + """ + iss = cratedb["spec"].get("grandCentral", {}).get("jwkUrl") + if crate_version_supports_jwt(crate_version) and iss: + stmt_reset_jwt = f'ALTER USER "{username}" SET (jwt = NULL)' + try: + logger.info("Resetting JWT config for user %s ...", username) + result = normalize_crate_control( + await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=stmt_reset_jwt, + args=[], + logger=logger, + ) + ) + except Exception as e: + exception_logger("Failed to reset JWT for user %s: %s", username, str(e)) + raise _temporary_error() + else: + if (result.rowcount or 0) > 0: + logger.info("... JWT reset success") + else: + logger.info("... JWT reset error: %s", result) + raise _temporary_error() + + stmt_update = ( + f'ALTER USER "{username}" SET ' + f'(password = ?, jwt = {{"iss" = ?, "username" = ?, "aud" = ?}})' + ) + args = [password, iss, username, cluster_id] + else: + stmt_update = f'ALTER USER "{username}" SET (password = ?)' + args = [password] + + try: + logger.info("Updating password for user %s ...", username) + result = normalize_crate_control( + await execute_sql_via_crate_control( + namespace=namespace, + name=name, + sql=stmt_update, + args=args, + logger=logger, + ) + ) + except Exception as e: + exception_logger("Password update failed for user %s: %s", username, str(e)) + raise _temporary_error() + else: + if (result.rowcount or 0) > 0: + logger.info("... password update success") + else: + logger.info("... password update error: %s", result) + raise _temporary_error() + + +async def _update_user_password_via_pod_exec( + namespace: str, + pod_name: str, + cluster_id: str, + username: str, + password: str, + cratedb: dict, + crate_version: str, + scheme: str, + logger: logging.Logger, + exception_logger, +) -> None: + """ + Update user password using pod_exec with curl (legacy path). + + Uses curl with a JSON body so that parameterised args are correctly + substituted by CrateDB's HTTP API, matching the original behaviour. + """ def get_curl_command(payload: dict) -> List[str]: return [ @@ -83,16 +210,18 @@ def get_curl_command(payload: dict) -> List[str]: ] async def pod_exec(cmd): - return await core_ws.connect_get_namespaced_pod_exec( - namespace=namespace, - name=pod_name, - command=cmd, - container="crate", - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) + async with WsApiClient() as ws_api_client: + core_ws = CoreV1Api(ws_api_client) + return await core_ws.connect_get_namespaced_pod_exec( + namespace=namespace, + name=pod_name, + command=cmd, + container="crate", + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) command_alter_user = get_curl_command( { @@ -100,61 +229,20 @@ async def pod_exec(cmd): "args": [password], } ) - exception_logger = logger.exception if config.TESTING else logger.error - - async with WsApiClient() as ws_api_client: - core_ws = CoreV1Api(ws_api_client) - iss = cratedb["spec"].get("grandCentral", {}).get("jwkUrl") - if crate_version_supports_jwt(crate_version) and iss: - # For users with `jwt` and `password` set, we need to reset - # `jwt` config first to be able to update the password. - command_reset_user_jwt = get_curl_command( - { - "stmt": 'ALTER USER "{}" SET (jwt = NULL)'.format(username), - "args": [], - } - ) - try: - logger.info("Trying to reset user jwt config ...") - result = await pod_exec(command_reset_user_jwt) - except ApiException as e: - exception_logger( - "... failed. Status: %s Reason: %s", e.status, e.reason - ) - raise _temporary_error() - except TemporaryError: - raise - except WSServerHandshakeError as e: - exception_logger( - "... failed. Status: %s Message: %s", e.status, e.message - ) - raise _temporary_error() - except Exception as e: - exception_logger( - "... failed. Unexpected exception. Class: %s. Message: %s", - type(e).__name__, - str(e), - ) - raise _temporary_error() - else: - if "rowcount" in result: - logger.info("... success") - command_alter_user = get_curl_command( - { - "stmt": ( - 'ALTER USER "{}" SET (password = $1, jwt = ' - '{{"iss" = $2, "username" = $3, "aud" = $4}})' - ).format(username), - "args": [password, iss, username, cluster_id], - } - ) - else: - logger.info("... error. %s", result) - raise _temporary_error() + iss = cratedb["spec"].get("grandCentral", {}).get("jwkUrl") + if crate_version_supports_jwt(crate_version) and iss: + # For users with `jwt` and `password` set, we need to reset + # `jwt` config first to be able to update the password. + command_reset_user_jwt = get_curl_command( + { + "stmt": 'ALTER USER "{}" SET (jwt = NULL)'.format(username), + "args": [], + } + ) try: - logger.info("Trying to update user password ...") - result = await pod_exec(command_alter_user) + logger.info("Trying to reset user jwt config ...") + result = await pod_exec(command_reset_user_jwt) except ApiException as e: exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) raise _temporary_error() @@ -165,7 +253,7 @@ async def pod_exec(cmd): raise _temporary_error() except Exception as e: exception_logger( - "... failed. Unexpected exception was raised. Class: %s. Message: %s", + "... failed. Unexpected exception. Class: %s. Message: %s", type(e).__name__, str(e), ) @@ -173,19 +261,43 @@ async def pod_exec(cmd): else: if "rowcount" in result: logger.info("... success") + command_alter_user = get_curl_command( + { + "stmt": ( + 'ALTER USER "{}" SET (password = $1, jwt = ' + '{{"iss" = $2, "username" = $3, "aud" = $4}})' + ).format(username), + "args": [password, iss, username, cluster_id], + } + ) else: logger.info("... error. %s", result) raise _temporary_error() - await send_operation_progress_notification( - namespace=namespace, - name=cluster_id, - message="Password updated successfully.", - logger=logger, - status=WebhookStatus.SUCCESS, - operation=WebhookOperation.UPDATE, - action=WebhookAction.PASSWORD_UPDATE, + try: + logger.info("Trying to update user password ...") + result = await pod_exec(command_alter_user) + except ApiException as e: + exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) + raise _temporary_error() + except TemporaryError: + raise + except WSServerHandshakeError as e: + exception_logger("... failed. Status: %s Message: %s", e.status, e.message) + raise _temporary_error() + except Exception as e: + exception_logger( + "... failed. Unexpected exception was raised. Class: %s. Message: %s", + type(e).__name__, + str(e), ) + raise _temporary_error() + else: + if "rowcount" in result: + logger.info("... success") + else: + logger.info("... error. %s", result) + raise _temporary_error() def _temporary_error(): diff --git a/deploy/charts/crate-operator-crds/Chart.yaml b/deploy/charts/crate-operator-crds/Chart.yaml index 499f8b80..81f1049d 100644 --- a/deploy/charts/crate-operator-crds/Chart.yaml +++ b/deploy/charts/crate-operator-crds/Chart.yaml @@ -6,13 +6,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 2.58.0 +version: 2.59.0-beta.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "2.58.0" +appVersion: "2.59.0-beta.1" maintainers: - - name: Crate.io \ No newline at end of file + - name: Crate.io diff --git a/deploy/charts/crate-operator/Chart.yaml b/deploy/charts/crate-operator/Chart.yaml index 0ef0df4e..e6f0195a 100644 --- a/deploy/charts/crate-operator/Chart.yaml +++ b/deploy/charts/crate-operator/Chart.yaml @@ -6,19 +6,19 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 2.58.0 +version: 2.59.0-beta.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "2.58.0" +appVersion: "2.59.0-beta.1" dependencies: - name: crate-operator-crds - version: 2.58.0 + version: 2.59.0-beta.1 repository: "file://../crate-operator-crds" condition: crate-operator-crds.enabled maintainers: - - name: Crate.io \ No newline at end of file + - name: Crate.io diff --git a/deploy/charts/crate-operator/templates/deployment.yaml b/deploy/charts/crate-operator/templates/deployment.yaml index 28a3ebe2..0787c63b 100644 --- a/deploy/charts/crate-operator/templates/deployment.yaml +++ b/deploy/charts/crate-operator/templates/deployment.yaml @@ -40,9 +40,11 @@ spec: key: {{ $value.key }} {{- end }} {{- range $name, $value := .Values.env }} + {{- if $value }} - name: {{ $name }} value: "{{ $value }}" {{- end }} + {{- end }} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.nodeSelector }} diff --git a/deploy/charts/crate-operator/templates/rbac.yaml b/deploy/charts/crate-operator/templates/rbac.yaml index ea92d0cb..2797d472 100644 --- a/deploy/charts/crate-operator/templates/rbac.yaml +++ b/deploy/charts/crate-operator/templates/rbac.yaml @@ -44,6 +44,7 @@ rules: - persistentvolumes - pods - secrets + - serviceaccounts - services - statefulsets - poddisruptionbudgets @@ -71,6 +72,18 @@ rules: verbs: - list - watch +- apiGroups: + - security.openshift.io + resources: + - securitycontextconstraints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/deploy/charts/crate-operator/values.yaml b/deploy/charts/crate-operator/values.yaml index 884109cb..e61c8d61 100644 --- a/deploy/charts/crate-operator/values.yaml +++ b/deploy/charts/crate-operator/values.yaml @@ -32,6 +32,8 @@ env: CRATEDB_OPERATOR_LOG_LEVEL: "INFO" CRATEDB_OPERATOR_ROLLING_RESTART_TIMEOUT: "3600" CRATEDB_OPERATOR_SCALING_TIMEOUT: "3600" + CRATEDB_OPERATOR_CLOUD_PROVIDER: "" + CRATEDB_OPERATOR_CRATE_CONTROL_IMAGE: "" envFromSecret: {} @@ -53,10 +55,10 @@ podAnnotations: {} resources: limits: cpu: 250m - memory: 128Mi + memory: 512Mi requests: cpu: 250m - memory: 128Mi + memory: 512Mi nodeSelector: {} diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 6248ab99..405b67f0 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -69,6 +69,7 @@ rules: - persistentvolumes - pods - secrets + - serviceaccounts - services - statefulsets - poddisruptionbudgets @@ -96,6 +97,18 @@ rules: verbs: - list - watch +- apiGroups: + - security.openshift.io + resources: + - securitycontextconstraints + verbs: + - create + - delete + - get + - list + - patch + - update + - watch --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 20cf732f..2bbbf560 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -49,6 +49,16 @@ expected to use upper case letters and must be prefixed with - ``aws`` - ``azure`` + - ``gcp`` + - ``openshift`` (Red Hat OpenShift Container Platform) + + When set to ``openshift``, the operator will: + + - Use a sidecar container (``crate-control``) for SQL execution instead of + ``pod_exec`` to comply with OpenShift's security policies + - Create OpenShift-specific SecurityContextConstraints and ServiceAccounts + - Skip privileged init containers + - Adjust pod security contexts to allow CrateDB to run with required capabilities Under the hood, the operator will pass a ``zone`` attribute to all CrateDB nodes. This attribute can also be defined explicitly or override the one set @@ -65,6 +75,20 @@ expected to use upper case letters and must be prefixed with To set or override the attribute on a node type level, set it in ``.spec.nodes.master.settings`` or ``.spec.nodes.data.*.settings``. +.. envvar:: CRATE_CONTROL_IMAGE + + When running on OpenShift (``CLOUD_PROVIDER=openshift``), this variable + specifies the Docker image for the ``crate-control`` sidecar container. + The sidecar provides an HTTP endpoint for SQL execution, replacing the + ``pod_exec`` approach which is not allowed by OpenShift's security policies. + + This variable is **required** when ``CLOUD_PROVIDER`` is set to ``openshift``. + + Example value: ``your-registry.example.com/crate-control:latest`` + + The crate-control image can be built from the ``sidecars/cratecontrol/`` + directory in the operator repository. + .. envvar:: CLUSTER_BACKUP_IMAGE When enabling backups for a cluster, the operator deploys a Prometheus_ diff --git a/docs/source/index.rst b/docs/source/index.rst index 10baab49..5b8b469f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -12,6 +12,7 @@ CrateDB_ inside Kubernetes_. configuration architecture concepts + openshift development changelog deprecation diff --git a/docs/source/openshift.rst b/docs/source/openshift.rst new file mode 100644 index 00000000..60af99b9 --- /dev/null +++ b/docs/source/openshift.rst @@ -0,0 +1,523 @@ +OpenShift Support +================= + +The CrateDB Kubernetes Operator supports deployment on Red Hat OpenShift +Container Platform 4.x. + +Overview +-------- + +OpenShift enforces stricter security policies than standard Kubernetes. +The operator adapts to these constraints when ``CLOUD_PROVIDER`` is set to +``openshift``: + +- **No ``pod_exec``**: OpenShift's restricted SCC does not permit + ``pod_exec`` by default. The operator deploys a lightweight HTTP sidecar + (``crate-control``) for SQL execution instead. +- **No privileged init containers**: The ``sysctl`` init container is + skipped. Kernel tuning is handled via TuneD or MachineConfig. +- **Custom SCC**: A per-cluster SecurityContextConstraint is created to + allow the ``SYS_CHROOT`` capability and ``RunAsAny`` UID policy. +- **PVC owner references**: ``blockOwnerDeletion`` is disabled on PVC + owner references because the StatefulSet controller lacks permission + to set finalizers on PVCs in OpenShift. +- **Lifecycle hooks disabled**: The ``postStart`` and ``preStop`` hooks + (used for ``dc_util`` routing and graceful decommission) are disabled + because they download binaries from an external server, which may not + be reachable from OpenShift pods. + +.. warning:: + + Disabling lifecycle hooks means that the ``preStop`` graceful + decommission (``dc_util --min-availability PRIMARIES``) is **not + performed** on OpenShift. During rolling updates or pod evictions, + shard availability may be temporarily reduced. Plan maintenance + windows accordingly and monitor shard replication status. + + +Architecture Changes +-------------------- + +**Sidecar-based SQL Execution** + A ``crate-control`` sidecar container runs alongside each CrateDB pod. + It exposes port 5050 with a ``/exec`` endpoint for SQL execution and a + ``/health`` endpoint for readiness probes. Communication is + authenticated via a randomly generated token stored in a Kubernetes + Secret (``crate-control-``). + +**Custom SecurityContextConstraints (SCC)** + The operator creates one SCC per CrateDB cluster, scoped to a dedicated + ServiceAccount. The SCC spec is as follows: + + .. code-block:: yaml + + allowPrivilegedContainer: false + allowedCapabilities: + - SYS_CHROOT + requiredDropCapabilities: + - KILL + - MKNOD + runAsUser: + type: RunAsAny # Required: CrateDB entrypoint starts as root + seLinuxContext: + type: MustRunAs + fsGroup: + type: RunAsAny + supplementalGroups: + type: RunAsAny + volumes: + - configMap + - downwardAPI + - emptyDir + - persistentVolumeClaim + - projected + - secret + users: + - "system:serviceaccount::crate-" + + **Why ``RunAsAny`` / root?** The upstream CrateDB container image uses + ``chroot`` in its entrypoint to set up the runtime environment, which + requires starting as UID 0. After ``chroot``, the process drops + privileges to UID 1000 (the ``crate`` user). The pod-level + ``securityContext`` is set to ``runAsUser: 0, fsGroup: 0``. + + **Why ``SYS_CHROOT``?** Required for the ``chroot`` syscall in the + CrateDB entrypoint. + + **Why drop ``KILL`` and ``MKNOD``?** These capabilities are not needed + by CrateDB and are dropped to reduce the attack surface. + +**Per-Cluster ServiceAccount** + A ServiceAccount ``crate-`` is created in the cluster's namespace + and bound to the SCC via the ``users`` field. This ServiceAccount has + owner references and will be garbage collected when the CrateDB CR is + deleted. + + +Prerequisites +------------- + +1. OpenShift Container Platform 4.12 or later +2. Cluster-admin privileges for installation (see + :ref:`operator-rbac-requirements` below) +3. Kernel parameters configured via MachineConfig or TuneD (see + :ref:`kernel-tuning` below) +4. ``crate-control`` sidecar image built and available in your registry +5. CrateDB container image available in your registry + + +.. _kernel-tuning: + +Configuring Kernel Parameters +----------------------------- + +CrateDB requires specific kernel parameters, most critically +``vm.max_map_count=262144``. On standard Kubernetes the operator sets +these via a privileged init container. On OpenShift, **you** must +configure them before deploying any CrateDB cluster and verify host defaults. + +Node Tuning Operator with machineConfigLabels (Recommended) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Node Tuning Operator (NTO) can generate a ``MachineConfig`` +automatically when you use ``machineConfigLabels`` in the recommend +section. This applies kernel parameters at the node level via the +Machine Config Operator, guaranteeing the settings are in place before +any pod starts — with no race condition. + +Create the following ``Tuned`` resource: + +.. code-block:: yaml + + apiVersion: tuned.openshift.io/v1 + kind: Tuned + metadata: + name: cratedb-tuned + namespace: openshift-cluster-node-tuning-operator + spec: + profile: + - name: openshift-cratedb + data: | + [main] + summary=Optimize systems running CrateDB + include=openshift-node + [sysctl] + vm.max_map_count=262144 + recommend: + - machineConfigLabels: + machineconfiguration.openshift.io/role: worker + priority: 10 + profile: openshift-cratedb + +.. code-block:: console + + $ oc apply -f cratedb-tuned.yaml + +This targets all nodes in the ``worker`` MachineConfigPool. If you run +CrateDB on a dedicated pool (recommended for production), replace the +label value accordingly (e.g., ``machineconfiguration.openshift.io/role: +cratedb``). + +.. note:: + + You need access to the ``openshift-cluster-node-tuning-operator`` + namespace to create the Tuned resource. + +.. warning:: + + The generated MachineConfig triggers a **rolling reboot** of all + nodes in the matching pool. Apply this during a maintenance window. + +After the rollout completes, verify on a node: + +.. code-block:: console + + $ oc debug node/ -- chroot /host sysctl vm.max_map_count + + +Building the Sidecar Image +-------------------------- + +Build and push the ``crate-control`` sidecar to your container registry: + +.. code-block:: console + + $ cd sidecars/cratecontrol + $ podman build -t /crate-control: . + $ podman push /crate-control: + +For **disconnected / air-gapped clusters**, mirror the image to your +internal registry: + +.. code-block:: console + + $ oc image mirror \ + /crate-control: \ + /crate-control: + +If your cluster uses an ``ImageDigestMirrorSet`` or +``ImageContentSourcePolicy``, add appropriate entries for the +``crate-control`` and ``crate`` images. + +If pulling from a private registry, create a pull secret and link it to +the CrateDB ServiceAccount: + +.. code-block:: console + + $ oc create secret docker-registry crate-pull-secret \ + --docker-server= \ + --docker-username= \ + --docker-password= \ + -n + $ oc secrets link crate- crate-pull-secret --for=pull \ + -n + + +.. _operator-rbac-requirements: + +Operator RBAC Requirements +-------------------------- + +The operator's ClusterRole requires permissions to manage SCCs: + +.. code-block:: yaml + + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + verbs: ["create", "delete", "get", "list", "patch", "update", "watch"] + +It also requires ``serviceaccounts`` in core API resources. + +These permissions are included in the Helm chart's RBAC templates and +in ``deploy/rbac.yaml``. + +**If your organization restricts SCC creation**, the SCC can be +pre-created by a cluster admin before the operator runs. The operator +will detect the existing SCC (409 Conflict) and skip creation. The SCC +name follows the pattern ``crate-anyuid--``. + +.. note:: + + The person installing the operator needs sufficient privileges to + create the ClusterRole and ClusterRoleBinding. After installation, + the operator's ServiceAccount performs all SCC operations. + + +Installation +------------ + +Step 1: Install CRDs +^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: console + + $ helm repo add crate-operator https://crate.github.io/crate-operator + $ helm install crate-operator-crds crate-operator/crate-operator-crds + +Step 2: Install the Operator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: console + + $ helm install crate-operator crate-operator/crate-operator \ + --set env.CRATEDB_OPERATOR_CLOUD_PROVIDER=openshift \ + --set env.CRATEDB_OPERATOR_CRATE_CONTROL_IMAGE=/crate-control: \ + --namespace crate-operator \ + --create-namespace + +Step 3: Prepare the Target Namespace +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Create the namespace where CrateDB clusters will run and configure Pod +Security Admission: + +.. code-block:: console + + $ oc new-project cratedb + $ oc label namespace cratedb \ + pod-security.kubernetes.io/enforce=privileged \ + pod-security.kubernetes.io/warn=privileged \ + pod-security.kubernetes.io/audit=privileged + +.. warning:: + + The ``privileged`` Pod Security Admission level is required because + CrateDB pods start as root (UID 0) and use ``SYS_CHROOT``. Without + this label, pods will be rejected by the admission controller on + OpenShift 4.12+. + + If your organization prohibits namespace-wide ``privileged`` PSA, you + will need to work with your security team to create a suitable + exception or use a policy engine (e.g., Kyverno, Gatekeeper) with + more granular rules. + + +Storage Classes +--------------- + +CrateDB uses ``ReadWriteOnce`` (RWO) PersistentVolumeClaims, one per pod +(via StatefulSet volumeClaimTemplates). + +Before deploying, verify available StorageClasses: + +.. code-block:: console + + $ oc get storageclass + +Key considerations: + +- **Performance**: CrateDB is I/O intensive. Use SSD/NVMe-backed storage + (e.g., ``gp3-csi`` on ROSA, ``ocs-storagecluster-ceph-rbd`` on ODF, + or the ``local-storage`` operator for bare-metal). +- **Volume expansion**: If you plan to resize data volumes, ensure the + StorageClass has ``allowVolumeExpansion: true``. +- **Debug volume**: The operator also creates a debug volume for Java + heap dumps. This uses the ``DEBUG_VOLUME_STORAGE_CLASS`` config + variable (defaults to the cluster's default StorageClass). + +Use the chosen StorageClass name in the ``storageClass`` field of your +CrateDB resource spec. + + +Deploying CrateDB Clusters +--------------------------- + +.. code-block:: yaml + + apiVersion: cloud.crate.io/v1 + kind: CrateDB + metadata: + name: my-cluster + namespace: cratedb + spec: + cluster: + imageRegistry: crate + name: crate-dev + version: 6.2.1 + nodes: + data: + - name: hot + replicas: 3 + resources: + limits: + cpu: 4 + memory: 4Gi + disk: + count: 1 + size: 128GiB + storageClass: gp3-csi + heapRatio: 0.5 + +The operator will automatically: + +1. Create a SecurityContextConstraint (``crate-anyuid-cratedb-my-cluster``) +2. Create a ServiceAccount (``crate-my-cluster``) +3. Create a Secret with the sidecar auth token (``crate-control-my-cluster``) +4. Create a headless Service for the sidecar (``crate-control-my-cluster``) +5. Deploy the StatefulSet with the ``crate-control`` sidecar +6. Configure pod security context (``runAsUser: 0, fsGroup: 0``) + +Verify the deployment: + +.. code-block:: console + + $ oc get pods -n cratedb -l app.kubernetes.io/component=cratedb + $ oc get scc | grep crate-anyuid + +Confirm the correct SCC is being used by a pod: + +.. code-block:: console + + $ oc get pod -n cratedb -o jsonpath='{.metadata.annotations.openshift\.io/scc}' + + +Exposing CrateDB +----------------- + +CrateDB exposes two protocols: + +- **HTTP API** (port 4200): Used for the Admin UI and REST SQL endpoint. +- **PostgreSQL wire protocol** (port 5432): Used by psql, JDBC, and + other PostgreSQL-compatible clients. + +**HTTP via Route (Admin UI / REST)** + +.. code-block:: console + + $ oc create route passthrough cratedb-http \ + --service=crate-my-cluster \ + --port=4200 \ + -n cratedb + +**PostgreSQL protocol** + +OpenShift Routes are HTTP-only. For PostgreSQL wire protocol access, use +a ``LoadBalancer`` Service (if your platform supports it) or a +``NodePort``: + +.. code-block:: console + + $ oc expose service crate-my-cluster \ + --type=LoadBalancer \ + --name=cratedb-pg-lb \ + --port=5432 \ + --target-port=5432 \ + -n cratedb + + +Security Considerations +----------------------- + +**Root Access** + CrateDB pods start as UID 0 (root). The CrateDB entrypoint + immediately uses ``chroot`` to set up the runtime environment, then + drops to UID 1000 (``crate`` user). The root phase is brief and + limited to the entrypoint script. + +**SCC Scope** + Each CrateDB cluster gets its own SCC, bound only to + ``system:serviceaccount::crate-``. No cluster-wide + grants are made. See the full SCC spec in the `Architecture Changes`_ + section above. + +**Sidecar Authentication** + The ``crate-control`` sidecar requires a token (from the + ``crate-control-`` Secret) in the ``Token`` HTTP header for all + ``/exec`` requests. The ``/health`` endpoint is unauthenticated. + +**Sidecar Network Scope** + The sidecar is exposed via a headless ClusterIP Service and is only + reachable from within the cluster network. + +**Dropped Capabilities** + ``KILL`` and ``MKNOD`` are explicitly dropped. Only ``SYS_CHROOT`` is + added. ``allowPrivilegedContainer`` is ``false``. + + +Troubleshooting +--------------- + +**Pods rejected by admission controller** + If pods fail with a ``Forbidden`` error mentioning Pod Security: + + .. code-block:: console + + $ oc get events -n cratedb | grep Forbidden + + Verify namespace PSA labels: + + .. code-block:: console + + $ oc get namespace cratedb -o jsonpath='{.metadata.labels}' | python3 -m json.tool + + Ensure ``pod-security.kubernetes.io/enforce: privileged`` is set. + +**Pod fails with "cannot set blockOwnerDeletion" error** + Verify ``CRATEDB_OPERATOR_CLOUD_PROVIDER=openshift`` is set: + + .. code-block:: console + + $ oc get deploy crate-operator -n crate-operator \ + -o jsonpath='{.spec.template.spec.containers[0].env}' | python3 -m json.tool + +**Pod fails with "Permission denied" on /crate/bin/crate** + The SCC may not be created or bound correctly: + + .. code-block:: console + + $ oc get scc | grep crate-anyuid + $ oc describe scc crate-anyuid-- + $ oc get pod -n \ + -o jsonpath='{.metadata.annotations.openshift\.io/scc}' + +**CrateDB fails with "vm.max_map_count too low"** + Kernel parameters are not applied on the node. Check: + + .. code-block:: console + + $ oc debug node/ -- chroot /host sysctl vm.max_map_count + + If using TuneD, this may be a timing issue on the first pod per node. + The pod will succeed on automatic retry. For a permanent fix, use + MachineConfig (see :ref:`kernel-tuning`). + +**Sidecar container fails to start** + Check the image is pullable and the config is set: + + .. code-block:: console + + $ oc get events -n | grep crate-control + $ oc logs -c crate-control -n + + Verify the operator has the image configured: + + .. code-block:: console + + $ oc get deploy crate-operator -n crate-operator \ + -o jsonpath='{.spec.template.spec.containers[0].env}' | python3 -m json.tool + +**System user bootstrap fails** + Check sidecar logs and verify the auth token Secret exists: + + .. code-block:: console + + $ oc logs -c crate-control -n + $ oc get secret crate-control- -n + + +Limitations +----------- + +- **No graceful decommission on pod eviction**: The ``preStop`` lifecycle + hook (``dc_util --min-availability PRIMARIES``) is disabled on + OpenShift. Rolling updates and node drains may temporarily reduce shard + availability. Monitor ``sys.shards`` during maintenance. +- **``CLOUD_PROVIDER`` is exclusive**: Setting ``openshift`` means + cloud-specific features for ``aws``, ``azure``, or ``gcp`` (e.g., + zone awareness attribute auto-detection) are not active. You can still + set zone attributes manually via ``.spec.nodes.data.*.settings``. +- **``CRATE_CONTROL_IMAGE`` is required**: The operator does not validate + that this variable is set. If it is missing, StatefulSets will be + created with an empty image reference and pods will fail to pull. +- **Lifecycle hooks disabled**: ``postStart`` (routing reset) and + ``preStop`` (graceful decommission) hooks are skipped entirely. diff --git a/setup.py b/setup.py index 1a9c0177..15645de4 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,7 @@ def read(path: str) -> str: "verlib2==0.3.1", "wrapt==2.1.0", "python-json-logger==4.0.0", + "httpx==0.28.1", ], extras_require={ "docs": [ diff --git a/sidecars/cratecontrol/Dockerfile b/sidecars/cratecontrol/Dockerfile new file mode 100644 index 00000000..8234714a --- /dev/null +++ b/sidecars/cratecontrol/Dockerfile @@ -0,0 +1,54 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +FROM python:3.12-slim AS builder + +WORKDIR /wheels + +RUN pip install --no-cache-dir --upgrade pip wheel && \ + pip wheel --no-cache-dir bottle==0.13.4 waitress==3.0.2 + +FROM python:3.12-slim + +LABEL license="Apache License 2.0" \ + maintainer="Crate.IO GmbH " \ + name="CrateDB Control Sidecar" \ + repository="crate/crate-operator" + +WORKDIR /app + +RUN useradd -U -M crate-control && \ + apt-get update && \ + apt-get upgrade -y && \ + apt-get autoremove -y && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /wheels /wheels + +RUN pip install --no-cache-dir --no-compile /wheels/* && rm -rf /wheels + +COPY main.py /app/ + +USER crate-control + +EXPOSE 5050 + +CMD ["python", "-u", "main.py"] diff --git a/sidecars/cratecontrol/README.md b/sidecars/cratecontrol/README.md new file mode 100644 index 00000000..e8fb4d9a --- /dev/null +++ b/sidecars/cratecontrol/README.md @@ -0,0 +1,62 @@ +# CrateDB Control Sidecar + +Lightweight HTTP proxy for executing SQL statements against CrateDB. + +## Features + +- **Minimal footprint**: ~20-25 MB memory +- **Simple**: Single Python file, 2 dependencies + +## Endpoints + +- `GET /health` - Health check +- `POST /exec` - Execute SQL statement + +## Environment Variables + +- `BOOTSTRAP_TOKEN` (required) - Authentication token +- `CRATE_HTTP_URL` - CrateDB HTTP endpoint (default: `http://localhost:4200/_sql`) +- `CRATE_USERNAME` - CrateDB username (optional) +- `CRATE_PASSWORD` - CrateDB password (optional) +- `VERIFY_SSL` - Verify SSL certificates (default: `false`) +- `HTTP_TIMEOUT` - Request timeout in seconds (default: `30`) +- `LOG_LEVEL` - Logging level (default: `INFO`) + +## Usage + +```bash +curl -X POST http://localhost:5050/exec \ + -H "Token: your-bootstrap-token" \ + -H "Content-Type: application/json" \ + -d '{"stmt": "SELECT current_user;"}' +``` + +## Build & Run + +```bash +docker build -t crate/crate-control:latest . +docker run -p 5050:5050 \ + -e BOOTSTRAP_TOKEN=test123 \ + -e CRATE_HTTP_URL=http://localhost:4200/_sql \ + crate/crate-control:latest +``` + +## License + +Licensed to Crate.IO GmbH ("Crate") under one or more contributor license agreements. +See the NOTICE file distributed with this work for additional information regarding +copyright ownership. Crate licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with the License. + +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the specific language governing permissions +and limitations under the License. + +However, if you have executed another commercial license agreement with Crate these terms +will supersede the license and you may use the software solely pursuant to the terms of +the relevant commercial agreement. diff --git a/sidecars/cratecontrol/main.py b/sidecars/cratecontrol/main.py new file mode 100644 index 00000000..3912be40 --- /dev/null +++ b/sidecars/cratecontrol/main.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +import hmac +import json +import logging +import os +import signal +import socket +import ssl +import sys +from base64 import b64encode +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen + +from bottle import Bottle, request, response +from waitress import serve + +BOOTSTRAP_TOKEN = os.environ.get("BOOTSTRAP_TOKEN", "") +CRATE_HTTP_URL = os.environ.get("CRATE_HTTP_URL", "http://localhost:4200/_sql") +CRATE_USERNAME = os.environ.get("CRATE_USERNAME", "") +CRATE_PASSWORD = os.environ.get("CRATE_PASSWORD", "") +VERIFY_SSL = os.environ.get("VERIFY_SSL", "false").lower() == "true" +HTTP_TIMEOUT = float(os.environ.get("HTTP_TIMEOUT", "30")) +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL, logging.INFO), + format="%(asctime)s %(levelname)s %(message)s", +) +logger = logging.getLogger("crate-control") + +SSL_CONTEXT = None if VERIFY_SSL else ssl._create_unverified_context() + +app = Bottle() + + +def abort_json(status: int, payload: dict): + """ + Return JSON error response. + """ + response.status = status + response.content_type = "application/json" + return json.dumps(payload) + + +def verify_token(): + """ + Verify bootstrap token from request header. + + Uses hmac.compare_digest() for timing-safe comparison to prevent + timing attacks that could leak the token value. + """ + token = request.headers.get("Token") or "" + if not hmac.compare_digest(token, BOOTSTRAP_TOKEN): + logger.warning("unauthorized_request") + return False + return True + + +def crate_request(stmt: str, args): + """ + Execute SQL request against CrateDB HTTP endpoint. + + Returns: (status_code, response_body_string) + Raises: RuntimeError on connection failure + """ + body = {"stmt": stmt} + if args is not None: + body["args"] = args + + data = json.dumps(body).encode("utf-8") + + req = Request(CRATE_HTTP_URL, data=data, method="POST") + req.add_header("Content-Type", "application/json") + + if CRATE_USERNAME: + auth = f"{CRATE_USERNAME}:{CRATE_PASSWORD}".encode() + req.add_header("Authorization", "Basic " + b64encode(auth).decode()) + + try: + with urlopen(req, timeout=HTTP_TIMEOUT, context=SSL_CONTEXT) as resp: + payload = resp.read().decode() + return resp.status, payload + except HTTPError as e: + return e.code, e.read().decode() + except (URLError, TimeoutError, socket.timeout) as e: + raise RuntimeError(f"connection_failed: {e}") + + +@app.get("/health") +def health(): + """ + Health check endpoint. + """ + return {"status": "ok"} + + +@app.post("/exec") +def exec_sql(): + """ + Execute SQL statement against CrateDB. + """ + if not verify_token(): + return abort_json(401, {"detail": "Invalid or missing token"}) + + try: + payload = request.json + if not payload or "stmt" not in payload: + return abort_json(400, {"detail": "Missing stmt"}) + + stmt = payload["stmt"] + args = payload.get("args") + + logger.info("sql_execution_requested: %s", stmt[:80]) + + status, body = crate_request(stmt, args) + response.content_type = "application/json" + + if status >= 400: + logger.error("query_failed status=%d", status) + response.status = status + return body + + logger.info("query_succeeded status=%d", status) + return body + except RuntimeError as e: + logger.warning("crate_not_ready: %s", e) + return abort_json(503, {"detail": str(e), "error_type": "connection_error"}) + except Exception as e: + logger.exception("unexpected_error") + return abort_json(500, {"detail": str(e), "error_type": "internal_error"}) + + +def handle_signal(sig, frame): + """ + Handle SIGTERM/SIGINT for graceful shutdown. + """ + logger.info("received signal %s, shutting down...", sig) + sys.exit(0) + + +signal.signal(signal.SIGTERM, handle_signal) +signal.signal(signal.SIGINT, handle_signal) + +if __name__ == "__main__": + logger.info("starting crate-control on port 5050") + logger.info("crate_url=%s verify_ssl=%s", CRATE_HTTP_URL, VERIFY_SSL) + + serve(app, host="0.0.0.0", port=5050, threads=2, cleanup_interval=30) diff --git a/tests/conftest.py b/tests/conftest.py index dacbaf67..9f7164e5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -103,6 +103,7 @@ def pytest_collection_modifyitems(config, items): def load_config(worker_id): env = { "CRATEDB_OPERATOR_CLUSTER_BACKUP_IMAGE": "crate/does-not-exist-backup", + "CRATEDB_OPERATOR_CRATE_CONTROL_IMAGE": "crate/does-not-exist-control", "CRATEDB_OPERATOR_DEBUG_VOLUME_SIZE": "2GiB", "CRATEDB_OPERATOR_DEBUG_VOLUME_STORAGE_CLASS": "default", "CRATEDB_OPERATOR_IMAGE_PULL_SECRETS": "", diff --git a/tests/test_create.py b/tests/test_create.py index 7d11d1be..8ffaf848 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -331,6 +331,7 @@ def test_shared_resources_tolerations(self, node_spec, faker): class TestStatefulSetContainers: def test(self, faker, random_string): + name = faker.domain_word() cpus = faker.pyfloat(min_value=0) memory = faker.numerify("%!!") + ".0" + faker.lexify("?i", "KMG") node_spec = { @@ -350,6 +351,8 @@ def test(self, faker, random_string): ["/path/to/some/exec.sh", "--with", "args"], [], [], + name, + False, ) assert c_sql_exporter.name == "sql-exporter" assert len(c_sql_exporter.volume_mounts) == 1 diff --git a/tests/test_openshift.py b/tests/test_openshift.py new file mode 100644 index 00000000..2aee7c4b --- /dev/null +++ b/tests/test_openshift.py @@ -0,0 +1,288 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +from unittest import mock + +import pytest + +from crate.operator.constants import CloudProvider +from crate.operator.create import ( + get_owner_references, + get_statefulset_containers, + get_statefulset_init_containers, +) + + +class TestOpenShiftOwnerReferences: + """Test that owner references are created correctly for OpenShift.""" + + def test_openshift_disables_block_owner_deletion(self, faker): + name = faker.domain_word() + meta = {"uid": faker.uuid4()} + + with mock.patch( + "crate.operator.create.config.CLOUD_PROVIDER", CloudProvider.OPENSHIFT + ): + owner_refs = get_owner_references(name, meta) + + assert len(owner_refs) == 1 + assert owner_refs[0].block_owner_deletion is False + assert owner_refs[0].controller is True + assert owner_refs[0].kind == "CrateDB" + assert owner_refs[0].name == name + + @pytest.mark.parametrize( + "provider", [None, CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP] + ) + def test_non_openshift_enables_block_owner_deletion(self, provider, faker): + name = faker.domain_word() + meta = {"uid": faker.uuid4()} + + with mock.patch("crate.operator.create.config.CLOUD_PROVIDER", provider): + owner_refs = get_owner_references(name, meta) + + assert len(owner_refs) == 1 + assert owner_refs[0].block_owner_deletion is True + + +class TestOpenShiftInitContainers: + """Test that init containers are configured correctly for OpenShift.""" + + def test_openshift_skips_sysctl_init_container(self): + with mock.patch( + "crate.operator.create.config.CLOUD_PROVIDER", CloudProvider.OPENSHIFT + ): + containers = get_statefulset_init_containers("crate:5.10.16") + + container_names = [c.name for c in containers] + assert "init-sysctl" not in container_names + assert "fetch-jmx-exporter" in container_names + assert "mkdir-heapdump" in container_names + + @pytest.mark.parametrize( + "provider", [None, CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP] + ) + def test_non_openshift_includes_sysctl_init_container(self, provider): + with mock.patch("crate.operator.create.config.CLOUD_PROVIDER", provider): + containers = get_statefulset_init_containers("crate:5.10.16") + + container_names = [c.name for c in containers] + assert "init-sysctl" in container_names + assert "fetch-jmx-exporter" in container_names + assert "mkdir-heapdump" in container_names + + # Verify sysctl container is privileged + sysctl_container = next(c for c in containers if c.name == "init-sysctl") + assert sysctl_container.security_context.privileged is True + + +class TestOpenShiftSidecarContainer: + """Test that the crate-control sidecar is added on OpenShift.""" + + def test_openshift_adds_crate_control_sidecar(self, faker): + name = faker.domain_word() + node_spec = { + "resources": { + "requests": {"cpu": 1, "memory": "1Gi"}, + "limits": {"cpu": 1, "memory": "1Gi"}, + } + } + + with mock.patch( + "crate.operator.create.config.CLOUD_PROVIDER", CloudProvider.OPENSHIFT + ): + with mock.patch( + "crate.operator.create.config.CRATE_CONTROL_IMAGE", + "crate/crate-control:latest", + ): + containers = get_statefulset_containers( + node_spec, + 1, + 2, + 3, + 4, + 5, + "crate:5.10.16", + ["/docker-entrypoint.sh", "crate"], + [], + [], + name, + False, + ) + + container_names = [c.name for c in containers] + assert "crate-control" in container_names + + # Verify crate-control container configuration + control_container = next(c for c in containers if c.name == "crate-control") + assert control_container.image == "crate/crate-control:latest" + assert len(control_container.ports) == 1 + assert control_container.ports[0].container_port == 5050 + assert control_container.ports[0].name == "control" + + # Verify environment variables + env_names = [e.name for e in control_container.env] + assert "CRATE_HTTP_URL" in env_names + assert "BOOTSTRAP_TOKEN" in env_names + + # Verify readiness probe + assert control_container.readiness_probe is not None + assert control_container.readiness_probe.http_get.path == "/health" + assert control_container.readiness_probe.http_get.port == 5050 + + def test_openshift_sidecar_uses_ssl_aware_url(self, faker): + name = faker.domain_word() + node_spec = { + "resources": { + "requests": {"cpu": 1, "memory": "1Gi"}, + "limits": {"cpu": 1, "memory": "1Gi"}, + } + } + + with mock.patch( + "crate.operator.create.config.CLOUD_PROVIDER", CloudProvider.OPENSHIFT + ): + with mock.patch( + "crate.operator.create.config.CRATE_CONTROL_IMAGE", + "crate/crate-control:latest", + ): + containers = get_statefulset_containers( + node_spec, + 1, + 2, + 3, + 4, + 5, + "crate:5.10.16", + ["/docker-entrypoint.sh", "crate"], + [], + [], + name, + True, + ) + + control_container = next(c for c in containers if c.name == "crate-control") + crate_http_url = next( + e for e in control_container.env if e.name == "CRATE_HTTP_URL" + ) + assert crate_http_url.value == "https://localhost:4200/_sql" + + @pytest.mark.parametrize( + "provider", [None, CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP] + ) + def test_non_openshift_no_crate_control_sidecar(self, provider, faker): + name = faker.domain_word() + node_spec = { + "resources": { + "requests": {"cpu": 1, "memory": "1Gi"}, + "limits": {"cpu": 1, "memory": "1Gi"}, + } + } + + with mock.patch("crate.operator.create.config.CLOUD_PROVIDER", provider): + containers = get_statefulset_containers( + node_spec, + 1, + 2, + 3, + 4, + 5, + "crate:5.10.16", + ["/docker-entrypoint.sh", "crate"], + [], + [], + name, + True, + ) + + container_names = [c.name for c in containers] + assert "crate-control" not in container_names + assert len(containers) == 2 # Only sql-exporter and crate + + +class TestOpenShiftLifecycleHooks: + """Test that lifecycle hooks are conditionally added.""" + + def test_openshift_skips_lifecycle_hooks(self, faker): + name = faker.domain_word() + node_spec = { + "resources": { + "requests": {"cpu": 1, "memory": "1Gi"}, + "limits": {"cpu": 1, "memory": "1Gi"}, + } + } + + with mock.patch( + "crate.operator.create.config.CLOUD_PROVIDER", CloudProvider.OPENSHIFT + ): + containers = get_statefulset_containers( + node_spec, + 1, + 2, + 3, + 4, + 5, + "crate:5.10.16", + ["/docker-entrypoint.sh", "crate"], + [], + [], + name, + True, + ) + + crate_container = next(c for c in containers if c.name == "crate") + assert ( + not hasattr(crate_container, "lifecycle") + or crate_container.lifecycle is None + ) + + @pytest.mark.parametrize( + "provider", [None, CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP] + ) + def test_non_openshift_includes_lifecycle_hooks(self, provider, faker): + name = faker.domain_word() + node_spec = { + "resources": { + "requests": {"cpu": 1, "memory": "1Gi"}, + "limits": {"cpu": 1, "memory": "1Gi"}, + } + } + + with mock.patch("crate.operator.create.config.CLOUD_PROVIDER", provider): + containers = get_statefulset_containers( + node_spec, + 1, + 2, + 3, + 4, + 5, + "crate:5.10.16", + ["/docker-entrypoint.sh", "crate"], + [], + [], + name, + True, + ) + + crate_container = next(c for c in containers if c.name == "crate") + assert crate_container.lifecycle is not None + assert crate_container.lifecycle.post_start is not None + assert crate_container.lifecycle.pre_stop is not None diff --git a/tests/test_restore_backup.py b/tests/test_restore_backup.py index cb1508a5..a096991c 100644 --- a/tests/test_restore_backup.py +++ b/tests/test_restore_backup.py @@ -54,6 +54,7 @@ AzureBackupRepositoryData, BackupRepositoryData, ) +from crate.operator.sql import SQLResult from crate.operator.utils.formatting import b64encode from crate.operator.webhooks import ( WebhookAction, @@ -959,9 +960,9 @@ async def test_create_backup_repository( @pytest.mark.asyncio @mock.patch("crate.operator.restore_backup.get_gc_user_password") -@mock.patch("crate.operator.restore_backup.run_crash_command") +@mock.patch("crate.operator.restore_backup.execute_sql") async def test_gc_admin_password_restore( - mock_run_crash, mock_get_gc_user_password, faker + mock_execute_sql, mock_get_gc_user_password, faker ): name = faker.domain_word() namespace = faker.uuid4() @@ -988,18 +989,22 @@ async def test_gc_admin_password_restore( mock_get_gc_user_password.side_effect = None mock_get_gc_user_password.return_value = "gc-secret-password" - mock_run_crash.return_value = "ERROR something wrong" + mock_execute_sql.return_value = SQLResult( + error_message="some transient error", error_code=None, rowcount=0, rows=[] + ) with pytest.raises(kopf.TemporaryError): await handler._restore_gc_admin_password( core, namespace, name, pod_name, scheme, logger ) - mock_run_crash.assert_called() + mock_execute_sql.assert_called() # successful password reset mock_get_gc_user_password.return_value = "gc-secret-password" - mock_run_crash.return_value = "ALTER OK" + mock_execute_sql.return_value = SQLResult( + rowcount=1, rows=[(1,)], error_message=None, error_code=None + ) await handler._restore_gc_admin_password( core, namespace, name, pod_name, scheme, logger diff --git a/tests/test_sql.py b/tests/test_sql.py new file mode 100644 index 00000000..84092092 --- /dev/null +++ b/tests/test_sql.py @@ -0,0 +1,178 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. + +from crate.operator.sql import ( + SQLResult, + normalize_crash, + normalize_crate_control, + parse_crash_table, +) + + +class TestSQLResult: + def test_ok_property_true(self): + result = SQLResult(rowcount=1, rows=None, error_code=None, error_message=None) + assert result.ok is True + + def test_ok_property_false(self): + result = SQLResult( + rowcount=None, rows=None, error_code=4000, error_message="Error" + ) + assert result.ok is False + + +class TestNormalizeCrash: + def test_alter_ok(self): + result = normalize_crash("ALTER OK, 1 row affected (0.123 sec)") + assert result.ok is True + assert result.rowcount == 1 + assert result.error_message is None + + def test_create_ok(self): + result = normalize_crash("CREATE OK, 1 row affected (0.123 sec)") + assert result.ok is True + assert result.rowcount == 1 + + def test_grant_ok(self): + result = normalize_crash("GRANT OK, 1 row affected (0.123 sec)") + assert result.ok is True + assert result.rowcount == 1 + + def test_exception(self): + result = normalize_crash( + "SQLActionException[UserAlreadyExistsException: User 'test' already exists]" + ) + assert result.ok is False + assert result.error_message is not None + assert "Exception" in result.error_message + + def test_select_with_results(self): + output = """ ++------+-------+ +| col1 | col2 | ++------+-------+ +| 1 | test | +| 2 | hello | ++------+-------+ +SELECT 2 rows in set (0.123 sec) +""" + result = normalize_crash(output) + assert result.ok is True + assert result.rowcount == 2 + assert result.rows is not None + assert len(result.rows) == 2 + assert result.rows[0] == (1, "test") + assert result.rows[1] == (2, "hello") + + +class TestParseCrashTable: + def test_parse_simple_table(self): + output = """ ++------+-------+ +| col1 | col2 | ++------+-------+ +| 1 | test | +| 2 | hello | ++------+-------+ +""" + rows = parse_crash_table(output) + assert len(rows) == 2 + assert rows[0] == (1, "test") + assert rows[1] == (2, "hello") + + def test_parse_empty_table(self): + output = """ ++------+ +| col1 | ++------+ ++------+ +""" + rows = parse_crash_table(output) + assert rows == [] + + def test_parse_with_null_values(self): + """NULL values (empty strings) should be converted to None.""" + output = """ ++------+-------+ +| col1 | col2 | ++------+-------+ +| 1 | | ++------+-------+ +""" + rows = parse_crash_table(output) + assert rows[0] == (1, None) + + def test_parse_with_boolean(self): + """Boolean values should be converted.""" + output = """ ++------+-------+ +| col1 | col2 | ++------+-------+ +| true | false | ++------+-------+ +""" + rows = parse_crash_table(output) + assert rows[0] == (True, False) + + def test_parse_with_float(self): + """Float values should be converted.""" + output = """ ++-------+ +| col1 | ++-------+ +| 3.14 | +| -2.5 | ++-------+ +""" + rows = parse_crash_table(output) + assert rows[0] == (3.14,) + assert rows[1] == (-2.5,) + + +class TestNormalizeCrateControl: + def test_success_response(self): + response = {"rowcount": 1, "rows": [[1, "test"]]} + result = normalize_crate_control(response) + assert result.ok is True + assert result.rowcount == 1 + assert result.rows is not None + assert result.rows == [[1, "test"]] + assert result.error_message is None + + def test_error_response_with_error_key(self): + response = {"error": {"code": 4000, "message": "User already exists"}} + result = normalize_crate_control(response) + assert result.ok is False + assert result.error_code == 4000 + assert result.error_message == "User already exists" + + def test_error_response_with_detail_key(self): + response = {"detail": '{"error": {"code": 4000, "message": "Error"}}'} + result = normalize_crate_control(response) + assert result.ok is False + assert result.error_code == 4000 + assert result.error_message == "Error" + + def test_error_response_with_string_detail(self): + response = {"detail": "Something went wrong"} + result = normalize_crate_control(response) + assert result.ok is False + assert result.error_message == "Something went wrong"