Files
BetterSEQTA-Plus/src/plugins/built-in/globalSearch/indexing/indexer.ts
T
2025-04-11 00:07:29 +10:00

264 lines
7.1 KiB
TypeScript

import { clear, getAll, put, remove } from "./db";
import { jobs } from "./jobs";
import { renderComponentMap } from "./renderComponents";
import type { HydratedIndexItem, IndexItem, Job, JobContext } from "./types";
import { EmbeddingIndex, getEmbedding, initializeModel } from "client-vector-search";
// Name of the store that holds the per-job "last run" records (see getLastRunMeta / updateLastRunMeta).
const META_STORE = "meta";
// localStorage key under which the indexer lock timestamp is written.
const LOCK_KEY = "bsq-indexer-lock";
// Milliseconds between refreshes of the lock timestamp while indexing is running.
const HEARTBEAT_INTERVAL = 10000;
// A lock timestamp older than this many milliseconds is treated as stale by shouldIndex().
const LOCK_TIMEOUT = 20000;
// Handle of the interval started by startHeartbeat(); null until the first heartbeat is started.
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
// In-memory embedding index; created by initVectorSearch().
let vectorIndex: EmbeddingIndex | null = null;
// Set to true once initVectorSearch() has completed successfully.
let isInitialized = false;
/**
 * Lazily prepares the embedding model and the in-memory vector index.
 *
 * Objects previously persisted to IndexedDB are loaded back into the index.
 * Calls made after a successful initialization return immediately.
 *
 * The index is assembled in a local variable and only published to the
 * module-level `vectorIndex` once loading has succeeded. A failed attempt
 * therefore leaves `vectorIndex` untouched, so callers that check
 * `!vectorIndex` will retry initialization instead of silently working with a
 * half-initialized, empty index.
 *
 * @throws Re-throws (after logging) any error raised while initializing the
 *   model or loading the stored objects.
 */
async function initVectorSearch(): Promise<void> {
  if (isInitialized) return;
  try {
    await initializeModel();
    const index = new EmbeddingIndex([]);
    // Load existing items from IndexedDB
    const stored = await index.getAllObjectsFromIndexedDB();
    if (stored.length > 0) {
      for (const item of stored) {
        index.add(item);
      }
      console.debug("Vector index loaded from IndexedDB");
    }
    // Publish only after everything above succeeded.
    vectorIndex = index;
    isInitialized = true;
  } catch (e) {
    console.error("Failed to initialize vector search:", e);
    throw e;
  }
}
/**
 * Produces a copy of `item` extended with an `embedding` vector.
 *
 * The text that gets embedded is the space-joined concatenation of the item's
 * text, content, category and the author/subject metadata fields; falsy
 * fields are left out.
 *
 * @param item - The hydrated index item to embed.
 * @returns The item's own properties plus the computed `embedding`.
 */
async function vectorizeItem(
  item: HydratedIndexItem,
): Promise<HydratedIndexItem & { embedding: number[] }> {
  const { text, content, category, metadata } = item;
  const parts = [text, content, category, metadata?.author, metadata?.subject];
  const textToEmbed = parts.filter((part) => Boolean(part)).join(" ");
  const vector = await getEmbedding(textToEmbed);
  return { ...item, embedding: vector };
}
/**
 * Embeds every item that is not yet present in the vector index and adds it.
 *
 * Items are handled in small batches; the index is saved to IndexedDB after
 * each batch so that progress survives an interruption.
 *
 * @param items - Candidate items; those already found in the index by `id`
 *   are skipped.
 */
async function processItems(items: HydratedIndexItem[]) {
  if (!vectorIndex) await initVectorSearch();

  // A lookup that throws is treated the same as "not in the index yet".
  const isMissingFromIndex = (candidate: HydratedIndexItem): boolean => {
    try {
      return !vectorIndex!.get({ id: candidate.id });
    } catch {
      return true;
    }
  };

  const pending = items.filter(isMissingFromIndex);
  const total = pending.length;
  if (total === 0) {
    console.debug("No new items to vectorize");
    return;
  }
  console.debug(`Vectorizing ${total} new items...`);

  // Process in batches to avoid UI freeze
  const BATCH_SIZE = 5;
  let offset = 0;
  while (offset < total) {
    const end = offset + BATCH_SIZE;
    const embedded = await Promise.all(
      pending.slice(offset, end).map(vectorizeItem),
    );
    embedded.forEach((entry) => {
      vectorIndex!.add(entry);
    });
    // Save periodically to avoid losing progress
    await vectorIndex!.saveIndex("indexedDB");
    // Log progress
    console.debug(`Vectorized ${Math.min(end, total)}/${total} items`);
    offset = end;
  }
}
/**
 * Decides whether a job is due to run.
 *
 * - `"pageLoad"` jobs always run.
 * - Jobs without a recorded (truthy) last-run timestamp always run.
 * - `interval` jobs run once at least `ms` milliseconds have elapsed.
 * - `expiry` jobs run once at least `afterMs` milliseconds have elapsed.
 * - Any other frequency type never runs.
 *
 * @param job - The job whose `frequency` is inspected.
 * @param lastRun - Epoch milliseconds of the previous run, if any.
 * @returns `true` when the job should be executed now.
 */
