Appearance
先读取 => 调用 write(),如果超过预期,pause() => 写入完成后触发 drain => resume() 恢复读取 => 周而复始,从而实现边读边写。
js
const fs = require('fs')
const path = require('path')
const readStream = fs.createReadStream(path.resolve(__dirname, 'foo.md'), {
highWaterMark: 4
})
const writeStream = fs.createWriteStream(path.resolve(__dirname, 'bar.md'), {
highWaterMark: 1
})
readStream.on('data', chunk => {
const flag = writeStream.write(chunk)
if (!flag) {
readStream.pause()
}
})
writeStream.on('drain', () => {
readStream.resume()
})pipe
js
const fs = require('fs')
const path = require('path')
const EventEmitter = require('events')
class ReadStream extends EventEmitter {
constructor (path, options = {}) {
super()
// 处理参数
this.path = path
this.flags = options.flags || 'r'
this.encoding = options.encoding || null
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.end || undefined
this.highWaterMark = options.highWaterMark || 64 * 1024
this.offset = 0
this.flowing = false // 默认不是流动模式
// 打开文件,内部调用 fs.open(),是异步的,没法立即获取 fd。
this.open()
// 用户可能直接监听 data,此时 fd 为 undefined,延迟执行。
this.on('newListener', type => {
if (type === 'data') {
// 真正的读取方法,源码中触发 Readable.read 方法,内部会调用子类自己实现的
// _read 方法
this.flowing = true // 流动模式
this._read()
}
})
}
_read () {
// this.fd 是 undefined,open 是 I/O,在 poll 中,_read 是同步,拿不到 fd。
// 放入 process.nextTick 也不行,执行完一个宏任务才开始清空所有微任务。
// 放入定时器中不能保证一定在 I/O 执行完之前放入队列,可能走的更慢。
// 监听 open,能保证拿到 fd。
if (typeof this.fd !== 'number') {
// 等 open 触发了,再调用一次 _read,相当于延迟了 _read 操作。
return this.once('open', () => this._read())
}
// 如果传了 end,就不一定是读取全部,所以不能直接用 highWaterMark
// 123 456 789 1 => 没传 end
// 123 456 78 => end 为 7
const howMuchToRead = this.end
? Math.min(this.highWaterMark, this.end - this.offset + 1)
: this.highWaterMark
// Buffer 不能复用,不然后面写入的数据会覆盖之前的。
const buffer = Buffer.alloc(howMuchToRead)
fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (er, bytesRead) => {
if (er) {
this.destory(er)
}
if (bytesRead > 0) {
this.emit('data', buffer.slice(0, bytesRead))
this.offset += bytesRead
if (this.flowing) {
this._read() // 继续读
}
} else {
this.emit('end')
this.destory()
}
})
}
open () {
fs.open(this.path, this.flags, (er, fd) => {
if (er) {
return this.destory(er)
}
this.fd = fd
this.emit('open', fd)
// 源码中打开后就立即读取,这里改为构造器中发现监听 data 后读取。
})
}
destory (er) {
if (er) {
return this.emit('error', er)
}
// 关闭
const { fd } = this
if (typeof fd === 'number' && this.autoClose) {
fs.close(fd, () => {
this.emit('close')
})
}
}
pause () {
this.flowing = false
}
resume () {
if (!this.flowing) {
// 恢复读取
this.flowing = true
this._read()
}
}
pipe (writeStream) {
this.on('data', chunk => {
const flag = writeStream.write(chunk)
if (!flag) {
this.pause()
}
})
writeStream.on('drain', () => {
this.resume()
})
}
}
const writeStream = fs.createWriteStream(path.resolve(__dirname, 'bar.md'), {
highWaterMark: 1
})
const readStream = new ReadStream(path.resolve(__dirname, 'foo.md'), {
highWaterMark: 3
})
readStream.pipe(writeStream)