/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const lazy = {};
ChromeUtils.defineESModuleGetters(lazy, {
Sqlite: "resource://gre/modules/Sqlite.sys.mjs",
});
ChromeUtils.defineLazyGetter(lazy, "log", function () {
return console.createInstance({
prefix: "ChatStore",
maxLogLevelPref: "browser.aiwindow.chatStore.loglevel",
});
});
import {
CONVERSATION_TABLE,
CONVERSATION_UPDATED_DATE_INDEX,
CONVERSATION_INSERT,
MESSAGE_TABLE,
MESSAGE_ORDINAL_INDEX,
MESSAGE_URL_INDEX,
MESSAGE_CREATED_DATE_INDEX,
MESSAGE_CONV_ID_INDEX,
MESSAGE_INSERT,
CONVERSATIONS_MOST_RECENT,
CONVERSATION_BY_ID,
CONVERSATIONS_BY_DATE,
CONVERSATIONS_BY_URL,
CONVERSATIONS_CONTENT_SEARCH,
CONVERSATIONS_CONTENT_SEARCH_BY_ROLE,
CONVERSATIONS_HISTORY_SEARCH,
MESSAGES_BY_DATE,
MESSAGES_BY_DATE_AND_ROLE,
DELETE_CONVERSATION_BY_ID,
CONVERSATIONS_OLDEST,
CONVERSATION_HISTORY,
ESCAPE_CHAR,
getConversationMessagesSql,
} from "./ChatSql.sys.mjs";
import { ChatMinimal } from "./ChatMessage.sys.mjs";
export { ChatConversation } from "./ChatConversation.sys.mjs";
export { ChatMessage, ChatMinimal } from "./ChatMessage.sys.mjs";
export {
CONVERSATION_STATUS,
MESSAGE_ROLE,
INSIGHTS_FLAG_SOURCE,
} from "./ChatConstants.sys.mjs";
import {
CURRENT_SCHEMA_VERSION,
DB_FOLDER_PATH,
DB_FILE_NAME,
PREF_BRANCH,
CONVERSATION_STATUS,
} from "./ChatConstants.sys.mjs";
import {
parseConversationRow,
parseMessageRows,
parseChatHistoryViewRows,
toJSONOrNull,
} from "./ChatUtils.sys.mjs";
// NOTE: The migrations file (ChatMigrations.sys.mjs) contains an example
// migration function. The eslint-disable-next-line below should be removed
// once the first real migration is added.
//
// eslint-disable-next-line no-unused-vars
import { migrations } from "./ChatMigrations.sys.mjs";
const MAX_DB_SIZE_BYTES = 75 * 1024 * 1024;
const SORTS = ["ASC", "DESC"];
/**
* Simple interface to store and retrieve chat conversations and messages.
*
* TODO: Move this documentation to the Firefox source docs.
*
* @example
* let { ChatStore, ChatConversation, ChatMessage, MESSAGE_ROLE } =
*   ChromeUtils.importESModule(...);
* const chatStore = new ChatStore();
* const conversation = new ChatConversation({
* title: "title",
* description: "description",
* pageMeta: { one: 1, two: 2 },
* });
* const msg1 = new ChatMessage({
* ordinal: 0,
* role: MESSAGE_ROLE.USER,
* modelId: "test",
* params: { one: "one" },
* usage: { two: "two", content: "some content" },
* });
* const msg2 = new ChatMessage({
* ordinal: 1,
* role: MESSAGE_ROLE.ASSISTANT,
* modelId: "test",
* params: { one: "one" },
* usage: { two: "two", content: "some content 2" },
* });
* conversation.messages = [msg1, msg2];
* await chatStore.updateConversation(conversation);
* // Or findConversationsByDate, findConversationsByURL.
* const foundConversation =
* await chatStore.findConversationById(conversation.id);
*
*/
export class ChatStore {
#asyncShutdownBlocker;
#conn;
#promiseConn;
constructor() {
this.#asyncShutdownBlocker = async () => {
await this.#closeConnection();
};
}
/**
* Updates a conversation's saved state in the SQLite db
*
* @param {ChatConversation} conversation
*/
async updateConversation(conversation) {
await this.#ensureDatabase().catch(e => {
lazy.log.error("Could not ensure a database connection.");
throw e;
});
const pageUrl = URL.parse(conversation.pageUrl);
await this.#conn
.executeTransaction(async () => {
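// Persist the conversation row and all of its message rows atomically.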
await this.#conn.executeCached(CONVERSATION_INSERT, {
conv_id: conversation.id,
title: conversation.title,
description: conversation.description,
page_url: pageUrl?.href ?? null,
page_meta: toJSONOrNull(conversation.pageMeta),
created_date: conversation.createdDate,
updated_date: conversation.updatedDate,
status: conversation.status,
active_branch_tip_message_id: conversation.activeBranchTipMessageId,
});
const messages = conversation.messages.map(m => ({
message_id: m.id,
conv_id: conversation.id,
created_date: m.createdDate,
parent_message_id: m.parentMessageId,
revision_root_message_id: m.revisionRootMessageId,
ordinal: m.ordinal,
is_active_branch: m.isActiveBranch ? 1 : 0,
role: m.role,
model_id: m.modelId,
params: toJSONOrNull(m.params),
content: toJSONOrNull(m.content),
usage: toJSONOrNull(m.usage),
page_url: m.pageUrl?.href || "",
turn_index: m.turnIndex,
insights_enabled: m.insightsEnabled,
insights_flag_source: m.insightsFlagSource,
insights_applied_jsonb: toJSONOrNull(m.insightsApplied),
web_search_queries_jsonb: toJSONOrNull(m.webSearchQueries),
}));
await this.#conn.executeCached(MESSAGE_INSERT, messages);
})
.catch(e => {
lazy.log.error("Transaction failed to execute");
throw e;
});
}
/**
* Gets a list of the oldest conversations
*
* @param {number} numberOfConversations - How many conversations to retrieve
* @returns {Array<ChatMinimal>} - List of ChatMinimal items
*/
async findOldestConversations(numberOfConversations) {
await this.#ensureDatabase().catch(e => {
lazy.log.error("Could not ensure a database connection.");
throw e;
});
const rows = await this.#conn
.executeCached(CONVERSATIONS_OLDEST, {
limit: numberOfConversations,
})
.catch(e => {
lazy.log.error("Could not retrieve oldest conversations.");
throw e;
});
return rows.map(row => {
return new ChatMinimal({
convId: row.getResultByName("conv_id"),
title: row.getResultByName("title"),
});
});
}
/**
* Gets a list of the most recent conversations
*
* @param {number} numberOfConversations - How many conversations to retrieve
* @returns {Array<ChatMinimal>} - List of ChatMinimal items
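*
* @example
* // Illustrative usage (assumes `store` is an existing ChatStore instance):
* const recent = await store.findRecentConversations(10);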
*/
async findRecentConversations(numberOfConversations) {
await this.#ensureDatabase().catch(e => {
lazy.log.error("Could not ensure a database connection.");
throw e;
});
const rows = await this.#conn
.executeCached(CONVERSATIONS_MOST_RECENT, {
limit: numberOfConversations,
})
.catch(e => {
lazy.log.error("Could not retrieve most recent conversations.");
throw e;
});
return rows.map(row => {
return new ChatMinimal({
convId: row.getResultByName("conv_id"),
title: row.getResultByName("title"),
});
});
}
/**
* Gets a conversation using its id
*
* @param {string} conversationId - The ID of the conversation to retrieve
*
* @returns {ChatConversation} - The conversation and its messages
*/
async findConversationById(conversationId) {
const conversations = await this.#findConversationsWithMessages(
CONVERSATION_BY_ID,
{
conv_id: conversationId,
}
);
return conversations[0] ?? null;
}
/**
* Finds conversations between a specified start and end date
*
* @param {number} startDate - Start time epoch format
* @param {number} endDate - End time epoch format
*
* @returns {Array<ChatConversation>} - The conversations and their messages
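*
* @example
* // Illustrative: conversations from the last 7 days (epoch timestamps).
* const weekAgo = Date.now() - 7 * 24 * 60 * 60 * 1000;
* const convs = await store.findConversationsByDate(weekAgo, Date.now());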
*/
async findConversationsByDate(startDate, endDate) {
return this.#findConversationsWithMessages(CONVERSATIONS_BY_DATE, {
start_date: startDate,
end_date: endDate,
});
}
/**
* Finds conversations associated with a given page URL
*
* @param {URL} pageUrl - The URL to find conversations for
*
* @returns {Array<ChatConversation>} - The conversations and their messages
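*
* @example
* // Illustrative: conversations attached to a specific page.
* const convs = await store.findConversationsByURL(
*   new URL("https://example.com/article")
* );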
*/
async findConversationsByURL(pageUrl) {
return this.#findConversationsWithMessages(CONVERSATIONS_BY_URL, {
page_url: pageUrl.href,
});
}
/**
* Search for messages that happened between the specified start
* and end dates, optionally, filter the messages by a specific
* message role type.
*
* @param {Date} startDate - The start date, inclusive
* @param {Date} [endDate=new Date()] - The end date, inclusive
* @param {MessageRole} [role=-1] - The message role type to filter by, one of 0|1|2|3
* as defined by the constant MESSAGE_ROLE
* @param {number} [limit=-1] - The max number of messages to retrieve
* @param {number} [offset=-1] - The number of messages to skip from the result set
*
* @returns {Array<ChatMessage>} - An array of ChatMessage entries
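*
* @example
* // Illustrative (assumes MESSAGE_ROLE was imported alongside ChatStore):
* // user messages from the last 24 hours, capped at 50 results.
* const dayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
* const msgs = await store.findMessagesByDate(dayAgo, new Date(), MESSAGE_ROLE.USER, 50);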
*/
async findMessagesByDate(
startDate,
endDate = new Date(),
role = -1,
limit = -1,
offset = -1
) {
const params = {
start_date: startDate.getTime(),
end_date: endDate.getTime(),
limit,
offset,
};
let sql = MESSAGES_BY_DATE;
if (role > -1) {
sql = MESSAGES_BY_DATE_AND_ROLE;
params.role = role;
}
let rows = await this.#conn.executeCached(sql, params);
return parseMessageRows(rows);
}
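/**
* Escapes the SQL LIKE wildcard characters (% and _) and the escape character
* itself so a user-provided search string is matched literally.
*
* @param {string} searchString
* @returns {string} - The escaped string
*/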
#escapeForLike(searchString) {
return searchString
.replaceAll(ESCAPE_CHAR, `${ESCAPE_CHAR}${ESCAPE_CHAR}`)
.replaceAll("%", `${ESCAPE_CHAR}%`)
.replaceAll("_", `${ESCAPE_CHAR}_`);
}
/**
* Searches through the message.content JSON object at a particular object
* path for a partial string match of a value.
*
* @param {string} keyChain - The object key chain to look through,
* like obj.field1.field2
* @param {MessageRole} [role=-1] - A message role to search for
*
* @returns {Array<ChatConversation>} - An array of conversations with messages
* that contain a message that matches the search string at the given content
* object path
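*
* @example
* // Illustrative: search message content at the $.body JSON path, limited to
* // assistant messages.
* const convs = await store.searchContent("body", MESSAGE_ROLE.ASSISTANT);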
*/
async searchContent(keyChain, role = -1) {
const path = `$.${keyChain}`;
const query =
role > -1
? CONVERSATIONS_CONTENT_SEARCH_BY_ROLE
: CONVERSATIONS_CONTENT_SEARCH;
const params = { path };
if (role > -1) {
params.role = role;
}
const rows = await this.#conn.executeCached(query, params);
if (!rows.length) {
return [];
}
const conversations = rows.map(parseConversationRow);
return await this.#getMessagesForConversations(conversations);
}
/**
* Searches for conversations where the search string is a partial match of
* either the conversation title or the message.content.body field of one of
* the conversation's user messages.
*
* @param {string} searchString - The string to search conversations with
*
* @returns {Array<ChatConversation>} - An array of matching conversations,
* each populated with its messages
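*
* @example
* // Illustrative usage (assumes `store` is an existing ChatStore instance):
* const matches = await store.search("flight booking");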
*/
async search(searchString) {
const path = `$.body`;
const pattern = `%${this.#escapeForLike(searchString)}%`;
const rows = await this.#conn.executeCached(CONVERSATIONS_HISTORY_SEARCH, {
path,
pattern,
});
if (!rows.length) {
return [];
}
const conversations = rows.map(parseConversationRow);
return await this.#getMessagesForConversations(conversations);
}
/**
* Gets a list of chat history items to display in Chat History view.
*
* @param {number} [pageNumber=1] - The page number to get, 1-based indexing
* @param {number} [pageSize=20] - Number of items to get per page
* @param {string} [sort="desc"] - desc|asc The sorting order based on updated_date for conversations
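*
* @example
* // Illustrative: fetch the second page of 20 conversations, newest first.
* const page2 = await store.chatHistoryView(2, 20, "desc");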
*/
async chatHistoryView(pageNumber = 1, pageSize = 20, sort = "desc") {
const sorting = SORTS.find(item => item === sort.toUpperCase()) ?? "DESC";
const offset = pageSize * (pageNumber - 1);
const limit = pageSize;
const params = {
limit,
offset,
};
const rows = await this.#conn.executeCached(
CONVERSATION_HISTORY.replace("{sort}", sorting),
params
);
return parseChatHistoryViewRows(rows);
}
/**
* Prunes the database of old conversations in order to get the
* database file size to the specified maximum size.
*
* TODO: Review the requirements for db pruning, set up an invocation
* schedule, and refactor to use dbstat.
*
* @param {number} [reduceByPercentage=0.05] - Percentage to reduce db file size by
* @param {number} [maxDbSizeBytes=MAX_DB_SIZE_BYTES] - Db max file size
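*
* @example
* // Illustrative: once the logical db size exceeds the 75 MiB default cap,
* // delete the oldest conversations until it drops by roughly 5%.
* await store.pruneDatabase(0.05);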
*/
async pruneDatabase(
reduceByPercentage = 0.05,
maxDbSizeBytes = MAX_DB_SIZE_BYTES
) {
if (!(await IOUtils.exists(this.databaseFilePath))) {
return;
}
// Make sure a connection is open before querying PRAGMAs below.
await this.#ensureDatabase();
const DELETE_BATCH_SIZE = 50;
const getPragmaInt = async name => {
const result = await this.#conn.execute(`PRAGMA ${name}`);
return result[0].getInt32(0);
};
// compute the logical DB size in bytes using SQLite's page_size,
// page_count, and freelist_count
const getLogicalDbSizeBytes = async () => {
const pageSize = await getPragmaInt("page_size");
const pageCount = await getPragmaInt("page_count");
const freelistCount = await getPragmaInt("freelist_count");
// Logical used pages = total pages - free pages
const usedPages = pageCount - freelistCount;
const lSize = usedPages * pageSize;
return lSize;
};
let logicalSize = await getLogicalDbSizeBytes();
if (logicalSize < maxDbSizeBytes) {
return;
}
const targetLogicalSize = Math.max(
0,
logicalSize * (1 - reduceByPercentage)
);
const MAX_ITERATIONS = 100;
// How many consecutive batches with no size reduction we tolerate.
const MAX_STAGNANT = 5;
let iterations = 0;
let stagnantIterations = 0;
while (
logicalSize > targetLogicalSize &&
iterations < MAX_ITERATIONS &&
stagnantIterations < MAX_STAGNANT
) {
iterations++;
const oldestChats = await this.findOldestConversations(DELETE_BATCH_SIZE);
if (!oldestChats.length) {
break;
}
for (const chat of oldestChats) {
await this.deleteConversationById(chat.id);
}
const newLogicalSize = await getLogicalDbSizeBytes();
if (newLogicalSize >= logicalSize) {
stagnantIterations++;
} else {
stagnantIterations = 0;
}
logicalSize = newLogicalSize;
}
// Actually reclaim disk space.
await this.#conn.execute("PRAGMA incremental_vacuum;");
}
/**
* Returns the file size of the database.
* Establishes a connection first to make sure the
* database exists.
*
* @returns {number} - The file size in bytes
*/
async getDatabaseSize() {
await this.#ensureDatabase();
const stats = await IOUtils.stat(this.databaseFilePath);
return stats.size;
}
/**
* Deletes a particular conversation using its id
*
* @param {string} id - The conv_id of a conversation row to delete
*/
async deleteConversationById(id) {
await this.#ensureDatabase();
await this.#conn.execute(DELETE_CONVERSATION_BY_ID, {
conv_id: id,
});
}
/**
* This method is meant to only be used for testing cleanup
*/
async destroyDatabase() {
await this.#removeDatabaseFiles();
}
/**
* Gets the version of the schema currently set in the database.
*
* @returns {number}
*/
async getDatabaseSchemaVersion() {
if (!this.#conn) {
await this.#ensureDatabase();
}
return this.#conn.getSchemaVersion();
}
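/**
* Fetches all messages for the given conversations in one query and attaches
* each message to its parent conversation, matched by conv_id.
*
* @param {Array<ChatConversation>} conversations
* @returns {Array<ChatConversation>} - The same conversations, with messages
*/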
async #getMessagesForConversations(conversations) {
const convs = conversations.reduce((convMap, conv) => {
convMap[conv.id] = conv;
return convMap;
}, {});
// Find all the messages for all the conversations.
const rows = await this.#conn
.executeCached(
getConversationMessagesSql(conversations.length),
conversations.map(c => c.id)
)
.catch(e => {
lazy.log.error("Could not retrieve messages for conversatons");
lazy.log.error(`${e.message}\n${e.stack}`);
return [];
});
// TODO: retrieve TTL content.
parseMessageRows(rows).forEach(message => {
const conversation = convs[message.convId];
if (conversation) {
conversation.messages.push(message);
}
});
return conversations;
}
async #openConnection() {
lazy.log.debug("Opening new connection");
try {
const connConfig = { path: this.databaseFilePath };
this.#conn = await lazy.Sqlite.openConnection(connConfig);
} catch (e) {
lazy.log.error("openConnection() could not open db:", e.message);
throw e;
}
lazy.Sqlite.shutdown.addBlocker(
"ChatStore: Shutdown",
this.#asyncShutdownBlocker
);
try {
// TODO: remove this after switching pruneDatabase() to use dbstat
await this.#conn.execute("PRAGMA page_size = 4096;");
// Set up WAL journaling, as it is generally faster.
await this.#conn.execute("PRAGMA journal_mode = WAL;");
await this.#conn.execute("PRAGMA wal_autocheckpoint = 16;");
// Store VACUUM information to be used by the VacuumManager.
await this.#conn.execute("PRAGMA auto_vacuum = INCREMENTAL;");
await this.#conn.execute("PRAGMA foreign_keys = ON;");
} catch (e) {
lazy.log.warn("Configuring SQLite PRAGMA settings: ", e.message);
}
}
async #closeConnection() {
if (!this.#conn) {
return;
}
lazy.log.debug("Closing connection");
lazy.Sqlite.shutdown.removeBlocker(this.#asyncShutdownBlocker);
try {
await this.#conn.close();
} catch (e) {
lazy.log.warn(`Error closing connection: ${e.message}`);
}
this.#conn = null;
}
/**
* Lazily opens the database connection, recreating the database file if it
* is corrupt, and memoizes the in-flight promise so concurrent callers share
* a single connection attempt.
*
* TODO: Discuss implications of multiple instances of ChatStore
* and the potential issues with migrations/schemas.
*/
async #ensureDatabase() {
if (this.#promiseConn) {
return this.#promiseConn;
}
let deferred = Promise.withResolvers();
this.#promiseConn = deferred.promise;
if (this.#removeDatabaseOnStartup) {
lazy.log.debug("Removing database on startup");
try {
await this.#removeDatabaseFiles();
} catch (e) {
deferred.reject(new Error("Could not remove the database files"));
return deferred.promise;
}
}
try {
await this.#openConnection();
} catch (e) {
if (
e.result == Cr.NS_ERROR_FILE_CORRUPTED ||
e.errors?.some(error => error.result == Ci.mozIStorageError.NOTADB)
) {
lazy.log.warn("Invalid database detected, removing it.", e);
await this.#removeDatabaseFiles();
}
}
if (!this.#conn) {
try {
await this.#openConnection();
} catch (e) {
lazy.log.error("Could not open the database connection.", e);
deferred.reject(new Error("Could not open the database connection"));
return deferred.promise;
}
}
try {
await this.#initializeSchema();
} catch (e) {
lazy.log.warn(
"Failed to initialize the database schema, recreating the database.",
e
);
// If the schema cannot be initialized try to create a new database file.
await this.#removeDatabaseFiles();
}
deferred.resolve(this.#conn);
return this.#promiseConn;
}
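/**
* Persists the given schema version on the open database connection.
*
* @param {number} version
*/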
async setSchemaVersion(version) {
await this.#conn.setSchemaVersion(version);
}
async #initializeSchema() {
const version = await this.getDatabaseSchemaVersion();
if (version == this.CURRENT_SCHEMA_VERSION) {
return;
}
if (version > this.CURRENT_SCHEMA_VERSION) {
await this.setSchemaVersion(this.CURRENT_SCHEMA_VERSION);
return;
}
// Must migrate the schema.
await this.#conn.executeTransaction(async () => {
if (version == 0) {
// This is a newly created database, just create the entities.
await this.#createDatabaseEntities();
await this.#conn.setSchemaVersion(this.CURRENT_SCHEMA_VERSION);
// eslint-disable-next-line no-useless-return
return;
}
await this.applyMigrations();
await this.setSchemaVersion(this.CURRENT_SCHEMA_VERSION);
});
}
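/**
* Applies every registered migration from ChatMigrations.sys.mjs, passing
* each one the open connection and the current target schema version.
*/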
async applyMigrations() {
for (const migration of migrations) {
if (typeof migration !== "function") {
continue;
}
await migration(this.#conn, this.CURRENT_SCHEMA_VERSION);
}
}
async #removeDatabaseFiles() {
lazy.log.debug("Removing database files");
await this.#closeConnection();
try {
for (let file of [
this.databaseFilePath,
PathUtils.join(DB_FOLDER_PATH, this.databaseFileName + "-wal"),
PathUtils.join(DB_FOLDER_PATH, this.databaseFileName + "-shm"),
]) {
lazy.log.debug(`Removing ${file}`);
await IOUtils.remove(file, {
retryReadonly: true,
recursive: true,
ignoreAbsent: true,
});
}
this.#removeDatabaseOnStartup = false;
} catch (e) {
lazy.log.warn("Failed to remove database files", e);
// Try to clear on next startup.
this.#removeDatabaseOnStartup = true;
// Re-throw the exception for the caller.
throw e;
}
}
async #findConversationsWithMessages(sql, queryParams) {
await this.#ensureDatabase().catch(e => {
lazy.log.error("Could not ensure a database connection.");
lazy.log.error(`${e.message}\n${e.stack}`);
return [];
});
// TODO: Check the summary first and find the one with the largest
// end_ordinal. If not found, retrieve all messages. If found, compare the
// summary's end_ordinal with the active branch ordinal to determine whether
// extra messages must be retrieved.
let rows = await this.#conn.executeCached(sql, queryParams);
const conversations = rows.map(parseConversationRow);
return await this.#getMessagesForConversations(conversations);
}
async #createDatabaseEntities() {
await this.#conn.execute(CONVERSATION_TABLE);
await this.#conn.execute(CONVERSATION_UPDATED_DATE_INDEX);
await this.#conn.execute(MESSAGE_TABLE);
await this.#conn.execute(MESSAGE_ORDINAL_INDEX);
await this.#conn.execute(MESSAGE_URL_INDEX);
await this.#conn.execute(MESSAGE_CREATED_DATE_INDEX);
await this.#conn.execute(MESSAGE_CONV_ID_INDEX);
}
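// Pref-backed flag: when true, the database files are removed on the next
// #ensureDatabase() call. Set after a failed cleanup so removal is retried
// at the next startup.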
get #removeDatabaseOnStartup() {
return Services.prefs.getBoolPref(
`${PREF_BRANCH}.removeDatabaseOnStartup`,
false
);
}
set #removeDatabaseOnStartup(value) {
lazy.log.debug(`Setting removeDatabaseOnStartup to ${value}`);
Services.prefs.setBoolPref(`${PREF_BRANCH}.removeDatabaseOnStartup`, value);
}
static get CONVERSATION_STATUS() {
return CONVERSATION_STATUS;
}
get CURRENT_SCHEMA_VERSION() {
return CURRENT_SCHEMA_VERSION;
}
get connection() {
return this.#conn;
}
get databaseFileName() {
return DB_FILE_NAME;
}
get databaseFilePath() {
return PathUtils.join(PathUtils.profileDir, this.databaseFileName);
}
}