-
Notifications
You must be signed in to change notification settings - Fork 0
feat(grpc): add list workloads/volumes #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ import { | |
| LogsStreamSession, | ||
| Platform, | ||
| PLATFORM_LABEL, | ||
| ContainerListEntry, | ||
| VolumeListEntry, | ||
| type ContainerInspectInfo, | ||
| } from './types.ts'; | ||
|
|
||
|
|
@@ -1010,12 +1012,59 @@ export class ContainerService implements DockerClientPort { | |
| return sorted.map((c) => new ContainerHandle(this, c.Id)); | ||
| } | ||
|
|
||
| async listContainersByLabelKey(labelKey: string, options?: { all?: boolean }): Promise<ContainerListEntry[]> { | ||
| if (!labelKey) return []; | ||
| this.log(`Listing containers by label key all=${options?.all ?? true} label=${labelKey}`); | ||
| const list: Docker.ContainerInfo[] = await this.docker.listContainers({ | ||
| all: options?.all ?? true, | ||
| filters: { label: [labelKey] }, | ||
| }); | ||
| const sorted = [...list].sort((a: Docker.ContainerInfo, b: Docker.ContainerInfo) => { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] This sorting comparator is duplicated verbatim from |
||
| const ac = typeof a.Created === 'number' ? a.Created : 0; | ||
| const bc = typeof b.Created === 'number' ? b.Created : 0; | ||
| if (ac !== bc) return ac - bc; | ||
| const aid = String(a.Id ?? ''); | ||
| const bid = String(b.Id ?? ''); | ||
| return aid.localeCompare(bid); | ||
| }); | ||
| return sorted | ||
| .filter((container) => typeof container.Id === 'string' && container.Id.length > 0) | ||
| .map((container) => ({ id: container.Id, labels: container.Labels ?? {} })); | ||
| } | ||
|
|
||
| async listContainersByVolume(volumeName: string): Promise<string[]> { | ||
| if (!volumeName) return []; | ||
| const result = await this.docker.listContainers({ all: true, filters: { volume: [volumeName] } }); | ||
| return Array.isArray(result) ? result.map((it) => it.Id) : []; | ||
| } | ||
|
|
||
| async listVolumesByLabelKey(labelKey: string): Promise<VolumeListEntry[]> { | ||
| if (!labelKey) return []; | ||
| this.log(`Listing volumes by label key label=${labelKey}`); | ||
| const result = await this.docker.listVolumes({ filters: { label: [labelKey] } }); | ||
| const volumes = Array.isArray(result?.Volumes) ? result.Volumes : []; | ||
| const sorted = [...volumes].sort((a, b) => String(a.Name ?? '').localeCompare(String(b.Name ?? ''))); | ||
| return sorted | ||
| .filter((volume) => typeof volume.Name === 'string' && volume.Name.length > 0) | ||
| .map((volume) => ({ name: volume.Name, labels: volume.Labels ?? {} })); | ||
| } | ||
|
|
||
| async ensureVolume(volumeName: string, labels?: Record<string, string>): Promise<void> { | ||
| const name = volumeName.trim(); | ||
| if (!name) return; | ||
| this.log(`Ensuring volume name=${name}`); | ||
| try { | ||
| await this.docker.createVolume({ Name: name, Labels: labels ?? {} }); | ||
| } catch (e: unknown) { | ||
| const sc = typeof e === 'object' && e && 'statusCode' in e ? (e as { statusCode?: number }).statusCode : undefined; | ||
| if (sc === 409) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] Note that if the volume already exists (409), labels are silently not applied since Docker doesn't support updating volume labels post-creation. This means if a volume was created earlier (e.g., by a previous workload without labels or with different labels), |
||
| this.debug(`Volume already exists name=${name}`); | ||
| return; | ||
| } | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| async removeVolume(volumeName: string, options?: { force?: boolean }): Promise<void> { | ||
| const force = options?.force ?? false; | ||
| this.log(`Removing volume name=${volumeName} force=${force}`); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,9 +28,15 @@ import { | |
| InspectWorkloadRequest, | ||
| InspectWorkloadResponse, | ||
| InspectWorkloadResponseSchema, | ||
| ListWorkloadsRequest, | ||
| ListWorkloadsResponse, | ||
| ListWorkloadsResponseSchema, | ||
| ListWorkloadsByVolumeRequest, | ||
| ListWorkloadsByVolumeResponse, | ||
| ListWorkloadsByVolumeResponseSchema, | ||
| ListVolumesRequest, | ||
| ListVolumesResponse, | ||
| ListVolumesResponseSchema, | ||
| LogChunkSchema, | ||
| LogEndSchema, | ||
| PutArchiveRequest, | ||
|
|
@@ -66,7 +72,13 @@ import { | |
| TouchWorkloadResponseSchema, | ||
| SidecarInstance, | ||
| SidecarInstanceSchema, | ||
| VolumeKind, | ||
| VolumeListItem, | ||
| VolumeListItemSchema, | ||
| VolumeSpec, | ||
| WorkloadContainersSchema, | ||
| WorkloadListItem, | ||
| WorkloadListItemSchema, | ||
| WorkloadStatus, | ||
| } from '../../proto/gen/agynio/api/runner/v1/runner_pb.js'; | ||
| import { runnerServiceGrpcDefinition } from '../../proto/grpc.js'; | ||
|
|
@@ -76,7 +88,7 @@ import type { ContainerService, InteractiveExecSession, LogsStreamSession } from | |
| import type { ContainerHandle } from '../../lib/container.handle.ts'; | ||
| import type { RunnerConfig } from '../config.ts'; | ||
| import { createDockerEventsParser } from '../dockerEvents.parser.ts'; | ||
| import { startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts'; | ||
| import { extractRequestLabels, startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts'; | ||
|
|
||
| type ExecStream = ServerDuplexStream<ExecRequest, ExecResponse>; | ||
|
|
||
|
|
@@ -130,6 +142,8 @@ const CONTAINER_STOP_TIMEOUT_SEC = 10; | |
| const SIDECAR_ROLE_LABEL = 'hautech.ai/role'; | ||
| const SIDECAR_ROLE_VALUE = 'sidecar'; | ||
| const PARENT_CONTAINER_LABEL = 'hautech.ai/parent_cid'; | ||
| const WORKLOAD_KEY_LABEL = 'workload_key'; | ||
| const VOLUME_KEY_LABEL = 'volume_key'; | ||
|
|
||
| async function findSidecarHandles(containers: ContainerService, workloadId: string): Promise<ContainerHandle[]> { | ||
| try { | ||
|
|
@@ -171,6 +185,34 @@ async function removeSidecars( | |
| } | ||
| } | ||
|
|
||
| const resolveVolumeName = (spec: VolumeSpec): string | null => { | ||
| const persistentName = spec.persistentName?.trim() ?? ''; | ||
| const fallbackName = spec.name?.trim() ?? ''; | ||
| const resolved = persistentName || fallbackName; | ||
| if (!resolved) return null; | ||
| if (resolved.includes('/')) return null; | ||
| return resolved; | ||
| }; | ||
|
|
||
| async function ensureWorkloadVolumes( | ||
| containers: ContainerService, | ||
| request: StartWorkloadRequest, | ||
| baseLabels: Record<string, string>, | ||
| ): Promise<void> { | ||
| const volumeSpecs = request.volumes ?? []; | ||
| if (volumeSpecs.length === 0) return; | ||
| const ensured = new Set<string>(); | ||
| for (const spec of volumeSpecs) { | ||
| if (!spec || spec.kind !== VolumeKind.NAMED) continue; | ||
| const volumeName = resolveVolumeName(spec); | ||
| if (!volumeName || ensured.has(volumeName)) continue; | ||
| const volumeLabels = { ...baseLabels, ...(spec.labels ?? {}) }; | ||
| if (Object.keys(volumeLabels).length === 0) continue; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [minor] This skips This is likely intentional (no labels → nothing useful to apply), but it's a subtle contract: a named volume that the orchestrator doesn't label will never appear in |
||
| ensured.add(volumeName); | ||
| await containers.ensureVolume(volumeName, volumeLabels); | ||
| } | ||
| } | ||
|
|
||
| type DockerErrorDetails = { | ||
| statusCode?: number; | ||
| status?: number; | ||
|
|
@@ -366,6 +408,8 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { | |
| return callback(toServiceError(status.INVALID_ARGUMENT, 'main_container_required')); | ||
| } | ||
| try { | ||
| const requestLabels = extractRequestLabels(call.request); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [major]
Consider having |
||
| await ensureWorkloadVolumes(opts.containers, call.request, requestLabels); | ||
| const containerOpts = startWorkloadRequestToContainerOpts(call.request); | ||
| const sidecarOpts = Array.isArray(containerOpts.sidecars) ? containerOpts.sidecars : []; | ||
| const stopAndRemove = async (containerId: string) => { | ||
|
|
@@ -562,6 +606,29 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { | |
| callback(toDockerServiceError(error, status.NOT_FOUND)); | ||
| } | ||
| }, | ||
| listWorkloads: async ( | ||
| call: ServerUnaryCall<ListWorkloadsRequest, ListWorkloadsResponse>, | ||
| callback: (error: ServiceError | null, value?: ListWorkloadsResponse) => void, | ||
| ) => { | ||
| try { | ||
| const containers = await opts.containers.listContainersByLabelKey(WORKLOAD_KEY_LABEL, { all: true }); | ||
| const workloads: WorkloadListItem[] = []; | ||
| for (const container of containers) { | ||
| if (container.labels[SIDECAR_ROLE_LABEL] === SIDECAR_ROLE_VALUE) continue; | ||
| const workloadKey = container.labels[WORKLOAD_KEY_LABEL]; | ||
| if (!workloadKey) continue; | ||
| workloads.push( | ||
| create(WorkloadListItemSchema, { | ||
| instanceId: container.id, | ||
| workloadKey, | ||
| }), | ||
| ); | ||
| } | ||
| callback(null, create(ListWorkloadsResponseSchema, { workloads })); | ||
| } catch (error) { | ||
| callback(toDockerServiceError(error, status.UNKNOWN)); | ||
| } | ||
| }, | ||
| findWorkloadsByLabels: async ( | ||
| call: ServerUnaryCall<FindWorkloadsByLabelsRequest, FindWorkloadsByLabelsResponse>, | ||
| callback: (error: ServiceError | null, value?: FindWorkloadsByLabelsResponse) => void, | ||
|
|
@@ -597,6 +664,28 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { | |
| callback(toDockerServiceError(error, status.UNKNOWN)); | ||
| } | ||
| }, | ||
| listVolumes: async ( | ||
| call: ServerUnaryCall<ListVolumesRequest, ListVolumesResponse>, | ||
| callback: (error: ServiceError | null, value?: ListVolumesResponse) => void, | ||
| ) => { | ||
| try { | ||
| const volumes = await opts.containers.listVolumesByLabelKey(VOLUME_KEY_LABEL); | ||
| const items: VolumeListItem[] = []; | ||
| for (const volume of volumes) { | ||
| const volumeKey = volume.labels[VOLUME_KEY_LABEL]; | ||
| if (!volumeKey) continue; | ||
| items.push( | ||
| create(VolumeListItemSchema, { | ||
| instanceId: volume.name, | ||
| volumeKey, | ||
| }), | ||
| ); | ||
| } | ||
| callback(null, create(ListVolumesResponseSchema, { volumes: items })); | ||
| } catch (error) { | ||
| callback(toDockerServiceError(error, status.UNKNOWN)); | ||
| } | ||
| }, | ||
| removeVolume: async ( | ||
| call: ServerUnaryCall<RemoveVolumeRequest, RemoveVolumeResponse>, | ||
| callback: (error: ServiceError | null, value?: RemoveVolumeResponse) => void, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] Good to see coverage for
label.*extraction. However, this PR introduces several new behaviors that lack test coverage:extractRequestLabelsmerging precedence: what happens whenrequest.labelsandlabel.*inadditionalPropertieshave overlapping keys?labels_json+label.*merge: the existinglabels_jsonpath now merges with request-level labels — the precedence should be tested.resolveVolumeName: edge cases (empty name, slashes, persistentName vs name fallback).ensureWorkloadVolumes: deduplication, NAMED-only filtering, skip-when-no-labels.listWorkloads/listVolumeshandlers: at minimum, unit-level tests with mockedContainerServiceto verify sidecar filtering and label key extraction.Consider adding tests for these to avoid regressions in the reconciliation logic.