Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .fernignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ tests/unit/fetcher/stream-wrappers/webpack.test.ts
.prettierrc.yml
.prettierignore
.gitignore
babel.config.js

# Package Scripts

Expand Down
10 changes: 0 additions & 10 deletions babel.config.js

This file was deleted.

5 changes: 1 addition & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@
"jsonschema": "^1.4.1",
"@types/cli-progress": "^3.11.6",
"@types/lodash": "4.14.74",
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
"babel-jest": "^29.7.0",
"@babel/core": "^7.26.0",
"@babel/preset-env": "^7.26.0"
"@trivago/prettier-plugin-sort-imports": "^4.3.0"
},
"browser": {
"fs": false,
Expand Down
147 changes: 147 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import * as contextApi from "@opentelemetry/api";

import { HumanloopRuntimeError } from "./error";
import {
HUMANLOOP_CONTEXT_DECORATOR,
HUMANLOOP_CONTEXT_EVALUATION,
HUMANLOOP_CONTEXT_TRACE_ID,
} from "./otel/constants";

export function getTraceId(): string | undefined {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_TRACE_ID);
const value = contextApi.context.active().getValue(key);
return (value || undefined) as string | undefined;
}

export function setTraceId(flowLogId: string): contextApi.Context {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_TRACE_ID);
return contextApi.context.active().setValue(key, flowLogId);
}

export type DecoratorContext = {
path: string;
type: "prompt" | "tool" | "flow";
version: Record<string, unknown>;
};

export function setDecoratorContext(
decoratorContext: DecoratorContext,
): contextApi.Context {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_DECORATOR);
return contextApi.context.active().setValue(key, decoratorContext);
}

export function getDecoratorContext(): DecoratorContext | undefined {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_DECORATOR);
return (contextApi.context.active().getValue(key) || undefined) as
| DecoratorContext
| undefined;
}

export class EvaluationContext {
public sourceDatapointId: string;
public runId: string;
public fileId: string;
public path: string;
private _logged: boolean;
private _callback: (log_id: string) => Promise<void>;

constructor({
sourceDatapointId,
runId,
evalCallback,
fileId,
path,
}: {
sourceDatapointId: string;
runId: string;
evalCallback: (log_id: string) => Promise<void>;
fileId: string;
path: string;
}) {
this.sourceDatapointId = sourceDatapointId;
this.runId = runId;
this._callback = evalCallback;
this.fileId = fileId;
this.path = path;
this._logged = false;
}

public get logged(): boolean {
return this._logged;
}

public logArgsWithContext({
logArgs,
forOtel,
path,
fileId,
}: {
logArgs: Record<string, any>;
forOtel: boolean;
path?: string;
fileId?: string;
}): [Record<string, any>, ((log_id: string) => Promise<void>) | null] {
if (path === undefined && fileId === undefined) {
throw new HumanloopRuntimeError(
"Internal error: Evaluation context called without providing a path or file_id",
);
}

if (this._logged) {
return [logArgs, null];
}

if (this.path !== undefined && this.path === path) {
this._logged = true;
return [
forOtel
? {
...logArgs,
source_datapoint_id: this.sourceDatapointId,
run_id: this.runId,
}
: {
...logArgs,
sourceDatapointId: this.sourceDatapointId,
runId: this.runId,
},
this._callback,
];
} else if (this.fileId !== undefined && this.fileId === fileId) {
this._logged = true;
return [
forOtel
? {
...logArgs,
sourceDatapointId: this.sourceDatapointId,
runId: this.runId,
}
: {
...logArgs,
sourceDatapointId: this.sourceDatapointId,
runId: this.runId,
},
this._callback,
];
} else {
return [logArgs, null];
}
}
}

// ... existing code ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i assume this is AI?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(is this a sign that any other bits of code need cleanup/looking at?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


export function setEvaluationContext(
evaluationContext: EvaluationContext,
): contextApi.Context {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_EVALUATION);
return contextApi.context.active().setValue(key, evaluationContext);
}

export function getEvaluationContext(): EvaluationContext | undefined {
const key = contextApi.createContextKey(HUMANLOOP_CONTEXT_EVALUATION);
return (contextApi.context.active().getValue(key) || undefined) as
| EvaluationContext
| undefined;
}
151 changes: 151 additions & 0 deletions src/decorators/flow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import * as contextApi from "@opentelemetry/api";
import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node";

