为了账号安全,请及时绑定邮箱和手机立即绑定

节点 - 在管道之后正确关闭流

节点 - 在管道之后正确关闭流

千巷猫影 2022-06-16 11:00:10
假设我有以下代码:try {    let size = 0;    await pipeline(        fs.createReadStream('lowercase.txt'),        async function* (source) {            for await (const chunk of source) {                size += chunk.length;                           if (size >= 1000000) {                    throw new Error('File is too big');                }                yield String(chunk).toUpperCase();            }        },        fs.createWriteStream('uppercase.txt')    );    console.log('Pipeline succeeded.');} catch (error) {    console.log('got error:', error);}如何确保在每种情况下都正确关闭流?节点文档没有多大帮助——他们只是告诉我我将有悬空事件侦听器:stream.pipeline() 将在所有流上调用 stream.destroy(err),除了:发出“结束”或“关闭”的可读流。发出“完成”或“关闭”的可写流。调用回调后,stream.pipeline() 会在流上留下悬空的事件侦听器。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞噬错误。
查看完整描述

2 回答

?
守着星空守着你

TA贡献1799条经验 获得超8个赞

因此,我发现许多 node.js 流复合操作,例如pipeline()和.pipe()在错误处理方面非常糟糕/不完整。例如,如果你这样做:


fs.createReadStream("input.txt")

  .pipe(fs.createWriteStream("output.txt"))

  .on('error', err => {

      console.log(err);

  }).on('finish', () => {

      console.log("all done");

  });

您会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“否”并非如此。打开该输入文件的错误将未处理。这有一些逻辑,因为.pipe()返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。该.pipe()操作可以侦听输入流上的错误并传递错误(即使它是 apipeErr或不同的东西),然后它也可以在读取错误时正确清理 writeStream。但是,.pipe()并没有那么彻底地实施。它似乎想假设输入流永远不会出错。


相反,您必须单独保存 readStream 对象并直接为其附加错误处理程序才能看到该错误。所以,我只是不再相信这种复合的东西,而且文档从来没有真正解释过如何进行正确的错误处理。我试图查看代码,pipeline()看看我是否能理解错误处理,但这并没有证明是一项富有成效的努力。


因此,您的特定问题似乎可以通过转换流来完成:


const fs = require('fs');

const { Transform } = require('stream');


const myTransform = new Transform({

    transform: function(chunk, encoding, callback) {

        let str = chunk.toString('utf8');

        this.push(str.toUpperCase());

        callback();

    }

});


function upperFile(input, output) {

    return new Promise((resolve, reject) => {

        // common function for cleaning up a partial output file

        function errCleanup(err) {

            fs.unlink(output, function(e) {

                if (e) console.log(e);

                reject(err);

            });

        }


        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});

        let outputStream = fs.createWriteStream(output, {emitClose: true});


        // have to separately listen for read/open errors

        inputStream.on("error", err => {

            // have to manually close writeStream when there was an error reading

            if (outputStream) outputStream.destroy();

            errCleanup(err);

        });

        inputStream.pipe(myTransform)

            .pipe(outputStream)

            .on("error", errCleanup)

            .on("close", resolve);        

    });

}


// sample usage

upperFile("input.txt", "output.txt").then(() => {

    console.log("all done");

}).catch(err => {

    console.log("got error", err);

});

正如您所看到的,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。


查看完整回答
反对 回复 2022-06-16
?
慕工程0101907

TA贡献1887条经验 获得超5个赞

  • pipe有那些问题

  • pipeline旨在解决所有问题,它确实

  • pipeline如果从头到尾都有所有部分,那就太好了,但如果没有:

    • 即将推出的节点 17 版本将具有stream.compose解决该问题的功能

    • 在此之前,流链库是一个不错的选择

长篇大论的回答:

