流是什么?
这个字进入我脑海时,我第一时间想到的是一句诗:抽刀断水水更流,举杯消愁……额,今天的主角是流,不好意思差点跑题了。嗯,流是一个抽象接口,被 Node 中的很多对象所实现,比如 HTTP 服务器的 request 和 response 对象都是流。本人最近在研究 Node,特意记下来,分享一下。
对于流,官方文档是这样描述的:
流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)
Node.js 中有四种基本的流类型:
- Readable - 可读的流 (例如 fs.createReadStream()).
- Writable - 可写的流 (例如 fs.createWriteStream()).
- Duplex - 可读写的流 (例如 net.Socket).
- Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())
今天主要分享的是 Node 的可读流和可写流。
可写流
先上个流程图让大家直观了解整个流程
- 调用 open() 打开文件后,通过 write() 开始写入;
- write() 先把本次数据的字节数累加到缓存长度上,并以"缓存长度是否仍小于最高水位线(highWaterMark)"作为返回值;
- 如果底层正在写入,就把数据放进缓存区;否则直接调用底层的 _write() 写入;
- 每次底层写入成功后检查缓存区:如果还有数据,就取出下一块继续写入;缓存区排空后触发 drain 事件;
- 当一个流不处在 drain 的状态时,对 write() 的调用会缓存数据块,并且返回 false。一旦当前所有缓存的数据块都排空了(被操作系统接受来进行输出),'drain' 事件就会被触发。一旦 write() 返回了 false,在 'drain' 事件触发前就不应该再写入任何数据块。
可写流的模拟实现:
const EventEmitter = require('events');
const fs = require('fs');

/**
 * Simplified re-implementation of fs.WriteStream.
 *
 * The first chunk passed to write() goes straight to fs.write; chunks that
 * arrive while that write is in flight are queued in `this.buffers` and are
 * written one at a time, in order, by clearBuffer().
 *
 * Emits 'open', 'drain', 'finish', 'close' and 'error'.
 *
 * @param {string} path - File to write to.
 * @param {object} [options]
 * @param {string} [options.flags='w'] - fs.open flags.
 * @param {number} [options.mode=0o666] - fs.open mode.
 * @param {number} [options.start=0] - File offset of the first byte written.
 * @param {string} [options.encoding='utf8'] - Default encoding for string chunks.
 * @param {boolean} [options.autoClose=true] - Close the descriptor after 'finish' or an error.
 * @param {number} [options.highWaterMark=16384] - write() returns false once this many bytes are pending.
 */