import { HumanloopClient } from "../Client";
import { ChatMessage, FlowLogRequest, FlowLogResponse } from "../api";
import { getTraceId, setDecoratorContext, setTraceId } from "../context";
import { jsonifyIfNotString, writeToOpenTelemetrySpan } from "../otel";
import {
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_SPAN_NAME,
HUMANLOOP_LOG_KEY,
HUMANLOOP_PATH_KEY,
} from "../otel/constants";

export function flowUtilityFactory<I, O>(
client: HumanloopClient,
opentelemetryTracer: Tracer,
callable: (
args: I extends Record<string, unknown> & {
messages?: ChatMessage[];
}
? I
: never,
) => O,
path: string,
attributes?: Record<string, unknown>,
): (args: I) => Promise<O | undefined> & {
file: {
type: string;
version: { attributes?: Record<string, unknown> };
callable: (args: I) => Promise<O | undefined>;
};
} {
const flowKernel = { attributes: attributes || {} };
const fileType = "flow";

const wrappedFunction = async (
inputs: I extends Record<string, unknown> & {
messages?: ChatMessage[];
}
? I
: never,
) => {
return contextApi.context.with(
setDecoratorContext({
path: path,
type: fileType,
version: flowKernel,
}),
async () => {
return opentelemetryTracer.startActiveSpan(
HUMANLOOP_FLOW_SPAN_NAME,
async (span) => {
span.setAttribute(HUMANLOOP_PATH_KEY, path);
span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, fileType);
const traceId = getTraceId();

const logInputs = { ...inputs } as Record<string, unknown>;
const logMessages = logInputs.messages as
| ChatMessage[]
| undefined;
delete logInputs.messages;

const initLogInputs: FlowLogRequest = {
inputs: logInputs,
messages: logMessages,
traceParentId: traceId,
};

const flowLogResponse: FlowLogResponse =
// @ts-ignore
await client.flows._log({
path,
flow: flowKernel,
logStatus: "incomplete",
...initLogInputs,
});

return await contextApi.context.with(
setTraceId(flowLogResponse.id),
async () => {
let logOutput: string | undefined;
let outputMessage: ChatMessage | undefined;
let logError: string | undefined;
let funcOutput: O | undefined;

try {
funcOutput = await callable(inputs);
if (
// @ts-ignore
funcOutput instanceof Object &&
"role" in funcOutput &&
"content" in funcOutput
) {
outputMessage =
funcOutput as unknown as ChatMessage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

logOutput = undefined;
} else {
logOutput = jsonifyIfNotString(
callable,
funcOutput,
);
outputMessage = undefined;
}
logError = undefined;
} catch (err: any) {
console.error(
`Error calling ${callable.name}:`,
err,
);
logOutput = undefined;
outputMessage = undefined;
logError = err.message || String(err);
funcOutput = undefined as unknown as O;
}

const updatedFlowLog = {
log_status: "complete",
output: logOutput,
error: logError,
output_message: outputMessage,
id: flowLogResponse.id,
};

writeToOpenTelemetrySpan(
span as unknown as ReadableSpan,
// @ts-ignore
updatedFlowLog,
HUMANLOOP_LOG_KEY,
);

span.end();
return funcOutput;
},
);
},
);
},
);
};

// @ts-ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally these would be scoped

return Object.assign(wrappedFunction, {
file: {
path: path,
type: fileType,
version: flowKernel,
callable: wrappedFunction,
},
});
}
2 changes: 2 additions & 0 deletions src/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { flowUtilityFactory } from "./flow";
export { toolUtilityFactory } from "./tool";
35 changes: 35 additions & 0 deletions src/decorators/prompt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import * as contextApi from "@opentelemetry/api";

import { setDecoratorContext } from "../evals";

export function promptDecoratorFactory<I, O>(path: string, callable: (inputs: I) => O) {
const fileType = "prompt";

const wrappedFunction = (inputs: I) => {
return contextApi.context.with(
setDecoratorContext({
path: path,
type: fileType,
version: {
// TODO: Implement reverse lookup of template
template: undefined,
},
}),
async () => {
return await callable(inputs);
},
);
};

return Object.assign(wrappedFunction, {
file: {
path: path,
type: fileType,
version: {
// TODO: Implement reverse lookup of template
template: undefined,
},
callable: wrappedFunction,
},
});
}
Loading