公认的答案只是忽略了pipeline,但它是专门为解决这个问题而设计的。pipe绝对受到它的影响(更多下文),但我没有发现pipeline没有正确关闭文件、http 等周围的流的情况。带有随机 npm 包的 YMMV,但如果它具有closeordestroy功能以及on('error'事件,应该没问题。

为了演示,这会调用 shell 以查看我们的测试文件是否打开:

const listOpenFiles = async () => {

  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");


  // only show our test files

  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));

  console.log('***** open files:\n', openFiles, '\n-------------');

};

如果您在上面示例中的循环内调用它:


for await (const chunk of source) {

  await listOpenFiles();

输出将不断重复:


***** open files:

[

  '/path/to/lowercase.txt',

  '/path/to/uppercase.txt'

]

如果你在 catch 之后再次调用它,你可以看到一切都关闭了。


***** open files:

 [] 

关于引用的文档:

文档在前 2 个要点中所指的pipeline是它不会关闭已经关闭的流,因为......好吧,它们已经关闭了。至于悬空的侦听器,它们确实留在传递给的各个流上pipeline。但是,在您的示例(典型案例)中,您无论如何都没有保留对各个流的引用;管道完成后,它们将立即被垃圾收集。例如,如果您经常引用其中一个,它会警告潜在的副作用。


// using this same instance over and over will end up with tons of dangling listeners

export const capitalizer = new Transform(// ...

相反,最好有“干净”的实例。现在生成器函数很容易链接,甚至根本就没有对转换的引用,但是您可以简单地创建一个返回新实例而不是具有常量实例的函数:


export const createCaptilizer = () => new Transform(// ...

简而言之,上面的例子在所有 3 点上都很好。


更多信息pipe

pipe,另一方面,确实存在上述传播问题。


const csvStream = (file) => {

  // does not expose file errors, nor clean up the file stream on parsing errors!!!

  return fs.createReadStream(file).pipe(createCsvTransform());

};

人们普遍认为这很痛苦/不直观,但现在改变它为时已晚。我尽量避免它,我pipeline尽可能推荐。但是,重要的是要注意这pipeline需要将所有部分放在一起。因此,例如,对于上述情况,您还需要最终Writable目标。pipe如果您只想构建链的一部分,您仍然必须在这种情况下使用。解决此问题的方法更容易单独推理:


const csvStream = (file) => {

  const fileStream = fs.createReadStream(file);

  const transform = createCsvTransform();

  // pass file errors forward

  fileStream.on('error', (error) => transform.emit('error', error));

  // close file stream on parsing errors

  transform.on('error', () => fileStream.close());


  return transform;

}

不过,也有好消息。它仍然是实验性的,但很快流将公开一个stream.compose功能。它具有 的所有传播/清理优势pipeline,但只是返回一个新流。本质上,这是大多数人认为会pipe做的事情。;)


// NO propagation or cleanup

readable.pipe(transform);


// automatic propagation and cleanup

stream.compose(readable, transform);

在此之前,请查看https://www.npmjs.com/package/stream-chain


关于pipeline和await

请注意,上面的示例使用await pipeline(//...,但链接的文档是同步版本的。这不会返回一个承诺,所以await什么都不做。从节点 15 开始,您通常需要stream/promises这里的 api:https ://nodejs.org/api/stream.html#streams-promises-api


import { pipeline } from 'stream/promises'; // NOT 'stream'

在节点 15 之前,您可以使用 util's 使其成为一个承诺promisify:


import { pipeline } from 'stream';

import { promisify } from 'util';


await promisify(pipeline)(// ...

或者,为了简化整个文件:


import * as stream from 'stream';

import { promisify } from 'util';


const pipeline = promisify(stream.pipeline);

我之所以提到这一点,是因为如果您使用await同步版本,它实际上不会在 之后完成try/catch,因此可能会给人一种错误的印象,即它实际上尚未完成时清理失败。


查看完整回答
反对 回复 2022-06-16
  • 2 回答
  • 0 关注
  • 189 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信