feat: improved job indexing

This commit is contained in:
SethBurkart123
2025-05-05 17:09:38 +10:00
parent cd247cfde4
commit ec42f1bb27
5 changed files with 313 additions and 368 deletions
@@ -1,4 +1,4 @@
import { clear, getAll, put, remove } from "./db";
import { clear, getAll, get, put, remove } from "./db";
import { jobs } from "./jobs";
import { renderComponentMap } from "./renderComponents";
import type { HydratedIndexItem, IndexItem, Job, JobContext } from "./types";
@@ -9,6 +9,17 @@ const LOCK_KEY = "bsq-indexer-lock";
const HEARTBEAT_INTERVAL = 10000;
const LOCK_TIMEOUT = 20000;
/* ─────────── Progress-meta helpers ─────────── */
/**
 * Reads the progress value saved for a job.
 *
 * Looks up the `progress:<jobId>` record in the meta store and returns its
 * `progress` field, or `undefined` when the lookup yields nothing.
 */
async function loadProgress<T = any>(jobId: string): Promise<T | undefined> {
  const progressKey = `progress:${jobId}`;
  const stored = await get(META_STORE, progressKey);
  return stored?.progress as T | undefined;
}
/**
 * Persists a job's progress value.
 *
 * Writes `{ jobId, progress }` to the meta store under the key
 * `progress:<jobId>`.
 */
