Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,013 changes: 1,077 additions & 1,936 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/jest-global.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
declare namespace jest {
type Mock = any;
function fn(): any;
function setTimeout(ms: number): void;
}

export {};
12 changes: 12 additions & 0 deletions src/jest.local.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module.exports = {
moduleFileExtensions: ['js', 'json', 'ts'],
rootDir: '..',
testRegex: '.*\\.spec\\.ts$',
transform: {
'^.+\\.(t|j)s$': 'ts-jest',
},
setupFilesAfterEnv: [],
testEnvironment: 'node',
verbose: true,
forceExit: true,
};
30 changes: 30 additions & 0 deletions src/monitoring/capacity-planning.alert.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { CapacityPlanningService } from './capacity-planning.service';

describe('CapacityPlanningService alerting integration', () => {
it('fires alert when forecast predicts exhaustion', async () => {
const mockMetrics: any = {};
const mockAlerting: any = { sendAlert: jest.fn() };

// Worker orchestration will report small pool so utilization becomes high
const mockWorkerOrchestration: any = {
getPoolStatistics: () => ({ totalWorkers: 1, totalJobsProcessed: 0, averageExecutionTime: 2000 }),
getAllWorkerMetrics: () => [],
};

const svc = new CapacityPlanningService(mockMetrics, mockWorkerOrchestration, mockAlerting);

// Seed samples with rapidly increasing jobsProcessed to create upward trend
const now = Date.now();
(svc as any).samples = [
{ timestamp: now - 4 * 60_000, totalJobsProcessed: 10, averageExecutionTimeMs: 2000, totalWorkers: 1 },
{ timestamp: now - 3 * 60_000, totalJobsProcessed: 30, averageExecutionTimeMs: 2000, totalWorkers: 1 },
{ timestamp: now - 2 * 60_000, totalJobsProcessed: 90, averageExecutionTimeMs: 2000, totalWorkers: 1 },
{ timestamp: now - 1 * 60_000, totalJobsProcessed: 270, averageExecutionTimeMs: 2000, totalWorkers: 1 },
];

// Now invoke analysis which should detect predicted utilization >= 90% and call sendAlert
await svc.sampleAndAnalyze();

expect(mockAlerting.sendAlert).toHaveBeenCalled();
});
});
18 changes: 18 additions & 0 deletions src/monitoring/capacity-planning.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Controller, Get } from '@nestjs/common';
import { CapacityPlanningService } from './capacity-planning.service';

@Controller('monitoring/capacity')
export class CapacityPlanningController {
constructor(private readonly capacity: CapacityPlanningService) {}

@Get('forecast')
getForecast() {
// return short forecast for next 60 minutes
return this.capacity.forecastUtilizationMinutes(60);
}

@Get('recommendation')
getRecommendation() {
return this.capacity.getRecommendations();
}
}
42 changes: 42 additions & 0 deletions src/monitoring/capacity-planning.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { CapacityPlanningService } from './capacity-planning.service';

const noop: any = () => {};
const mockMetrics: any = {};
const mockWorkerOrchestration: any = {
getPoolStatistics: () => ({ totalWorkers: 4, totalJobsProcessed: 120, averageExecutionTime: 200 }),
};
const mockAlerting: any = { sendAlert: noop };

describe('CapacityPlanningService', () => {
let svc: CapacityPlanningService;

beforeEach(() => {
svc = new CapacityPlanningService(mockMetrics, mockWorkerOrchestration, mockAlerting);
});

it('computes linear regression correctly for increasing points', () => {
const points = [
{ x: 0, y: 10 },
{ x: 1, y: 20 },
{ x: 2, y: 30 },
{ x: 3, y: 40 },
];
const { slope, intercept } = (svc as any).linearRegression(points);
expect(Math.round(slope)).toBe(10);
expect(Math.round(intercept)).toBe(10);
});

it('recommends more workers when predicted util is high', () => {
const recommended = svc.recommendWorkers(1.0, 4); // 100% -> target 60%
expect(recommended).toBeGreaterThan(4);
const same = svc.recommendWorkers(0.4, 4); // 40% -> below target
expect(same).toBe(4);
});

it('forecasts flat when not enough samples', () => {
(svc as any).samples = [];
const out = svc.forecastUtilizationMinutes(5);
expect(out.length).toBe(5);
expect(out.every((p) => p.utilization === 0)).toBe(true);
});
});
202 changes: 202 additions & 0 deletions src/monitoring/capacity-planning.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { MetricsCollectionService } from './metrics/metrics-collection.service';
import { WorkerOrchestrationService } from '../workers/orchestration/worker-orchestration.service';
import { AlertingService } from './alerting/alerting.service';

