Node.js流模式编程详解

本文是Node.js设计模式的笔记, 代码都是自 <Node.js Design
Patterns> by Mario Casciaro.

流动的关键

一般我们处理数量来些许种模式, 一栽是buffer模式, 一栽是stream模式,
buffer模式就是取了数据一次性操作, stream模式就是是边取数据边操作.
选举个例子, 如果打开一个2G底文本, 用buffer模式就是先期分配2G的内存,
把公文全部朗诵出来, 然后开操作内存, 而用流模式的法门就是无尽读数据,
边开始处理.

自此处看到stream模式无论是当空间和时间及且优于buffer模式:
每当半空中达到, 内存只会占当前内需处理的均等块数据区域之大大小小,
而无是全部文件.
每当时光上, 因为不待全的数码就足以起来拍卖, 时间就相当给节约了,
从串行变成了并行操作(这里的相不是多线程的相,
而是生产者与买主互动).

还有一个利益就是是链式调用, 也不怕是不过做操作, 大大增加了代码的可重用性.
仍下面这个代码(中间的pipe可以十分便利的增删):

fs.createReadStream(file)
     .pipe(zlib.createGzip())
     //.pipe(crypto.createCipher('aes192', 'secret'))
     .pipe(req)
     .on('finish', function() {
       console.log('File succesfully sent');
     });

始于编码

nodejs里面的stream一般分四种, 其中转换流是同一栽非常之读写流.

  • 输入流(stream.Readable)
  • 输出流(stream.Writable)
  • 读写流(stream.Duplex)
  • 转换流(stream.Transform)

此外, nodejs里面的流有两种模式, 二进制模式和对象模式.

  • 亚进制模式, 每个分块都是buffer或者string对象.
  • 靶模式, 流内部处理的凡一模一样多级普通对象.

输入流(stream.Readable)

先期看一下庸采取输入流, 这里一般发生些许种植方法, 一个凡不流动模式,
一个是流动模式.
不流动模式就是是一直调用read()方法, 被动模式就是监听data事件.
脚直接扣代码:

// 非流动模式
process.stdin
    .on('readable', function() {
        // 有数据到了, 拼命读, 直到读完.
        var chunk;
        console.log('New data available');
        while((chunk = process.stdin.read()) !== null) {
            console.log(
            'Chunk read: (' + chunk.length + ') "' + chunk.toString() + '"'
            );
        }})
    .on('end', function() {
       process.stdout.write('End of stream');
    });

连下看流动模式怎么耍

// 流动模式
process.stdin
     .on('data', function(chunk) {
       console.log('New data available');
       console.log(
         'Chunk read: (' + chunk.length + ')" ' +
         chunk.toString() + '"'
       );
     })
     .on('end', function() {
       process.stdout.write('End of stream');
     });

同一实现一个输入流也格外简单, 主要是

  1. 继承Readable类
  2. 实现_read(size)接口(一般带产划线的代表中函数, 调用者不要直接调用,
    相当给C++里面的protect方法, 只是javascript里面没指向章程做区分,
    只能是命名上面分别一下了).
    脚看示例代码:

// randomStream.js
var stream = require('stream');
var util = require('util');
var chance = require('chance').Chance();

function RandomStream(options) {
    // option支持3个参数
    // encoding String 用于转换Buffer到String的编码类型(默认null)
    // objMode Boolean 用户指定是否是对象模式(默认false)
    // highWaterMark Number 最高水位(可读的最大数据量), 默认是16K
    stream.Readable.call(this, options);
}
util.inherits(RandomStream, stream.Readable);
RandomStream.prototype._read = function(size) {
    // 这是一个随机产生数据的流, 5%的概率输出null, 也就是流停止.
    var chunk = chance.string();
    console.log('Pushing chunk of size:' + chunk.length);
    this.push(chunk, 'utf8');
    if(chance.bool({likelihood: 5})) {
       this.push(null);
    }
}
module.exports = RandomStream;

哼, 接下来是安使:

// generateRandom.js
var RandomStream = require('./randomStream');
var randomStream = new RandomStream();
randomStream.on('readable', function() {
    var chunk;
    while((chunk = randomStream.read()) !== null) {
        console.log("Chunk received: " + chunk.toString());
    }
});