class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.pos = this.start; // file offset of the next fs.write
    this.encoding = options.encoding || 'utf8';
    this.autoClose = options.autoClose === undefined ? true : options.autoClose;
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.buffers = []; // queued { chunk, encoding, cb }; Node's own source uses a linked list
    this.writing = false; // an fs.write is in flight
    this.length = 0; // bytes accepted by write() but not yet written to the file
    this.needDrain = false; // write() has returned false, so a 'drain' is owed
    this.ending = false; // end() has been called
    this.finished = false; // 'finish' has been emitted
    this.closed = false; // destroy() has run
    this.open();
  }

  /** Open the file; emits 'open' on success and 'error' on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      this.fd = fd;
      this.emit('open');
    });
  }

  /**
   * Write `chunk` to the file, queuing it if another write is in flight.
   *
   * @param {Buffer|string} chunk
   * @param {string|Function} [encoding] - Encoding of a string chunk (defaults to
   *   options.encoding). May be omitted, with the callback passed here instead.
   * @param {Function} [cb] - Called after this chunk has been written to the file.
   * @returns {boolean} false once highWaterMark bytes are pending; the caller
   *   should then wait for 'drain' before writing more.
   */
  write(chunk, encoding, cb) {
    const callback = typeof encoding === 'function' ? encoding : cb;
    const enc = typeof encoding === 'function' ? undefined : encoding;
    const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc || this.encoding);
    this.length += buf.length;
    const ret = this.length < this.highWaterMark;
    if (!ret) this.needDrain = true;
    if (this.writing) {
      // The file is busy: queue the chunk; clearBuffer() will pick it up.
      this.buffers.push({ chunk: buf, encoding: enc, cb: callback });
    } else {
      this.writing = true;
      this._write(buf, enc, () => {
        if (callback) callback();
        this.clearBuffer();
      });
    }
    return ret;
  }

  /** Write the next queued chunk, or settle drain/finish once the queue is empty. */
  clearBuffer() {
    const next = this.buffers.shift();
    if (next) {
      this._write(next.chunk, next.encoding, () => {
        if (next.cb) next.cb();
        this.clearBuffer();
      });
      return;
    }
    this.writing = false;
    // Only writers that were told to back off (write() returned false) get a 'drain'.
    if (this.needDrain) {
      this.needDrain = false;
      this.emit('drain');
    }
    if (this.ending && !this.writing) this.finish();
  }

  /**
   * Write one chunk at this.pos, waiting for 'open' if the file is not open yet.
   * On failure emits 'error' (after closing when autoClose) and does not call cb.
   */
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      this.once('open', () => this._write(chunk, encoding, cb));
      return;
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      this.pos += bytesWritten;
      this.length -= bytesWritten; // pending bytes shrink by what reached the file
      if (cb) cb();
    });
  }

  /**
   * Signal that no more data will be written, optionally writing a last chunk.
   * 'finish' is emitted once everything queued has been written; with
   * autoClose the descriptor is then closed.
   *
   * @param {Buffer|string|Function} [chunk] - Final chunk, or the callback.
   * @param {string|Function} [encoding] - Encoding of `chunk`, or the callback.
   * @param {Function} [cb] - Registered as a one-time 'finish' listener.
   */
  end(chunk, encoding, cb) {
    let data = chunk;
    let enc = encoding;
    let callback = cb;
    if (typeof chunk === 'function') {
      callback = chunk;
      data = undefined;
      enc = undefined;
    } else if (typeof encoding === 'function') {
      callback = encoding;
      enc = undefined;
    }
    if (data != null) this.write(data, enc);
    if (callback) this.once('finish', callback);
    this.ending = true;
    // Nothing in flight: finish now. Otherwise clearBuffer() finishes when the queue empties.
    if (!this.writing) this.finish();
  }

  /** Emit 'finish' exactly once (after the file is open), then close if autoClose. */
  finish() {
    if (this.finished) return;
    if (typeof this.fd !== 'number') {
      this.once('open', () => this.finish());
      return;
    }
    this.finished = true;
    this.emit('finish');
    if (this.autoClose) this.destroy();
  }

  /** Close the descriptor if one was opened, and emit 'close' exactly once. */
  destroy() {
    if (this.closed) return;
    this.closed = true;
    if (typeof this.fd !== 'number') {
      // open() failed or never completed: nothing to close.
      process.nextTick(() => this.emit('close'));
      return;
    }
    fs.close(this.fd, () => this.emit('close'));
  }
}

module.exports = WriteStream;
可读流
可读流事实上工作在下面两种模式之一:flowing 和 paused
- 在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
- 在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。
flowing 流动模式
流动模式比较简单,代码实现如下:
const EventEmitter = require('events');
const fs = require('fs');
const { StringDecoder } = require('string_decoder');

