fix: indexer saving infinite items, other improvements

This commit is contained in:
SethBurkart123
2025-05-25 22:28:40 +10:00
parent cefeac95ea
commit 854c6ea826
8 changed files with 465 additions and 152 deletions
+1 -1
View File
@@ -78,7 +78,7 @@
"codemirror": "^6.0.1",
"color": "^5.0.0",
"dompurify": "^3.2.4",
"embeddia": "^1.1.4",
"embeddia": "^1.2.1",
"embla-carousel-autoplay": "^8.5.2",
"embla-carousel-svelte": "^8.5.2",
"esbuild": "^0.25.3",
@@ -122,7 +122,6 @@
window.addEventListener('indexing-progress', progressHandler as EventListener);
const itemsUpdatedHandler = () => {
console.log('Search Bar received items-updated event, re-indexing...');
setupSearchIndexes();
performSearch();
};
@@ -387,7 +386,7 @@
</ul>
<div class="px-3 py-2 w-full border-t border-zinc-900/5 dark:border-zinc-100/5 bg-white/5">
{#if combinedResults.length > 0 || calculatorResult}
<div class="flex justify-between items-center h-5 text-sm text-zinc-500 dark:text-zinc-400">
<div class="flex justify-between items-center h-7 text-sm text-zinc-500 dark:text-zinc-400">
<div class="flex gap-4 items-center">
{@render Shortcut({ text: 'Navigate', keybind: ['↑', '↓']})}
{#if calculatorResult && selectedIndex === 0}
@@ -421,7 +420,7 @@
<div class="flex gap-2 items-center">
<div class="flex gap-1 items-center">
{#each keybind as key}
<kbd class="px-1 py-0.5 text-[0.8rem] text-center align-middle rounded min-w-6 bg-zinc-100 dark:bg-zinc-100/10">{key}</kbd>
<kbd class="size-6 text-[0.9rem] flex justify-center items-center rounded bg-zinc-100 dark:bg-zinc-100/10">{key}</kbd>
{/each}
</div>
<span>{text}</span>
@@ -13,6 +13,7 @@ import { runIndexing } from "../indexing/indexer";
import { initVectorSearch } from "../search/vector/vectorSearch";
import { cleanupSearchBar, mountSearchBar } from "./mountSearchBar";
import { IndexedDbManager } from "embeddia";
import { VectorWorkerManager } from "../indexing/worker/vectorWorkerManager";
// Platform-aware default hotkey
const getDefaultHotkey = () => {
@@ -48,6 +49,15 @@ const settings = defineSettings({
const confirmed = confirm("Are you sure you want to reset the search index and storage?");
if (confirmed) {
try {
// Reset the vector worker first
const workerManager = VectorWorkerManager.getInstance();
await workerManager.resetWorker();
console.log("Vector worker reset successfully");
} catch (e) {
console.warn("Failed to reset vector worker:", e);
}
// Delete both 'embeddiaDB' and 'betterseqta-index' using native IndexedDB APIs
const deleteDb = (dbName: string) => {
return new Promise<void>((resolve, reject) => {
@@ -116,6 +126,40 @@ const globalSearchPlugin: Plugin<typeof settings> = {
initVectorSearch();
// Add debug helpers to window for troubleshooting
// @ts-ignore
window.globalSearchDebug = {
resetWorker: async () => {
const workerManager = VectorWorkerManager.getInstance();
await workerManager.resetWorker();
console.log("Vector worker reset via debug helper");
},
checkWorkerStatus: () => {
const workerManager = VectorWorkerManager.getInstance();
console.log("Streaming active:", workerManager.isStreamingActive());
},
checkIndexedDBSize: async () => {
try {
const estimate = await navigator.storage.estimate();
console.log("Storage estimate:", estimate);
// Check embeddiaDB size
const dbRequest = indexedDB.open("embeddiaDB");
dbRequest.onsuccess = () => {
const db = dbRequest.result;
const transaction = db.transaction(["embeddiaObjectStore"], "readonly");
const store = transaction.objectStore("embeddiaObjectStore");
const countRequest = store.count();
countRequest.onsuccess = () => {
console.log("embeddiaDB item count:", countRequest.result);
};
};
} catch (e) {
console.error("Error checking storage:", e);
}
}
};
if (api.settings.runIndexingOnLoad) {
setTimeout(async () => {
await runIndexing();
@@ -9,6 +9,7 @@ const META_STORE = "meta";
const LOCK_KEY = "bsq-indexer-lock";
const HEARTBEAT_INTERVAL = 10000;
const LOCK_TIMEOUT = 20000;
const LOCK_ACQUIRE_TIMEOUT = 5000;
/* ─────────── Progressmeta helpers ─────────── */
async function loadProgress<T = any>(jobId: string): Promise<T | undefined> {
@@ -16,15 +17,13 @@ async function loadProgress<T = any>(jobId: string): Promise<T | undefined> {
return rec?.progress as T | undefined;
}
async function saveProgress<T = any>(
jobId: string,
progress: T,
): Promise<void> {
await put(META_STORE, { jobId, progress }, `progress:${jobId}`);
async function saveProgress<T = any>(jobId: string, progress: T): Promise<void> {
await put(META_STORE, { progress }, `progress:${jobId}`);
}
/* ───────────────────────────────────────────── */
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
let isIndexingActive = false;
function shouldRun(job: Job, lastRun?: number): boolean {
const now = Date.now();
@@ -54,21 +53,65 @@ async function updateLastRunMeta(jobId: string): Promise<void> {
await put(META_STORE, { jobId, lastRun: Date.now() }, jobId);
}
function shouldIndex(): boolean {
const last = parseInt(localStorage.getItem(LOCK_KEY) || "0", 10);
return isNaN(last) || Date.now() - last > LOCK_TIMEOUT;
async function acquireLock(): Promise<boolean> {
if (isIndexingActive) {
console.debug("[Indexer] Already indexing in this tab");
return false;
}
const lockId = `${Date.now()}-${Math.random()}`;
const startTime = Date.now();
while (Date.now() - startTime < LOCK_ACQUIRE_TIMEOUT) {
const currentLock = localStorage.getItem(LOCK_KEY);
const currentTime = Date.now();
if (!currentLock) {
localStorage.setItem(LOCK_KEY, lockId);
await new Promise(resolve => setTimeout(resolve, 50));
if (localStorage.getItem(LOCK_KEY) === lockId) {
isIndexingActive = true;
return true;
}
} else {
try {
const [timestamp] = currentLock.split('-');
const lockTime = parseInt(timestamp, 10);
if (isNaN(lockTime) || currentTime - lockTime > LOCK_TIMEOUT) {
localStorage.setItem(LOCK_KEY, lockId);
await new Promise(resolve => setTimeout(resolve, 50));
if (localStorage.getItem(LOCK_KEY) === lockId) {
isIndexingActive = true;
return true;
}
}
} catch (e) {
console.warn("[Indexer] Error parsing lock:", e);
}
}
await new Promise(resolve => setTimeout(resolve, 100));
}
return false;
}
function startHeartbeat() {
localStorage.setItem(LOCK_KEY, `${Date.now()}`);
const lockId = localStorage.getItem(LOCK_KEY);
if (!lockId) return;
heartbeatTimer = setInterval(() => {
localStorage.setItem(LOCK_KEY, `${Date.now()}`);
if (localStorage.getItem(LOCK_KEY)?.endsWith(lockId.split('-')[1])) {
const newLockId = `${Date.now()}-${lockId.split('-')[1]}`;
localStorage.setItem(LOCK_KEY, newLockId);
}
}, HEARTBEAT_INTERVAL);
}
function stopHeartbeat() {
if (heartbeatTimer) clearInterval(heartbeatTimer);
localStorage.removeItem(LOCK_KEY);
isIndexingActive = false;
}
function dispatchProgress(
@@ -118,9 +161,9 @@ export async function loadAllStoredItems(): Promise<IndexItem[]> {
}
export async function runIndexing(): Promise<void> {
if (!shouldIndex()) {
if (!(await acquireLock())) {
console.debug(
"%c[Indexer] Skipping indexing (another tab has the lock)",
"%c[Indexer] Could not acquire lock - another tab is indexing or this tab is already indexing",
"color: gray",
);
return;
@@ -131,11 +174,11 @@ export async function runIndexing(): Promise<void> {
const jobIds = Object.keys(jobs);
let completedJobs = 0;
// Add an extra step for vectorization
const totalSteps = jobIds.length + 1;
dispatchProgress(completedJobs, totalSteps, true, "Starting jobs");
// --- Step 1: Run Fetching/Storing Jobs (Main Thread) ---
let hasStreamingJobs = false;
for (const jobId of jobIds) {
dispatchProgress(
completedJobs,
@@ -202,8 +245,7 @@ export async function runIndexing(): Promise<void> {
console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff");
try {
const newItemsRaw = await job.run(ctx); // newItemsRaw are items *returned* by the job.
// Some jobs (like messages) might add via ctx.addItem and return [].
const newItemsRaw = await job.run(ctx);
const stored = await getStoredItems();
let merged = mergeItems(stored, newItemsRaw);
@@ -212,6 +254,10 @@ export async function runIndexing(): Promise<void> {
await setStoredItems(merged);
await updateLastRunMeta(jobId);
if (jobId === 'messages' || jobId === 'notifications') {
hasStreamingJobs = true;
}
console.debug(
`%c[Indexer] ${job.label}: ${newItemsRaw.length} new items reported by run, ${merged.length} total items now in '${jobId}' store.`,
"color: #00c46f",
@@ -230,116 +276,124 @@ export async function runIndexing(): Promise<void> {
);
}
// --- Step 2: Delegate Vectorization to Worker (Off Main Thread) ---
// Load ALL items from the primary stores. The worker will handle deduplication against its own vector store.
const allItemsInPrimaryStores = await loadAllStoredItems();
if (!hasStreamingJobs) {
const allItemsInPrimaryStores = await loadAllStoredItems();
if (allItemsInPrimaryStores.length > 0) {
console.debug(
`%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`,
"color: #4ea1ff",
);
dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items");
if (allItemsInPrimaryStores.length > 0) {
console.debug(
`%c[Indexer] Sending ${allItemsInPrimaryStores.length} items from primary stores to worker for vectorization check...`,
"color: #4ea1ff",
);
dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization of stored items");
try {
const workerManager = VectorWorkerManager.getInstance();
await workerManager.processItems(allItemsInPrimaryStores, (progress) => {
let detailMessage = progress.message || "";
if (
progress.status === "processing" &&
progress.total &&
progress.processed !== undefined
) {
detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`;
} else if (progress.status === "complete") {
detailMessage = "Vectorization complete";
// Mark the vectorization step as complete
completedJobs++; // Increment completion count *after* vectorization finishes
dispatchProgress(
completedJobs,
totalSteps,
false, // Indexing finished
"Indexing finished",
detailMessage
);
} else if (progress.status === "error") {
detailMessage = `Vectorization error: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false, // Indexing stopped
"Vectorization failed",
detailMessage,
);
} else if (progress.status === "started") {
detailMessage = `Vectorization started for ${progress.total} items`;
} else if (progress.status === "cancelled") {
detailMessage = `Vectorization cancelled: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false, // Indexing stopped
"Vectorization cancelled",
detailMessage,
);
}
// Update the status detail for ongoing vectorization
if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") {
try {
const workerManager = VectorWorkerManager.getInstance();
await workerManager.processItems(allItemsInPrimaryStores, (progress) => {
let detailMessage = progress.message || "";
if (
progress.status === "processing" &&
progress.total &&
progress.processed !== undefined
) {
detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`;
} else if (progress.status === "complete") {
detailMessage = "Vectorization complete";
completedJobs++;
dispatchProgress(
completedJobs, // Still on job completion count
completedJobs,
totalSteps,
true, // Indexing still active
"Vectorization in progress",
false,
"Indexing finished",
detailMessage
);
} else if (progress.status === "error") {
detailMessage = `Vectorization error: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization failed",
detailMessage,
);
}
});
} else if (progress.status === "started") {
detailMessage = `Vectorization started for ${progress.total} items`;
} else if (progress.status === "cancelled") {
detailMessage = `Vectorization cancelled: ${progress.message}`;
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization cancelled",
detailMessage,
);
}
if (progress.status !== "complete" && progress.status !== "error" && progress.status !== "cancelled") {
dispatchProgress(
completedJobs,
totalSteps,
true,
"Vectorization in progress",
detailMessage,
);
}
});
console.debug(
"%c[Indexer] Vectorization task for stored items sent to worker.",
"color: green",
);
} catch (error) {
console.error(
`%c[Indexer] ❌ Failed to send items to vector worker:`,
"color: red",
error,
);
dispatchProgress(
completedJobs,
totalSteps,
false,
"Vectorization failed",
String(error),
);
}
} else {
console.debug(
"%c[Indexer] Vectorization task for stored items sent to worker.",
"color: green",
);
} catch (error) {
console.error(
`%c[Indexer] ❌ Failed to send items to vector worker:`,
"color: red",
error,
"%c[Indexer] No items found in primary stores to send for vectorization.",
"color: gray",
);
completedJobs++;
dispatchProgress(
completedJobs,
totalSteps,
false, // Indexing stopped
"Vectorization failed",
String(error),
false,
"Indexing finished (no items for vectorization)",
);
}
} else {
console.debug(
"%c[Indexer] No items found in primary stores to send for vectorization.",
"color: gray",
"%c[Indexer] Skipping bulk vectorization - streaming jobs will handle vectorization",
"color: #4ea1ff",
);
completedJobs++; // Count the "skipped" vectorization step
completedJobs++;
dispatchProgress(
completedJobs,
totalSteps,
false, // Indexing finished
"Indexing finished (no items for vectorization)",
false,
"Indexing finished (streaming vectorization active)",
);
}
stopHeartbeat();
// Update dynamic items with everything that's now in the primary stores
// These items are either already vectorized or will be by the worker.
const allItemsInPrimaryStores = await loadAllStoredItems();
allItemsInPrimaryStores.forEach(item => {
// Ensure job still exists for renderComponentId mapping
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
if (jobDef) {
const renderComponent = renderComponentMap[jobDef.renderComponentId];
if (renderComponent) {
item.renderComponent = renderComponent;
}
} else if (renderComponentMap[item.renderComponentId]) { // Fallback if category doesn't match a job id directly
} else if (renderComponentMap[item.renderComponentId]) {
item.renderComponent = renderComponentMap[item.renderComponentId];
}
});
@@ -349,7 +403,6 @@ export async function runIndexing(): Promise<void> {
function mergeItems(existing: IndexItem[], incoming: IndexItem[]): IndexItem[] {
const map = new Map<string, IndexItem>();
// Prioritize incoming items if IDs clash
for (const item of existing) {
if (item && item.id) map.set(item.id, item);
}
@@ -2,6 +2,10 @@ import type { Job, IndexItem } from "../types";
import { htmlToPlainText } from "../utils";
import { delay } from "@/seqta/utils/delay";
import { VectorWorkerManager } from "../worker/vectorWorkerManager";
import { loadDynamicItems } from "../../utils/dynamicItems";
import { loadAllStoredItems } from "../indexer";
import { renderComponentMap } from "../renderComponents";
import { jobs } from "../jobs";
const RATE_LIMIT_CONFIG = {
minDelay: 50,
@@ -163,13 +167,27 @@ async function processMessagesInParallel(
processedItems: IndexItem[];
consecutiveExisting: number;
updatedProgress: MessagesProgress;
shouldStop: boolean;
}> {
const processedItems: IndexItem[] = [];
let consecutiveExisting = 0;
const updatedProgress = { ...progress };
// Filter out messages older than 2 years
const twoYearsAgo = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000;
let shouldStop = false;
const messagesToProcess = messages.filter((msg) => {
const id = msg.id.toString();
const messageDate = new Date(msg.date).getTime();
// If we encounter a message older than 2 years, we should stop processing
// since messages are sorted by date descending
if (messageDate < twoYearsAgo) {
shouldStop = true;
return false;
}
if (existingIds.has(id) || processedIdsSet.has(id)) {
consecutiveExisting++;
return false;
@@ -179,7 +197,7 @@ async function processMessagesInParallel(
});
if (messagesToProcess.length === 0) {
return { processedItems, consecutiveExisting, updatedProgress };
return { processedItems, consecutiveExisting, updatedProgress, shouldStop };
}
for (
@@ -281,7 +299,7 @@ async function processMessagesInParallel(
);
}
return { processedItems, consecutiveExisting, updatedProgress };
return { processedItems, consecutiveExisting, updatedProgress, shouldStop };
}
export const messagesJob: Job = {
@@ -323,6 +341,7 @@ export const messagesJob: Job = {
);
},
RATE_LIMIT_CONFIG.vectorBatchSize,
"messages",
);
progress.streamingStarted = true;
console.log(
@@ -472,6 +491,7 @@ export const messagesJob: Job = {
processedItems,
consecutiveExisting: newConsecutiveExisting,
updatedProgress,
shouldStop,
} = await processMessagesInParallel(
list.payload.messages,
existingIds,
@@ -487,11 +507,13 @@ export const messagesJob: Job = {
itemsToStream.push(...processedItems);
// Update consecutive existing counter
consecutiveExisting = newConsecutiveExisting;
if (consecutiveExisting >= 20) {
progress.done = true;
}
// Stream items to vector worker if we have any
if (itemsToStream.length > 0 && progress.streamingStarted) {
try {
await vectorWorker.streamItems(itemsToStream);
@@ -507,6 +529,30 @@ export const messagesJob: Job = {
}
}
// Dispatch incremental search update if we processed new items
if (processedItems.length > 0) {
try {
const currentItems = await loadAllStoredItems();
currentItems.forEach(item => {
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
if (jobDef) {
const renderComponent = renderComponentMap[jobDef.renderComponentId];
if (renderComponent) {
item.renderComponent = renderComponent;
}
} else if (renderComponentMap[item.renderComponentId]) {
item.renderComponent = renderComponentMap[item.renderComponentId];
}
});
loadDynamicItems(currentItems);
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "messages", newItemCount: processedItems.length, streaming: true }
}));
} catch (error) {
console.warn("[Messages job] Failed to dispatch incremental search update:", error);
}
}
if (!list.payload.hasMore) progress.done = true;
progress.offset += progress.currentBatchSize;
@@ -520,6 +566,11 @@ export const messagesJob: Job = {
`[Messages job] Progress: offset=${progress.offset}, batchSize=${progress.currentBatchSize}, delay=${progress.currentDelay}ms, failures=${progress.failedRequests}, retryQueue=${progress.retryQueue.length}, vectorStreamed=${itemsStreamedToVector}, parallelRequests=${RATE_LIMIT_CONFIG.parallelRequests}`,
);
}
if (shouldStop) {
progress.done = true;
break;
}
}
if (progress.streamingStarted) {
@@ -555,7 +606,7 @@ export const messagesJob: Job = {
},
purge: (items) => {
const fourYears = Date.now() - 4 * 365 * 24 * 60 * 60 * 1000;
return items.filter((i) => i.dateAdded >= fourYears);
const twoYears = Date.now() - 2 * 365 * 24 * 60 * 60 * 1000;
return items.filter((i) => i.dateAdded >= twoYears);
},
};
@@ -3,6 +3,10 @@ import { htmlToPlainText } from "../utils";
import { fetchMessageContent } from "./messages";
import { delay } from "@/seqta/utils/delay";
import { VectorWorkerManager } from "../worker/vectorWorkerManager";
import { loadDynamicItems } from "../../utils/dynamicItems";
import { loadAllStoredItems } from "../indexer";
import { renderComponentMap } from "../renderComponents";
import { jobs } from "../jobs";
const NOTIFICATIONS_RATE_LIMIT = {
baseDelay: 150,
@@ -202,6 +206,7 @@ export const notificationsJob: Job = {
);
},
NOTIFICATIONS_RATE_LIMIT.vectorBatchSize,
"notifications",
);
progress.streamingStarted = true;
console.log(
@@ -366,6 +371,29 @@ export const notificationsJob: Job = {
if (progressUpdateCounter >= 5) {
await ctx.setProgress(progress);
progressUpdateCounter = 0;
if (items.length > 0) {
try {
const currentItems = await loadAllStoredItems();
currentItems.forEach(item => {
const jobDef = jobs[item.category] || Object.values(jobs).find(j => j.id === item.category) || jobs[item.renderComponentId];
if (jobDef) {
const renderComponent = renderComponentMap[jobDef.renderComponentId];
if (renderComponent) {
item.renderComponent = renderComponent;
}
} else if (renderComponentMap[item.renderComponentId]) {
item.renderComponent = renderComponentMap[item.renderComponentId];
}
});
loadDynamicItems(currentItems);
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "notifications", newItemCount: items.length, streaming: true }
}));
} catch (error) {
console.warn("[Notifications job] Failed to dispatch incremental search update:", error);
}
}
}
}
@@ -4,6 +4,7 @@ import type { IndexItem } from "../types";
let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
let currentAbortController: AbortController | null = null;
let loadedItemIds = new Set<string>(); // Track loaded items to prevent duplicates
let streamingSession: {
isActive: boolean;
@@ -25,11 +26,24 @@ async function initWorker() {
await initializeModel();
vectorIndex = new EmbeddingIndex([]);
// Load existing items but track them to prevent duplicates
const stored = await vectorIndex.getAllObjectsFromIndexedDB();
if (stored.length > 0) {
stored.forEach((item) => vectorIndex!.add(item));
console.debug(`Found ${stored.length} existing items in IndexedDB`);
// Clear any existing items from memory first
loadedItemIds.clear();
// Add items and track their IDs
stored.forEach((item) => {
if (item.id && !loadedItemIds.has(item.id)) {
vectorIndex!.add(item);
loadedItemIds.add(item.id);
}
});
console.debug(
`Vector index loaded ${stored.length} items from IndexedDB.`,
`Vector index loaded ${loadedItemIds.size} unique items from IndexedDB.`,
);
} else {
console.debug("No existing vector index found in IndexedDB.");
@@ -154,16 +168,14 @@ async function processStreamingItems() {
streamingSession.batchSize,
);
// Use our tracking set for more efficient deduplication
const unprocessedItems = batchToProcess.filter((item) => {
try {
return !vectorIndex!.get({ id: item.id });
} catch (e) {
return true;
}
return item.id && !loadedItemIds.has(item.id);
});
if (unprocessedItems.length === 0) {
streamingSession.totalProcessed += batchToProcess.length;
console.debug(`Skipped ${batchToProcess.length} already processed items`);
continue;
}
@@ -176,15 +188,18 @@ async function processStreamingItems() {
if (successfullyVectorized.length > 0) {
try {
successfullyVectorized.forEach((item) => vectorIndex!.add(item));
successfullyVectorized.forEach((item) => {
vectorIndex!.add(item);
loadedItemIds.add(item.id); // Track the added item
});
if (
streamingSession.totalProcessed % (streamingSession.batchSize * 3) ===
streamingSession.totalProcessed % (streamingSession.batchSize * 15) ===
0
) {
await vectorIndex!.saveIndex("indexedDB");
console.debug(
`Saved streaming index at ${streamingSession.totalProcessed} processed items`,
`Saved streaming index at ${streamingSession.totalProcessed} processed items (${loadedItemIds.size} total unique items)`,
);
}
} catch (e) {
@@ -199,7 +214,7 @@ async function processStreamingItems() {
data: {
processed: streamingSession.totalProcessed,
total: streamingSession.totalExpected,
message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items`,
message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items (${loadedItemIds.size} unique)`,
},
});
@@ -313,13 +328,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
}
// Use our tracking set for more efficient deduplication
const unprocessedItems = items.filter((item) => {
if (signal.aborted) return false;
try {
return !vectorIndex!.get({ id: item.id });
} catch (e) {
return true;
}
return item.id && !loadedItemIds.has(item.id);
});
if (signal.aborted) {
@@ -335,15 +347,15 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
if (unprocessedItems.length === 0) {
console.debug("No new items to process.");
console.debug(`No new items to process. ${loadedItemIds.size} items already in index.`);
self.postMessage({
type: "progress",
data: { status: "complete", message: "No new items to process" },
data: { status: "complete", message: `No new items to process (${loadedItemIds.size} items already indexed)` },
});
return;
}
console.debug(`Starting processing of ${unprocessedItems.length} items.`);
console.debug(`Starting processing of ${unprocessedItems.length} items (${items.length - unprocessedItems.length} already processed).`);
self.postMessage({
type: "progress",
data: {
@@ -388,7 +400,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
if (successfullyVectorized.length > 0) {
try {
successfullyVectorized.forEach((item) => vectorIndex!.add(item));
successfullyVectorized.forEach((item) => {
vectorIndex!.add(item);
loadedItemIds.add(item.id); // Track the added item
});
} catch (e) {
console.error("Error adding batch to index:", e);
self.postMessage({
@@ -412,7 +427,7 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
try {
await vectorIndex!.saveIndex("indexedDB");
console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1}`);
console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1} (${loadedItemIds.size} total unique items)`);
} catch (e) {
console.error("Error saving index batch:", e);
self.postMessage({
@@ -421,29 +436,68 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
});
}
processedCount = Math.min(i + BATCH_SIZE, unprocessedItems.length);
processedCount += batch.length;
self.postMessage({
type: "progress",
data: {
status: "processing",
total: unprocessedItems.length,
processed: processedCount,
message: `Processed ${processedCount}/${unprocessedItems.length} items (${loadedItemIds.size} total unique)`,
},
});
await new Promise((resolve) => setTimeout(resolve, 0));
}
if (!signal.aborted) {
console.debug("Processing completed successfully.");
self.postMessage({
type: "progress",
data: { status: "complete", message: "All items processed successfully" },
});
} else {
console.debug("Processing completed, but was cancelled.");
console.debug(`Processing complete. Total unique items in index: ${loadedItemIds.size}`);
self.postMessage({
type: "progress",
data: {
status: "complete",
total: unprocessedItems.length,
processed: processedCount,
message: `Processing complete: ${processedCount} new items processed (${loadedItemIds.size} total unique items)`,
},
});
}
async function resetWorker() {
console.debug("Resetting vector worker state...");
// Clear tracking
loadedItemIds.clear();
// Reset streaming session
if (streamingSession?.isActive) {
streamingSession.isActive = false;
streamingSession = null;
}
// Reset vector index
if (vectorIndex) {
try {
// Save current state before reset
await vectorIndex.saveIndex("indexedDB");
console.debug("Saved index before reset");
} catch (e) {
console.warn("Error saving index before reset:", e);
}
}
// Reinitialize
isInitialized = false;
vectorIndex = null;
await initWorker();
console.debug(`Vector worker reset complete. Loaded ${loadedItemIds.size} items.`);
self.postMessage({
type: "progress",
data: {
status: "complete",
message: `Worker reset complete. ${loadedItemIds.size} items loaded.`,
},
});
}
self.addEventListener("message", async (e) => {
@@ -475,6 +529,10 @@ self.addEventListener("message", async (e) => {
await endStreamingSession();
break;
case "reset":
await resetWorker();
break;
default:
console.warn("Unknown message type:", type);
}
@@ -23,12 +23,14 @@ export class VectorWorkerManager {
batchBuffer: IndexItem[];
batchSize: number;
flushTimer: NodeJS.Timeout | null;
jobId?: string; // Track which job owns the session
} | null = null;
private constructor() {}
static getInstance(): VectorWorkerManager {
if (!VectorWorkerManager.instance) {
console.debug("Creating new VectorWorkerManager instance");
VectorWorkerManager.instance = new VectorWorkerManager();
}
return VectorWorkerManager.instance;
@@ -41,16 +43,27 @@ export class VectorWorkerManager {
console.debug("Lazy-loading vector worker...");
return new Promise<void>((resolve, reject) => {
// Terminate any existing worker before creating a new one
if (this.worker) {
console.debug("Terminating existing worker before creating new one");
this.worker.terminate();
this.worker = null;
}
console.debug("Creating new vector worker instance");
this.worker = vectorWorker();
console.log("Worker initialized", this.worker);
const timeout = setTimeout(() => {
console.error("Vector worker initialization timed out");
this.worker?.terminate();
this.worker = null;
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.isInitialized = false;
this.readyPromise = null;
// Don't reset readyPromise here to prevent race conditions
// It will be reset when a new initialization is attempted
reject(new Error("Worker initialization timed out"));
}, 10000);
@@ -76,6 +89,11 @@ export class VectorWorkerManager {
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
// Dispatch search update when vectorization completes
window.dispatchEvent(new CustomEvent("dynamic-items-updated", {
detail: { incremental: true, jobId: "vectorization", vectorUpdate: true }
}));
}
}
break;
@@ -101,11 +119,43 @@ export class VectorWorkerManager {
});
}
private resetWorkerState() {
console.debug("Resetting vector worker state");
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.isInitialized = false;
this.readyPromise = null;
this.progressCallback = null;
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
}
private async ensureReady() {
// If we already have a ready promise, wait for it regardless of outcome
if (this.readyPromise) {
try {
await this.readyPromise;
} catch (error) {
// If the previous initialization failed, reset state and try again
console.warn("Previous worker initialization failed, resetting state and retrying...", error);
this.resetWorkerState();
}
}
// Double-check if we're actually ready after waiting
if (this.isInitialized && this.worker) {
return;
}
// If we're not ready and there's no active promise, create one
if (!this.readyPromise) {
console.warn("Worker not initialized, attempting init...");
this.readyPromise = this.initWorker();
}
await this.readyPromise;
if (!this.isInitialized || !this.worker) {
throw new Error(
@@ -117,6 +167,18 @@ export class VectorWorkerManager {
async processItems(items: IndexItem[], onProgress?: ProgressCallback) {
await this.ensureReady();
// Don't allow regular processing if streaming is active
if (this.streamingSession?.isActive) {
console.warn("Cannot process items while streaming session is active");
if (onProgress) {
onProgress({
status: "error",
message: "Cannot process items while streaming session is active"
});
}
return;
}
this.progressCallback = onProgress || null;
console.debug(`Sending ${items.length} items to worker for processing.`);
@@ -131,11 +193,25 @@ export class VectorWorkerManager {
totalExpectedItems: number,
onProgress?: ProgressCallback,
batchSize: number = 10,
jobId?: string,
): Promise<void> {
await this.ensureReady();
// Check if another job already has an active streaming session
if (this.streamingSession?.isActive) {
this.endStreamingSession();
if (this.streamingSession.jobId !== jobId) {
console.warn(`Cannot start streaming session for job ${jobId} - job ${this.streamingSession.jobId} already has an active session`);
if (onProgress) {
onProgress({
status: "error",
message: `Another job (${this.streamingSession.jobId}) already has an active streaming session`
});
}
return;
} else {
console.debug(`Streaming session for job ${jobId} already active`);
return;
}
}
this.progressCallback = onProgress || null;
@@ -147,10 +223,11 @@ export class VectorWorkerManager {
batchBuffer: [],
batchSize,
flushTimer: null,
jobId,
};
console.debug(
`Starting streaming session for ${totalExpectedItems} items with batch size ${batchSize}`,
`Starting streaming session for job ${jobId} with ${totalExpectedItems} items (batch size ${batchSize})`,
);
this.worker!.postMessage({
@@ -163,7 +240,7 @@ export class VectorWorkerManager {
status: "started",
total: totalExpectedItems,
processed: 0,
message: "Starting streaming vectorization",
message: `Starting streaming vectorization for ${jobId}`,
});
}
}
@@ -282,17 +359,20 @@ export class VectorWorkerManager {
terminate() {
console.debug("Terminating Vector Worker Manager...");
this.resetWorkerState();
}
async resetWorker(): Promise<void> {
console.debug("Resetting vector worker...");
if (this.streamingSession?.isActive) {
this.endStreamingSession();
await this.endStreamingSession();
}
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.isInitialized = false;
this.readyPromise = null;
this.progressCallback = null;
await this.ensureReady();
this.worker!.postMessage({ type: "reset" });
console.debug("Reset command sent to worker");
}
}