输出流(stream.Writable)

第一怎么用:

// 写数据
writable.write(chunk, [encoding], [callback]);
// 结束流
writable.end([chunk], [encoding], [callback])

回压(back-pressure)

此地涉及一个概念, 回压(back-pressure),
意思就是当生产者速度高于消费者的时光, 输出流的水位会没完没了升起,
当到达设定的高水位时候, 就会见刻画副失败, 这上啊就算是发了back-pressure,
那什么样处理也, 此时输入流于水位下降到零点的时节会起一个drain事件发送,
只要监听是事件, 在事变产生的早晚就是好继承往流动写副数据了.
直白看代码:

var chance = require('chance').Chance();
require('http').createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    function generateMore() {             //[1]
        while(chance.bool({likelihood: 95})) {
            var shouldContinue = res.write(
                chance.string({length: (16 * 1024) – 1})
            );
            if(!shouldContinue) {             //[3]
                console.log('Backpressure');
                return res.once('drain', generateMore);
            }
        }
        res.end('\nThe end...\n', function() {
            console.log('All data was sent');
        });
    }
    generateMore();
}).listen(8080, function () {
  console.log('Listening');
});

一致, 实现一个输出流也颇粗略, 只要继续Writable类,
实现_write()接口就可.
示范代码:

// toFileStream.js
var stream = require('stream');
var fs = require('fs');
var util = require('util');
var path = require('path');
var mkdirp = require('mkdirp');

function ToFileStream() {
    // 这次我们用对象模式
    stream.Writable.call(this, {objectMode: true});
};
util.inherits(ToFileStream, stream.Writable);
ToFileStream.prototype._write = function(chunk, encoding, callback) {
    var self = this;
    mkdirp(path.dirname(chunk.path), function(err) {
        if(err) {
            return callback(err);
        }
        fs.writeFile(chunk.path, chunk.content, callback);
    });
}
module.exports = ToFileStream;

下面是调用的代码

var ToFileStream = require('./toFileStream');
var tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(function() {
    console.log("All files created");
});

读写流(stream.Duplex)

虽把输入流和输出流的接口都落实了.
注意:
这时候option参数是同时传被了中间的Readable和Writeable,
如果要采用不同之选料项, 就要分开配置,
像这样:
this._writableState.objectMode
this._readableState.objectMode
同时, Duplex又大多了一个选项allowHalfOpen, 这个选项之意是,
当其中一个流关闭的时刻, 另外一条流是否也又关闭, 默认是true,
也就是例外时关闭.

转换流(stream.Transform)

对此读写流来说, 要兑现的凡 _read() 和 _write() 接口,
而转换流要实现的凡 _transform() 和 _flush()接口.
别是啊, 转换流一般在transform的进程遭到拿读写都开了,
也便是以处理输入的早晚, 直接就是输出了. 最后以输入了之时光_flush()
会被调用, 就足以拿剩余的里数据一致连出口了.

示范代码:

// 这代码写的很漂亮, 解决的问题是在流中操作替换操作
// 其中替换的部分可以仔细看一下, stream和buffer一个很大的区别就是stream会被切割
// 导致要替换的数据也有可能被切割, 这个例子就提供了一种解决方法, 
// 这个在后续实践中肯定也会遇到的.
var stream = require('stream');
var util = require('util');
function ReplaceStream(searchString, replaceString) {
    stream.Transform.call(this, {decodeStrings: false});
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
}
util.inherits(ReplaceStream, stream.Transform);
ReplaceStream.prototype._transform = function(chunk, encoding, callback) {
    var pieces = (this.tailPiece + chunk).split(this.searchString);
    var lastPiece = pieces[pieces.length - 1];
    var tailPieceLen = this.searchString.length - 1;
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));       //[3]
    callback();
}
ReplaceStream.prototype._flush = function(callback) {
    this.push(this.tailPiece);
    callback();
}
module.exports = ReplaceStream;

差一点只流操作相关的有因此包

程序员就是懒, 有应声几个确保就是得少写有代码了.

  • readable-stream, 统一了nodejs 实现的异版本stream接口
  • [through2](https://npmjs.org/
    package/through2), 用于快速创建转化流
  • from2,
    用于快速创建输入流
  • writable2,
    用于快速创建输出流