Для управления потоковыми данными наподобие видео или большого файла в Node.js реализован отдельный потоковый модуль.

В текущей статье мы сконцентрируемся на некоторых принципах его функционирования.

Разделы статьи

  1. Типы потоков в Node.js;
  2. Буферы потоков;
  3. Генератор событий;
  4. Читающий поток;
  5. Чтение в режиме передачи и паузы;
  6. Функционирование буфера читающего потока.

Типы потоков

В Node существует 4 типа потоков:

  • Читающие потоки — отвечают за организацию потока считывания данных. Например, чтение большого файла крупными частями.
  • Пишущие потоки — позволяют производить запись, в том числе большого количества информации в файл.
  • Дуплексные потоки — позволяют как считывать данные, так и записывать их. Распространенный пример — это соединения между клиентом и сервером.
  • Преобразующие потоки — аналогичны дуплексным, но дополнительно позволяют изменять данные в процессе чтения или записи. В качестве примера можно взять сжатие данных клиентом или сервером перед отправкой запроса или во время его выполнения.

Буферы в потоках

Потоки функционируют на основе буферов. Буфер — это участок временной памяти, используемый потоком для хранения данных до момента их обработки.

Размер буфера определяется свойством highWatermark, находящимся в экземпляре класса потока и обозначающем величину буфера в байтах.

По умолчанию память буфера в Node работает на String и Buffer. Мы также можем организовать ее работу на объектах JavaScript. Для этого нужно установить свойство objectNode потока в значение true.

Если мы отправляем какие-либо данные в поток, то эти данные попадают в буфер и находятся в нем до полной их обработки.

Если же мы попробуем отправить данные в поток при заполненном буфере, то он их не примет и вернет значение false.

Потоки и генераторы событий

Потоки в Node.js расширяют классEventEmitter. Мы же можем прослушивать генерируемые события вроде data или end.

Для этого используется функция stream.on(), доступная в потоке.

Читающие потоки

Такие потоки могут использоваться для считывания файлов с сервера или потокового видео. 

Зачастую можно встретить их сравнение с водопроводным краном, который пропускает через себя жидкость, делая для пользователя доступным ее потребление. 

В этом разделе мы создадим читающий поток для большого текстового файла, чтобы продемонстрировать сам процесс:

import { createReadStream, ReadStream } from 'fs';

var readStream: ReadStream = createReadStream('./data.txt');

readStream.on('data', chunk => {
  console.log('---------------------------------');
  console.log(chunk);
  console.log('---------------------------------');
});

readStream.on('open', () => {
  console.log('Stream opened...');
});

readStream.on('end', () => {
  console.log('Stream Closed...');
});

После выполнения этого кода мы увидим следующее:

Stream opened...
---------------------------------
<Buffer 4c 6f 72 65 6d 20 69 70 73 75 6d 20 64 6f 6c 6f 72 20 73 69 74 20 61 6d 65 74 2c 20 63 6f 6e 73 65 63 74 65 74 75 72 20 61 64 69 70 69 73 63 69 6e 67 ... >
---------------------------------
---------------------------------
<Buffer 74 20 6e 75 6e 63 20 76 69 74 61 65 20 66 65 72 6d 65 6e 74 75 6d 2e 20 49 6e 20 75 74 20 61 72 63 75 20 74 65 6d 70 6f 72 2c 20 66 61 75 63 69 62 75 ... >
---------------------------------
---------------------------------
<Buffer 20 76 69 74 61 65 2c 20 65 67 65 73 74 61 73 20 69 64 20 73 65 6d 2e 20 44 6f 6e 65 63 20 75 74 20 75 6c 74 72 69 63 69 65 73 20 6c 6f 72 65 6d 2c 20 ... >
---------------------------------
Stream Closed...

Мы получаем данные буфера, которые являются байтовыми данными содержимого буфера потока.

Пауза и возобновление читающего потока

Пауза и возобновление производится простым вызовом функции pause() и resume() потока.

Режим передачи и режим паузы

Читающие потоки могут работать в двух режимах:

  • Передача данных — поток постоянно передает данные, которые можно непосредственно прослушать при помощи события data.
  • Пауза —поток не передает данные автоматически. Вместо этого он хранит их в буфере и позволяет считывание при вызове метода read().

Приведенный выше код демонстрирует режим передачи, т.к. мы просто прослушивали события data и функция срабатывала каждый раз, когда из потока поступали новые данные. Далее изображен простой пример режима паузы:

import { createReadStream, ReadStream } from 'fs';

var readStream: ReadStream = createReadStream('./data.txt');

setTimeout(() => {
  const data = readStream.read(10);
  console.log(data);
}, 10);

Выполнение этого кода даст нам следующий результат:

<Buffer 4c 6f 72 65 6d 20 69 70 73 75>

Как это работает? Мы создали читающий поток, используя метод createReadStream из модуля файловой системы.

Как только поток создан, данные файла начинают передаваться в переменную потока. Мы также установили время задержки начала считывания при помощи метода setTimeout, чтобы получить данные из заполненного за это время буфера.

Спустя 10 миллисекунд отсчет setTimeout заканчивается и мы считываем первые 10 байт, применив метод read(), вызванный с аргументом 10 (размер в байтах).

Функционирование буфера читающего потока

В вышеприведенном коде, если мы вызовем функцию read() вновь после console.log(data) и вызовем на печать новые данные, то увидим, что данные на выводе отличаются от предыдущих: 

import { createReadStream, ReadStream } from 'fs';

var readStream: ReadStream = createReadStream('./data.txt');

setTimeout(() => {
  const data = readStream.read(10);
  console.log(data);

  const data2 = readStream.read(10);
  console.log(data2);
}, 10);

Для этого кода вывод будет следующим:

<Buffer 4c 6f 72 65 6d 20 69 70 73 75>
<Buffer 6d 20 64 6f 6c 6f 72 20 73 69>

Логированные значения отличаются, т.к. буфер удаляет данные после их считывания. 

Таким образом, для первого вызова метода read() мы получаем первые 10 байт данных из буфера, а для второго вызова мы получаем данные с 11 по 20 байт от общего объема данных, которые на этот момент являются первыми 10 байтами в буфере.

Читайте также:


Перевод статьи Kunal Tandon: Streams and Buffers in Node.js

Предыдущая статьяОдно слово для «быстрой» Pandas
Следующая статьяDocker для разработки Go с горячей перезагрузкой