feat: improve results and performance (syncronous)

This commit is contained in:
SethBurkart123
2025-04-13 10:58:05 +10:00
parent 9562368157
commit 454ab283ab
10 changed files with 369 additions and 377 deletions
+1 -1
View File
@@ -75,7 +75,7 @@
"@uiw/codemirror-extensions-color": "^4.23.10", "@uiw/codemirror-extensions-color": "^4.23.10",
"@uiw/codemirror-theme-github": "^4.23.10", "@uiw/codemirror-theme-github": "^4.23.10",
"autoprefixer": "^10.4.21", "autoprefixer": "^10.4.21",
"client-vector-search": "^0.2.0", "client-vector-search": "../client-vector-search",
"codemirror": "^6.0.1", "codemirror": "^6.0.1",
"color": "^5.0.0", "color": "^5.0.0",
"dompurify": "^3.2.4", "dompurify": "^3.2.4",
@@ -11,6 +11,7 @@
import Calculator from './components/Calculator.svelte'; import Calculator from './components/Calculator.svelte';
import { actionMap } from './indexing/actions'; import { actionMap } from './indexing/actions';
import type { IndexItem, HydratedIndexItem } from './indexing/types'; import type { IndexItem, HydratedIndexItem } from './indexing/types';
import debounce from 'lodash/debounce';
const { const {
transparencyEffects, transparencyEffects,
@@ -127,9 +128,15 @@
isLoading = false; isLoading = false;
}; };
const debouncedPerformSearch = debounce(performSearch, 10);
$effect(() => { $effect(() => {
if (commandPalleteOpen) { if (commandPalleteOpen) {
performSearch(); if (searchTerm === '') {
performSearch();
} else {
debouncedPerformSearch();
}
tick().then(() => searchbar?.focus()); tick().then(() => searchbar?.focus());
} else { } else {
searchTerm = ''; searchTerm = '';
@@ -139,13 +146,6 @@
} }
}); });
$effect(() => {
if (commandPalleteOpen && searchTerm !== prevSearchTerm) {
prevSearchTerm = searchTerm;
performSearch();
}
});
$effect(() => { $effect(() => {
if (combinedResults.length === 0 && calculatorResult && commandPalleteOpen) { if (combinedResults.length === 0 && calculatorResult && commandPalleteOpen) {
selectedIndex = 0; selectedIndex = 0;
@@ -31,7 +31,10 @@
// Process the input with debounce to avoid unnecessary calculations // Process the input with debounce to avoid unnecessary calculations
const processInput = (input: string) => { const processInput = (input: string) => {
try { try {
if (!input.trim()) { if (
!input.trim() ||
(input.trim().length <= 2 && !/\d/.test(input))
) {
result = null; result = null;
inputUnit = ''; inputUnit = '';
outputUnit = ''; outputUnit = '';
@@ -10,12 +10,10 @@ import renderSvelte from "@/interface/main";
import SearchBar from "../SearchBar.svelte"; import SearchBar from "../SearchBar.svelte";
import styles from "./styles.css?inline"; import styles from "./styles.css?inline";
import { unmount } from "svelte"; import { unmount } from "svelte";
import { loadDynamicItems } from "../dynamicSearch";
import { waitForElm } from "@/seqta/utils/waitForElm"; import { waitForElm } from "@/seqta/utils/waitForElm";
import { loadAllStoredItems, runIndexing } from "../indexing/indexer"; import { runIndexing } from "../indexing/indexer";
//import { initVectorSearch } from "../search/vector/vectorSearch";
import { VectorWorkerManager } from "../indexing/worker/vectorWorkerManager"; import { VectorWorkerManager } from "../indexing/worker/vectorWorkerManager";
import VectorSearchWorkerManager from "../search/vector/vectorSearch"; import { initVectorSearch } from "../search/vector/vectorSearch";
const settings = defineSettings({ const settings = defineSettings({
searchHotkey: stringSetting({ searchHotkey: stringSetting({
@@ -56,26 +54,6 @@ class GlobalSearchPlugin extends BasePlugin<typeof settings> {
const settingsInstance = new GlobalSearchPlugin(); const settingsInstance = new GlobalSearchPlugin();
const updateDynamicItemsFromIndex = async () => {
const indexedItems = await loadAllStoredItems();
loadDynamicItems(indexedItems);
console.log(`Loaded ${indexedItems.length} indexed items into search.`);
// Process items through vector search worker
const workerManager = VectorWorkerManager.getInstance();
await workerManager.processItems(indexedItems, (progress) => {
if (progress.status === "started") {
console.debug(`Starting vector processing of ${progress.total} items...`);
} else if (progress.status === "processing") {
console.debug(`Vectorized ${progress.processed}/${progress.total} items`);
} else if (progress.status === "complete") {
console.debug("Vector processing complete:", progress.message);
}
});
window.dispatchEvent(new CustomEvent("dynamic-items-updated"));
};
const globalSearchPlugin: Plugin<typeof settings> = { const globalSearchPlugin: Plugin<typeof settings> = {
id: "global-search", id: "global-search",
name: "Global Search", name: "Global Search",
@@ -88,13 +66,12 @@ const globalSearchPlugin: Plugin<typeof settings> = {
run: async (api) => { run: async (api) => {
let app: any; let app: any;
VectorSearchWorkerManager.getInstance(); initVectorSearch();
// Run initial indexing and update dynamic items // Run initial indexing and update dynamic items
if (api.settings.runIndexingOnLoad) { if (api.settings.runIndexingOnLoad) {
setTimeout(async () => { setTimeout(async () => {
await runIndexing(); await runIndexing();
await updateDynamicItemsFromIndex();
}, 2000); // Delay initial indexing to let page load }, 2000); // Delay initial indexing to let page load
} }
@@ -156,7 +133,6 @@ const globalSearchPlugin: Plugin<typeof settings> = {
// Clean up workers // Clean up workers
VectorWorkerManager.getInstance().terminate(); VectorWorkerManager.getInstance().terminate();
VectorSearchWorkerManager.getInstance().terminate();
unmount(app); unmount(app);
}; };
}, },
@@ -2,7 +2,7 @@ import { clear, getAll, put, remove } from "./db";
import { jobs } from "./jobs"; import { jobs } from "./jobs";
import { renderComponentMap } from "./renderComponents"; import { renderComponentMap } from "./renderComponents";
import type { HydratedIndexItem, IndexItem, Job, JobContext } from "./types"; import type { HydratedIndexItem, IndexItem, Job, JobContext } from "./types";
import { EmbeddingIndex, getEmbedding, initializeModel } from "client-vector-search"; import { VectorWorkerManager } from "./worker/vectorWorkerManager";
const META_STORE = "meta"; const META_STORE = "meta";
const LOCK_KEY = "bsq-indexer-lock"; const LOCK_KEY = "bsq-indexer-lock";
@@ -11,83 +11,6 @@ const LOCK_TIMEOUT = 20000;
let heartbeatTimer: ReturnType<typeof setInterval> | null = null; let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
async function initVectorSearch() {
if (isInitialized) return;
try {
await initializeModel();
vectorIndex = new EmbeddingIndex([]);
// Load existing items from IndexedDB
const stored = await vectorIndex.getAllObjectsFromIndexedDB();
if (stored.length > 0) {
stored.forEach((item) => vectorIndex!.add(item));
console.debug("Vector index loaded from IndexedDB");
}
isInitialized = true;
} catch (e) {
console.error("Failed to initialize vector search:", e);
throw e;
}
}
async function vectorizeItem(
item: HydratedIndexItem,
): Promise<HydratedIndexItem & { embedding: number[] }> {
const textToEmbed = [
item.text,
item.content,
item.category,
item.metadata?.author,
item.metadata?.subject,
]
.filter(Boolean)
.join(" ");
const embedding = await getEmbedding(textToEmbed);
return { ...item, embedding };
}
async function processItems(items: HydratedIndexItem[]) {
if (!vectorIndex) await initVectorSearch();
const unprocessedItems = items.filter((item) => {
try {
return !vectorIndex!.get({ id: item.id });
} catch {
return true;
}
});
if (unprocessedItems.length === 0) {
console.debug("No new items to vectorize");
return;
}
console.debug(`Vectorizing ${unprocessedItems.length} new items...`);
// Process in batches to avoid UI freeze
const BATCH_SIZE = 5;
for (let i = 0; i < unprocessedItems.length; i += BATCH_SIZE) {
const batch = unprocessedItems.slice(i, i + BATCH_SIZE);
const vectorized = await Promise.all(batch.map(vectorizeItem));
for (const item of vectorized) {
vectorIndex!.add(item);
}
// Save periodically to avoid losing progress
await vectorIndex!.saveIndex("indexedDB");
// Log progress
console.debug(
`Vectorized ${Math.min(i + BATCH_SIZE, unprocessedItems.length)}/${unprocessedItems.length} items`,
);
}
}
function shouldRun(job: Job, lastRun?: number): boolean { function shouldRun(job: Job, lastRun?: number): boolean {
const now = Date.now(); const now = Date.now();
@@ -133,29 +56,43 @@ function stopHeartbeat() {
localStorage.removeItem(LOCK_KEY); localStorage.removeItem(LOCK_KEY);
} }
function dispatchProgress(completed: number, total: number, indexing: boolean) { function dispatchProgress(completed: number, total: number, indexing: boolean, status?: string, detail?: string) {
const event = new CustomEvent("indexing-progress", { const event = new CustomEvent("indexing-progress", {
detail: { completed, total, indexing }, detail: { completed, total, indexing, status, detail },
}); });
window.dispatchEvent(event); window.dispatchEvent(event);
} }
export async function loadAllStoredItems(): Promise<HydratedIndexItem[]> { export async function loadAllStoredItems(): Promise<HydratedIndexItem[]> {
const all: HydratedIndexItem[] = []; const all: HydratedIndexItem[] = [];
const jobIds = Object.keys(jobs);
for (const jobId in jobs) { for (const jobId of jobIds) {
const items = await getAll(jobId); try {
const job = jobs[jobId]; const items = await getAll(jobId) as IndexItem[];
const renderComponent = renderComponentMap[job.renderComponentId]; const job = jobs[jobId];
const renderComponent = renderComponentMap[job.renderComponentId];
for (const item of items) { if (!renderComponent) {
all.push({ console.warn(`Render component not found for job ${jobId} (ID: ${job.renderComponentId})`);
...item, }
renderComponent,
}); for (const item of items) {
} // Ensure item has all required fields before pushing
if (item && item.id && item.text && item.category && item.actionId && job.renderComponentId) {
all.push({
...item,
renderComponent: renderComponent || undefined, // Assign undefined if not found
});
} else {
console.warn(`Skipping invalid item from job ${jobId}:`, item);
}
}
} catch (error) {
console.error(`Error loading items for job ${jobId}:`, error);
}
} }
console.debug(`[Indexer] Loaded ${all.length} items from non-vector storage.`);
return all; return all;
} }
@@ -173,11 +110,15 @@ export async function runIndexing(): Promise<void> {
const jobIds = Object.keys(jobs); const jobIds = Object.keys(jobs);
let completedJobs = 0; let completedJobs = 0;
dispatchProgress(completedJobs, jobIds.length, true); // Add an extra step for vectorization
const totalSteps = jobIds.length + 1;
dispatchProgress(completedJobs, totalSteps, true, "Starting jobs");
const allNewItems: HydratedIndexItem[] = []; const allItemsFromJobs: HydratedIndexItem[] = [];
// --- Step 1: Run Fetching/Storing Jobs (Main Thread) ---
for (const jobId of jobIds) { for (const jobId of jobIds) {
dispatchProgress(completedJobs, totalSteps, true, `Running job: ${jobs[jobId].label}`);
const job = jobs[jobId]; const job = jobs[jobId];
const lastRun = await getLastRunMeta(jobId); const lastRun = await getLastRunMeta(jobId);
@@ -187,17 +128,27 @@ export async function runIndexing(): Promise<void> {
"color: gray", "color: gray",
); );
completedJobs++; completedJobs++;
dispatchProgress(completedJobs, jobIds.length, true); dispatchProgress(completedJobs, totalSteps, true, `Skipped job: ${job.label}`);
continue; continue;
} }
// These DB operations happen on the main thread (acceptable per request)
const getStoredItems = async () => await getAll(jobId); const getStoredItems = async () => await getAll(jobId);
const setStoredItems = async (items: IndexItem[]) => { const setStoredItems = async (items: IndexItem[]) => {
await clear(jobId); await clear(jobId);
await Promise.all(items.map((i) => put(jobId, i, i.id))); // Add validation before putting
const validItems = items.filter(i => i && i.id);
if (validItems.length !== items.length) {
console.warn(`[Indexer Job ${jobId}] Filtered out ${items.length - validItems.length} invalid items before storing.`);
}
await Promise.all(validItems.map((i) => put(jobId, i, i.id)));
}; };
const addItem = async (item: IndexItem) => { const addItem = async (item: IndexItem) => {
await put(jobId, item, item.id); if (item && item.id) { // Add validation
await put(jobId, item, item.id);
} else {
console.warn(`[Indexer Job ${jobId}] Attempted to add invalid item:`, item);
}
}; };
const removeItem = async (id: string) => { const removeItem = async (id: string) => {
await remove(jobId, id); await remove(jobId, id);
@@ -213,24 +164,35 @@ export async function runIndexing(): Promise<void> {
console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff"); console.debug(`%c[Indexer] Running job "${jobId}"...`, "color: #4ea1ff");
try { try {
const newItems = await job.run(ctx); const newItemsRaw = await job.run(ctx);
const stored = await getStoredItems(); const stored = await getStoredItems();
let merged = mergeItems(stored, newItems); let merged = mergeItems(stored, newItemsRaw);
if (job.purge) merged = job.purge(merged); if (job.purge) merged = job.purge(merged);
await setStoredItems(merged); await setStoredItems(merged); // Store merged non-vector data
await updateLastRunMeta(jobId); await updateLastRunMeta(jobId);
// Add to our collection of new items for vector processing // Hydrate items for vector processing
const hydratedItems = merged.map((item) => ({ const renderComponent = renderComponentMap[job.renderComponentId];
...item, if (!renderComponent) {
renderComponent: renderComponentMap[job.renderComponentId], console.warn(`Render component not found for job ${jobId} (ID: ${job.renderComponentId}) during hydration`);
})); }
allNewItems.push(...hydratedItems); const hydratedItems = merged
.filter(item => item && item.id && item.text && item.category && item.actionId && job.renderComponentId) // Filter invalid before hydrating
.map((item) => ({
...item,
renderComponent: renderComponent || undefined, // Assign undefined if not found
}));
if (hydratedItems.length !== merged.length) {
console.warn(`[Indexer Job ${jobId}] Filtered out ${merged.length - hydratedItems.length} invalid items during hydration.`);
}
allItemsFromJobs.push(...hydratedItems);
console.debug( console.debug(
`%c[Indexer] ✅ ${job.label}: ${newItems.length} items indexed`, `%c[Indexer] ✅ ${job.label}: ${newItemsRaw.length} new items fetched, ${merged.length} total stored (non-vector).`,
"color: #00c46f", "color: #00c46f",
); );
} catch (err) { } catch (err) {
@@ -239,25 +201,84 @@ export async function runIndexing(): Promise<void> {
} }
completedJobs++; completedJobs++;
dispatchProgress(completedJobs, jobIds.length, true); dispatchProgress(completedJobs, totalSteps, true, `Finished job: ${job.label}`);
} }
// Process all new items through vector search // --- Step 2: Delegate Vectorization to Worker (Off Main Thread) ---
if (allNewItems.length > 0) { if (allItemsFromJobs.length > 0) {
console.debug( console.debug(
`%c[Indexer] Processing ${allNewItems.length} items for vector search...`, `%c[Indexer] Sending ${allItemsFromJobs.length} items to worker for vectorization...`,
"color: #4ea1ff", "color: #4ea1ff",
); );
await processItems(allNewItems); dispatchProgress(completedJobs, totalSteps, true, "Starting vectorization");
try {
const workerManager = VectorWorkerManager.getInstance();
// Pass a progress callback to the worker manager
await workerManager.processItems(allItemsFromJobs, (progress) => {
// Update overall progress based on worker feedback
let detailMessage = progress.message || '';
if (progress.status === 'processing' && progress.total && progress.processed !== undefined) {
detailMessage = `Vectorizing: ${progress.processed} / ${progress.total}`;
// You could potentially update the 'completed' count more granularly here
// For simplicity, we'll just update the detail message
} else if (progress.status === 'complete') {
detailMessage = "Vectorization complete";
// Mark the vectorization step as complete
dispatchProgress(totalSteps, totalSteps, true, "Vectorization finished");
} else if (progress.status === 'error') {
detailMessage = `Vectorization error: ${progress.message}`;
dispatchProgress(completedJobs, totalSteps, true, "Vectorization failed", detailMessage); // Show error
} else if (progress.status === 'started') {
detailMessage = `Vectorization started for ${progress.total} items`;
} else if (progress.status === 'cancelled') {
detailMessage = `Vectorization cancelled: ${progress.message}`;
dispatchProgress(completedJobs, totalSteps, true, "Vectorization cancelled", detailMessage);
}
// Update the status detail
dispatchProgress(completedJobs, totalSteps, true, "Vectorization in progress", detailMessage);
// When worker signals completion of *its* task, mark the final step complete
if (progress.status === 'complete') {
completedJobs++; // Increment completion count *after* vectorization finishes
dispatchProgress(completedJobs, totalSteps, false, "Indexing finished"); // Set indexing to false
} else if (progress.status === 'error' || progress.status === 'cancelled') {
// Don't increment completed count on failure/cancel, just stop indexing indicator
dispatchProgress(completedJobs, totalSteps, false, "Indexing stopped due to error/cancel");
}
});
console.debug("%c[Indexer] Vectorization task sent to worker.", "color: green");
// Note: runIndexing might return *before* vectorization is complete now.
// The progress updates will signal the true end state.
} catch (error) {
console.error(`%c[Indexer] ❌ Failed to send items to vector worker:`, "color: red", error);
dispatchProgress(completedJobs, totalSteps, false, "Vectorization failed", String(error)); // Stop indexing indicator
}
} else {
console.debug("%c[Indexer] No items to send for vectorization.", "color: gray");
// If no vectorization needed, indexing is done here.
completedJobs++; // Count the "skipped" vectorization step
dispatchProgress(completedJobs, totalSteps, false, "Indexing finished (no vectorization needed)");
} }
// Stop heartbeat ONLY when all jobs *and* the vectorization dispatch are done.
// The actual *completion* of vectorization is now asynchronous.
stopHeartbeat(); stopHeartbeat();
dispatchProgress(completedJobs, jobIds.length, false); // Final progress update might be handled by the worker callback now.
// dispatchProgress(completedJobs, totalSteps, false); // This might be premature
} }
function mergeItems(existing: IndexItem[], incoming: IndexItem[]): IndexItem[] { function mergeItems(existing: IndexItem[], incoming: IndexItem[]): IndexItem[] {
const map = new Map<string, IndexItem>(); const map = new Map<string, IndexItem>();
for (const item of existing) map.set(item.id, item); // Prioritize incoming items if IDs clash
for (const item of incoming) map.set(item.id, item); for (const item of existing) {
if (item && item.id) map.set(item.id, item);
}
for (const item of incoming) {
if (item && item.id) map.set(item.id, item);
}
return Array.from(map.values()); return Array.from(map.values());
} }
@@ -3,7 +3,7 @@ import vectorWorker from './vectorWorker.ts?inlineWorker';
import type { SearchResult } from 'client-vector-search'; import type { SearchResult } from 'client-vector-search';
export type ProgressCallback = (data: { export type ProgressCallback = (data: {
status: 'started' | 'processing' | 'complete'; status: 'started' | 'processing' | 'complete' | 'error' | 'cancelled';
total?: number; total?: number;
processed?: number; processed?: number;
message?: string; message?: string;
@@ -13,10 +13,17 @@ export class VectorWorkerManager {
private static instance: VectorWorkerManager; private static instance: VectorWorkerManager;
private worker: Worker | null = null; private worker: Worker | null = null;
private isInitialized = false; private isInitialized = false;
private readyPromise: Promise<void> | null = null; // To await initialization
private progressCallback: ProgressCallback | null = null; private progressCallback: ProgressCallback | null = null;
private searchPromises = new Map<string, { resolve: (value: SearchResult[]) => void, reject: (reason?: any) => void }>(); private searchPromises = new Map<string, { resolve: (value: SearchResult[]) => void, reject: (reason?: any) => void, timer: NodeJS.Timeout }>();
private debounceTimer: NodeJS.Timeout | null = null;
private lastSearchParams: { query: string; topK: number; resolve: (results: SearchResult[]) => void, reject: (reason?: any) => void } | null = null;
private constructor() {}
private constructor() {
// Start initialization immediately, but allow awaiting it
this.readyPromise = this.initWorker();
}
static getInstance(): VectorWorkerManager { static getInstance(): VectorWorkerManager {
if (!VectorWorkerManager.instance) { if (!VectorWorkerManager.instance) {
@@ -25,108 +32,190 @@ export class VectorWorkerManager {
return VectorWorkerManager.instance; return VectorWorkerManager.instance;
} }
async init() { private async initWorker(): Promise<void> {
if (this.isInitialized) return; // If already initialized or initializing, return the existing promise
if (this.isInitialized) return Promise.resolve();
if (this.readyPromise) return this.readyPromise;
// Create the worker return new Promise<void>((resolve, reject) => {
this.worker = vectorWorker(); // Create the worker
this.worker = vectorWorker();
// Set up message handling const timeout = setTimeout(() => {
this.worker.addEventListener('message', (e) => { console.error('Vector worker initialization timed out');
const { type, data } = e.data; this.worker?.terminate(); // Clean up worker if it exists
console.log(e); this.worker = null;
this.isInitialized = false; // Ensure state reflects failure
switch (type) { this.readyPromise = null; // Allow retrying init later
case 'ready': reject(new Error('Worker initialization timed out'));
this.isInitialized = true; }, 10000); // Increased timeout
console.debug('Vector worker initialized');
break; // Set up message handling
this.worker!.addEventListener('message', (e) => {
case 'progress': const { type, data } = e.data;
if (this.progressCallback) { console.debug("Message from vector worker:", type, data);
this.progressCallback(data);
switch (type) {
case 'ready':
this.isInitialized = true;
clearTimeout(timeout);
console.debug('Vector worker initialized and ready.');
resolve(); // Resolve the init promise
break;
case 'progress':
if (this.progressCallback) {
this.progressCallback(data);
}
break;
case 'searchResults':
const searchInfo = this.searchPromises.get(data.messageId);
if (searchInfo) {
clearTimeout(searchInfo.timer); // Clear timeout on success
searchInfo.resolve(data.results);
this.searchPromises.delete(data.messageId);
} else {
console.warn('Received search results for unknown messageId:', data.messageId);
}
break;
case 'searchError':
const errorInfo = this.searchPromises.get(data.messageId);
if (errorInfo) {
clearTimeout(errorInfo.timer); // Clear timeout on error
errorInfo.reject(new Error(data.error));
this.searchPromises.delete(data.messageId);
} else {
console.warn('Received search error for unknown messageId:', data.messageId);
}
break;
case 'searchCancelled':
const cancelledInfo = this.searchPromises.get(data.messageId);
if (cancelledInfo) {
clearTimeout(cancelledInfo.timer); // Clear timeout on cancel
// Reject with a specific cancellation error or resolve with empty? Let's reject.
cancelledInfo.reject(new Error('Search cancelled by worker'));
this.searchPromises.delete(data.messageId);
} else {
console.debug('Received cancellation for unknown messageId:', data.messageId);
}
break;
default:
console.warn('Unknown message from worker:', type, data);
} }
break; });
case 'searchResults': // Initialize the worker
const searchPromise = this.searchPromises.get(data.messageId); this.worker!.postMessage({ type: 'init' });
if (searchPromise) {
searchPromise.resolve(data.results);
this.searchPromises.delete(data.messageId);
} else {
console.warn('Received search results for unknown messageId:', data.messageId);
}
break;
case 'searchError':
const errorPromise = this.searchPromises.get(data.messageId);
if (errorPromise) {
errorPromise.reject(new Error(data.error));
this.searchPromises.delete(data.messageId);
} else {
console.warn('Received search error for unknown messageId:', data.messageId);
}
break;
case 'searchCancelled':
const cancelledPromise = this.searchPromises.get(data.messageId);
if (cancelledPromise) {
cancelledPromise.reject(new Error('Search cancelled'));
this.searchPromises.delete(data.messageId);
} else {
console.debug('Received cancellation for unknown messageId:', data.messageId);
}
break;
default:
console.warn('Unknown message from worker:', type, data);
}
});
// Initialize the worker
this.worker.postMessage({ type: 'init' });
// Wait for ready message
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Worker initialization timed out'));
}, 5000);
const checkInit = (e: MessageEvent) => {
if (e.data.type === 'ready') {
this.worker!.removeEventListener('message', checkInit);
clearTimeout(timeout);
resolve();
}
};
this.worker!.addEventListener('message', checkInit);
}); });
} }
async processItems(items: HydratedIndexItem[], onProgress?: ProgressCallback) { // Ensures worker is ready before proceeding
if (!this.isInitialized) { private async ensureReady() {
await this.init(); if (!this.readyPromise) {
// If init wasn't called or failed, try again
console.warn("Worker not initialized, attempting init...");
this.readyPromise = this.initWorker();
} }
await this.readyPromise;
if (!this.isInitialized || !this.worker) {
throw new Error("Vector Worker is not available after initialization attempt.");
}
}
async processItems(items: HydratedIndexItem[], onProgress?: ProgressCallback) {
await this.ensureReady(); // Wait for worker to be ready
this.progressCallback = onProgress || null; this.progressCallback = onProgress || null;
// Cancel any ongoing search when starting processing
this.cancelAllSearches("Processing started");
console.debug(`Sending ${items.length} items to worker for processing.`);
this.worker!.postMessage({ this.worker!.postMessage({
type: 'process', type: 'process',
data: { items } data: { items }
}); });
} }
terminate() { // Public search method
if (this.worker) { public async search(query: string, topK: number = 10): Promise<SearchResult[]> {
// Clean up any pending promises await this.ensureReady();
for (const [messageId, promise] of this.searchPromises.entries()) {
promise.reject(new Error('Worker terminated')); return new Promise((resolve, reject) => {
this.searchPromises.delete(messageId); this.lastSearchParams = { query, topK, resolve, reject };
const messageId = crypto.randomUUID();
if (this.lastSearchParams && this.worker) {
const currentParams = this.lastSearchParams; // Capture current params
this.lastSearchParams = null; // Clear last params *before* posting
this.debounceTimer = null;
// Set a timeout for the search operation itself
const searchTimeout = 10000; // e.g., 10 seconds
const searchTimer = setTimeout(() => {
if (this.searchPromises.has(messageId)) {
console.error(`Search timed out for messageId: ${messageId}`);
currentParams.reject(new Error(`Search timed out after ${searchTimeout}ms`));
this.searchPromises.delete(messageId);
}
}, searchTimeout);
this.searchPromises.set(messageId, { resolve: currentParams.resolve, reject: currentParams.reject, timer: searchTimer });
console.debug(`Sending search request (ID: ${messageId}) to worker: "${currentParams.query}"`);
this.worker.postMessage({
type: "search",
data: { query: currentParams.query, topK: currentParams.topK },
messageId
});
} else if (this.lastSearchParams) {
// This case might happen if ensureReady failed but didn't throw
console.error("Worker unavailable when trying to send search request.");
this.lastSearchParams.reject(new Error("Worker unavailable for search"));
this.lastSearchParams = null;
this.debounceTimer = null;
} }
});
}
// Method to cancel all pending/debounced searches
private cancelAllSearches(reason: string = "Cancelled") {
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
if (this.lastSearchParams) {
this.lastSearchParams.reject(new Error(`Search cancelled: ${reason}`));
this.lastSearchParams = null;
}
}
// We might also want to tell the worker to cancel its *current* search
// if it supports it, but this requires worker modification.
// For now, just reject pending promises in the manager.
for (const [messageId, promiseInfo] of this.searchPromises.entries()) {
clearTimeout(promiseInfo.timer);
promiseInfo.reject(new Error(`Search cancelled: ${reason}`));
this.searchPromises.delete(messageId);
}
}
terminate() {
console.debug("Terminating Vector Worker Manager...");
this.cancelAllSearches("Worker terminated"); // Cancel pending searches
if (this.worker) {
this.worker.terminate(); this.worker.terminate();
this.worker = null; this.worker = null;
this.isInitialized = false;
} }
this.isInitialized = false;
this.readyPromise = null; // Reset init promise
this.progressCallback = null;
// Clear the static instance? Or assume app lifecycle handles this?
// VectorWorkerManager.instance = null; // Uncomment if needed
} }
} }
@@ -1,87 +1,42 @@
import type { VectorSearchResult } from "./vectorTypes"; /* import type { VectorSearchResult } from "./vectorTypes";
import vectorSearchWorker from "./vectorSearchWorker?inlineWorker"; import { VectorWorkerManager } from '../../indexing/worker/vectorWorkerManager';
export function searchVectors(query: string, topK: number = 10): Promise<VectorSearchResult[]> { export function searchVectors(query: string, topK: number = 10): Promise<VectorSearchResult[]> {
return VectorSearchWorkerManager.getInstance().search(query, topK); // Use the single instance of the VectorWorkerManager (from indexing) to perform the search
/* return new Promise((resolve) => { return VectorWorkerManager.getInstance().search(query, topK);
resolve([]); }
}); */ */
import { getEmbedding, EmbeddingIndex, initializeModel } from 'client-vector-search';
import type { HydratedIndexItem } from '../../indexing/types';
import type { SearchResult } from 'client-vector-search';
let vectorIndex: EmbeddingIndex | null = null;
export async function initVectorSearch() {
try {
await initializeModel();
vectorIndex = new EmbeddingIndex([]);
vectorIndex.preloadIndexedDB();
} catch (e) {
console.error('Error initializing vector search', e);
}
} }
class VectorSearchWorkerManager { export interface VectorSearchResult extends SearchResult {
private static instance: VectorSearchWorkerManager; object: HydratedIndexItem & { embedding: number[] };
private worker: Worker | null = null; }
private pendingSearches = new Map<string, (results: VectorSearchResult[]) => void>();
private debounceTimer: NodeJS.Timeout | null = null;
private lastSearchParams: { query: string; topK: number; resolve: (results: VectorSearchResult[]) => void } | null = null;
constructor() { export async function searchVectors(query: string, topK: number = 10): Promise<VectorSearchResult[]> {
this.initWorker(); if (!vectorIndex) await initVectorSearch();
}
private initWorker() {
try {
this.worker = vectorSearchWorker();
this.worker.addEventListener('message', this.messageHandler);
} catch (e) {
console.error("Failed to initialize vector search:", e);
throw e;
}
}
private messageHandler = (e: MessageEvent) => {
console.log("Message received", e.data);
if (e.data.type === 'searchResults') {
const resolve = this.pendingSearches.get(e.data.data.messageId);
if (resolve) {
resolve(e.data.data.results);
this.pendingSearches.delete(e.data.data.messageId);
}
}
};
public static getInstance(): VectorSearchWorkerManager {
if (!VectorSearchWorkerManager.instance) {
VectorSearchWorkerManager.instance = new VectorSearchWorkerManager();
}
return VectorSearchWorkerManager.instance;
}
public async search(query: string, topK: number = 10): Promise<VectorSearchResult[]> {
if (!this.worker) {
this.initWorker();
}
return new Promise((resolve) => { const queryEmbedding = await getEmbedding(query.slice(0, 100));
this.lastSearchParams = { query, topK, resolve };
if (this.debounceTimer) clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(() => {
const messageId = crypto.randomUUID();
if (this.lastSearchParams) {
this.pendingSearches.set(messageId, this.lastSearchParams.resolve);
this.worker?.postMessage({
type: "search",
data: { query: this.lastSearchParams.query, topK: this.lastSearchParams.topK },
messageId
});
this.lastSearchParams = null;
}
this.debounceTimer = null;
}, query !== '' ? 300 : 0);
});
}
public terminate() { const results = await vectorIndex!.search(queryEmbedding, {
if (this.worker) { topK,
for (const [messageId, resolve] of this.pendingSearches.entries()) { useStorage: 'indexedDB',
resolve([]); dedupeEntries: true
this.pendingSearches.delete(messageId); });
}
return results as VectorSearchResult[];
this.worker.terminate(); }
this.worker = null;
}
}
}
export default VectorSearchWorkerManager;
@@ -1,56 +0,0 @@
import { EmbeddingIndex, getEmbedding, initializeModel } from "client-vector-search";
import type { VectorSearchResult } from "./vectorTypes";
console.log("%cVector search worker initialized", "background-color: #000; color: #fff;");
let vectorIndex: EmbeddingIndex | null = null;
let isInitialized = false;
async function initVectorSearch() {
if (isInitialized) return;
try {
await initializeModel();
vectorIndex = new EmbeddingIndex([]);
// Load existing items from IndexedDB
const stored = await vectorIndex.getAllObjectsFromIndexedDB();
if (stored.length > 0) {
stored.forEach((item) => vectorIndex!.add(item));
console.debug("Vector index loaded from IndexedDB");
}
isInitialized = true;
} catch (e) {
console.error("Failed to initialize vector search:", e);
throw e;
}
}
async function searchVectors(query: string, topK: number = 10): Promise<VectorSearchResult[]> {
if (!vectorIndex) await initVectorSearch();
const queryEmbedding = await getEmbedding(query);
const results = await vectorIndex!.search(queryEmbedding, {
topK,
useStorage: 'indexedDB'
});
return results as VectorSearchResult[];
}
self.addEventListener('message', async (e) => {
const { type, data, messageId } = e.data;
switch (type) {
case 'search':
console.log("Search request received", data);
searchVectors(data.query, data.topK).then((results) => {
self.postMessage({ type: 'searchResults', data: { messageId, results } });
});
break;
default:
console.warn(`Unknown message type: ${type}`);
}
});
initVectorSearch();
@@ -4,6 +4,7 @@ import { getDynamicItems } from "./dynamicSearch";
import type { CombinedResult } from "./core/types"; import type { CombinedResult } from "./core/types";
import type { HydratedIndexItem } from "./indexing/types"; import type { HydratedIndexItem } from "./indexing/types";
import { searchVectors } from "./search/vector/vectorSearch"; import { searchVectors } from "./search/vector/vectorSearch";
import type { VectorSearchResult } from "./search/vector/vectorTypes";
export function createSearchIndexes() { export function createSearchIndexes() {
const commands = getStaticCommands(); const commands = getStaticCommands();
@@ -154,7 +155,10 @@ export async function performSearch(
const fuseEndTime = performance.now(); const fuseEndTime = performance.now();
// Get vector results in parallel // Get vector results in parallel
const vectorResults = await searchVectors(query, 10); let vectorResults: VectorSearchResult[] = [];
try {
vectorResults = await searchVectors(query, 10);
} catch (e) {}
const vectorEndTime = performance.now(); const vectorEndTime = performance.now();
console.log("Vector results:", vectorResults); console.log("Vector results:", vectorResults);
+1 -1
View File
@@ -33,5 +33,5 @@
"node" "node"
] ]
}, },
"include": ["src/**/*.ts", "src/**/*.js", "src/**/*.svelte", "src/interface/+layout.sveltes"] "include": ["src/**/*.ts", "src/**/*.js", "src/**/*.svelte", "src/interface/+layout.svelte", "declarations.d.ts"]
} }