为了账号安全,请及时绑定邮箱和手机立即绑定

向 DataFrame 添加一个新列,其文字值类型为 set

向 DataFrame 添加一个新列,其文字值类型为 set

动漫人物 2021-12-10 16:51:34
Map<File, Dataset<Row> allWords = ...StructField[] structFields = new StructField[] {        new StructField("word", DataTypes.StringType, false, Metadata.empty()),        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),        new StructField("files", ???, false, Metadata.empty())};StructType structType = new StructType(structFields);Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {    Integer fileIndex = files.indexOf(entry.getKey());    allFilesWords.unionAll(            allWords.get(entry.getKey()).withColumn("files", ???)    );}在上面给定的代码中,allWords表示从文件到其字数 ( Row: (string, integer))的映射。现在,我想将所有文件的结果聚合到一个 DataFrame 中,同时保留提到该单词的原始文件。由于最后,每个单词可能在多个文件中都被提到过,因此该files列设计为整数类型集(假设文件被映射为整数)。现在,我正在尝试向allWordsDataFrame添加一个新列,然后使用unionAll将它们全部合并在一起。但我不知道如何files使用仅包含一个 item 的 set来定义和初始化新列(此处命名)fileIndex。感谢评论中提供的链接,我知道我应该使用functions.typedLit但此函数要求提供第二个参数,我不知道该提供什么。另外,我不知道如何定义列。最后一件事,提供的链接是 Python 中的,我正在寻找 Java API。
查看完整描述

1 回答

?
慕侠2389804

TA贡献1719条经验 获得超6个赞

我自己找到了解决方案(在一些帮助下):


Map<File, Dataset<Row> allWords = ...

StructField[] structFields = new StructField[] {

        new StructField("word", DataTypes.StringType, false, Metadata.empty()),

        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),

        new StructField("files", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty())

};

StructType structType = new StructType(structFields);


Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);

for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {

    Integer fileIndex = files.indexOf(entry.getKey());

    allFilesWords.unionAll(

            allWords.get(entry.getKey())

                    .withColumn("files", functions.typedLit(seq, MyTypeTags.SeqInteger()))

    );

}

问题是这TypeTag是来自 Scala 的编译时工件,根据我在另一个问题中得到的内容,它需要由 Scala 编译器生成,而您无法在 Java 中生成一个。因此,我必须TypeTag在 Scala 文件中编写自定义数据结构并将其添加到我的 Maven Java 项目中。为此,我关注了这篇文章。


这是我的MyTypeTags.scala文件:


import scala.reflect.runtime.universe._


object MyTypeTags {

  val SeqInteger = typeTag[Seq[Integer]]

}


查看完整回答
反对 回复 2021-12-10
  • 1 回答
  • 0 关注
  • 216 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信