为了账号安全,请及时绑定邮箱和手机立即绑定

解析 Pyspark 数据框中的 XML 列

解析 Pyspark 数据框中的 XML 列

qq_笑_17 2023-05-23 19:12:10
我对 PySpark 比较陌生,正在尝试解决数据问题。我有一个 pyspark DF,它是用从 MS SQL Server 中提取的数据创建的,有 2 列:ID(整数)和 XMLMsg(字符串)。第 2 列 XMLMsg 包含 XML 格式的数据。目标是解析 XMLMsg 列并在同一个 DF 中使用从 XML 中提取的列创建其他列。以下是 pyspark DF 的示例结构:ID  XMLMsg101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>...102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>...103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>...预期输出是:ID  XMLMsg                                              b       c       d101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>... name1   loc1    dept1102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>... name2   loc2    dept2103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>... name3   loc3    dept3根据我在 SO 中的搜索,我尝试了一些建议;然而,未能达到预期的效果。因此,寻求一些帮助和指导。谢谢你的时间。
查看完整描述

1 回答

?
largeQ

TA贡献2039条经验 获得超7个赞

考虑到我必须从一个巨大的 XML 文件中获取来自 4 个节点的文本,我最终使用 Lambda 和 UDF 解决了这个问题。由于 XML 文件已经在列中并且是 pyspark Dataframe 的一部分,我不想写成文件并再次解析整个 XML。我还想避免使用 XSD 架构。实际的 xml 有多个命名空间和一些具有特定条件的节点。例子:


<ap:applicationproduct xmlns:xsi="http://www.example.com/2005/XMLSchema-instance" xmlns:ap="http://example.com/productinfo/1_6" xmlns:ct="http://example.com/commontypes/1_0" xmlns:dc="http://example.com/datacontent/1_0" xmlns:tp="http://aexample.com/prmvalue/1_0" ....." schemaVersion="..">

  <ap:ParameterInfo>

    <ap:Header>

      <ct:Version>1.0</ct:Version>

      <ct:Sender>ABC</ct:Sender>

      <ct:SenderVersion />

      <ct:SendTime>...</ct:SendTime>

    </ap:Header>

    <ap:ProductID>

      <ct:Model>

        <ct:Series>34AP</ct:Series>

        <ct:ModelNo>013780</ct:ModelNo>

     ..............

      ..............

   <ap:Object>

        <ap:Parameter schemaVersion="2.5" Code="DDA">

          <dc:Value>

            <tp:Blob>mbQAEAgBTgKQEBAX4KJJYABAIASL0AA==</tp:Blob>

          </dc:Value>

        </ap:Parameter>

.........

........

在这里我需要从 ct:ModelNo 和 tp:Blob 中提取值


from pyspark.sql.types import *

from pyspark.sql.functions import udf

import xml.etree.ElementTree as ET

# List of namespaces to be used:

ns = {'ap' : 'http://example.com/productinfo/1_6', 

'ct':'http://example.com/commontypes/1_0', 

'dc':'http://example.com/datacontent/1_0', 

'tp':'http://aexample.com/prmvalue/1_0' 

    }


parsed_model = (lambda x:    ET.fromstring(x).find('ap:ParameterInfo/ap:ProductID/ct:Model/ct:ModelNo', ns).text)

udf_model = udf(parsed_model)

parsed_model_df = df.withColumn('ModelNo', udf_Model('XMLMsg'))

同样对于具有 blob 值的节点,可以编写类似的函数,但节点的路径将是:'ap:ParameterInfo/ap:Object/ap:Parameter[@Code="DDA"]/dc:Value/tp:Blob'


这对我有用,我能够在 pyspark DF 中添加所需的值。欢迎提出任何建议以使其变得更好。谢谢你!


查看完整回答
反对 回复 2023-05-23
  • 1 回答
  • 0 关注
  • 155 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信