function shouldRun(job: Job, lastRun?: number): boolean {
  const { frequency } = job;
  if (frequency === "pageLoad" || !lastRun) return true;

  const elapsed = Date.now() - lastRun;
  switch (frequency.type) {
    case "interval":
      return elapsed >= frequency.ms;
    case "expiry":
      return elapsed >= frequency.afterMs;
    default:
      return false;
  }
}
/**
 * Looks up when the given job last ran.
 *
 * Reads every record from the meta store and picks the one whose `jobId`
 * matches.
 *
 * @param jobId - Identifier of the job.
 * @returns The stored `lastRun` value, or `undefined` when no record matches.
 */
async function getLastRunMeta(jobId: string): Promise<number | undefined> {
  const records = await getAll(META_STORE);
  const found = records.find((record: any) => record.jobId === jobId);
  return found?.lastRun;
}
/**
 * Records the current time as the last-run timestamp of a job.
 *
 * @param jobId - Identifier of the job; also used as the record key in the
 *   meta store.
 */
async function updateLastRunMeta(jobId: string): Promise<void> {
  const record = { jobId, lastRun: Date.now() };
  await put(META_STORE, record, jobId);
}
/**
 * Checks whether the indexer lock is free.
 *
 * The lock is a timestamp stored in localStorage. It counts as free when it
 * is absent, unparsable, or older than `LOCK_TIMEOUT` milliseconds.
 *
 * @returns `true` when indexing may start.
 */
function shouldIndex(): boolean {
  const raw = localStorage.getItem(LOCK_KEY);
  const lastBeat = Number.parseInt(raw || "0", 10);
  if (Number.isNaN(lastBeat)) return true;
  return Date.now() - lastBeat > LOCK_TIMEOUT;
}
/**
 * Claims the indexer lock and keeps it fresh.
 *
 * Writes the current timestamp to localStorage immediately and then again
 * every `HEARTBEAT_INTERVAL` milliseconds; the interval handle is kept in
 * `heartbeatTimer`.
 */
function startHeartbeat() {
  const stampLock = () => {
    localStorage.setItem(LOCK_KEY, `${Date.now()}`);
  };
  stampLock();
  heartbeatTimer = setInterval(stampLock, HEARTBEAT_INTERVAL);
}
/**
 * Stops refreshing the indexer lock and releases it.
 *
 * Clears the heartbeat interval (if one is active), resets the stored handle
 * to `null` so no stale timer id is kept around, and removes the lock entry
 * from localStorage. Safe to call when no heartbeat is running.
 */
function stopHeartbeat() {
  if (heartbeatTimer !== null) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
  localStorage.removeItem(LOCK_KEY);
}
/**
 * Broadcasts indexing progress on `window` as an "indexing-progress"
 * CustomEvent.
 *
 * @param completed - Number of jobs finished so far.
 * @param total - Total number of jobs.
 * @param indexing - Whether indexing is still in progress.
 */
function dispatchProgress(completed: number, total: number, indexing: boolean) {
  const detail = { completed, total, indexing };
  window.dispatchEvent(new CustomEvent("indexing-progress", { detail }));
}
/**
 * Collects the stored items of every registered job.
 *
 * Jobs are read one after another; each stored item is returned together with
 * the render component registered for its job's `renderComponentId`.
 *
 * @returns All stored items, grouped in job iteration order.
 */
