fix: add timeout lock to ensure completion of vecotrisation

This commit is contained in:
2026-05-01 08:49:00 +09:30
parent f3f4491f04
commit f6472ea9bd
@@ -19,6 +19,8 @@ export class VectorWorkerManager {
private initializationMutex = false; private initializationMutex = false;
private idleTimer: NodeJS.Timeout | null = null; private idleTimer: NodeJS.Timeout | null = null;
private unloadTimer: NodeJS.Timeout | null = null; private unloadTimer: NodeJS.Timeout | null = null;
/** Non-streaming `process` jobs must not hit the idle shutdown mid-flight. */
private vectorizationLockCount = 0;
private streamingSession: { private streamingSession: {
isActive: boolean; isActive: boolean;
@@ -92,6 +94,12 @@ export class VectorWorkerManager {
break; break;
case "progress": case "progress":
if (
data.status === "processing" ||
data.status === "started"
) {
this.bumpActivityDuringVectorization();
}
if (this.progressCallback) { if (this.progressCallback) {
this.progressCallback(data); this.progressCallback(data);
@@ -120,6 +128,7 @@ export class VectorWorkerManager {
break; break;
case "streamingProgress": case "streamingProgress":
this.bumpActivityDuringVectorization();
if (this.progressCallback && this.streamingSession?.isActive) { if (this.progressCallback && this.streamingSession?.isActive) {
const { processed } = data; const { processed } = data;
this.progressCallback({ this.progressCallback({
@@ -150,6 +159,7 @@ export class VectorWorkerManager {
this.readyPromise = null; this.readyPromise = null;
this.progressCallback = null; this.progressCallback = null;
this.initializationMutex = false; this.initializationMutex = false;
this.vectorizationLockCount = 0;
this.clearIdleTimer(); this.clearIdleTimer();
this.clearUnloadTimer(); this.clearUnloadTimer();
if (this.streamingSession?.isActive) { if (this.streamingSession?.isActive) {
@@ -158,15 +168,27 @@ export class VectorWorkerManager {
} }
private startIdleTimer() { private startIdleTimer() {
if (this.vectorizationLockCount > 0 || this.streamingSession?.isActive) {
return;
}
this.clearIdleTimer(); this.clearIdleTimer();
this.idleTimer = setTimeout(() => { this.idleTimer = setTimeout(() => {
if (!this.streamingSession?.isActive && this.isInitialized) { if (this.vectorizationLockCount > 0) return;
console.debug("[VectorWorker] Auto-shutting down due to 2 minutes of inactivity"); if (this.streamingSession?.isActive) return;
this.resetWorkerState(); if (!this.isInitialized) return;
} console.debug("[VectorWorker] Auto-shutting down due to 2 minutes of inactivity");
this.resetWorkerState();
}, 120000); // 2 minutes }, 120000); // 2 minutes
} }
/** Extends idle deadline while embeddings run; cheap if no idle timer is scheduled. */
private bumpActivityDuringVectorization() {
if (this.vectorizationLockCount > 0 || this.streamingSession?.isActive) {
this.clearIdleTimer();
}
this.updateActivity();
}
private clearIdleTimer() { private clearIdleTimer() {
if (this.idleTimer) { if (this.idleTimer) {
clearTimeout(this.idleTimer); clearTimeout(this.idleTimer);
@@ -184,6 +206,7 @@ export class VectorWorkerManager {
private scheduleUnload(delay: number = 10000) { private scheduleUnload(delay: number = 10000) {
this.clearUnloadTimer(); this.clearUnloadTimer();
this.unloadTimer = setTimeout(() => { this.unloadTimer = setTimeout(() => {
if (this.vectorizationLockCount > 0) return;
if (!this.streamingSession?.isActive && this.isInitialized) { if (!this.streamingSession?.isActive && this.isInitialized) {
console.debug("[VectorWorker] Auto-unloading after processing complete"); console.debug("[VectorWorker] Auto-unloading after processing complete");
this.resetWorkerState(); this.resetWorkerState();
@@ -193,6 +216,9 @@ export class VectorWorkerManager {
private updateActivity() { private updateActivity() {
this.clearUnloadTimer(); this.clearUnloadTimer();
if (this.vectorizationLockCount > 0 || this.streamingSession?.isActive) {
return;
}
this.startIdleTimer(); this.startIdleTimer();
} }
@@ -303,32 +329,45 @@ export class VectorWorkerManager {
// stopHeartbeat/loadAll/loadDynamicItems on the main thread while // stopHeartbeat/loadAll/loadDynamicItems on the main thread while
// vectorization was still running — blocking indexing-progress handlers // vectorization was still running — blocking indexing-progress handlers
// and freezing the chip on “Vectorization in progress”. // and freezing the chip on “Vectorization in progress”.
await new Promise<void>((resolve) => { this.vectorizationLockCount++;
let settled = false; this.clearIdleTimer();
const wrap: ProgressCallback = (data) => { this.clearUnloadTimer();
onProgress?.(data);
if (
!settled &&
(data.status === "complete" ||
data.status === "error" ||
data.status === "cancelled")
) {
settled = true;
resolve();
}
};
this.progressCallback = wrap;
this.updateActivity();
console.debug( try {
`Sending ${uniqueItems.length} unique items to worker for processing.`, await new Promise<void>((resolve) => {
); let settled = false;
const wrap: ProgressCallback = (data) => {
onProgress?.(data);
if (
!settled &&
(data.status === "complete" ||
data.status === "error" ||
data.status === "cancelled")
) {
settled = true;
resolve();
}
};
this.progressCallback = wrap;
this.worker!.postMessage({ console.debug(
type: "process", `Sending ${uniqueItems.length} unique items to worker for processing.`,
data: { items: uniqueItems }, );
this.worker!.postMessage({
type: "process",
data: { items: uniqueItems },
});
}); });
}); } finally {
this.vectorizationLockCount = Math.max(0, this.vectorizationLockCount - 1);
if (
this.vectorizationLockCount === 0 &&
!this.streamingSession?.isActive
) {
this.startIdleTimer();
}
}
} }
async startStreamingSession( async startStreamingSession(