023 Использование worker threads

Тут мы для загрузки системы написали поиск факториала массива чисел

factorial.ts

export default function factorial(n: number): number {
	if (n == 1 || n == 0) return 1;
	return factorial(n - 1) * n;
}

app.ts

import factorial from "./factorial";
 
const compute = (array: number[]): number[] => {
	const arr: number[] = [];
	for (let i: number = 0; i < 10000000; i++) {
		arr.push(i * i);
	}
	return array.map((el): number => factorial(el));
};
 
const main = (): void => {
	performance.mark("start");
 
	const result = [
		compute([23, 23, 4, 34, 45, 32, 21, 45]),
		compute([23, 23, 4, 34, 45, 32, 21, 45]),
		compute([23, 23, 4, 34, 45, 32, 21, 45]),
		compute([23, 23, 4, 34, 45, 32, 21, 45]),
		compute([23, 23, 4, 34, 45, 32, 21, 45]),
	];
 
	console.log(result);
 
	performance.mark("end");
	performance.measure("main", "start", "end");
	console.log(performance.getEntriesByName("main").pop());
};
 
main();

Как итог: мы на 2 секунды заблокировали основной поток нашего приложения и не могли получать ни реквесты от пользователей, ни какие-либо операции выполнять (те же таймауты)

Далее воспользуемся созданием воркеров

Это всё так же сама наша функция для выполнения вычислений

factorial.js

module.exports = function factorial(n) {
	if (n == 1 || n == 0) return 1;
	return factorial(n - 1) * n;
};

Этот модуль будет отвечать за задачу, выполняемую отдельным воркером

worker.js

const factorial = require("./factorial");
// Далее нужно импортировать два модуля, которые позволяют работать отдельно с воркером
// parentPort - порт родителя (виртуальный), где можно обмениваться данными с родителем
// workerData - сами исходные данные воркера
const { parentPort, workerData } = require("worker_threads");
 
const compute = ({ array }) => {
	const arr = [];
	for (let i = 0; i < 10000000; i++) {
		arr.push(i * i);
	}
	return array.map((el) => factorial(el));
};
 
// ! Тут мы передадим данные родителю по порту
parentPort.postMessage(compute(workerData));

А уже тут мы используем промисы для ожидания ответа от нашего воркера

app-worker.js

// импортируем инстанс воркера
const { Worker } = require("worker_threads");
 
// функцию подсчёта оставим
const compute = (array) => {
	// и отсюда будем возвращать результат промиса
	return new Promise((resolve, reject) => {
		// Тут мы инстанциируем на каждый запрос по воркеру (в реальном проекте это небезопасно)
		const worker = new Worker("./worker.js", {
			// передаём сюда тип получаемых данных
			workerData: {
				array,
			},
		});
 
		// подписываемся на событие выполнения операции
		worker.on("message", (message) => {
			// тут мы можем узнать какой id потока был присвоен треду
			console.log(worker.threadId);
			resolve(message);
		});
 
		// на ошибку
		worker.on("error", (error) => {
			reject(error.message);
		});
 
		// и на отключение воркера
		worker.on("exit", () => {
			console.log(worker.threadId); // -1
			console.log("Завершил работу");
		});
	});
};
 
const main = async () => {
	try {
		performance.mark("start");
 
		// тут же мы можем с помощью Promise.all() подождать всю группу операций
		const result = await Promise.all([
			compute([23, 23, 4, 34, 45, 32, 21, 45]),
			compute([23, 23, 4, 34, 45, 32, 21, 45]),
			compute([23, 23, 4, 34, 45, 32, 21, 45]),
			compute([23, 23, 4, 34, 45, 32, 21, 45]),
			compute([23, 23, 4, 34, 45, 32, 21, 45]),
		]);
 
		console.log(result);
 
		performance.mark("end");
		performance.measure("main", "start", "end");
		console.log(performance.getEntriesByName("main").pop());
	} catch (err) {
		console.log(err.message);
	}
};
 
main();
 

И уже конкретно тут можно увидеть, что задача выполнилась почти в 3 раза быстрее, чем без разделения на потоки. Так же тут у нас будут отрабатывать вызванные таймауты и приниматься реквесты от пользователей.

024 Spawn и exec

