Skip to content

Writable 构造器中:

  • 通过 writing 标识是否正在写入,默认为 false。

  • 通过 needDrain 标识是否触发 drain 事件,默认为 false。

  • 通过 len 标识写入的字节数,默认为 0。

  • 通过 cache 缓存写入到内存中的数据,默认为 []

write:

  • 如果数据不是 buffer,先转为 buffer。之后 this.len += buffer.length 更新 len

  • 通过 len 是否小于 highWaterMark 决定 write() 调用的返回值是 true 还是 false, 如果这个值是是 false,说明都写入完成后需要触发 drain 事件,将 needDrain 更新为 true,之后判断 writing 是否为 true,第一次肯定是 false,需要写入文件中, 将 writing 更新为 true,然后调用 _write 并传入 chunk, encoding, cb,后面再调用 write 时,发现 writing 为 true,需要写入内存中,直接 this.cache.push({ chunk, encoding, cb })

clearBuffer:

  • clearBuffer 取出缓存中的第 0 项,如果有值,递归调用 ReadStream.prototype._write 并传入chunk, encoding, cb,cb 也会像之前那样包装,这样就能通过递归调用把缓存清空, 当缓存清空时,即缓存的第 0 项为假,就把 writing 更新为 false,再判断一下 needDrain 是否为真,为真就 this.emit('drain')。

ReadStream 构造器中

  • 参数处理

  • 通过 offset 控制写入文件的偏移,默认为 0。

  • 调用 open() 方法

_write:

  • 在调用 _write() 时会传入 chunk,encoding,cb,这里的 cb 并不是我们调用 write 时 定义的 cb,而是在 write 中包装后的那个函数,内部除了执行我们定义的 cb,还会执行 Writable.prototype.clearBuffer。

  • 通过延迟执行获取到 fd 后,fs.write() 写入,回调中 this.len -= written 更新 len, this.offset += written 更新 offset,再调用 cb() 执行我们定义的回调和 clearBuffer。

js
const fs = require('fs')
const path = require('path')
const EventEmitter = require('events')

class Writable extends EventEmitter {
  constructor (options) {
    super()
    this.flags = options.flags || 'w'
    this.encoding = options.encoding || 'utf-8'
    this.autoClose = options.autoClose || true
    this.highWaterMark = options.highWaterMark || 16 * 1024
    this.writing = false // 是否正在写入
    this.needDrain = false // 是否需要触发 drain 事件
    this.len = 0 // 要写入的长度
    this.cache = []
  }

  write (chunk, encoding = this.encoding, cb = () => {}) {
    if (!Buffer.isBuffer(chunk)) {
      chunk = Buffer.from(chunk)
    }

    this.len += chunk.length
    const res = this.len < this.highWaterMark

    if (!res) {
      // 达到或超过预期
      this.needDrain = true
    }

    // 需要判断写入文件还是放入缓存中
    if (this.writing) {
      this.cache.push({
        chunk,
        encoding,
        cb
      })
    } else {
      this.writing = true
      this._write(chunk, encoding, () => {
        cb() // 执行用户的回调
        this.clearBuffer() // 清空缓存
      })
    }

    return res
  }

  clearBuffer () {
    const data = this.cache.shift()
    
    if (data) {
      const { chunk, encoding, cb } = data
      this._write(chunk, encoding, () => {
        cb()
        this.clearBuffer()
      })
    } else{
      // 缓存中的内容也写完了
      this.writing = false
      if (this.needDrain) {
        this.needDrain = false
        this.emit('drain')
      }
    }
  }
}

class WriteStream extends Writable {
  constructor (path, options = {}) {
    super(options)
    this.path = path
    this.flags = options.flags || 'w'
    this.encoding = options.encoding || 'utf-8'
    this.autoClose = options.autoClose || true
    this.highWaterMark = options.highWaterMark || 16 * 1024

    this.open()
    this.offset = 0
  }

  _write (chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => {
        this._write(chunk, encoding, cb)
      })
    }

    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (er, written) => {
      this.len -= written
      this.offset += written
      cb()
    })
  }

  open () {
    fs.open(this.path, this.flags, (er, fd) => {
      if (er) {
        return this.emit('error', er)
      }

      this.fd = fd
      this.emit('open', fd)
    })
  }
}


const ws = new WriteStream(path.resolve(__dirname, 'foo.md'), {
  highWaterMark: 2
})

ws.on('open', fd => {
  // console.log(fd)
})

const res1 = ws.write('a')
console.log(res1)
const res2 = ws.write('b')
console.log(res2)
ws.on('drain', () => {
  console.log('drain')
})