Skip to content

ReadStream 类继承了 Readable 类,Readable 类继承了 EventEmitter 类,Readable 只是 提供了一个骨架。

rm.read() => Readable.prototype.read() => ReadStream.prototype._read()

ReadStream 构造器中:

  • 参数处理

  • 通过 offset 控制写入内存的偏移,默认为 0。

  • 通过 flowing 来控制 _read 的调用,默认为 false。

  • 调用 open() 方法,内部 this.emit('open', fd),源码中内部还会直接 fs.read() 读取 文件内容,这里优化成当用户监听 data 时再开始读取文件内容,因此构造器中监听 newListener,当 type 为 data 时 flowing 更新为 true 并调用 _read()

open:

  • 内部调用 fs.open(),fs.open() 回调中 this.fd = fd,this.emit('open', fd)

_read:

  • 订阅 data 后的回调中调用 _read(),由于 fs.open() 是异步,这里是同步,导致 _read 中 this.fd 为 undefined,通过 immediate 或者 once 解决。 再次执行时 _read 时 通过 fs.read() 读取内容,如果没配置 end 读取大小就是 highWaterMark,配置了 end 就是 Math.min(this.highWaterMark, this.end - this.offset + 1) 将读取到的内容通过 this.emit('data', buffer.slice(0, bytesRead)) 发布出去, 之后 this.offset += bytesRead 更新 offset,再判断 this.flowing 是否为 true, 为 true 则递归调用 _read() 进行下一轮读取。当读取不到内容时 this.emit('end') 并调用 destory()。

destory:

  • 如果参数 er 有值,就 this.emit('error', er)。

  • 否则通过 typeof fd === 'number' && autoClose 决定是否调用 fs.close()

pause:

  • this.flowing = false

resume:

  • 如果 this.flowing 为 false,this.flowing = true,this._read() 恢复读取。
js
const EventEmitter = require('events')

class ReadStrem extends EventEmitter {
  constructor (path, options = {}) {
    super()
    
    // 处理参数
    this.path = path
    this.flags = options.flags || 'r'
    this.encoding = options.encoding || null
    this.autoClose = options.autoClose || true
    this.start = options.start || 0
    this.end = options.end || undefined
    this.highWaterMark = options.highWaterMark || 64 * 1024
    this.offset = 0
    this.flowing = false // 默认不是流动模式

    // 打开文件,内部调用 fs.open(),是异步的,没法立即获取 fd。
    this.open()
    // 用户可能直接监听 data,此时 fd 为 undefined,延迟执行。
    this.on('newListener', type => {
      if (type === 'data') {
        // 真正的读取方法,源码中触发 Readable.read 方法,内部会调用子类自己实现的
        // _read 方法
        this.flowing = true // 流动模式
        this._read()
      }
    })
  }

  _read () {
    // this.fd 是 undefined,open 是 I/O,在 poll 中,_read 是同步,拿不到 fd。
    // 放入 process.nextTick 也不行,执行完一个宏任务才开始清空所有微任务。
    // 放入定时器中不能保证一定在 I/O 执行完之前放入队列,可能走的更慢。
    // 监听 open,能保证拿到 fd。
    if (typeof this.fd !== 'number') {
      // 等 open 触发了,再调用一次 _read,相当于延迟了 _read 操作。 
      return this.once('open', () => this._read())
    }

    // 或者
    // if (typeof this.fd !== 'number') {
    //   setImmediate(() => {
    //     this._read()
    //   })
    //   return
    // }
    
    // 如果传了 end,就不一定是读取全部,所以不能直接用 highWaterMark
    // 123 456 789 1 => 没传 end
    // 123 456 78    => end 为 7
    const howMuchToRead = this.end
      ? Math.min(this.highWaterMark, this.end - this.offset + 1)
      : this.highWaterMark
    // Buffer 不能复用,不然后面写入的数据会覆盖之前的。
    const buffer = Buffer.alloc(howMuchToRead)
    fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (er, bytesRead) => {
      if (er) {
        this.destory(er)
      }

      if (bytesRead > 0) {
        this.emit('data', buffer.slice(0, bytesRead))
        this.offset += bytesRead
        
        if (this.flowing) {
          this._read() // 继续读
        }
        
      } else {
        this.emit('end')
        this.destory()
      }
    })
  }

  open () {
    fs.open(this.path, this.flags, (er, fd) => {
      if (er) {
        return this.destory(er)
      }

      this.fd = fd
      this.emit('open', fd)

      // 源码中打开后就立即读取,这里改为构造器中发现监听 data 后读取。
    })
  }

  destory (er) {
    if (er) {
      return this.emit('error', er)
    }

    // 关闭
    const { fd } = this
    if (typeof fd === 'number' && this.autoClose) {
      fs.close(fd, () => {
        this.emit('close')
      })
    }
  }

  pause () {
    this.flowing = false 
  }

  resume () {
    if (!this.flowing) {
      // 恢复读取
      this.flowing = true
      this._read()
    }
  }
}

const readStream = new ReadStrem(path.resolve(__dirname, 'foo.md'), {
  flags: 'r',
  encoding: null,
  autoClose: true,
  start: 0,
  end: 100, // 从索引 0 的位置读取到索引 100 的位置
  highWaterMark: 3 // 每次读取 3 个字节
})

readStream.on('open', function (fd) {
  console.log(fd)
})

// 监听 data 事件,内部会不停的将数据发送出来。
const ary = []
readStream.on('data', function (chunk) {
  console.log(chunk.toString())
  ary.push(chunk)
  readStream.pause() // 暂停读取
  // 消费
})

// 文件读取完毕后触发
readStream.on('end', function () {
  console.log(Buffer.concat(ary).toString())
})

// close 事件需要等待读取完毕后才触发
readStream.on('close', function () {
  console.log('close')
})

readStream.on('error', function (er) {
  console.log(er)
})

setInterval(() => {
  readStream.resume() // 恢复读取
}, 1000)