Модуль exec позволяет нам работать с нашим терминалом. Первым аргументом в него мы передаём саму команду, которую мы хотели бы выполнить, например, ls для вывода содержимого директории или ту же команду по запуску нод-скриптов node app.js. Дальше уже принимает в себя функция коллбэк-функцию с ошибкой приложения, выводом консоли и ошибкой консоли. Мы так же можем реагировать на ивенты с созданным таким образом процессом.

// этот модуль позволяет нам работать с терминалом
const { exec } = require("child_process");
 
// это сам инстанс процесса
const childProcess = exec("dir", (err, stdout, stderr) => {
	if (err) {
		console.error(err.message);
	}
 
	console.log(`stdout: ${stdout}`);
	console.log(`stderr: ${stderr}`);
});
 
// будет срабатывать при выходе из приложения
childProcess.on("exit", (code) => {
	console.log(`Код выхода: ${code}`);
});

Сам же модуль spawn сразу вызывает функцию в консоли и уже после мы можем обработать полученный результат из консоли по отдельным его частям

// этот модуль позволяет нам работать с терминалом
const { spawn } = require("child_process");
 
// это сам инстанс процесса
const childProcess = spawn("ls");
 
// обработка выхода
childProcess.stdout.on("data", (data) => {
	console.log(`stdout: ${data}`);
});
 
// обработка ошибки консоли
childProcess.stderr.on("data", (data) => {
	console.log(`stderr: ${data}`);
});
 
// подписываемся на выход из консоли
childProcess.on("exit", (data) => {
	console.log(`Код выхода: ${data}`);
});

025 Fork

Модуль fork позволяет запустить указанный файл для выполнения в отдельном потоке (это альтернативный запуск воркер-процесса)

Конкретно тут была реализована задача отправки сообщения в другой процесс (fork.js), откуда поступает ответ родителю при получении сообщения от родителя.

app.js

// тут мы непосредственно импортируем модуль
const { fork } = require("child_process");
 
// тут мы указываем запускаемый файл
const forkProcess = fork("fork.js");
 
// создаём ивент, который при получении сообщения, будет его выводить
forkProcess.on("message", (message) => {
	console.log(`Получено сообщение: ${message}`);
});
 
// Этот ивент будет срабатывать при завершении процесса
forkProcess.on("close", (code) => {
	console.log(`Exited code: ${code}`);
});
 
// ! Реализация передачи сообщения нашему процессу
// отправляем сообщение процессу
forkProcess.send("Ping");
// а тут отправляем сообщение об отключении
forkProcess.send("disconnect");

Уже тут при получении сообщения будет реализована логика как отправки сообщения, так и отключения процесса, если поступит запрос от родителя "disconnect"

fork.js

// тут мы отправляем ответ родителю при получении сообщения от него
process.on("message", (message) => {
	// если родитель отправит сообщении об отключении, то ...
	if (message === "disconnect") {
		// ... нужно будет отключить процесс
		process.disconnect();
		// и нужно остановить дальнейшее выполнение функции, чтобы не было ошибки
		return;
	}
	console.log(`Клиент получил: ${message}`);
	// отправляем родителю сообщение
	process.send("Pong!");
});

026 Упражнение - Производительность потоков

Это сам модуль для реализации загруженности системы

factorial.js

// функция фаториала
function factorial(n) {
	if (n == 1 || n == 0) {
		return 1;
	}
	return factorial(n - 1) * n;
}
 
// фукнция общего подсчёта всех значений
function compute({ array }) {
	const arr = [];
	for (let i = 0; i < 100000000; i++) {
		arr.push(i * i);
	}
	return array.map(el => factorial(el));
};
 
// экспорт функций
module.exports = { factorial, compute }

Отправляет результат вычислений по запросу от родителя. Родитель отправляет просто сообщение.

fork.js

// импортируем функцию расчёта
const { compute } = require('./factorial');
 
// срабатывает на получении сообщения от родительского процесса
process.on('message', (msg) => {
	// считаем отправленные родителем данные
	process.send(compute(msg));
	// отключаем процесс
	process.disconnect();
});

Ну и второй процесс, который так же реализует подсчёт значений по запросу родителя. Родитель отправляет сообщение в виде объекта.

worker.js

const { parentPort, workerData } = require('worker_threads');
const { compute } = require('./factorial');
 
// отпрравляет родителю результат вычислений по полученным данным
parentPort.postMessage(compute(workerData))

Это первая версия приложения, которая просто запускает оба вида создания потоков

app.js

const { Worker } = require("worker_threads");
const { fork } = require("child_process");
const { performance, PerformanceObserver } = require("perf_hooks");
 
