2 回答
TA贡献1799条经验 获得超8个赞
因此,我发现许多 node.js 流复合操作,例如pipeline()和.pipe()在错误处理方面非常糟糕/不完整。例如,如果你这样做:
fs.createReadStream("input.txt")
.pipe(fs.createWriteStream("output.txt"))
.on('error', err => {
console.log(err);
}).on('finish', () => {
console.log("all done");
});
您会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“否”并非如此。打开该输入文件的错误将未处理。这有一些逻辑,因为.pipe()返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。该.pipe()操作可以侦听输入流上的错误并传递错误(即使它是 apipeErr或不同的东西),然后它也可以在读取错误时正确清理 writeStream。但是,.pipe()并没有那么彻底地实施。它似乎想假设输入流永远不会出错。
相反,您必须单独保存 readStream 对象并直接为其附加错误处理程序才能看到该错误。所以,我只是不再相信这种复合的东西,而且文档从来没有真正解释过如何进行正确的错误处理。我试图查看代码,pipeline()看看我是否能理解错误处理,但这并没有证明是一项富有成效的努力。
因此,您的特定问题似乎可以通过转换流来完成:
const fs = require('fs');
const { Transform } = require('stream');
const myTransform = new Transform({
transform: function(chunk, encoding, callback) {
let str = chunk.toString('utf8');
this.push(str.toUpperCase());
callback();
}
});
function upperFile(input, output) {
return new Promise((resolve, reject) => {
// common function for cleaning up a partial output file
function errCleanup(err) {
fs.unlink(output, function(e) {
if (e) console.log(e);
reject(err);
});
}
let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
let outputStream = fs.createWriteStream(output, {emitClose: true});
// have to separately listen for read/open errors
inputStream.on("error", err => {
// have to manually close writeStream when there was an error reading
if (outputStream) outputStream.destroy();
errCleanup(err);
});
inputStream.pipe(myTransform)
.pipe(outputStream)
.on("error", errCleanup)
.on("close", resolve);
});
}
// sample usage
upperFile("input.txt", "output.txt").then(() => {
console.log("all done");
}).catch(err => {
console.log("got error", err);
});
正如您所看到的,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。
TA贡献1887条经验 获得超5个赞
pipe
有那些问题pipeline
旨在解决所有问题,它确实pipeline
如果从头到尾都有所有部分,那就太好了,但如果没有:即将推出的节点 17 版本将具有
stream.compose
解决该问题的功能在此之前,流链库是一个不错的选择
长篇大论的回答:
公认的答案只是忽略了pipeline
,但它是专门为解决这个问题而设计的。pipe
绝对受到它的影响(更多下文),但我没有发现pipeline
没有正确关闭文件、http 等周围的流的情况。带有随机 npm 包的 YMMV,但如果它具有close
ordestroy
功能以及on('error'
事件,应该没问题。
为了演示,这会调用 shell 以查看我们的测试文件是否打开:
const listOpenFiles = async () => {
const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");
// only show our test files
const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
console.log('***** open files:\n', openFiles, '\n-------------');
};
如果您在上面示例中的循环内调用它:
for await (const chunk of source) {
await listOpenFiles();
输出将不断重复:
***** open files:
[
'/path/to/lowercase.txt',
'/path/to/uppercase.txt'
]
如果你在 catch 之后再次调用它,你可以看到一切都关闭了。
***** open files:
[]
关于引用的文档:
文档在前 2 个要点中所指的pipeline是它不会关闭已经关闭的流,因为......好吧,它们已经关闭了。至于悬空的侦听器,它们确实留在传递给的各个流上pipeline。但是,在您的示例(典型案例)中,您无论如何都没有保留对各个流的引用;管道完成后,它们将立即被垃圾收集。例如,如果您经常引用其中一个,它会警告潜在的副作用。
// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
相反,最好有“干净”的实例。现在生成器函数很容易链接,甚至根本就没有对转换的引用,但是您可以简单地创建一个返回新实例而不是具有常量实例的函数:
export const createCaptilizer = () => new Transform(// ...
简而言之,上面的例子在所有 3 点上都很好。
更多信息pipe
pipe,另一方面,确实存在上述传播问题。
const csvStream = (file) => {
// does not expose file errors, nor clean up the file stream on parsing errors!!!
return fs.createReadStream(file).pipe(createCsvTransform());
};
人们普遍认为这很痛苦/不直观,但现在改变它为时已晚。我尽量避免它,我pipeline尽可能推荐。但是,重要的是要注意这pipeline需要将所有部分放在一起。因此,例如,对于上述情况,您还需要最终Writable目标。pipe如果您只想构建链的一部分,您仍然必须在这种情况下使用。解决此问题的方法更容易单独推理:
const csvStream = (file) => {
const fileStream = fs.createReadStream(file);
const transform = createCsvTransform();
// pass file errors forward
fileStream.on('error', (error) => transform.emit('error', error));
// close file stream on parsing errors
transform.on('error', () => fileStream.close());
return transform;
}
不过,也有好消息。它仍然是实验性的,但很快流将公开一个stream.compose功能。它具有 的所有传播/清理优势pipeline,但只是返回一个新流。本质上,这是大多数人认为会pipe做的事情。;)
// NO propagation or cleanup
readable.pipe(transform);
// automatic propagation and cleanup
stream.compose(readable, transform);
在此之前,请查看https://www.npmjs.com/package/stream-chain
关于pipeline和await
请注意,上面的示例使用await pipeline(//...,但链接的文档是同步版本的。这不会返回一个承诺,所以await什么都不做。从节点 15 开始,您通常需要stream/promises这里的 api:https ://nodejs.org/api/stream.html#streams-promises-api
import { pipeline } from 'stream/promises'; // NOT 'stream'
在节点 15 之前,您可以使用 util's 使其成为一个承诺promisify:
import { pipeline } from 'stream';
import { promisify } from 'util';
await promisify(pipeline)(// ...
或者,为了简化整个文件:
import * as stream from 'stream';
import { promisify } from 'util';
const pipeline = promisify(stream.pipeline);
我之所以提到这一点,是因为如果您使用await同步版本,它实际上不会在 之后完成try/catch,因此可能会给人一种错误的印象,即它实际上尚未完成时清理失败。
添加回答
举报