/**
 * Simplified flowing-mode re-implementation of fs.ReadStream.
 *
 * Attaching a 'data' listener switches the stream into flowing mode. Chunks of
 * up to highWaterMark bytes are then read from `start` to `end` (inclusive)
 * and emitted as 'data' until the range or the file is exhausted, after which
 * 'end' is emitted.
 *
 * @param {string} path - File to read.
 * @param {object} [options]
 * @param {string} [options.flags='r'] - fs.open flags.
 * @param {number} [options.mode=0o666] - fs.open mode.
 * @param {number} [options.highWaterMark=65536] - Maximum bytes per fs.read.
 * @param {number} [options.start=0] - First byte offset to read.
 * @param {number} [options.end] - Last byte offset to read (inclusive); default is EOF.
 * @param {string} [options.encoding] - Emit strings in this encoding instead of Buffers.
 * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or an error.
 */
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.mode = options.mode || 0o666;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.pos = this.start = options.start || 0;
    this.end = options.end; // last byte offset to read, inclusive; undefined = EOF
    this.encoding = options.encoding;
    // Keeps multi-byte characters that straddle two chunks intact.
    this.decoder = this.encoding ? new StringDecoder(this.encoding) : null;
    this.autoClose = options.autoClose === undefined ? true : options.autoClose;
    this.flowing = null;
    this.reading = false; // an fs.read is in flight
    this.ended = false; // 'end' has been emitted
    this.closed = false; // destroy() has run
    this.buffer = Buffer.alloc(this.highWaterMark); // scratch space reused by every fs.read
    this.open();
    // Adding a 'data' listener switches the stream into flowing mode.
    this.on('newListener', (type) => {
      if (type === 'data') {
        this.flowing = true;
        this.read();
      }
    });
  }

  /** Read the next chunk, emit it as 'data', and keep going while flowing. */
  read() {
    // Two fs.reads in flight would read the same offset; reading after 'end' hits a closed fd.
    if (this.ended || this.reading || this.closed) return;
    if (typeof this.fd !== 'number') {
      this.once('open', () => this.read());
      return;
    }
    const hasEnd = typeof this.end === 'number';
    const howMuchToRead = hasEnd
      ? Math.min(this.end - this.pos + 1, this.highWaterMark)
      : this.highWaterMark;
    if (howMuchToRead <= 0) {
      this.endFn();
      return;
    }
    this.reading = true;
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) => {
      this.reading = false;
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      if (bytes === 0) {
        this.endFn();
        return;
      }
      this.pos += bytes;
      // Copy out of the reused scratch buffer; otherwise the next fs.read would
      // overwrite chunks a consumer (e.g. a write queue) is still holding.
      const chunk = Buffer.from(this.buffer.subarray(0, bytes));
      if (this.decoder) {
        const text = this.decoder.write(chunk);
        if (text) this.emit('data', text);
      } else {
        this.emit('data', chunk);
      }
      if (hasEnd && this.pos > this.end) {
        this.endFn();
      } else if (this.flowing) {
        this.read();
      }
    });
  }

  /** Flush any partial character, emit 'end' once, then close if autoClose. */
  endFn() {
    if (this.ended) return;
    this.ended = true;
    if (this.decoder) {
      const rest = this.decoder.end();
      if (rest) this.emit('data', rest);
    }
    this.emit('end');
    if (this.autoClose) this.destroy();
  }

  /** Open the file; emits 'open' on success and 'error' on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      this.fd = fd;
      this.emit('open');
    });
  }

  /** Close the descriptor if one was opened, and emit 'close' exactly once. */
  destroy() {
    if (this.closed) return;
    this.closed = true;
    if (typeof this.fd !== 'number') {
      process.nextTick(() => this.emit('close'));
      return;
    }
    fs.close(this.fd, () => this.emit('close'));
  }

  /**
   * Forward every chunk to `dest`, pausing while dest.write() returns false
   * until dest emits 'drain'. Calls dest.end() (when it exists) after 'end'.
   * @returns {object} dest, so pipes can be chained.
   */
  pipe(dest) {
    this.on('data', (data) => {
      if (dest.write(data) === false) this.pause();
    });
    dest.on('drain', () => this.resume());
    this.on('end', () => {
      if (typeof dest.end === 'function') dest.end();
    });
    return dest;
  }

  /** Stop scheduling further reads; an fs.read already in flight still delivers its chunk. */
  pause() {
    this.flowing = false;
  }

  /** Switch back to flowing mode and continue reading. */
  resume() {
    this.flowing = true;
    this.read();
  }
}

module.exports = ReadStream;
paused 暂停模式:
暂停模式逻辑有点复杂, 画了一张图梳理一下
_read 方法负责把数据读入缓存区;因为读取是异步的,流通过 readable 事件来通知消费方。
说明一下,流中维护了一个缓存,当缓存中的数据足够多时,调用 read() 不会引起 _read() 的调用,即不需要向底层请求数据。highWaterMark(Node 源码中为 state.highWaterMark)是给缓存大小设置的一个上限阈值:如果取走 n 个字节后,缓存中保有的数据不足这个量,便会从底层再取一次数据。