export async function loadAllStoredItems(): Promise<HydratedIndexItem[]> {
  const hydrated: HydratedIndexItem[] = [];
  for (const jobId in jobs) {
    const storedItems = await getAll(jobId);
    const { renderComponentId } = jobs[jobId];
    const renderComponent = renderComponentMap[renderComponentId];
    storedItems.forEach((stored) => {
      hydrated.push({ ...stored, renderComponent });
    });
  }
  return hydrated;
}
/**
 * Runs every due indexing job and feeds the results into the vector index.
 *
 * Flow:
 * 1. Bail out when the localStorage lock is held (fresh heartbeat timestamp).
 * 2. Take the lock and start the heartbeat.
 * 3. For each job that `shouldRun`, execute it, merge its output with what is
 *    already stored, optionally purge, persist, and record the run time.
 *    A failing job is logged and does not stop the remaining jobs.
 * 4. Hand the merged items to `processItems` for embedding.
 *
 * Progress is reported through "indexing-progress" events after each job.
 *
 * The lock release and the final `indexing: false` progress event live in a
 * `finally` block: errors raised outside the per-job `try` (reading the
 * last-run metadata, vectorizing) would otherwise leave the heartbeat interval
 * running — keeping the lock alive indefinitely — and listeners stuck in the
 * "indexing" state. Such errors still propagate to the caller.
 */
export async function runIndexing(): Promise<void> {
  if (!shouldIndex()) {
    console.debug(
      "%c[Indexer] Skipping indexing (another tab has the lock)",
      "color: gray",
    );
    return;
  }
  startHeartbeat();
  console.debug("%c[Indexer] Starting indexing...", "color: green");
  const jobIds = Object.keys(jobs);
  let completedJobs = 0;
  try {
    dispatchProgress(completedJobs, jobIds.length, true);
    // Despite the name this holds the full merged set of every job that ran;
    // processItems() skips the ones that are already in the vector index.
    const allNewItems: HydratedIndexItem[] = [];
    for (const jobId of jobIds) {
      const job = jobs[jobId];
      const lastRun = await getLastRunMeta(jobId);
      if (!shouldRun(job, lastRun)) {
        console.debug(
          `%c[Indexer] Skipping job "${jobId}" (not due)`,
          "color: gray",
        );
        completedJobs++;
        dispatchProgress(completedJobs, jobIds.length, true);
        continue;
      }
      // Storage helpers scoped to this job's store, exposed to the job via ctx.
      const getStoredItems = async () => await getAll(jobId);
      const setStoredItems = async (items: IndexItem[]) => {
        await clear(jobId);
        await Promise.all(items.map((i) => put(jobId, i, i.id)));
      };
      const addItem = async (item: IndexItem) => {
        await put(jobId, item, item.id);
      };
      const removeItem = async (id: string) => {
        await remove(jobId, id);
      };
      const ctx: JobContext = {
        getStoredItems,
        setStoredItems,
        addItem,
        removeItem,
      };
      console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff");
      try {
        const newItems = await job.run(ctx);
        // Re-read the store after the job ran: the job may have changed it via ctx.
        const stored = await getStoredItems();
        let merged = mergeItems(stored, newItems);
        if (job.purge) merged = job.purge(merged);
        await setStoredItems(merged);
        await updateLastRunMeta(jobId);
        // Add to our collection of new items for vector processing
        const hydratedItems = merged.map((item) => ({
          ...item,
          renderComponent: renderComponentMap[job.renderComponentId],
        }));
        allNewItems.push(...hydratedItems);
        console.debug(
          `%c[Indexer] ✅ ${job.label}: ${newItems.length} items indexed`,
          "color: #00c46f",
        );
      } catch (err) {
        // One failing job must not abort the others.
        console.debug(`%c[Indexer] ❌ ${job.label} failed:`, "color: red");
        console.error(err);
      }
      completedJobs++;
      dispatchProgress(completedJobs, jobIds.length, true);
    }
    // Process all new items through vector search
    if (allNewItems.length > 0) {
      console.debug(
        `%c[Indexer] Processing ${allNewItems.length} items for vector search...`,
        "color: #4ea1ff",
      );
      await processItems(allNewItems);
    }
  } finally {
    // Always release the lock and tell listeners indexing is over.
    stopHeartbeat();
    dispatchProgress(completedJobs, jobIds.length, false);
  }
}
/**
 * Merges two item lists by `id`.
 *
 * When both lists contain the same id, the incoming item replaces the
 * existing one but keeps the position of the first occurrence; ids seen only
 * in `incoming` are appended in their original order.
 *
 * @param existing - Items already stored.
 * @param incoming - Freshly produced items; they win on id collisions.
 * @returns A new array with one item per distinct id.
 */
function mergeItems(existing: IndexItem[], incoming: IndexItem[]): IndexItem[] {
  const byId = new Map<string, IndexItem>();
  [...existing, ...incoming].forEach((entry) => {
    byId.set(entry.id, entry);
  });
  return [...byId.values()];
}