PySpark SQL: Converting a List of Python Dictionaries to a PySpark DataFrame

2021-11-05 09:26:58.0

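1. Inferring the schema from dictionaries

Since Spark 2.x, spark.createDataFrame can infer the schema directly from a list of Python dictionaries: each key becomes a column name, and the column type is derived from the Python type of the values.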

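from pyspark.sql import SparkSession

# Reuse the active session (e.g. in the pyspark shell) or create a new one
spark = SparkSession.builder.getOrCreate()

# List of dicts
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
        {"Category": 'Category B', "ID": 2, "Value": 30.10},
        {"Category": 'Category C', "ID": 3, "Value": 100.01}
       ]

# Create the DataFrame; the schema is inferred from the dict values
df = spark.createDataFrame(data)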
print(df.schema)
df.show()

Running the code above produces the following output:

StructType(List(StructField(Category,StringType,true),StructField(ID,LongType,true),StructField(Value,DoubleType,true)))
+----------+---+------+
|  Category| ID| Value|
+----------+---+------+
|Category A|  1|  12.4|
|Category B|  2|  30.1|
|Category C|  3|100.01|
+----------+---+------+

2. Using Row objects

This example wraps each dictionary in a pyspark.sql.Row, using ** to unpack the dictionary's key-value pairs into keyword arguments.

from pyspark.sql import Row

# Dict List
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
        {"Category": 'Category B', "ID": 2, "Value": 30.10},
        {"Category": 'Category C', "ID": 3, "Value": 100.01}
       ]

# Create the DataFrame from a list of Row objects
df = spark.createDataFrame([Row(**i) for i in data])
print(df.schema)
df.show()

Running the code above produces the following output:

StructType(List(StructField(Category,StringType,true),StructField(ID,LongType,true),StructField(Value,DoubleType,true)))
+----------+---+------+
|  Category| ID| Value|
+----------+---+------+
|Category A|  1|  12.4|
|Category B|  2|  30.1|
|Category C|  3|100.01|
+----------+---+------+

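3. Explicitly specifying the schema

We can, of course, also define the DataFrame's schema explicitly. In the code below, the schema is defined to match the data types of the values in the dictionaries: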

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DecimalType
from decimal import Decimal

# Dict List
data = [{"Category": 'Category A', "ID": 1, "Value": Decimal('12.40')},
        {"Category": 'Category B', "ID": 2, "Value": Decimal('30.10')},
        {"Category": 'Category C', "ID": 3, "Value": Decimal('100.01')}
       ]

# Build the schema (DecimalType precision defaults to 10)
schema = StructType([
    StructField('Category', StringType(), False),
    StructField('ID', IntegerType(), False),
    StructField('Value', DecimalType(scale=2), True)
])

# Create the DataFrame with the explicit schema
df = spark.createDataFrame(data, schema)
print(df.schema)
df.show()

Running the code above produces the following output:

StructType(List(StructField(Category,StringType,false),StructField(ID,IntegerType,false),StructField(Value,DecimalType(10,2),true)))
+----------+---+------+
|  Category| ID| Value|
+----------+---+------+
|Category A|  1| 12.40|
|Category B|  2| 30.10|
|Category C|  3|100.01|
+----------+---+------+
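
A note on constructing the Decimal values: a Decimal built from a float literal inherits the float's binary rounding error, whereas one built from a string holds the exact value with the written scale. The sketch below uses only the Python standard library; the variable names are illustrative.

```python
from decimal import Decimal

from_float = Decimal(12.40)      # carries the binary floating-point error
from_string = Decimal("12.40")   # exactly 12.40, with two decimal places

# The two values are not equal because of the float's rounding error
same_value = (from_float == from_string)

# Number of decimal places stored in the string-based value
scale = -from_string.as_tuple().exponent

# Rounding the float-based value to two places recovers 12.40
rounded = from_float.quantize(Decimal("0.01"))

print(same_value, scale, rounded)
```

For a DecimalType column with scale 2, string-based construction keeps the Python-side values exact before they are handed to Spark.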
