示例:读写XML文件到DataFrame
Apache Spark可用于处理或读取简单到复杂的嵌套XML文件到Spark DataFrame中,并将DataFrame写回XML,通过使用Databricks的 Spark XML API (spark-xml)库。
Databricks Spark XML API (spark-xml)库
Databricks的 Spark XML API (spark-xml)库是一个用Apache Spark解析和查询XML数据的库,用于Spark SQL和DataFrames。结构和测试工具大多是从Spark的CSV数据源中复制的。
spark-xml支持以分布式的方式处理无格式的XML文件,不像Spark中的JSON数据源限制内联JSON格式。Spark XML API 参考。
对于Maven项目,在pom.xml文件中添加如下依赖:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.12</artifactId>
<version>0.16.0</version>
</dependency>
如果在Spark shell中使用,可以通过--packages命令行选项将该包添加到Spark。例如:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.16.0
如果在Zeppelin Notebook中使用,可以通过以下配置加载该包到Spark中(在执行任何Spark代码之前先执行):
%spark.conf spark.jars.packages com.databricks:spark-xml_2.12:0.16.0
或者:
%spark.conf spark.jars /home/hduser/software/spark-xml_2.12-0.16.0.jar
Spark读取XML到DataFrame
Databricks Spark-XML包允许我们将简单或嵌套的XML文件读取到DataFrame中,一旦DataFrame被创建,我们就可以利用它的API来执行转换和操作,就像任何其他DataFrame一样。
Spark-XML API在读取XML文件时接受几个选项。例如,选项rowTag用于指定行标签。rootTag用于指定输入嵌套XML的根标签。
例如,有下面这样的一个XML文件
<persons>
<person id="1">
<username>张三</username>
<nickname></nickname>
<dob_year>1980</dob_year>
<dob_month>1</dob_month>
<gender>男</gender>
<salary currency="元">10000</salary>
<addresses>
<address>
<street>新华大街21号</street>
<city>石家庄市</city>
<state>河北省</state>
</address>
<address>
<street>红红火火烧烤大街456号</street>
<city>淄博市</city>
<state>山东省</state>
</address>
</addresses>
</person>
<person id="2">
<username>李小四</username>
<nickname>四儿</nickname>
<dob_year>1990</dob_year>
<dob_month>6</dob_month>
<gender>男</gender>
<salary currency="元">10000</salary>
<addresses>
<address>
<street>长安大街4512号</street>
<city>北京市</city>
<state>北京</state>
</address>
<address>
<street>宝塔路4367号</street>
<city>延安市</city>
<state>陕西省</state>
</address>
</addresses>
</person>
</persons>
将其读取到DataFrame中,代码如下:
// xml文件路径
val xmlPath = "file:///home/hduser/data/spark/peoples.xml"
// 加载
val df = spark.read.format("xml")
.format("com.databricks.spark.xml")
.option("rowTag", "person")
.load(xmlPath)
// 查看模式
df.printSchema()
当API将XML文件读入DataFrame时,它会根据数据自动推断模式。下面是df.printSchma()的模式输出。
root |-- _id: long (nullable = true) |-- addresses: struct (nullable = true) | |-- address: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- city: string (nullable = true) | | | |-- state: string (nullable = true) | | | |-- street: string (nullable = true) |-- dob_month: long (nullable = true) |-- dob_year: long (nullable = true) |-- gender: string (nullable = true) |-- nickname: string (nullable = true) |-- salary: struct (nullable = true) | |-- _VALUE: long (nullable = true) | |-- _currency: string (nullable = true) |-- username: string (nullable = true)
我们还可以提供自己的结构模式,并在读取文件时使用它,代码如下:
import org.apache.spark.sql.types._
// 定义模式
val schema = new StructType()
.add("_id",LongType)
.add("username",StringType)
.add("nickname",StringType)
.add("dob_year",LongType)
.add("dob_month",LongType)
.add("gender",StringType)
.add("salary",StringType)
.add("addresses",StringType)
// 使用自定义模式加载
val df = spark.read.format("xml")
.format("com.databricks.spark.xml")
.option("rowTag", "person")
.load(xmlPath)
// 查看
df.show(false)
执行以上代码,输出内容如下:
+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+
|_id|addresses |dob_month|dob_year|gender|nickname|salary |username|
+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+
|1 |{[{石家庄市, 河北省, 新华大街21号}, {淄博市, 山东省, 红红火火烧烤大街456号}]}|1 |1980 |男 | |{10000, 元}|张三 |
|2 |{[{北京市, 北京, 长安大街4512号}, {延安市, 陕西省, 宝塔路4367号}]} |6 |1990 |男 |四儿 |{10000, 元}|李小四 |
+---+-----------------------------------------------------------------------------+---------+--------+------+--------+-----------+--------+
处理XML属性
“_”被添加到属性的变量前缀中,例如,_VALUE & _currency是XML文件中的属性。我们可以使用attributePrefix选项将前缀更改为任何特殊字符。可以使用excludeAttribute选项禁用处理属性。
Spark写DataFrame到XML文件
使用DataFrameWriter的.format(“com.databricks.spark.xml”)格式方法,将Spark DataFrame写入XML文件。该数据源作为Spark-XML API的一部分提供。
与读类似,写也使用选项rootTag和rowTag分别指定输出XML文件上的根标签和行标签。
// 要写出的目录,事先要不存在
val xmlOutput = "file:///home/hduser/data/spark/persons_xml"
// 写出到xml文件中
df.select("username","nickname","dob_year","dob_month","gender","salary","addresses")
.write
.format("com.databricks.spark.xml")
.option("rootTag", "persons")
.option("rowTag", "person")
.save(xmlOutput)
此代码片段将Spark DataFrame df 写入XML文件persons_xml,其中persons作为根标签,person作为行标签。
限制
这个API在读取和写入简单的XML文件时非常有用。然而,在撰写本文时,该API具有以下限制:
- 这个API不支持根元素的读写属性。
- 不支持复杂的XML结构,如果希望读取header和footer以及行元素。