diff --git a/src/interface/components/HotkeyInput.svelte b/src/interface/components/HotkeyInput.svelte
index f575259c..11322d7b 100644
--- a/src/interface/components/HotkeyInput.svelte
+++ b/src/interface/components/HotkeyInput.svelte
@@ -171,15 +171,13 @@
{#if isRecording}
- {#if hotkeyParts.length === 0}
- Press keys...
- {/if}
+ Press keys...
{:else if hotkeyParts.length > 0}
diff --git a/src/plugins/built-in/globalSearch/src/core/index.ts b/src/plugins/built-in/globalSearch/src/core/index.ts
index ab04a008..71ed2369 100644
--- a/src/plugins/built-in/globalSearch/src/core/index.ts
+++ b/src/plugins/built-in/globalSearch/src/core/index.ts
@@ -55,8 +55,7 @@ const settings = defineSettings({
req.onsuccess = () => resolve();
req.onerror = () => reject(req.error);
req.onblocked = () => {
- alert(`Please close all other tabs using this app to reset the database: ${dbName}`);
- reject(new Error('Delete blocked'));
+ reject(new Error(`One database is open, failed to remove: ${dbName}`));
};
});
};
diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts
index 75da72e7..4a3ac355 100644
--- a/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts
+++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/messages.ts
@@ -1,5 +1,35 @@
import type { Job, IndexItem } from "../types";
import { htmlToPlainText } from "../utils";
+import { delay } from "@/seqta/utils/delay";
+import { VectorWorkerManager } from "../worker/vectorWorkerManager";
+
+const RATE_LIMIT_CONFIG = {
+ minDelay: 50,
+ maxDelay: 5000,
+ baseDelay: 200,
+ backoffMultiplier: 1.5,
+ maxRetries: 3,
+ adaptiveBatchSize: true,
+ minBatchSize: 10,
+ maxBatchSize: 100,
+ baseBatchSize: 50,
+ vectorBatchSize: 5,
+ parallelRequests: 5,
+ parallelDelay: 100,
+};
+
+interface MessagesProgress {
+ offset: number;
+ done: boolean;
+ currentBatchSize: number;
+ currentDelay: number;
+ failedRequests: number;
+ lastSuccessTime: number;
+ retryQueue: number[];
+ processedIds: string[];
+ streamingStarted: boolean;
+ totalEstimated: number;
+}
const fetchMessages = async (offset = 0, limit = 100) => {
const res = await fetch(`${location.origin}/seqta/student/load/message`, {
@@ -23,22 +53,235 @@ const fetchMessages = async (offset = 0, limit = 100) => {
}>;
};
-export const fetchMessageContent = async (id: number) => {
- const res = await fetch(`${location.origin}/seqta/student/load/message`, {
- method: "POST",
- credentials: "include",
- headers: { "Content-Type": "application/json; charset=utf-8" },
- body: JSON.stringify({ action: "message", id }),
- });
- return res.json() as Promise<{
- payload: { contents: string };
- status: string;
- }>;
+export const fetchMessageContent = async (
+ id: number,
+ retryCount = 0,
+): Promise<{
+ payload: { contents: string };
+ status: string;
+} | null> => {
+ try {
+ const res = await fetch(`${location.origin}/seqta/student/load/message`, {
+ method: "POST",
+ credentials: "include",
+ headers: { "Content-Type": "application/json; charset=utf-8" },
+ body: JSON.stringify({ action: "message", id }),
+ });
+
+ if (!res.ok) {
+ throw new Error(`HTTP ${res.status}: ${res.statusText}`);
+ }
+
+ return await res.json();
+ } catch (error) {
+ console.warn(
+ `[Messages job] Failed to fetch content for message ${id} (attempt ${retryCount + 1}):`,
+ error,
+ );
+
+ if (retryCount < RATE_LIMIT_CONFIG.maxRetries) {
+ const retryDelay =
+ RATE_LIMIT_CONFIG.baseDelay *
+ Math.pow(RATE_LIMIT_CONFIG.backoffMultiplier, retryCount);
+ await delay(Math.min(retryDelay, RATE_LIMIT_CONFIG.maxDelay));
+ return fetchMessageContent(id, retryCount + 1);
+ }
+
+ return null;
+ }
};
-interface MessagesProgress {
- offset: number;
- done: boolean;
+function calculateAdaptiveDelay(
+ progress: MessagesProgress,
+ responseTime: number,
+): number {
+ const { currentDelay, failedRequests, lastSuccessTime } = progress;
+ const timeSinceLastSuccess = Date.now() - lastSuccessTime;
+
+ if (failedRequests > 0 || responseTime > 2000) {
+ return Math.min(
+ currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
+ RATE_LIMIT_CONFIG.maxDelay,
+ );
+ }
+
+ if (responseTime < 500 && timeSinceLastSuccess > 10000) {
+ return Math.max(currentDelay * 0.8, RATE_LIMIT_CONFIG.minDelay);
+ }
+
+ return currentDelay;
+}
+
+function calculateAdaptiveBatchSize(
+ progress: MessagesProgress,
+ responseTime: number,
+): number {
+ if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) {
+ return progress.currentBatchSize;
+ }
+
+ const { currentBatchSize, failedRequests } = progress;
+
+ if (failedRequests > 2 || responseTime > 3000) {
+ return Math.max(
+ Math.floor(currentBatchSize * 0.7),
+ RATE_LIMIT_CONFIG.minBatchSize,
+ );
+ }
+
+ if (failedRequests === 0 && responseTime < 1000) {
+ return Math.min(
+ Math.floor(currentBatchSize * 1.2),
+ RATE_LIMIT_CONFIG.maxBatchSize,
+ );
+ }
+
+ return currentBatchSize;
+}
+
+async function estimateMessageCount(): Promise {
+ try {
+ const firstBatch = await fetchMessages(0, 100);
+ if (firstBatch.status !== "200" || !firstBatch.payload.hasMore) {
+ return firstBatch.payload.messages.length;
+ }
+
+ return Math.min(firstBatch.payload.messages.length * 20, 2000);
+ } catch (error) {
+ console.warn("[Messages job] Failed to estimate message count:", error);
+ return 500;
+ }
+}
+
+async function processMessagesInParallel(
+ messages: any[],
+ existingIds: Set,
+ processedIdsSet: Set,
+ progress: MessagesProgress,
+ ctx: any,
+): Promise<{
+ processedItems: IndexItem[];
+ consecutiveExisting: number;
+ updatedProgress: MessagesProgress;
+}> {
+ const processedItems: IndexItem[] = [];
+ let consecutiveExisting = 0;
+ const updatedProgress = { ...progress };
+
+ const messagesToProcess = messages.filter((msg) => {
+ const id = msg.id.toString();
+ if (existingIds.has(id) || processedIdsSet.has(id)) {
+ consecutiveExisting++;
+ return false;
+ }
+ consecutiveExisting = 0;
+ return true;
+ });
+
+ if (messagesToProcess.length === 0) {
+ return { processedItems, consecutiveExisting, updatedProgress };
+ }
+
+ for (
+ let i = 0;
+ i < messagesToProcess.length;
+ i += RATE_LIMIT_CONFIG.parallelRequests
+ ) {
+ const batch = messagesToProcess.slice(
+ i,
+ i + RATE_LIMIT_CONFIG.parallelRequests,
+ );
+
+ if (i > 0) {
+ await delay(
+ Math.max(updatedProgress.currentDelay, RATE_LIMIT_CONFIG.parallelDelay),
+ );
+ }
+
+ const batchStartTime = Date.now();
+
+ const batchPromises = batch.map(async (msg) => {
+ const id = msg.id.toString();
+
+ try {
+ const full = await fetchMessageContent(msg.id);
+ const responseTime = Date.now() - batchStartTime;
+
+ if (full && full.status === "200") {
+ const item: IndexItem = {
+ id,
+ text: msg.subject,
+ category: "messages",
+ content: `${htmlToPlainText(full.payload.contents)}\nFrom: ${msg.sender}`,
+ dateAdded: new Date(msg.date).getTime(),
+ metadata: {
+ messageId: msg.id,
+ author: msg.sender,
+ senderId: msg.sender_id,
+ senderType: msg.sender_type,
+ timestamp: msg.date,
+ hasAttachments: msg.attachments,
+ attachmentCount: msg.attachmentCount,
+ read: msg.read === 1,
+ },
+ actionId: "message",
+ renderComponentId: "message",
+ };
+
+ return { success: true, item, id, responseTime };
+ } else {
+ return { success: false, id, messageId: msg.id, responseTime };
+ }
+ } catch (error) {
+ console.error(`[Messages job] content fetch failed (id ${id}):`, error);
+ return { success: false, id, messageId: msg.id, error };
+ }
+ });
+
+ const batchResults = await Promise.all(batchPromises);
+ const batchResponseTime = Date.now() - batchStartTime;
+
+ let batchSuccesses = 0;
+ let batchFailures = 0;
+
+ for (const result of batchResults) {
+ if (result.success && result.item) {
+ await ctx.addItem(result.item);
+ existingIds.add(result.id);
+ processedIdsSet.add(result.id);
+ processedItems.push(result.item);
+ batchSuccesses++;
+ } else {
+ if (updatedProgress.retryQueue.length < 50 && result.messageId) {
+ updatedProgress.retryQueue.push(result.messageId);
+ }
+ batchFailures++;
+ }
+ }
+
+ if (batchSuccesses > 0) {
+ updatedProgress.lastSuccessTime = Date.now();
+ updatedProgress.failedRequests = Math.max(
+ 0,
+ updatedProgress.failedRequests - batchSuccesses,
+ );
+ }
+
+ if (batchFailures > 0) {
+ updatedProgress.failedRequests += batchFailures;
+ }
+
+ updatedProgress.currentDelay = calculateAdaptiveDelay(
+ updatedProgress,
+ batchResponseTime,
+ );
+
+ console.log(
+ `[Messages job] Processed parallel batch: ${batchSuccesses} successes, ${batchFailures} failures, ${batchResponseTime}ms total time`,
+ );
+ }
+
+ return { processedItems, consecutiveExisting, updatedProgress };
}
export const messagesJob: Job = {
@@ -48,79 +291,265 @@ export const messagesJob: Job = {
frequency: { type: "expiry", afterMs: 1000 * 60 * 60 * 24 },
run: async (ctx) => {
- const limit = 100;
const progress = (await ctx.getProgress()) ?? {
offset: 0,
done: false,
+ currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize,
+ currentDelay: RATE_LIMIT_CONFIG.baseDelay,
+ failedRequests: 0,
+ lastSuccessTime: Date.now(),
+ retryQueue: [],
+ processedIds: [],
+ streamingStarted: false,
+ totalEstimated: 0,
};
const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id));
+ const processedIdsSet = new Set(progress.processedIds);
+
+ existingIds.forEach((id) => processedIdsSet.add(id));
+
+ const vectorWorker = VectorWorkerManager.getInstance();
+ if (!progress.streamingStarted) {
+ progress.totalEstimated = await estimateMessageCount();
+
+ try {
+ await vectorWorker.startStreamingSession(
+ progress.totalEstimated,
+ (progressData) => {
+ console.log(
+ `[Messages job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`,
+ );
+ },
+ RATE_LIMIT_CONFIG.vectorBatchSize,
+ );
+ progress.streamingStarted = true;
+ console.log(
+ `[Messages job] Started streaming vectorization session for ~${progress.totalEstimated} items`,
+ );
+ } catch (error) {
+ console.warn(
+ "[Messages job] Failed to start streaming session:",
+ error,
+ );
+ }
+ }
+
let consecutiveExisting = 0;
+ let requestStartTime = 0;
+ let progressUpdateCounter = 0;
+ let itemsStreamedToVector = 0;
+
+ if (progress.retryQueue.length > 0) {
+ console.log(
+ `[Messages job] Processing ${Math.min(progress.retryQueue.length, 10)} items from retry queue`,
+ );
+
+ const retryBatch = progress.retryQueue.slice(0, 10);
+ const retryBatches = [];
+
+ for (
+ let i = 0;
+ i < retryBatch.length;
+ i += RATE_LIMIT_CONFIG.parallelRequests
+ ) {
+ retryBatches.push(
+ retryBatch.slice(i, i + RATE_LIMIT_CONFIG.parallelRequests),
+ );
+ }
+
+ for (const batch of retryBatches) {
+ await delay(progress.currentDelay);
+ const batchStartTime = Date.now();
+
+ const retryPromises = batch.map(async (messageId) => {
+ const id = messageId.toString();
+
+ if (processedIdsSet.has(id)) {
+ return { success: true, messageId, alreadyProcessed: true };
+ }
+
+ try {
+ const full = await fetchMessageContent(messageId);
+ const responseTime = Date.now() - batchStartTime;
+
+ if (full && full.status === "200") {
+ return { success: true, messageId, responseTime };
+ } else {
+ return { success: false, messageId, responseTime };
+ }
+ } catch (error) {
+ console.error(
+ `[Messages job] Retry failed for message ${messageId}:`,
+ error,
+ );
+ return { success: false, messageId, error };
+ }
+ });
+
+ const retryResults = await Promise.all(retryPromises);
+ const batchResponseTime = Date.now() - batchStartTime;
+
+ let retrySuccesses = 0;
+ let retryFailures = 0;
+
+ for (const result of retryResults) {
+ if (result.success) {
+ if (!result.alreadyProcessed) {
+ processedIdsSet.add(result.messageId.toString());
+ retrySuccesses++;
+ }
+ progress.retryQueue = progress.retryQueue.filter(
+ (mid) => mid !== result.messageId,
+ );
+ } else {
+ retryFailures++;
+ }
+ }
+
+ if (retrySuccesses > 0) {
+ progress.lastSuccessTime = Date.now();
+ progress.failedRequests = Math.max(
+ 0,
+ progress.failedRequests - retrySuccesses,
+ );
+ }
+
+ if (retryFailures > 0) {
+ progress.failedRequests += retryFailures;
+ }
+
+ progress.currentDelay = calculateAdaptiveDelay(
+ progress,
+ batchResponseTime,
+ );
+
+ console.log(
+ `[Messages job] Processed retry batch: ${retrySuccesses} successes, ${retryFailures} failures`,
+ );
+ }
+ }
while (!progress.done) {
+ await delay(progress.currentDelay);
+ requestStartTime = Date.now();
+
let list;
try {
- list = await fetchMessages(progress.offset, limit);
+ list = await fetchMessages(progress.offset, progress.currentBatchSize);
+ const responseTime = Date.now() - requestStartTime;
+
+ progress.currentDelay = calculateAdaptiveDelay(progress, responseTime);
+ progress.currentBatchSize = calculateAdaptiveBatchSize(
+ progress,
+ responseTime,
+ );
} catch (e) {
console.error("[Messages job] list fetch failed:", e);
+ progress.failedRequests++;
+ progress.currentDelay = Math.min(
+ progress.currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
+ RATE_LIMIT_CONFIG.maxDelay,
+ );
+
+ progress.processedIds = Array.from(processedIdsSet);
+ await ctx.setProgress(progress);
break;
}
- if (list.status !== "200") break;
+ if (list.status !== "200") {
+ progress.failedRequests++;
- for (const msg of list.payload.messages) {
- const id = msg.id.toString();
+ progress.processedIds = Array.from(processedIdsSet);
+ await ctx.setProgress(progress);
+ break;
+ }
- if (existingIds.has(id)) {
- consecutiveExisting += 1;
- if (consecutiveExisting >= 20) {
- progress.done = true;
- break;
- }
- continue;
- }
- consecutiveExisting = 0;
+ const itemsToStream: IndexItem[] = [];
- let full;
+ const {
+ processedItems,
+ consecutiveExisting: newConsecutiveExisting,
+ updatedProgress,
+ } = await processMessagesInParallel(
+ list.payload.messages,
+ existingIds,
+ processedIdsSet,
+ progress,
+ ctx,
+ );
+
+ progress.currentDelay = updatedProgress.currentDelay;
+ progress.failedRequests = updatedProgress.failedRequests;
+ progress.lastSuccessTime = updatedProgress.lastSuccessTime;
+ progress.retryQueue = updatedProgress.retryQueue;
+
+ itemsToStream.push(...processedItems);
+
+ consecutiveExisting = newConsecutiveExisting;
+ if (consecutiveExisting >= 20) {
+ progress.done = true;
+ }
+
+ if (itemsToStream.length > 0 && progress.streamingStarted) {
try {
- full = await fetchMessageContent(msg.id);
- } catch (e) {
- console.error(`[Messages job] content fetch failed (id ${id}):`, e);
- continue;
+ await vectorWorker.streamItems(itemsToStream);
+ itemsStreamedToVector += itemsToStream.length;
+ console.log(
+ `[Messages job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
+ );
+ } catch (error) {
+ console.warn(
+ "[Messages job] Failed to stream items to vector worker:",
+ error,
+ );
}
- if (full.status !== "200") continue;
-
- const item: IndexItem = {
- id,
- text: msg.subject,
- category: "messages",
- content: `${htmlToPlainText(full.payload.contents)}\nFrom: ${msg.sender}`,
- dateAdded: new Date(msg.date).getTime(),
- metadata: {
- messageId: msg.id,
- author: msg.sender,
- senderId: msg.sender_id,
- senderType: msg.sender_type,
- timestamp: msg.date,
- hasAttachments: msg.attachments,
- attachmentCount: msg.attachmentCount,
- read: msg.read === 1,
- },
- actionId: "message",
- renderComponentId: "message",
- };
-
- await ctx.addItem(item);
- existingIds.add(id);
}
if (!list.payload.hasMore) progress.done = true;
- progress.offset += limit;
- await ctx.setProgress(progress);
+ progress.offset += progress.currentBatchSize;
+
+ progressUpdateCounter++;
+ if (progressUpdateCounter >= 10 || progress.done) {
+ progress.processedIds = Array.from(processedIdsSet);
+ await ctx.setProgress(progress);
+ progressUpdateCounter = 0;
+
+ console.log(
+ `[Messages job] Progress: offset=${progress.offset}, batchSize=${progress.currentBatchSize}, delay=${progress.currentDelay}ms, failures=${progress.failedRequests}, retryQueue=${progress.retryQueue.length}, vectorStreamed=${itemsStreamedToVector}, parallelRequests=${RATE_LIMIT_CONFIG.parallelRequests}`,
+ );
+ }
}
- if (progress.done) await ctx.setProgress({ offset: 0, done: false });
+ if (progress.streamingStarted) {
+ try {
+ await vectorWorker.endStreamingSession();
+ console.log(
+ `[Messages job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`,
+ );
+ } catch (error) {
+ console.warn("[Messages job] Failed to end streaming session:", error);
+ }
+ }
+
+ if (progress.done) {
+ await ctx.setProgress({
+ offset: 0,
+ done: false,
+ currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize,
+ currentDelay: RATE_LIMIT_CONFIG.baseDelay,
+ failedRequests: 0,
+ lastSuccessTime: Date.now(),
+ retryQueue: progress.retryQueue.slice(0, 20),
+ processedIds: [],
+ streamingStarted: false,
+ totalEstimated: 0,
+ });
+ } else {
+ progress.processedIds = Array.from(processedIdsSet);
+ await ctx.setProgress(progress);
+ }
return [];
},
diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts
index 62492a96..be3e4185 100644
--- a/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts
+++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/notifications.ts
@@ -1,8 +1,18 @@
import type { Job, IndexItem } from "../types";
import { htmlToPlainText } from "../utils";
import { fetchMessageContent } from "./messages";
+import { delay } from "@/seqta/utils/delay";
+import { VectorWorkerManager } from "../worker/vectorWorkerManager";
+
+const NOTIFICATIONS_RATE_LIMIT = {
+ baseDelay: 150,
+ maxDelay: 3000,
+ backoffMultiplier: 1.4,
+ maxRetries: 2,
+ batchDelay: 100,
+ vectorBatchSize: 3,
+};
-/* ------------- Notification types ------------- */
interface MessageNotification {
notificationID: number;
type: "message";
@@ -28,10 +38,13 @@ interface AssessmentNotification {
type Notification = MessageNotification | AssessmentNotification;
interface NotificationsProgress {
- lastTs: number; // ms since epoch of last processed notification
+ lastTs: number;
+ failedRequests: number;
+ currentDelay: number;
+ retryQueue: number[];
+ streamingStarted: boolean;
}
-/* ------------- Helpers ------------- */
const fetchNotifications = async () => {
const res = await fetch(`${location.origin}/seqta/student/heartbeat?`, {
method: "POST",
@@ -49,9 +62,9 @@ const fetchAssessmentName = async (
assessmentId: number,
metaclassId: number,
programmeId: number,
+ retryCount = 0,
): Promise => {
const searchAssessment = (data: any): string | null => {
- // Search syllabus
for (const item of data.syllabus || []) {
const found = (item.assessments || []).find(
(a: any) => a.id === assessmentId,
@@ -59,13 +72,11 @@ const fetchAssessmentName = async (
if (found) return found.title;
}
- // Search pending
const foundPending = (data.pending || []).find(
(a: any) => a.id === assessmentId,
);
if (foundPending) return foundPending.title;
- // Search tasks
const foundTask = (data.tasks || []).find(
(a: any) => a.id === assessmentId,
);
@@ -75,38 +86,80 @@ const fetchAssessmentName = async (
};
const fetchAssessments = async (endpoint: string) => {
- const res = await fetch(`${location.origin}${endpoint}`, {
- method: "POST",
- credentials: "include",
- body: JSON.stringify({
- metaclass: metaclassId,
- programme: programmeId,
- }),
- });
- const json = await res.json();
- return json.payload;
+ try {
+ const res = await fetch(`${location.origin}${endpoint}`, {
+ method: "POST",
+ credentials: "include",
+ body: JSON.stringify({
+ metaclass: metaclassId,
+ programme: programmeId,
+ }),
+ });
+
+ if (!res.ok) {
+ throw new Error(`HTTP ${res.status}: ${res.statusText}`);
+ }
+
+ const json = await res.json();
+ return json.payload;
+ } catch (error) {
+ console.warn(
+ `[Notifications job] Failed to fetch assessments from ${endpoint} (attempt ${retryCount + 1}):`,
+ error,
+ );
+
+ if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) {
+ const retryDelay =
+ NOTIFICATIONS_RATE_LIMIT.baseDelay *
+ Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount);
+ await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay));
+ return fetchAssessments(endpoint);
+ }
+
+ throw error;
+ }
};
- // Try from /past
- let payload = await fetchAssessments("/seqta/student/assessment/list/past");
- let title = searchAssessment(payload);
- if (title) return title;
+ try {
+ let payload = await fetchAssessments("/seqta/student/assessment/list/past");
+ let title = searchAssessment(payload);
+ if (title) return title;
- // Try from /upcoming if not found in /past
- const upcomingPayload = await fetchAssessments(
- "/seqta/student/assessment/list/upcoming",
- );
- const foundUpcoming = (upcomingPayload || []).find(
- (a: any) => a.id === assessmentId,
- );
- if (foundUpcoming) return foundUpcoming.title;
+ await delay(NOTIFICATIONS_RATE_LIMIT.baseDelay);
- throw new Error(
- `Assessment with ID ${assessmentId} not found in past or upcoming.`,
- );
+ const upcomingPayload = await fetchAssessments(
+ "/seqta/student/assessment/list/upcoming",
+ );
+ const foundUpcoming = (upcomingPayload || []).find(
+ (a: any) => a.id === assessmentId,
+ );
+ if (foundUpcoming) return foundUpcoming.title;
+
+ throw new Error(
+ `Assessment with ID ${assessmentId} not found in past or upcoming.`,
+ );
+ } catch (error) {
+ if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) {
+ const retryDelay =
+ NOTIFICATIONS_RATE_LIMIT.baseDelay *
+ Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount);
+ await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay));
+ return fetchAssessmentName(
+ assessmentId,
+ metaclassId,
+ programmeId,
+ retryCount + 1,
+ );
+ }
+
+ console.error(
+ `[Notifications job] Failed to fetch assessment name for ID ${assessmentId} after ${retryCount + 1} attempts:`,
+ error,
+ );
+ return `Assessment ${assessmentId}`;
+ }
};
-/* ------------- Job ------------- */
export const notificationsJob: Job = {
id: "notifications",
label: "Notifications",
@@ -116,6 +169,10 @@ export const notificationsJob: Job = {
run: async (ctx) => {
const progress = (await ctx.getProgress()) ?? {
lastTs: 0,
+ failedRequests: 0,
+ currentDelay: NOTIFICATIONS_RATE_LIMIT.baseDelay,
+ retryQueue: [],
+ streamingStarted: false,
};
let notifications: Notification[];
@@ -123,72 +180,221 @@ export const notificationsJob: Job = {
notifications = await fetchNotifications();
} catch (e) {
console.error("[Notifications job] fetch failed:", e);
+ progress.failedRequests++;
+ progress.currentDelay = Math.min(
+ progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier,
+ NOTIFICATIONS_RATE_LIMIT.maxDelay,
+ );
+ await ctx.setProgress(progress);
return [];
}
+ const vectorWorker = VectorWorkerManager.getInstance();
+ if (!progress.streamingStarted && notifications.length > 0) {
+ const estimatedTotal = Math.min(notifications.length * 1.2, 100);
+
+ try {
+ await vectorWorker.startStreamingSession(
+ estimatedTotal,
+ (progressData) => {
+ console.log(
+ `[Notifications job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`,
+ );
+ },
+ NOTIFICATIONS_RATE_LIMIT.vectorBatchSize,
+ );
+ progress.streamingStarted = true;
+ console.log(
+ `[Notifications job] Started streaming vectorization session for ~${estimatedTotal} items`,
+ );
+ } catch (error) {
+ console.warn(
+ "[Notifications job] Failed to start streaming session:",
+ error,
+ );
+ }
+ }
+
const notificationIsIndexed = async (id: string): Promise => {
- const [inAssessments, inMessages] = await Promise.all([
- ctx
- .getStoredItems("notifications")
- .then((items) => items.some((i) => i.id === id)),
- ctx
- .getStoredItems("messages")
- .then((items) => items.some((i) => i.id === id)),
- ]);
- return inAssessments || inMessages;
+ try {
+ const [inAssessments, inMessages] = await Promise.all([
+ ctx
+ .getStoredItems("notifications")
+ .then((items) => items.some((i) => i.id === id)),
+ ctx
+ .getStoredItems("messages")
+ .then((items) => items.some((i) => i.id === id)),
+ ]);
+ return inAssessments || inMessages;
+ } catch (error) {
+ console.warn(
+ `[Notifications job] Error checking if notification ${id} is indexed:`,
+ error,
+ );
+ return false;
+ }
};
const items: IndexItem[] = [];
+ const itemsToStream: IndexItem[] = [];
+ let processedCount = 0;
+ let progressUpdateCounter = 0;
+ let itemsStreamedToVector = 0;
- for (const notif of notifications) {
- const id = notif.notificationID.toString();
- if (await notificationIsIndexed(id)) continue;
+ if (progress.retryQueue.length > 0) {
+ console.log(
+ `[Notifications job] Processing ${Math.min(progress.retryQueue.length, 3)} items from retry queue`,
+ );
- if (notif.type === "coneqtassessments") {
- const a = notif.coneqtAssessments;
+ const retryBatch = progress.retryQueue.slice(0, 3);
- const content = await fetchAssessmentName(
- a.assessmentID,
- a.metaclassID,
- a.programmeID,
+ for (const notificationId of retryBatch) {
+ const notification = notifications.find(
+ (n) => n.notificationID === notificationId,
);
- items.push({
- id,
- text: a.title,
- category: "assessments",
- content: content,
- dateAdded: new Date(notif.timestamp).getTime(),
- metadata: {
- assessmentId: a.assessmentID,
- subject: a.subjectCode,
- term: a.term,
- programmeId: a.programmeID,
- metaclassId: a.metaclassID,
- timestamp: notif.timestamp,
- },
- actionId: "assessment",
- renderComponentId: "assessment",
- });
- } else if (notif.type === "message") {
- const content = await fetchMessageContent(notif.message.messageID);
+ if (!notification) {
+ progress.retryQueue = progress.retryQueue.filter(
+ (id) => id !== notificationId,
+ );
+ continue;
+ }
- await ctx.addItem(
- {
- id,
- text: notif.message.title,
- category: "messages",
- content: `${htmlToPlainText(content.payload.contents)}\nFrom: ${notif.message.subtitle}`,
- dateAdded: new Date(notif.timestamp).getTime(),
- metadata: {
- messageId: notif.message.messageID,
- author: notif.message.subtitle,
- timestamp: notif.timestamp,
- isAssessmentNotification: true,
- },
- actionId: "message",
- renderComponentId: "message",
- },
- "messages",
+ await delay(progress.currentDelay);
+
+ try {
+ const { success, item } = await processNotification(
+ notification,
+ ctx,
+ );
+ if (success) {
+ progress.retryQueue = progress.retryQueue.filter(
+ (id) => id !== notificationId,
+ );
+ progress.failedRequests = Math.max(0, progress.failedRequests - 1);
+ progress.currentDelay = Math.max(
+ progress.currentDelay * 0.9,
+ NOTIFICATIONS_RATE_LIMIT.baseDelay,
+ );
+
+ if (item) {
+ items.push(item);
+ itemsToStream.push(item);
+ }
+ }
+ } catch (error) {
+ console.error(
+ `[Notifications job] Retry failed for notification ${notificationId}:`,
+ error,
+ );
+ progress.failedRequests++;
+ }
+ }
+ }
+
+ const notificationsToProcess = notifications.slice(0, 20);
+
+ for (const notif of notificationsToProcess) {
+ const id = notif.notificationID.toString();
+
+ try {
+ if (await notificationIsIndexed(id)) continue;
+ if (progress.retryQueue.includes(notif.notificationID)) continue;
+
+ if (processedCount > 0) {
+ await delay(NOTIFICATIONS_RATE_LIMIT.batchDelay);
+ }
+
+ const { success, item } = await processNotification(
+ notif,
+ ctx,
+ );
+ if (!success) {
+ if (progress.retryQueue.length < 10) {
+ progress.retryQueue.push(notif.notificationID);
+ }
+ progress.failedRequests++;
+ } else {
+ progress.failedRequests = Math.max(0, progress.failedRequests - 1);
+ progress.currentDelay = Math.max(
+ progress.currentDelay * 0.95,
+ NOTIFICATIONS_RATE_LIMIT.baseDelay,
+ );
+
+ if (item) {
+ items.push(item);
+ itemsToStream.push(item);
+ }
+ }
+ } catch (error) {
+ console.error(
+ `[Notifications job] Failed to process notification ${id}:`,
+ error,
+ );
+
+ if (progress.retryQueue.length < 10) {
+ progress.retryQueue.push(notif.notificationID);
+ }
+ progress.failedRequests++;
+ progress.currentDelay = Math.min(
+ progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier,
+ NOTIFICATIONS_RATE_LIMIT.maxDelay,
+ );
+ }
+
+ processedCount++;
+
+ if (
+ itemsToStream.length >= NOTIFICATIONS_RATE_LIMIT.vectorBatchSize &&
+ progress.streamingStarted
+ ) {
+ try {
+ await vectorWorker.streamItems([...itemsToStream]);
+ itemsStreamedToVector += itemsToStream.length;
+ console.log(
+ `[Notifications job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
+ );
+ itemsToStream.length = 0;
+ } catch (error) {
+ console.warn(
+ "[Notifications job] Failed to stream items to vector worker:",
+ error,
+ );
+ }
+ }
+
+ progressUpdateCounter++;
+ if (progressUpdateCounter >= 5) {
+ await ctx.setProgress(progress);
+ progressUpdateCounter = 0;
+ }
+ }
+
+ if (itemsToStream.length > 0 && progress.streamingStarted) {
+ try {
+ await vectorWorker.streamItems([...itemsToStream]);
+ itemsStreamedToVector += itemsToStream.length;
+ console.log(
+ `[Notifications job] Streamed final ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
+ );
+ } catch (error) {
+ console.warn(
+ "[Notifications job] Failed to stream final items to vector worker:",
+ error,
+ );
+ }
+ }
+
+ if (progress.streamingStarted) {
+ try {
+ await vectorWorker.endStreamingSession();
+ console.log(
+ `[Notifications job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`,
+ );
+ progress.streamingStarted = false;
+ } catch (error) {
+ console.warn(
+ "[Notifications job] Failed to end streaming session:",
+ error,
);
}
}
@@ -198,9 +404,14 @@ export const notificationsJob: Job = {
...items.map((i) => i.dateAdded),
progress.lastTs,
);
- await ctx.setProgress({ lastTs: latest });
+ progress.lastTs = latest;
}
+ await ctx.setProgress(progress);
+ console.log(
+ `[Notifications job] Processed ${processedCount} notifications, ${progress.retryQueue.length} in retry queue, ${progress.failedRequests} failures, ${itemsStreamedToVector} items streamed to vector worker`,
+ );
+
return items;
},
@@ -211,3 +422,73 @@ export const notificationsJob: Job = {
return items.filter((i) => i.dateAdded >= date.getTime());
},
};
+
+async function processNotification(
+ notif: Notification,
+ ctx: any,
+): Promise<{ success: boolean; item?: IndexItem }> {
+ const id = notif.notificationID.toString();
+
+ try {
+ if (notif.type === "coneqtassessments") {
+ const a = notif.coneqtAssessments;
+
+ const content = await fetchAssessmentName(
+ a.assessmentID,
+ a.metaclassID,
+ a.programmeID,
+ );
+
+ const item: IndexItem = {
+ id,
+ text: a.title,
+ category: "assessments",
+ content: content,
+ dateAdded: new Date(notif.timestamp).getTime(),
+ metadata: {
+ assessmentId: a.assessmentID,
+ subject: a.subjectCode,
+ term: a.term,
+ programmeId: a.programmeID,
+ metaclassId: a.metaclassID,
+ timestamp: notif.timestamp,
+ },
+ actionId: "assessment",
+ renderComponentId: "assessment",
+ };
+
+ return { success: true, item };
+ } else if (notif.type === "message") {
+ const content = await fetchMessageContent(notif.message.messageID);
+
+ if (content && content.payload) {
+ const item: IndexItem = {
+ id,
+ text: notif.message.title,
+ category: "messages",
+ content: `${htmlToPlainText(content.payload.contents)}\nFrom: ${notif.message.subtitle}`,
+ dateAdded: new Date(notif.timestamp).getTime(),
+ metadata: {
+ messageId: notif.message.messageID,
+ author: notif.message.subtitle,
+ timestamp: notif.timestamp,
+ isAssessmentNotification: true,
+ },
+ actionId: "message",
+ renderComponentId: "message",
+ };
+
+ await ctx.addItem(item, "messages");
+ return { success: true, item };
+ }
+ }
+
+ return { success: false };
+ } catch (error) {
+ console.error(
+ `[Notifications job] Error processing notification ${id}:`,
+ error,
+ );
+ return { success: false };
+ }
+}
diff --git a/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts b/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts
index 85117daa..af3635c8 100755
--- a/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts
+++ b/src/plugins/built-in/globalSearch/src/indexing/jobs/subjects.ts
@@ -16,7 +16,10 @@ export const subjectsJob: Job = {
id: "subjects",
label: "Subjects",
renderComponentId: "subject",
- frequency: "pageLoad",
+ frequency: {
+ type: "expiry",
+ afterMs: 1000 * 60 * 60 * 24 * 30,
+ },
boostCriteria: (item, searchTerm) => {
if (searchTerm == "") {
return -100;
diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts
index 26cacbd4..589b6df2 100644
--- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts
+++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorker.ts
@@ -5,6 +5,16 @@ let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
let currentAbortController: AbortController | null = null;
+let streamingSession: {
+ isActive: boolean;
+ totalExpected: number;
+ totalReceived: number;
+ totalProcessed: number;
+ batchSize: number;
+ pendingItems: IndexItem[];
+ processingPromise: Promise | null;
+} | null = null;
+
async function initWorker() {
if (isInitialized) {
console.debug("Vector worker already initialized.");
@@ -28,16 +38,14 @@ async function initWorker() {
console.debug("Vector worker initialized successfully.");
} catch (e) {
console.error("Failed to initialize vector worker:", e);
- // Set as initialized even on error to prevent retries, but index will be null
isInitialized = true;
- vectorIndex = null; // Ensure index is null on error
+ vectorIndex = null;
}
}
async function vectorizeItem(
item: IndexItem,
): Promise<(IndexItem & { embedding: number[] }) | null> {
- // Simplified for brevity - assumes embedding function doesn't need cancellation signal
try {
const textToEmbed = [
item.text,
@@ -53,19 +61,246 @@ async function vectorizeItem(
return { ...item, embedding };
} catch (error) {
console.error(`Error vectorizing item ${item.id}:`, error);
- return null; // Return null if vectorization fails for an item
+ return null;
+ }
+}
+
+async function startStreamingSession(
+ totalExpected: number,
+ batchSize: number = 5,
+) {
+ if (!vectorIndex) {
+ console.warn(
+ "Streaming requested but vector index not ready. Attempting init.",
+ );
+ await initWorker();
+ if (!vectorIndex) {
+ self.postMessage({
+ type: "progress",
+ data: {
+ status: "error",
+ message:
+ "Vector index not available for streaming after init attempt.",
+ },
+ });
+ return;
+ }
+ }
+
+ if (streamingSession?.isActive) {
+ await endStreamingSession();
+ }
+
+ streamingSession = {
+ isActive: true,
+ totalExpected,
+ totalReceived: 0,
+ totalProcessed: 0,
+ batchSize,
+ pendingItems: [],
+ processingPromise: null,
+ };
+
+ console.debug(
+ `Started streaming session for ${totalExpected} items with batch size ${batchSize}`,
+ );
+
+ self.postMessage({
+ type: "streamingProgress",
+ data: {
+ processed: 0,
+ total: totalExpected,
+ message: "Streaming session started",
+ },
+ });
+}
+
+async function processStreamingBatch(
+ items: IndexItem[],
+ isLast: boolean = false,
+) {
+ if (!streamingSession?.isActive) {
+ console.warn("Received streaming batch but no active session");
+ return;
+ }
+
+ streamingSession.totalReceived += items.length;
+ streamingSession.pendingItems.push(...items);
+
+ console.debug(
+ `Received streaming batch: ${items.length} items (${streamingSession.totalReceived}/${streamingSession.totalExpected})`,
+ );
+
+ const shouldProcess =
+ streamingSession.pendingItems.length >= streamingSession.batchSize ||
+ isLast;
+
+ if (shouldProcess && !streamingSession.processingPromise) {
+ streamingSession.processingPromise = processStreamingItems();
+ }
+}
+
+async function processStreamingItems() {
+ if (!streamingSession?.isActive || !vectorIndex) {
+ return;
+ }
+
+ while (
+ streamingSession.pendingItems.length > 0 &&
+ streamingSession.isActive
+ ) {
+ const batchToProcess = streamingSession.pendingItems.splice(
+ 0,
+ streamingSession.batchSize,
+ );
+
+ const unprocessedItems = batchToProcess.filter((item) => {
+ try {
+ return !vectorIndex!.get({ id: item.id });
+ } catch (e) {
+ return true;
+ }
+ });
+
+ if (unprocessedItems.length === 0) {
+ streamingSession.totalProcessed += batchToProcess.length;
+ continue;
+ }
+
+ const vectorizationResults = await Promise.all(
+ unprocessedItems.map(vectorizeItem),
+ );
+ const successfullyVectorized = vectorizationResults.filter(
+ (result) => result !== null,
+ ) as (IndexItem & { embedding: number[] })[];
+
+ if (successfullyVectorized.length > 0) {
+ try {
+ successfullyVectorized.forEach((item) => vectorIndex!.add(item));
+
+ if (
+ streamingSession.totalProcessed % (streamingSession.batchSize * 3) ===
+ 0
+ ) {
+ await vectorIndex!.saveIndex("indexedDB");
+ console.debug(
+ `Saved streaming index at ${streamingSession.totalProcessed} processed items`,
+ );
+ }
+ } catch (e) {
+ console.error("Error processing streaming batch:", e);
+ }
+ }
+
+ streamingSession.totalProcessed += batchToProcess.length;
+
+ self.postMessage({
+ type: "streamingProgress",
+ data: {
+ processed: streamingSession.totalProcessed,
+ total: streamingSession.totalExpected,
+ message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items`,
+ },
+ });
+
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ }
+
+ streamingSession.processingPromise = null;
+
+ if (
+ streamingSession.totalReceived >= streamingSession.totalExpected &&
+ streamingSession.pendingItems.length === 0
+ ) {
+ await finalizeStreamingSession();
+ }
+}
+
+async function finalizeStreamingSession() {
+ if (!streamingSession?.isActive) {
+ return;
+ }
+
+ try {
+ if (vectorIndex) {
+ await vectorIndex.saveIndex("indexedDB");
+ console.debug("Final save of streaming index completed");
+ }
+ } catch (e) {
+ console.error("Error in final streaming save:", e);
+ }
+
+ const totalProcessed = streamingSession.totalProcessed;
+ const totalExpected = streamingSession.totalExpected;
+
+ streamingSession.isActive = false;
+
+ self.postMessage({
+ type: "progress",
+ data: {
+ status: "complete",
+ total: totalExpected,
+ processed: totalProcessed,
+ message: `Streaming vectorization complete: ${totalProcessed}/${totalExpected} items processed`,
+ },
+ });
+
+ console.debug(
+ `Streaming session completed: ${totalProcessed}/${totalExpected} items processed`,
+ );
+}
+
+async function endStreamingSession() {
+ if (!streamingSession?.isActive) {
+ return;
+ }
+
+ console.debug("Ending streaming session...");
+
+ if (streamingSession.processingPromise) {
+ await streamingSession.processingPromise;
+ }
+
+ if (streamingSession.pendingItems.length > 0) {
+ console.debug(
+ `Processing ${streamingSession.pendingItems.length} remaining items before ending session`,
+ );
+ streamingSession.processingPromise = processStreamingItems();
+ await streamingSession.processingPromise;
+ }
+
+ try {
+ if (vectorIndex) {
+ await vectorIndex.saveIndex("indexedDB");
+ console.debug("Final save before ending streaming session");
+ }
+ } catch (e) {
+ console.error("Error in final save before ending session:", e);
+ }
+
+ const wasActive = streamingSession.isActive;
+ streamingSession.isActive = false;
+
+ if (wasActive) {
+ self.postMessage({
+ type: "progress",
+ data: {
+ status: "cancelled",
+ message: "Streaming session ended early",
+ },
+ });
}
}
async function processItems(items: IndexItem[], signal: AbortSignal) {
console.debug("Worker received process request.");
+
if (!vectorIndex) {
console.warn(
"Processing requested but vector index not ready. Attempting init.",
);
- await initWorker(); // Attempt initialization if not ready
+ await initWorker();
if (!vectorIndex) {
- // Check again after attempt
self.postMessage({
type: "progress",
data: {
@@ -78,13 +313,11 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
}
- // Find items we haven't processed yet by checking against the index instance
const unprocessedItems = items.filter((item) => {
- if (signal.aborted) return false; // Check cancellation during filtering
+ if (signal.aborted) return false;
try {
return !vectorIndex!.get({ id: item.id });
} catch (e) {
- // If get throws (e.g., item not found), it means it's unprocessed
return true;
}
});
@@ -136,7 +369,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
const batch = unprocessedItems.slice(i, i + BATCH_SIZE);
- // Vectorize batch
const vectorizationResults = await Promise.all(batch.map(vectorizeItem));
const successfullyVectorized = vectorizationResults.filter(
(result) => result !== null,
@@ -154,7 +386,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
return;
}
- // Add successfully vectorized items to index
if (successfullyVectorized.length > 0) {
try {
successfullyVectorized.forEach((item) => vectorIndex!.add(item));
@@ -164,8 +395,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
type: "progress",
data: { status: "error", message: `Error adding to index: ${e}` },
});
- // Decide whether to continue or stop on error
- // return; // Example: Stop processing if adding fails
}
}
@@ -181,7 +410,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
return;
}
- // Save index after processing the batch
try {
await vectorIndex!.saveIndex("indexedDB");
console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1}`);
@@ -191,13 +419,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
type: "progress",
data: { status: "error", message: `Error saving index batch: ${e}` },
});
- // Continue processing next batch even if saving failed? Or stop?
- // return; // Example: Stop if saving fails
}
processedCount = Math.min(i + BATCH_SIZE, unprocessedItems.length);
- // Report progress
self.postMessage({
type: "progress",
data: {
@@ -207,7 +432,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
},
});
- // Yield control briefly to allow other messages (like cancellation) to be processed
await new Promise((resolve) => setTimeout(resolve, 0));
}
@@ -219,196 +443,49 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
});
} else {
console.debug("Processing completed, but was cancelled.");
- // No need to send 'cancelled' again if already sent during batching
- // self.postMessage({ type: 'progress', data: { status: 'cancelled', message: 'Processing finished but was cancelled' }});
}
}
-async function search(
- query: string,
- topK: number,
- signal: AbortSignal,
- messageId: string,
-) {
- console.debug(
- `Worker received search request (ID: ${messageId}): "${query}"`,
- );
- if (!vectorIndex) {
- console.warn(
- `Search (ID: ${messageId}) requested but vector index not ready. Attempting init.`,
- );
- await initWorker(); // Attempt initialization
- // Re-check after waiting/init attempt
- if (!vectorIndex) {
- console.error(
- `Search (ID: ${messageId}) failed: Vector index unavailable after init attempt.`,
- );
- self.postMessage({
- type: "searchError",
- data: { messageId, error: "Vector index not available." },
- });
- return;
- }
- console.debug(
- `Vector index ready after init for search (ID: ${messageId}).`,
- );
- }
-
- if (signal.aborted) {
- console.debug(`Search (ID: ${messageId}) cancelled before starting.`);
- self.postMessage({ type: "searchCancelled", data: { messageId } });
- return;
- }
-
- try {
- console.debug(`Getting embedding for query (ID: ${messageId})...`);
- const queryEmbedding = await getEmbedding(query);
-
- if (signal.aborted) {
- console.debug(`Search (ID: ${messageId}) cancelled after embedding.`);
- self.postMessage({ type: "searchCancelled", data: { messageId } });
- return;
- }
-
- console.debug(`Performing vector search (ID: ${messageId})...`);
- // Await the search and let TypeScript infer the type
- const results = await vectorIndex!.search(queryEmbedding, {
- topK,
- useStorage: "indexedDB", // Ensure we search the stored index
- });
-
- console.debug(
- `Vector search (ID: ${messageId}) completed with ${results.length} results.`,
- );
-
- if (signal.aborted) {
- console.debug(
- `Search (ID: ${messageId}) cancelled after search completed, discarding results.`,
- );
- self.postMessage({ type: "searchCancelled", data: { messageId } });
- return;
- }
-
- // Post results back to the main thread
- self.postMessage({ type: "searchResults", data: { messageId, results } });
- } catch (error) {
- console.error(`Vector search error in worker (ID: ${messageId}):`, error);
- // Ensure signal isn't checked *after* an error occurred before posting error message
- if (!signal.aborted) {
- // Only post error if not cancelled
- self.postMessage({
- type: "searchError",
- data: {
- messageId,
- error: error instanceof Error ? error.message : String(error),
- },
- });
- } else {
- console.debug(
- `Search (ID: ${messageId}) encountered error but was cancelled, suppressing error message.`,
- );
- self.postMessage({ type: "searchCancelled", data: { messageId } }); // Still notify of cancellation
- }
- }
-}
-
-// Handle messages from the main thread
self.addEventListener("message", async (e) => {
- // Make sure data and type exist
- if (!e.data || !e.data.type) {
- console.warn("Worker received message with no data or type.");
- return;
- }
-
- const { type, data, messageId } = e.data; // messageId used for requests needing response/cancellation tracking
-
- // Cancel previous long-running operation (process or search) if a new one starts
- if (type === "process" || type === "search") {
- if (currentAbortController) {
- console.debug(
- `Worker cancelling previous operation due to new '${type}' request.`,
- );
- currentAbortController.abort(`New '${type}' operation requested`);
- }
- currentAbortController = new AbortController();
- console.debug(`Worker starting new '${type}' operation.`);
- }
-
- // Use the signal from the *current* controller for the task being started
- const signal = currentAbortController?.signal;
+ const { type, data } = e.data;
switch (type) {
- case "process":
- if (signal && data?.items) {
- await processItems(data.items, signal);
- } else if (!signal) {
- console.error(
- "Process message received but no abort signal available.",
- );
- } else if (!data?.items) {
- console.error("Process message received without 'items' data.");
- self.postMessage({
- type: "progress",
- data: {
- status: "error",
- message: "Process command received without items.",
- },
- });
- }
- break;
-
- case "search":
- if (signal && messageId && typeof data?.query === "string") {
- await search(data.query, data.topK ?? 10, signal, messageId);
- } else {
- const errorReason = !signal
- ? "Missing signal"
- : !messageId
- ? "Missing messageId"
- : "Missing or invalid query";
- console.error(`Search message received invalid: ${errorReason}.`, {
- data,
- messageId,
- signalExists: !!signal,
- });
- // Send an error back if messageId exists
- if (messageId) {
- self.postMessage({
- type: "searchError",
- data: { messageId, error: `Worker internal error: ${errorReason}` },
- });
- }
- }
- break;
-
case "init":
- // Init should not be cancellable in the same way, it's foundational
- // Check if already initialized before potentially running it again
- if (!isInitialized) {
- await initWorker();
- self.postMessage({ type: "ready" }); // Signal ready *after* init attempt
- } else {
- console.debug("Received init message, but worker already initialized.");
- self.postMessage({ type: "ready" }); // Signal ready anyway
- }
+ await initWorker();
+ self.postMessage({ type: "ready" });
break;
- // No explicit 'cancel' case needed as new tasks auto-cancel previous ones
+ case "process":
+ if (currentAbortController) {
+ currentAbortController.abort();
+ }
+ currentAbortController = new AbortController();
+ await processItems(data.items, currentAbortController.signal);
+ break;
+
+ case "startStreaming":
+ await startStreamingSession(data.totalExpected, data.batchSize);
+ break;
+
+ case "streamBatch":
+ await processStreamingBatch(data.items, data.isLast);
+ break;
+
+ case "endStreaming":
+ await endStreamingSession();
+ break;
default:
- console.warn("Unknown message type received by vector worker:", type);
+ console.warn("Unknown message type:", type);
}
});
-// Initial check or trigger for initialization when the worker starts
initWorker()
.then(() => {
self.postMessage({ type: "ready" });
})
.catch((err) => {
console.error("Initial worker initialization failed:", err);
- // Still need to signal readiness, perhaps with an error state?
- // Or rely on the first 'process' or 'search' to retry init.
- // For now, just signal ready, but the index might be null.
+
self.postMessage({ type: "ready" });
});
diff --git a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts
index efcc9cfd..02736ecf 100644
--- a/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts
+++ b/src/plugins/built-in/globalSearch/src/indexing/worker/vectorWorkerManager.ts
@@ -1,7 +1,6 @@
import { refreshVectorCache } from "../../search/vector/vectorSearch";
import type { IndexItem } from "../types";
import vectorWorker from "./vectorWorker.ts?inlineWorker";
-import type { SearchResult } from "embeddia";
export type ProgressCallback = (data: {
status: "started" | "processing" | "complete" | "error" | "cancelled";
@@ -14,28 +13,19 @@ export class VectorWorkerManager {
private static instance: VectorWorkerManager;
private worker: Worker | null = null;
private isInitialized = false;
- private readyPromise: Promise | null = null; // To await initialization
+ private readyPromise: Promise | null = null;
private progressCallback: ProgressCallback | null = null;
- private searchPromises = new Map<
- string,
- {
- resolve: (value: SearchResult[]) => void;
- reject: (reason?: any) => void;
- timer: NodeJS.Timeout;
- }
- >();
- private debounceTimer: NodeJS.Timeout | null = null;
- private lastSearchParams: {
- query: string;
- topK: number;
- resolve: (results: SearchResult[]) => void;
- reject: (reason?: any) => void;
+
+ private streamingSession: {
+ isActive: boolean;
+ totalExpected: number;
+ totalSent: number;
+ batchBuffer: IndexItem[];
+ batchSize: number;
+ flushTimer: NodeJS.Timeout | null;
} | null = null;
- private constructor() {
- // Start initialization immediately, but allow awaiting it
- this.readyPromise = this.initWorker();
- }
+ private constructor() {}
static getInstance(): VectorWorkerManager {
if (!VectorWorkerManager.instance) {
@@ -45,26 +35,25 @@ export class VectorWorkerManager {
}
private async initWorker(): Promise {
- // If already initialized or initializing, return the existing promise
if (this.isInitialized) return Promise.resolve();
if (this.readyPromise) return this.readyPromise;
+ console.debug("Lazy-loading vector worker...");
+
return new Promise((resolve, reject) => {
- // Create the worker
this.worker = vectorWorker();
console.log("Worker initialized", this.worker);
const timeout = setTimeout(() => {
console.error("Vector worker initialization timed out");
- this.worker?.terminate(); // Clean up worker if it exists
+ this.worker?.terminate();
this.worker = null;
- this.isInitialized = false; // Ensure state reflects failure
- this.readyPromise = null; // Allow retrying init later
+ this.isInitialized = false;
+ this.readyPromise = null;
reject(new Error("Worker initialization timed out"));
- }, 10000); // Increased timeout
+ }, 10000);
- // Set up message handling
this.worker!.addEventListener("message", (e) => {
const { type, data } = e.data;
console.debug("Message from vector worker:", type, data);
@@ -74,7 +63,7 @@ export class VectorWorkerManager {
this.isInitialized = true;
clearTimeout(timeout);
console.debug("Vector worker initialized and ready.");
- resolve(); // Resolve the init promise
+ resolve();
break;
case "progress":
@@ -83,50 +72,23 @@ export class VectorWorkerManager {
if (data.status === "complete") {
refreshVectorCache();
+
+ if (this.streamingSession?.isActive) {
+ this.endStreamingSession();
+ }
}
}
break;
- case "searchResults":
- const searchInfo = this.searchPromises.get(data.messageId);
- if (searchInfo) {
- clearTimeout(searchInfo.timer); // Clear timeout on success
- searchInfo.resolve(data.results);
- this.searchPromises.delete(data.messageId);
- } else {
- console.warn(
- "Received search results for unknown messageId:",
- data.messageId,
- );
- }
- break;
-
- case "searchError":
- const errorInfo = this.searchPromises.get(data.messageId);
- if (errorInfo) {
- clearTimeout(errorInfo.timer); // Clear timeout on error
- errorInfo.reject(new Error(data.error));
- this.searchPromises.delete(data.messageId);
- } else {
- console.warn(
- "Received search error for unknown messageId:",
- data.messageId,
- );
- }
- break;
-
- case "searchCancelled":
- const cancelledInfo = this.searchPromises.get(data.messageId);
- if (cancelledInfo) {
- clearTimeout(cancelledInfo.timer); // Clear timeout on cancel
- // Reject with a specific cancellation error or resolve with empty? Let's reject.
- cancelledInfo.reject(new Error("Search cancelled by worker"));
- this.searchPromises.delete(data.messageId);
- } else {
- console.debug(
- "Received cancellation for unknown messageId:",
- data.messageId,
- );
+ case "streamingProgress":
+ if (this.progressCallback && this.streamingSession?.isActive) {
+ const { processed } = data;
+ this.progressCallback({
+ status: "processing",
+ processed,
+ total: this.streamingSession.totalExpected,
+ message: `Streaming vectorization: ${processed}/${this.streamingSession.totalExpected} items`,
+ });
}
break;
@@ -135,15 +97,12 @@ export class VectorWorkerManager {
}
});
- // Initialize the worker
this.worker!.postMessage({ type: "init" });
});
}
- // Ensures worker is ready before proceeding
private async ensureReady() {
if (!this.readyPromise) {
- // If init wasn't called or failed, try again
console.warn("Worker not initialized, attempting init...");
this.readyPromise = this.initWorker();
}
@@ -155,113 +114,185 @@ export class VectorWorkerManager {
}
}
- async processItems(
- items: IndexItem[],
- onProgress?: ProgressCallback,
- ) {
- await this.ensureReady(); // Wait for worker to be ready
+ async processItems(items: IndexItem[], onProgress?: ProgressCallback) {
+ await this.ensureReady();
this.progressCallback = onProgress || null;
- // Cancel any ongoing search when starting processing
- this.cancelAllSearches("Processing started");
-
console.debug(`Sending ${items.length} items to worker for processing.`);
-
this.worker!.postMessage({
type: "process",
data: { items: items },
});
}
- // Public search method
- public async search(
- query: string,
- topK: number = 10,
- ): Promise {
+ async startStreamingSession(
+ totalExpectedItems: number,
+ onProgress?: ProgressCallback,
+ batchSize: number = 10,
+ ): Promise {
await this.ensureReady();
- return new Promise((resolve, reject) => {
- this.lastSearchParams = { query, topK, resolve, reject };
+ if (this.streamingSession?.isActive) {
+ this.endStreamingSession();
+ }
- const messageId = crypto.randomUUID();
- if (this.lastSearchParams && this.worker) {
- const currentParams = this.lastSearchParams; // Capture current params
- this.lastSearchParams = null; // Clear last params *before* posting
- this.debounceTimer = null;
+ this.progressCallback = onProgress || null;
- // Set a timeout for the search operation itself
- const searchTimeout = 10000; // e.g., 10 seconds
- const searchTimer = setTimeout(() => {
- if (this.searchPromises.has(messageId)) {
- console.error(`Search timed out for messageId: ${messageId}`);
- currentParams.reject(
- new Error(`Search timed out after ${searchTimeout}ms`),
- );
- this.searchPromises.delete(messageId);
- }
- }, searchTimeout);
+ this.streamingSession = {
+ isActive: true,
+ totalExpected: totalExpectedItems,
+ totalSent: 0,
+ batchBuffer: [],
+ batchSize,
+ flushTimer: null,
+ };
- this.searchPromises.set(messageId, {
- resolve: currentParams.resolve,
- reject: currentParams.reject,
- timer: searchTimer,
- });
+ console.debug(
+ `Starting streaming session for ${totalExpectedItems} items with batch size ${batchSize}`,
+ );
- console.debug(
- `Sending search request (ID: ${messageId}) to worker: "${currentParams.query}"`,
- );
- console.log(this.worker);
- this.worker.postMessage({
- type: "search",
- data: { query: currentParams.query, topK: currentParams.topK },
- messageId,
- });
- } else if (this.lastSearchParams) {
- // This case might happen if ensureReady failed but didn't throw
- console.error("Worker unavailable when trying to send search request.");
- this.lastSearchParams.reject(
- new Error("Worker unavailable for search"),
- );
- this.lastSearchParams = null;
- this.debounceTimer = null;
+ this.worker!.postMessage({
+ type: "startStreaming",
+ data: { totalExpected: totalExpectedItems, batchSize },
+ });
+
+ if (this.progressCallback) {
+ this.progressCallback({
+ status: "started",
+ total: totalExpectedItems,
+ processed: 0,
+ message: "Starting streaming vectorization",
+ });
+ }
+ }
+
+ async streamItems(items: IndexItem[]): Promise {
+ if (!this.streamingSession?.isActive) {
+ throw new Error(
+ "No active streaming session. Call startStreamingSession first.",
+ );
+ }
+
+ this.streamingSession.batchBuffer.push(...items);
+
+ if (
+ this.streamingSession.batchBuffer.length >=
+ this.streamingSession.batchSize
+ ) {
+ await this.flushBatch();
+ } else {
+ if (this.streamingSession.flushTimer) {
+ clearTimeout(this.streamingSession.flushTimer);
}
+
+ this.streamingSession.flushTimer = setTimeout(() => {
+ this.flushBatch();
+ }, 1000);
+ }
+ }
+
+ private async flushBatch(): Promise {
+ if (
+ !this.streamingSession?.isActive ||
+ this.streamingSession.batchBuffer.length === 0
+ ) {
+ return;
+ }
+
+ const batch = [...this.streamingSession.batchBuffer];
+ this.streamingSession.batchBuffer = [];
+ this.streamingSession.totalSent += batch.length;
+
+ if (this.streamingSession.flushTimer) {
+ clearTimeout(this.streamingSession.flushTimer);
+ this.streamingSession.flushTimer = null;
+ }
+
+ console.debug(
+ `Streaming batch of ${batch.length} items to worker (${this.streamingSession.totalSent}/${this.streamingSession.totalExpected})`,
+ );
+
+ this.worker!.postMessage({
+ type: "streamBatch",
+ data: {
+ items: batch,
+ isLast:
+ this.streamingSession.totalSent >=
+ this.streamingSession.totalExpected,
+ },
});
}
- // Method to cancel all pending/debounced searches
- private cancelAllSearches(reason: string = "Cancelled") {
- if (this.debounceTimer) {
- clearTimeout(this.debounceTimer);
- this.debounceTimer = null;
- if (this.lastSearchParams) {
- this.lastSearchParams.reject(new Error(`Search cancelled: ${reason}`));
- this.lastSearchParams = null;
- }
+ async endStreamingSession(): Promise {
+ if (!this.streamingSession?.isActive) {
+ return;
}
- // We might also want to tell the worker to cancel its *current* search
- // if it supports it, but this requires worker modification.
- // For now, just reject pending promises in the manager.
- for (const [messageId, promiseInfo] of this.searchPromises.entries()) {
- clearTimeout(promiseInfo.timer);
- promiseInfo.reject(new Error(`Search cancelled: ${reason}`));
- this.searchPromises.delete(messageId);
+
+ await this.flushBatch();
+
+ if (this.streamingSession.flushTimer) {
+ clearTimeout(this.streamingSession.flushTimer);
}
+
+ this.streamingSession.isActive = false;
+
+ this.worker!.postMessage({
+ type: "endStreaming",
+ });
+
+ console.debug("Streaming session ended");
+
+ if (this.progressCallback) {
+ this.progressCallback({
+ status: "complete",
+ total: this.streamingSession.totalExpected,
+ processed: this.streamingSession.totalSent,
+ message: "Streaming vectorization complete",
+ });
+ }
+
+ this.streamingSession = null;
+ }
+
+ async streamItem(item: IndexItem): Promise {
+ return this.streamItems([item]);
+ }
+
+ isStreamingActive(): boolean {
+ return this.streamingSession?.isActive ?? false;
+ }
+
+ getStreamingProgress(): {
+ sent: number;
+ expected: number;
+ buffered: number;
+ } | null {
+ if (!this.streamingSession?.isActive) {
+ return null;
+ }
+
+ return {
+ sent: this.streamingSession.totalSent,
+ expected: this.streamingSession.totalExpected,
+ buffered: this.streamingSession.batchBuffer.length,
+ };
}
terminate() {
console.debug("Terminating Vector Worker Manager...");
- this.cancelAllSearches("Worker terminated"); // Cancel pending searches
+
+ if (this.streamingSession?.isActive) {
+ this.endStreamingSession();
+ }
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.isInitialized = false;
- this.readyPromise = null; // Reset init promise
+ this.readyPromise = null;
this.progressCallback = null;
- // Clear the static instance? Or assume app lifecycle handles this?
- // VectorWorkerManager.instance = null; // Uncomment if needed
}
}
diff --git a/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts b/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts
new file mode 100644
index 00000000..917270ff
--- /dev/null
+++ b/src/plugins/built-in/globalSearch/src/utils/rateLimiter.ts
@@ -0,0 +1,230 @@
+import { delay } from "@/seqta/utils/delay";
+
+export interface RateLimiterConfig {
+ minDelay: number; // Minimum delay between requests (ms)
+ maxDelay: number; // Maximum delay between requests (ms)
+ baseDelay: number; // Base delay between requests (ms)
+ backoffMultiplier: number; // Exponential backoff multiplier
+ maxRetries: number; // Maximum retries for failed requests
+ adaptiveBatchSize?: boolean; // Enable adaptive batch sizing
+ minBatchSize?: number; // Minimum batch size
+ maxBatchSize?: number; // Maximum batch size
+ baseBatchSize?: number; // Starting batch size
+}
+
+export interface RateLimiterState {
+ currentDelay: number;
+ failedRequests: number;
+ lastSuccessTime: number;
+ currentBatchSize?: number;
+}
+
+export class RateLimiter {
+ private config: RateLimiterConfig;
+ private state: RateLimiterState;
+
+ constructor(config: RateLimiterConfig, initialState?: Partial) {
+ this.config = config;
+ this.state = {
+ currentDelay: config.baseDelay,
+ failedRequests: 0,
+ lastSuccessTime: Date.now(),
+ currentBatchSize: config.baseBatchSize || 50,
+ ...initialState,
+ };
+ }
+
+ /**
+ * Wait for the appropriate delay before making the next request
+ */
+ async waitForNext(): Promise {
+ await delay(this.state.currentDelay);
+ }
+
+ /**
+ * Record a successful request and adjust delays accordingly
+ */
+ recordSuccess(responseTime?: number): void {
+ this.state.lastSuccessTime = Date.now();
+ this.state.failedRequests = Math.max(0, this.state.failedRequests - 1);
+
+ if (responseTime !== undefined) {
+ this.state.currentDelay = this.calculateAdaptiveDelay(responseTime);
+
+ if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) {
+ this.state.currentBatchSize = this.calculateAdaptiveBatchSize(responseTime);
+ }
+ }
+ }
+
+ /**
+ * Record a failed request and increase delays
+ */
+ recordFailure(): void {
+ this.state.failedRequests++;
+ this.state.currentDelay = Math.min(
+ this.state.currentDelay * this.config.backoffMultiplier,
+ this.config.maxDelay
+ );
+
+ if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) {
+ this.state.currentBatchSize = Math.max(
+ Math.floor(this.state.currentBatchSize * 0.7),
+ this.config.minBatchSize || 10
+ );
+ }
+ }
+
+ /**
+ * Execute a request with automatic retry logic
+ */
+ async executeWithRetry(
+ requestFn: () => Promise,
+ retryCount = 0
+ ): Promise {
+ try {
+ const startTime = Date.now();
+ const result = await requestFn();
+ const responseTime = Date.now() - startTime;
+
+ this.recordSuccess(responseTime);
+ return result;
+ } catch (error) {
+ console.warn(`Request failed (attempt ${retryCount + 1}):`, error);
+
+ if (retryCount < this.config.maxRetries) {
+ this.recordFailure();
+ await this.waitForNext();
+ return this.executeWithRetry(requestFn, retryCount + 1);
+ }
+
+ this.recordFailure();
+ return null;
+ }
+ }
+
+ /**
+ * Get current state for persistence
+ */
+ getState(): RateLimiterState {
+ return { ...this.state };
+ }
+
+ /**
+ * Update state from persisted data
+ */
+ setState(state: Partial): void {
+ this.state = { ...this.state, ...state };
+ }
+
+ /**
+ * Get current batch size (if adaptive batching is enabled)
+ */
+ getCurrentBatchSize(): number {
+ return this.state.currentBatchSize || this.config.baseBatchSize || 50;
+ }
+
+ /**
+ * Get current delay
+ */
+ getCurrentDelay(): number {
+ return this.state.currentDelay;
+ }
+
+ /**
+ * Get failure count
+ */
+ getFailureCount(): number {
+ return this.state.failedRequests;
+ }
+
+ private calculateAdaptiveDelay(responseTime: number): number {
+ const { currentDelay, failedRequests, lastSuccessTime } = this.state;
+ const timeSinceLastSuccess = Date.now() - lastSuccessTime;
+
+ // Increase delay if we're seeing failures or slow responses
+ if (failedRequests > 0 || responseTime > 2000) {
+ return Math.min(currentDelay * this.config.backoffMultiplier, this.config.maxDelay);
+ }
+
+ // Decrease delay if we're doing well and it's been a while since last success
+ if (responseTime < 500 && timeSinceLastSuccess > 10000) {
+ return Math.max(currentDelay * 0.8, this.config.minDelay);
+ }
+
+ return currentDelay;
+ }
+
+ private calculateAdaptiveBatchSize(responseTime: number): number {
+ if (!this.config.adaptiveBatchSize || this.state.currentBatchSize === undefined) {
+ return this.state.currentBatchSize || this.config.baseBatchSize || 50;
+ }
+
+ const { currentBatchSize, failedRequests } = this.state;
+
+ // Reduce batch size if we're seeing failures or slow responses
+ if (failedRequests > 2 || responseTime > 3000) {
+ return Math.max(
+ Math.floor(currentBatchSize * 0.7),
+ this.config.minBatchSize || 10
+ );
+ }
+
+ // Increase batch size if we're doing well
+ if (failedRequests === 0 && responseTime < 1000) {
+ return Math.min(
+ Math.floor(currentBatchSize * 1.2),
+ this.config.maxBatchSize || 100
+ );
+ }
+
+ return currentBatchSize;
+ }
+}
+
+/**
+ * Predefined rate limiter configurations for different job types
+ */
+export const RATE_LIMITER_PRESETS = {
+ MESSAGES: {
+ minDelay: 50,
+ maxDelay: 5000,
+ baseDelay: 200,
+ backoffMultiplier: 1.5,
+ maxRetries: 3,
+ adaptiveBatchSize: true,
+ minBatchSize: 10,
+ maxBatchSize: 100,
+ baseBatchSize: 50,
+ } as RateLimiterConfig,
+
+ NOTIFICATIONS: {
+ minDelay: 100,
+ maxDelay: 3000,
+ baseDelay: 150,
+ backoffMultiplier: 1.4,
+ maxRetries: 2,
+ adaptiveBatchSize: false,
+ } as RateLimiterConfig,
+
+ FORUMS: {
+ minDelay: 75,
+ maxDelay: 2000,
+ baseDelay: 100,
+ backoffMultiplier: 1.3,
+ maxRetries: 2,
+ adaptiveBatchSize: true,
+ minBatchSize: 5,
+ maxBatchSize: 50,
+ baseBatchSize: 25,
+ } as RateLimiterConfig,
+
+ SUBJECTS: {
+ minDelay: 50,
+ maxDelay: 1000,
+ baseDelay: 75,
+ backoffMultiplier: 1.2,
+ maxRetries: 1,
+ adaptiveBatchSize: false,
+ } as RateLimiterConfig,
+};
\ No newline at end of file