Stream 借鉴自 Unix 编程哲学中的 pipe。
Unix shell 命令中,管道式的操作 | 将上一个命令的输出作为下一个命令的输入。Node.js stream 中则是通过 .pip() 方法来进行的。
来看一个 stream 的运用场景:从服务器读取文件并返回给页面。
朴素的实现: var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(8000); stream 实现: var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000);好处:
代码更加简洁。可自由组合各种模块来处理数据。分五种:
readablewritableduplextransformclassicreadable 类型的流产生的数据,可通过 .pip() 输送到能够消费(consume)流数据的地方,比如 writable,transform,duplex 类型的对象。
一个 readable stream 示例:
var Readable = require('stream').Readable; var rs = new Readable; rs.push('beep '); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout);运行结果:
$ node read0.js beep boop上面 rs.push(null) 表示没有更多数据了。直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费(示例中消费方为 process.stdout,即输出到命令行。)。因为消费者有可能并不能立即消费这些内容,直接 push 数据后会消耗不必要的资源。
更好的做法是,让 readable 流只在消费者需要数据的时候再 push。这是通过定义能 raedable 对象定义 ._read 方法来完成的。
var Readable = require('stream').Readable; var rs = Readable(); var c = 97; rs._read = function () { rs.push(String.fromCharCode(c++)); if (c > 'z'.charCodeAt(0)) rs.push(null); }; rs.pipe(process.stdout);运行结果:
$ node read1.js abcdefghijklmnopqrstuvwxyz这种方式下,定义了 readable 流产生数据的方法 ._read,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。
_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。
下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。
var Readable = require('stream').Readable; var rs = Readable(); var c = 97 - 1; rs._read = function () { if (c >= 'z'.charCodeAt(0)) return rs.push(null); setTimeout(function () { rs.push(String.fromCharCode(++c)); }, 100); }; rs.pipe(process.stdout); process.on('exit', function () { console.error('\n_read() called ' + (c - 97) + ' times'); }); process.stdout.on('error', process.exit);上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })
一般情况下, 我们会将 readable 产生的数据直接传递给其他的消费方,比如 through 或 concat-stream 创建的流对象。但直接操作来自 readable 的数据也是可以的。
process.stdin.on('readable', function () { var buf = process.stdin.read(); console.dir(buf); });上面的示例代码中,监听了来自命令行的输入,有数据时 stdin readable 事件便会触发,然后可通过调用 read() 方法获取到数据。数据结束时 read() 返回 null 表示没有更多数据了。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js <Buffer 61 62 63 0a> <Buffer 64 65 66 0a> <Buffer 67 68 69 0a> null同时 read() 方法支持传递一个 size 指定一次获取多少的字节(bytes)。比如一次只获取 3 字节的数据:
process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); });这将导致我们获取到的数据是不完整的,因为指定了尺寸后,超出的部分是拿不到的。
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js <Buffer 61 62 63> <Buffer 0a 64 65> <Buffer 66 0a 67>此时可通过调用 read(0) 告诉 Node.js 我们不希望丢弃超出的部分。这样超出的部分会在后面的读取中陆续输出。
process.stdin.on('readable', function () { var buf = process.stdin.read(3); console.dir(buf); + process.stdin.read(0); }); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js <Buffer 61 62 63> <Buffer **0a** 64 65> <Buffer 66 **0a** 67> <Buffer 68 69 **0a**>除了调用 read(0),还可调用 unshift() 方法将多余数据放回去。这样下次 read() 的时候数据都还在。比如一个解析换行的示例,遇到换行时将本行输出,剩下的数据塞回,以在下一行处理时使用。
var offset = 0; process.stdin.on('readable', function () { var buf = process.stdin.read(); if (!buf) return; for (; offset < buf.length; offset++) { if (buf[offset] === 0x0a) { console.dir(buf.slice(0, offset).toString()); buf = buf.slice(offset + 1); offset = 0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 'hearties' 'heartiest' 'heartily' 'heartiness' 'heartiness\'s' 'heartland' 'heartland\'s' 'heartlands' 'heartless' 'heartlessly'writable 流可作为 .pip() 的对象。
src.pipe(writableStream)需要实现 ._write(chunk, enc, next) 方法,其中:
chunk 为接收到的数据enc 当 opts.decodeString 为 false 且收到的数据这字符串时,它表示字符串的编码next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败默认情况下,获取到的字符串数据会转为 Buffer,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。
一个 writable 示例:
var Writable = require('stream').Writable; var ws = Writable(); ws._write = function (chunk, enc, next) { console.dir(chunk); next(); }; process.stdin.pipe(ws);通过调用 writable 流的 write 方法来写入。
process.stdout.write('beep boop\n');通过调用 end() 来结束数据的写入。
var fs = require('fs'); var ws = fs.createWriteStream('message.txt'); ws.write('beep '); setTimeout(function () { ws.end('boop\n'); }, 1000);双工类型的流,同时具有 writable 和 readable 流的功能。Node.js 内建的 zlib,TCP sockets 以及 crypto 都是双工类型的。
所以可对双工类型的流进行如下操作:
a.pip(b).pip(a)一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam。
这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。
当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。
.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。
一个 classic readable 流的创建示例:
var Stream = require('stream'); var stream = new Stream; stream.readable = true; var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout);数据读取是通过监听流上的 data 与 end 事件。
一个从 classic readable 流读取数据的示例:
process.stdin.on('data', function (buf) { console.log(buf); }); process.stdin.on('end', function () { console.log('__END__'); });一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 through 或 concat-stream 来进行。
只需要实现 .write(buf), .end(buf) 及 .destroy() 方法即可,比较简单。
本质上,所有流都是 EventEmitter,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。
使用 stream 可显著提高程序性能,特别是处理数据量大的情况下。它可以将数据分片处理,而不是粗暴地将数据看作一个整体。但掌握并熟练是需要一定时间练习的。
转载于:https://www.cnblogs.com/Wayou/p/node_stream.html
相关资源:node.js中 stream使用教程