Appearance
ReadStream 类继承了 Readable 类,Readable 类继承了 EventEmitter 类,Readable 只是 提供了一个骨架。
rm.read() => Readable.prototype.read() => ReadStream.prototype._read()
ReadStream 构造器中:
参数处理
通过 offset 控制写入内存的偏移,默认为 0。
通过 flowing 来控制 _read 的调用,默认为 false。
调用 open() 方法,内部 this.emit('open', fd),源码中内部还会直接 fs.read() 读取 文件内容,这里优化成当用户监听 data 时再开始读取文件内容,因此构造器中监听 newListener,当 type 为 data 时 flowing 更新为 true 并调用 _read()
open:
- 内部调用 fs.open(),fs.open() 回调中 this.fd = fd,this.emit('open', fd)
_read:
- 订阅 data 后的回调中调用 _read(),由于 fs.open() 是异步,这里是同步,导致 _read 中 this.fd 为 undefined,通过 immediate 或者 once 解决。 再次执行时 _read 时 通过 fs.read() 读取内容,如果没配置 end 读取大小就是 highWaterMark,配置了 end 就是 Math.min(this.highWaterMark, this.end - this.offset + 1) 将读取到的内容通过 this.emit('data', buffer.slice(0, bytesRead)) 发布出去, 之后 this.offset += bytesRead 更新 offset,再判断 this.flowing 是否为 true, 为 true 则递归调用 _read() 进行下一轮读取。当读取不到内容时 this.emit('end') 并调用 destory()。
destory:
如果参数 er 有值,就 this.emit('error', er)。
否则通过 typeof fd === 'number' && autoClose 决定是否调用 fs.close()
pause:
- this.flowing = false
resume:
- 如果 this.flowing 为 false,this.flowing = true,this._read() 恢复读取。
js
const EventEmitter = require('events')
class ReadStrem extends EventEmitter {
constructor (path, options = {}) {
super()
// 处理参数
this.path = path
this.flags = options.flags || 'r'
this.encoding = options.encoding || null
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.end || undefined
this.highWaterMark = options.highWaterMark || 64 * 1024
this.offset = 0
this.flowing = false // 默认不是流动模式
// 打开文件,内部调用 fs.open(),是异步的,没法立即获取 fd。
this.open()
// 用户可能直接监听 data,此时 fd 为 undefined,延迟执行。
this.on('newListener', type => {
if (type === 'data') {
// 真正的读取方法,源码中触发 Readable.read 方法,内部会调用子类自己实现的
// _read 方法
this.flowing = true // 流动模式
this._read()
}
})
}
_read () {
// this.fd 是 undefined,open 是 I/O,在 poll 中,_read 是同步,拿不到 fd。
// 放入 process.nextTick 也不行,执行完一个宏任务才开始清空所有微任务。
// 放入定时器中不能保证一定在 I/O 执行完之前放入队列,可能走的更慢。
// 监听 open,能保证拿到 fd。
if (typeof this.fd !== 'number') {
// 等 open 触发了,再调用一次 _read,相当于延迟了 _read 操作。
return this.once('open', () => this._read())
}
// 或者
// if (typeof this.fd !== 'number') {
// setImmediate(() => {
// this._read()
// })
// return
// }
// 如果传了 end,就不一定是读取全部,所以不能直接用 highWaterMark
// 123 456 789 1 => 没传 end
// 123 456 78 => end 为 7
const howMuchToRead = this.end
? Math.min(this.highWaterMark, this.end - this.offset + 1)
: this.highWaterMark
// Buffer 不能复用,不然后面写入的数据会覆盖之前的。
const buffer = Buffer.alloc(howMuchToRead)
fs.read(this.fd, buffer, 0, howMuchToRead, this.offset, (er, bytesRead) => {
if (er) {
this.destory(er)
}
if (bytesRead > 0) {
this.emit('data', buffer.slice(0, bytesRead))
this.offset += bytesRead
if (this.flowing) {
this._read() // 继续读
}
} else {
this.emit('end')
this.destory()
}
})
}
open () {
fs.open(this.path, this.flags, (er, fd) => {
if (er) {
return this.destory(er)
}
this.fd = fd
this.emit('open', fd)
// 源码中打开后就立即读取,这里改为构造器中发现监听 data 后读取。
})
}
destory (er) {
if (er) {
return this.emit('error', er)
}
// 关闭
const { fd } = this
if (typeof fd === 'number' && this.autoClose) {
fs.close(fd, () => {
this.emit('close')
})
}
}
pause () {
this.flowing = false
}
resume () {
if (!this.flowing) {
// 恢复读取
this.flowing = true
this._read()
}
}
}
const readStream = new ReadStrem(path.resolve(__dirname, 'foo.md'), {
flags: 'r',
encoding: null,
autoClose: true,
start: 0,
end: 100, // 从索引 0 的位置读取到索引 100 的位置
highWaterMark: 3 // 每次读取 3 个字节
})
readStream.on('open', function (fd) {
console.log(fd)
})
// 监听 data 事件,内部会不停的将数据发送出来。
const ary = []
readStream.on('data', function (chunk) {
console.log(chunk.toString())
ary.push(chunk)
readStream.pause() // 暂停读取
// 消费
})
// 文件读取完毕后触发
readStream.on('end', function () {
console.log(Buffer.concat(ary).toString())
})
// close 事件需要等待读取完毕后才触发
readStream.on('close', function () {
console.log('close')
})
readStream.on('error', function (er) {
console.log(er)
})
setInterval(() => {
readStream.resume() // 恢复读取
}, 1000)