From 854c6ea826160ae384ee3d5fcd772c088291b4f2 Mon Sep 17 00:00:00 2001 From: SethBurkart123 Date: Sun, 25 May 2025 22:28:40 +1000 Subject: [PATCH] fix: indexer saving infinite items, other improvements --- package.json | 2 +- .../src/components/SearchBar.svelte | 5 +- .../built-in/globalSearch/src/core/index.ts | 44 ++++ .../globalSearch/src/indexing/indexer.ts | 245 +++++++++++------- .../src/indexing/jobs/messages.ts | 59 ++++- .../src/indexing/jobs/notifications.ts | 28 ++ .../src/indexing/worker/vectorWorker.ts | 124 ++++++--- .../indexing/worker/vectorWorkerManager.ts | 110 ++++++-- 8 files changed, 465 insertions(+), 152 deletions(-) diff --git a/package.json b/package.json index 22c5496b..ef4861f2 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,7 @@ "codemirror": "^6.0.1", "color": "^5.0.0", "dompurify": "^3.2.4", - "embeddia": "^1.1.4", + "embeddia": "^1.2.1", "embla-carousel-autoplay": "^8.5.2", "embla-carousel-svelte": "^8.5.2", "esbuild": "^0.25.3", diff --git a/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte b/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte index 1be0b469..0498c629 100644 --- a/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte +++ b/src/plugins/built-in/globalSearch/src/components/SearchBar.svelte @@ -122,7 +122,6 @@ window.addEventListener('indexing-progress', progressHandler as EventListener); const itemsUpdatedHandler = () => { - console.log('Search Bar received items-updated event, re-indexing...'); setupSearchIndexes(); performSearch(); }; @@ -387,7 +386,7 @@
{#if combinedResults.length > 0 || calculatorResult} -
+
{@render Shortcut({ text: 'Navigate', keybind: ['↑', '↓']})} {#if calculatorResult && selectedIndex === 0} @@ -421,7 +420,7 @@
{#each keybind as key} - {key} + {key} {/each}
{text} diff --git a/src/plugins/built-in/globalSearch/src/core/index.ts b/src/plugins/built-in/globalSearch/src/core/index.ts index 71ed2369..44e2f70a 100644 --- a/src/plugins/built-in/globalSearch/src/core/index.ts +++ b/src/plugins/built-in/globalSearch/src/core/index.ts @@ -13,6 +13,7 @@ import { runIndexing } from "../indexing/indexer"; import { initVectorSearch } from "../search/vector/vectorSearch"; import { cleanupSearchBar, mountSearchBar } from "./mountSearchBar"; import { IndexedDbManager } from "embeddia"; +import { VectorWorkerManager } from "../indexing/worker/vectorWorkerManager"; // Platform-aware default hotkey const getDefaultHotkey = () => { @@ -48,6 +49,15 @@ const settings = defineSettings({ const confirmed = confirm("Are you sure you want to reset the search index and storage?"); if (confirmed) { + try { + // Reset the vector worker first + const workerManager = VectorWorkerManager.getInstance(); + await workerManager.resetWorker(); + console.log("Vector worker reset successfully"); + } catch (e) { + console.warn("Failed to reset vector worker:", e); + } + // Delete both 'embeddiaDB' and 'betterseqta-index' using native IndexedDB APIs const deleteDb = (dbName: string) => { return new Promise((resolve, reject) => { @@ -116,6 +126,40 @@ const globalSearchPlugin: Plugin = { initVectorSearch(); + // Add debug helpers to window for troubleshooting + // @ts-ignore + window.globalSearchDebug = { + resetWorker: async () => { + const workerManager = VectorWorkerManager.getInstance(); + await workerManager.resetWorker(); + console.log("Vector worker reset via debug helper"); + }, + checkWorkerStatus: () => { + const workerManager = VectorWorkerManager.getInstance(); + console.log("Streaming active:", workerManager.isStreamingActive()); + }, + checkIndexedDBSize: async () => { + try { + const estimate = await navigator.storage.estimate(); + console.log("Storage estimate:", estimate); + + // Check embeddiaDB size + const dbRequest = indexedDB.open("embeddiaDB"); + dbRequest.onsuccess = () => { + const db = dbRequest.result; + const transaction = db.transaction(["embeddiaObjectStore"], "readonly"); + const store = transaction.objectStore("embeddiaObjectStore"); + const countRequest = store.count(); + countRequest.onsuccess = () => { + console.log("embeddiaDB item count:", countRequest.result); + }; + }; + } catch (e) { + console.error("Error checking storage:", e); + } + } + }; + if (api.settings.runIndexingOnLoad) { setTimeout(async () => { await runIndexing(); diff --git a/src/plugins/built-in/globalSearch/src/indexing/indexer.ts b/src/plugins/built-in/globalSearch/src/indexing/indexer.ts index e7b8aabc..fe63c150 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/indexer.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/indexer.ts @@ -9,6 +9,7 @@ const META_STORE = "meta"; const LOCK_KEY = "bsq-indexer-lock"; const HEARTBEAT_INTERVAL = 10000; const LOCK_TIMEOUT = 20000; +const LOCK_ACQUIRE_TIMEOUT = 5000; /* ─────────── Progress‑meta helpers ─────────── */ async function loadProgress(jobId: string): Promise { @@ -16,15 +17,13 @@ async function loadProgress(jobId: string): Promise { return rec?.progress as T | undefined; } -async function saveProgress( - jobId: string, - progress: T, -): Promise { - await put(META_STORE, { jobId, progress }, `progress:${jobId}`); +async function saveProgress(jobId: string, progress: T): Promise { + await put(META_STORE, { progress }, `progress:${jobId}`); } /* ───────────────────────────────────────────── */ let heartbeatTimer: ReturnType | null = null; +let isIndexingActive = false; function shouldRun(job: Job, lastRun?: number): boolean { const now = Date.now(); @@ -54,21 +53,65 @@ async function updateLastRunMeta(jobId: string): Promise { await put(META_STORE, { jobId, lastRun: Date.now() }, jobId); } -function shouldIndex(): boolean { - const last = parseInt(localStorage.getItem(LOCK_KEY) || "0", 10); - return isNaN(last) || Date.now() - last > LOCK_TIMEOUT; +async function acquireLock(): Promise { + if (isIndexingActive) { + console.debug("[Indexer] Already indexing in this tab"); + return false; + } + + const lockId = `${Date.now()}-${Math.random()}`; + const startTime = Date.now(); + + while (Date.now() - startTime < LOCK_ACQUIRE_TIMEOUT) { + const currentLock = localStorage.getItem(LOCK_KEY); + const currentTime = Date.now(); + + if (!currentLock) { + localStorage.setItem(LOCK_KEY, lockId); + await new Promise(resolve => setTimeout(resolve, 50)); + if (localStorage.getItem(LOCK_KEY) === lockId) { + isIndexingActive = true; + return true; + } + } else { + try { + const [timestamp] = currentLock.split('-'); + const lockTime = parseInt(timestamp, 10); + if (isNaN(lockTime) || currentTime - lockTime > LOCK_TIMEOUT) { + localStorage.setItem(LOCK_KEY, lockId); + await new Promise(resolve => setTimeout(resolve, 50)); + if (localStorage.getItem(LOCK_KEY) === lockId) { + isIndexingActive = true; + return true; + } + } + } catch (e) { + console.warn("[Indexer] Error parsing lock:", e); + } + } + + await new Promise(resolve => setTimeout(resolve, 100)); + } + + return false; } function startHeartbeat() { - localStorage.setItem(LOCK_KEY, `${Date.now()}`); + const lockId = localStorage.getItem(LOCK_KEY); + if (!lockId) return; + heartbeatTimer = setInterval(() => { - localStorage.setItem(LOCK_KEY, `${Date.now()}`); + if (localStorage.getItem(LOCK_KEY)?.endsWith(lockId.split('-')[1])) { + const newLockId = `${Date.now()}-${lockId.split('-')[1]}`; + localStorage.setItem(LOCK_KEY, newLockId); + } }, HEARTBEAT_INTERVAL); } function stopHeartbeat() { if (heartbeatTimer) clearInterval(heartbeatTimer); localStorage.removeItem(LOCK_KEY); + isIndexingActive = false; } function dispatchProgress( @@ -118,9 +161,9 @@ export async function loadAllStoredItems(): Promise { } export async function runIndexing(): Promise { - if (!shouldIndex()) { + if (!(await acquireLock())) { console.debug( - "%c[Indexer] Skipping indexing (another tab has the lock)", + "%c[Indexer] Could not acquire lock - another tab is indexing or this tab is already indexing", "color: gray", ); return; @@ -131,11 +174,11 @@ export async function runIndexing(): Promise { const jobIds = Object.keys(jobs); let completedJobs = 0; - // Add an extra step for vectorization const totalSteps = jobIds.length + 1; dispatchProgress(completedJobs, totalSteps, true, "Starting jobs"); - // --- Step 1: Run Fetching/Storing Jobs (Main Thread) --- + let hasStreamingJobs = false; + for (const jobId of jobIds) { dispatchProgress( completedJobs, @@ -202,8 +245,7 @@ export async function runIndexing(): Promise { console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff"); try { - const newItemsRaw = await job.run(ctx); // newItemsRaw are items *returned* by the job. - // Some jobs (like messages) might add via ctx.addItem and return []. + const newItemsRaw = await job.run(ctx); const stored = await getStoredItems(); let merged = mergeItems(stored, newItemsRaw); @@ -212,6 +254,10 @@ export async function runIndexing(): Promise { await setStoredItems(merged); await updateLastRunMeta(jobId); + if (jobId === 'messages' || jobId === 'notifications') { + hasStreamingJobs = true; + } + console.debug( `%c[Indexer] ${job.label}: ${newItemsRaw.length} new items reported by run, ${merged.length} total items now in '${jobId}' store.`, "color: #00c46f", @@ -230,116 +276,124 @@ export async function runIndexing(): Promise { ); } - // --- Step 2: Delegate Vectorization to Worker (Off Main Thread) --- - // Load ALL items from the primary stores. The worker will handle deduplication against its own vector store. - const allItemsInPrimaryStores = await loadAllStoredItems(); + if (!hasStreamingJobs) { + const allItemsInPrimaryStores = await loadAllStoredItems(); - if (allItemsInPrimaryStores.length > 0) { - console.debug( - `%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`, - "color: #4ea1ff", - ); - dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items"); + if (allItemsInPrimaryStores.length > 0) { + console.debug( + `%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`, + "color: #4ea1ff", + ); + dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items"); - try { - const workerManager = VectorWorkerManager.getInstance(); - await workerManager.processItems(allItemsInPrimaryStores, (progress) => { - let detailMessage = progress.message || ""; - if ( - progress.status === "processing" && - progress.total && - progress.processed !== undefined - ) { - detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`; - } else if (progress.status === "complete") { - detailMessage = "Vectorization complete"; - // Mark the vectorization step as complete - completedJobs++; // Increment completion count *after* vectorization finishes - dispatchProgress( - completedJobs, - totalSteps, - false, // Indexing finished - "Indexing finished", - detailMessage - ); - } else if (progress.status === "error") { - detailMessage = `Vectorization error: ${progress.message}`; - dispatchProgress( - completedJobs, - totalSteps, - false, // Indexing stopped - "Vectorization failed", - detailMessage, - ); - } else if (progress.status === "started") { - detailMessage = `Vectorization started for ${progress.total} items`; - } else if (progress.status === "cancelled") { - detailMessage = `Vectorization cancelled: ${progress.message}`; - dispatchProgress( - completedJobs, - totalSteps, - false, // Indexing stopped - "Vectorization cancelled", - detailMessage, - ); - } - - // Update the status detail for ongoing vectorization - if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") { + try { + const workerManager = VectorWorkerManager.getInstance(); + await workerManager.processItems(allItemsInPrimaryStores, (progress) => { + let detailMessage = progress.message || ""; + if ( + progress.status === "processing" && + progress.total && + progress.processed !== undefined + ) { + detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`; + } else if (progress.status === "complete") { + detailMessage = "Vectorization complete"; + completedJobs++; dispatchProgress( - completedJobs, // Still on job completion count + completedJobs, totalSteps, - true, // Indexing still active - "Vectorization in progress", + false, + "Indexing finished", + detailMessage + ); + } else if (progress.status === "error") { + detailMessage = `Vectorization error: ${progress.message}`; + dispatchProgress( + completedJobs, + totalSteps, + false, + "Vectorization failed", detailMessage, ); - } - }); + } else if (progress.status === "started") { + detailMessage = `Vectorization started for ${progress.total} items`; + } else if (progress.status === "cancelled") { + detailMessage = `Vectorization cancelled: ${progress.message}`; + dispatchProgress( + completedJobs, + totalSteps, + false, + "Vectorization cancelled", + detailMessage, + ); + } + + if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") { + dispatchProgress( + completedJobs, + totalSteps, + true, + "Vectorization in progress", + detailMessage, + ); + } + }); + console.debug( + "%c[Indexer] Vectorization task for stored items sent to worker.", + "color: green", + ); + } catch (error) { + console.error( + `%c[Indexer] ❌ Failed to send items to vector worker:`, + "color: red", + error, + ); + dispatchProgress( + completedJobs, + totalSteps, + false, + "Vectorization failed", + String(error), + ); + } + } else { console.debug( - "%c[Indexer] Vectorization task for stored items sent to worker.", - "color: green", - ); - } catch (error) { - console.error( - `%c[Indexer] ❌ Failed to send items to vector worker:`, - "color: red", - error, + "%c[Indexer] No items found in primary stores to send for vectorization.", + "color: gray", ); + completedJobs++; dispatchProgress( completedJobs, totalSteps, - false, // Indexing stopped - "Vectorization failed", - String(error), + false, + "Indexing finished (no items for vectorization)", ); } } else { console.debug( - "%c[Indexer] No items found in primary stores to send for vectorization.", - "color: gray", + "%c[Indexer] Skipping bulk vectorization - streaming jobs will handle vectorization", + "color: #4ea1ff", ); - completedJobs++; // Count the "skipped" vectorization step + completedJobs++; dispatchProgress( completedJobs, totalSteps, - false, // Indexing finished - "Indexing finished (no items for vectorization)", + false, + "Indexing finished (streaming vectorization active)", ); } stopHeartbeat(); - // Update dynamic items with everything that's now in the primary stores - // These items are either already vectorized or will be by the worker. + const allItemsInPrimaryStores = await loadAllStoredItems(); allItemsInPrimaryStores.forEach(item => { - // Ensure job still exists for renderComponentId mapping const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; if (jobDef) { const renderComponent = renderComponentMap[jobDef.renderComponentId]; if (renderComponent) { item.renderComponent = renderComponent; } - } else if (renderComponentMap[item.renderComponentId]) { // Fallback if category doesn't match a job id directly + } else if (renderComponentMap[item.renderComponentId]) { item.renderComponent = renderComponentMap[item.renderComponentId]; } }); @@ -349,7 +403,6 @@ export async function runIndexing(): Promise { function mergeItems(existing: IndexItem[], incoming: IndexItem[]): IndexItem[] { const map = new Map(); - // Prioritize incoming items if IDs clash for (const item of existing) { if (item && item.id) map.set(item.id, item); } diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts index 4a3ac355..c2ef9663 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts @@ -2,6 +2,10 @@ import type { Job, IndexItem } from "../types"; import { htmlToPlainText } from "../utils"; import { delay } from "@/seqta/utils/delay"; import { VectorWorkerManager } from "../worker/vectorWorkerManager"; +import { loadDynamicItems } from "../../utils/dynamicItems"; +import { loadAllStoredItems } from "../indexer"; +import { renderComponentMap } from "../renderComponents"; +import { jobs } from "../jobs"; const RATE_LIMIT_CONFIG = { minDelay: 50, @@ -163,13 +167,27 @@ async function processMessagesInParallel( processedItems: IndexItem[]; consecutiveExisting: number; updatedProgress: MessagesProgress; + shouldStop: boolean; }> { const processedItems: IndexItem[] = []; let consecutiveExisting = 0; const updatedProgress = { ...progress }; + // Filter out messages older than 2 years + const twoYearsAgo = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000; + let shouldStop = false; + const messagesToProcess = messages.filter((msg) => { const id = msg.id.toString(); + const messageDate = new Date(msg.date).getTime(); + + // If we encounter a message older than 2 years, we should stop processing + // since messages are sorted by date descending + if (messageDate < twoYearsAgo) { + shouldStop = true; + return false; + } + if (existingIds.has(id) || processedIdsSet.has(id)) { consecutiveExisting++; return false; @@ -179,7 +197,7 @@ async function processMessagesInParallel( }); if (messagesToProcess.length === 0) { - return { processedItems, consecutiveExisting, updatedProgress }; + return { processedItems, consecutiveExisting, updatedProgress, shouldStop }; } for ( @@ -281,7 +299,7 @@ async function processMessagesInParallel( ); } - return { processedItems, consecutiveExisting, updatedProgress }; + return { processedItems, consecutiveExisting, updatedProgress, shouldStop }; } export const messagesJob: Job = { @@ -323,6 +341,7 @@ export const messagesJob: Job = { ); }, RATE_LIMIT_CONFIG.vectorBatchSize, + "messages", ); progress.streamingStarted = true; console.log( @@ -472,6 +491,7 @@ export const messagesJob: Job = { processedItems, consecutiveExisting: newConsecutiveExisting, updatedProgress, + shouldStop, } = await processMessagesInParallel( list.payload.messages, existingIds, @@ -487,11 +507,13 @@ export const messagesJob: Job = { itemsToStream.push(...processedItems); + // Update consecutive existing counter consecutiveExisting = newConsecutiveExisting; if (consecutiveExisting >= 20) { progress.done = true; } + // Stream items to vector worker if we have any if (itemsToStream.length > 0 && progress.streamingStarted) { try { await vectorWorker.streamItems(itemsToStream); @@ -507,6 +529,30 @@ export const messagesJob: Job = { } } + // Dispatch incremental search update if we processed new items + if (processedItems.length > 0) { + try { + const currentItems = await loadAllStoredItems(); + currentItems.forEach(item => { + const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; + if (jobDef) { + const renderComponent = renderComponentMap[jobDef.renderComponentId]; + if (renderComponent) { + item.renderComponent = renderComponent; + } + } else if (renderComponentMap[item.renderComponentId]) { + item.renderComponent = renderComponentMap[item.renderComponentId]; + } + }); + loadDynamicItems(currentItems); + window.dispatchEvent(new CustomEvent("dynamic-items-updated", { + detail: { incremental: true, jobId: "messages", newItemCount: processedItems.length, streaming: true } + })); + } catch (error) { + console.warn("[Messages job] Failed to dispatch incremental search update:", error); + } + } + if (!list.payload.hasMore) progress.done = true; progress.offset += progress.currentBatchSize; @@ -520,6 +566,11 @@ export const messagesJob: Job = { `[Messages job] Progress: offset=${progress.offset}, batchSize=${progress.currentBatchSize}, delay=${progress.currentDelay}ms, failures=${progress.failedRequests}, retryQueue=${progress.retryQueue.length}, vectorStreamed=${itemsStreamedToVector}, parallelRequests=${RATE_LIMIT_CONFIG.parallelRequests}`, ); } + + if (shouldStop) { + progress.done = true; + break; + } } if (progress.streamingStarted) { @@ -555,7 +606,7 @@ export const messagesJob: Job = { }, purge: (items) => { - const fourYears = Date.now() - 4 * 365 * 24 * 60 * 60 * 1000; - return items.filter((i) => i.dateAdded >= fourYears); + const twoYears = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000; + return items.filter((i) => i.dateAdded >= twoYears); }, }; diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts index be3e4185..06d617d2 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts @@ -3,6 +3,10 @@ import { htmlToPlainText } from "../utils"; import { fetchMessageContent } from "./messages"; import { delay } from "@/seqta/utils/delay"; import { VectorWorkerManager } from "../worker/vectorWorkerManager"; +import { loadDynamicItems } from "../../utils/dynamicItems"; +import { loadAllStoredItems } from "../indexer"; +import { renderComponentMap } from "../renderComponents"; +import { jobs } from "../jobs"; const NOTIFICATIONS_RATE_LIMIT = { baseDelay: 150, @@ -202,6 +206,7 @@ export const notificationsJob: Job = { ); }, NOTIFICATIONS_RATE_LIMIT.vectorBatchSize, + "notifications", ); progress.streamingStarted = true; console.log( @@ -366,6 +371,29 @@ export const notificationsJob: Job = { if (progressUpdateCounter >= 5) { await ctx.setProgress(progress); progressUpdateCounter = 0; + + if (items.length > 0) { + try { + const currentItems = await loadAllStoredItems(); + currentItems.forEach(item => { + const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId]; + if (jobDef) { + const renderComponent = renderComponentMap[jobDef.renderComponentId]; + if (renderComponent) { + item.renderComponent = renderComponent; + } + } else if (renderComponentMap[item.renderComponentId]) { + item.renderComponent = renderComponentMap[item.renderComponentId]; + } + }); + loadDynamicItems(currentItems); + window.dispatchEvent(new CustomEvent("dynamic-items-updated", { + detail: { incremental: true, jobId: "notifications", newItemCount: items.length, streaming: true } + })); + } catch (error) { + console.warn("[Notifications job] Failed to dispatch incremental search update:", error); + } + } } } diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts index 589b6df2..d7cac934 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts @@ -4,6 +4,7 @@ import type { IndexItem } from "../types"; let vectorIndex: EmbeddingIndex | null = null; let isInitialized = false; let currentAbortController: AbortController | null = null; +let loadedItemIds = new Set(); // Track loaded items to prevent duplicates let streamingSession: { isActive: boolean; @@ -25,11 +26,24 @@ async function initWorker() { await initializeModel(); vectorIndex = new EmbeddingIndex([]); + // Load existing items but track them to prevent duplicates const stored = await vectorIndex.getAllObjectsFromIndexedDB(); if (stored.length > 0) { - stored.forEach((item) => vectorIndex!.add(item)); + console.debug(`Found ${stored.length} existing items in IndexedDB`); + + // Clear any existing items from memory first + loadedItemIds.clear(); + + // Add items and track their IDs + stored.forEach((item) => { + if (item.id && !loadedItemIds.has(item.id)) { + vectorIndex!.add(item); + loadedItemIds.add(item.id); + } + }); + console.debug( - `Vector index loaded ${stored.length} items from IndexedDB.`, + `Vector index loaded ${loadedItemIds.size} unique items from IndexedDB.`, ); } else { console.debug("No existing vector index found in IndexedDB."); @@ -154,16 +168,14 @@ async function processStreamingItems() { streamingSession.batchSize, ); + // Use our tracking set for more efficient deduplication const unprocessedItems = batchToProcess.filter((item) => { - try { - return !vectorIndex!.get({ id: item.id }); - } catch (e) { - return true; - } + return item.id && !loadedItemIds.has(item.id); }); if (unprocessedItems.length === 0) { streamingSession.totalProcessed += batchToProcess.length; + console.debug(`Skipped ${batchToProcess.length} already processed items`); continue; } @@ -176,15 +188,18 @@ async function processStreamingItems() { if (successfullyVectorized.length > 0) { try { - successfullyVectorized.forEach((item) => vectorIndex!.add(item)); + successfullyVectorized.forEach((item) => { + vectorIndex!.add(item); + loadedItemIds.add(item.id); // Track the added item + }); if ( - streamingSession.totalProcessed % (streamingSession.batchSize * 3) === + streamingSession.totalProcessed % (streamingSession.batchSize * 15) === 0 ) { await vectorIndex!.saveIndex("indexedDB"); console.debug( - `Saved streaming index at ${streamingSession.totalProcessed} processed items`, + `Saved streaming index at ${streamingSession.totalProcessed} processed items (${loadedItemIds.size} total unique items)`, ); } } catch (e) { @@ -199,7 +214,7 @@ async function processStreamingItems() { data: { processed: streamingSession.totalProcessed, total: streamingSession.totalExpected, - message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items`, + message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items (${loadedItemIds.size} unique)`, }, }); @@ -313,13 +328,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } } + // Use our tracking set for more efficient deduplication const unprocessedItems = items.filter((item) => { if (signal.aborted) return false; - try { - return !vectorIndex!.get({ id: item.id }); - } catch (e) { - return true; - } + return item.id && !loadedItemIds.has(item.id); }); if (signal.aborted) { @@ -335,15 +347,15 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } if (unprocessedItems.length === 0) { - console.debug("No new items to process."); + console.debug(`No new items to process. ${loadedItemIds.size} items already in index.`); self.postMessage({ type: "progress", - data: { status: "complete", message: "No new items to process" }, + data: { status: "complete", message: `No new items to process (${loadedItemIds.size} items already indexed)` }, }); return; } - console.debug(`Starting processing of ${unprocessedItems.length} items.`); + console.debug(`Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`); self.postMessage({ type: "progress", data: { @@ -388,7 +400,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { if (successfullyVectorized.length > 0) { try { - successfullyVectorized.forEach((item) => vectorIndex!.add(item)); + successfullyVectorized.forEach((item) => { + vectorIndex!.add(item); + loadedItemIds.add(item.id); // Track the added item + }); } catch (e) { console.error("Error adding batch to index:", e); self.postMessage({ @@ -412,7 +427,7 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { try { await vectorIndex!.saveIndex("indexedDB"); - console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1}`); + console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`); } catch (e) { console.error("Error saving index batch:", e); self.postMessage({ @@ -421,29 +436,68 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { }); } - processedCount = Math.min(i + BATCH_SIZE, unprocessedItems.length); - + processedCount += batch.length; self.postMessage({ type: "progress", data: { status: "processing", total: unprocessedItems.length, processed: processedCount, + message: `Processed ${processedCount}/${unprocessedItems.length} items (${loadedItemIds.size} total unique)`, }, }); - - await new Promise((resolve) => setTimeout(resolve, 0)); } - if (!signal.aborted) { - console.debug("Processing completed successfully."); - self.postMessage({ - type: "progress", - data: { status: "complete", message: "All items processed successfully" }, - }); - } else { - console.debug("Processing completed, but was cancelled."); + console.debug(`Processing complete. Total unique items in index: ${loadedItemIds.size}`); + self.postMessage({ + type: "progress", + data: { + status: "complete", + total: unprocessedItems.length, + processed: processedCount, + message: `Processing complete: ${processedCount} new items processed (${loadedItemIds.size} total unique items)`, + }, + }); +} + +async function resetWorker() { + console.debug("Resetting vector worker state..."); + + // Clear tracking + loadedItemIds.clear(); + + // Reset streaming session + if (streamingSession?.isActive) { + streamingSession.isActive = false; + streamingSession = null; } + + // Reset vector index + if (vectorIndex) { + try { + // Save current state before reset + await vectorIndex.saveIndex("indexedDB"); + console.debug("Saved index before reset"); + } catch (e) { + console.warn("Error saving index before reset:", e); + } + } + + // Reinitialize + isInitialized = false; + vectorIndex = null; + + await initWorker(); + + console.debug(`Vector worker reset complete. Loaded ${loadedItemIds.size} items.`); + + self.postMessage({ + type: "progress", + data: { + status: "complete", + message: `Worker reset complete. ${loadedItemIds.size} items loaded.`, + }, + }); } self.addEventListener("message", async (e) => { @@ -475,6 +529,10 @@ self.addEventListener("message", async (e) => { await endStreamingSession(); break; + case "reset": + await resetWorker(); + break; + default: console.warn("Unknown message type:", type); } diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts index 02736ecf..c016353a 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts @@ -23,12 +23,14 @@ export class VectorWorkerManager { batchBuffer: IndexItem[]; batchSize: number; flushTimer: NodeJS.Timeout | null; + jobId?: string; // Track which job owns the session } | null = null; private constructor() {} static getInstance(): VectorWorkerManager { if (!VectorWorkerManager.instance) { + console.debug("Creating new VectorWorkerManager instance"); VectorWorkerManager.instance = new VectorWorkerManager(); } return VectorWorkerManager.instance; @@ -41,16 +43,27 @@ export class VectorWorkerManager { console.debug("Lazy-loading vector worker..."); return new Promise((resolve, reject) => { + // Terminate any existing worker before creating a new one + if (this.worker) { + console.debug("Terminating existing worker before creating new one"); + this.worker.terminate(); + this.worker = null; + } + + console.debug("Creating new vector worker instance"); this.worker = vectorWorker(); console.log("Worker initialized", this.worker); const timeout = setTimeout(() => { console.error("Vector worker initialization timed out"); - this.worker?.terminate(); - this.worker = null; + if (this.worker) { + this.worker.terminate(); + this.worker = null; + } this.isInitialized = false; - this.readyPromise = null; + // Don't reset readyPromise here to prevent race conditions + // It will be reset when a new initialization is attempted reject(new Error("Worker initialization timed out")); }, 10000); @@ -76,6 +89,11 @@ export class VectorWorkerManager { if (this.streamingSession?.isActive) { this.endStreamingSession(); } + + // Dispatch search update when vectorization completes + window.dispatchEvent(new CustomEvent("dynamic-items-updated", { + detail: { incremental: true, jobId: "vectorization", vectorUpdate: true } + })); } } break; @@ -101,11 +119,43 @@ export class VectorWorkerManager { }); } + private resetWorkerState() { + console.debug("Resetting vector worker state"); + if (this.worker) { + this.worker.terminate(); + this.worker = null; + } + this.isInitialized = false; + this.readyPromise = null; + this.progressCallback = null; + if (this.streamingSession?.isActive) { + this.endStreamingSession(); + } + } + private async ensureReady() { + // If we already have a ready promise, wait for it regardless of outcome + if (this.readyPromise) { + try { + await this.readyPromise; + } catch (error) { + // If the previous initialization failed, reset state and try again + console.warn("Previous worker initialization failed, resetting state and retrying...", error); + this.resetWorkerState(); + } + } + + // Double-check if we're actually ready after waiting + if (this.isInitialized && this.worker) { + return; + } + + // If we're not ready and there's no active promise, create one if (!this.readyPromise) { console.warn("Worker not initialized, attempting init..."); this.readyPromise = this.initWorker(); } + await this.readyPromise; if (!this.isInitialized || !this.worker) { throw new Error( @@ -117,6 +167,18 @@ export class VectorWorkerManager { async processItems(items: IndexItem[], onProgress?: ProgressCallback) { await this.ensureReady(); + // Don't allow regular processing if streaming is active + if (this.streamingSession?.isActive) { + console.warn("Cannot process items while streaming session is active"); + if (onProgress) { + onProgress({ + status: "error", + message: "Cannot process items while streaming session is active" + }); + } + return; + } + this.progressCallback = onProgress || null; console.debug(`Sending ${items.length} items to worker for processing.`); @@ -131,11 +193,25 @@ export class VectorWorkerManager { totalExpectedItems: number, onProgress?: ProgressCallback, batchSize: number = 10, + jobId?: string, ): Promise { await this.ensureReady(); + // Check if another job already has an active streaming session if (this.streamingSession?.isActive) { - this.endStreamingSession(); + if (this.streamingSession.jobId !== jobId) { + console.warn(`Cannot start streaming session for job ${jobId} - job ${this.streamingSession.jobId} already has an active session`); + if (onProgress) { + onProgress({ + status: "error", + message: `Another job (${this.streamingSession.jobId}) already has an active streaming session` + }); + } + return; + } else { + console.debug(`Streaming session for job ${jobId} already active`); + return; + } } this.progressCallback = onProgress || null; @@ -147,10 +223,11 @@ export class VectorWorkerManager { batchBuffer: [], batchSize, flushTimer: null, + jobId, }; console.debug( - `Starting streaming session for ${totalExpectedItems} items with batch size ${batchSize}`, + `Starting streaming session for job ${jobId} with ${totalExpectedItems} items (batch size ${batchSize})`, ); this.worker!.postMessage({ @@ -163,7 +240,7 @@ export class VectorWorkerManager { status: "started", total: totalExpectedItems, processed: 0, - message: "Starting streaming vectorization", + message: `Starting streaming vectorization for ${jobId}`, }); } } @@ -282,17 +359,20 @@ export class VectorWorkerManager { terminate() { console.debug("Terminating Vector Worker Manager..."); + this.resetWorkerState(); + } + async resetWorker(): Promise { + console.debug("Resetting vector worker..."); + if (this.streamingSession?.isActive) { - this.endStreamingSession(); + await this.endStreamingSession(); } - - if (this.worker) { - this.worker.terminate(); - this.worker = null; - } - this.isInitialized = false; - this.readyPromise = null; - this.progressCallback = null; + + await this.ensureReady(); + + this.worker!.postMessage({ type: "reset" }); + + console.debug("Reset command sent to worker"); } }