Skip to content

先读取 => 调用 write(),如果超过预期,pause() => 写入完成后触发 drain => resume() 恢复读取 => 周而复始,从而实现边读边写。

js
const fs = require('fs')
const path = require('path')

const readStream = fs.createReadStream(path.resolve(__dirname, 'foo.md'), {
  highWaterMark: 4
})

const writeStream = fs.createWriteStream(path.resolve(__dirname, 'bar.md'), {
  highWaterMark: 1
})

readStream.on('data', chunk => {
  const flag = writeStream.write(chunk)

  if (!flag) {
    readStream.pause()
  }
})

writeStream.on('drain', () => {
  readStream.resume()
})

pipe

js
const fs = require('fs')
const path = require('path')
const EventEmitter = require('events')

class ReadStream extends EventEmitter {
  constructor (path, options = {}) {
    super()
    
    // 处理参数
    this.path = path
    this.flags = options.flags || 'r'
    this.encoding = options.encoding || null
    this.autoClose = options.autoClose || true
    this.start = options.start || 0
    this.end = options.end || undefined
    this.highWaterMark = options.highWaterMark || 64 * 1024
    this.offset = 0
    this.flowing = false // 默认不是流动模式

    // 打开文件,内部调用 fs.open(),是异步的,没法立即获取 fd。
    this.open()
    // 用户可能直接监听 data,此时 fd 为 undefined,延迟执行。
    this.on('newListener', type => {
      if (type === 'data') {
        // 真正的读取方法,源码中触发 Readable.read 方法,内部会调用子类自己实现的
        // _read 方法
        this.flowing = true // 流动模式
        this._read()
      }
    })
  }

  _read () {
    // this.fd 是 undefined,open 是 I/O,在 poll 中,_read 是同步,拿不到 fd。
    // 放入 process.nextTick 也不行,执行完一个宏任务才开始清空所有微任务。
    // 放入定时器中不能保证一定在 I/O 执行完之前放入队列,可能走的更慢。
    // 监听 open,能保证拿到 fd。
    if (typeof this.fd !== 'number') {
      // 等 open 触发了,再调用一次 _read,相当于延迟了 _read 操作。 
      return this.once('open', () => this._read())
    }
    
    // 如果传了 end,就不一定是读取全部,所以不能直接用 highWaterMark
    // 123 456 789 1 => 没传 end
    // 123 456 78    => end 为 7
    const howMuchToRead = this.end
      ? Math.min(this.highWaterMark, this.end - this.offset + 1)
      : this.highWaterMark
    // Buffer 不能复用,不然后面写入的数据会覆盖之前的。
    const buffer = Buffer.alloc(howMuchToRead)
    fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (er, bytesRead) => {
      if (er) {
        this.destory(er)
      }

      if (bytesRead > 0) {
        this.emit('data', buffer.slice(0, bytesRead))
        this.offset += bytesRead
        
        if (this.flowing) {
          this._read() // 继续读
        }
        
      } else {
        this.emit('end')
        this.destory()
      }
    })
  }

  open () {
    fs.open(this.path, this.flags, (er, fd) => {
      if (er) {
        return this.destory(er)
      }

      this.fd = fd
      this.emit('open', fd)

      // 源码中打开后就立即读取,这里改为构造器中发现监听 data 后读取。
    })
  }

  destory (er) {
    if (er) {
      return this.emit('error', er)
    }

    // 关闭
    const { fd } = this
    if (typeof fd === 'number' && this.autoClose) {
      fs.close(fd, () => {
        this.emit('close')
      })
    }
  }

  pause () {
    this.flowing = false 
  }

  resume () {
    if (!this.flowing) {
      // 恢复读取
      this.flowing = true
      this._read()
    }
  }

  pipe (writeStream) {
    this.on('data', chunk => {
      const flag = writeStream.write(chunk)
    
      if (!flag) {
        this.pause()
      }
    })
    
    writeStream.on('drain', () => {
      this.resume()
    })
  }
}

const writeStream = fs.createWriteStream(path.resolve(__dirname, 'bar.md'), {
  highWaterMark: 1 
})

const readStream = new ReadStream(path.resolve(__dirname, 'foo.md'), {
  highWaterMark: 3 
})

readStream.pipe(writeStream)