发布日期:2023-06-15 VIP内容

示例:读写XML文件到DataFrame

Apache Spark可用于处理或读取简单到复杂的嵌套XML文件到Spark DataFrame中,并将DataFrame写回XML,通过使用Databricks的 Spark XML API (spark-xml)库。

Databricks Spark XML API (spark-xml)库

Databricks的 Spark XML API (spark-xml)库是一个用Apache Spark解析和查询XML数据的库,用于Spark SQL和DataFrames。结构和测试工具大多是从Spark的CSV数据源中复制的。

spark-xml支持以分布式的方式处理无格式的XML文件,不像Spark中的JSON数据源限制内联JSON格式。Spark XML API 参考

对于Maven项目,在pom.xml文件中添加如下依赖:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.12</artifactId>
    <version>0.16.0</version>
</dependency>

如果在Spark shell中使用,可以通过--packages命令行选项将该包添加到Spark。例如:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.16.0

如果在Zeppelin Notebook中使用,可以通过以下配置加载该包到Spark中(在执行任何Spark代码之前先执行):

%spark.conf
spark.jars.packages   com.databricks:spark-xml_2.12:0.16.0

或者:

%spark.conf
spark.jars   /home/hduser/software/spark-xml_2.12-0.16.0.jar

Spark读取XML到DataFrame

Databricks Spark-XML包允许我们将简单或嵌套的XML文件读取到DataFrame中,一旦DataFrame被创建,我们就可以利用它的API来执行转换和操作,就像任何其他DataFrame一样。

Spark-XML API在读取XML文件时接受几个选项。例如,选项rowTag用于指定行标签。rootTag用于指定输入嵌套XML的根标签。

例如,有下面这样的一个XML文件

<persons>
    <person id="1">
        <username>张三</username>
        <nickname></nickname>
        <dob_year>1980</dob_year>
        <dob_month>1</dob_month>
        <gender>男</gender>
        <salary currency="元">10000</salary>
        <addresses>
            <address>
                <street>新华大街21号</street>
                <city>石家庄市</city>
                <state>河北省</state>
            </address>
            <address>
                <street>红红火火烧烤大街456号</street>
                <city>淄博市</city>
                <state>山东省</state>
            </address>
        </addresses>
    </person>
    <person id="2">
        <username>李小四</username>
        <nickname>四儿</nickname>
        <dob_year>1990</dob_year>
        <dob_month>6</dob_month>
        <gender>男</gender>
        <salary currency="元">10000</salary>
        <addresses>
            <address>
                <street>长安大街4512号</street>
                <city>北京市</city>
                <state>北京</state>
            </address>
            <address>
                <street>宝塔路4367号</street>
                <city>延安市</city>
                <state>陕西省</state>
            </address>
        </addresses>
    </person>
</persons>

将其读取到DataFrame中,代码如下:

// xml文件路径
val xmlPath = "file:///home/hduser/data/spark/peoples.xml"

// 加载
val df = spark.read.format("xml")
    .format("com.databricks.spark.xml")
    .option("rowTag", "person")
    .load(xmlPath)

// 查看模式
df.printSchema()

当API将XML文件读入DataFrame时,它会根据数据自动推断模式。下面是df.printSchma()的模式输出。

root
 |-- _id: long (nullable = true)
 |-- addresses: struct (nullable = true)
 |    |-- address: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- street: string (nullable = true)
 |-- dob_month: long (nullable = true)
 |-- dob_year: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- nickname: string (nullable = true)
 |-- salary: struct (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |    |-- _currency: string (nullable = true)
 |-- username: string (nullable = true)

我们还可以提供自己的结构模式,并在读取文件时使用它,代码如下:

import org.apache.spark.sql.types._

// 定义模式
val schema = new StructType()
      .add("_id",LongType)
      .add("username",StringType)
      .add("nickname",StringType)
      .add("dob_year",LongType)
      .add("dob_month",LongType)
      .add("gender",StringType)
      .add("salary",StringType)
      .add("addresses",StringType)

// 使用自定义模式加载
val df = spark.read.format("xml")
    .format("com.databricks.spark.xml")
    .option("rowTag", "person")
    .load(xmlPath)
 
// 查看
df.show(false)

执行以上代码,输出内容如下:

+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+
|_id|addresses                                                                    |dob_month|dob_year|gender|nickname|salary     |username|
+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+
|1  |{[{石家庄市, 河北省, 新华大街21号}, {淄博市, 山东省, 红红火火烧烤大街456号}]}|1        |1980    |男    |        |{10000, 元}|张三    |
|2  |{[{北京市, 北京, 长安大街4512号}, {延安市, 陕西省, 宝塔路4367号}]}           |6        |1990    |男    |四儿    |{10000, 元}|李小四  |
+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+

处理XML属性

“_”被添加到属性的变量前缀中,例如,_VALUE & _currency是XML文件中的属性。我们可以使用attributePrefix选项将前缀更改为任何特殊字符。可以使用excludeAttribute选项禁用处理属性。

Spark写DataFrame到XML文件

使用DataFrameWriter的.format(“com.databricks.spark.xml”)格式方法,将Spark DataFrame写入XML文件。该数据源作为Spark-XML API的一部分提供。

与读类似,写也使用选项rootTag和rowTag分别指定输出XML文件上的根标签和行标签。

// 要写出的目录,事先要不存在
val xmlOutput = "file:///home/hduser/data/spark/persons_xml"

// 写出到xml文件中
df.select("username","nickname","dob_year","dob_month","gender","salary","addresses")
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "persons")
  .option("rowTag", "person")
  .save(xmlOutput) 

此代码片段将Spark DataFrame df 写入XML文件persons_xml,其中persons作为根标签,person作为行标签。

限制

这个API在读取和写入简单的XML文件时非常有用。然而,在撰写本文时,该API具有以下限制:

  • 这个API不支持根元素的读写属性。
  • 不支持复杂的XML结构,如果希望读取header和footer以及行元素。