Appearance
Writable 构造器中:
通过 writing 标识是否正在写入,默认为 false。
通过 needDrain 标识是否触发 drain 事件,默认为 false。
通过 len 标识写入的字节数,默认为 0。
通过 cache 缓存写入到内存中的数据,默认为 []
write:
如果数据不是 buffer,先转为 buffer。之后 this.len += buffer.length 更新 len
通过 len 是否小于 highWaterMark 决定 write() 调用的返回值是 true 还是 false, 如果这个值是是 false,说明都写入完成后需要触发 drain 事件,将 needDrain 更新为 true,之后判断 writing 是否为 true,第一次肯定是 false,需要写入文件中, 将 writing 更新为 true,然后调用 _write 并传入 chunk, encoding, cb,后面再调用 write 时,发现 writing 为 true,需要写入内存中,直接 this.cache.push({ chunk, encoding, cb })
clearBuffer:
- clearBuffer 取出缓存中的第 0 项,如果有值,递归调用 ReadStream.prototype._write 并传入chunk, encoding, cb,cb 也会像之前那样包装,这样就能通过递归调用把缓存清空, 当缓存清空时,即缓存的第 0 项为假,就把 writing 更新为 false,再判断一下 needDrain 是否为真,为真就 this.emit('drain')。
ReadStream 构造器中
参数处理
通过 offset 控制写入文件的偏移,默认为 0。
调用 open() 方法
_write:
在调用 _write() 时会传入 chunk,encoding,cb,这里的 cb 并不是我们调用 write 时 定义的 cb,而是在 write 中包装后的那个函数,内部除了执行我们定义的 cb,还会执行 Writable.prototype.clearBuffer。
通过延迟执行获取到 fd 后,fs.write() 写入,回调中 this.len -= written 更新 len, this.offset += written 更新 offset,再调用 cb() 执行我们定义的回调和 clearBuffer。
js
const fs = require('fs')
const path = require('path')
const EventEmitter = require('events')
class Writable extends EventEmitter {
constructor (options) {
super()
this.flags = options.flags || 'w'
this.encoding = options.encoding || 'utf-8'
this.autoClose = options.autoClose || true
this.highWaterMark = options.highWaterMark || 16 * 1024
this.writing = false // 是否正在写入
this.needDrain = false // 是否需要触发 drain 事件
this.len = 0 // 要写入的长度
this.cache = []
}
write (chunk, encoding = this.encoding, cb = () => {}) {
if (!Buffer.isBuffer(chunk)) {
chunk = Buffer.from(chunk)
}
this.len += chunk.length
const res = this.len < this.highWaterMark
if (!res) {
// 达到或超过预期
this.needDrain = true
}
// 需要判断写入文件还是放入缓存中
if (this.writing) {
this.cache.push({
chunk,
encoding,
cb
})
} else {
this.writing = true
this._write(chunk, encoding, () => {
cb() // 执行用户的回调
this.clearBuffer() // 清空缓存
})
}
return res
}
clearBuffer () {
const data = this.cache.shift()
if (data) {
const { chunk, encoding, cb } = data
this._write(chunk, encoding, () => {
cb()
this.clearBuffer()
})
} else{
// 缓存中的内容也写完了
this.writing = false
if (this.needDrain) {
this.needDrain = false
this.emit('drain')
}
}
}
}
class WriteStream extends Writable {
constructor (path, options = {}) {
super(options)
this.path = path
this.flags = options.flags || 'w'
this.encoding = options.encoding || 'utf-8'
this.autoClose = options.autoClose || true
this.highWaterMark = options.highWaterMark || 16 * 1024
this.open()
this.offset = 0
}
_write (chunk, encoding, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', () => {
this._write(chunk, encoding, cb)
})
}
fs.write(this.fd, chunk, 0, chunk.length, this.offset, (er, written) => {
this.len -= written
this.offset += written
cb()
})
}
open () {
fs.open(this.path, this.flags, (er, fd) => {
if (er) {
return this.emit('error', er)
}
this.fd = fd
this.emit('open', fd)
})
}
}
const ws = new WriteStream(path.resolve(__dirname, 'foo.md'), {
highWaterMark: 2
})
ws.on('open', fd => {
// console.log(fd)
})
const res1 = ws.write('a')
console.log(res1)
const res2 = ws.write('b')
console.log(res2)
ws.on('drain', () => {
console.log('drain')
})