博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入理解Node中可读流和可写流
阅读量:6157 次
发布时间:2019-06-21

本文共 10655 字,大约阅读时间需要 35 分钟。

12.jpg

流是什么?

这个字进入我脑海我第一时间想到的是一句诗,抽刀断水水更流,举杯消愁...额,今天的主角是流。不好意思差点跑题了,嗯,流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP服务器request和response对象都是流。本人最近研究node,特意记下,分享一下。

对于流,官方文档是这样描述的:

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)

Node.js 中有四种基本的流类型:

  • Readable - 可读的流 (例如 fs.createReadStream()).
  • Writable - 可写的流 (例如 fs.createWriteStream()).
  • Duplex - 可读写的流 (例如 net.Socket).
  • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())

今天主要分享的是node可读流和可写流

可写流

先上个流程图让大家直观了解整个流程

Writable.png

  • Open()后write()开始写入
  • 判断是否底层写入和缓存区是否小于最高水位线同步或异步进行
  • 如果底层在写入中放到缓存区里面,否则就调用底层_write()
  • 成功写入后判断缓存区是否有数据,如果有在写入则存加入缓冲区队列中,缓冲区排空后触发 drain 事件;
  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发,一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。

可写流的模拟实现:

let EventEmitter = require('events');class WriteStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.flags = options.flags || 'w';        this.mode = options.mode || 0o666;        this.start = options.start || 0;        this.pos = this.start;//文件的写入索引        this.encoding = options.encoding || 'utf8';        this.autoClose = options.autoClose;        this.highWaterMark = options.highWaterMark || 16 * 1024;        this.buffers = [];//缓存区,源码用的链表        this.writing = false;//表示内部正在写入数据        this.length = 0;//表示缓存区字节的长度        this.open();    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                if (this.autoClose) {                    this.destroy();                }                return this.emit('error', err);            }            this.fd = fd;            this.emit('open');        });    }    //如果底层已经在写入数据的话,则必须当前要写入数据放在缓冲区里    write(chunk, encoding, cb) {        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);        let len = chunk.length;        //缓存区的长度加上当前写入的长度        this.length += len;        //判断当前最新的缓存区是否小于最高水位线        let ret = this.length < this.highWaterMark;        if (this.writing) {//表示正在向底层写数据,则当前数据必须放在缓存区里            this.buffers.push({                chunk,                encoding,                cb            });        } else {//直接调用底层的写入方法进行写入            //在底层写完当前数据后要清空缓存区            this.writing = true;            this._write(chunk, encoding, () => this.clearBuffer());        }        return ret;    }    clearBuffer() {        //取出缓存区中的第一个buffer        //8 7        let data = this.buffers.shift();        if(data){            this._write(data.chunk,data.encoding,()=>this.clearBuffer())        }else{            this.writing = false;            //缓存区清空了            this.emit('drain');        }    }    _write(chunk, encoding, cb) {       if(typeof this.fd != 'number'){           return this.once('open',()=>this._write(chunk, encoding, cb));       }        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{            if(err){                if(this.autoClose){                    this.destroy();                    this.emit('error',err);                }            }            this.pos += bytesWritten;            //写入多少字母,缓存区减少多少字节            this.length -= bytesWritten;            cb && cb();       })    }    destroy() {        fs.close(this.fd, () => {            this.emit('close');        })    }}module.exports = WriteStream;

可读流

可读流事实上工作在下面两种模式之一:flowing 和 paused

  • 在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
  • 在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

flowing 流动模式

流动模式比较简单,代码实现如下:

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.flags = options.flags || 'r';        this.mode = options.mode || 0o666;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.pos = this.start = options.start || 0;        this.end = options.end;        this.encoding = options.encoding;        this.flowing = null;        this.buffer = Buffer.alloc(this.highWaterMark);        this.open();//准备打开文件读取        //当给这个实例添加了任意的监听函数时会触发newListener        this.on('newListener',(type,listener)=>{            //如果监听了data事件,流会自动切换的流动模式            if(type == 'data'){              this.flowing = true;              this.read();            }        });    }    read(){        if(typeof this.fd != 'number'){            return this.once('open',()=>this.read());        }        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;        //this.buffer并不是缓存区        console.log('howMuchToRead',howMuchToRead);        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是实际读到的字节数            if(err){                if(this.autoClose)                    this.destroy();                return this.emit('error',err);            }            if(bytes){                let data = this.buffer.slice(0,bytes);                this.pos += bytes;                data = this.encoding?data.toString(this.encoding):data;                this.emit('data',data);                if(this.end && this.pos > this.end){                   return this.endFn();                }else{                    if(this.flowing)                      this.read();                }            }else{                return this.endFn();            }        })    }    endFn(){        this.emit('end');        this.destroy();    }    open() {        fs.open(this.path,this.flags,this.mode,(err,fd)=>{           if(err){               if(this.autoClose){                   this.destroy();                   return this.emit('error',err);               }           }           this.fd = fd;           this.emit('open');        })    }    destroy(){        fs.close(this.fd,()=>{            this.emit('close');        });    }    pipe(dest){        this.on('data',data=>{            let flag = dest.write(data);            if(!flag){                this.pause();            }        });        dest.on('drain',()=>{            this.resume();        });    }    //可读流会进入流动模式,当暂停的时候,    pause(){        this.flowing = false;    }    resume(){       this.flowing = true;       this.read();    }}module.exports = ReadStream;