async function saveProgress<T = any>(jobId: string, progress: T): Promise<void> {
  const progressKey = `progress:${jobId}`;
  const record = { jobId, progress };
  await put(META_STORE, record, progressKey);
}
/* ───────────────────────────────────────────── */
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
function shouldRun(job: Job, lastRun?: number): boolean {
@@ -132,26 +143,27 @@ export async function runIndexing(): Promise<void> {
continue;
}
// These DB operations happen on the main thread (acceptable per request)
const getStoredItems = async () => await getAll(jobId);
const setStoredItems = async (items: IndexItem[]) => {
await clear(jobId);
// Add validation before putting
const getStoredItems = async (storeId?: string) => await getAll(storeId ?? jobId);
const setStoredItems = async (items: IndexItem[], storeId?: string) => {
const targetStore = storeId ?? jobId;
await clear(targetStore);
const validItems = items.filter(i => i && i.id);
if (validItems.length !== items.length) {
console.warn(`[Indexer Job ${jobId}] Filtered out ${items.length - validItems.length} invalid items before storing.`);
console.warn(`[Indexer Job ${jobId} -> Store ${targetStore}] Filtered out ${items.length - validItems.length} invalid items before storing.`);
}
await Promise.all(validItems.map((i) => put(jobId, i, i.id)));
await Promise.all(validItems.map((i) => put(targetStore, i, i.id)));
};
const addItem = async (item: IndexItem) => {
if (item && item.id) { // Add validation
await put(jobId, item, item.id);
const addItem = async (item: IndexItem, storeId?: string) => {
const targetStore = storeId ?? jobId;
if (item && item.id) {
await put(targetStore, item, item.id);
} else {
console.warn(`[Indexer Job ${jobId}] Attempted to add invalid item:`, item);
console.warn(`[Indexer Job ${jobId} -> Store ${targetStore}] Attempted to add invalid item:`, item);
}
};
const removeItem = async (id: string) => {
await remove(jobId, id);
const removeItem = async (id: string, storeId?: string) => {
const targetStore = storeId ?? jobId;
await remove(targetStore, id);
};
const ctx: JobContext = {
@@ -159,6 +171,8 @@ export async function runIndexing(): Promise<void> {
setStoredItems,
addItem,
removeItem,
getProgress: () => loadProgress(jobId),
setProgress: (p) => saveProgress(jobId, p),
};
console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff");
@@ -195,11 +209,11 @@ export async function runIndexing(): Promise<void> {
allItemsFromJobs.push(...hydratedItems);
console.debug(
`%c[Indexer] ${job.label}: ${newItemsRaw.length} new items fetched, ${merged.length} total stored (non-vector).`,
`%c[Indexer] ${job.label}: ${newItemsRaw.length} new items from run, ${merged.length} total stored in '${jobId}' store (non-vector).`,
"color: #00c46f",
);
} catch (err) {
console.debug(`%c[Indexer] ${job.label} failed:`, "color: red");
console.debug(`%c[Indexer] Job ${job.label} failed:`, "color: red");
console.error(err);
}
@@ -1,350 +1,8 @@
import type { Job } from "./types";
import type { IndexItem } from "./types";
/**
 * Notification entry whose `type` is "message".
 * The assessments job reads `message.title` as the item text and
 * `message.subtitle` as the author of the item it builds.
 */
interface MessageNotification {
notificationID: number;
type: "message";
message: {
subtitle: string;
messageID: number;
title: string;
};
timestamp: string;
}
/**
 * Notification entry whose `type` is "coneqtassessments".
 * The fields of `coneqtAssessments` are copied into the indexed item's
 * text, content and metadata by the assessments job.
 */
interface AssessmentNotification {
notificationID: number;
type: "coneqtassessments";
coneqtAssessments: {
programmeID: number;
metaclassID: number;
subtitle: string;
term: string;
title: string;
assessmentID: number;
subjectCode: string;
};
timestamp: string;
}
/** Union of the notification shapes this module handles, discriminated by `type`. */
type Notification = MessageNotification | AssessmentNotification;
/**
 * Response shape returned by `fetchMessages` (the "list" action).
 * The messages job compares `status` against the string "200", iterates
 * `payload.messages`, and uses `payload.hasMore` to decide whether to request
 * another page.
 */
interface MessageListResponse {
payload: {
hasMore: boolean;
messages: {
date: string;
attachments: boolean;
attachmentCount: number;
// Numeric flag; the messages job treats the value 1 as "read".
read: number;
sender: string;
sender_id: number;
sender_type: string;
subject: string;
id: number;
participants: Array<{
name: string;
photo: string;
type: string;
}>;
}[];
ts: string;
};
status: string;
}
/**
 * Response shape returned by `fetchMessageContent` (the "message" action).
 * The messages job compares `status` against the string "200" and strips
 * HTML tags from `payload.contents` before indexing it.
 */
interface MessageContentResponse {
payload: {
date: string;
blind: boolean;
read: boolean;
subject: string;
sender_type: string;
sender_id: number;
starred: boolean;
contents: string;
sender: string;
files: any[];
id: number;
participants: Array<{
read: number;
name: string;
photo: string;
id: number;
type: string;
}>;
};
status: string;
}
// Removes every `<...>` tag sequence from the input, keeping the text between tags.
function stripHtmlTags(html: string): string {
  const tagPattern = /<[^>]*>/;
  const textPieces = html.split(tagPattern);
  return textPieces.join("");
}
// Requests one page of inbox messages (newest first) and returns the parsed JSON body.
// `offset` and `limit` are forwarded unchanged in the request body.
async function fetchMessages(
  offset: number = 0,
  limit: number = 100,
): Promise<MessageListResponse> {
  const endpoint = `${location.origin}/seqta/student/load/message`;
  const listRequest = {
    searchValue: "",
    sortBy: "date",
    sortOrder: "desc",
    action: "list",
    label: "inbox",
    offset,
    limit,
    datetimeUntil: null,
  };

  const response = await fetch(endpoint, {
    method: "POST",
    credentials: "include",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(listRequest),
  });
  const parsed = await response.json();
  return parsed;
}
// Requests a single message by id and returns the parsed JSON body.
async function fetchMessageContent(
  messageId: number,
): Promise<MessageContentResponse> {
  const endpoint = `${location.origin}/seqta/student/load/message`;
  const messageRequest = { action: "message", id: messageId };

  const response = await fetch(endpoint, {
    method: "POST",
    credentials: "include",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(messageRequest),
  });
  const parsed = await response.json();
  return parsed;
}
// Posts to the heartbeat endpoint and returns the `notifications` array from the
// response, or an empty array when that field is null/undefined.
// NOTE(review): the fixed 1970 timestamp presumably asks for the full history — confirm.
async function fetchNotifications(): Promise<Notification[]> {
  const endpoint = `${location.origin}/seqta/student/heartbeat?`;
  const heartbeatRequest = {
    timestamp: "1970-01-01 00:00:00.0",
    hash: "#?page=/notifications",
  };

  const response = await fetch(endpoint, {
    method: "POST",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(heartbeatRequest),
  });
  const { notifications } = await response.json();
  return notifications ?? [];
}
import { messagesJob } from "./jobs/messages";
import { assessmentsJob } from "./jobs/assessments";
/**
 * Registry of indexer jobs, keyed by job id.
 *
 * Every key must appear exactly once: a repeated key in an object literal
 * silently replaces the earlier entry at runtime and is rejected by the
 * TypeScript checker. Job implementations live in their own modules under
 * `./jobs/` and are only referenced here.
 */
export const jobs: Record<string, Job> = {
  messages: messagesJob,
  assessments: assessmentsJob,
  // We can add more job types here as needed:
  // - notices
  // - timetable changes
  // - homework
  // etc.
};
@@ -0,0 +1,138 @@
import type { Job, IndexItem } from "../types";
/* ------------- Notification types ------------- */
/**
 * Notification entry whose `type` is "message".
 * The assessments job stores these in the "messages" store, using
 * `message.title` as the item text and `message.subtitle` as the author.
 */
interface MessageNotification {
notificationID: number;
type: "message";
message: { subtitle: string; messageID: number; title: string };
timestamp: string;
}
/**
 * Notification entry whose `type` is "coneqtassessments".
 * The fields of `coneqtAssessments` are copied into the indexed item's
 * text, content and metadata by the assessments job.
 */
interface AssessmentNotification {
notificationID: number;
type: "coneqtassessments";
coneqtAssessments: {
programmeID: number;
metaclassID: number;
subtitle: string;
term: string;
title: string;
assessmentID: number;
subjectCode: string;
};
timestamp: string;
}
/** Union of the notification shapes this module handles, discriminated by `type`. */
type Notification = MessageNotification | AssessmentNotification;
/* ------------- Progress model ------------- */
/**
 * Progress record the assessments job loads through `ctx.getProgress` and
 * saves through `ctx.setProgress`.
 */
interface AssessmentsProgress {
lastTs: number; // ms since epoch of last processed notification
}
/* ------------- Helpers ------------- */
/**
 * Posts to the heartbeat endpoint and returns the `notifications` array from
 * the response, or an empty array when that field is null/undefined. The
 * result is cast to `Notification[]` without runtime validation.
 */
const fetchNotifications = async () => {
  const endpoint = `${location.origin}/seqta/student/heartbeat?`;
  const heartbeatRequest = {
    timestamp: "1970-01-01 00:00:00.0",
    hash: "#?page=/notifications",
  };

  const res = await fetch(endpoint, {
    method: "POST",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(heartbeatRequest),
  });
  const { notifications } = await res.json();
  return (notifications ?? []) as Notification[];
};
/* ------------- Job ------------- */
/**
 * Indexer job that turns heartbeat notifications into index items.
 *
 * - `coneqtassessments` notifications are returned from `run` as items for the
 *   job's own store.
 * - `message` notifications are written straight into the "messages" store via
 *   `ctx.addItem(..., "messages")`.
 * - Notifications whose id is already present in either store are skipped.
 *
 * `purge` keeps only items dated on or after 1 January of the current year
 * (local time).
 */
export const assessmentsJob: Job = {
  id: "assessments",
  label: "Assessments",
  renderComponentId: "assessment",
  frequency: { type: "expiry", afterMs: 15 * 60 * 1000 },

  run: async (ctx) => {
    const progress =
      (await ctx.getProgress<AssessmentsProgress>()) ?? { lastTs: 0 };

    let notifications: Notification[];
    try {
      notifications = await fetchNotifications();
    } catch (e) {
      console.error("[Assessments job] fetch failed:", e);
      return [];
    }

    // Read both stores once up front instead of once per notification; ids
    // handled during this run are added to the same set, so a notification
    // repeated in the response is not indexed twice.
    const [storedAssessments, storedMessages] = await Promise.all([
      ctx.getStoredItems("assessments"),
      ctx.getStoredItems("messages"),
    ]);
    const indexedIds = new Set<string>();
    for (const stored of storedAssessments) indexedIds.add(stored.id);
    for (const stored of storedMessages) indexedIds.add(stored.id);

    const items: IndexItem[] = [];
    let latestTs = progress.lastTs;

    for (const notif of notifications) {
      const id = notif.notificationID.toString();
      if (indexedIds.has(id)) continue;

      const dateAdded = new Date(notif.timestamp).getTime();

      if (notif.type === "coneqtassessments") {
        const a = notif.coneqtAssessments;
        items.push({
          id,
          text: a.title,
          category: "assessments",
          content: a.subtitle,
          dateAdded,
          metadata: {
            assessmentId: a.assessmentID,
            subject: a.subjectCode,
            term: a.term,
            programmeId: a.programmeID,
            metaclassId: a.metaclassID,
            timestamp: notif.timestamp,
          },
          actionId: "assessment",
          renderComponentId: "assessment",
        });
      } else if (notif.type === "message") {
        await ctx.addItem(
          {
            id,
            text: notif.message.title,
            category: "messages",
            content: `From: ${notif.message.subtitle}`,
            dateAdded,
            metadata: {
              messageId: notif.message.messageID,
              author: notif.message.subtitle,
              timestamp: notif.timestamp,
              isAssessmentNotification: true,
            },
            actionId: "message",
            renderComponentId: "message",
          },
          "messages",
        );
      } else {
        // The response is cast to Notification[] without validation, so other
        // notification types can reach this point. Skip them rather than
        // throwing on a missing `message` payload and aborting the whole run.
        console.debug(
          `[Assessments job] skipping notification ${id} with unsupported type`,
        );
        continue;
      }

      indexedIds.add(id);
      // An unparsable timestamp yields NaN; never let it poison the progress.
      if (!Number.isNaN(dateAdded) && dateAdded > latestTs) {
        latestTs = dateAdded;
      }
    }

    // Record the newest processed notification, whichever store it went to.
    if (latestTs > progress.lastTs) {
      await ctx.setProgress({ lastTs: latestTs });
    }

    return items;
  },

  purge: (items) => {
    // Cutoff: 1 January of the current year at local midnight.
    const date = new Date();
    date.setMonth(0, 1);
    date.setHours(0, 0, 0, 0);
    const cutoff = date.getTime();
    return items.filter((i) => i.dateAdded >= cutoff);
  },
};
@@ -0,0 +1,133 @@
import type { Job, IndexItem } from "../types";
/** Drops every `<...>` tag sequence from `html`, keeping only the text around the tags. */
const stripHtmlTags = (html: string) => html.split(/<[^>]*>/).join("");
/**
 * Requests one page of inbox messages (sorted by date, descending) and returns
 * the parsed JSON body. `offset` and `limit` are forwarded unchanged in the
 * request body; the response shape is asserted, not validated.
 */
const fetchMessages = async (offset = 0, limit = 100) => {
  const endpoint = `${location.origin}/seqta/student/load/message`;
  const listRequest = {
    searchValue: "",
    sortBy: "date",
    sortOrder: "desc",
    action: "list",
    label: "inbox",
    offset,
    limit,
    datetimeUntil: null,
  };

  const res = await fetch(endpoint, {
    method: "POST",
    credentials: "include",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(listRequest),
  });

  const parsed: Promise<{
    payload: { hasMore: boolean; messages: any[]; ts: string };
    status: string;
  }> = res.json();
  return parsed;
};
/**
 * Requests a single message by id and returns the parsed JSON body.
 * The response shape is asserted, not validated.
 */
const fetchMessageContent = async (id: number) => {
  const endpoint = `${location.origin}/seqta/student/load/message`;
  const messageRequest = { action: "message", id };

  const res = await fetch(endpoint, {
    method: "POST",
    credentials: "include",
    headers: { "Content-Type": "application/json; charset=utf-8" },
    body: JSON.stringify(messageRequest),
  });

  const parsed: Promise<{
    payload: { contents: string };
    status: string;
  }> = res.json();
  return parsed;
};
/**
 * Progress record the messages job loads through `ctx.getProgress` and saves
 * through `ctx.setProgress` after each page.
 */
interface MessagesProgress {
// Offset passed to `fetchMessages` for the next page to request.
offset: number;
// When true the paging loop stops; the job then resets the record.
done: boolean;
}
/**
 * Indexer job that pages through the inbox and stores each unseen message.
 *
 * Items are written one by one with `ctx.addItem`, so `run` always returns an
 * empty array. The current page offset is saved after every page, letting an
 * interrupted run pick up where it stopped; once paging finishes the saved
 * progress is reset to `{ offset: 0, done: false }`.
 *
 * `purge` keeps only items dated within the last 4 * 365 days.
 */
export const messagesJob: Job = {
  id: "messages",
  label: "Messages",
  renderComponentId: "message",
  frequency: { type: "expiry", afterMs: 1000 * 60 * 60 * 24 },

  run: async (ctx) => {
    const limit = 100;
    const progress =
      (await ctx.getProgress<MessagesProgress>()) ?? { offset: 0, done: false };

    // A non-zero saved offset means an earlier run stopped part-way through.
    // Messages that arrived since then shift every offset, so the first page
    // of a resumed run can consist of already-indexed messages; the
    // "caught up" shortcut below must not fire in that case or the rest of
    // the history would never be indexed.
    const resuming = progress.offset > 0;

    const existingIds = new Set(
      (await ctx.getStoredItems()).map((i) => i.id),
    );

    let consecutiveExisting = 0;

    while (!progress.done) {
      let list;
      try {
        list = await fetchMessages(progress.offset, limit);
      } catch (e) {
        console.error("[Messages job] list fetch failed:", e);
        break;
      }
      if (list.status !== "200") {
        console.error("[Messages job] list fetch returned status:", list.status);
        break;
      }

      const messages = list.payload.messages;

      for (const msg of messages) {
        const id = msg.id.toString();

        if (existingIds.has(id)) {
          consecutiveExisting += 1;
          // 20 known messages in a row on a fresh run: assume we caught up.
          if (!resuming && consecutiveExisting >= 20) {
            progress.done = true;
            break;
          }
          continue;
        }
        consecutiveExisting = 0;

        let full;
        try {
          full = await fetchMessageContent(msg.id);
        } catch (e) {
          console.error(`[Messages job] content fetch failed (id ${id}):`, e);
          continue;
        }
        if (full.status !== "200") continue;

        const item: IndexItem = {
          id,
          text: msg.subject,
          category: "messages",
          content: `From: ${msg.sender}\n\n${stripHtmlTags(full.payload.contents)}`,
          dateAdded: new Date(msg.date).getTime(),
          metadata: {
            messageId: msg.id,
            author: msg.sender,
            senderId: msg.sender_id,
            senderType: msg.sender_type,
            timestamp: msg.date,
            hasAttachments: msg.attachments,
            attachmentCount: msg.attachmentCount,
            read: msg.read === 1,
          },
          actionId: "message",
          renderComponentId: "message",
        };

        await ctx.addItem(item);
        existingIds.add(id);
      }

      // An empty page also ends paging, so a response that keeps reporting
      // `hasMore` without returning messages cannot loop forever.
      if (!list.payload.hasMore || messages.length === 0) progress.done = true;

      // When finished, skip the intermediate write; the reset below is final.
      if (progress.done) break;

      progress.offset += limit;
      await ctx.setProgress(progress);
    }

    if (progress.done) await ctx.setProgress({ offset: 0, done: false });
    return [];
  },

  purge: (items) => {
    const cutoff = Date.now() - 4 * 365 * 24 * 60 * 60 * 1000;
    return items.filter((i) => i.dateAdded >= cutoff);
  },
};
@@ -21,10 +21,12 @@ export type Frequency =
| { type: "expiry"; afterMs: number };
/**
 * Storage and progress helpers handed to a job's `run` function.
 *
 * The item helpers take an optional `storeId`; when it is omitted the
 * indexer uses the running job's own store, so callers written against the
 * earlier single-store signatures keep working unchanged. Each member is
 * declared exactly once; conflicting duplicate declarations of the same
 * member are rejected by the type checker.
 */
export interface JobContext {
  /** Returns all items in `storeId`, or in the job's own store when omitted. */
  getStoredItems: (storeId?: string) => Promise<IndexItem[]>;
  /** Clears the target store and writes `items` into it. */
  setStoredItems: (items: IndexItem[], storeId?: string) => Promise<void>;
  /** Writes a single item into the target store, keyed by its `id`. */
  addItem: (item: IndexItem, storeId?: string) => Promise<void>;
  /** Removes the item with the given id from the target store. */
  removeItem: (id: string, storeId?: string) => Promise<void>;
  /** Loads the progress value saved for the running job, if any. */
  getProgress: <T = any>() => Promise<T | undefined>;
  /** Saves a progress value for the running job. */
  setProgress: <T = any>(progress: T) => Promise<void>;
}
export interface Job {