From 57b4daa9b7eec5e8c48d3335ef214b5853fe652f Mon Sep 17 00:00:00 2001 From: SethBurkart123 Date: Thu, 12 Jun 2025 14:45:36 +1000 Subject: [PATCH] feat: global search bug fixes and performance improvements --- .../src/components/SearchBar.svelte | 3 +- .../built-in/globalSearch/src/core/index.ts | 13 ++ .../globalSearch/src/core/mountSearchBar.ts | 13 +- .../built-in/globalSearch/src/indexing/db.ts | 71 ++++-- .../globalSearch/src/indexing/indexer.ts | 150 ++++++------- .../src/indexing/jobs/messages.ts | 202 +++++++++++++----- .../src/indexing/jobs/notifications.ts | 38 ++-- .../src/indexing/worker/vectorWorker.ts | 86 ++++---- .../indexing/worker/vectorWorkerManager.ts | 197 +++++++++++++---- 9 files changed, 519 insertions(+), 254 deletions(-) diff --git a/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte b/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte index 5427e4d9..12482816 100644 --- a/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte +++ b/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte @@ -24,7 +24,6 @@ searchHotkey: string }>(); - // Make searchHotkey reactive to setting changes let currentSearchHotkey = $state(initialSearchHotkey); let commandsFuse = $state>(); @@ -177,7 +176,7 @@ isLoading = false; }; - const debouncedPerformSearch = debounce(performSearch, 10); + const debouncedPerformSearch = debounce(performSearch, 200); $effect(() => { if (commandPalleteOpen) { diff --git a/src/plugins/built-in/globalSearch/src/core/index.ts b/src/plugins/built-in/globalSearch/src/core/index.ts index 44e2f70a..35ae03ba 100644 --- a/src/plugins/built-in/globalSearch/src/core/index.ts +++ b/src/plugins/built-in/globalSearch/src/core/index.ts @@ -126,6 +126,19 @@ const globalSearchPlugin: Plugin = { initVectorSearch(); + // Warm up vector worker in background to improve initial response time + setTimeout(async () => { + try { + const workerManager = VectorWorkerManager.getInstance(); + console.debug("[Global Search] Warming up vector worker..."); + // Just ensure the worker is ready, don't process anything yet + await workerManager.processItems([], () => {}); + console.debug("[Global Search] Vector worker warmed up successfully"); + } catch (error) { + console.warn("[Global Search] Vector worker warm-up failed:", error); + } + }, 1000); + // Add debug helpers to window for troubleshooting // @ts-ignore window.globalSearchDebug = { diff --git a/src/plugins/built-in/globalSearch/src/core/mountSearchBar.ts b/src/plugins/built-in/globalSearch/src/core/mountSearchBar.ts index d2c3cbb9..39e4ac5e 100644 --- a/src/plugins/built-in/globalSearch/src/core/mountSearchBar.ts +++ b/src/plugins/built-in/globalSearch/src/core/mountSearchBar.ts @@ -8,7 +8,7 @@ import browser from "webextension-polyfill"; export function mountSearchBar( titleElement: Element, api: any, - appRef: { current: any }, + appRef: { current: any; storageChangeHandler?: any }, ) { if (titleElement.querySelector(".search-trigger")) { return; @@ -49,6 +49,9 @@ export function mountSearchBar( browser.storage.onChanged.addListener(handleStorageChange); + // Store reference to cleanup function for proper removal + appRef.storageChangeHandler = handleStorageChange; + const searchRoot = document.createElement("div"); document.body.appendChild(searchRoot); const searchRootShadow = searchRoot.attachShadow({ mode: "open" }); @@ -69,7 +72,7 @@ export function mountSearchBar( } } -export function cleanupSearchBar(appRef: { current: any }) { +export function cleanupSearchBar(appRef: { current: any; storageChangeHandler?: any }) { if (appRef.current) { try { unmount(appRef.current); @@ -94,6 +97,8 @@ export function cleanupSearchBar(appRef: { current: any }) { // Clean up vector worker VectorWorkerManager.getInstance().terminate(); - // Remove storage listener - browser.storage.onChanged.removeListener(() => {}); + if (appRef.storageChangeHandler) { + browser.storage.onChanged.removeListener(appRef.storageChangeHandler); + appRef.storageChangeHandler = null; + } } diff --git a/src/plugins/built-in/globalSearch/src/indexing/db.ts b/src/plugins/built-in/globalSearch/src/indexing/db.ts index 6e4a0cc6..3c3046eb 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/db.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/db.ts @@ -3,19 +3,22 @@ const META_STORE = "meta"; const VERSION_KEY = "betterseqta-index-version"; let dbPromise: Promise | null = null; +let cachedDb: IDBDatabase | null = null; -// Get the current version from localStorage or start at 1 function getCurrentVersion(): number { const storedVersion = localStorage.getItem(VERSION_KEY); return storedVersion ? parseInt(storedVersion, 10) : 1; } -// Update the version in localStorage function updateVersion(version: number) { localStorage.setItem(VERSION_KEY, version.toString()); } function openDB(): Promise { + if (cachedDb && cachedDb.version >= getCurrentVersion()) { + return Promise.resolve(cachedDb); + } + if (dbPromise) return dbPromise; const currentVersion = getCurrentVersion(); @@ -26,8 +29,11 @@ function openDB(): Promise { try { request = indexedDB.open(DB_NAME, currentVersion); } catch (e) { - // If there's a version error, try to delete the database and start fresh console.warn("Database version conflict, recreating database..."); + if (cachedDb) { + cachedDb.close(); + cachedDb = null; + } indexedDB.deleteDatabase(DB_NAME); localStorage.removeItem(VERSION_KEY); request = indexedDB.open(DB_NAME, 1); @@ -38,22 +44,37 @@ function openDB(): Promise { const db = request.result; const existingStores = Array.from(db.objectStoreNames); - // Always ensure META_STORE exists if (!existingStores.includes(META_STORE)) { db.createObjectStore(META_STORE); } - // Update version in localStorage to match the database updateVersion(event.newVersion || 1); }; - request.onsuccess = () => resolve(request.result); + request.onsuccess = () => { + if (cachedDb && cachedDb !== request.result) { + cachedDb.close(); + } + cachedDb = request.result; + + cachedDb.onclose = () => { + cachedDb = null; + dbPromise = null; + }; + + resolve(request.result); + }; request.onerror = () => { console.error("Error opening database:", request.error); - // If there's an error, try to recover by deleting and recreating + + if (cachedDb) { + cachedDb.close(); + cachedDb = null; + } indexedDB.deleteDatabase(DB_NAME); localStorage.removeItem(VERSION_KEY); + dbPromise = null; reject(request.error); }; }); @@ -64,11 +85,12 @@ function openDB(): Promise { async function getStore(store: string, mode: IDBTransactionMode = "readonly") { const db = await openDB(); - // Create store dynamically if needed if (!db.objectStoreNames.contains(store)) { - db.close(); await upgradeDB(store); - return getStore(store, mode); + + const upgradedDb = await openDB(); + const tx = upgradedDb.transaction(store, mode); + return tx.objectStore(store); } const tx = db.transaction(store, mode); @@ -80,11 +102,11 @@ function upgradeDB(newStore: string): Promise { const currentVersion = getCurrentVersion(); const newVersion = currentVersion + 1; - // Close any existing connections - if (dbPromise) { - dbPromise.then((db) => db.close()); - dbPromise = null; + if (cachedDb) { + cachedDb.close(); + cachedDb = null; } + dbPromise = null; const request = indexedDB.open(DB_NAME, newVersion); @@ -93,11 +115,18 @@ function upgradeDB(newStore: string): Promise { if (!db.objectStoreNames.contains(newStore)) { db.createObjectStore(newStore); } - // Update version in localStorage + updateVersion(event.newVersion || newVersion); }; request.onsuccess = () => { + cachedDb = request.result; + + cachedDb.onclose = () => { + cachedDb = null; + dbPromise = null; + }; + dbPromise = Promise.resolve(request.result); resolve(); }; @@ -183,11 +212,17 @@ export async function clear(store: string): Promise { } } -// Helper function to reset the database if needed export async function resetDatabase(): Promise { + if (cachedDb) { + cachedDb.close(); + cachedDb = null; + } + if (dbPromise) { - const db = await dbPromise; - db.close(); + try { + const db = await dbPromise; + db.close(); + } catch (e) {} dbPromise = null; } diff --git a/src/plugins/built-in/globalSearch/src/indexing/indexer.ts b/src/plugins/built-in/globalSearch/src/indexing/indexer.ts index fe63c150..93873bfd 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/indexer.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/indexer.ts @@ -276,116 +276,102 @@ export async function runIndexing(): Promise { ); } - if (!hasStreamingJobs) { - const allItemsInPrimaryStores = await loadAllStoredItems(); + let allItemsInPrimaryStores = await loadAllStoredItems(); - if (allItemsInPrimaryStores.length > 0) { - console.debug( - `%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`, - "color: #4ea1ff", - ); - dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items"); + if (allItemsInPrimaryStores.length > 0) { + console.debug( + `%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`, + "color: #4ea1ff", + ); + dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items"); - try { - const workerManager = VectorWorkerManager.getInstance(); - await workerManager.processItems(allItemsInPrimaryStores, (progress) => { - let detailMessage = progress.message || ""; - if ( - progress.status === "processing" && - progress.total && - progress.processed !== undefined - ) { - detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`; - } else if (progress.status === "complete") { - detailMessage = "Vectorization complete"; - completedJobs++; + try { + const workerManager = VectorWorkerManager.getInstance(); + await workerManager.processItems(allItemsInPrimaryStores, (progress) => { + let detailMessage = progress.message || ""; + if ( + progress.status === "processing" && + progress.total && + progress.processed !== undefined + ) { + detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`; + } else if (progress.status === "complete") { + detailMessage = "Vectorization complete"; + completedJobs++; + dispatchProgress( + completedJobs, + totalSteps, + false, + "Indexing finished", + detailMessage + ); + } else if (progress.status === "error") { + detailMessage = `Vectorization error: ${progress.message}`; + dispatchProgress( + completedJobs, + totalSteps, + false, + "Vectorization failed", + detailMessage, + ); + } else if (progress.status === "started") { + detailMessage = `Vectorization started for ${progress.total} items`; + } else if (progress.status === "cancelled") { + detailMessage = `Vectorization cancelled: ${progress.message}`; + dispatchProgress( + completedJobs, + totalSteps, + false, + "Vectorization cancelled", + detailMessage, + ); + } + + if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") { dispatchProgress( completedJobs, totalSteps, - false, - "Indexing finished", - detailMessage - ); - } else if (progress.status === "error") { - detailMessage = `Vectorization error: ${progress.message}`; - dispatchProgress( - completedJobs, - totalSteps, - false, - "Vectorization failed", + true, + "Vectorization in progress", detailMessage, ); - } else if (progress.status === "started") { - detailMessage = `Vectorization started for ${progress.total} items`; - } else if (progress.status === "cancelled") { - detailMessage = `Vectorization cancelled: ${progress.message}`; - dispatchProgress( - completedJobs, - totalSteps, - false, - "Vectorization cancelled", - detailMessage, - ); - } - - if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") { - dispatchProgress( - completedJobs, - totalSteps, - true, - "Vectorization in progress", - detailMessage, - ); - } - }); - console.debug( - "%c[Indexer] Vectorization task for stored items sent to worker.", - "color: green", - ); - } catch (error) { - console.error( - `%c[Indexer] ❌ Failed to send items to vector worker:`, - "color: red", - error, - ); - dispatchProgress( - completedJobs, - totalSteps, - false, - "Vectorization failed", - String(error), - ); - } - } else { + } + }); console.debug( - "%c[Indexer] No items found in primary stores to send for vectorization.", - "color: gray", + "%c[Indexer] Vectorization task for stored items sent to worker.", + "color: green", + ); + } catch (error) { + console.error( + `%c[Indexer] ❌ Failed to send items to vector worker:`, + "color: red", + error, ); - completedJobs++; dispatchProgress( completedJobs, totalSteps, false, - "Indexing finished (no items for vectorization)", + "Vectorization failed", + String(error), ); } } else { console.debug( - "%c[Indexer] Skipping bulk vectorization - streaming jobs will handle vectorization", - "color: #4ea1ff", + "%c[Indexer] No items found in primary stores to send for vectorization.", + "color: gray", ); completedJobs++; dispatchProgress( completedJobs, totalSteps, false, - "Indexing finished (streaming vectorization active)", + "Indexing finished (no items for vectorization)", ); } stopHeartbeat(); - const allItemsInPrimaryStores = await loadAllStoredItems(); + allItemsInPrimaryStores = await loadAllStoredItems(); allItemsInPrimaryStores.forEach(item => { const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; if (jobDef) { diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts index c2ef9663..f3faf700 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts @@ -8,18 +8,20 @@ import { renderComponentMap } from "../renderComponents"; import { jobs } from "../jobs"; const RATE_LIMIT_CONFIG = { - minDelay: 50, - maxDelay: 5000, - baseDelay: 200, - backoffMultiplier: 1.5, + minDelay: 30, + maxDelay: 3000, + baseDelay: 150, + backoffMultiplier: 1.3, maxRetries: 3, adaptiveBatchSize: true, - minBatchSize: 10, - maxBatchSize: 100, - baseBatchSize: 50, - vectorBatchSize: 5, - parallelRequests: 5, - parallelDelay: 100, + minBatchSize: 15, + maxBatchSize: 150, + baseBatchSize: 75, + vectorBatchSize: 10, + parallelRequests: 8, + parallelDelay: 50, + circuitBreakerThreshold: 5, + circuitBreakerResetTime: 30000, }; interface MessagesProgress { @@ -33,6 +35,9 @@ interface MessagesProgress { processedIds: string[]; streamingStarted: boolean; totalEstimated: number; + circuitBreakerOpen: boolean; + circuitBreakerOpenTime: number; + consecutiveFailures: number; } const fetchMessages = async (offset = 0, limit = 100) => { @@ -99,50 +104,38 @@ function calculateAdaptiveDelay( progress: MessagesProgress, responseTime: number, ): number { - const { currentDelay, failedRequests, lastSuccessTime } = progress; + const { + currentDelay, + failedRequests, + lastSuccessTime, + circuitBreakerOpen, + consecutiveFailures, + } = progress; const timeSinceLastSuccess = Date.now() - lastSuccessTime; - if (failedRequests > 0 || responseTime > 2000) { + if (circuitBreakerOpen) { + return RATE_LIMIT_CONFIG.maxDelay; + } + + if (consecutiveFailures > 2 || failedRequests > 3 || responseTime > 3000) { return Math.min( - currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier, + currentDelay * + (RATE_LIMIT_CONFIG.backoffMultiplier + consecutiveFailures * 0.2), RATE_LIMIT_CONFIG.maxDelay, ); } - if (responseTime < 500 && timeSinceLastSuccess > 10000) { - return Math.max(currentDelay * 0.8, RATE_LIMIT_CONFIG.minDelay); + if ( + responseTime < 300 && + timeSinceLastSuccess > 5000 && + consecutiveFailures === 0 + ) { + return Math.max(currentDelay * 0.7, RATE_LIMIT_CONFIG.minDelay); } return currentDelay; } -function calculateAdaptiveBatchSize( - progress: MessagesProgress, - responseTime: number, -): number { - if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) { - return progress.currentBatchSize; - } - - const { currentBatchSize, failedRequests } = progress; - - if (failedRequests > 2 || responseTime > 3000) { - return Math.max( - Math.floor(currentBatchSize * 0.7), - RATE_LIMIT_CONFIG.minBatchSize, - ); - } - - if (failedRequests === 0 && responseTime < 1000) { - return Math.min( - Math.floor(currentBatchSize * 1.2), - RATE_LIMIT_CONFIG.maxBatchSize, - ); - } - - return currentBatchSize; -} - async function estimateMessageCount(): Promise { try { const firstBatch = await fetchMessages(0, 100); @@ -157,6 +150,73 @@ async function estimateMessageCount(): Promise { } } +function calculateAdaptiveBatchSize( + progress: MessagesProgress, + responseTime: number, +): number { + if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) { + return progress.currentBatchSize; + } + + const { + currentBatchSize, + failedRequests, + circuitBreakerOpen, + consecutiveFailures, + } = progress; + + if (circuitBreakerOpen) { + return RATE_LIMIT_CONFIG.minBatchSize; + } + + if (consecutiveFailures > 1 || failedRequests > 2 || responseTime > 2500) { + return Math.max( + Math.floor(currentBatchSize * 0.6), + RATE_LIMIT_CONFIG.minBatchSize, + ); + } + + if (failedRequests === 0 && responseTime < 800 && consecutiveFailures === 0) { + return Math.min( + Math.floor(currentBatchSize * 1.4), + RATE_LIMIT_CONFIG.maxBatchSize, + ); + } + + return currentBatchSize; +} + +function checkCircuitBreaker(progress: MessagesProgress): boolean { + const now = Date.now(); + + if ( + !progress.circuitBreakerOpen && + progress.consecutiveFailures >= RATE_LIMIT_CONFIG.circuitBreakerThreshold + ) { + progress.circuitBreakerOpen = true; + progress.circuitBreakerOpenTime = now; + console.warn( + `[Messages job] Circuit breaker opened due to ${progress.consecutiveFailures} consecutive failures`, + ); + return true; + } + + if ( + progress.circuitBreakerOpen && + now - progress.circuitBreakerOpenTime > + RATE_LIMIT_CONFIG.circuitBreakerResetTime + ) { + progress.circuitBreakerOpen = false; + progress.consecutiveFailures = 0; + console.info( + `[Messages job] Circuit breaker closed after ${RATE_LIMIT_CONFIG.circuitBreakerResetTime}ms`, + ); + return false; + } + + return progress.circuitBreakerOpen; +} + async function processMessagesInParallel( messages: any[], existingIds: Set, @@ -173,21 +233,19 @@ async function processMessagesInParallel( let consecutiveExisting = 0; const updatedProgress = { ...progress }; - // Filter out messages older than 2 years const twoYearsAgo = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000; let shouldStop = false; - + const messagesToProcess = messages.filter((msg) => { const id = msg.id.toString(); const messageDate = new Date(msg.date).getTime(); - - // If we encounter a message older than 2 years, we should stop processing - // since messages are sorted by date descending + if (messageDate < twoYearsAgo) { + //! older than 2 years ago shouldStop = true; return false; } - + if (existingIds.has(id) || processedIdsSet.has(id)) { consecutiveExisting++; return false; @@ -320,6 +378,9 @@ export const messagesJob: Job = { processedIds: [], streamingStarted: false, totalEstimated: 0, + circuitBreakerOpen: false, + circuitBreakerOpenTime: 0, + consecutiveFailures: 0, }; const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id)); @@ -451,6 +512,14 @@ export const messagesJob: Job = { } while (!progress.done) { + if (checkCircuitBreaker(progress)) { + console.warn( + "[Messages job] Circuit breaker is open, skipping processing", + ); + await delay(RATE_LIMIT_CONFIG.maxDelay); + continue; + } + await delay(progress.currentDelay); requestStartTime = Date.now(); @@ -459,6 +528,8 @@ export const messagesJob: Job = { list = await fetchMessages(progress.offset, progress.currentBatchSize); const responseTime = Date.now() - requestStartTime; + progress.consecutiveFailures = 0; + progress.currentDelay = calculateAdaptiveDelay(progress, responseTime); progress.currentBatchSize = calculateAdaptiveBatchSize( progress, @@ -467,6 +538,7 @@ export const messagesJob: Job = { } catch (e) { console.error("[Messages job] list fetch failed:", e); progress.failedRequests++; + progress.consecutiveFailures++; progress.currentDelay = Math.min( progress.currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier, RATE_LIMIT_CONFIG.maxDelay, @@ -479,6 +551,7 @@ export const messagesJob: Job = { if (list.status !== "200") { progress.failedRequests++; + progress.consecutiveFailures++; progress.processedIds = Array.from(processedIdsSet); await ctx.setProgress(progress); @@ -507,7 +580,6 @@ export const messagesJob: Job = { itemsToStream.push(...processedItems); - // Update consecutive existing counter consecutiveExisting = newConsecutiveExisting; if (consecutiveExisting >= 20) { progress.done = true; @@ -529,14 +601,17 @@ export const messagesJob: Job = { } } - // Dispatch incremental search update if we processed new items if (processedItems.length > 0) { try { const currentItems = await loadAllStoredItems(); - currentItems.forEach(item => { - const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; + currentItems.forEach((item) => { + const jobDef = + jobs[item.category] || + Object.values(jobs).find((j) => j.id === item.category) || + jobs[item.renderComponentId]; if (jobDef) { - const renderComponent = renderComponentMap[jobDef.renderComponentId]; + const renderComponent = + renderComponentMap[jobDef.renderComponentId]; if (renderComponent) { item.renderComponent = renderComponent; } @@ -545,11 +620,21 @@ export const messagesJob: Job = { } }); loadDynamicItems(currentItems); - window.dispatchEvent(new CustomEvent("dynamic-items-updated", { - detail: { incremental: true, jobId: "messages", newItemCount: processedItems.length, streaming: true } - })); + window.dispatchEvent( + new CustomEvent("dynamic-items-updated", { + detail: { + incremental: true, + jobId: "messages", + newItemCount: processedItems.length, + streaming: true, + }, + }), + ); } catch (error) { - console.warn("[Messages job] Failed to dispatch incremental search update:", error); + console.warn( + "[Messages job] Failed to dispatch incremental search update:", + error, + ); } } @@ -596,6 +681,9 @@ export const messagesJob: Job = { processedIds: [], streamingStarted: false, totalEstimated: 0, + circuitBreakerOpen: false, + circuitBreakerOpenTime: 0, + consecutiveFailures: 0, }); } else { progress.processedIds = Array.from(processedIdsSet); diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts index 06d617d2..ecae38f8 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts @@ -309,10 +309,7 @@ export const notificationsJob: Job = { await delay(NOTIFICATIONS_RATE_LIMIT.batchDelay); } - const { success, item } = await processNotification( - notif, - ctx, - ); + const { success, item } = await processNotification(notif, ctx); if (!success) { if (progress.retryQueue.length < 10) { progress.retryQueue.push(notif.notificationID); @@ -371,27 +368,42 @@ export const notificationsJob: Job = { if (progressUpdateCounter >= 5) { await ctx.setProgress(progress); progressUpdateCounter = 0; - + if (items.length > 0) { try { const currentItems = await loadAllStoredItems(); - currentItems.forEach(item => { - const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; + currentItems.forEach((item) => { + const jobDef = + jobs[item.category] || + Object.values(jobs).find((j) => j.id === item.category) || + jobs[item.renderComponentId]; if (jobDef) { - const renderComponent = renderComponentMap[jobDef.renderComponentId]; + const renderComponent = + renderComponentMap[jobDef.renderComponentId]; if (renderComponent) { item.renderComponent = renderComponent; } } else if (renderComponentMap[item.renderComponentId]) { - item.renderComponent = renderComponentMap[item.renderComponentId]; + item.renderComponent = + renderComponentMap[item.renderComponentId]; } }); loadDynamicItems(currentItems); - window.dispatchEvent(new CustomEvent("dynamic-items-updated", { - detail: { incremental: true, jobId: "notifications", newItemCount: items.length, streaming: true } - })); + window.dispatchEvent( + new CustomEvent("dynamic-items-updated", { + detail: { + incremental: true, + jobId: "notifications", + newItemCount: items.length, + streaming: true, + }, + }), + ); } catch (error) { - console.warn("[Notifications job] Failed to dispatch incremental search update:", error); + console.warn( + "[Notifications job] Failed to dispatch incremental search update:", + error, + ); } } } diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts index d7cac934..4f4c3cb2 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts @@ -4,7 +4,7 @@ import type { IndexItem } from "../types"; let vectorIndex: EmbeddingIndex | null = null; let isInitialized = false; let currentAbortController: AbortController | null = null; -let loadedItemIds = new Set(); // Track loaded items to prevent duplicates +let loadedItemIds = new Set(); let streamingSession: { isActive: boolean; @@ -26,22 +26,19 @@ async function initWorker() { await initializeModel(); vectorIndex = new EmbeddingIndex([]); - // Load existing items but track them to prevent duplicates const stored = await vectorIndex.getAllObjectsFromIndexedDB(); if (stored.length > 0) { console.debug(`Found ${stored.length} existing items in IndexedDB`); - - // Clear any existing items from memory first + loadedItemIds.clear(); - - // Add items and track their IDs + stored.forEach((item) => { if (item.id && !loadedItemIds.has(item.id)) { vectorIndex!.add(item); loadedItemIds.add(item.id); } }); - + console.debug( `Vector index loaded ${loadedItemIds.size} unique items from IndexedDB.`, ); @@ -168,7 +165,6 @@ async function processStreamingItems() { streamingSession.batchSize, ); - // Use our tracking set for more efficient deduplication const unprocessedItems = batchToProcess.filter((item) => { return item.id && !loadedItemIds.has(item.id); }); @@ -190,12 +186,12 @@ async function processStreamingItems() { try { successfullyVectorized.forEach((item) => { vectorIndex!.add(item); - loadedItemIds.add(item.id); // Track the added item + loadedItemIds.add(item.id); }); if ( - streamingSession.totalProcessed % (streamingSession.batchSize * 15) === - 0 + streamingSession.totalProcessed % 50 === 0 || + loadedItemIds.size % 200 === 0 ) { await vectorIndex!.saveIndex("indexedDB"); console.debug( @@ -328,7 +324,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } } - // Use our tracking set for more efficient deduplication const unprocessedItems = items.filter((item) => { if (signal.aborted) return false; return item.id && !loadedItemIds.has(item.id); @@ -347,15 +342,22 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } if (unprocessedItems.length === 0) { - console.debug(`No new items to process. ${loadedItemIds.size} items already in index.`); + console.debug( + `No new items to process. ${loadedItemIds.size} items already in index.`, + ); self.postMessage({ type: "progress", - data: { status: "complete", message: `No new items to process (${loadedItemIds.size} items already indexed)` }, + data: { + status: "complete", + message: `No new items to process (${loadedItemIds.size} items already indexed)`, + }, }); return; } - console.debug(`Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`); + console.debug( + `Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`, + ); self.postMessage({ type: "progress", data: { @@ -402,7 +404,7 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { try { successfullyVectorized.forEach((item) => { vectorIndex!.add(item); - loadedItemIds.add(item.id); // Track the added item + loadedItemIds.add(item.id); }); } catch (e) { console.error("Error adding batch to index:", e); @@ -425,15 +427,22 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { return; } - try { - await vectorIndex!.saveIndex("indexedDB"); - console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`); - } catch (e) { - console.error("Error saving index batch:", e); - self.postMessage({ - type: "progress", - data: { status: "error", message: `Error saving index batch: ${e}` }, - }); + if ( + (i / BATCH_SIZE + 1) % 3 === 0 || + i + BATCH_SIZE >= unprocessedItems.length + ) { + try { + await vectorIndex!.saveIndex("indexedDB"); + console.debug( + `Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`, + ); + } catch (e) { + console.error("Error saving index batch:", e); + self.postMessage({ + type: "progress", + data: { status: "error", message: `Error saving index batch: ${e}` }, + }); + } } processedCount += batch.length; @@ -448,7 +457,9 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { }); } - console.debug(`Processing complete. Total unique items in index: ${loadedItemIds.size}`); + console.debug( + `Processing complete. Total unique items in index: ${loadedItemIds.size}`, + ); self.postMessage({ type: "progress", data: { @@ -462,35 +473,32 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { async function resetWorker() { console.debug("Resetting vector worker state..."); - - // Clear tracking + loadedItemIds.clear(); - - // Reset streaming session + if (streamingSession?.isActive) { streamingSession.isActive = false; streamingSession = null; } - - // Reset vector index + if (vectorIndex) { try { - // Save current state before reset await vectorIndex.saveIndex("indexedDB"); console.debug("Saved index before reset"); } catch (e) { console.warn("Error saving index before reset:", e); } } - - // Reinitialize + isInitialized = false; vectorIndex = null; - + await initWorker(); - - console.debug(`Vector worker reset complete. Loaded ${loadedItemIds.size} items.`); - + + console.debug( + `Vector worker reset complete. Loaded ${loadedItemIds.size} items.`, + ); + self.postMessage({ type: "progress", data: { diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts index c016353a..71f2f252 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts @@ -15,6 +15,9 @@ export class VectorWorkerManager { private isInitialized = false; private readyPromise: Promise | null = null; private progressCallback: ProgressCallback | null = null; + private initializationMutex = false; + private idleTimer: NodeJS.Timeout | null = null; + private lastActivityTime = 0; private streamingSession: { isActive: boolean; @@ -23,7 +26,9 @@ export class VectorWorkerManager { batchBuffer: IndexItem[]; batchSize: number; flushTimer: NodeJS.Timeout | null; - jobId?: string; // Track which job owns the session + jobId?: string; + inactivityTimer: NodeJS.Timeout | null; + lastActivityTime: number; } | null = null; private constructor() {} @@ -43,13 +48,12 @@ export class VectorWorkerManager { console.debug("Lazy-loading vector worker..."); return new Promise((resolve, reject) => { - // Terminate any existing worker before creating a new one if (this.worker) { console.debug("Terminating existing worker before creating new one"); this.worker.terminate(); this.worker = null; } - + console.debug("Creating new vector worker instance"); this.worker = vectorWorker(); @@ -62,8 +66,7 @@ export class VectorWorkerManager { this.worker = null; } this.isInitialized = false; - // Don't reset readyPromise here to prevent race conditions - // It will be reset when a new initialization is attempted + reject(new Error("Worker initialization timed out")); }, 10000); @@ -75,6 +78,7 @@ export class VectorWorkerManager { case "ready": this.isInitialized = true; clearTimeout(timeout); + this.updateActivity(); // Start idle timer after initialization console.debug("Vector worker initialized and ready."); resolve(); break; @@ -89,11 +93,16 @@ export class VectorWorkerManager { if (this.streamingSession?.isActive) { this.endStreamingSession(); } - - // Dispatch search update when vectorization completes - window.dispatchEvent(new CustomEvent("dynamic-items-updated", { - detail: { incremental: true, jobId: "vectorization", vectorUpdate: true } - })); + + window.dispatchEvent( + new CustomEvent("dynamic-items-updated", { + detail: { + incremental: true, + jobId: "vectorization", + vectorUpdate: true, + }, + }), + ); } } break; @@ -128,35 +137,73 @@ export class VectorWorkerManager { this.isInitialized = false; this.readyPromise = null; this.progressCallback = null; + this.initializationMutex = false; + this.clearIdleTimer(); if (this.streamingSession?.isActive) { this.endStreamingSession(); } } + private startIdleTimer() { + this.clearIdleTimer(); + this.idleTimer = setTimeout(() => { + if (!this.streamingSession?.isActive && this.isInitialized) { + console.debug("[VectorWorker] Auto-shutting down due to 2 minutes of inactivity"); + this.resetWorkerState(); + } + }, 120000); // 2 minutes + } + + private clearIdleTimer() { + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = null; + } + } + + private updateActivity() { + this.lastActivityTime = Date.now(); + this.startIdleTimer(); + } + private async ensureReady() { - // If we already have a ready promise, wait for it regardless of outcome + if (this.initializationMutex) { + while (this.initializationMutex) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + + if (this.isInitialized && this.worker) { + return; + } + } + if (this.readyPromise) { try { await this.readyPromise; } catch (error) { - // If the previous initialization failed, reset state and try again - console.warn("Previous worker initialization failed, resetting state and retrying...", error); + console.warn( + "Previous worker initialization failed, resetting state and retrying...", + error, + ); this.resetWorkerState(); } } - - // Double-check if we're actually ready after waiting + if (this.isInitialized && this.worker) { return; } - - // If we're not ready and there's no active promise, create one - if (!this.readyPromise) { + + if (!this.readyPromise && !this.initializationMutex) { console.warn("Worker not initialized, attempting init..."); - this.readyPromise = this.initWorker(); + this.initializationMutex = true; + try { + this.readyPromise = this.initWorker(); + await this.readyPromise; + } finally { + this.initializationMutex = false; + } } - - await this.readyPromise; + if (!this.isInitialized || !this.worker) { throw new Error( "Vector Worker is not available after initialization attempt.", @@ -165,27 +212,61 @@ export class VectorWorkerManager { } async processItems(items: IndexItem[], onProgress?: ProgressCallback) { + // Only initialize worker if we actually have items to process + if (items.length === 0) { + if (onProgress) { + onProgress({ + status: "complete", + message: "No items to process" + }); + } + return; + } + + const uniqueItems = items.filter((item, index, arr) => { + return arr.findIndex((i) => i.id === item.id) === index; + }); + + if (uniqueItems.length !== items.length) { + console.debug( + `Filtered out ${items.length - uniqueItems.length} duplicate items before processing`, + ); + } + + // If after deduplication we have no items, don't initialize worker + if (uniqueItems.length === 0) { + if (onProgress) { + onProgress({ + status: "complete", + message: "No unique items to process after deduplication" + }); + } + return; + } + await this.ensureReady(); - // Don't allow regular processing if streaming is active if (this.streamingSession?.isActive) { console.warn("Cannot process items while streaming session is active"); if (onProgress) { onProgress({ status: "error", - message: "Cannot process items while streaming session is active" + message: "Cannot process items while streaming session is active", }); } return; } this.progressCallback = onProgress || null; + this.updateActivity(); - console.debug(`Sending ${items.length} items to worker for processing.`); + console.debug( + `Sending ${uniqueItems.length} unique items to worker for processing.`, + ); this.worker!.postMessage({ type: "process", - data: { items: items }, + data: { items: uniqueItems }, }); } @@ -195,19 +276,22 @@ export class VectorWorkerManager { batchSize: number = 10, jobId?: string, ): Promise { + // Only initialize if we expect items to process + if (totalExpectedItems === 0) { + console.debug("[VectorWorker] No items expected, not starting streaming session"); + return; + } + await this.ensureReady(); - // Check if another job already has an active streaming session if (this.streamingSession?.isActive) { if (this.streamingSession.jobId !== jobId) { - console.warn(`Cannot start streaming session for job ${jobId} - job ${this.streamingSession.jobId} already has an active session`); - if (onProgress) { - onProgress({ - status: "error", - message: `Another job (${this.streamingSession.jobId}) already has an active streaming session` - }); - } - return; + console.warn( + `Ending existing streaming session for job ${this.streamingSession.jobId} to start new session for job ${jobId}`, + ); + await this.endStreamingSession(); + + await new Promise((resolve) => setTimeout(resolve, 100)); } else { console.debug(`Streaming session for job ${jobId} already active`); return; @@ -215,6 +299,7 @@ export class VectorWorkerManager { } this.progressCallback = onProgress || null; + this.updateActivity(); this.streamingSession = { isActive: true, @@ -224,6 +309,8 @@ export class VectorWorkerManager { batchSize, flushTimer: null, jobId, + inactivityTimer: null, + lastActivityTime: Date.now(), }; console.debug( @@ -252,7 +339,34 @@ export class VectorWorkerManager { ); } - this.streamingSession.batchBuffer.push(...items); + const uniqueItems = items.filter((item, index, arr) => { + return arr.findIndex((i) => i.id === item.id) === index; + }); + + if (uniqueItems.length !== items.length) { + console.debug( + `[Streaming] Filtered out ${items.length - uniqueItems.length} duplicate items before streaming`, + ); + } + + if (uniqueItems.length > 0) { + this.streamingSession.batchBuffer.push(...uniqueItems); + this.streamingSession.lastActivityTime = Date.now(); + this.updateActivity(); // Update worker activity + + if (this.streamingSession.inactivityTimer) { + clearTimeout(this.streamingSession.inactivityTimer); + } + + this.streamingSession.inactivityTimer = setTimeout(() => { + if (this.streamingSession?.isActive) { + console.debug( + "[VectorWorker] Auto-ending streaming session due to inactivity", + ); + this.endStreamingSession(); + } + }, 30000); + } if ( this.streamingSession.batchBuffer.length >= @@ -313,6 +427,10 @@ export class VectorWorkerManager { clearTimeout(this.streamingSession.flushTimer); } + if (this.streamingSession.inactivityTimer) { + clearTimeout(this.streamingSession.inactivityTimer); + } + this.streamingSession.isActive = false; this.worker!.postMessage({ @@ -337,6 +455,7 @@ export class VectorWorkerManager { return this.streamItems([item]); } + isStreamingActive(): boolean { return this.streamingSession?.isActive ?? false; } @@ -364,15 +483,15 @@ export class VectorWorkerManager { async resetWorker(): Promise { console.debug("Resetting vector worker..."); - + if (this.streamingSession?.isActive) { await this.endStreamingSession(); } - + await this.ensureReady(); - + this.worker!.postMessage({ type: "reset" }); - + console.debug("Reset command sent to worker"); } }