feat: global search bug fixes and performance improvements

This commit is contained in:
SethBurkart123
2025-06-12 14:45:36 +10:00
parent fa37fe9d21
commit 57b4daa9b7
9 changed files with 519 additions and 254 deletions
@@ -24,7 +24,6 @@
searchHotkey: string
}>();
// Make searchHotkey reactive to setting changes
let currentSearchHotkey = $state(initialSearchHotkey);
let commandsFuse = $state<Fuse<StaticCommandItem>>();
@@ -177,7 +176,7 @@
isLoading = false;
};
const debouncedPerformSearch = debounce(performSearch, 10);
const debouncedPerformSearch = debounce(performSearch, 200);
$effect(() => {
if (commandPalleteOpen) {
@@ -126,6 +126,19 @@ const globalSearchPlugin: Plugin<typeof settings> = {
initVectorSearch();
// Warm up vector worker in background to improve initial response time
setTimeout(async () => {
try {
const workerManager = VectorWorkerManager.getInstance();
console.debug("[Global Search] Warming up vector worker...");
// Just ensure the worker is ready, don't process anything yet
await workerManager.processItems([], () => {});
console.debug("[Global Search] Vector worker warmed up successfully");
} catch (error) {
console.warn("[Global Search] Vector worker warm-up failed:", error);
}
}, 1000);
// Add debug helpers to window for troubleshooting
// @ts-ignore
window.globalSearchDebug = {
@@ -8,7 +8,7 @@ import browser from "webextension-polyfill";
export function mountSearchBar(
titleElement: Element,
api: any,
appRef: { current: any },
appRef: { current: any; storageChangeHandler?: any },
) {
if (titleElement.querySelector(".search-trigger")) {
return;
@@ -49,6 +49,9 @@ export function mountSearchBar(
browser.storage.onChanged.addListener(handleStorageChange);
// Store reference to cleanup function for proper removal
appRef.storageChangeHandler = handleStorageChange;
const searchRoot = document.createElement("div");
document.body.appendChild(searchRoot);
const searchRootShadow = searchRoot.attachShadow({ mode: "open" });
@@ -69,7 +72,7 @@ export function mountSearchBar(
}
}
export function cleanupSearchBar(appRef: { current: any }) {
export function cleanupSearchBar(appRef: { current: any; storageChangeHandler?: any }) {
if (appRef.current) {
try {
unmount(appRef.current);
@@ -94,6 +97,8 @@ export function cleanupSearchBar(appRef: { current: any }) {
// Clean up vector worker
VectorWorkerManager.getInstance().terminate();
// Remove storage listener
browser.storage.onChanged.removeListener(() => {});
if (appRef.storageChangeHandler) {
browser.storage.onChanged.removeListener(appRef.storageChangeHandler);
appRef.storageChangeHandler = null;
}
}
@@ -3,19 +3,22 @@ const META_STORE = "meta";
const VERSION_KEY = "betterseqta-index-version";
let dbPromise: Promise<IDBDatabase> | null = null;
let cachedDb: IDBDatabase | null = null;
// Get the current version from localStorage or start at 1
function getCurrentVersion(): number {
const storedVersion = localStorage.getItem(VERSION_KEY);
return storedVersion ? parseInt(storedVersion, 10) : 1;
}
// Update the version in localStorage
function updateVersion(version: number) {
localStorage.setItem(VERSION_KEY, version.toString());
}
function openDB(): Promise<IDBDatabase> {
if (cachedDb && cachedDb.version >= getCurrentVersion()) {
return Promise.resolve(cachedDb);
}
if (dbPromise) return dbPromise;
const currentVersion = getCurrentVersion();
@@ -26,8 +29,11 @@ function openDB(): Promise<IDBDatabase> {
try {
request = indexedDB.open(DB_NAME, currentVersion);
} catch (e) {
// If there's a version error, try to delete the database and start fresh
console.warn("Database version conflict, recreating database...");
if (cachedDb) {
cachedDb.close();
cachedDb = null;
}
indexedDB.deleteDatabase(DB_NAME);
localStorage.removeItem(VERSION_KEY);
request = indexedDB.open(DB_NAME, 1);
@@ -38,22 +44,37 @@ function openDB(): Promise<IDBDatabase> {
const db = request.result;
const existingStores = Array.from(db.objectStoreNames);
// Always ensure META_STORE exists
if (!existingStores.includes(META_STORE)) {
db.createObjectStore(META_STORE);
}
// Update version in localStorage to match the database
updateVersion(event.newVersion || 1);
};
request.onsuccess = () => resolve(request.result);
request.onsuccess = () => {
if (cachedDb && cachedDb !== request.result) {
cachedDb.close();
}
cachedDb = request.result;
cachedDb.onclose = () => {
cachedDb = null;
dbPromise = null;
};
resolve(request.result);
};
request.onerror = () => {
console.error("Error opening database:", request.error);
// If there's an error, try to recover by deleting and recreating
if (cachedDb) {
cachedDb.close();
cachedDb = null;
}
indexedDB.deleteDatabase(DB_NAME);
localStorage.removeItem(VERSION_KEY);
dbPromise = null;
reject(request.error);
};
});
@@ -64,11 +85,12 @@ function openDB(): Promise<IDBDatabase> {
async function getStore(store: string, mode: IDBTransactionMode = "readonly") {
const db = await openDB();
// Create store dynamically if needed
if (!db.objectStoreNames.contains(store)) {
db.close();
await upgradeDB(store);
return getStore(store, mode);
const upgradedDb = await openDB();
const tx = upgradedDb.transaction(store, mode);
return tx.objectStore(store);
}
const tx = db.transaction(store, mode);
@@ -80,11 +102,11 @@ function upgradeDB(newStore: string): Promise<void> {
const currentVersion = getCurrentVersion();
const newVersion = currentVersion + 1;
// Close any existing connections
if (dbPromise) {
dbPromise.then((db) => db.close());
dbPromise = null;
if (cachedDb) {
cachedDb.close();
cachedDb = null;
}
dbPromise = null;
const request = indexedDB.open(DB_NAME, newVersion);
@@ -93,11 +115,18 @@ function upgradeDB(newStore: string): Promise<void> {
if (!db.objectStoreNames.contains(newStore)) {
db.createObjectStore(newStore);
}
// Update version in localStorage
updateVersion(event.newVersion || newVersion);
};
request.onsuccess = () => {
cachedDb = request.result;
cachedDb.onclose = () => {
cachedDb = null;
dbPromise = null;
};
dbPromise = Promise.resolve(request.result);
resolve();
};
@@ -183,11 +212,17 @@ export async function clear(store: string): Promise<void> {
}
}
// Helper function to reset the database if needed
export async function resetDatabase(): Promise<void> {
if (cachedDb) {
cachedDb.close();
cachedDb = null;
}
if (dbPromise) {
const db = await dbPromise;
db.close();
try {
const db = await dbPromise;
db.close();
} catch (e) {}
dbPromise = null;
}
@@ -276,116 +276,102 @@ export async function runIndexing(): Promise<void> {
);
}
if (!hasStreamingJobs) {
const allItemsInPrimaryStores = await loadAllStoredItems();
let allItemsInPrimaryStores = await loadAllStoredItems();
if (allItemsInPrimaryStores.length > 0) {
console.debug(
`%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`,
"color: #4ea1ff",
);
dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items");
if (allItemsInPrimaryStores.length > 0) {
console.debug(
`%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`,
"color: #4ea1ff",
);
dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items");
try {
const workerManager = VectorWorkerManager.getInstance();
await workerManager.processItems(allItemsInPrimaryStores, (progress) => {
let detailMessage = progress.message || "";
if (
progress.status === "processing" &&
progress.total &&
progress.processed !== undefined
) {
detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`;
} else if (progress.status === "complete") {
detailMessage = "Vectorization complete";
completedJobs++;
try {
const workerManager = VectorWorkerManager.getInstance();
await workerManager.processItems(allItemsInPrimaryStores, (progress) => {
let detailMessage = progress.message || "";
if (
progress.status === "processing" &&
progress.total &&
progress.processed !== undefined
) {
detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`;
} else if (progress.status === "complete") {
detailMessage = "Vectorization complete";
completedJobs++;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Indexing finished",
detailMessage
);
} else if (progress.status === "error") {
detailMessage = `Vectorization error: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization failed",
detailMessage,
);
} else if (progress.status === "started") {
detailMessage = `Vectorization started for ${progress.total} items`;
} else if (progress.status === "cancelled") {
detailMessage = `Vectorization cancelled: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization cancelled",
detailMessage,
);
}
if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") {
dispatchProgress(
completedJobs,
totalSteps,
false,
"Indexing finished",
detailMessage
);
} else if (progress.status === "error") {
detailMessage = `Vectorization error: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization failed",
true,
"Vectorization in progress",
detailMessage,
);
} else if (progress.status === "started") {
detailMessage = `Vectorization started for ${progress.total} items`;
} else if (progress.status === "cancelled") {
detailMessage = `Vectorization cancelled: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization cancelled",
detailMessage,
);
}
if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") {
dispatchProgress(
completedJobs,
totalSteps,
true,
"Vectorization in progress",
detailMessage,
);
}
});
console.debug(
"%c[Indexer] Vectorization task for stored items sent to worker.",
"color: green",
);
} catch (error) {
console.error(
`%c[Indexer] ❌ Failed to send items to vector worker:`,
"color: red",
error,
);
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization failed",
String(error),
);
}
} else {
}
});
console.debug(
"%c[Indexer] No items found in primary stores to send for vectorization.",
"color: gray",
"%c[Indexer] Vectorization task for stored items sent to worker.",
"color: green",
);
} catch (error) {
console.error(
`%c[Indexer] ❌ Failed to send items to vector worker:`,
"color: red",
error,
);
completedJobs++;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Indexing finished (no items for vectorization)",
"Vectorization failed",
String(error),
);
}
} else {
console.debug(
"%c[Indexer] Skipping bulk vectorization - streaming jobs will handle vectorization",
"color: #4ea1ff",
"%c[Indexer] No items found in primary stores to send for vectorization.",
"color: gray",
);
completedJobs++;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Indexing finished (streaming vectorization active)",
"Indexing finished (no items for vectorization)",
);
}
stopHeartbeat();
const allItemsInPrimaryStores = await loadAllStoredItems();
allItemsInPrimaryStores = await loadAllStoredItems();
allItemsInPrimaryStores.forEach(item => {
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
if (jobDef) {
@@ -8,18 +8,20 @@ import { renderComponentMap } from "../renderComponents";
import { jobs } from "../jobs";
const RATE_LIMIT_CONFIG = {
minDelay: 50,
maxDelay: 5000,
baseDelay: 200,
backoffMultiplier: 1.5,
minDelay: 30,
maxDelay: 3000,
baseDelay: 150,
backoffMultiplier: 1.3,
maxRetries: 3,
adaptiveBatchSize: true,
minBatchSize: 10,
maxBatchSize: 100,
baseBatchSize: 50,
vectorBatchSize: 5,
parallelRequests: 5,
parallelDelay: 100,
minBatchSize: 15,
maxBatchSize: 150,
baseBatchSize: 75,
vectorBatchSize: 10,
parallelRequests: 8,
parallelDelay: 50,
circuitBreakerThreshold: 5,
circuitBreakerResetTime: 30000,
};
interface MessagesProgress {
@@ -33,6 +35,9 @@ interface MessagesProgress {
processedIds: string[];
streamingStarted: boolean;
totalEstimated: number;
circuitBreakerOpen: boolean;
circuitBreakerOpenTime: number;
consecutiveFailures: number;
}
const fetchMessages = async (offset = 0, limit = 100) => {
@@ -99,50 +104,38 @@ function calculateAdaptiveDelay(
progress: MessagesProgress,
responseTime: number,
): number {
const { currentDelay, failedRequests, lastSuccessTime } = progress;
const {
currentDelay,
failedRequests,
lastSuccessTime,
circuitBreakerOpen,
consecutiveFailures,
} = progress;
const timeSinceLastSuccess = Date.now() - lastSuccessTime;
if (failedRequests > 0 || responseTime > 2000) {
if (circuitBreakerOpen) {
return RATE_LIMIT_CONFIG.maxDelay;
}
if (consecutiveFailures > 2 || failedRequests > 3 || responseTime > 3000) {
return Math.min(
currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
currentDelay *
(RATE_LIMIT_CONFIG.backoffMultiplier + consecutiveFailures * 0.2),
RATE_LIMIT_CONFIG.maxDelay,
);
}
if (responseTime < 500 && timeSinceLastSuccess > 10000) {
return Math.max(currentDelay * 0.8, RATE_LIMIT_CONFIG.minDelay);
if (
responseTime < 300 &&
timeSinceLastSuccess > 5000 &&
consecutiveFailures === 0
) {
return Math.max(currentDelay * 0.7, RATE_LIMIT_CONFIG.minDelay);
}
return currentDelay;
}
function calculateAdaptiveBatchSize(
progress: MessagesProgress,
responseTime: number,
): number {
if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) {
return progress.currentBatchSize;
}
const { currentBatchSize, failedRequests } = progress;
if (failedRequests > 2 || responseTime > 3000) {
return Math.max(
Math.floor(currentBatchSize * 0.7),
RATE_LIMIT_CONFIG.minBatchSize,
);
}
if (failedRequests === 0 && responseTime < 1000) {
return Math.min(
Math.floor(currentBatchSize * 1.2),
RATE_LIMIT_CONFIG.maxBatchSize,
);
}
return currentBatchSize;
}
async function estimateMessageCount(): Promise<number> {
try {
const firstBatch = await fetchMessages(0, 100);
@@ -157,6 +150,73 @@ async function estimateMessageCount(): Promise<number> {
}
}
function calculateAdaptiveBatchSize(
progress: MessagesProgress,
responseTime: number,
): number {
if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) {
return progress.currentBatchSize;
}
const {
currentBatchSize,
failedRequests,
circuitBreakerOpen,
consecutiveFailures,
} = progress;
if (circuitBreakerOpen) {
return RATE_LIMIT_CONFIG.minBatchSize;
}
if (consecutiveFailures > 1 || failedRequests > 2 || responseTime > 2500) {
return Math.max(
Math.floor(currentBatchSize * 0.6),
RATE_LIMIT_CONFIG.minBatchSize,
);
}
if (failedRequests === 0 && responseTime < 800 && consecutiveFailures === 0) {
return Math.min(
Math.floor(currentBatchSize * 1.4),
RATE_LIMIT_CONFIG.maxBatchSize,
);
}
return currentBatchSize;
}
function checkCircuitBreaker(progress: MessagesProgress): boolean {
const now = Date.now();
if (
!progress.circuitBreakerOpen &&
progress.consecutiveFailures >= RATE_LIMIT_CONFIG.circuitBreakerThreshold
) {
progress.circuitBreakerOpen = true;
progress.circuitBreakerOpenTime = now;
console.warn(
`[Messages job] Circuit breaker opened due to ${progress.consecutiveFailures} consecutive failures`,
);
return true;
}
if (
progress.circuitBreakerOpen &&
now - progress.circuitBreakerOpenTime >
RATE_LIMIT_CONFIG.circuitBreakerResetTime
) {
progress.circuitBreakerOpen = false;
progress.consecutiveFailures = 0;
console.info(
`[Messages job] Circuit breaker closed after ${RATE_LIMIT_CONFIG.circuitBreakerResetTime}ms`,
);
return false;
}
return progress.circuitBreakerOpen;
}
async function processMessagesInParallel(
messages: any[],
existingIds: Set<string>,
@@ -173,7 +233,6 @@ async function processMessagesInParallel(
let consecutiveExisting = 0;
const updatedProgress = { ...progress };
// Filter out messages older than 2 years
const twoYearsAgo = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000;
let shouldStop = false;
@@ -181,9 +240,8 @@ async function processMessagesInParallel(
const id = msg.id.toString();
const messageDate = new Date(msg.date).getTime();
// If we encounter a message older than 2 years, we should stop processing
// since messages are sorted by date descending
if (messageDate < twoYearsAgo) {
//! older than 2 years ago
shouldStop = true;
return false;
}
@@ -320,6 +378,9 @@ export const messagesJob: Job = {
processedIds: [],
streamingStarted: false,
totalEstimated: 0,
circuitBreakerOpen: false,
circuitBreakerOpenTime: 0,
consecutiveFailures: 0,
};
const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id));
@@ -451,6 +512,14 @@ export const messagesJob: Job = {
}
while (!progress.done) {
if (checkCircuitBreaker(progress)) {
console.warn(
"[Messages job] Circuit breaker is open, skipping processing",
);
await delay(RATE_LIMIT_CONFIG.maxDelay);
continue;
}
await delay(progress.currentDelay);
requestStartTime = Date.now();
@@ -459,6 +528,8 @@ export const messagesJob: Job = {
list = await fetchMessages(progress.offset, progress.currentBatchSize);
const responseTime = Date.now() - requestStartTime;
progress.consecutiveFailures = 0;
progress.currentDelay = calculateAdaptiveDelay(progress, responseTime);
progress.currentBatchSize = calculateAdaptiveBatchSize(
progress,
@@ -467,6 +538,7 @@ export const messagesJob: Job = {
} catch (e) {
console.error("[Messages job] list fetch failed:", e);
progress.failedRequests++;
progress.consecutiveFailures++;
progress.currentDelay = Math.min(
progress.currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
RATE_LIMIT_CONFIG.maxDelay,
@@ -479,6 +551,7 @@ export const messagesJob: Job = {
if (list.status !== "200") {
progress.failedRequests++;
progress.consecutiveFailures++;
progress.processedIds = Array.from(processedIdsSet);
await ctx.setProgress(progress);
@@ -507,7 +580,6 @@ export const messagesJob: Job = {
itemsToStream.push(...processedItems);
// Update consecutive existing counter
consecutiveExisting = newConsecutiveExisting;
if (consecutiveExisting >= 20) {
progress.done = true;
@@ -529,14 +601,17 @@ export const messagesJob: Job = {
}
}
// Dispatch incremental search update if we processed new items
if (processedItems.length > 0) {
try {
const currentItems = await loadAllStoredItems();
currentItems.forEach(item => {
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
currentItems.forEach((item) => {
const jobDef =
jobs[item.category] ||
Object.values(jobs).find((j) => j.id === item.category) ||
jobs[item.renderComponentId];
if (jobDef) {
const renderComponent = renderComponentMap[jobDef.renderComponentId];
const renderComponent =
renderComponentMap[jobDef.renderComponentId];
if (renderComponent) {
item.renderComponent = renderComponent;
}
@@ -545,11 +620,21 @@ export const messagesJob: Job = {
}
});
loadDynamicItems(currentItems);
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "messages", newItemCount: processedItems.length, streaming: true }
}));
window.dispatchEvent(
new CustomEvent("dynamic-items-updated", {
detail: {
incremental: true,
jobId: "messages",
newItemCount: processedItems.length,
streaming: true,
},
}),
);
} catch (error) {
console.warn("[Messages job] Failed to dispatch incremental search update:", error);
console.warn(
"[Messages job] Failed to dispatch incremental search update:",
error,
);
}
}
@@ -596,6 +681,9 @@ export const messagesJob: Job = {
processedIds: [],
streamingStarted: false,
totalEstimated: 0,
circuitBreakerOpen: false,
circuitBreakerOpenTime: 0,
consecutiveFailures: 0,
});
} else {
progress.processedIds = Array.from(processedIdsSet);
@@ -309,10 +309,7 @@ export const notificationsJob: Job = {
await delay(NOTIFICATIONS_RATE_LIMIT.batchDelay);
}
const { success, item } = await processNotification(
notif,
ctx,
);
const { success, item } = await processNotification(notif, ctx);
if (!success) {
if (progress.retryQueue.length < 10) {
progress.retryQueue.push(notif.notificationID);
@@ -375,23 +372,38 @@ export const notificationsJob: Job = {
if (items.length > 0) {
try {
const currentItems = await loadAllStoredItems();
currentItems.forEach(item => {
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
currentItems.forEach((item) => {
const jobDef =
jobs[item.category] ||
Object.values(jobs).find((j) => j.id === item.category) ||
jobs[item.renderComponentId];
if (jobDef) {
const renderComponent = renderComponentMap[jobDef.renderComponentId];
const renderComponent =
renderComponentMap[jobDef.renderComponentId];
if (renderComponent) {
item.renderComponent = renderComponent;
}
} else if (renderComponentMap[item.renderComponentId]) {
item.renderComponent = renderComponentMap[item.renderComponentId];
item.renderComponent =
renderComponentMap[item.renderComponentId];
}
});
loadDynamicItems(currentItems);
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "notifications", newItemCount: items.length, streaming: true }
}));
window.dispatchEvent(
new CustomEvent("dynamic-items-updated", {
detail: {
incremental: true,
jobId: "notifications",
newItemCount: items.length,
streaming: true,
},
}),
);
} catch (error) {
console.warn("[Notifications job] Failed to dispatch incremental search update:", error);
console.warn(
"[Notifications job] Failed to dispatch incremental search update:",
error,
);
}
}
}
@@ -4,7 +4,7 @@ import type { IndexItem } from "../types";
let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
let currentAbortController: AbortController | null = null;
let loadedItemIds = new Set<string>(); // Track loaded items to prevent duplicates
let loadedItemIds = new Set<string>();
let streamingSession: {
isActive: boolean;
@@ -26,15 +26,12 @@ async function initWorker() {
await initializeModel();
vectorIndex = new EmbeddingIndex([]);
// Load existing items but track them to prevent duplicates
const stored = await vectorIndex.getAllObjectsFromIndexedDB();
if (stored.length > 0) {
console.debug(`Found ${stored.length} existing items in IndexedDB`);
// Clear any existing items from memory first
loadedItemIds.clear();
// Add items and track their IDs
stored.forEach((item) => {
if (item.id && !loadedItemIds.has(item.id)) {
vectorIndex!.add(item);
@@ -168,7 +165,6 @@ async function processStreamingItems() {
streamingSession.batchSize,
);
// Use our tracking set for more efficient deduplication
const unprocessedItems = batchToProcess.filter((item) => {
return item.id && !loadedItemIds.has(item.id);
});
@@ -190,12 +186,12 @@ async function processStreamingItems() {
try {
successfullyVectorized.forEach((item) => {
vectorIndex!.add(item);
loadedItemIds.add(item.id); // Track the added item
loadedItemIds.add(item.id);
});
if (
streamingSession.totalProcessed % (streamingSession.batchSize * 15) ===
0
streamingSession.totalProcessed % 50 === 0 ||
loadedItemIds.size % 200 === 0
) {
await vectorIndex!.saveIndex("indexedDB");
console.debug(
@@ -328,7 +324,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
}
// Use our tracking set for more efficient deduplication
const unprocessedItems = items.filter((item) => {
if (signal.aborted) return false;
return item.id && !loadedItemIds.has(item.id);
@@ -347,15 +342,22 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
if (unprocessedItems.length === 0) {
console.debug(`No new items to process. ${loadedItemIds.size} items already in index.`);
console.debug(
`No new items to process. ${loadedItemIds.size} items already in index.`,
);
self.postMessage({
type: "progress",
data: { status: "complete", message: `No new items to process (${loadedItemIds.size} items already indexed)` },
data: {
status: "complete",
message: `No new items to process (${loadedItemIds.size} items already indexed)`,
},
});
return;
}
console.debug(`Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`);
console.debug(
`Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`,
);
self.postMessage({
type: "progress",
data: {
@@ -402,7 +404,7 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
try {
successfullyVectorized.forEach((item) => {
vectorIndex!.add(item);
loadedItemIds.add(item.id); // Track the added item
loadedItemIds.add(item.id);
});
} catch (e) {
console.error("Error adding batch to index:", e);
@@ -425,15 +427,22 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
return;
}
try {
await vectorIndex!.saveIndex("indexedDB");
console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`);
} catch (e) {
console.error("Error saving index batch:", e);
self.postMessage({
type: "progress",
data: { status: "error", message: `Error saving index batch: ${e}` },
});
if (
(i / BATCH_SIZE + 1) % 3 === 0 ||
i + BATCH_SIZE >= unprocessedItems.length
) {
try {
await vectorIndex!.saveIndex("indexedDB");
console.debug(
`Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`,
);
} catch (e) {
console.error("Error saving index batch:", e);
self.postMessage({
type: "progress",
data: { status: "error", message: `Error saving index batch: ${e}` },
});
}
}
processedCount += batch.length;
@@ -448,7 +457,9 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
});
}
console.debug(`Processing complete. Total unique items in index: ${loadedItemIds.size}`);
console.debug(
`Processing complete. Total unique items in index: ${loadedItemIds.size}`,
);
self.postMessage({
type: "progress",
data: {
@@ -463,19 +474,15 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
async function resetWorker() {
console.debug("Resetting vector worker state...");
// Clear tracking
loadedItemIds.clear();
// Reset streaming session
if (streamingSession?.isActive) {
streamingSession.isActive = false;
streamingSession = null;
}
// Reset vector index
if (vectorIndex) {
try {
// Save current state before reset
await vectorIndex.saveIndex("indexedDB");
console.debug("Saved index before reset");
} catch (e) {
@@ -483,13 +490,14 @@ async function resetWorker() {
}
}
// Reinitialize
isInitialized = false;
vectorIndex = null;
await initWorker();
console.debug(`Vector worker reset complete. Loaded ${loadedItemIds.size} items.`);
console.debug(
`Vector worker reset complete. Loaded ${loadedItemIds.size} items.`,
);
self.postMessage({
type: "progress",
@@ -15,6 +15,9 @@ export class VectorWorkerManager {
private isInitialized = false;
private readyPromise: Promise<void> | null = null;
private progressCallback: ProgressCallback | null = null;
private initializationMutex = false;
private idleTimer: NodeJS.Timeout | null = null;
private lastActivityTime = 0;
private streamingSession: {
isActive: boolean;
@@ -23,7 +26,9 @@ export class VectorWorkerManager {
batchBuffer: IndexItem[];
batchSize: number;
flushTimer: NodeJS.Timeout | null;
jobId?: string; // Track which job owns the session
jobId?: string;
inactivityTimer: NodeJS.Timeout | null;
lastActivityTime: number;
} | null = null;
private constructor() {}
@@ -43,7 +48,6 @@ export class VectorWorkerManager {
console.debug("Lazy-loading vector worker...");
return new Promise<void>((resolve, reject) => {
// Terminate any existing worker before creating a new one
if (this.worker) {
console.debug("Terminating existing worker before creating new one");
this.worker.terminate();
@@ -62,8 +66,7 @@ export class VectorWorkerManager {
this.worker = null;
}
this.isInitialized = false;
// Don't reset readyPromise here to prevent race conditions
// It will be reset when a new initialization is attempted
reject(new Error("Worker initialization timed out"));
}, 10000);
@@ -75,6 +78,7 @@ export class VectorWorkerManager {
case "ready":
this.isInitialized = true;
clearTimeout(timeout);
this.updateActivity(); // Start idle timer after initialization
console.debug("Vector worker initialized and ready.");
resolve();
break;
@@ -90,10 +94,15 @@ export class VectorWorkerManager {
this.endStreamingSession();
}
// Dispatch search update when vectorization completes
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "vectorization", vectorUpdate: true }
}));
window.dispatchEvent(
new CustomEvent("dynamic-items-updated", {
detail: {
incremental: true,
jobId: "vectorization",
vectorUpdate: true,
},
}),
);
}
}
break;
@@ -128,35 +137,73 @@ export class VectorWorkerManager {
this.isInitialized = false;
this.readyPromise = null;
this.progressCallback = null;
this.initializationMutex = false;
this.clearIdleTimer();
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
}
private startIdleTimer() {
this.clearIdleTimer();
this.idleTimer = setTimeout(() => {
if (!this.streamingSession?.isActive && this.isInitialized) {
console.debug("[VectorWorker] Auto-shutting down due to 2 minutes of inactivity");
this.resetWorkerState();
}
}, 120000); // 2 minutes
}
private clearIdleTimer() {
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}
private updateActivity() {
this.lastActivityTime = Date.now();
this.startIdleTimer();
}
private async ensureReady() {
// If we already have a ready promise, wait for it regardless of outcome
if (this.initializationMutex) {
while (this.initializationMutex) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
if (this.isInitialized && this.worker) {
return;
}
}
if (this.readyPromise) {
try {
await this.readyPromise;
} catch (error) {
// If the previous initialization failed, reset state and try again
console.warn("Previous worker initialization failed, resetting state and retrying...", error);
console.warn(
"Previous worker initialization failed, resetting state and retrying...",
error,
);
this.resetWorkerState();
}
}
// Double-check if we're actually ready after waiting
if (this.isInitialized && this.worker) {
return;
}
// If we're not ready and there's no active promise, create one
if (!this.readyPromise) {
if (!this.readyPromise && !this.initializationMutex) {
console.warn("Worker not initialized, attempting init...");
this.readyPromise = this.initWorker();
this.initializationMutex = true;
try {
this.readyPromise = this.initWorker();
await this.readyPromise;
} finally {
this.initializationMutex = false;
}
}
await this.readyPromise;
if (!this.isInitialized || !this.worker) {
throw new Error(
"Vector Worker is not available after initialization attempt.",
@@ -165,27 +212,61 @@ export class VectorWorkerManager {
}
async processItems(items: IndexItem[], onProgress?: ProgressCallback) {
// Only initialize worker if we actually have items to process
if (items.length === 0) {
if (onProgress) {
onProgress({
status: "complete",
message: "No items to process"
});
}
return;
}
const uniqueItems = items.filter((item, index, arr) => {
return arr.findIndex((i) => i.id === item.id) === index;
});
if (uniqueItems.length !== items.length) {
console.debug(
`Filtered out ${items.length - uniqueItems.length} duplicate items before processing`,
);
}
// If after deduplication we have no items, don't initialize worker
if (uniqueItems.length === 0) {
if (onProgress) {
onProgress({
status: "complete",
message: "No unique items to process after deduplication"
});
}
return;
}
await this.ensureReady();
// Don't allow regular processing if streaming is active
if (this.streamingSession?.isActive) {
console.warn("Cannot process items while streaming session is active");
if (onProgress) {
onProgress({
status: "error",
message: "Cannot process items while streaming session is active"
message: "Cannot process items while streaming session is active",
});
}
return;
}
this.progressCallback = onProgress || null;
this.updateActivity();
console.debug(`Sending ${items.length} items to worker for processing.`);
console.debug(
`Sending ${uniqueItems.length} unique items to worker for processing.`,
);
this.worker!.postMessage({
type: "process",
data: { items: items },
data: { items: uniqueItems },
});
}
@@ -195,19 +276,22 @@ export class VectorWorkerManager {
batchSize: number = 10,
jobId?: string,
): Promise<void> {
// Only initialize if we expect items to process
if (totalExpectedItems === 0) {
console.debug("[VectorWorker] No items expected, not starting streaming session");
return;
}
await this.ensureReady();
// Check if another job already has an active streaming session
if (this.streamingSession?.isActive) {
if (this.streamingSession.jobId !== jobId) {
console.warn(`Cannot start streaming session for job ${jobId} - job ${this.streamingSession.jobId} already has an active session`);
if (onProgress) {
onProgress({
status: "error",
message: `Another job (${this.streamingSession.jobId}) already has an active streaming session`
});
}
return;
console.warn(
`Ending existing streaming session for job ${this.streamingSession.jobId} to start new session for job ${jobId}`,
);
await this.endStreamingSession();
await new Promise((resolve) => setTimeout(resolve, 100));
} else {
console.debug(`Streaming session for job ${jobId} already active`);
return;
@@ -215,6 +299,7 @@ export class VectorWorkerManager {
}
this.progressCallback = onProgress || null;
this.updateActivity();
this.streamingSession = {
isActive: true,
@@ -224,6 +309,8 @@ export class VectorWorkerManager {
batchSize,
flushTimer: null,
jobId,
inactivityTimer: null,
lastActivityTime: Date.now(),
};
console.debug(
@@ -252,7 +339,34 @@ export class VectorWorkerManager {
);
}
this.streamingSession.batchBuffer.push(...items);
const uniqueItems = items.filter((item, index, arr) => {
return arr.findIndex((i) => i.id === item.id) === index;
});
if (uniqueItems.length !== items.length) {
console.debug(
`[Streaming] Filtered out ${items.length - uniqueItems.length} duplicate items before streaming`,
);
}
if (uniqueItems.length > 0) {
this.streamingSession.batchBuffer.push(...uniqueItems);
this.streamingSession.lastActivityTime = Date.now();
this.updateActivity(); // Update worker activity
if (this.streamingSession.inactivityTimer) {
clearTimeout(this.streamingSession.inactivityTimer);
}
this.streamingSession.inactivityTimer = setTimeout(() => {
if (this.streamingSession?.isActive) {
console.debug(
"[VectorWorker] Auto-ending streaming session due to inactivity",
);
this.endStreamingSession();
}
}, 30000);
}
if (
this.streamingSession.batchBuffer.length >=
@@ -313,6 +427,10 @@ export class VectorWorkerManager {
clearTimeout(this.streamingSession.flushTimer);
}
if (this.streamingSession.inactivityTimer) {
clearTimeout(this.streamingSession.inactivityTimer);
}
this.streamingSession.isActive = false;
this.worker!.postMessage({
@@ -337,6 +455,7 @@ export class VectorWorkerManager {
return this.streamItems([item]);
}
isStreamingActive(): boolean {
return this.streamingSession?.isActive ?? false;
}