paused 暂停模式:

暂停模式逻辑有点复杂, 画了一张图梳理一下

Readable.png

_read 方法是把数据存在缓存区中,因为是异步 的,流是通过readable事件来通知消耗方的。

说明一下,流中维护了一个缓存,当缓存中的数据足够多时,调用read()不会引起_read()的调用,即不需要向底层请求数据。state.highWaterMark是给缓存大小设置的一个上限阈值。如果取走n个数据后,缓存中保有的数据不足这个量,便会从底层取一次数据

暂停模式代码模拟实现:

let fs = require('fs');let EventEmitter = require('events');class ReadStream extends EventEmitter {    constructor(path, options) {        super(path, options);        this.path = path;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.buffer = Buffer.alloc(this.highWaterMark);        this.flags = options.flags || 'r';        this.encoding = options.encoding;        this.mode = options.mode || 0o666;        this.start = options.start || 0;        this.end = options.end;        this.pos = this.start;        this.autoClose = options.autoClose || true;        this.bytesRead = 0;        this.closed = false;        this.flowing;        this.needReadable = false;        this.length = 0;        this.buffers = [];        this.on('end', function () {            if (this.autoClose) {                this.destroy();            }        });        this.on('newListener', (type) => {            if (type == 'data') {                this.flowing = true;                this.read();            }            if (type == 'readable') {                this.read(0);            }        });        this.open();    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                if (this.autoClose) {                    this.destroy();                    return this.emit('error', err);                }            }            this.fd = fd;            this.emit('open');        });    }    read(n) {        if (typeof this.fd != 'number') {            return this.once('open', () => this.read());        }        n = parseInt(n, 10);        if (n != n) {            n = this.length;        }        if (this.length == 0)            this.needReadable = true;        let ret;        if (0 < n < this.length) {            ret = Buffer.alloc(n);            let b;            let index = 0;            while (null != (b = this.buffers.shift())) {                for (let i = 0; i < b.length; i++) {                    ret[index++] = b[i];                    if (index == ret.length) {                        this.length -= n;                        b = b.slice(i + 1);                        this.buffers.unshift(b);                        break;                    }                }            }            if (this.encoding) ret = ret.toString(this.encoding);        }        //数据存缓存区中        let _read = () => {            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {                if (err) {                    return                }                let data;                if (bytesRead > 0) {                    data = this.buffer.slice(0, bytesRead);                    this.pos += bytesRead;                    this.length += bytesRead;                    if (this.end && this.pos > this.end) {                        if (this.needReadable) {                            this.emit('readable');                        }                        this.emit('end');                    } else {                        this.buffers.push(data);                        if (this.needReadable) {                            this.emit('readable');                            this.needReadable = false;                        }                    }                } else {                    if (this.needReadable) {                        this.emit('readable');                    }                    return this.emit('end');                }            })        }        if (this.length == 0 || (this.length < this.highWaterMark)) {            _read(0);        }        return ret;    }    destroy() {        fs.close(this.fd, (err) => {            this.emit('close');        });    }    pause() {        this.flowing = false;    }    resume() {        this.flowing = true;        this.read();    }    pipe(dest) {        this.on('data', (data) => {            let flag = dest.write(data);            if (!flag) this.pause();        });        dest.on('drain', () => {            this.resume();        });        this.on('end', () => {            dest.end();        });    }}module.exports = ReadStream;

小弟我能力有限,欢迎各位大神指点,谢谢~

转载地址:http://fssfa.baihongyu.com/

你可能感兴趣的文章
csrf 跨站请求伪造相关以及django的中间件
查看>>
MySQL数据类型--与MySQL零距离接触2-11MySQL自动编号
查看>>
生日小助手源码运行的步骤
查看>>
Configuration python CGI in XAMPP in win-7
查看>>
bzoj 5006(洛谷 4547) [THUWC2017]Bipartite 随机二分图——期望DP
查看>>
CF 888E Maximum Subsequence——折半搜索
查看>>
欧几里德算法(辗转相除法)
查看>>
面试题1-----SVM和LR的异同
查看>>
MFC控件的SubclassDlgItem
查看>>
如何避免历史回退到登录页面
查看>>
《图解HTTP》1~53Page Web网络基础 HTTP协议 HTTP报文内的HTTP信息
查看>>
unix环境高级编程-高级IO(2)
查看>>
树莓派是如何免疫 Meltdown 和 Spectre 漏洞的
查看>>
雅虎瓦片地图切片问题
查看>>
HTML 邮件链接,超链接发邮件
查看>>
HDU 5524:Subtrees
查看>>
手机端userAgent
查看>>
pip安装Mysql-python报错EnvironmentError: mysql_config not found
查看>>
http协议组成(请求状态码)
查看>>
怎样成为一个高手观后感
查看>>