type Sample = {
timestamp: number;
totalJobsProcessed: number;
averageExecutionTimeMs: number;
totalWorkers: number;
};

/**
* Capacity Planning Service
* - Collects lightweight time-series from existing metrics
* - Runs a simple linear regression forecast
* - Fires alerts before capacity exhaustion and suggests scaling
*/
@Injectable()
export class CapacityPlanningService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CapacityPlanningService.name);
private readonly samples: Sample[] = [];
private readonly maxSamples = 120; // keep up to last 120 minutes

private running = false;

constructor(
private readonly metrics: MetricsCollectionService,
private readonly workerOrchestration: WorkerOrchestrationService,
private readonly alerting: AlertingService,
) {}

onModuleInit(): void {
this.running = true;
this.logger.log('CapacityPlanningService initialized');
}

onModuleDestroy(): void {
this.running = false;
}

@Cron(CronExpression.EVERY_MINUTE)
async sampleAndAnalyze(): Promise<void> {
try {
// Gather lightweight stats from worker orchestration
const pool = this.workerOrchestration.getPoolStatistics();
const totalWorkers = pool.totalWorkers ?? 1;
const totalJobsProcessed = pool.totalJobsProcessed ?? 0;
const averageExecutionTime = pool.averageExecutionTime ?? 0; // ms

const sample: Sample = {
timestamp: Date.now(),
totalJobsProcessed,
averageExecutionTimeMs: averageExecutionTime,
totalWorkers,
};

this.pushSample(sample);
const forecast = this.forecastUtilizationMinutes(60);

// If any forecast point exceeds threshold, alert and recommend scaling
const threshold = 0.9; // 90% utilization
const exceed = forecast.find((f) => f.utilization >= threshold);
if (exceed) {
const minutesAhead = exceed.minutesAhead;
const predictedUtil = exceed.utilization;
const recommendedWorkers = this.recommendWorkers(predictedUtil, totalWorkers);

const message = `Projected utilization ${Math.round(predictedUtil * 100)}% in ${minutesAhead}m — recommend scaling from ${totalWorkers} → ${recommendedWorkers}`;
this.alerting.sendAlert('CAPACITY_PLANNING_WARNING', message, 'WARNING', {
predictedUtil: Math.round(predictedUtil * 100),
minutesAhead,
currentWorkers: totalWorkers,
recommendedWorkers,
});
}
} catch (err) {
this.logger.error('Capacity planning failed: ' + (err as Error).message);
}
}

private pushSample(s: Sample) {
this.samples.push(s);
if (this.samples.length > this.maxSamples) this.samples.shift();
}

/**
* Forecast utilization for the next N minutes.
* Utilization = (jobsPerMinute * avgExecutionTimeMs) / (totalWorkers * 60_000)
*/
forecastUtilizationMinutes(minutes: number): { minutesAhead: number; utilization: number }[] {
if (this.samples.length < 3) {
// Not enough data — return conservative flat forecast
const latest = this.samples[this.samples.length - 1];
const util = latest
? (latest.totalJobsProcessed * latest.averageExecutionTimeMs) / (latest.totalWorkers * 60_000)
: 0;
return Array.from({ length: minutes }, (_, i) => ({ minutesAhead: i + 1, utilization: util }));
}

// Build time series for jobs per minute
const points = this.samples.map((s, i) => ({ x: i, y: s.totalJobsProcessed }));
const { slope, intercept } = this.linearRegression(points);

const last = this.samples[this.samples.length - 1];
const result: { minutesAhead: number; utilization: number }[] = [];
for (let m = 1; m <= minutes; m++) {
const predictedJobs = Math.max(0, intercept + slope * (points.length + m - 1));
const predictedAvgExec = last.averageExecutionTimeMs;
const predictedWorkers = last.totalWorkers || 1;
const util = (predictedJobs * predictedAvgExec) / (predictedWorkers * 60_000);
result.push({ minutesAhead: m, utilization: util });
}

return result;
}

/**
* Linear regression (least squares) on integer x,y points
*/
linearRegression(points: { x: number; y: number }[]): { slope: number; intercept: number } {
const n = points.length;
let sumX = 0,
sumY = 0,
sumXY = 0,
sumXX = 0;
for (const p of points) {
sumX += p.x;
sumY += p.y;
sumXY += p.x * p.y;
sumXX += p.x * p.x;
}
const slope = (n * sumXY - sumX * sumY) / Math.max(1, n * sumXX - sumX * sumX);
const intercept = (sumY - slope * sumX) / n;
return { slope, intercept };
}

