From cefeac95ea992dfcd7a5736fc7e25e835e694089 Mon Sep 17 00:00:00 2001 From: SethBurkart123 Date: Sun, 25 May 2025 20:11:45 +1000 Subject: [PATCH] feat: major indexing performance improvements + visual fixes --- src/interface/components/HotkeyInput.svelte | 6 +- .../built-in/globalSearch/src/core/index.ts | 3 +- .../src/indexing/jobs/messages.ts | 549 ++++++++++++++++-- .../src/indexing/jobs/notifications.ts | 455 ++++++++++++--- .../src/indexing/jobs/subjects.ts | 5 +- .../src/indexing/worker/vectorWorker.ts | 455 +++++++++------ .../indexing/worker/vectorWorkerManager.ts | 321 +++++----- .../globalSearch/src/utils/rateLimiter.ts | 230 ++++++++ 8 files changed, 1536 insertions(+), 488 deletions(-) create mode 100644 src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts diff --git a/src/interface/components/HotkeyInput.svelte b/src/interface/components/HotkeyInput.svelte index f575259c..11322d7b 100644 --- a/src/interface/components/HotkeyInput.svelte +++ b/src/interface/components/HotkeyInput.svelte @@ -171,15 +171,13 @@ {#if isRecording}
- {#if hotkeyParts.length === 0} - Press keys... - {/if} + Press keys...
{:else if hotkeyParts.length > 0} diff --git a/src/plugins/built-in/globalSearch/src/core/index.ts b/src/plugins/built-in/globalSearch/src/core/index.ts index ab04a008..71ed2369 100644 --- a/src/plugins/built-in/globalSearch/src/core/index.ts +++ b/src/plugins/built-in/globalSearch/src/core/index.ts @@ -55,8 +55,7 @@ const settings = defineSettings({ req.onsuccess = () => resolve(); req.onerror = () => reject(req.error); req.onblocked = () => { - alert(`Please close all other tabs using this app to reset the database: ${dbName}`); - reject(new Error('Delete blocked')); + reject(new Error(`One database is open, failed to remove: ${dbName}`)); }; }); }; diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts index 75da72e7..4a3ac355 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts @@ -1,5 +1,35 @@ import type { Job, IndexItem } from "../types"; import { htmlToPlainText } from "../utils"; +import { delay } from "@/seqta/utils/delay"; +import { VectorWorkerManager } from "../worker/vectorWorkerManager"; + +const RATE_LIMIT_CONFIG = { + minDelay: 50, + maxDelay: 5000, + baseDelay: 200, + backoffMultiplier: 1.5, + maxRetries: 3, + adaptiveBatchSize: true, + minBatchSize: 10, + maxBatchSize: 100, + baseBatchSize: 50, + vectorBatchSize: 5, + parallelRequests: 5, + parallelDelay: 100, +}; + +interface MessagesProgress { + offset: number; + done: boolean; + currentBatchSize: number; + currentDelay: number; + failedRequests: number; + lastSuccessTime: number; + retryQueue: number[]; + processedIds: string[]; + streamingStarted: boolean; + totalEstimated: number; +} const fetchMessages = async (offset = 0, limit = 100) => { const res = await fetch(`${location.origin}/seqta/student/load/message`, { @@ -23,22 +53,235 @@ const fetchMessages = async (offset = 0, limit = 100) => { }>; }; -export const fetchMessageContent = async (id: number) => { - const res = await fetch(`${location.origin}/seqta/student/load/message`, { - method: "POST", - credentials: "include", - headers: { "Content-Type": "application/json; charset=utf-8" }, - body: JSON.stringify({ action: "message", id }), - }); - return res.json() as Promise<{ - payload: { contents: string }; - status: string; - }>; +export const fetchMessageContent = async ( + id: number, + retryCount = 0, +): Promise<{ + payload: { contents: string }; + status: string; +} | null> => { + try { + const res = await fetch(`${location.origin}/seqta/student/load/message`, { + method: "POST", + credentials: "include", + headers: { "Content-Type": "application/json; charset=utf-8" }, + body: JSON.stringify({ action: "message", id }), + }); + + if (!res.ok) { + throw new Error(`HTTP ${res.status}: ${res.statusText}`); + } + + return await res.json(); + } catch (error) { + console.warn( + `[Messages job] Failed to fetch content for message ${id} (attempt ${retryCount + 1}):`, + error, + ); + + if (retryCount < RATE_LIMIT_CONFIG.maxRetries) { + const retryDelay = + RATE_LIMIT_CONFIG.baseDelay * + Math.pow(RATE_LIMIT_CONFIG.backoffMultiplier, retryCount); + await delay(Math.min(retryDelay, RATE_LIMIT_CONFIG.maxDelay)); + return fetchMessageContent(id, retryCount + 1); + } + + return null; + } }; -interface MessagesProgress { - offset: number; - done: boolean; +function calculateAdaptiveDelay( + progress: MessagesProgress, + responseTime: number, +): number { + const { currentDelay, failedRequests, lastSuccessTime } = progress; + const timeSinceLastSuccess = Date.now() - lastSuccessTime; + + if (failedRequests > 0 || responseTime > 2000) { + return Math.min( + currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier, + RATE_LIMIT_CONFIG.maxDelay, + ); + } + + if (responseTime < 500 && timeSinceLastSuccess > 10000) { + return Math.max(currentDelay * 0.8, RATE_LIMIT_CONFIG.minDelay); + } + + return currentDelay; +} + +function calculateAdaptiveBatchSize( + progress: MessagesProgress, + responseTime: number, +): number { + if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) { + return progress.currentBatchSize; + } + + const { currentBatchSize, failedRequests } = progress; + + if (failedRequests > 2 || responseTime > 3000) { + return Math.max( + Math.floor(currentBatchSize * 0.7), + RATE_LIMIT_CONFIG.minBatchSize, + ); + } + + if (failedRequests === 0 && responseTime < 1000) { + return Math.min( + Math.floor(currentBatchSize * 1.2), + RATE_LIMIT_CONFIG.maxBatchSize, + ); + } + + return currentBatchSize; +} + +async function estimateMessageCount(): Promise { + try { + const firstBatch = await fetchMessages(0, 100); + if (firstBatch.status !== "200" || !firstBatch.payload.hasMore) { + return firstBatch.payload.messages.length; + } + + return Math.min(firstBatch.payload.messages.length * 20, 2000); + } catch (error) { + console.warn("[Messages job] Failed to estimate message count:", error); + return 500; + } +} + +async function processMessagesInParallel( + messages: any[], + existingIds: Set, + processedIdsSet: Set, + progress: MessagesProgress, + ctx: any, +): Promise<{ + processedItems: IndexItem[]; + consecutiveExisting: number; + updatedProgress: MessagesProgress; +}> { + const processedItems: IndexItem[] = []; + let consecutiveExisting = 0; + const updatedProgress = { ...progress }; + + const messagesToProcess = messages.filter((msg) => { + const id = msg.id.toString(); + if (existingIds.has(id) || processedIdsSet.has(id)) { + consecutiveExisting++; + return false; + } + consecutiveExisting = 0; + return true; + }); + + if (messagesToProcess.length === 0) { + return { processedItems, consecutiveExisting, updatedProgress }; + } + + for ( + let i = 0; + i < messagesToProcess.length; + i += RATE_LIMIT_CONFIG.parallelRequests + ) { + const batch = messagesToProcess.slice( + i, + i + RATE_LIMIT_CONFIG.parallelRequests, + ); + + if (i > 0) { + await delay( + Math.max(updatedProgress.currentDelay, RATE_LIMIT_CONFIG.parallelDelay), + ); + } + + const batchStartTime = Date.now(); + + const batchPromises = batch.map(async (msg) => { + const id = msg.id.toString(); + + try { + const full = await fetchMessageContent(msg.id); + const responseTime = Date.now() - batchStartTime; + + if (full && full.status === "200") { + const item: IndexItem = { + id, + text: msg.subject, + category: "messages", + content: `${htmlToPlainText(full.payload.contents)}\nFrom: ${msg.sender}`, + dateAdded: new Date(msg.date).getTime(), + metadata: { + messageId: msg.id, + author: msg.sender, + senderId: msg.sender_id, + senderType: msg.sender_type, + timestamp: msg.date, + hasAttachments: msg.attachments, + attachmentCount: msg.attachmentCount, + read: msg.read === 1, + }, + actionId: "message", + renderComponentId: "message", + }; + + return { success: true, item, id, responseTime }; + } else { + return { success: false, id, messageId: msg.id, responseTime }; + } + } catch (error) { + console.error(`[Messages job] content fetch failed (id ${id}):`, error); + return { success: false, id, messageId: msg.id, error }; + } + }); + + const batchResults = await Promise.all(batchPromises); + const batchResponseTime = Date.now() - batchStartTime; + + let batchSuccesses = 0; + let batchFailures = 0; + + for (const result of batchResults) { + if (result.success && result.item) { + await ctx.addItem(result.item); + existingIds.add(result.id); + processedIdsSet.add(result.id); + processedItems.push(result.item); + batchSuccesses++; + } else { + if (updatedProgress.retryQueue.length < 50 && result.messageId) { + updatedProgress.retryQueue.push(result.messageId); + } + batchFailures++; + } + } + + if (batchSuccesses > 0) { + updatedProgress.lastSuccessTime = Date.now(); + updatedProgress.failedRequests = Math.max( + 0, + updatedProgress.failedRequests - batchSuccesses, + ); + } + + if (batchFailures > 0) { + updatedProgress.failedRequests += batchFailures; + } + + updatedProgress.currentDelay = calculateAdaptiveDelay( + updatedProgress, + batchResponseTime, + ); + + console.log( + `[Messages job] Processed parallel batch: ${batchSuccesses} successes, ${batchFailures} failures, ${batchResponseTime}ms total time`, + ); + } + + return { processedItems, consecutiveExisting, updatedProgress }; } export const messagesJob: Job = { @@ -48,79 +291,265 @@ export const messagesJob: Job = { frequency: { type: "expiry", afterMs: 1000 * 60 * 60 * 24 }, run: async (ctx) => { - const limit = 100; const progress = (await ctx.getProgress()) ?? { offset: 0, done: false, + currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize, + currentDelay: RATE_LIMIT_CONFIG.baseDelay, + failedRequests: 0, + lastSuccessTime: Date.now(), + retryQueue: [], + processedIds: [], + streamingStarted: false, + totalEstimated: 0, }; const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id)); + const processedIdsSet = new Set(progress.processedIds); + + existingIds.forEach((id) => processedIdsSet.add(id)); + + const vectorWorker = VectorWorkerManager.getInstance(); + if (!progress.streamingStarted) { + progress.totalEstimated = await estimateMessageCount(); + + try { + await vectorWorker.startStreamingSession( + progress.totalEstimated, + (progressData) => { + console.log( + `[Messages job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`, + ); + }, + RATE_LIMIT_CONFIG.vectorBatchSize, + ); + progress.streamingStarted = true; + console.log( + `[Messages job] Started streaming vectorization session for ~${progress.totalEstimated} items`, + ); + } catch (error) { + console.warn( + "[Messages job] Failed to start streaming session:", + error, + ); + } + } + let consecutiveExisting = 0; + let requestStartTime = 0; + let progressUpdateCounter = 0; + let itemsStreamedToVector = 0; + + if (progress.retryQueue.length > 0) { + console.log( + `[Messages job] Processing ${Math.min(progress.retryQueue.length, 10)} items from retry queue`, + ); + + const retryBatch = progress.retryQueue.slice(0, 10); + const retryBatches = []; + + for ( + let i = 0; + i < retryBatch.length; + i += RATE_LIMIT_CONFIG.parallelRequests + ) { + retryBatches.push( + retryBatch.slice(i, i + RATE_LIMIT_CONFIG.parallelRequests), + ); + } + + for (const batch of retryBatches) { + await delay(progress.currentDelay); + const batchStartTime = Date.now(); + + const retryPromises = batch.map(async (messageId) => { + const id = messageId.toString(); + + if (processedIdsSet.has(id)) { + return { success: true, messageId, alreadyProcessed: true }; + } + + try { + const full = await fetchMessageContent(messageId); + const responseTime = Date.now() - batchStartTime; + + if (full && full.status === "200") { + return { success: true, messageId, responseTime }; + } else { + return { success: false, messageId, responseTime }; + } + } catch (error) { + console.error( + `[Messages job] Retry failed for message ${messageId}:`, + error, + ); + return { success: false, messageId, error }; + } + }); + + const retryResults = await Promise.all(retryPromises); + const batchResponseTime = Date.now() - batchStartTime; + + let retrySuccesses = 0; + let retryFailures = 0; + + for (const result of retryResults) { + if (result.success) { + if (!result.alreadyProcessed) { + processedIdsSet.add(result.messageId.toString()); + retrySuccesses++; + } + progress.retryQueue = progress.retryQueue.filter( + (mid) => mid !== result.messageId, + ); + } else { + retryFailures++; + } + } + + if (retrySuccesses > 0) { + progress.lastSuccessTime = Date.now(); + progress.failedRequests = Math.max( + 0, + progress.failedRequests - retrySuccesses, + ); + } + + if (retryFailures > 0) { + progress.failedRequests += retryFailures; + } + + progress.currentDelay = calculateAdaptiveDelay( + progress, + batchResponseTime, + ); + + console.log( + `[Messages job] Processed retry batch: ${retrySuccesses} successes, ${retryFailures} failures`, + ); + } + } while (!progress.done) { + await delay(progress.currentDelay); + requestStartTime = Date.now(); + let list; try { - list = await fetchMessages(progress.offset, limit); + list = await fetchMessages(progress.offset, progress.currentBatchSize); + const responseTime = Date.now() - requestStartTime; + + progress.currentDelay = calculateAdaptiveDelay(progress, responseTime); + progress.currentBatchSize = calculateAdaptiveBatchSize( + progress, + responseTime, + ); } catch (e) { console.error("[Messages job] list fetch failed:", e); + progress.failedRequests++; + progress.currentDelay = Math.min( + progress.currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier, + RATE_LIMIT_CONFIG.maxDelay, + ); + + progress.processedIds = Array.from(processedIdsSet); + await ctx.setProgress(progress); break; } - if (list.status !== "200") break; + if (list.status !== "200") { + progress.failedRequests++; - for (const msg of list.payload.messages) { - const id = msg.id.toString(); + progress.processedIds = Array.from(processedIdsSet); + await ctx.setProgress(progress); + break; + } - if (existingIds.has(id)) { - consecutiveExisting += 1; - if (consecutiveExisting >= 20) { - progress.done = true; - break; - } - continue; - } - consecutiveExisting = 0; + const itemsToStream: IndexItem[] = []; - let full; + const { + processedItems, + consecutiveExisting: newConsecutiveExisting, + updatedProgress, + } = await processMessagesInParallel( + list.payload.messages, + existingIds, + processedIdsSet, + progress, + ctx, + ); + + progress.currentDelay = updatedProgress.currentDelay; + progress.failedRequests = updatedProgress.failedRequests; + progress.lastSuccessTime = updatedProgress.lastSuccessTime; + progress.retryQueue = updatedProgress.retryQueue; + + itemsToStream.push(...processedItems); + + consecutiveExisting = newConsecutiveExisting; + if (consecutiveExisting >= 20) { + progress.done = true; + } + + if (itemsToStream.length > 0 && progress.streamingStarted) { try { - full = await fetchMessageContent(msg.id); - } catch (e) { - console.error(`[Messages job] content fetch failed (id ${id}):`, e); - continue; + await vectorWorker.streamItems(itemsToStream); + itemsStreamedToVector += itemsToStream.length; + console.log( + `[Messages job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`, + ); + } catch (error) { + console.warn( + "[Messages job] Failed to stream items to vector worker:", + error, + ); } - if (full.status !== "200") continue; - - const item: IndexItem = { - id, - text: msg.subject, - category: "messages", - content: `${htmlToPlainText(full.payload.contents)}\nFrom: ${msg.sender}`, - dateAdded: new Date(msg.date).getTime(), - metadata: { - messageId: msg.id, - author: msg.sender, - senderId: msg.sender_id, - senderType: msg.sender_type, - timestamp: msg.date, - hasAttachments: msg.attachments, - attachmentCount: msg.attachmentCount, - read: msg.read === 1, - }, - actionId: "message", - renderComponentId: "message", - }; - - await ctx.addItem(item); - existingIds.add(id); } if (!list.payload.hasMore) progress.done = true; - progress.offset += limit; - await ctx.setProgress(progress); + progress.offset += progress.currentBatchSize; + + progressUpdateCounter++; + if (progressUpdateCounter >= 10 || progress.done) { + progress.processedIds = Array.from(processedIdsSet); + await ctx.setProgress(progress); + progressUpdateCounter = 0; + + console.log( + `[Messages job] Progress: offset=${progress.offset}, batchSize=${progress.currentBatchSize}, delay=${progress.currentDelay}ms, failures=${progress.failedRequests}, retryQueue=${progress.retryQueue.length}, vectorStreamed=${itemsStreamedToVector}, parallelRequests=${RATE_LIMIT_CONFIG.parallelRequests}`, + ); + } } - if (progress.done) await ctx.setProgress({ offset: 0, done: false }); + if (progress.streamingStarted) { + try { + await vectorWorker.endStreamingSession(); + console.log( + `[Messages job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`, + ); + } catch (error) { + console.warn("[Messages job] Failed to end streaming session:", error); + } + } + + if (progress.done) { + await ctx.setProgress({ + offset: 0, + done: false, + currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize, + currentDelay: RATE_LIMIT_CONFIG.baseDelay, + failedRequests: 0, + lastSuccessTime: Date.now(), + retryQueue: progress.retryQueue.slice(0, 20), + processedIds: [], + streamingStarted: false, + totalEstimated: 0, + }); + } else { + progress.processedIds = Array.from(processedIdsSet); + await ctx.setProgress(progress); + } return []; }, diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts index 62492a96..be3e4185 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts @@ -1,8 +1,18 @@ import type { Job, IndexItem } from "../types"; import { htmlToPlainText } from "../utils"; import { fetchMessageContent } from "./messages"; +import { delay } from "@/seqta/utils/delay"; +import { VectorWorkerManager } from "../worker/vectorWorkerManager"; + +const NOTIFICATIONS_RATE_LIMIT = { + baseDelay: 150, + maxDelay: 3000, + backoffMultiplier: 1.4, + maxRetries: 2, + batchDelay: 100, + vectorBatchSize: 3, +}; -/* ------------- Notification types ------------- */ interface MessageNotification { notificationID: number; type: "message"; @@ -28,10 +38,13 @@ interface AssessmentNotification { type Notification = MessageNotification | AssessmentNotification; interface NotificationsProgress { - lastTs: number; // ms since epoch of last processed notification + lastTs: number; + failedRequests: number; + currentDelay: number; + retryQueue: number[]; + streamingStarted: boolean; } -/* ------------- Helpers ------------- */ const fetchNotifications = async () => { const res = await fetch(`${location.origin}/seqta/student/heartbeat?`, { method: "POST", @@ -49,9 +62,9 @@ const fetchAssessmentName = async ( assessmentId: number, metaclassId: number, programmeId: number, + retryCount = 0, ): Promise => { const searchAssessment = (data: any): string | null => { - // Search syllabus for (const item of data.syllabus || []) { const found = (item.assessments || []).find( (a: any) => a.id === assessmentId, @@ -59,13 +72,11 @@ const fetchAssessmentName = async ( if (found) return found.title; } - // Search pending const foundPending = (data.pending || []).find( (a: any) => a.id === assessmentId, ); if (foundPending) return foundPending.title; - // Search tasks const foundTask = (data.tasks || []).find( (a: any) => a.id === assessmentId, ); @@ -75,38 +86,80 @@ const fetchAssessmentName = async ( }; const fetchAssessments = async (endpoint: string) => { - const res = await fetch(`${location.origin}${endpoint}`, { - method: "POST", - credentials: "include", - body: JSON.stringify({ - metaclass: metaclassId, - programme: programmeId, - }), - }); - const json = await res.json(); - return json.payload; + try { + const res = await fetch(`${location.origin}${endpoint}`, { + method: "POST", + credentials: "include", + body: JSON.stringify({ + metaclass: metaclassId, + programme: programmeId, + }), + }); + + if (!res.ok) { + throw new Error(`HTTP ${res.status}: ${res.statusText}`); + } + + const json = await res.json(); + return json.payload; + } catch (error) { + console.warn( + `[Notifications job] Failed to fetch assessments from ${endpoint} (attempt ${retryCount + 1}):`, + error, + ); + + if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) { + const retryDelay = + NOTIFICATIONS_RATE_LIMIT.baseDelay * + Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount); + await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay)); + return fetchAssessments(endpoint); + } + + throw error; + } }; - // Try from /past - let payload = await fetchAssessments("/seqta/student/assessment/list/past"); - let title = searchAssessment(payload); - if (title) return title; + try { + let payload = await fetchAssessments("/seqta/student/assessment/list/past"); + let title = searchAssessment(payload); + if (title) return title; - // Try from /upcoming if not found in /past - const upcomingPayload = await fetchAssessments( - "/seqta/student/assessment/list/upcoming", - ); - const foundUpcoming = (upcomingPayload || []).find( - (a: any) => a.id === assessmentId, - ); - if (foundUpcoming) return foundUpcoming.title; + await delay(NOTIFICATIONS_RATE_LIMIT.baseDelay); - throw new Error( - `Assessment with ID ${assessmentId} not found in past or upcoming.`, - ); + const upcomingPayload = await fetchAssessments( + "/seqta/student/assessment/list/upcoming", + ); + const foundUpcoming = (upcomingPayload || []).find( + (a: any) => a.id === assessmentId, + ); + if (foundUpcoming) return foundUpcoming.title; + + throw new Error( + `Assessment with ID ${assessmentId} not found in past or upcoming.`, + ); + } catch (error) { + if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) { + const retryDelay = + NOTIFICATIONS_RATE_LIMIT.baseDelay * + Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount); + await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay)); + return fetchAssessmentName( + assessmentId, + metaclassId, + programmeId, + retryCount + 1, + ); + } + + console.error( + `[Notifications job] Failed to fetch assessment name for ID ${assessmentId} after ${retryCount + 1} attempts:`, + error, + ); + return `Assessment ${assessmentId}`; + } }; -/* ------------- Job ------------- */ export const notificationsJob: Job = { id: "notifications", label: "Notifications", @@ -116,6 +169,10 @@ export const notificationsJob: Job = { run: async (ctx) => { const progress = (await ctx.getProgress()) ?? { lastTs: 0, + failedRequests: 0, + currentDelay: NOTIFICATIONS_RATE_LIMIT.baseDelay, + retryQueue: [], + streamingStarted: false, }; let notifications: Notification[]; @@ -123,72 +180,221 @@ export const notificationsJob: Job = { notifications = await fetchNotifications(); } catch (e) { console.error("[Notifications job] fetch failed:", e); + progress.failedRequests++; + progress.currentDelay = Math.min( + progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, + NOTIFICATIONS_RATE_LIMIT.maxDelay, + ); + await ctx.setProgress(progress); return []; } + const vectorWorker = VectorWorkerManager.getInstance(); + if (!progress.streamingStarted && notifications.length > 0) { + const estimatedTotal = Math.min(notifications.length * 1.2, 100); + + try { + await vectorWorker.startStreamingSession( + estimatedTotal, + (progressData) => { + console.log( + `[Notifications job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`, + ); + }, + NOTIFICATIONS_RATE_LIMIT.vectorBatchSize, + ); + progress.streamingStarted = true; + console.log( + `[Notifications job] Started streaming vectorization session for ~${estimatedTotal} items`, + ); + } catch (error) { + console.warn( + "[Notifications job] Failed to start streaming session:", + error, + ); + } + } + const notificationIsIndexed = async (id: string): Promise => { - const [inAssessments, inMessages] = await Promise.all([ - ctx - .getStoredItems("notifications") - .then((items) => items.some((i) => i.id === id)), - ctx - .getStoredItems("messages") - .then((items) => items.some((i) => i.id === id)), - ]); - return inAssessments || inMessages; + try { + const [inAssessments, inMessages] = await Promise.all([ + ctx + .getStoredItems("notifications") + .then((items) => items.some((i) => i.id === id)), + ctx + .getStoredItems("messages") + .then((items) => items.some((i) => i.id === id)), + ]); + return inAssessments || inMessages; + } catch (error) { + console.warn( + `[Notifications job] Error checking if notification ${id} is indexed:`, + error, + ); + return false; + } }; const items: IndexItem[] = []; + const itemsToStream: IndexItem[] = []; + let processedCount = 0; + let progressUpdateCounter = 0; + let itemsStreamedToVector = 0; - for (const notif of notifications) { - const id = notif.notificationID.toString(); - if (await notificationIsIndexed(id)) continue; + if (progress.retryQueue.length > 0) { + console.log( + `[Notifications job] Processing ${Math.min(progress.retryQueue.length, 3)} items from retry queue`, + ); - if (notif.type === "coneqtassessments") { - const a = notif.coneqtAssessments; + const retryBatch = progress.retryQueue.slice(0, 3); - const content = await fetchAssessmentName( - a.assessmentID, - a.metaclassID, - a.programmeID, + for (const notificationId of retryBatch) { + const notification = notifications.find( + (n) => n.notificationID === notificationId, ); - items.push({ - id, - text: a.title, - category: "assessments", - content: content, - dateAdded: new Date(notif.timestamp).getTime(), - metadata: { - assessmentId: a.assessmentID, - subject: a.subjectCode, - term: a.term, - programmeId: a.programmeID, - metaclassId: a.metaclassID, - timestamp: notif.timestamp, - }, - actionId: "assessment", - renderComponentId: "assessment", - }); - } else if (notif.type === "message") { - const content = await fetchMessageContent(notif.message.messageID); + if (!notification) { + progress.retryQueue = progress.retryQueue.filter( + (id) => id !== notificationId, + ); + continue; + } - await ctx.addItem( - { - id, - text: notif.message.title, - category: "messages", - content: `${htmlToPlainText(content.payload.contents)}\nFrom: ${notif.message.subtitle}`, - dateAdded: new Date(notif.timestamp).getTime(), - metadata: { - messageId: notif.message.messageID, - author: notif.message.subtitle, - timestamp: notif.timestamp, - isAssessmentNotification: true, - }, - actionId: "message", - renderComponentId: "message", - }, - "messages", + await delay(progress.currentDelay); + + try { + const { success, item } = await processNotification( + notification, + ctx, + ); + if (success) { + progress.retryQueue = progress.retryQueue.filter( + (id) => id !== notificationId, + ); + progress.failedRequests = Math.max(0, progress.failedRequests - 1); + progress.currentDelay = Math.max( + progress.currentDelay * 0.9, + NOTIFICATIONS_RATE_LIMIT.baseDelay, + ); + + if (item) { + items.push(item); + itemsToStream.push(item); + } + } + } catch (error) { + console.error( + `[Notifications job] Retry failed for notification ${notificationId}:`, + error, + ); + progress.failedRequests++; + } + } + } + + const notificationsToProcess = notifications.slice(0, 20); + + for (const notif of notificationsToProcess) { + const id = notif.notificationID.toString(); + + try { + if (await notificationIsIndexed(id)) continue; + if (progress.retryQueue.includes(notif.notificationID)) continue; + + if (processedCount > 0) { + await delay(NOTIFICATIONS_RATE_LIMIT.batchDelay); + } + + const { success, item } = await processNotification( + notif, + ctx, + ); + if (!success) { + if (progress.retryQueue.length < 10) { + progress.retryQueue.push(notif.notificationID); + } + progress.failedRequests++; + } else { + progress.failedRequests = Math.max(0, progress.failedRequests - 1); + progress.currentDelay = Math.max( + progress.currentDelay * 0.95, + NOTIFICATIONS_RATE_LIMIT.baseDelay, + ); + + if (item) { + items.push(item); + itemsToStream.push(item); + } + } + } catch (error) { + console.error( + `[Notifications job] Failed to process notification ${id}:`, + error, + ); + + if (progress.retryQueue.length < 10) { + progress.retryQueue.push(notif.notificationID); + } + progress.failedRequests++; + progress.currentDelay = Math.min( + progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, + NOTIFICATIONS_RATE_LIMIT.maxDelay, + ); + } + + processedCount++; + + if ( + itemsToStream.length >= NOTIFICATIONS_RATE_LIMIT.vectorBatchSize && + progress.streamingStarted + ) { + try { + await vectorWorker.streamItems([...itemsToStream]); + itemsStreamedToVector += itemsToStream.length; + console.log( + `[Notifications job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`, + ); + itemsToStream.length = 0; + } catch (error) { + console.warn( + "[Notifications job] Failed to stream items to vector worker:", + error, + ); + } + } + + progressUpdateCounter++; + if (progressUpdateCounter >= 5) { + await ctx.setProgress(progress); + progressUpdateCounter = 0; + } + } + + if (itemsToStream.length > 0 && progress.streamingStarted) { + try { + await vectorWorker.streamItems([...itemsToStream]); + itemsStreamedToVector += itemsToStream.length; + console.log( + `[Notifications job] Streamed final ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`, + ); + } catch (error) { + console.warn( + "[Notifications job] Failed to stream final items to vector worker:", + error, + ); + } + } + + if (progress.streamingStarted) { + try { + await vectorWorker.endStreamingSession(); + console.log( + `[Notifications job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`, + ); + progress.streamingStarted = false; + } catch (error) { + console.warn( + "[Notifications job] Failed to end streaming session:", + error, ); } } @@ -198,9 +404,14 @@ export const notificationsJob: Job = { ...items.map((i) => i.dateAdded), progress.lastTs, ); - await ctx.setProgress({ lastTs: latest }); + progress.lastTs = latest; } + await ctx.setProgress(progress); + console.log( + `[Notifications job] Processed ${processedCount} notifications, ${progress.retryQueue.length} in retry queue, ${progress.failedRequests} failures, ${itemsStreamedToVector} items streamed to vector worker`, + ); + return items; }, @@ -211,3 +422,73 @@ export const notificationsJob: Job = { return items.filter((i) => i.dateAdded >= date.getTime()); }, }; + +async function processNotification( + notif: Notification, + ctx: any, +): Promise<{ success: boolean; item?: IndexItem }> { + const id = notif.notificationID.toString(); + + try { + if (notif.type === "coneqtassessments") { + const a = notif.coneqtAssessments; + + const content = await fetchAssessmentName( + a.assessmentID, + a.metaclassID, + a.programmeID, + ); + + const item: IndexItem = { + id, + text: a.title, + category: "assessments", + content: content, + dateAdded: new Date(notif.timestamp).getTime(), + metadata: { + assessmentId: a.assessmentID, + subject: a.subjectCode, + term: a.term, + programmeId: a.programmeID, + metaclassId: a.metaclassID, + timestamp: notif.timestamp, + }, + actionId: "assessment", + renderComponentId: "assessment", + }; + + return { success: true, item }; + } else if (notif.type === "message") { + const content = await fetchMessageContent(notif.message.messageID); + + if (content && content.payload) { + const item: IndexItem = { + id, + text: notif.message.title, + category: "messages", + content: `${htmlToPlainText(content.payload.contents)}\nFrom: ${notif.message.subtitle}`, + dateAdded: new Date(notif.timestamp).getTime(), + metadata: { + messageId: notif.message.messageID, + author: notif.message.subtitle, + timestamp: notif.timestamp, + isAssessmentNotification: true, + }, + actionId: "message", + renderComponentId: "message", + }; + + await ctx.addItem(item, "messages"); + return { success: true, item }; + } + } + + return { success: false }; + } catch (error) { + console.error( + `[Notifications job] Error processing notification ${id}:`, + error, + ); + return { success: false }; + } +} diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts index 85117daa..af3635c8 100755 --- a/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts @@ -16,7 +16,10 @@ export const subjectsJob: Job = { id: "subjects", label: "Subjects", renderComponentId: "subject", - frequency: "pageLoad", + frequency: { + type: "expiry", + afterMs: 1000 * 60 * 60 * 24 * 30, + }, boostCriteria: (item, searchTerm) => { if (searchTerm == "") { return -100; diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts index 26cacbd4..589b6df2 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts @@ -5,6 +5,16 @@ let vectorIndex: EmbeddingIndex | null = null; let isInitialized = false; let currentAbortController: AbortController | null = null; +let streamingSession: { + isActive: boolean; + totalExpected: number; + totalReceived: number; + totalProcessed: number; + batchSize: number; + pendingItems: IndexItem[]; + processingPromise: Promise | null; +} | null = null; + async function initWorker() { if (isInitialized) { console.debug("Vector worker already initialized."); @@ -28,16 +38,14 @@ async function initWorker() { console.debug("Vector worker initialized successfully."); } catch (e) { console.error("Failed to initialize vector worker:", e); - // Set as initialized even on error to prevent retries, but index will be null isInitialized = true; - vectorIndex = null; // Ensure index is null on error + vectorIndex = null; } } async function vectorizeItem( item: IndexItem, ): Promise<(IndexItem & { embedding: number[] }) | null> { - // Simplified for brevity - assumes embedding function doesn't need cancellation signal try { const textToEmbed = [ item.text, @@ -53,19 +61,246 @@ async function vectorizeItem( return { ...item, embedding }; } catch (error) { console.error(`Error vectorizing item ${item.id}:`, error); - return null; // Return null if vectorization fails for an item + return null; + } +} + +async function startStreamingSession( + totalExpected: number, + batchSize: number = 5, +) { + if (!vectorIndex) { + console.warn( + "Streaming requested but vector index not ready. Attempting init.", + ); + await initWorker(); + if (!vectorIndex) { + self.postMessage({ + type: "progress", + data: { + status: "error", + message: + "Vector index not available for streaming after init attempt.", + }, + }); + return; + } + } + + if (streamingSession?.isActive) { + await endStreamingSession(); + } + + streamingSession = { + isActive: true, + totalExpected, + totalReceived: 0, + totalProcessed: 0, + batchSize, + pendingItems: [], + processingPromise: null, + }; + + console.debug( + `Started streaming session for ${totalExpected} items with batch size ${batchSize}`, + ); + + self.postMessage({ + type: "streamingProgress", + data: { + processed: 0, + total: totalExpected, + message: "Streaming session started", + }, + }); +} + +async function processStreamingBatch( + items: IndexItem[], + isLast: boolean = false, +) { + if (!streamingSession?.isActive) { + console.warn("Received streaming batch but no active session"); + return; + } + + streamingSession.totalReceived += items.length; + streamingSession.pendingItems.push(...items); + + console.debug( + `Received streaming batch: ${items.length} items (${streamingSession.totalReceived}/${streamingSession.totalExpected})`, + ); + + const shouldProcess = + streamingSession.pendingItems.length >= streamingSession.batchSize || + isLast; + + if (shouldProcess && !streamingSession.processingPromise) { + streamingSession.processingPromise = processStreamingItems(); + } +} + +async function processStreamingItems() { + if (!streamingSession?.isActive || !vectorIndex) { + return; + } + + while ( + streamingSession.pendingItems.length > 0 && + streamingSession.isActive + ) { + const batchToProcess = streamingSession.pendingItems.splice( + 0, + streamingSession.batchSize, + ); + + const unprocessedItems = batchToProcess.filter((item) => { + try { + return !vectorIndex!.get({ id: item.id }); + } catch (e) { + return true; + } + }); + + if (unprocessedItems.length === 0) { + streamingSession.totalProcessed += batchToProcess.length; + continue; + } + + const vectorizationResults = await Promise.all( + unprocessedItems.map(vectorizeItem), + ); + const successfullyVectorized = vectorizationResults.filter( + (result) => result !== null, + ) as (IndexItem & { embedding: number[] })[]; + + if (successfullyVectorized.length > 0) { + try { + successfullyVectorized.forEach((item) => vectorIndex!.add(item)); + + if ( + streamingSession.totalProcessed % (streamingSession.batchSize * 3) === + 0 + ) { + await vectorIndex!.saveIndex("indexedDB"); + console.debug( + `Saved streaming index at ${streamingSession.totalProcessed} processed items`, + ); + } + } catch (e) { + console.error("Error processing streaming batch:", e); + } + } + + streamingSession.totalProcessed += batchToProcess.length; + + self.postMessage({ + type: "streamingProgress", + data: { + processed: streamingSession.totalProcessed, + total: streamingSession.totalExpected, + message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items`, + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + streamingSession.processingPromise = null; + + if ( + streamingSession.totalReceived >= streamingSession.totalExpected && + streamingSession.pendingItems.length === 0 + ) { + await finalizeStreamingSession(); + } +} + +async function finalizeStreamingSession() { + if (!streamingSession?.isActive) { + return; + } + + try { + if (vectorIndex) { + await vectorIndex.saveIndex("indexedDB"); + console.debug("Final save of streaming index completed"); + } + } catch (e) { + console.error("Error in final streaming save:", e); + } + + const totalProcessed = streamingSession.totalProcessed; + const totalExpected = streamingSession.totalExpected; + + streamingSession.isActive = false; + + self.postMessage({ + type: "progress", + data: { + status: "complete", + total: totalExpected, + processed: totalProcessed, + message: `Streaming vectorization complete: ${totalProcessed}/${totalExpected} items processed`, + }, + }); + + console.debug( + `Streaming session completed: ${totalProcessed}/${totalExpected} items processed`, + ); +} + +async function endStreamingSession() { + if (!streamingSession?.isActive) { + return; + } + + console.debug("Ending streaming session..."); + + if (streamingSession.processingPromise) { + await streamingSession.processingPromise; + } + + if (streamingSession.pendingItems.length > 0) { + console.debug( + `Processing ${streamingSession.pendingItems.length} remaining items before ending session`, + ); + streamingSession.processingPromise = processStreamingItems(); + await streamingSession.processingPromise; + } + + try { + if (vectorIndex) { + await vectorIndex.saveIndex("indexedDB"); + console.debug("Final save before ending streaming session"); + } + } catch (e) { + console.error("Error in final save before ending session:", e); + } + + const wasActive = streamingSession.isActive; + streamingSession.isActive = false; + + if (wasActive) { + self.postMessage({ + type: "progress", + data: { + status: "cancelled", + message: "Streaming session ended early", + }, + }); } } async function processItems(items: IndexItem[], signal: AbortSignal) { console.debug("Worker received process request."); + if (!vectorIndex) { console.warn( "Processing requested but vector index not ready. Attempting init.", ); - await initWorker(); // Attempt initialization if not ready + await initWorker(); if (!vectorIndex) { - // Check again after attempt self.postMessage({ type: "progress", data: { @@ -78,13 +313,11 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } } - // Find items we haven't processed yet by checking against the index instance const unprocessedItems = items.filter((item) => { - if (signal.aborted) return false; // Check cancellation during filtering + if (signal.aborted) return false; try { return !vectorIndex!.get({ id: item.id }); } catch (e) { - // If get throws (e.g., item not found), it means it's unprocessed return true; } }); @@ -136,7 +369,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { } const batch = unprocessedItems.slice(i, i + BATCH_SIZE); - // Vectorize batch const vectorizationResults = await Promise.all(batch.map(vectorizeItem)); const successfullyVectorized = vectorizationResults.filter( (result) => result !== null, @@ -154,7 +386,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { return; } - // Add successfully vectorized items to index if (successfullyVectorized.length > 0) { try { successfullyVectorized.forEach((item) => vectorIndex!.add(item)); @@ -164,8 +395,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { type: "progress", data: { status: "error", message: `Error adding to index: ${e}` }, }); - // Decide whether to continue or stop on error - // return; // Example: Stop processing if adding fails } } @@ -181,7 +410,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { return; } - // Save index after processing the batch try { await vectorIndex!.saveIndex("indexedDB"); console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1}`); @@ -191,13 +419,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { type: "progress", data: { status: "error", message: `Error saving index batch: ${e}` }, }); - // Continue processing next batch even if saving failed? Or stop? - // return; // Example: Stop if saving fails } processedCount = Math.min(i + BATCH_SIZE, unprocessedItems.length); - // Report progress self.postMessage({ type: "progress", data: { @@ -207,7 +432,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { }, }); - // Yield control briefly to allow other messages (like cancellation) to be processed await new Promise((resolve) => setTimeout(resolve, 0)); } @@ -219,196 +443,49 @@ async function processItems(items: IndexItem[], signal: AbortSignal) { }); } else { console.debug("Processing completed, but was cancelled."); - // No need to send 'cancelled' again if already sent during batching - // self.postMessage({ type: 'progress', data: { status: 'cancelled', message: 'Processing finished but was cancelled' }}); } } -async function search( - query: string, - topK: number, - signal: AbortSignal, - messageId: string, -) { - console.debug( - `Worker received search request (ID: ${messageId}): "${query}"`, - ); - if (!vectorIndex) { - console.warn( - `Search (ID: ${messageId}) requested but vector index not ready. Attempting init.`, - ); - await initWorker(); // Attempt initialization - // Re-check after waiting/init attempt - if (!vectorIndex) { - console.error( - `Search (ID: ${messageId}) failed: Vector index unavailable after init attempt.`, - ); - self.postMessage({ - type: "searchError", - data: { messageId, error: "Vector index not available." }, - }); - return; - } - console.debug( - `Vector index ready after init for search (ID: ${messageId}).`, - ); - } - - if (signal.aborted) { - console.debug(`Search (ID: ${messageId}) cancelled before starting.`); - self.postMessage({ type: "searchCancelled", data: { messageId } }); - return; - } - - try { - console.debug(`Getting embedding for query (ID: ${messageId})...`); - const queryEmbedding = await getEmbedding(query); - - if (signal.aborted) { - console.debug(`Search (ID: ${messageId}) cancelled after embedding.`); - self.postMessage({ type: "searchCancelled", data: { messageId } }); - return; - } - - console.debug(`Performing vector search (ID: ${messageId})...`); - // Await the search and let TypeScript infer the type - const results = await vectorIndex!.search(queryEmbedding, { - topK, - useStorage: "indexedDB", // Ensure we search the stored index - }); - - console.debug( - `Vector search (ID: ${messageId}) completed with ${results.length} results.`, - ); - - if (signal.aborted) { - console.debug( - `Search (ID: ${messageId}) cancelled after search completed, discarding results.`, - ); - self.postMessage({ type: "searchCancelled", data: { messageId } }); - return; - } - - // Post results back to the main thread - self.postMessage({ type: "searchResults", data: { messageId, results } }); - } catch (error) { - console.error(`Vector search error in worker (ID: ${messageId}):`, error); - // Ensure signal isn't checked *after* an error occurred before posting error message - if (!signal.aborted) { - // Only post error if not cancelled - self.postMessage({ - type: "searchError", - data: { - messageId, - error: error instanceof Error ? error.message : String(error), - }, - }); - } else { - console.debug( - `Search (ID: ${messageId}) encountered error but was cancelled, suppressing error message.`, - ); - self.postMessage({ type: "searchCancelled", data: { messageId } }); // Still notify of cancellation - } - } -} - -// Handle messages from the main thread self.addEventListener("message", async (e) => { - // Make sure data and type exist - if (!e.data || !e.data.type) { - console.warn("Worker received message with no data or type."); - return; - } - - const { type, data, messageId } = e.data; // messageId used for requests needing response/cancellation tracking - - // Cancel previous long-running operation (process or search) if a new one starts - if (type === "process" || type === "search") { - if (currentAbortController) { - console.debug( - `Worker cancelling previous operation due to new '${type}' request.`, - ); - currentAbortController.abort(`New '${type}' operation requested`); - } - currentAbortController = new AbortController(); - console.debug(`Worker starting new '${type}' operation.`); - } - - // Use the signal from the *current* controller for the task being started - const signal = currentAbortController?.signal; + const { type, data } = e.data; switch (type) { - case "process": - if (signal && data?.items) { - await processItems(data.items, signal); - } else if (!signal) { - console.error( - "Process message received but no abort signal available.", - ); - } else if (!data?.items) { - console.error("Process message received without 'items' data."); - self.postMessage({ - type: "progress", - data: { - status: "error", - message: "Process command received without items.", - }, - }); - } - break; - - case "search": - if (signal && messageId && typeof data?.query === "string") { - await search(data.query, data.topK ?? 10, signal, messageId); - } else { - const errorReason = !signal - ? "Missing signal" - : !messageId - ? "Missing messageId" - : "Missing or invalid query"; - console.error(`Search message received invalid: ${errorReason}.`, { - data, - messageId, - signalExists: !!signal, - }); - // Send an error back if messageId exists - if (messageId) { - self.postMessage({ - type: "searchError", - data: { messageId, error: `Worker internal error: ${errorReason}` }, - }); - } - } - break; - case "init": - // Init should not be cancellable in the same way, it's foundational - // Check if already initialized before potentially running it again - if (!isInitialized) { - await initWorker(); - self.postMessage({ type: "ready" }); // Signal ready *after* init attempt - } else { - console.debug("Received init message, but worker already initialized."); - self.postMessage({ type: "ready" }); // Signal ready anyway - } + await initWorker(); + self.postMessage({ type: "ready" }); break; - // No explicit 'cancel' case needed as new tasks auto-cancel previous ones + case "process": + if (currentAbortController) { + currentAbortController.abort(); + } + currentAbortController = new AbortController(); + await processItems(data.items, currentAbortController.signal); + break; + + case "startStreaming": + await startStreamingSession(data.totalExpected, data.batchSize); + break; + + case "streamBatch": + await processStreamingBatch(data.items, data.isLast); + break; + + case "endStreaming": + await endStreamingSession(); + break; default: - console.warn("Unknown message type received by vector worker:", type); + console.warn("Unknown message type:", type); } }); -// Initial check or trigger for initialization when the worker starts initWorker() .then(() => { self.postMessage({ type: "ready" }); }) .catch((err) => { console.error("Initial worker initialization failed:", err); - // Still need to signal readiness, perhaps with an error state? - // Or rely on the first 'process' or 'search' to retry init. - // For now, just signal ready, but the index might be null. + self.postMessage({ type: "ready" }); }); diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts index efcc9cfd..02736ecf 100644 --- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts +++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts @@ -1,7 +1,6 @@ import { refreshVectorCache } from "../../search/vector/vectorSearch"; import type { IndexItem } from "../types"; import vectorWorker from "./vectorWorker.ts?inlineWorker"; -import type { SearchResult } from "embeddia"; export type ProgressCallback = (data: { status: "started" | "processing" | "complete" | "error" | "cancelled"; @@ -14,28 +13,19 @@ export class VectorWorkerManager { private static instance: VectorWorkerManager; private worker: Worker | null = null; private isInitialized = false; - private readyPromise: Promise | null = null; // To await initialization + private readyPromise: Promise | null = null; private progressCallback: ProgressCallback | null = null; - private searchPromises = new Map< - string, - { - resolve: (value: SearchResult[]) => void; - reject: (reason?: any) => void; - timer: NodeJS.Timeout; - } - >(); - private debounceTimer: NodeJS.Timeout | null = null; - private lastSearchParams: { - query: string; - topK: number; - resolve: (results: SearchResult[]) => void; - reject: (reason?: any) => void; + + private streamingSession: { + isActive: boolean; + totalExpected: number; + totalSent: number; + batchBuffer: IndexItem[]; + batchSize: number; + flushTimer: NodeJS.Timeout | null; } | null = null; - private constructor() { - // Start initialization immediately, but allow awaiting it - this.readyPromise = this.initWorker(); - } + private constructor() {} static getInstance(): VectorWorkerManager { if (!VectorWorkerManager.instance) { @@ -45,26 +35,25 @@ export class VectorWorkerManager { } private async initWorker(): Promise { - // If already initialized or initializing, return the existing promise if (this.isInitialized) return Promise.resolve(); if (this.readyPromise) return this.readyPromise; + console.debug("Lazy-loading vector worker..."); + return new Promise((resolve, reject) => { - // Create the worker this.worker = vectorWorker(); console.log("Worker initialized", this.worker); const timeout = setTimeout(() => { console.error("Vector worker initialization timed out"); - this.worker?.terminate(); // Clean up worker if it exists + this.worker?.terminate(); this.worker = null; - this.isInitialized = false; // Ensure state reflects failure - this.readyPromise = null; // Allow retrying init later + this.isInitialized = false; + this.readyPromise = null; reject(new Error("Worker initialization timed out")); - }, 10000); // Increased timeout + }, 10000); - // Set up message handling this.worker!.addEventListener("message", (e) => { const { type, data } = e.data; console.debug("Message from vector worker:", type, data); @@ -74,7 +63,7 @@ export class VectorWorkerManager { this.isInitialized = true; clearTimeout(timeout); console.debug("Vector worker initialized and ready."); - resolve(); // Resolve the init promise + resolve(); break; case "progress": @@ -83,50 +72,23 @@ export class VectorWorkerManager { if (data.status === "complete") { refreshVectorCache(); + + if (this.streamingSession?.isActive) { + this.endStreamingSession(); + } } } break; - case "searchResults": - const searchInfo = this.searchPromises.get(data.messageId); - if (searchInfo) { - clearTimeout(searchInfo.timer); // Clear timeout on success - searchInfo.resolve(data.results); - this.searchPromises.delete(data.messageId); - } else { - console.warn( - "Received search results for unknown messageId:", - data.messageId, - ); - } - break; - - case "searchError": - const errorInfo = this.searchPromises.get(data.messageId); - if (errorInfo) { - clearTimeout(errorInfo.timer); // Clear timeout on error - errorInfo.reject(new Error(data.error)); - this.searchPromises.delete(data.messageId); - } else { - console.warn( - "Received search error for unknown messageId:", - data.messageId, - ); - } - break; - - case "searchCancelled": - const cancelledInfo = this.searchPromises.get(data.messageId); - if (cancelledInfo) { - clearTimeout(cancelledInfo.timer); // Clear timeout on cancel - // Reject with a specific cancellation error or resolve with empty? Let's reject. - cancelledInfo.reject(new Error("Search cancelled by worker")); - this.searchPromises.delete(data.messageId); - } else { - console.debug( - "Received cancellation for unknown messageId:", - data.messageId, - ); + case "streamingProgress": + if (this.progressCallback && this.streamingSession?.isActive) { + const { processed } = data; + this.progressCallback({ + status: "processing", + processed, + total: this.streamingSession.totalExpected, + message: `Streaming vectorization: ${processed}/${this.streamingSession.totalExpected} items`, + }); } break; @@ -135,15 +97,12 @@ export class VectorWorkerManager { } }); - // Initialize the worker this.worker!.postMessage({ type: "init" }); }); } - // Ensures worker is ready before proceeding private async ensureReady() { if (!this.readyPromise) { - // If init wasn't called or failed, try again console.warn("Worker not initialized, attempting init..."); this.readyPromise = this.initWorker(); } @@ -155,113 +114,185 @@ export class VectorWorkerManager { } } - async processItems( - items: IndexItem[], - onProgress?: ProgressCallback, - ) { - await this.ensureReady(); // Wait for worker to be ready + async processItems(items: IndexItem[], onProgress?: ProgressCallback) { + await this.ensureReady(); this.progressCallback = onProgress || null; - // Cancel any ongoing search when starting processing - this.cancelAllSearches("Processing started"); - console.debug(`Sending ${items.length} items to worker for processing.`); - this.worker!.postMessage({ type: "process", data: { items: items }, }); } - // Public search method - public async search( - query: string, - topK: number = 10, - ): Promise { + async startStreamingSession( + totalExpectedItems: number, + onProgress?: ProgressCallback, + batchSize: number = 10, + ): Promise { await this.ensureReady(); - return new Promise((resolve, reject) => { - this.lastSearchParams = { query, topK, resolve, reject }; + if (this.streamingSession?.isActive) { + this.endStreamingSession(); + } - const messageId = crypto.randomUUID(); - if (this.lastSearchParams && this.worker) { - const currentParams = this.lastSearchParams; // Capture current params - this.lastSearchParams = null; // Clear last params *before* posting - this.debounceTimer = null; + this.progressCallback = onProgress || null; - // Set a timeout for the search operation itself - const searchTimeout = 10000; // e.g., 10 seconds - const searchTimer = setTimeout(() => { - if (this.searchPromises.has(messageId)) { - console.error(`Search timed out for messageId: ${messageId}`); - currentParams.reject( - new Error(`Search timed out after ${searchTimeout}ms`), - ); - this.searchPromises.delete(messageId); - } - }, searchTimeout); + this.streamingSession = { + isActive: true, + totalExpected: totalExpectedItems, + totalSent: 0, + batchBuffer: [], + batchSize, + flushTimer: null, + }; - this.searchPromises.set(messageId, { - resolve: currentParams.resolve, - reject: currentParams.reject, - timer: searchTimer, - }); + console.debug( + `Starting streaming session for ${totalExpectedItems} items with batch size ${batchSize}`, + ); - console.debug( - `Sending search request (ID: ${messageId}) to worker: "${currentParams.query}"`, - ); - console.log(this.worker); - this.worker.postMessage({ - type: "search", - data: { query: currentParams.query, topK: currentParams.topK }, - messageId, - }); - } else if (this.lastSearchParams) { - // This case might happen if ensureReady failed but didn't throw - console.error("Worker unavailable when trying to send search request."); - this.lastSearchParams.reject( - new Error("Worker unavailable for search"), - ); - this.lastSearchParams = null; - this.debounceTimer = null; + this.worker!.postMessage({ + type: "startStreaming", + data: { totalExpected: totalExpectedItems, batchSize }, + }); + + if (this.progressCallback) { + this.progressCallback({ + status: "started", + total: totalExpectedItems, + processed: 0, + message: "Starting streaming vectorization", + }); + } + } + + async streamItems(items: IndexItem[]): Promise { + if (!this.streamingSession?.isActive) { + throw new Error( + "No active streaming session. Call startStreamingSession first.", + ); + } + + this.streamingSession.batchBuffer.push(...items); + + if ( + this.streamingSession.batchBuffer.length >= + this.streamingSession.batchSize + ) { + await this.flushBatch(); + } else { + if (this.streamingSession.flushTimer) { + clearTimeout(this.streamingSession.flushTimer); } + + this.streamingSession.flushTimer = setTimeout(() => { + this.flushBatch(); + }, 1000); + } + } + + private async flushBatch(): Promise { + if ( + !this.streamingSession?.isActive || + this.streamingSession.batchBuffer.length === 0 + ) { + return; + } + + const batch = [...this.streamingSession.batchBuffer]; + this.streamingSession.batchBuffer = []; + this.streamingSession.totalSent += batch.length; + + if (this.streamingSession.flushTimer) { + clearTimeout(this.streamingSession.flushTimer); + this.streamingSession.flushTimer = null; + } + + console.debug( + `Streaming batch of ${batch.length} items to worker (${this.streamingSession.totalSent}/${this.streamingSession.totalExpected})`, + ); + + this.worker!.postMessage({ + type: "streamBatch", + data: { + items: batch, + isLast: + this.streamingSession.totalSent >= + this.streamingSession.totalExpected, + }, }); } - // Method to cancel all pending/debounced searches - private cancelAllSearches(reason: string = "Cancelled") { - if (this.debounceTimer) { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - if (this.lastSearchParams) { - this.lastSearchParams.reject(new Error(`Search cancelled: ${reason}`)); - this.lastSearchParams = null; - } + async endStreamingSession(): Promise { + if (!this.streamingSession?.isActive) { + return; } - // We might also want to tell the worker to cancel its *current* search - // if it supports it, but this requires worker modification. - // For now, just reject pending promises in the manager. - for (const [messageId, promiseInfo] of this.searchPromises.entries()) { - clearTimeout(promiseInfo.timer); - promiseInfo.reject(new Error(`Search cancelled: ${reason}`)); - this.searchPromises.delete(messageId); + + await this.flushBatch(); + + if (this.streamingSession.flushTimer) { + clearTimeout(this.streamingSession.flushTimer); } + + this.streamingSession.isActive = false; + + this.worker!.postMessage({ + type: "endStreaming", + }); + + console.debug("Streaming session ended"); + + if (this.progressCallback) { + this.progressCallback({ + status: "complete", + total: this.streamingSession.totalExpected, + processed: this.streamingSession.totalSent, + message: "Streaming vectorization complete", + }); + } + + this.streamingSession = null; + } + + async streamItem(item: IndexItem): Promise { + return this.streamItems([item]); + } + + isStreamingActive(): boolean { + return this.streamingSession?.isActive ?? false; + } + + getStreamingProgress(): { + sent: number; + expected: number; + buffered: number; + } | null { + if (!this.streamingSession?.isActive) { + return null; + } + + return { + sent: this.streamingSession.totalSent, + expected: this.streamingSession.totalExpected, + buffered: this.streamingSession.batchBuffer.length, + }; } terminate() { console.debug("Terminating Vector Worker Manager..."); - this.cancelAllSearches("Worker terminated"); // Cancel pending searches + + if (this.streamingSession?.isActive) { + this.endStreamingSession(); + } if (this.worker) { this.worker.terminate(); this.worker = null; } this.isInitialized = false; - this.readyPromise = null; // Reset init promise + this.readyPromise = null; this.progressCallback = null; - // Clear the static instance? Or assume app lifecycle handles this? - // VectorWorkerManager.instance = null; // Uncomment if needed } } diff --git a/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts b/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts new file mode 100644 index 00000000..917270ff --- /dev/null +++ b/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts @@ -0,0 +1,230 @@ +import { delay } from "@/seqta/utils/delay"; + +export interface RateLimiterConfig { + minDelay: number; // Minimum delay between requests (ms) + maxDelay: number; // Maximum delay between requests (ms) + baseDelay: number; // Base delay between requests (ms) + backoffMultiplier: number; // Exponential backoff multiplier + maxRetries: number; // Maximum retries for failed requests + adaptiveBatchSize?: boolean; // Enable adaptive batch sizing + minBatchSize?: number; // Minimum batch size + maxBatchSize?: number; // Maximum batch size + baseBatchSize?: number; // Starting batch size +} + +export interface RateLimiterState { + currentDelay: number; + failedRequests: number; + lastSuccessTime: number; + currentBatchSize?: number; +} + +export class RateLimiter { + private config: RateLimiterConfig; + private state: RateLimiterState; + + constructor(config: RateLimiterConfig, initialState?: Partial) { + this.config = config; + this.state = { + currentDelay: config.baseDelay, + failedRequests: 0, + lastSuccessTime: Date.now(), + currentBatchSize: config.baseBatchSize || 50, + ...initialState, + }; + } + + /** + * Wait for the appropriate delay before making the next request + */ + async waitForNext(): Promise { + await delay(this.state.currentDelay); + } + + /** + * Record a successful request and adjust delays accordingly + */ + recordSuccess(responseTime?: number): void { + this.state.lastSuccessTime = Date.now(); + this.state.failedRequests = Math.max(0, this.state.failedRequests - 1); + + if (responseTime !== undefined) { + this.state.currentDelay = this.calculateAdaptiveDelay(responseTime); + + if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) { + this.state.currentBatchSize = this.calculateAdaptiveBatchSize(responseTime); + } + } + } + + /** + * Record a failed request and increase delays + */ + recordFailure(): void { + this.state.failedRequests++; + this.state.currentDelay = Math.min( + this.state.currentDelay * this.config.backoffMultiplier, + this.config.maxDelay + ); + + if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) { + this.state.currentBatchSize = Math.max( + Math.floor(this.state.currentBatchSize * 0.7), + this.config.minBatchSize || 10 + ); + } + } + + /** + * Execute a request with automatic retry logic + */ + async executeWithRetry( + requestFn: () => Promise, + retryCount = 0 + ): Promise { + try { + const startTime = Date.now(); + const result = await requestFn(); + const responseTime = Date.now() - startTime; + + this.recordSuccess(responseTime); + return result; + } catch (error) { + console.warn(`Request failed (attempt ${retryCount + 1}):`, error); + + if (retryCount < this.config.maxRetries) { + this.recordFailure(); + await this.waitForNext(); + return this.executeWithRetry(requestFn, retryCount + 1); + } + + this.recordFailure(); + return null; + } + } + + /** + * Get current state for persistence + */ + getState(): RateLimiterState { + return { ...this.state }; + } + + /** + * Update state from persisted data + */ + setState(state: Partial): void { + this.state = { ...this.state, ...state }; + } + + /** + * Get current batch size (if adaptive batching is enabled) + */ + getCurrentBatchSize(): number { + return this.state.currentBatchSize || this.config.baseBatchSize || 50; + } + + /** + * Get current delay + */ + getCurrentDelay(): number { + return this.state.currentDelay; + } + + /** + * Get failure count + */ + getFailureCount(): number { + return this.state.failedRequests; + } + + private calculateAdaptiveDelay(responseTime: number): number { + const { currentDelay, failedRequests, lastSuccessTime } = this.state; + const timeSinceLastSuccess = Date.now() - lastSuccessTime; + + // Increase delay if we're seeing failures or slow responses + if (failedRequests > 0 || responseTime > 2000) { + return Math.min(currentDelay * this.config.backoffMultiplier, this.config.maxDelay); + } + + // Decrease delay if we're doing well and it's been a while since last success + if (responseTime < 500 && timeSinceLastSuccess > 10000) { + return Math.max(currentDelay * 0.8, this.config.minDelay); + } + + return currentDelay; + } + + private calculateAdaptiveBatchSize(responseTime: number): number { + if (!this.config.adaptiveBatchSize || this.state.currentBatchSize === undefined) { + return this.state.currentBatchSize || this.config.baseBatchSize || 50; + } + + const { currentBatchSize, failedRequests } = this.state; + + // Reduce batch size if we're seeing failures or slow responses + if (failedRequests > 2 || responseTime > 3000) { + return Math.max( + Math.floor(currentBatchSize * 0.7), + this.config.minBatchSize || 10 + ); + } + + // Increase batch size if we're doing well + if (failedRequests === 0 && responseTime < 1000) { + return Math.min( + Math.floor(currentBatchSize * 1.2), + this.config.maxBatchSize || 100 + ); + } + + return currentBatchSize; + } +} + +/** + * Predefined rate limiter configurations for different job types + */ +export const RATE_LIMITER_PRESETS = { + MESSAGES: { + minDelay: 50, + maxDelay: 5000, + baseDelay: 200, + backoffMultiplier: 1.5, + maxRetries: 3, + adaptiveBatchSize: true, + minBatchSize: 10, + maxBatchSize: 100, + baseBatchSize: 50, + } as RateLimiterConfig, + + NOTIFICATIONS: { + minDelay: 100, + maxDelay: 3000, + baseDelay: 150, + backoffMultiplier: 1.4, + maxRetries: 2, + adaptiveBatchSize: false, + } as RateLimiterConfig, + + FORUMS: { + minDelay: 75, + maxDelay: 2000, + baseDelay: 100, + backoffMultiplier: 1.3, + maxRetries: 2, + adaptiveBatchSize: true, + minBatchSize: 5, + maxBatchSize: 50, + baseBatchSize: 25, + } as RateLimiterConfig, + + SUBJECTS: { + minDelay: 50, + maxDelay: 1000, + baseDelay: 75, + backoffMultiplier: 1.2, + maxRetries: 1, + adaptiveBatchSize: false, + } as RateLimiterConfig, +}; \ No newline at end of file