Files
repomix-mirror/tests/shared/processConcurrency.test.ts
Claude cf013a0c8f perf(shared): Raise TASKS_PER_THREAD from 100 to 200 to reduce worker contention
Background
----------
On a typical CLI run (`node bin/repomix.cjs --include 'src,tests' --quiet`,
258 files, 4-vCPU host), the metrics worker pool was sized as
`ceil(258 / 100) = 3 workers`. Combined with the security pool's hard cap
of 2 workers (securityCheck.ts:90) and the main thread, the process held
6 active threads on 4 cores during the overlap of `validateFileSafety`
and `calculateMetrics`.

Each metrics worker independently parses gpt-tokenizer's ~2.2 MB
`o200k_base.js` BPE table on its first task — a ~200-300 ms pure-CPU
operation per worker. Spawning 3 cold metrics workers in the warm-up
phase (calculateMetrics.ts:46-48) therefore drove the security workers
off the CPU during their own (concurrent) cold-start, inflating the
critical-path security phase.

Change
------
Raise `TASKS_PER_THREAD` from 100 to 200 so:

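- ≤100 file repos:    1 metrics worker  (was 1)        — no change
- 101-200 file repos: 1 metrics worker  (was 2)        — -1 worker
- 201-300 file repos: 2 metrics workers (was 3)        — -1 worker, the win
- 301-400 file repos: 2 metrics workers (was 4)        — -2 workers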
- 401-600 file repos: 3 metrics workers (was 4-cap)    — -1 worker
- 601-800 file repos: 4 metrics workers (was 4-cap)    — no change
- 801+ file repos:    4 metrics workers (was 4-cap)    — no change (cap)

For the 258-file benchmark this brings active workers during the
metrics+security overlap to 2 + 2 = 4, matching CPU count, and halves
the parallel BPE-loading work in the warm-up phase.

Tests for `getWorkerThreadCount` and `createWorkerPool` are updated to
reflect the new ratio.

Benchmark
---------
`node bin/repomix.cjs --include 'src,tests' --quiet` (258 files), n=20
paired interleaved (alternating BEFORE-first / AFTER-first ordering):

|        | min     | p25     | median  | p75     | mean    | sd     |
|--------|---------|---------|---------|---------|---------|--------|
| BEFORE | 1045 ms | 1092 ms | 1109 ms | 1122 ms | 1107 ms | 27 ms  |
| AFTER  |  937 ms |  973 ms |  991 ms | 1020 ms |  994 ms | 29 ms  |

Mean paired Δ:   +112.5 ms  (10.17 % wall-clock reduction)
Median paired Δ: +115.4 ms  (10.66 % wall-clock reduction)
Paired-delta SD: 36.2 ms  (paired t = 13.88, p < 0.001)
AFTER faster in 20/20 pairs (100 %)

Regression check — `node bin/repomix.cjs --quiet` (default, 1572 files),
n=15 paired interleaved:

|        | min     | p25     | median  | p75     | mean    | sd     |
|--------|---------|---------|---------|---------|---------|--------|
| BEFORE | 1933 ms | 1970 ms | 2016 ms | 2102 ms | 2028 ms | 62 ms  |
| AFTER  | 1955 ms | 1966 ms | 2004 ms | 2131 ms | 2034 ms | 74 ms  |

Mean paired Δ:   -6.2 ms (-0.31 %)  (paired t = -0.29, p > 0.05)
Median paired Δ: -12.7 ms (statistically neutral)

No regression on the large workload — both 100 and 200 saturate the
per-CPU cap at 4 workers for ≥800 file repos, so the dispatch-time
behavior is identical there.

Correctness
-----------
- 1256 / 1256 unit tests pass.
- `npm run lint` clean (only pre-existing warnings unrelated to this change).
- No behavioral change to file processing, tokenization, security checks,
  or output. Pool sizing is the only effect.
2026-05-07 01:11:11 +00:00

257 lines
8.8 KiB
TypeScript

import os from 'node:os';
import { Tinypool } from 'tinypool';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
cleanupWorkerPool,
createWorkerPool,
getProcessConcurrency,
getWorkerThreadCount,
initTaskRunner,
} from '../../src/shared/processConcurrency.js';
// Mock node:os so the tests below can replace os.availableParallelism with a
// stub that reports a fixed core count.
vi.mock('node:os');
// Built inside vi.hoisted so the mock exists before the hoisted vi.mock('tinypool')
// factory runs. It must be callable with `new`, hence the regular function.
const { MockTinypool } = vi.hoisted(() => {
  // Spy-wrapped constructor: each instance gets its own run/destroy spies
  const poolConstructor = vi.fn().mockImplementation(function (this: unknown) {
    const instance = this as Record<string, unknown>;
    instance.run = vi.fn();
    instance.destroy = vi.fn();
    return instance;
  });

  return { MockTinypool: poolConstructor };
});
// Replace the real Tinypool export with the hoisted constructor mock
vi.mock('tinypool', () => {
  return { Tinypool: MockTinypool };
});
describe('processConcurrency', () => {
describe('getProcessConcurrency', () => {
  it('should use os.availableParallelism when available', () => {
    // Make the mocked os module report four available cores
    const parallelismStub = vi.fn().mockReturnValue(4);
    vi.mocked(os).availableParallelism = parallelismStub;

    const concurrency = getProcessConcurrency();

    expect(concurrency).toBe(4);
    expect(parallelismStub).toHaveBeenCalled();
  });
});
// Pins the thread-count sizing: one thread per 200 tasks, never below 1,
// never above the CPU count, and optionally capped by maxWorkerThreads.
describe('getWorkerThreadCount', () => {
  beforeEach(() => {
    // Every case in this suite runs against a stubbed 8-core host
    vi.mocked(os).availableParallelism = vi.fn().mockReturnValue(8);
  });

  it('should return minimum 1 thread', () => {
    const { minThreads, maxThreads } = getWorkerThreadCount(1);
    expect(minThreads).toBe(1);
    expect(maxThreads).toBe(1);
  });

  it('should limit max threads based on number of tasks', () => {
    const { minThreads, maxThreads } = getWorkerThreadCount(1000);
    expect(minThreads).toBe(1);
    expect(maxThreads).toBe(5); // Limited by task count: Math.min(8, ceil(1000/200)) = 5
  });

  it('should scale max threads based on task count', () => {
    const { maxThreads: maxThreads1 } = getWorkerThreadCount(200);
    const { maxThreads: maxThreads2 } = getWorkerThreadCount(400);
    // Pin exact values: a bare greater-than check would also pass with a
    // different tasks-per-thread ratio, so it would not catch a regression.
    expect(maxThreads1).toBe(1); // ceil(200/200) = 1
    expect(maxThreads2).toBe(2); // ceil(400/200) = 2
    expect(maxThreads2).toBeGreaterThan(maxThreads1);
  });

  it('should handle large numbers of tasks', () => {
    const { minThreads, maxThreads } = getWorkerThreadCount(10000);
    expect(minThreads).toBe(1);
    expect(maxThreads).toBe(8); // Limited by CPU count: Math.min(8, ceil(10000/200)) = 8
  });

  it('should handle zero tasks', () => {
    const { minThreads, maxThreads } = getWorkerThreadCount(0);
    expect(minThreads).toBe(1);
    expect(maxThreads).toBe(1);
  });

  it('should cap max threads when maxWorkerThreads is provided', () => {
    // CPU has 8 cores; 1000 tasks would normally give ceil(1000/200) = 5 threads,
    // so the explicit cap of 3 is the binding limit.
    const { maxThreads } = getWorkerThreadCount(1000, 3);
    expect(maxThreads).toBe(3);
  });

  it('should not exceed task-based limit even with higher maxWorkerThreads', () => {
    // 400 tasks → ceil(400/200) = 2 threads, maxWorkerThreads=6 should not increase it
    const { maxThreads } = getWorkerThreadCount(400, 6);
    expect(maxThreads).toBe(2);
  });

  it('should ignore maxWorkerThreads when undefined', () => {
    const { maxThreads } = getWorkerThreadCount(10000, undefined);
    expect(maxThreads).toBe(8);
  });
});
// Verifies the options createWorkerPool hands to the Tinypool constructor for
// each runtime. The suite is named after the function it actually exercises.
describe('createWorkerPool', () => {
  beforeEach(() => {
    vi.mocked(os).availableParallelism = vi.fn().mockReturnValue(4);
    // Drop constructor calls recorded by earlier tests so toHaveBeenCalledWith
    // below can only be satisfied by the call made in the current test.
    vi.mocked(Tinypool).mockClear();
    // Use regular function syntax for constructor mock
    vi.mocked(Tinypool).mockImplementation(function (this: unknown) {
      (this as Record<string, unknown>).run = vi.fn();
      (this as Record<string, unknown>).destroy = vi.fn();
      return this as Tinypool;
    });
  });

  it('should initialize Tinypool with correct configuration', () => {
    const tinypool = createWorkerPool({ numOfTasks: 500, workerType: 'fileProcess', runtime: 'child_process' });

    // child_process runtime is expected to forward logging/colour settings via env
    expect(Tinypool).toHaveBeenCalledWith({
      filename: expect.stringContaining('fileProcessWorker.js'),
      runtime: 'child_process',
      minThreads: 1,
      maxThreads: 3, // Math.min(4, ceil(500/200)) = 3
      idleTimeout: 5000,
      teardown: 'onWorkerTermination',
      workerData: {
        workerType: 'fileProcess',
        logLevel: 2,
      },
      env: expect.objectContaining({
        REPOMIX_LOG_LEVEL: '2',
        FORCE_COLOR: expect.any(String),
        TERM: expect.any(String),
      }),
    });
    expect(tinypool).toBeDefined();
  });

  it('should initialize Tinypool with worker_threads runtime when specified', () => {
    const tinypool = createWorkerPool({ numOfTasks: 500, workerType: 'securityCheck', runtime: 'worker_threads' });

    // Exact match: no env key is expected for the worker_threads runtime
    expect(Tinypool).toHaveBeenCalledWith({
      filename: expect.stringContaining('securityCheckWorker.js'),
      runtime: 'worker_threads',
      minThreads: 1,
      maxThreads: 3, // Math.min(4, ceil(500/200)) = 3
      idleTimeout: 5000,
      teardown: 'onWorkerTermination',
      workerData: {
        workerType: 'securityCheck',
        logLevel: 2,
      },
    });
    expect(tinypool).toBeDefined();
  });
});
// Verifies that initTaskRunner builds a pool with the requested runtime and
// exposes run/cleanup wrappers that delegate to that pool.
describe('initTaskRunner', () => {
  beforeEach(() => {
    vi.mocked(os).availableParallelism = vi.fn().mockReturnValue(4);
    // Drop constructor calls recorded by earlier tests so the
    // toHaveBeenCalledWith assertion only sees this test's pool creation.
    vi.mocked(Tinypool).mockClear();
    // Use regular function syntax for constructor mock
    vi.mocked(Tinypool).mockImplementation(function (this: unknown) {
      (this as Record<string, unknown>).run = vi.fn();
      (this as Record<string, unknown>).destroy = vi.fn();
      return this as Tinypool;
    });
  });

  it('should return a TaskRunner with run and cleanup methods', () => {
    const taskRunner = initTaskRunner({ numOfTasks: 100, workerType: 'fileProcess', runtime: 'child_process' });

    expect(taskRunner).toHaveProperty('run');
    expect(taskRunner).toHaveProperty('cleanup');
    expect(typeof taskRunner.run).toBe('function');
    expect(typeof taskRunner.cleanup).toBe('function');
  });

  it('should pass runtime parameter to createWorkerPool', () => {
    const taskRunner = initTaskRunner({ numOfTasks: 100, workerType: 'calculateMetrics', runtime: 'worker_threads' });

    expect(Tinypool).toHaveBeenCalledWith(
      expect.objectContaining({
        runtime: 'worker_threads',
        workerData: expect.objectContaining({
          workerType: 'calculateMetrics',
        }),
      }),
    );
    expect(taskRunner).toHaveProperty('run');
    expect(taskRunner).toHaveProperty('cleanup');
  });

  it('delegates run and cleanup to the underlying pool', async () => {
    const runMock = vi.fn().mockResolvedValue('result');
    const destroyMock = vi.fn().mockResolvedValue(undefined);
    // Override the default constructor mock so the pool's methods are the
    // spies this test can inspect.
    vi.mocked(Tinypool).mockImplementation(function (this: unknown) {
      (this as Record<string, unknown>).run = runMock;
      (this as Record<string, unknown>).destroy = destroyMock;
      return this as Tinypool;
    });

    const taskRunner = initTaskRunner<{ payload: string }, string>({
      numOfTasks: 10,
      workerType: 'fileProcess',
      runtime: 'worker_threads',
    });

    await expect(taskRunner.run({ payload: 'x' })).resolves.toBe('result');
    expect(runMock).toHaveBeenCalledWith({ payload: 'x' });

    await taskRunner.cleanup();
    expect(destroyMock).toHaveBeenCalled();
  });
});
// Covers cleanupWorkerPool: destroys the pool on Node, skips destroy under Bun,
// and never lets a destroy failure propagate.
describe('cleanupWorkerPool', () => {
  it('calls destroy on standard Node runtime', async () => {
    const destroySpy = vi.fn().mockResolvedValue(undefined);
    const fakePool = { destroy: destroySpy } as unknown as Tinypool;

    await cleanupWorkerPool(fakePool);

    expect(destroySpy).toHaveBeenCalled();
  });

  it('skips destroy under Bun runtime', async () => {
    const destroySpy = vi.fn();
    const fakePool = { destroy: destroySpy } as unknown as Tinypool;

    // Simulate Bun by stubbing process.versions.bun for the duration of the test.
    // Remember whether the key existed beforehand: restoring by assigning
    // `undefined` would leave a defined-but-undefined key behind and leak a
    // mutated process.versions into the rest of the suite, so a key that was
    // absent must be deleted outright.
    const bunKeyExisted = Object.hasOwn(process.versions, 'bun');
    const previousBunVersion = process.versions.bun;
    Object.defineProperty(process.versions, 'bun', { value: '1.0.0', configurable: true });

    try {
      await cleanupWorkerPool(fakePool);
      expect(destroySpy).not.toHaveBeenCalled();
    } finally {
      if (!bunKeyExisted) {
        delete (process.versions as Record<string, unknown>).bun;
      } else {
        Object.defineProperty(process.versions, 'bun', { value: previousBunVersion, configurable: true });
      }
    }
  });

  it('swallows destroy errors so shutdown never throws', async () => {
    const rejectingDestroy = vi.fn().mockRejectedValue(new Error('teardown failed'));
    const fakePool = { destroy: rejectingDestroy } as unknown as Tinypool;

    await expect(cleanupWorkerPool(fakePool)).resolves.toBeUndefined();
  });
});
});