recommendWorkers(predictedUtil: number, currentWorkers: number): number {
// target utilization 60% (0.6)
const target = 0.6;
if (predictedUtil <= target) return currentWorkers;
const needed = Math.ceil((predictedUtil / target) * currentWorkers);
return Math.max(1, needed);
}

/**
* Produce per-worker-type recommendations and summary.
* Uses current worker metrics to estimate jobs/minute and utilization.
*/
getRecommendations(): Array<{
workerType: string;
currentWorkers: number;
jobsPerMinute: number;
averageExecutionTimeMs: number;
utilization: number; // 0..1
recommendedWorkers: number;
reason?: string;
}> {
const metrics = this.workerOrchestration.getAllWorkerMetrics();
const byType = new Map<string, any[]>();
for (const m of metrics) {
const arr = byType.get(m.workerType) || [];
arr.push(m);
byType.set(m.workerType, arr);
}

const results: any[] = [];
for (const [workerType, arr] of byType.entries()) {
const currentWorkers = arr.length;
// Estimate jobs per minute using jobsProcessed / (uptime minutes)
let jobsPerMinute = 0;
let avgExec = 0;
let totalUptimeMs = 0;
let totalJobs = 0;
for (const w of arr) {
totalJobs += w.jobsProcessed || 0;
totalUptimeMs += w.uptime || 0;
avgExec += (w.averageExecutionTime || 0) * 1; // accumulate
}
avgExec = currentWorkers > 0 ? avgExec / currentWorkers : 0;
const uptimeMinutes = totalUptimeMs / 60000 || 1; // avoid div0
jobsPerMinute = uptimeMinutes > 0 ? totalJobs / uptimeMinutes : 0;

const utilization = (jobsPerMinute * avgExec) / (Math.max(1, currentWorkers) * 60000);
const recommendedWorkers = this.recommendWorkers(utilization, currentWorkers);
const reason = utilization >= 0.9 ? 'projected high utilization' : utilization >= 0.6 ? 'above target' : 'within target';

results.push({
workerType,
currentWorkers,
jobsPerMinute: Number(jobsPerMinute.toFixed(2)),
averageExecutionTimeMs: Number(avgExec.toFixed(2)),
utilization: Number(utilization.toFixed(4)),
recommendedWorkers,
reason,
});
}

return results;
}
}
4 changes: 4 additions & 0 deletions src/test-typings.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
declare var jest: any;
declare function afterEach(fn: () => void): void;
declare function afterAll(fn: () => void): void;
export {};
42 changes: 42 additions & 0 deletions test/monitoring/capacity-planning.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { CapacityPlanningService } from '../../src/monitoring/capacity-planning.service';

const noop: any = () => {};
const mockMetrics: any = {};
const mockWorkerOrchestration: any = {
getPoolStatistics: () => ({ totalWorkers: 4, totalJobsProcessed: 120, averageExecutionTime: 200 }),
};
const mockAlerting: any = { sendAlert: noop };

describe('CapacityPlanningService', () => {
let svc: CapacityPlanningService;

beforeEach(() => {
svc = new CapacityPlanningService(mockMetrics, mockWorkerOrchestration, mockAlerting);
});

it('computes linear regression correctly for increasing points', () => {
const points = [
{ x: 0, y: 10 },
{ x: 1, y: 20 },
{ x: 2, y: 30 },
{ x: 3, y: 40 },
];
const { slope, intercept } = (svc as any).linearRegression(points);
expect(Math.round(slope)).toBe(10);
expect(Math.round(intercept)).toBe(10);
});

it('recommends more workers when predicted util is high', () => {
const recommended = svc.recommendWorkers(1.0, 4); // 100% -> target 60%
expect(recommended).toBeGreaterThan(4);
const same = svc.recommendWorkers(0.4, 4); // 40% -> below target
expect(same).toBe(4);
});

it('forecasts flat when not enough samples', () => {
(svc as any).samples = [];
const out = svc.forecastUtilizationMinutes(5);
expect(out.length).toBe(5);
expect(out.every((p) => p.utilization === 0)).toBe(true);
});
});
1 change: 1 addition & 0 deletions test/setup.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Test setup file to optimize memory usage and test performance
/// <reference path="../src/test-typings.d.ts" />
export {};

declare global {
Expand Down
4 changes: 2 additions & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
"lib": ["es2021", "dom"],
"types": ["node", "jest"]
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "test/**/*"]
"include": ["src/**/*", "test/**/*"],
"exclude": ["node_modules", "dist"]
}
Loading