feat: major indexing performance improvements + visual fixes

This commit is contained in:
SethBurkart123
2025-05-25 20:11:45 +10:00
parent e09eeccfee
commit cefeac95ea
8 changed files with 1536 additions and 488 deletions
+2 -4
View File
@@ -171,15 +171,13 @@
{#if isRecording}
<!-- Recording state -->
<div
class="flex items-center justify-center px-3 py-2 text-sm rounded-md dark:bg-[#38373D]/50 bg-[#DDDDDD]/50 border-[#DDDDDD]/30 dark:border-[#38373D]/30 dark:text-white border cursor-pointer text-nowrap"
class="flex items-center justify-center px-3 py-1.5 text-sm rounded-md dark:bg-[#38373D]/50 bg-[#DDDDDD]/50 border-[#DDDDDD]/30 dark:border-[#38373D]/30 dark:text-white border cursor-pointer text-nowrap"
onclick={startRecording}
onkeydown={startRecording}
role="button"
tabindex="0"
>
{#if hotkeyParts.length === 0}
<span class="text-gray-500 dark:text-gray-400">Press keys...</span>
{/if}
Press keys...
</div>
{:else if hotkeyParts.length > 0}
<!-- Display current hotkey -->
@@ -55,8 +55,7 @@ const settings = defineSettings({
req.onsuccess = () => resolve();
req.onerror = () => reject(req.error);
req.onblocked = () => {
alert(`Please close all other tabs using this app to reset the database: ${dbName}`);
reject(new Error('Delete blocked'));
reject(new Error(`One database is open, failed to remove: ${dbName}`));
};
});
};
@@ -1,5 +1,35 @@
import type { Job, IndexItem } from "../types";
import { htmlToPlainText } from "../utils";
import { delay } from "@/seqta/utils/delay";
import { VectorWorkerManager } from "../worker/vectorWorkerManager";
const RATE_LIMIT_CONFIG = {
minDelay: 50,
maxDelay: 5000,
baseDelay: 200,
backoffMultiplier: 1.5,
maxRetries: 3,
adaptiveBatchSize: true,
minBatchSize: 10,
maxBatchSize: 100,
baseBatchSize: 50,
vectorBatchSize: 5,
parallelRequests: 5,
parallelDelay: 100,
};
interface MessagesProgress {
offset: number;
done: boolean;
currentBatchSize: number;
currentDelay: number;
failedRequests: number;
lastSuccessTime: number;
retryQueue: number[];
processedIds: string[];
streamingStarted: boolean;
totalEstimated: number;
}
const fetchMessages = async (offset = 0, limit = 100) => {
const res = await fetch(`${location.origin}/seqta/student/load/message`, {
@@ -23,74 +53,161 @@ const fetchMessages = async (offset = 0, limit = 100) => {
}>;
};
export const fetchMessageContent = async (id: number) => {
export const fetchMessageContent = async (
id: number,
retryCount = 0,
): Promise<{
payload: { contents: string };
status: string;
} | null> => {
try {
const res = await fetch(`${location.origin}/seqta/student/load/message`, {
method: "POST",
credentials: "include",
headers: { "Content-Type": "application/json; charset=utf-8" },
body: JSON.stringify({ action: "message", id }),
});
return res.json() as Promise<{
payload: { contents: string };
status: string;
}>;
};
interface MessagesProgress {
offset: number;
done: boolean;
if (!res.ok) {
throw new Error(`HTTP ${res.status}: ${res.statusText}`);
}
export const messagesJob: Job = {
id: "messages",
label: "Messages",
renderComponentId: "message",
frequency: { type: "expiry", afterMs: 1000 * 60 * 60 * 24 },
return await res.json();
} catch (error) {
console.warn(
`[Messages job] Failed to fetch content for message ${id} (attempt ${retryCount + 1}):`,
error,
);
run: async (ctx) => {
const limit = 100;
const progress = (await ctx.getProgress<MessagesProgress>()) ?? {
offset: 0,
done: false,
if (retryCount < RATE_LIMIT_CONFIG.maxRetries) {
const retryDelay =
RATE_LIMIT_CONFIG.baseDelay *
Math.pow(RATE_LIMIT_CONFIG.backoffMultiplier, retryCount);
await delay(Math.min(retryDelay, RATE_LIMIT_CONFIG.maxDelay));
return fetchMessageContent(id, retryCount + 1);
}
return null;
}
};
const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id));
function calculateAdaptiveDelay(
progress: MessagesProgress,
responseTime: number,
): number {
const { currentDelay, failedRequests, lastSuccessTime } = progress;
const timeSinceLastSuccess = Date.now() - lastSuccessTime;
let consecutiveExisting = 0;
if (failedRequests > 0 || responseTime > 2000) {
return Math.min(
currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
RATE_LIMIT_CONFIG.maxDelay,
);
}
while (!progress.done) {
let list;
if (responseTime < 500 && timeSinceLastSuccess > 10000) {
return Math.max(currentDelay * 0.8, RATE_LIMIT_CONFIG.minDelay);
}
return currentDelay;
}
function calculateAdaptiveBatchSize(
progress: MessagesProgress,
responseTime: number,
): number {
if (!RATE_LIMIT_CONFIG.adaptiveBatchSize) {
return progress.currentBatchSize;
}
const { currentBatchSize, failedRequests } = progress;
if (failedRequests > 2 || responseTime > 3000) {
return Math.max(
Math.floor(currentBatchSize * 0.7),
RATE_LIMIT_CONFIG.minBatchSize,
);
}
if (failedRequests === 0 && responseTime < 1000) {
return Math.min(
Math.floor(currentBatchSize * 1.2),
RATE_LIMIT_CONFIG.maxBatchSize,
);
}
return currentBatchSize;
}
async function estimateMessageCount(): Promise<number> {
try {
list = await fetchMessages(progress.offset, limit);
} catch (e) {
console.error("[Messages job] list fetch failed:", e);
break;
const firstBatch = await fetchMessages(0, 100);
if (firstBatch.status !== "200" || !firstBatch.payload.hasMore) {
return firstBatch.payload.messages.length;
}
if (list.status !== "200") break;
return Math.min(firstBatch.payload.messages.length * 20, 2000);
} catch (error) {
console.warn("[Messages job] Failed to estimate message count:", error);
return 500;
}
}
for (const msg of list.payload.messages) {
async function processMessagesInParallel(
messages: any[],
existingIds: Set<string>,
processedIdsSet: Set<string>,
progress: MessagesProgress,
ctx: any,
): Promise<{
processedItems: IndexItem[];
consecutiveExisting: number;
updatedProgress: MessagesProgress;
}> {
const processedItems: IndexItem[] = [];
let consecutiveExisting = 0;
const updatedProgress = { ...progress };
const messagesToProcess = messages.filter((msg) => {
const id = msg.id.toString();
if (existingIds.has(id)) {
consecutiveExisting += 1;
if (consecutiveExisting >= 20) {
progress.done = true;
break;
}
continue;
if (existingIds.has(id) || processedIdsSet.has(id)) {
consecutiveExisting++;
return false;
}
consecutiveExisting = 0;
return true;
});
let full;
try {
full = await fetchMessageContent(msg.id);
} catch (e) {
console.error(`[Messages job] content fetch failed (id ${id}):`, e);
continue;
if (messagesToProcess.length === 0) {
return { processedItems, consecutiveExisting, updatedProgress };
}
if (full.status !== "200") continue;
for (
let i = 0;
i < messagesToProcess.length;
i += RATE_LIMIT_CONFIG.parallelRequests
) {
const batch = messagesToProcess.slice(
i,
i + RATE_LIMIT_CONFIG.parallelRequests,
);
if (i > 0) {
await delay(
Math.max(updatedProgress.currentDelay, RATE_LIMIT_CONFIG.parallelDelay),
);
}
const batchStartTime = Date.now();
const batchPromises = batch.map(async (msg) => {
const id = msg.id.toString();
try {
const full = await fetchMessageContent(msg.id);
const responseTime = Date.now() - batchStartTime;
if (full && full.status === "200") {
const item: IndexItem = {
id,
text: msg.subject,
@@ -111,16 +228,328 @@ export const messagesJob: Job = {
renderComponentId: "message",
};
await ctx.addItem(item);
existingIds.add(id);
return { success: true, item, id, responseTime };
} else {
return { success: false, id, messageId: msg.id, responseTime };
}
} catch (error) {
console.error(`[Messages job] content fetch failed (id ${id}):`, error);
return { success: false, id, messageId: msg.id, error };
}
});
const batchResults = await Promise.all(batchPromises);
const batchResponseTime = Date.now() - batchStartTime;
let batchSuccesses = 0;
let batchFailures = 0;
for (const result of batchResults) {
if (result.success && result.item) {
await ctx.addItem(result.item);
existingIds.add(result.id);
processedIdsSet.add(result.id);
processedItems.push(result.item);
batchSuccesses++;
} else {
if (updatedProgress.retryQueue.length < 50 && result.messageId) {
updatedProgress.retryQueue.push(result.messageId);
}
batchFailures++;
}
}
if (batchSuccesses > 0) {
updatedProgress.lastSuccessTime = Date.now();
updatedProgress.failedRequests = Math.max(
0,
updatedProgress.failedRequests - batchSuccesses,
);
}
if (batchFailures > 0) {
updatedProgress.failedRequests += batchFailures;
}
updatedProgress.currentDelay = calculateAdaptiveDelay(
updatedProgress,
batchResponseTime,
);
console.log(
`[Messages job] Processed parallel batch: ${batchSuccesses} successes, ${batchFailures} failures, ${batchResponseTime}ms total time`,
);
}
return { processedItems, consecutiveExisting, updatedProgress };
}
export const messagesJob: Job = {
id: "messages",
label: "Messages",
renderComponentId: "message",
frequency: { type: "expiry", afterMs: 1000 * 60 * 60 * 24 },
run: async (ctx) => {
const progress = (await ctx.getProgress<MessagesProgress>()) ?? {
offset: 0,
done: false,
currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize,
currentDelay: RATE_LIMIT_CONFIG.baseDelay,
failedRequests: 0,
lastSuccessTime: Date.now(),
retryQueue: [],
processedIds: [],
streamingStarted: false,
totalEstimated: 0,
};
const existingIds = new Set((await ctx.getStoredItems()).map((i) => i.id));
const processedIdsSet = new Set(progress.processedIds);
existingIds.forEach((id) => processedIdsSet.add(id));
const vectorWorker = VectorWorkerManager.getInstance();
if (!progress.streamingStarted) {
progress.totalEstimated = await estimateMessageCount();
try {
await vectorWorker.startStreamingSession(
progress.totalEstimated,
(progressData) => {
console.log(
`[Messages job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`,
);
},
RATE_LIMIT_CONFIG.vectorBatchSize,
);
progress.streamingStarted = true;
console.log(
`[Messages job] Started streaming vectorization session for ~${progress.totalEstimated} items`,
);
} catch (error) {
console.warn(
"[Messages job] Failed to start streaming session:",
error,
);
}
}
let consecutiveExisting = 0;
let requestStartTime = 0;
let progressUpdateCounter = 0;
let itemsStreamedToVector = 0;
if (progress.retryQueue.length > 0) {
console.log(
`[Messages job] Processing ${Math.min(progress.retryQueue.length, 10)} items from retry queue`,
);
const retryBatch = progress.retryQueue.slice(0, 10);
const retryBatches = [];
for (
let i = 0;
i < retryBatch.length;
i += RATE_LIMIT_CONFIG.parallelRequests
) {
retryBatches.push(
retryBatch.slice(i, i + RATE_LIMIT_CONFIG.parallelRequests),
);
}
for (const batch of retryBatches) {
await delay(progress.currentDelay);
const batchStartTime = Date.now();
const retryPromises = batch.map(async (messageId) => {
const id = messageId.toString();
if (processedIdsSet.has(id)) {
return { success: true, messageId, alreadyProcessed: true };
}
try {
const full = await fetchMessageContent(messageId);
const responseTime = Date.now() - batchStartTime;
if (full && full.status === "200") {
return { success: true, messageId, responseTime };
} else {
return { success: false, messageId, responseTime };
}
} catch (error) {
console.error(
`[Messages job] Retry failed for message ${messageId}:`,
error,
);
return { success: false, messageId, error };
}
});
const retryResults = await Promise.all(retryPromises);
const batchResponseTime = Date.now() - batchStartTime;
let retrySuccesses = 0;
let retryFailures = 0;
for (const result of retryResults) {
if (result.success) {
if (!result.alreadyProcessed) {
processedIdsSet.add(result.messageId.toString());
retrySuccesses++;
}
progress.retryQueue = progress.retryQueue.filter(
(mid) => mid !== result.messageId,
);
} else {
retryFailures++;
}
}
if (retrySuccesses > 0) {
progress.lastSuccessTime = Date.now();
progress.failedRequests = Math.max(
0,
progress.failedRequests - retrySuccesses,
);
}
if (retryFailures > 0) {
progress.failedRequests += retryFailures;
}
progress.currentDelay = calculateAdaptiveDelay(
progress,
batchResponseTime,
);
console.log(
`[Messages job] Processed retry batch: ${retrySuccesses} successes, ${retryFailures} failures`,
);
}
}
while (!progress.done) {
await delay(progress.currentDelay);
requestStartTime = Date.now();
let list;
try {
list = await fetchMessages(progress.offset, progress.currentBatchSize);
const responseTime = Date.now() - requestStartTime;
progress.currentDelay = calculateAdaptiveDelay(progress, responseTime);
progress.currentBatchSize = calculateAdaptiveBatchSize(
progress,
responseTime,
);
} catch (e) {
console.error("[Messages job] list fetch failed:", e);
progress.failedRequests++;
progress.currentDelay = Math.min(
progress.currentDelay * RATE_LIMIT_CONFIG.backoffMultiplier,
RATE_LIMIT_CONFIG.maxDelay,
);
progress.processedIds = Array.from(processedIdsSet);
await ctx.setProgress(progress);
break;
}
if (list.status !== "200") {
progress.failedRequests++;
progress.processedIds = Array.from(processedIdsSet);
await ctx.setProgress(progress);
break;
}
const itemsToStream: IndexItem[] = [];
const {
processedItems,
consecutiveExisting: newConsecutiveExisting,
updatedProgress,
} = await processMessagesInParallel(
list.payload.messages,
existingIds,
processedIdsSet,
progress,
ctx,
);
progress.currentDelay = updatedProgress.currentDelay;
progress.failedRequests = updatedProgress.failedRequests;
progress.lastSuccessTime = updatedProgress.lastSuccessTime;
progress.retryQueue = updatedProgress.retryQueue;
itemsToStream.push(...processedItems);
consecutiveExisting = newConsecutiveExisting;
if (consecutiveExisting >= 20) {
progress.done = true;
}
if (itemsToStream.length > 0 && progress.streamingStarted) {
try {
await vectorWorker.streamItems(itemsToStream);
itemsStreamedToVector += itemsToStream.length;
console.log(
`[Messages job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
);
} catch (error) {
console.warn(
"[Messages job] Failed to stream items to vector worker:",
error,
);
}
}
if (!list.payload.hasMore) progress.done = true;
progress.offset += limit;
progress.offset += progress.currentBatchSize;
progressUpdateCounter++;
if (progressUpdateCounter >= 10 || progress.done) {
progress.processedIds = Array.from(processedIdsSet);
await ctx.setProgress(progress);
progressUpdateCounter = 0;
console.log(
`[Messages job] Progress: offset=${progress.offset}, batchSize=${progress.currentBatchSize}, delay=${progress.currentDelay}ms, failures=${progress.failedRequests}, retryQueue=${progress.retryQueue.length}, vectorStreamed=${itemsStreamedToVector}, parallelRequests=${RATE_LIMIT_CONFIG.parallelRequests}`,
);
}
}
if (progress.done) await ctx.setProgress({ offset: 0, done: false });
if (progress.streamingStarted) {
try {
await vectorWorker.endStreamingSession();
console.log(
`[Messages job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`,
);
} catch (error) {
console.warn("[Messages job] Failed to end streaming session:", error);
}
}
if (progress.done) {
await ctx.setProgress({
offset: 0,
done: false,
currentBatchSize: RATE_LIMIT_CONFIG.baseBatchSize,
currentDelay: RATE_LIMIT_CONFIG.baseDelay,
failedRequests: 0,
lastSuccessTime: Date.now(),
retryQueue: progress.retryQueue.slice(0, 20),
processedIds: [],
streamingStarted: false,
totalEstimated: 0,
});
} else {
progress.processedIds = Array.from(processedIdsSet);
await ctx.setProgress(progress);
}
return [];
},
@@ -1,8 +1,18 @@
import type { Job, IndexItem } from "../types";
import { htmlToPlainText } from "../utils";
import { fetchMessageContent } from "./messages";
import { delay } from "@/seqta/utils/delay";
import { VectorWorkerManager } from "../worker/vectorWorkerManager";
const NOTIFICATIONS_RATE_LIMIT = {
baseDelay: 150,
maxDelay: 3000,
backoffMultiplier: 1.4,
maxRetries: 2,
batchDelay: 100,
vectorBatchSize: 3,
};
/* ------------- Notification types ------------- */
interface MessageNotification {
notificationID: number;
type: "message";
@@ -28,10 +38,13 @@ interface AssessmentNotification {
type Notification = MessageNotification | AssessmentNotification;
interface NotificationsProgress {
lastTs: number; // ms since epoch of last processed notification
lastTs: number;
failedRequests: number;
currentDelay: number;
retryQueue: number[];
streamingStarted: boolean;
}
/* ------------- Helpers ------------- */
const fetchNotifications = async () => {
const res = await fetch(`${location.origin}/seqta/student/heartbeat?`, {
method: "POST",
@@ -49,9 +62,9 @@ const fetchAssessmentName = async (
assessmentId: number,
metaclassId: number,
programmeId: number,
retryCount = 0,
): Promise<string> => {
const searchAssessment = (data: any): string | null => {
// Search syllabus
for (const item of data.syllabus || []) {
const found = (item.assessments || []).find(
(a: any) => a.id === assessmentId,
@@ -59,13 +72,11 @@ const fetchAssessmentName = async (
if (found) return found.title;
}
// Search pending
const foundPending = (data.pending || []).find(
(a: any) => a.id === assessmentId,
);
if (foundPending) return foundPending.title;
// Search tasks
const foundTask = (data.tasks || []).find(
(a: any) => a.id === assessmentId,
);
@@ -75,6 +86,7 @@ const fetchAssessmentName = async (
};
const fetchAssessments = async (endpoint: string) => {
try {
const res = await fetch(`${location.origin}${endpoint}`, {
method: "POST",
credentials: "include",
@@ -83,16 +95,38 @@ const fetchAssessmentName = async (
programme: programmeId,
}),
});
if (!res.ok) {
throw new Error(`HTTP ${res.status}: ${res.statusText}`);
}
const json = await res.json();
return json.payload;
} catch (error) {
console.warn(
`[Notifications job] Failed to fetch assessments from ${endpoint} (attempt ${retryCount + 1}):`,
error,
);
if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) {
const retryDelay =
NOTIFICATIONS_RATE_LIMIT.baseDelay *
Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount);
await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay));
return fetchAssessments(endpoint);
}
throw error;
}
};
// Try from /past
try {
let payload = await fetchAssessments("/seqta/student/assessment/list/past");
let title = searchAssessment(payload);
if (title) return title;
// Try from /upcoming if not found in /past
await delay(NOTIFICATIONS_RATE_LIMIT.baseDelay);
const upcomingPayload = await fetchAssessments(
"/seqta/student/assessment/list/upcoming",
);
@@ -104,9 +138,28 @@ const fetchAssessmentName = async (
throw new Error(
`Assessment with ID ${assessmentId} not found in past or upcoming.`,
);
} catch (error) {
if (retryCount < NOTIFICATIONS_RATE_LIMIT.maxRetries) {
const retryDelay =
NOTIFICATIONS_RATE_LIMIT.baseDelay *
Math.pow(NOTIFICATIONS_RATE_LIMIT.backoffMultiplier, retryCount);
await delay(Math.min(retryDelay, NOTIFICATIONS_RATE_LIMIT.maxDelay));
return fetchAssessmentName(
assessmentId,
metaclassId,
programmeId,
retryCount + 1,
);
}
console.error(
`[Notifications job] Failed to fetch assessment name for ID ${assessmentId} after ${retryCount + 1} attempts:`,
error,
);
return `Assessment ${assessmentId}`;
}
};
/* ------------- Job ------------- */
export const notificationsJob: Job = {
id: "notifications",
label: "Notifications",
@@ -116,6 +169,10 @@ export const notificationsJob: Job = {
run: async (ctx) => {
const progress = (await ctx.getProgress<NotificationsProgress>()) ?? {
lastTs: 0,
failedRequests: 0,
currentDelay: NOTIFICATIONS_RATE_LIMIT.baseDelay,
retryQueue: [],
streamingStarted: false,
};
let notifications: Notification[];
@@ -123,10 +180,43 @@ export const notificationsJob: Job = {
notifications = await fetchNotifications();
} catch (e) {
console.error("[Notifications job] fetch failed:", e);
progress.failedRequests++;
progress.currentDelay = Math.min(
progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier,
NOTIFICATIONS_RATE_LIMIT.maxDelay,
);
await ctx.setProgress(progress);
return [];
}
const vectorWorker = VectorWorkerManager.getInstance();
if (!progress.streamingStarted && notifications.length > 0) {
const estimatedTotal = Math.min(notifications.length * 1.2, 100);
try {
await vectorWorker.startStreamingSession(
estimatedTotal,
(progressData) => {
console.log(
`[Notifications job] Vector streaming progress: ${progressData.processed}/${progressData.total} (${progressData.status})`,
);
},
NOTIFICATIONS_RATE_LIMIT.vectorBatchSize,
);
progress.streamingStarted = true;
console.log(
`[Notifications job] Started streaming vectorization session for ~${estimatedTotal} items`,
);
} catch (error) {
console.warn(
"[Notifications job] Failed to start streaming session:",
error,
);
}
}
const notificationIsIndexed = async (id: string): Promise<boolean> => {
try {
const [inAssessments, inMessages] = await Promise.all([
ctx
.getStoredItems("notifications")
@@ -136,14 +226,210 @@ export const notificationsJob: Job = {
.then((items) => items.some((i) => i.id === id)),
]);
return inAssessments || inMessages;
} catch (error) {
console.warn(
`[Notifications job] Error checking if notification ${id} is indexed:`,
error,
);
return false;
}
};
const items: IndexItem[] = [];
const itemsToStream: IndexItem[] = [];
let processedCount = 0;
let progressUpdateCounter = 0;
let itemsStreamedToVector = 0;
for (const notif of notifications) {
if (progress.retryQueue.length > 0) {
console.log(
`[Notifications job] Processing ${Math.min(progress.retryQueue.length, 3)} items from retry queue`,
);
const retryBatch = progress.retryQueue.slice(0, 3);
for (const notificationId of retryBatch) {
const notification = notifications.find(
(n) => n.notificationID === notificationId,
);
if (!notification) {
progress.retryQueue = progress.retryQueue.filter(
(id) => id !== notificationId,
);
continue;
}
await delay(progress.currentDelay);
try {
const { success, item } = await processNotification(
notification,
ctx,
);
if (success) {
progress.retryQueue = progress.retryQueue.filter(
(id) => id !== notificationId,
);
progress.failedRequests = Math.max(0, progress.failedRequests - 1);
progress.currentDelay = Math.max(
progress.currentDelay * 0.9,
NOTIFICATIONS_RATE_LIMIT.baseDelay,
);
if (item) {
items.push(item);
itemsToStream.push(item);
}
}
} catch (error) {
console.error(
`[Notifications job] Retry failed for notification ${notificationId}:`,
error,
);
progress.failedRequests++;
}
}
}
const notificationsToProcess = notifications.slice(0, 20);
for (const notif of notificationsToProcess) {
const id = notif.notificationID.toString();
if (await notificationIsIndexed(id)) continue;
try {
if (await notificationIsIndexed(id)) continue;
if (progress.retryQueue.includes(notif.notificationID)) continue;
if (processedCount > 0) {
await delay(NOTIFICATIONS_RATE_LIMIT.batchDelay);
}
const { success, item } = await processNotification(
notif,
ctx,
);
if (!success) {
if (progress.retryQueue.length < 10) {
progress.retryQueue.push(notif.notificationID);
}
progress.failedRequests++;
} else {
progress.failedRequests = Math.max(0, progress.failedRequests - 1);
progress.currentDelay = Math.max(
progress.currentDelay * 0.95,
NOTIFICATIONS_RATE_LIMIT.baseDelay,
);
if (item) {
items.push(item);
itemsToStream.push(item);
}
}
} catch (error) {
console.error(
`[Notifications job] Failed to process notification ${id}:`,
error,
);
if (progress.retryQueue.length < 10) {
progress.retryQueue.push(notif.notificationID);
}
progress.failedRequests++;
progress.currentDelay = Math.min(
progress.currentDelay * NOTIFICATIONS_RATE_LIMIT.backoffMultiplier,
NOTIFICATIONS_RATE_LIMIT.maxDelay,
);
}
processedCount++;
if (
itemsToStream.length >= NOTIFICATIONS_RATE_LIMIT.vectorBatchSize &&
progress.streamingStarted
) {
try {
await vectorWorker.streamItems([...itemsToStream]);
itemsStreamedToVector += itemsToStream.length;
console.log(
`[Notifications job] Streamed ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
);
itemsToStream.length = 0;
} catch (error) {
console.warn(
"[Notifications job] Failed to stream items to vector worker:",
error,
);
}
}
progressUpdateCounter++;
if (progressUpdateCounter >= 5) {
await ctx.setProgress(progress);
progressUpdateCounter = 0;
}
}
if (itemsToStream.length > 0 && progress.streamingStarted) {
try {
await vectorWorker.streamItems([...itemsToStream]);
itemsStreamedToVector += itemsToStream.length;
console.log(
`[Notifications job] Streamed final ${itemsToStream.length} items to vector worker (total: ${itemsStreamedToVector})`,
);
} catch (error) {
console.warn(
"[Notifications job] Failed to stream final items to vector worker:",
error,
);
}
}
if (progress.streamingStarted) {
try {
await vectorWorker.endStreamingSession();
console.log(
`[Notifications job] Ended streaming session. Total items streamed: ${itemsStreamedToVector}`,
);
progress.streamingStarted = false;
} catch (error) {
console.warn(
"[Notifications job] Failed to end streaming session:",
error,
);
}
}
if (items.length) {
const latest = Math.max(
...items.map((i) => i.dateAdded),
progress.lastTs,
);
progress.lastTs = latest;
}
await ctx.setProgress(progress);
console.log(
`[Notifications job] Processed ${processedCount} notifications, ${progress.retryQueue.length} in retry queue, ${progress.failedRequests} failures, ${itemsStreamedToVector} items streamed to vector worker`,
);
return items;
},
purge: (items) => {
const date = new Date();
date.setMonth(0, 1);
date.setHours(0, 0, 0, 0);
return items.filter((i) => i.dateAdded >= date.getTime());
},
};
async function processNotification(
notif: Notification,
ctx: any,
): Promise<{ success: boolean; item?: IndexItem }> {
const id = notif.notificationID.toString();
try {
if (notif.type === "coneqtassessments") {
const a = notif.coneqtAssessments;
@@ -152,7 +438,8 @@ export const notificationsJob: Job = {
a.metaclassID,
a.programmeID,
);
items.push({
const item: IndexItem = {
id,
text: a.title,
category: "assessments",
@@ -168,12 +455,14 @@ export const notificationsJob: Job = {
},
actionId: "assessment",
renderComponentId: "assessment",
});
};
return { success: true, item };
} else if (notif.type === "message") {
const content = await fetchMessageContent(notif.message.messageID);
await ctx.addItem(
{
if (content && content.payload) {
const item: IndexItem = {
id,
text: notif.message.title,
category: "messages",
@@ -187,27 +476,19 @@ export const notificationsJob: Job = {
},
actionId: "message",
renderComponentId: "message",
},
"messages",
);
}
}
if (items.length) {
const latest = Math.max(
...items.map((i) => i.dateAdded),
progress.lastTs,
);
await ctx.setProgress({ lastTs: latest });
}
return items;
},
purge: (items) => {
const date = new Date();
date.setMonth(0, 1);
date.setHours(0, 0, 0, 0);
return items.filter((i) => i.dateAdded >= date.getTime());
},
};
await ctx.addItem(item, "messages");
return { success: true, item };
}
}
return { success: false };
} catch (error) {
console.error(
`[Notifications job] Error processing notification ${id}:`,
error,
);
return { success: false };
}
}
@@ -16,7 +16,10 @@ export const subjectsJob: Job = {
id: "subjects",
label: "Subjects",
renderComponentId: "subject",
frequency: "pageLoad",
frequency: {
type: "expiry",
afterMs: 1000 * 60 * 60 * 24 * 30,
},
boostCriteria: (item, searchTerm) => {
if (searchTerm == "") {
return -100;
@@ -5,6 +5,16 @@ let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
let currentAbortController: AbortController | null = null;
let streamingSession: {
isActive: boolean;
totalExpected: number;
totalReceived: number;
totalProcessed: number;
batchSize: number;
pendingItems: IndexItem[];
processingPromise: Promise<void> | null;
} | null = null;
async function initWorker() {
if (isInitialized) {
console.debug("Vector worker already initialized.");
@@ -28,16 +38,14 @@ async function initWorker() {
console.debug("Vector worker initialized successfully.");
} catch (e) {
console.error("Failed to initialize vector worker:", e);
// Set as initialized even on error to prevent retries, but index will be null
isInitialized = true;
vectorIndex = null; // Ensure index is null on error
vectorIndex = null;
}
}
async function vectorizeItem(
item: IndexItem,
): Promise<(IndexItem & { embedding: number[] }) | null> {
// Simplified for brevity - assumes embedding function doesn't need cancellation signal
try {
const textToEmbed = [
item.text,
@@ -53,19 +61,246 @@ async function vectorizeItem(
return { ...item, embedding };
} catch (error) {
console.error(`Error vectorizing item ${item.id}:`, error);
return null; // Return null if vectorization fails for an item
return null;
}
}
async function startStreamingSession(
totalExpected: number,
batchSize: number = 5,
) {
if (!vectorIndex) {
console.warn(
"Streaming requested but vector index not ready. Attempting init.",
);
await initWorker();
if (!vectorIndex) {
self.postMessage({
type: "progress",
data: {
status: "error",
message:
"Vector index not available for streaming after init attempt.",
},
});
return;
}
}
if (streamingSession?.isActive) {
await endStreamingSession();
}
streamingSession = {
isActive: true,
totalExpected,
totalReceived: 0,
totalProcessed: 0,
batchSize,
pendingItems: [],
processingPromise: null,
};
console.debug(
`Started streaming session for ${totalExpected} items with batch size ${batchSize}`,
);
self.postMessage({
type: "streamingProgress",
data: {
processed: 0,
total: totalExpected,
message: "Streaming session started",
},
});
}
async function processStreamingBatch(
items: IndexItem[],
isLast: boolean = false,
) {
if (!streamingSession?.isActive) {
console.warn("Received streaming batch but no active session");
return;
}
streamingSession.totalReceived += items.length;
streamingSession.pendingItems.push(...items);
console.debug(
`Received streaming batch: ${items.length} items (${streamingSession.totalReceived}/${streamingSession.totalExpected})`,
);
const shouldProcess =
streamingSession.pendingItems.length >= streamingSession.batchSize ||
isLast;
if (shouldProcess && !streamingSession.processingPromise) {
streamingSession.processingPromise = processStreamingItems();
}
}
async function processStreamingItems() {
if (!streamingSession?.isActive || !vectorIndex) {
return;
}
while (
streamingSession.pendingItems.length > 0 &&
streamingSession.isActive
) {
const batchToProcess = streamingSession.pendingItems.splice(
0,
streamingSession.batchSize,
);
const unprocessedItems = batchToProcess.filter((item) => {
try {
return !vectorIndex!.get({ id: item.id });
} catch (e) {
return true;
}
});
if (unprocessedItems.length === 0) {
streamingSession.totalProcessed += batchToProcess.length;
continue;
}
const vectorizationResults = await Promise.all(
unprocessedItems.map(vectorizeItem),
);
const successfullyVectorized = vectorizationResults.filter(
(result) => result !== null,
) as (IndexItem & { embedding: number[] })[];
if (successfullyVectorized.length > 0) {
try {
successfullyVectorized.forEach((item) => vectorIndex!.add(item));
if (
streamingSession.totalProcessed % (streamingSession.batchSize * 3) ===
0
) {
await vectorIndex!.saveIndex("indexedDB");
console.debug(
`Saved streaming index at ${streamingSession.totalProcessed} processed items`,
);
}
} catch (e) {
console.error("Error processing streaming batch:", e);
}
}
streamingSession.totalProcessed += batchToProcess.length;
self.postMessage({
type: "streamingProgress",
data: {
processed: streamingSession.totalProcessed,
total: streamingSession.totalExpected,
message: `Processed ${streamingSession.totalProcessed}/${streamingSession.totalExpected} items`,
},
});
await new Promise((resolve) => setTimeout(resolve, 10));
}
streamingSession.processingPromise = null;
if (
streamingSession.totalReceived >= streamingSession.totalExpected &&
streamingSession.pendingItems.length === 0
) {
await finalizeStreamingSession();
}
}
async function finalizeStreamingSession() {
if (!streamingSession?.isActive) {
return;
}
try {
if (vectorIndex) {
await vectorIndex.saveIndex("indexedDB");
console.debug("Final save of streaming index completed");
}
} catch (e) {
console.error("Error in final streaming save:", e);
}
const totalProcessed = streamingSession.totalProcessed;
const totalExpected = streamingSession.totalExpected;
streamingSession.isActive = false;
self.postMessage({
type: "progress",
data: {
status: "complete",
total: totalExpected,
processed: totalProcessed,
message: `Streaming vectorization complete: ${totalProcessed}/${totalExpected} items processed`,
},
});
console.debug(
`Streaming session completed: ${totalProcessed}/${totalExpected} items processed`,
);
}
async function endStreamingSession() {
if (!streamingSession?.isActive) {
return;
}
console.debug("Ending streaming session...");
if (streamingSession.processingPromise) {
await streamingSession.processingPromise;
}
if (streamingSession.pendingItems.length > 0) {
console.debug(
`Processing ${streamingSession.pendingItems.length} remaining items before ending session`,
);
streamingSession.processingPromise = processStreamingItems();
await streamingSession.processingPromise;
}
try {
if (vectorIndex) {
await vectorIndex.saveIndex("indexedDB");
console.debug("Final save before ending streaming session");
}
} catch (e) {
console.error("Error in final save before ending session:", e);
}
const wasActive = streamingSession.isActive;
streamingSession.isActive = false;
if (wasActive) {
self.postMessage({
type: "progress",
data: {
status: "cancelled",
message: "Streaming session ended early",
},
});
}
}
async function processItems(items: IndexItem[], signal: AbortSignal) {
console.debug("Worker received process request.");
if (!vectorIndex) {
console.warn(
"Processing requested but vector index not ready. Attempting init.",
);
await initWorker(); // Attempt initialization if not ready
await initWorker();
if (!vectorIndex) {
// Check again after attempt
self.postMessage({
type: "progress",
data: {
@@ -78,13 +313,11 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
}
// Find items we haven't processed yet by checking against the index instance
const unprocessedItems = items.filter((item) => {
if (signal.aborted) return false; // Check cancellation during filtering
if (signal.aborted) return false;
try {
return !vectorIndex!.get({ id: item.id });
} catch (e) {
// If get throws (e.g., item not found), it means it's unprocessed
return true;
}
});
@@ -136,7 +369,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
}
const batch = unprocessedItems.slice(i, i + BATCH_SIZE);
// Vectorize batch
const vectorizationResults = await Promise.all(batch.map(vectorizeItem));
const successfullyVectorized = vectorizationResults.filter(
(result) => result !== null,
@@ -154,7 +386,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
return;
}
// Add successfully vectorized items to index
if (successfullyVectorized.length > 0) {
try {
successfullyVectorized.forEach((item) => vectorIndex!.add(item));
@@ -164,8 +395,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
type: "progress",
data: { status: "error", message: `Error adding to index: ${e}` },
});
// Decide whether to continue or stop on error
// return; // Example: Stop processing if adding fails
}
}
@@ -181,7 +410,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
return;
}
// Save index after processing the batch
try {
await vectorIndex!.saveIndex("indexedDB");
console.debug(`Saved index after processing batch ${i / BATCH_SIZE + 1}`);
@@ -191,13 +419,10 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
type: "progress",
data: { status: "error", message: `Error saving index batch: ${e}` },
});
// Continue processing next batch even if saving failed? Or stop?
// return; // Example: Stop if saving fails
}
processedCount = Math.min(i + BATCH_SIZE, unprocessedItems.length);
// Report progress
self.postMessage({
type: "progress",
data: {
@@ -207,7 +432,6 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
},
});
// Yield control briefly to allow other messages (like cancellation) to be processed
await new Promise((resolve) => setTimeout(resolve, 0));
}
@@ -219,196 +443,49 @@ async function processItems(items: IndexItem[], signal: AbortSignal) {
});
} else {
console.debug("Processing completed, but was cancelled.");
// No need to send 'cancelled' again if already sent during batching
// self.postMessage({ type: 'progress', data: { status: 'cancelled', message: 'Processing finished but was cancelled' }});
}
}
async function search(
query: string,
topK: number,
signal: AbortSignal,
messageId: string,
) {
console.debug(
`Worker received search request (ID: ${messageId}): "${query}"`,
);
if (!vectorIndex) {
console.warn(
`Search (ID: ${messageId}) requested but vector index not ready. Attempting init.`,
);
await initWorker(); // Attempt initialization
// Re-check after waiting/init attempt
if (!vectorIndex) {
console.error(
`Search (ID: ${messageId}) failed: Vector index unavailable after init attempt.`,
);
self.postMessage({
type: "searchError",
data: { messageId, error: "Vector index not available." },
});
return;
}
console.debug(
`Vector index ready after init for search (ID: ${messageId}).`,
);
}
if (signal.aborted) {
console.debug(`Search (ID: ${messageId}) cancelled before starting.`);
self.postMessage({ type: "searchCancelled", data: { messageId } });
return;
}
try {
console.debug(`Getting embedding for query (ID: ${messageId})...`);
const queryEmbedding = await getEmbedding(query);
if (signal.aborted) {
console.debug(`Search (ID: ${messageId}) cancelled after embedding.`);
self.postMessage({ type: "searchCancelled", data: { messageId } });
return;
}
console.debug(`Performing vector search (ID: ${messageId})...`);
// Await the search and let TypeScript infer the type
const results = await vectorIndex!.search(queryEmbedding, {
topK,
useStorage: "indexedDB", // Ensure we search the stored index
});
console.debug(
`Vector search (ID: ${messageId}) completed with ${results.length} results.`,
);
if (signal.aborted) {
console.debug(
`Search (ID: ${messageId}) cancelled after search completed, discarding results.`,
);
self.postMessage({ type: "searchCancelled", data: { messageId } });
return;
}
// Post results back to the main thread
self.postMessage({ type: "searchResults", data: { messageId, results } });
} catch (error) {
console.error(`Vector search error in worker (ID: ${messageId}):`, error);
// Ensure signal isn't checked *after* an error occurred before posting error message
if (!signal.aborted) {
// Only post error if not cancelled
self.postMessage({
type: "searchError",
data: {
messageId,
error: error instanceof Error ? error.message : String(error),
},
});
} else {
console.debug(
`Search (ID: ${messageId}) encountered error but was cancelled, suppressing error message.`,
);
self.postMessage({ type: "searchCancelled", data: { messageId } }); // Still notify of cancellation
}
}
}
// Handle messages from the main thread
self.addEventListener("message", async (e) => {
// Make sure data and type exist
if (!e.data || !e.data.type) {
console.warn("Worker received message with no data or type.");
return;
}
const { type, data, messageId } = e.data; // messageId used for requests needing response/cancellation tracking
// Cancel previous long-running operation (process or search) if a new one starts
if (type === "process" || type === "search") {
if (currentAbortController) {
console.debug(
`Worker cancelling previous operation due to new '${type}' request.`,
);
currentAbortController.abort(`New '${type}' operation requested`);
}
currentAbortController = new AbortController();
console.debug(`Worker starting new '${type}' operation.`);
}
// Use the signal from the *current* controller for the task being started
const signal = currentAbortController?.signal;
const { type, data } = e.data;
switch (type) {
case "process":
if (signal && data?.items) {
await processItems(data.items, signal);
} else if (!signal) {
console.error(
"Process message received but no abort signal available.",
);
} else if (!data?.items) {
console.error("Process message received without 'items' data.");
self.postMessage({
type: "progress",
data: {
status: "error",
message: "Process command received without items.",
},
});
}
break;
case "search":
if (signal && messageId && typeof data?.query === "string") {
await search(data.query, data.topK ?? 10, signal, messageId);
} else {
const errorReason = !signal
? "Missing signal"
: !messageId
? "Missing messageId"
: "Missing or invalid query";
console.error(`Search message received invalid: ${errorReason}.`, {
data,
messageId,
signalExists: !!signal,
});
// Send an error back if messageId exists
if (messageId) {
self.postMessage({
type: "searchError",
data: { messageId, error: `Worker internal error: ${errorReason}` },
});
}
}
break;
case "init":
// Init should not be cancellable in the same way, it's foundational
// Check if already initialized before potentially running it again
if (!isInitialized) {
await initWorker();
self.postMessage({ type: "ready" }); // Signal ready *after* init attempt
} else {
console.debug("Received init message, but worker already initialized.");
self.postMessage({ type: "ready" }); // Signal ready anyway
}
self.postMessage({ type: "ready" });
break;
// No explicit 'cancel' case needed as new tasks auto-cancel previous ones
case "process":
if (currentAbortController) {
currentAbortController.abort();
}
currentAbortController = new AbortController();
await processItems(data.items, currentAbortController.signal);
break;
case "startStreaming":
await startStreamingSession(data.totalExpected, data.batchSize);
break;
case "streamBatch":
await processStreamingBatch(data.items, data.isLast);
break;
case "endStreaming":
await endStreamingSession();
break;
default:
console.warn("Unknown message type received by vector worker:", type);
console.warn("Unknown message type:", type);
}
});
// Initial check or trigger for initialization when the worker starts
initWorker()
.then(() => {
self.postMessage({ type: "ready" });
})
.catch((err) => {
console.error("Initial worker initialization failed:", err);
// Still need to signal readiness, perhaps with an error state?
// Or rely on the first 'process' or 'search' to retry init.
// For now, just signal ready, but the index might be null.
self.postMessage({ type: "ready" });
});
@@ -1,7 +1,6 @@
import { refreshVectorCache } from "../../search/vector/vectorSearch";
import type { IndexItem } from "../types";
import vectorWorker from "./vectorWorker.ts?inlineWorker";
import type { SearchResult } from "embeddia";
export type ProgressCallback = (data: {
status: "started" | "processing" | "complete" | "error" | "cancelled";
@@ -14,28 +13,19 @@ export class VectorWorkerManager {
private static instance: VectorWorkerManager;
private worker: Worker | null = null;
private isInitialized = false;
private readyPromise: Promise<void> | null = null; // To await initialization
private readyPromise: Promise<void> | null = null;
private progressCallback: ProgressCallback | null = null;
private searchPromises = new Map<
string,
{
resolve: (value: SearchResult[]) => void;
reject: (reason?: any) => void;
timer: NodeJS.Timeout;
}
>();
private debounceTimer: NodeJS.Timeout | null = null;
private lastSearchParams: {
query: string;
topK: number;
resolve: (results: SearchResult[]) => void;
reject: (reason?: any) => void;
private streamingSession: {
isActive: boolean;
totalExpected: number;
totalSent: number;
batchBuffer: IndexItem[];
batchSize: number;
flushTimer: NodeJS.Timeout | null;
} | null = null;
private constructor() {
// Start initialization immediately, but allow awaiting it
this.readyPromise = this.initWorker();
}
private constructor() {}
static getInstance(): VectorWorkerManager {
if (!VectorWorkerManager.instance) {
@@ -45,26 +35,25 @@ export class VectorWorkerManager {
}
private async initWorker(): Promise<void> {
// If already initialized or initializing, return the existing promise
if (this.isInitialized) return Promise.resolve();
if (this.readyPromise) return this.readyPromise;
console.debug("Lazy-loading vector worker...");
return new Promise<void>((resolve, reject) => {
// Create the worker
this.worker = vectorWorker();
console.log("Worker initialized", this.worker);
const timeout = setTimeout(() => {
console.error("Vector worker initialization timed out");
this.worker?.terminate(); // Clean up worker if it exists
this.worker?.terminate();
this.worker = null;
this.isInitialized = false; // Ensure state reflects failure
this.readyPromise = null; // Allow retrying init later
this.isInitialized = false;
this.readyPromise = null;
reject(new Error("Worker initialization timed out"));
}, 10000); // Increased timeout
}, 10000);
// Set up message handling
this.worker!.addEventListener("message", (e) => {
const { type, data } = e.data;
console.debug("Message from vector worker:", type, data);
@@ -74,7 +63,7 @@ export class VectorWorkerManager {
this.isInitialized = true;
clearTimeout(timeout);
console.debug("Vector worker initialized and ready.");
resolve(); // Resolve the init promise
resolve();
break;
case "progress":
@@ -83,50 +72,23 @@ export class VectorWorkerManager {
if (data.status === "complete") {
refreshVectorCache();
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
}
}
break;
case "searchResults":
const searchInfo = this.searchPromises.get(data.messageId);
if (searchInfo) {
clearTimeout(searchInfo.timer); // Clear timeout on success
searchInfo.resolve(data.results);
this.searchPromises.delete(data.messageId);
} else {
console.warn(
"Received search results for unknown messageId:",
data.messageId,
);
}
break;
case "searchError":
const errorInfo = this.searchPromises.get(data.messageId);
if (errorInfo) {
clearTimeout(errorInfo.timer); // Clear timeout on error
errorInfo.reject(new Error(data.error));
this.searchPromises.delete(data.messageId);
} else {
console.warn(
"Received search error for unknown messageId:",
data.messageId,
);
}
break;
case "searchCancelled":
const cancelledInfo = this.searchPromises.get(data.messageId);
if (cancelledInfo) {
clearTimeout(cancelledInfo.timer); // Clear timeout on cancel
// Reject with a specific cancellation error or resolve with empty? Let's reject.
cancelledInfo.reject(new Error("Search cancelled by worker"));
this.searchPromises.delete(data.messageId);
} else {
console.debug(
"Received cancellation for unknown messageId:",
data.messageId,
);
case "streamingProgress":
if (this.progressCallback && this.streamingSession?.isActive) {
const { processed } = data;
this.progressCallback({
status: "processing",
processed,
total: this.streamingSession.totalExpected,
message: `Streaming vectorization: ${processed}/${this.streamingSession.totalExpected} items`,
});
}
break;
@@ -135,15 +97,12 @@ export class VectorWorkerManager {
}
});
// Initialize the worker
this.worker!.postMessage({ type: "init" });
});
}
// Ensures worker is ready before proceeding
private async ensureReady() {
if (!this.readyPromise) {
// If init wasn't called or failed, try again
console.warn("Worker not initialized, attempting init...");
this.readyPromise = this.initWorker();
}
@@ -155,113 +114,185 @@ export class VectorWorkerManager {
}
}
async processItems(
items: IndexItem[],
onProgress?: ProgressCallback,
) {
await this.ensureReady(); // Wait for worker to be ready
async processItems(items: IndexItem[], onProgress?: ProgressCallback) {
await this.ensureReady();
this.progressCallback = onProgress || null;
// Cancel any ongoing search when starting processing
this.cancelAllSearches("Processing started");
console.debug(`Sending ${items.length} items to worker for processing.`);
this.worker!.postMessage({
type: "process",
data: { items: items },
});
}
// Public search method
public async search(
query: string,
topK: number = 10,
): Promise<SearchResult[]> {
async startStreamingSession(
totalExpectedItems: number,
onProgress?: ProgressCallback,
batchSize: number = 10,
): Promise<void> {
await this.ensureReady();
return new Promise((resolve, reject) => {
this.lastSearchParams = { query, topK, resolve, reject };
const messageId = crypto.randomUUID();
if (this.lastSearchParams && this.worker) {
const currentParams = this.lastSearchParams; // Capture current params
this.lastSearchParams = null; // Clear last params *before* posting
this.debounceTimer = null;
// Set a timeout for the search operation itself
const searchTimeout = 10000; // e.g., 10 seconds
const searchTimer = setTimeout(() => {
if (this.searchPromises.has(messageId)) {
console.error(`Search timed out for messageId: ${messageId}`);
currentParams.reject(
new Error(`Search timed out after ${searchTimeout}ms`),
);
this.searchPromises.delete(messageId);
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
}, searchTimeout);
this.searchPromises.set(messageId, {
resolve: currentParams.resolve,
reject: currentParams.reject,
timer: searchTimer,
});
this.progressCallback = onProgress || null;
this.streamingSession = {
isActive: true,
totalExpected: totalExpectedItems,
totalSent: 0,
batchBuffer: [],
batchSize,
flushTimer: null,
};
console.debug(
`Sending search request (ID: ${messageId}) to worker: "${currentParams.query}"`,
`Starting streaming session for ${totalExpectedItems} items with batch size ${batchSize}`,
);
console.log(this.worker);
this.worker.postMessage({
type: "search",
data: { query: currentParams.query, topK: currentParams.topK },
messageId,
this.worker!.postMessage({
type: "startStreaming",
data: { totalExpected: totalExpectedItems, batchSize },
});
if (this.progressCallback) {
this.progressCallback({
status: "started",
total: totalExpectedItems,
processed: 0,
message: "Starting streaming vectorization",
});
} else if (this.lastSearchParams) {
// This case might happen if ensureReady failed but didn't throw
console.error("Worker unavailable when trying to send search request.");
this.lastSearchParams.reject(
new Error("Worker unavailable for search"),
);
this.lastSearchParams = null;
this.debounceTimer = null;
}
}
async streamItems(items: IndexItem[]): Promise<void> {
if (!this.streamingSession?.isActive) {
throw new Error(
"No active streaming session. Call startStreamingSession first.",
);
}
this.streamingSession.batchBuffer.push(...items);
if (
this.streamingSession.batchBuffer.length >=
this.streamingSession.batchSize
) {
await this.flushBatch();
} else {
if (this.streamingSession.flushTimer) {
clearTimeout(this.streamingSession.flushTimer);
}
this.streamingSession.flushTimer = setTimeout(() => {
this.flushBatch();
}, 1000);
}
}
private async flushBatch(): Promise<void> {
if (
!this.streamingSession?.isActive ||
this.streamingSession.batchBuffer.length === 0
) {
return;
}
const batch = [...this.streamingSession.batchBuffer];
this.streamingSession.batchBuffer = [];
this.streamingSession.totalSent += batch.length;
if (this.streamingSession.flushTimer) {
clearTimeout(this.streamingSession.flushTimer);
this.streamingSession.flushTimer = null;
}
console.debug(
`Streaming batch of ${batch.length} items to worker (${this.streamingSession.totalSent}/${this.streamingSession.totalExpected})`,
);
this.worker!.postMessage({
type: "streamBatch",
data: {
items: batch,
isLast:
this.streamingSession.totalSent >=
this.streamingSession.totalExpected,
},
});
}
// Method to cancel all pending/debounced searches
private cancelAllSearches(reason: string = "Cancelled") {
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
if (this.lastSearchParams) {
this.lastSearchParams.reject(new Error(`Search cancelled: ${reason}`));
this.lastSearchParams = null;
async endStreamingSession(): Promise<void> {
if (!this.streamingSession?.isActive) {
return;
}
await this.flushBatch();
if (this.streamingSession.flushTimer) {
clearTimeout(this.streamingSession.flushTimer);
}
// We might also want to tell the worker to cancel its *current* search
// if it supports it, but this requires worker modification.
// For now, just reject pending promises in the manager.
for (const [messageId, promiseInfo] of this.searchPromises.entries()) {
clearTimeout(promiseInfo.timer);
promiseInfo.reject(new Error(`Search cancelled: ${reason}`));
this.searchPromises.delete(messageId);
this.streamingSession.isActive = false;
this.worker!.postMessage({
type: "endStreaming",
});
console.debug("Streaming session ended");
if (this.progressCallback) {
this.progressCallback({
status: "complete",
total: this.streamingSession.totalExpected,
processed: this.streamingSession.totalSent,
message: "Streaming vectorization complete",
});
}
this.streamingSession = null;
}
async streamItem(item: IndexItem): Promise<void> {
return this.streamItems([item]);
}
isStreamingActive(): boolean {
return this.streamingSession?.isActive ?? false;
}
getStreamingProgress(): {
sent: number;
expected: number;
buffered: number;
} | null {
if (!this.streamingSession?.isActive) {
return null;
}
return {
sent: this.streamingSession.totalSent,
expected: this.streamingSession.totalExpected,
buffered: this.streamingSession.batchBuffer.length,
};
}
terminate() {
console.debug("Terminating Vector Worker Manager...");
this.cancelAllSearches("Worker terminated"); // Cancel pending searches
if (this.streamingSession?.isActive) {
this.endStreamingSession();
}
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
this.isInitialized = false;
this.readyPromise = null; // Reset init promise
this.readyPromise = null;
this.progressCallback = null;
// Clear the static instance? Or assume app lifecycle handles this?
// VectorWorkerManager.instance = null; // Uncomment if needed
}
}
@@ -0,0 +1,230 @@
import { delay } from "@/seqta/utils/delay";
export interface RateLimiterConfig {
minDelay: number; // Minimum delay between requests (ms)
maxDelay: number; // Maximum delay between requests (ms)
baseDelay: number; // Base delay between requests (ms)
backoffMultiplier: number; // Exponential backoff multiplier
maxRetries: number; // Maximum retries for failed requests
adaptiveBatchSize?: boolean; // Enable adaptive batch sizing
minBatchSize?: number; // Minimum batch size
maxBatchSize?: number; // Maximum batch size
baseBatchSize?: number; // Starting batch size
}
export interface RateLimiterState {
currentDelay: number;
failedRequests: number;
lastSuccessTime: number;
currentBatchSize?: number;
}
export class RateLimiter {
private config: RateLimiterConfig;
private state: RateLimiterState;
constructor(config: RateLimiterConfig, initialState?: Partial<RateLimiterState>) {
this.config = config;
this.state = {
currentDelay: config.baseDelay,
failedRequests: 0,
lastSuccessTime: Date.now(),
currentBatchSize: config.baseBatchSize || 50,
...initialState,
};
}
/**
* Wait for the appropriate delay before making the next request
*/
async waitForNext(): Promise<void> {
await delay(this.state.currentDelay);
}
/**
* Record a successful request and adjust delays accordingly
*/
recordSuccess(responseTime?: number): void {
this.state.lastSuccessTime = Date.now();
this.state.failedRequests = Math.max(0, this.state.failedRequests - 1);
if (responseTime !== undefined) {
this.state.currentDelay = this.calculateAdaptiveDelay(responseTime);
if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) {
this.state.currentBatchSize = this.calculateAdaptiveBatchSize(responseTime);
}
}
}
/**
* Record a failed request and increase delays
*/
recordFailure(): void {
this.state.failedRequests++;
this.state.currentDelay = Math.min(
this.state.currentDelay * this.config.backoffMultiplier,
this.config.maxDelay
);
if (this.config.adaptiveBatchSize && this.state.currentBatchSize !== undefined) {
this.state.currentBatchSize = Math.max(
Math.floor(this.state.currentBatchSize * 0.7),
this.config.minBatchSize || 10
);
}
}
/**
* Execute a request with automatic retry logic
*/
async executeWithRetry<T>(
requestFn: () => Promise<T>,
retryCount = 0
): Promise<T | null> {
try {
const startTime = Date.now();
const result = await requestFn();
const responseTime = Date.now() - startTime;
this.recordSuccess(responseTime);
return result;
} catch (error) {
console.warn(`Request failed (attempt ${retryCount + 1}):`, error);
if (retryCount < this.config.maxRetries) {
this.recordFailure();
await this.waitForNext();
return this.executeWithRetry(requestFn, retryCount + 1);
}
this.recordFailure();
return null;
}
}
/**
* Get current state for persistence
*/
getState(): RateLimiterState {
return { ...this.state };
}
/**
* Update state from persisted data
*/
setState(state: Partial<RateLimiterState>): void {
this.state = { ...this.state, ...state };
}
/**
* Get current batch size (if adaptive batching is enabled)
*/
getCurrentBatchSize(): number {
return this.state.currentBatchSize || this.config.baseBatchSize || 50;
}
/**
* Get current delay
*/
getCurrentDelay(): number {
return this.state.currentDelay;
}
/**
* Get failure count
*/
getFailureCount(): number {
return this.state.failedRequests;
}
private calculateAdaptiveDelay(responseTime: number): number {
const { currentDelay, failedRequests, lastSuccessTime } = this.state;
const timeSinceLastSuccess = Date.now() - lastSuccessTime;
// Increase delay if we're seeing failures or slow responses
if (failedRequests > 0 || responseTime > 2000) {
return Math.min(currentDelay * this.config.backoffMultiplier, this.config.maxDelay);
}
// Decrease delay if we're doing well and it's been a while since last success
if (responseTime < 500 && timeSinceLastSuccess > 10000) {
return Math.max(currentDelay * 0.8, this.config.minDelay);
}
return currentDelay;
}
private calculateAdaptiveBatchSize(responseTime: number): number {
if (!this.config.adaptiveBatchSize || this.state.currentBatchSize === undefined) {
return this.state.currentBatchSize || this.config.baseBatchSize || 50;
}
const { currentBatchSize, failedRequests } = this.state;
// Reduce batch size if we're seeing failures or slow responses
if (failedRequests > 2 || responseTime > 3000) {
return Math.max(
Math.floor(currentBatchSize * 0.7),
this.config.minBatchSize || 10
);
}
// Increase batch size if we're doing well
if (failedRequests === 0 && responseTime < 1000) {
return Math.min(
Math.floor(currentBatchSize * 1.2),
this.config.maxBatchSize || 100
);
}
return currentBatchSize;
}
}
/**
* Predefined rate limiter configurations for different job types
*/
export const RATE_LIMITER_PRESETS = {
MESSAGES: {
minDelay: 50,
maxDelay: 5000,
baseDelay: 200,
backoffMultiplier: 1.5,
maxRetries: 3,
adaptiveBatchSize: true,
minBatchSize: 10,
maxBatchSize: 100,
baseBatchSize: 50,
} as RateLimiterConfig,
NOTIFICATIONS: {
minDelay: 100,
maxDelay: 3000,
baseDelay: 150,
backoffMultiplier: 1.4,
maxRetries: 2,
adaptiveBatchSize: false,
} as RateLimiterConfig,
FORUMS: {
minDelay: 75,
maxDelay: 2000,
baseDelay: 100,
backoffMultiplier: 1.3,
maxRetries: 2,
adaptiveBatchSize: true,
minBatchSize: 5,
maxBatchSize: 50,
baseBatchSize: 25,
} as RateLimiterConfig,
SUBJECTS: {
minDelay: 50,
maxDelay: 1000,
baseDelay: 75,
backoffMultiplier: 1.2,
maxRetries: 1,
adaptiveBatchSize: false,
} as RateLimiterConfig,
};