为了账号安全,请及时绑定邮箱和手机立即绑定

在不耗尽内存的情况下生成打印数据帧

在不耗尽内存的情况下生成打印数据帧

守着星空守着你 2022-08-03 16:24:32
如何在Java中打印整个数据帧而不会耗尽内存?Dataset<Row> df = ...我知道那件事:df.show() 将显示数据帧,但是对于足够大的数据帧,这可能会耗尽内存。我知道我可以使用以下内容限制内容:df.show(rowCount, false)但是想要打印整个数据帧,我不想限制内容...我试过:df.foreachPartition(iter -> {    while(iter.hasNext()){       System.out.println(rowIter.next().mkString(",");)     }});但这将打印在每个相应的节点上,而不是在驱动程序上...如果有什么方法可以打印驱动程序中的所有内容而不会耗尽内存?
查看完整描述

2 回答

?
小怪兽爱吃肉

TA贡献1852条经验 获得超1个赞

AFAIK,打印数据框的想法是查看数据。

不建议根据内存不足的数据帧大小打印大型数据帧。

我会提供以下方法,如果你想看到内容,那么你可以保存在hive表中并查询内容。或写入可读的csv或json

例子:

1) 保存在蜂巢表中

df.write.mode("overwrite").saveAsTable("database.tableName")

稍后从配置单元表查询。

2) csv 或 json

df.write.csv("/your/location/data.csv")
 df.write.json("/your/location/data.json")

如果您希望使用单个文件,上述内容将生成多个零件文件(但这会再次将数据移动到一个节点,除非您绝对需要它,否则不鼓励这样做)coalesce(1)

另一种选择是使用localIterator逐行打印,请参阅此处,这也将数据传输到节点...因此它不是一个好主意


查看完整回答
反对 回复 2022-08-03
?
杨魅力

TA贡献1811条经验 获得超6个赞

您将不得不将所有数据带到驱动程序,这将:(一点地占用您的内存...


解决方案可能是拆分数据帧并在驱动程序中逐个打印。当然,这取决于数据本身的结构,它看起来像这样:


long count = df.count();

long inc = count / 10;

for (long i = 0; i < count; i += inc) {

  Dataset<Row> filteredDf =

      df.where("id>=" + i + " AND id<" + (i + inc));


  List<Row> rows = filteredDf.collectAsList();

  for (Row r : rows) {

    System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));

  }

}

我将数据集拆分为10,但我知道我的id从1到100...


完整的示例可以是:


package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;


import java.util.ArrayList;

import java.util.List;


import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructField;

import org.apache.spark.sql.types.StructType;


/**

 * Splitting a dataframe to bring it back to the driver for local

 * processing.

 * 

 * @author jgp

 */

public class SplittingDataframeApp {


  /**

   * main() is your entry point to the application.

   * 

   * @param args

   */

  public static void main(String[] args) {

    SplittingDataframeApp app = new SplittingDataframeApp();

    app.start();

  }


  /**

   * The processing code.

   */

  private void start() {

    // Creates a session on a local master

    SparkSession spark = SparkSession.builder()

        .appName("Splitting a dataframe to collect it")

        .master("local")

        .getOrCreate();


    Dataset<Row> df = createRandomDataframe(spark);

    df = df.cache();


    df.show();

    long count = df.count();

    long inc = count / 10;

    for (long i = 0; i < count; i += inc) {

      Dataset<Row> filteredDf =

          df.where("id>=" + i + " AND id<" + (i + inc));


      List<Row> rows = filteredDf.collectAsList();

      for (Row r : rows) {

        System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));

      }

    }

  }


  private static Dataset<Row> createRandomDataframe(SparkSession spark) {

    StructType schema = DataTypes.createStructType(new StructField[] {

        DataTypes.createStructField(

            "id",

            DataTypes.IntegerType,

            false),

        DataTypes.createStructField(

            "value",

            DataTypes.StringType,

            false) });


    List<Row> rows = new ArrayList<Row>();

    for (int i = 0; i < 100; i++) {

      rows.add(RowFactory.create(i, "Row #" + i));

    }

    Dataset<Row> df = spark.createDataFrame(rows, schema);

    return df;

  }

}

你认为这可以帮助吗?


它不像将其保存在数据库中那样优雅,但它可以避免在体系结构中使用其他组件。这段代码不是很通用,我不确定你能在当前版本的Spark中使它成为通用的。


查看完整回答
反对 回复 2022-08-03
  • 2 回答
  • 0 关注
  • 112 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信