const performanceObserver = new PerformanceObserver((items) => {
	items.getEntries().forEach((entry) => {
		console.log(`${entry.name}: ${entry.duration}`);
	});
});
 
performanceObserver.observe({ entryTypes: ["measure"] });
 
const workerFunction = (array) => {
	return new Promise((resolve, reject) => {
		performance.mark("worker start");
 
		const worker = new Worker("./worker.js", {
			workerData: { array },
		});
 
		worker.on("message", (message) => {
			performance.mark("worker end");
			performance.measure("worker", "worker start", "worker end");
			resolve(message);
		});
	});
};
 
const forkFunction = (array) => {
	return new Promise((resolve, reject) => {
		performance.mark("fork start");
 
		const forkProcess = new fork("./fork.js");
 
		forkProcess.send({ array });
		
		forkProcess.on("message", (message) => {
			performance.mark("fork end");
			performance.measure("fork", "fork start", "fork end");
 
			resolve(message);
		});
	});
};
 
const main = async () => {
	try {
		await workerFunction([25, 19, 48, 30]);
		await forkFunction([25, 19, 48, 30]);
	} catch (err) {
		console.error(err.message);
	}
};
 
main();

Как можно увидеть - разница между обоими выполнениями операций не так велика

И тут нужно поговорить про концептуальне отличия форка и воркера:

  • Форк - это отдельный инстанс ноды, а воркер - это просто новый поток в том же процессе ноды
  • Для коммуниации между форками используется отдельный IPC канал, а для воркера уже используется просто общая память с нодовским процессом

Если говорить итогово, то форк стоит использовать только для очень больших процессов, так как он имеет хоть и небольшой, но оверхед. Если нужно реализовать небольшой процесс, то тут воркер может спасти некоторое количество миллисекунд.

И далее используем пример, в котором мы уже добавим видео для передачи между нашими процессами

app.js

const { Worker } = require('worker_threads');
const { fork } = require('child_process');
const { performance, PerformanceObserver } = require('perf_hooks');
const { readFileSync } = require('fs');
 
// тут мы непосредственно считываем файл, с которым нужно будет работать
const file = readFileSync('./file.mp4');
 
// реализуем отслеживателя нашей производительности
const performanceObserver = new PerformanceObserver((items) => {
	// тут переберём все наши используемые значения для перебора
	items.getEntries().forEach((entry) => {
		console.log(`${entry.name}: ${entry.duration}`);
	});
});
// будем следить за значениями measure
performanceObserver.observe({ entryTypes: ['measure'] });
 
// эта функция будет выполнять свои операции с использованием модуля воркера 
const workerFunction = (array) => {
	return new Promise((resolve, reject) => {
		performance.mark('worker start');
		
		const worker = new Worker('./worker.js', {
			workerData: {
				array,
				file
			}
		});
		worker.on('message', (msg) => {
			performance.mark('worker end');
			performance.measure('worker', 'worker start', 'worker end');
 
			resolve(msg);
		});
	});
};
 
// эта функция будет выполнять свои операции с использованием синтаксиса форка
const forkFunction = (array) => {
	return new Promise((resolve, reject) => {
		performance.mark('fork start');
		
		const forkProcess = fork('./fork.js');
		
		forkProcess.send({ array, file });
		
		forkProcess.on('message', (msg) => {
			performance.mark('fork end');
			performance.measure('fork', 'fork start', 'fork end');
			resolve(msg);
		});
 
	});
};
 
// это точка входа в нашу программу
const main = async () => {
	try {
		// обе функции выполняются асинхронно, чтобы друг другу не мешать
		await workerFunction([25, 20, 19, 48, 30, 50]);
		await forkFunction([25, 20, 19, 48, 30, 50]);
	} catch (e) {
		console.error(e.message);
	}
};
 
// и общий запуск нашей программы
main();

И сейчас разница во времени между выполняемыми процессами стала более заметной. Дело опять же в том, что воркер используется общую память с основным процессом, что позволяет более быстро выполнять операции передачи данных. Форк же, в свою очередь, ппередаёт данные по IPC каналу, что сильно стопорит процесс передачи данных.

Если получится, то стоит всегда использовать на практике воркеры

Однако, если мы используем воркеры, то нам нужно их будет создать и выделить заранее, чтобы не образовывать уязвимости для наших серверов (их легко будет заDDOSить)