Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions __tests__/workload.grpc.mapper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,16 @@ describe('workload gRPC mapping', () => {

expect(rebuilt.binds).toEqual(['data:/data:ro,z']);
});

it('applies label.* additionalProperties to labels', () => {
const request = containerOptsToStartWorkloadRequest({ image: 'alpine:3' });
request.additionalProperties = {
'label.workload_key': 'workload-1',
'label.team': 'core',
};

const rebuilt = startWorkloadRequestToContainerOpts(request);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Good to see coverage for label.* extraction. However, this PR introduces several new behaviors that lack test coverage:

  • extractRequestLabels merging precedence: what happens when request.labels and label.* in additionalProperties have overlapping keys?
  • labels_json + label.* merge: the existing labels_json path now merges with request-level labels — the precedence should be tested.
  • resolveVolumeName: edge cases (empty name, slashes, persistentName vs name fallback).
  • ensureWorkloadVolumes: deduplication, NAMED-only filtering, skip-when-no-labels.
  • listWorkloads / listVolumes handlers: at minimum, unit-level tests with mocked ContainerService to verify sidecar filtering and label key extraction.

Consider adding tests for these to avoid regressions in the reconciliation logic.

expect(rebuilt.labels).toEqual({ workload_key: 'workload-1', team: 'core' });
});
});
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"lint": "eslint .",
"test": "vitest run",
"test:e2e": "vitest run --config vitest.config.e2e.ts",
"proto:generate": "buf generate buf.build/agynio/api"
"proto:generate": "rm -rf src/proto/gen && buf generate buf.build/agynio/api --path agynio/api/runner/v1/runner.proto"
},
"dependencies": {
"@nestjs/common": "^11.1.7",
Expand Down
47 changes: 46 additions & 1 deletion src/contracts/workload.grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const PROP_CREATE_EXTRAS_JSON = 'create_extras_json';
const PROP_BIND_OPTIONS = 'bind_options';
const PROP_TTL_SECONDS = 'ttl_seconds';
const PROP_PLATFORM = 'platform';
const LABEL_PREFIX = 'label.';

const BOOL_TRUE = new Set(['1', 'true', 'yes', 'on']);

Expand Down Expand Up @@ -104,6 +105,44 @@ const ensureVolumeSpecName = (prefix: string, index: number): string => `${prefi

const cloneAdditionalProperties = (input?: Record<string, string>): Record<string, string> => ({ ...(input ?? {}) });

const sanitizeLabels = (input?: Record<string, string>): Record<string, string> => {
if (!input) return {};
const labels: Record<string, string> = {};
for (const [key, value] of Object.entries(input)) {
const trimmed = key.trim();
if (!trimmed) continue;
labels[trimmed] = value;
}
return labels;
};

const extractLabelProperties = (input?: Record<string, string>): Record<string, string> => {
if (!input) return {};
const labels: Record<string, string> = {};
for (const [key, value] of Object.entries(input)) {
if (!key.startsWith(LABEL_PREFIX)) continue;
const labelKey = key.slice(LABEL_PREFIX.length).trim();
if (!labelKey) continue;
labels[labelKey] = value;
}
return labels;
};

const mergeLabels = (...sources: Array<Record<string, string> | undefined>): Record<string, string> => {
const labels: Record<string, string> = {};
for (const source of sources) {
if (!source) continue;
for (const [key, value] of Object.entries(source)) {
if (!key) continue;
labels[key] = value;
}
}
return labels;
};

export const extractRequestLabels = (request: StartWorkloadRequest): Record<string, string> =>
mergeLabels(extractLabelProperties(request.additionalProperties), sanitizeLabels(request.labels));

export const workloadContainerPropKeys = {
autoRemove: PROP_AUTO_REMOVE,
networkMode: PROP_NETWORK_MODE,
Expand Down Expand Up @@ -264,6 +303,9 @@ export const startWorkloadRequestToContainerOpts = (request: StartWorkloadReques

const opts: ContainerOpts = {};

const requestLabels = extractRequestLabels(request);
if (Object.keys(requestLabels).length > 0) opts.labels = requestLabels;

if (isNonEmptyString(main.image)) opts.image = main.image;
if (isNonEmptyString(main.name)) opts.name = main.name;
if (Array.isArray(main.cmd) && main.cmd.length > 0) opts.cmd = [...main.cmd];
Expand Down Expand Up @@ -291,7 +333,10 @@ export const startWorkloadRequestToContainerOpts = (request: StartWorkloadReques
if (isNonEmptyString(containerProps[PROP_LABELS_JSON])) {
try {
const parsed = JSON.parse(containerProps[PROP_LABELS_JSON]) as Record<string, string>;
if (parsed && typeof parsed === 'object') opts.labels = parsed;
if (parsed && typeof parsed === 'object') {
const mergedLabels = mergeLabels(requestLabels, parsed);
if (Object.keys(mergedLabels).length > 0) opts.labels = mergedLabels;
}
} catch {
// ignore malformed labels payloads
}
Expand Down
49 changes: 49 additions & 0 deletions src/lib/container.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
LogsStreamSession,
Platform,
PLATFORM_LABEL,
ContainerListEntry,
VolumeListEntry,
type ContainerInspectInfo,
} from './types.ts';

Expand Down Expand Up @@ -1010,12 +1012,59 @@ export class ContainerService implements DockerClientPort {
return sorted.map((c) => new ContainerHandle(this, c.Id));
}

async listContainersByLabelKey(labelKey: string, options?: { all?: boolean }): Promise<ContainerListEntry[]> {
if (!labelKey) return [];
this.log(`Listing containers by label key all=${options?.all ?? true} label=${labelKey}`);
const list: Docker.ContainerInfo[] = await this.docker.listContainers({
all: options?.all ?? true,
filters: { label: [labelKey] },
});
const sorted = [...list].sort((a: Docker.ContainerInfo, b: Docker.ContainerInfo) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] This sorting comparator is duplicated verbatim from findContainersByLabels (line 1004). Extract it into a named helper (e.g., sortContainersByCreatedThenId) to eliminate the duplication and make future changes to the ordering policy apply consistently.

const ac = typeof a.Created === 'number' ? a.Created : 0;
const bc = typeof b.Created === 'number' ? b.Created : 0;
if (ac !== bc) return ac - bc;
const aid = String(a.Id ?? '');
const bid = String(b.Id ?? '');
return aid.localeCompare(bid);
});
return sorted
.filter((container) => typeof container.Id === 'string' && container.Id.length > 0)
.map((container) => ({ id: container.Id, labels: container.Labels ?? {} }));
}

async listContainersByVolume(volumeName: string): Promise<string[]> {
if (!volumeName) return [];
const result = await this.docker.listContainers({ all: true, filters: { volume: [volumeName] } });
return Array.isArray(result) ? result.map((it) => it.Id) : [];
}

async listVolumesByLabelKey(labelKey: string): Promise<VolumeListEntry[]> {
if (!labelKey) return [];
this.log(`Listing volumes by label key label=${labelKey}`);
const result = await this.docker.listVolumes({ filters: { label: [labelKey] } });
const volumes = Array.isArray(result?.Volumes) ? result.Volumes : [];
const sorted = [...volumes].sort((a, b) => String(a.Name ?? '').localeCompare(String(b.Name ?? '')));
return sorted
.filter((volume) => typeof volume.Name === 'string' && volume.Name.length > 0)
.map((volume) => ({ name: volume.Name, labels: volume.Labels ?? {} }));
}

async ensureVolume(volumeName: string, labels?: Record<string, string>): Promise<void> {
const name = volumeName.trim();
if (!name) return;
this.log(`Ensuring volume name=${name}`);
try {
await this.docker.createVolume({ Name: name, Labels: labels ?? {} });
} catch (e: unknown) {
const sc = typeof e === 'object' && e && 'statusCode' in e ? (e as { statusCode?: number }).statusCode : undefined;
if (sc === 409) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Note that if the volume already exists (409), labels are silently not applied since Docker doesn't support updating volume labels post-creation. This means if a volume was created earlier (e.g., by a previous workload without labels or with different labels), ListVolumes may not return the expected volume_key. Consider adding a log at debug level when hitting the 409 path to note that labels were not applied, so operators can diagnose reconciliation mismatches.

this.debug(`Volume already exists name=${name}`);
return;
}
throw e;
}
}

async removeVolume(volumeName: string, options?: { force?: boolean }): Promise<void> {
const force = options?.force ?? false;
this.log(`Removing volume name=${volumeName} force=${force}`);
Expand Down
5 changes: 5 additions & 0 deletions src/lib/dockerClient.port.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import type {
LogsStreamOptions,
LogsStreamSession,
Platform,
ContainerListEntry,
VolumeListEntry,
} from './types.ts';

export type DockerEventFilters = Record<string, Array<string | number>>;
Expand All @@ -33,6 +35,9 @@ export interface DockerClientPort {
getContainerLabels(containerId: string): Promise<Record<string, string> | undefined>;
getContainerNetworks(containerId: string): Promise<string[]>;
findContainersByLabels(labels: Record<string, string>, options?: { all?: boolean }): Promise<ContainerHandle[]>;
listContainersByLabelKey(labelKey: string, options?: { all?: boolean }): Promise<ContainerListEntry[]>;
listVolumesByLabelKey(labelKey: string): Promise<VolumeListEntry[]>;
ensureVolume(volumeName: string, labels?: Record<string, string>): Promise<void>;
listContainersByVolume(volumeName: string): Promise<string[]>;
removeVolume(volumeName: string, options?: { force?: boolean }): Promise<void>;
findContainerByLabels(labels: Record<string, string>, options?: { all?: boolean }): Promise<ContainerHandle | undefined>;
Expand Down
10 changes: 10 additions & 0 deletions src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,13 @@ export type ContainerInspectInfo = {
State?: ContainerInspectState;
NetworkSettings?: ContainerInspectNetworkSettings;
};

export type ContainerListEntry = {
id: string;
labels: Record<string, string>;
};

export type VolumeListEntry = {
name: string;
labels: Record<string, string>;
};
16 changes: 16 additions & 0 deletions src/proto/grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import {
GetWorkloadLabelsResponseSchema,
InspectWorkloadRequestSchema,
InspectWorkloadResponseSchema,
ListWorkloadsRequestSchema,
ListWorkloadsResponseSchema,
ListWorkloadsByVolumeRequestSchema,
ListWorkloadsByVolumeResponseSchema,
ListVolumesRequestSchema,
ListVolumesResponseSchema,
PutArchiveRequestSchema,
PutArchiveResponseSchema,
ReadyRequestSchema,
Expand Down Expand Up @@ -87,7 +91,9 @@ export const RUNNER_SERVICE_REMOVE_WORKLOAD_PATH = '/agynio.api.runner.v1.Runner
export const RUNNER_SERVICE_INSPECT_WORKLOAD_PATH = '/agynio.api.runner.v1.RunnerService/InspectWorkload';
export const RUNNER_SERVICE_GET_WORKLOAD_LABELS_PATH = '/agynio.api.runner.v1.RunnerService/GetWorkloadLabels';
export const RUNNER_SERVICE_FIND_WORKLOADS_BY_LABELS_PATH = '/agynio.api.runner.v1.RunnerService/FindWorkloadsByLabels';
export const RUNNER_SERVICE_LIST_WORKLOADS_PATH = '/agynio.api.runner.v1.RunnerService/ListWorkloads';
export const RUNNER_SERVICE_LIST_WORKLOADS_BY_VOLUME_PATH = '/agynio.api.runner.v1.RunnerService/ListWorkloadsByVolume';
export const RUNNER_SERVICE_LIST_VOLUMES_PATH = '/agynio.api.runner.v1.RunnerService/ListVolumes';
export const RUNNER_SERVICE_REMOVE_VOLUME_PATH = '/agynio.api.runner.v1.RunnerService/RemoveVolume';
export const RUNNER_SERVICE_TOUCH_WORKLOAD_PATH = '/agynio.api.runner.v1.RunnerService/TouchWorkload';
export const RUNNER_SERVICE_PUT_ARCHIVE_PATH = '/agynio.api.runner.v1.RunnerService/PutArchive';
Expand Down Expand Up @@ -132,11 +138,21 @@ export const runnerServiceGrpcDefinition: ServiceDefinition = {
FindWorkloadsByLabelsRequestSchema,
FindWorkloadsByLabelsResponseSchema,
),
listWorkloads: unaryDefinition(
RUNNER_SERVICE_LIST_WORKLOADS_PATH,
ListWorkloadsRequestSchema,
ListWorkloadsResponseSchema,
),
listWorkloadsByVolume: unaryDefinition(
RUNNER_SERVICE_LIST_WORKLOADS_BY_VOLUME_PATH,
ListWorkloadsByVolumeRequestSchema,
ListWorkloadsByVolumeResponseSchema,
),
listVolumes: unaryDefinition(
RUNNER_SERVICE_LIST_VOLUMES_PATH,
ListVolumesRequestSchema,
ListVolumesResponseSchema,
),
removeVolume: unaryDefinition(
RUNNER_SERVICE_REMOVE_VOLUME_PATH,
RemoveVolumeRequestSchema,
Expand Down
91 changes: 90 additions & 1 deletion src/service/grpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ import {
InspectWorkloadRequest,
InspectWorkloadResponse,
InspectWorkloadResponseSchema,
ListWorkloadsRequest,
ListWorkloadsResponse,
ListWorkloadsResponseSchema,
ListWorkloadsByVolumeRequest,
ListWorkloadsByVolumeResponse,
ListWorkloadsByVolumeResponseSchema,
ListVolumesRequest,
ListVolumesResponse,
ListVolumesResponseSchema,
LogChunkSchema,
LogEndSchema,
PutArchiveRequest,
Expand Down Expand Up @@ -66,7 +72,13 @@ import {
TouchWorkloadResponseSchema,
SidecarInstance,
SidecarInstanceSchema,
VolumeKind,
VolumeListItem,
VolumeListItemSchema,
VolumeSpec,
WorkloadContainersSchema,
WorkloadListItem,
WorkloadListItemSchema,
WorkloadStatus,
} from '../../proto/gen/agynio/api/runner/v1/runner_pb.js';
import { runnerServiceGrpcDefinition } from '../../proto/grpc.js';
Expand All @@ -76,7 +88,7 @@ import type { ContainerService, InteractiveExecSession, LogsStreamSession } from
import type { ContainerHandle } from '../../lib/container.handle.ts';
import type { RunnerConfig } from '../config.ts';
import { createDockerEventsParser } from '../dockerEvents.parser.ts';
import { startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts';
import { extractRequestLabels, startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts';

type ExecStream = ServerDuplexStream<ExecRequest, ExecResponse>;

Expand Down Expand Up @@ -130,6 +142,8 @@ const CONTAINER_STOP_TIMEOUT_SEC = 10;
const SIDECAR_ROLE_LABEL = 'hautech.ai/role';
const SIDECAR_ROLE_VALUE = 'sidecar';
const PARENT_CONTAINER_LABEL = 'hautech.ai/parent_cid';
const WORKLOAD_KEY_LABEL = 'workload_key';
const VOLUME_KEY_LABEL = 'volume_key';

async function findSidecarHandles(containers: ContainerService, workloadId: string): Promise<ContainerHandle[]> {
try {
Expand Down Expand Up @@ -171,6 +185,34 @@ async function removeSidecars(
}
}

const resolveVolumeName = (spec: VolumeSpec): string | null => {
const persistentName = spec.persistentName?.trim() ?? '';
const fallbackName = spec.name?.trim() ?? '';
const resolved = persistentName || fallbackName;
if (!resolved) return null;
if (resolved.includes('/')) return null;
return resolved;
};

async function ensureWorkloadVolumes(
containers: ContainerService,
request: StartWorkloadRequest,
baseLabels: Record<string, string>,
): Promise<void> {
const volumeSpecs = request.volumes ?? [];
if (volumeSpecs.length === 0) return;
const ensured = new Set<string>();
for (const spec of volumeSpecs) {
if (!spec || spec.kind !== VolumeKind.NAMED) continue;
const volumeName = resolveVolumeName(spec);
if (!volumeName || ensured.has(volumeName)) continue;
const volumeLabels = { ...baseLabels, ...(spec.labels ?? {}) };
if (Object.keys(volumeLabels).length === 0) continue;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] This skips ensureVolume when volumeLabels is empty. That means named volumes without any orchestrator-provided label.* properties won't be pre-created here — Docker will create them implicitly (without labels) when the container starts.

This is likely intentional (no labels → nothing useful to apply), but it's a subtle contract: a named volume that the orchestrator doesn't label will never appear in ListVolumes results. Worth a brief comment documenting the intent.

ensured.add(volumeName);
await containers.ensureVolume(volumeName, volumeLabels);
}
}

type DockerErrorDetails = {
statusCode?: number;
status?: number;
Expand Down Expand Up @@ -366,6 +408,8 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server {
return callback(toServiceError(status.INVALID_ARGUMENT, 'main_container_required'));
}
try {
const requestLabels = extractRequestLabels(call.request);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[major] extractRequestLabels(call.request) is computed here, and then startWorkloadRequestToContainerOpts(call.request) at line 413 calls extractRequestLabels(request) again internally (line 306 in workload.grpc.ts). This means:

  1. The label extraction runs twice per startWorkload call — redundant work.
  2. Worse, if either call site is changed independently, the two label sets could diverge silently, leading to ensureWorkloadVolumes using different labels than the container itself.

Consider having startWorkloadRequestToContainerOpts accept the pre-computed labels as a parameter, or return the extracted labels alongside opts, so the extraction happens in a single place.

await ensureWorkloadVolumes(opts.containers, call.request, requestLabels);
const containerOpts = startWorkloadRequestToContainerOpts(call.request);
const sidecarOpts = Array.isArray(containerOpts.sidecars) ? containerOpts.sidecars : [];
const stopAndRemove = async (containerId: string) => {
Expand Down Expand Up @@ -562,6 +606,29 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server {
callback(toDockerServiceError(error, status.NOT_FOUND));
}
},
listWorkloads: async (
call: ServerUnaryCall<ListWorkloadsRequest, ListWorkloadsResponse>,
callback: (error: ServiceError | null, value?: ListWorkloadsResponse) => void,
) => {
try {
const containers = await opts.containers.listContainersByLabelKey(WORKLOAD_KEY_LABEL, { all: true });
const workloads: WorkloadListItem[] = [];
for (const container of containers) {
if (container.labels[SIDECAR_ROLE_LABEL] === SIDECAR_ROLE_VALUE) continue;
const workloadKey = container.labels[WORKLOAD_KEY_LABEL];
if (!workloadKey) continue;
workloads.push(
create(WorkloadListItemSchema, {
instanceId: container.id,
workloadKey,
}),
);
}
callback(null, create(ListWorkloadsResponseSchema, { workloads }));
} catch (error) {
callback(toDockerServiceError(error, status.UNKNOWN));
}
},
findWorkloadsByLabels: async (
call: ServerUnaryCall<FindWorkloadsByLabelsRequest, FindWorkloadsByLabelsResponse>,
callback: (error: ServiceError | null, value?: FindWorkloadsByLabelsResponse) => void,
Expand Down Expand Up @@ -597,6 +664,28 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server {
callback(toDockerServiceError(error, status.UNKNOWN));
}
},
listVolumes: async (
call: ServerUnaryCall<ListVolumesRequest, ListVolumesResponse>,
callback: (error: ServiceError | null, value?: ListVolumesResponse) => void,
) => {
try {
const volumes = await opts.containers.listVolumesByLabelKey(VOLUME_KEY_LABEL);
const items: VolumeListItem[] = [];
for (const volume of volumes) {
const volumeKey = volume.labels[VOLUME_KEY_LABEL];
if (!volumeKey) continue;
items.push(
create(VolumeListItemSchema, {
instanceId: volume.name,
volumeKey,
}),
);
}
callback(null, create(ListVolumesResponseSchema, { volumes: items }));
} catch (error) {
callback(toDockerServiceError(error, status.UNKNOWN));
}
},
removeVolume: async (
call: ServerUnaryCall<RemoveVolumeRequest, RemoveVolumeResponse>,
callback: (error: ServiceError | null, value?: RemoveVolumeResponse) => void,
Expand Down
Loading