/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

/**
 * @typedef {import("../../content/Utils.sys.mjs").ProgressAndStatusCallbackParams} ProgressAndStatusCallbackParams
 */

/* eslint-disable-next-line mozilla/reject-import-system-module-from-non-system */
import { Progress } from "chrome://global/content/ml/Utils.sys.mjs";
/* eslint-disable-next-line mozilla/reject-import-system-module-from-non-system */
import { AppConstants } from "resource://gre/modules/AppConstants.sys.mjs";

/**
 * Log level set by the pipeline.
 *
 * @type {string}
 */
let _logLevel = "Error";

/**
 * Lazy initialization container.
 *
 * @type {object}
 */
const lazy = {};

ChromeUtils.defineLazyGetter(lazy, "console", () => {
  return console.createInstance({
    maxLogLevel: _logLevel, // we can't use maxLogLevelPref in workers.
    prefix: "ML:OpenAIPipeline",
  });
});

ChromeUtils.defineESModuleGetters(
  lazy,
  {
    setLogLevel: "chrome://global/content/ml/Utils.sys.mjs",
  },
  { global: "current" }
);
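
/**
 * Pipeline that forwards chat-completion requests to an OpenAI-compatible
 * endpoint (the built-in defaults target a local Ollama server) and relays
 * streamed or non-streamed results back to the caller.
 *
 * A minimal usage sketch (illustrative only; it assumes an `options` object
 * providing `applyToConfig` and an `errorFactory` callback, neither of which
 * is defined in this file):
 *
 * @example
 * const pipeline = await OpenAIPipeline.initialize(
 *   null, // mlEngineWorker (unused here)
 *   null, // wasm (unused here)
 *   options,
 *   e => new Error(`backend failure: ${e}`)
 * );
 * const result = await pipeline.run({
 *   args: [{ role: "user", content: "Hello!" }],
 * });
 * console.log(result.finalOutput);
 */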
export class OpenAIPipeline {
  #errorFactory = null;
  #options = null;

  /** The imported OpenAI client library module; set by initialize(). */
  static OpenAILib = null;

  constructor(options, errorFactory) {
    this.#errorFactory = errorFactory;
    this.#options = options;
  }
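
  /**
   * Creates a pipeline instance, loading the bundled OpenAI client library
   * (the dev build on Nightly) and deriving its configuration from `options`.
   *
   * @param mlEngineWorker - Worker handle (unused here)
   * @param wasm - WASM payload (unused here)
   * @param {object} [options={}] - Engine options; expected to expose
   *   `applyToConfig` and may carry a `logLevel`
   * @param {Function} errorFactory - Wraps raw errors into backend errors
   * @returns {Promise<OpenAIPipeline>} The initialized pipeline
   */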
  static async initialize(mlEngineWorker, wasm, options = {}, errorFactory) {
    lazy.console.debug("Initializing OpenAI pipeline");

    if (AppConstants.NIGHTLY_BUILD) {
      OpenAIPipeline.OpenAILib = await import(
        "chrome://global/content/ml/openai-dev.mjs"
      );
    } else {
      OpenAIPipeline.OpenAILib = await import(
        "chrome://global/content/ml/openai.mjs"
      );
    }

    if (options.logLevel) {
      _logLevel = options.logLevel;
      lazy.setLogLevel(options.logLevel); // setting Utils log level
    }
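
    // `options.applyToConfig` is assumed to copy the engine options onto the
    // config object below; run() later reads `baseURL`, `apiKey`, and
    // `modelId` from it.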
    let config = {};
    options.applyToConfig(config);
    config.backend = config.backend || "openai";

    // reapply logLevel if it has changed.
    if (lazy.console.logLevel != config.logLevel) {
      lazy.console.logLevel = config.logLevel;
    }
    return new OpenAIPipeline(config, errorFactory);
  }

  /**
   * Sends progress updates to both the port and the inference progress
   * callback.
   *
   * @private
   * @param {object} args - The arguments object
   * @param {string} args.content - The text content to send in the progress update
   * @param {string|null} args.requestId - Unique identifier for the request
   * @param {Function|null} args.inferenceProgressCallback - Callback function to report inference progress
   * @param {MessagePort|null} args.port - Port for posting messages to the caller
   * @param {boolean} args.isDone - Whether this is the final progress update
   * @param {Array|null} [args.toolCalls] - Completed tool calls to forward, if any
   */
  #sendProgress(args) {
    const {
      content,
      requestId,
      inferenceProgressCallback,
      port,
      isDone,
      toolCalls,
    } = args;

    port?.postMessage({
      text: content,
      ...(toolCalls ? { toolCalls } : {}),
      ...(isDone && { done: true, finalOutput: content }),
      ok: true,
    });

    inferenceProgressCallback?.({
      ok: true,
      metadata: {
        text: content,
        requestId,
        tokens: [],
        ...(toolCalls ? { toolCalls } : {}),
      },
      type: Progress.ProgressType.INFERENCE,
      statusText: isDone
        ? Progress.ProgressStatusText.DONE
        : Progress.ProgressStatusText.IN_PROGRESS,
    });
  }

  /**
   * Finalizes the accumulated tool calls by sorting them by index and
   * returning an array of call objects.
   *
   * @param {Map} acc - Accumulated tool calls, keyed by index
   * @returns {Array} The tool calls in index order
   */
  #finalizeToolCalls(acc) {
    // Convert Map entries ([index, call]) to an array
    const entries = Array.from(acc.entries());
    // Sort by the numeric index
    entries.sort((a, b) => a[0] - b[0]);
    // Return just the call objects (drop the index)
    return entries.map(([_index, call]) => call);
  }
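
  // For illustration: given a map like new Map([[1, callB], [0, callA]]),
  // #finalizeToolCalls returns [callA, callB] (callA/callB being the
  // accumulated call objects; hypothetical names).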

  /**
   * Because responses are streamed, a single tool call may arrive as multiple
   * partial tool_calls deltas that need to be merged together. This helper
   * does that by looking at the index property on each tool call fragment.
   *
   * @param {Map} acc - Accumulated tool calls map
   * @param {Array} deltas - New tool call fragments to merge
   * @returns {Map} Merged tool calls map
   */
  #mergeToolDeltas(acc, deltas) {
    // If no deltas, return acc unchanged
    if (!Array.isArray(deltas) || deltas.length === 0) {
      return acc;
    }

    let next = new Map(acc); // shallow copy to keep immutability
    for (const toolCall of deltas) {
      const idx = toolCall.index ?? 0;
      const existing = next.get(idx) || {
        id: null,
        type: "function",
        function: { name: "", arguments: "" },
      };
      const nameFrag = toolCall.function?.name ?? "";
      const argsFrag = toolCall.function?.arguments ?? "";

      // Merge fragments into previous entry
      const merged = {
        id: toolCall.id ?? existing.id,
        type: "function",
        function: {
          name: nameFrag
            ? existing.function.name + nameFrag
            : existing.function.name,
          arguments: argsFrag
            ? existing.function.arguments + argsFrag
            : existing.function.arguments,
        },
      };
      next.set(idx, merged);
    }
    return next;
  }
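
  // For illustration, two streamed fragments for the same index, e.g.
  //   { index: 0, id: "call_1", function: { name: "get_weather", arguments: '{"ci' } }
  //   { index: 0, function: { arguments: 'ty":"Paris"}' } }
  // merge into a single call whose arguments string is '{"city":"Paris"}'
  // (tool name and payload are hypothetical, shown only to illustrate the merge).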

  /**
   * Handles a streaming response from the OpenAI API.
   * Processes each chunk as it arrives and sends progress updates.
   *
   * @private
   * @param {object} args - The arguments object
   * @param {OpenAI} args.client - OpenAI client instance
   * @param {object} args.completionParams - Parameters for the completion request
   * @param {string|null} args.requestId - Unique identifier for the request
   * @param {Function|null} args.inferenceProgressCallback - Callback function to report inference progress
   * @param {MessagePort|null} args.port - Port for posting messages to the caller
   * @returns {Promise<object>} Result object with finalOutput and metrics properties, plus toolCalls when the model requested tools
   */
  async #handleStreamingResponse(args) {
    const {
      client,
      completionParams,
      requestId,
      inferenceProgressCallback,
      port,
    } = args;

    const stream = await client.chat.completions.create(completionParams);
    let streamOutput = "";
    let toolAcc = new Map();
    let sawToolCallsFinish = false;
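
    // Each streamed chunk is expected to follow the OpenAI chat-completions
    // streaming shape: choices[0].delta carries incremental `content` and/or
    // `tool_calls` fragments, and choices[0].finish_reason marks the end of
    // the turn.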
    for await (const chunk of stream) {
      const choice = chunk?.choices?.[0];
      const delta = choice?.delta ?? {};

      // Normal text tokens
      if (delta.content) {
        streamOutput += delta.content;
        this.#sendProgress({
          content: delta.content,
          requestId,
          inferenceProgressCallback,
          port,
          isDone: false,
        });
      }

      if (Array.isArray(delta.tool_calls) && delta.tool_calls.length) {
        toolAcc = this.#mergeToolDeltas(toolAcc, delta.tool_calls);
      }

      // If the model signals it wants tools now
      if (choice?.finish_reason === "tool_calls") {
        sawToolCallsFinish = true;
        const toolCalls = this.#finalizeToolCalls(toolAcc);

        // Emit the completed tool calls to the caller so they can execute them.
        this.#sendProgress({
          content: "", // no user-visible text here
          requestId,
          inferenceProgressCallback,
          port,
          isDone: false,
          toolCalls,
        });
        // Typically end this assistant turn here.
        break;
      }
    }

    // Final message: does not carry full content to avoid duplication
    this.#sendProgress({
      requestId,
      inferenceProgressCallback,
      port,
      isDone: true,
    });

    return {
      finalOutput: streamOutput,
      metrics: [],
      ...(sawToolCallsFinish
        ? { toolCalls: this.#finalizeToolCalls(toolAcc) }
        : {}),
    };
  }

  /**
   * Handles a non-streaming response from the OpenAI API.
   * Waits for the complete response and sends it as a single update.
   *
   * @private
   * @param {object} args - The arguments object
   * @param {OpenAI} args.client - OpenAI client instance
   * @param {object} args.completionParams - Parameters for the completion request
   * @param {string|null} args.requestId - Unique identifier for the request
   * @param {Function|null} args.inferenceProgressCallback - Callback function to report inference progress
   * @param {MessagePort|null} args.port - Port for posting messages to the caller
   * @returns {Promise<object>} Result object with finalOutput and metrics properties, plus toolCalls when the model requested tools
   */
  async #handleNonStreamingResponse(args) {
    const {
      client,
      completionParams,
      requestId,
      inferenceProgressCallback,
      port,
    } = args;

    const completion = await client.chat.completions.create(completionParams);
    const message = completion.choices[0].message;
    const output = message.content || "";
    const toolCalls = message.tool_calls || null;

    this.#sendProgress({
      content: output,
      requestId,
      inferenceProgressCallback,
      port,
      isDone: true,
      toolCalls,
    });

    return {
      finalOutput: output,
      metrics: [],
      ...(toolCalls ? { toolCalls } : {}),
    };
  }

  /**
   * Executes the OpenAI pipeline with the given request.
   * Supports both streaming and non-streaming modes based on the request's
   * stream options.
   *
   * @param {object} request - The request object containing the messages
   * @param {Array} request.args - Array of message objects for the chat completion
   * @param {object} [request.streamOptions] - Streaming options; set `enabled: true` to stream
   * @param {Array} [request.tools] - Tool definitions to expose to the model
   * @param {string|null} [requestId=null] - Unique identifier for this request
   * @param {Function|null} [inferenceProgressCallback=null] - Callback function to report progress during inference
   * @param {MessagePort|null} [port=null] - Port for posting messages back to the caller
   * @returns {Promise<object>} Result object containing the final output, metrics, and any requested tool calls
   * @throws {Error} Throws a backend error if the API request fails
   */
  async run(
    request,
    requestId = null,
    inferenceProgressCallback = null,
    port = null
  ) {
    lazy.console.debug("Running OpenAI pipeline");
    try {
      const { baseURL, apiKey, modelId } = this.#options;
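
      // If no baseURL / apiKey is configured, fall back to a local Ollama
      // server: http://localhost:11434/v1 is Ollama's OpenAI-compatible
      // endpoint, and "ollama" is just a placeholder key since Ollama does
      // not require a real API key.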
      const client = new OpenAIPipeline.OpenAILib.OpenAI({
        baseURL: baseURL ? baseURL : "http://localhost:11434/v1",
        apiKey: apiKey || "ollama",
      });

      const stream = request.streamOptions?.enabled || false;
      const tools = request.tools || [];

      const completionParams = {
        model: modelId,
        messages: request.args,
        stream,
        tools,
      };

      const args = {
        client,
        completionParams,
        requestId,
        inferenceProgressCallback,
        port,
      };

      if (stream) {
        return await this.#handleStreamingResponse(args);
      }
      return await this.#handleNonStreamingResponse(args);
    } catch (error) {
      const backendError = this.#errorFactory(error);
      port?.postMessage({ done: true, ok: false, error: backendError });
      inferenceProgressCallback?.({
        ok: false,
        metadata: {
          text: "",
          requestId,
          tokens: [],
        },
        type: Progress.ProgressType.INFERENCE,
        statusText: Progress.ProgressStatusText.DONE,
      });
      throw backendError;
    }
  }
}