暂停模式的代码模拟实现如下:
const fs = require('fs');
const EventEmitter = require('events');

/**
 * Simplified re-implementation of fs.ReadStream supporting paused mode
 * ('readable' events + read(n)) as well as flowing mode ('data' / pipe()).
 *
 * Bytes fetched from the file are buffered in `this.buffers` (`this.length`
 * bytes in total). read(n) takes bytes from the front of that list. Whenever
 * fewer than highWaterMark bytes remain buffered, it asks _read() for one more
 * chunk. At most one fs.read is outstanding at a time, so chunks are buffered
 * in file order.
 *
 * @param {string} path - File to read.
 * @param {object} [options]
 * @param {number} [options.highWaterMark=65536] - Refill threshold and maximum bytes per fs.read.
 * @param {string} [options.flags='r'] - fs.open flags.
 * @param {string} [options.encoding] - read()/'data' return strings in this encoding instead of Buffers.
 * @param {number} [options.mode=0o666] - fs.open mode.
 * @param {number} [options.start=0] - First byte offset to read.
 * @param {number} [options.end] - Last byte offset to read (inclusive); default is EOF.
 * @param {boolean} [options.autoClose=true] - Close the descriptor after 'end' or an error.
 */
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.flags = options.flags || 'r';
    this.encoding = options.encoding;
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.end = options.end; // last byte offset to read, inclusive; undefined = EOF
    this.pos = this.start; // file offset of the next fs.read
    this.autoClose = options.autoClose === undefined ? true : options.autoClose;
    this.bytesRead = 0;
    this.closed = false; // destroy() has run
    this.flowing = null; // null: no mode chosen yet; true: flowing; false: paused
    this.needReadable = false; // a paused consumer is waiting for 'readable'
    this.reading = false; // an fs.read (or the wait for 'open') is outstanding
    this.eof = false; // the file/range has no more bytes to give
    this.endEmitted = false; // 'end' has been scheduled
    this.length = 0; // total bytes held in this.buffers
    this.buffers = [];
    this.on('end', () => {
      if (this.autoClose) this.destroy();
    });
    this.on('newListener', (type) => {
      // 'newListener' fires before the listener is attached, so defer the work
      // to the next tick or the new listener would miss what we emit.
      if (type === 'data') {
        this.flowing = true;
        process.nextTick(() => this.flow());
      }
      if (type === 'readable') {
        process.nextTick(() => {
          if (this.length > 0) this.emit('readable');
          else this.read(0);
        });
      }
    });
    this.open();
  }

  /** Open the file; emits 'open' on success and 'error' on failure. */
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      this.fd = fd;
      this.emit('open');
    });
  }

  /**
   * Paused-mode read.
   *
   * @param {number} [n] - Bytes wanted; omitted/NaN means everything buffered.
   * @returns {Buffer|string|null} Exactly n bytes; fewer only once the file is
   *   exhausted; or null when that much is not buffered yet (a 'readable' event
   *   follows when more data or EOF arrives). read(0) only triggers a refill.
   *   NOTE: with an encoding, n counts bytes, so a multi-byte character split
   *   between two reads is decoded incorrectly.
   */
  read(n) {
    let size = Number.parseInt(n, 10);
    if (Number.isNaN(size)) size = this.length;
    let ret = null;
    if (size > 0 && this.length > 0 && (size <= this.length || this.eof)) {
      const bytes = this.take(Math.min(size, this.length));
      ret = this.encoding ? bytes.toString(this.encoding) : bytes;
    }
    // Ask for a 'readable' event once more data (or EOF) is available.
    if (ret === null || this.length === 0) this.needReadable = true;
    if (this.eof) {
      if (this.length === 0) this.emitEnd();
    } else if (ret === null || this.length < this.highWaterMark) {
      // Below the high-water mark, or the request could not be met: fetch more.
      this._read();
    }
    return ret;
  }

  /** Remove and return exactly `size` bytes (size <= this.length) from the front of the buffer list. */
  take(size) {
    const out = Buffer.alloc(size);
    let filled = 0;
    while (filled < size) {
      const head = this.buffers[0];
      const wanted = size - filled;
      if (head.length <= wanted) {
        head.copy(out, filled);
        filled += head.length;
        this.buffers.shift();
      } else {
        head.copy(out, filled, 0, wanted);
        filled += wanted;
        this.buffers[0] = head.subarray(wanted);
      }
    }
    this.length -= size;
    return out;
  }

  /** Fetch one chunk of up to highWaterMark bytes from the file into the buffer list. */
  _read() {
    if (this.reading || this.eof || this.closed) return;
    if (typeof this.fd !== 'number') {
      // Count the wait for 'open' as an outstanding read so only one is queued.
      this.reading = true;
      this.once('open', () => {
        this.reading = false;
        this._read();
      });
      return;
    }
    const toRead = typeof this.end === 'number'
      ? Math.min(this.end - this.pos + 1, this.highWaterMark)
      : this.highWaterMark;
    if (toRead <= 0) {
      this.addChunk(null);
      return;
    }
    this.reading = true;
    // Fresh memory per read: buffered chunks must not be overwritten by the next read.
    const buf = Buffer.alloc(toRead);
    fs.read(this.fd, buf, 0, toRead, this.pos, (err, bytesRead) => {
      this.reading = false;
      if (err) {
        if (this.autoClose) this.destroy();
        this.emit('error', err);
        return;
      }
      this.addChunk(bytesRead > 0 ? buf.subarray(0, bytesRead) : null);
    });
  }

  /** Buffer `chunk` (null means end of file) and notify whichever consumer is active. */
  addChunk(chunk) {
    if (chunk) {
      this.buffers.push(chunk);
      this.length += chunk.length;
      this.pos += chunk.length;
      this.bytesRead += chunk.length;
      if (typeof this.end === 'number' && this.pos > this.end) this.eof = true;
    } else {
      this.eof = true;
    }
    if (this.flowing) {
      this.flow();
      return;
    }
    if (this.needReadable) {
      this.needReadable = false;
      this.emit('readable');
    }
    if (this.eof && this.length === 0) this.emitEnd();
  }

  /** Flowing mode: emit every buffered chunk as 'data', then keep reading or end. */
  flow() {
    while (this.flowing && this.buffers.length > 0) {
      const chunk = this.buffers.shift();
      this.length -= chunk.length;
      this.emit('data', this.encoding ? chunk.toString(this.encoding) : chunk);
    }
    if (!this.flowing) return;
    if (this.eof) {
      this.emitEnd();
      return;
    }
    this._read();
  }

  /** Schedule the single 'end' event for the next tick. */
  emitEnd() {
    if (this.endEmitted) return;
    this.endEmitted = true;
    process.nextTick(() => this.emit('end'));
  }

  /** Close the descriptor if one was opened, and emit 'close' exactly once. */
  destroy() {
    if (this.closed) return;
    this.closed = true;
    if (typeof this.fd !== 'number') {
      process.nextTick(() => this.emit('close'));
      return;
    }
    fs.close(this.fd, () => this.emit('close'));
  }

  /** Stop emitting 'data'; data read meanwhile stays buffered. */
  pause() {
    this.flowing = false;
  }

  /** Enter flowing mode: deliver buffered data, then continue reading. */
  resume() {
    this.flowing = true;
    this.flow();
  }

  /**
   * Forward every chunk to `dest`, pausing while dest.write() returns false
   * until dest emits 'drain'; calls dest.end() after 'end'.
   * @returns {object} dest, so pipes can be chained.
   */
  pipe(dest) {
    this.on('data', (data) => {
      if (dest.write(data) === false) this.pause();
    });
    dest.on('drain', () => this.resume());
    this.on('end', () => dest.end());
    return dest;
  }
}

module.exports = ReadStream;
小弟我能力有限,欢迎各位大神指点,谢谢~