为了账号安全,请及时绑定邮箱和手机立即绑定

使用 C# 读取数百万个小文件

使用 C# 读取数百万个小文件

PHP
宝慕林4294392 2024-01-20 21:03:41
我有数百万个每天生成的日志文件,我需要读取所有这些文件并将其放在一起作为单个文件,以便在其他应用程序中对其进行一些处理。我正在寻找最快的方法来做到这一点。目前我正在使用线程、任务和并行,如下所示:Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>{    ReadFiles(files[i]);});void ReadFiles(string file){    try    {        var txt = File.ReadAllText(file);        filesTxt.Add(tmp);    }    catch { }    GlobalCls.ThreadNo--;}或者foreach (var file in files){    //Int64 index = i;    //var file = files[index];    while (Process.GetCurrentProcess().Threads.Count > 100)    {         Thread.Sleep(100);        Application.DoEvents();    }    new Thread(() => ReadFiles(file)).Start();    GlobalCls.ThreadNo++;    // Task.Run(() => ReadFiles(file));      }问题是,读取几千个文件后,读取速度越来越慢!知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。
查看完整描述

3 回答

?
呼如林

TA贡献1798条经验 获得超3个赞

看起来您正在将所有文件的内容加载到内存中,然后再将它们写回单个文件。这可以解释为什么这个过程随着时间的推移变得更慢。

优化该过程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者-消费者模式。Parallel它可以使用类、线程或任务来实现,但我将演示基于强大的TPL 数据流库的实现,该库特别适合此类作业。

private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,

    string targetFilePath, CancellationToken cancellationToken = default,

    IProgress<int> progress = null)

{

    var readerBlock = new TransformBlock<string, string>(async filePath =>

    {

        return File.ReadAllText(filePath); // Read the small file

    }, new ExecutionDataflowBlockOptions()

    {

        MaxDegreeOfParallelism = 2, // Reading is parallelizable

        BoundedCapacity = 100, // No more than 100 file-paths buffered

        CancellationToken = cancellationToken, // Cancel at any time

    });


    StreamWriter streamWriter = null;


    int filesProcessed = 0;

    var writerBlock = new ActionBlock<string>(text =>

    {

        streamWriter.Write(text); // Append to the target file

        filesProcessed++;

        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);

    }, new ExecutionDataflowBlockOptions()

    {

        MaxDegreeOfParallelism = 1, // We can't parallelize the writer

        BoundedCapacity = 100, // No more than 100 file-contents buffered

        CancellationToken = cancellationToken, // Cancel at any time

    });


    readerBlock.LinkTo(writerBlock,

        new DataflowLinkOptions() { PropagateCompletion = true });


    // This is a tricky part. We use BoundedCapacity, so we must propagate manually

    // a possible failure of the writer to the reader, otherwise a deadlock may occur.

    PropagateFailure(writerBlock, readerBlock);


    // Open the output stream

    using (streamWriter = new StreamWriter(targetFilePath))

    {

        // Feed the reader with the file paths

        foreach (var filePath in sourceFilePaths)

        {

            var accepted = await readerBlock.SendAsync(filePath,

                cancellationToken); // Cancel at any time

            if (!accepted) break; // This will happen if the reader fails

        }

        readerBlock.Complete();

        await writerBlock.Completion;

    }


    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)

    {

        try { await block1.Completion.ConfigureAwait(false); }

        catch (Exception ex)

        {

            if (block1.Completion.IsCanceled) return; // On cancellation do nothing

            block2.Fault(ex);

        }

    }

}

使用示例:


var cts = new CancellationTokenSource();

var progress = new Progress<int>(value =>

{

    // Safe to update the UI

    Console.WriteLine($"Files processed: {value:#,0}");

});

var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",

    SearchOption.AllDirectories); // Include subdirectories

await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

BoundedCapacity用于控制内存使用。

如果磁盘驱动器是SSD,您可以尝试使用MaxDegreeOfParallelism大于2的值读取。

为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。

TPL 数据流库可作为.NET Framework 的包提供,并且内置于 .NET Core。


查看完整回答
反对 回复 2024-01-20
?
智慧大石

TA贡献1946条经验 获得超3个赞

当涉及到IO操作时,CPU并行是没有用的。您的 IO 设备(磁盘、网络等)是您的瓶颈。同时从设备读取数据可能会降低性能。



查看完整回答
反对 回复 2024-01-20
?
MYYA

TA贡献1868条经验 获得超4个赞

也许您可以只使用 PowerShell 来连接文件,

另一种替代方法是编写一个程序,使用FileSystemWatcher类来监视新文件并在创建时追加它们。


查看完整回答
反对 回复 2024-01-20
  • 3 回答
  • 0 关注
  • 225 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信