发布日期:2023-03-16 VIP内容

Spark读写Hive中文列名乱码问题

Spark SQL支持将DataFrame数据保存到Hive表中,并从中读取数据。通常大家在定义表的列名时,用的都是英文单词或者拼音,这没有什么问题。

但在最近的某个项目中,为了分析及可视化方便,计划将Spark DataFrame中包含中文列名的数据存储在Hive表中,数据如下:

+----------+----------------------+----------------------+
|year_month|富士苹果全国平均批发价|红富士苹果集贸市场价格|
+----------+----------------------+----------------------+
|    202103|                  7.05|                   9.8|
|    202104|                  6.99|                   9.6|
|    202105|                   6.7|                   9.4|
|    202106|                  6.38|                  9.34|
|    202107|                  6.25|                  9.46|
|    202108|                  6.34|                  9.53|
|    202109|                  6.62|                  9.47|
|    202110|                  6.67|                   9.3|
|    202111|                  6.63|                  9.28|
|    202112|                  6.66|                  9.42|
|    202201|                  6.62|                  9.74|
|    202202|                  6.92|                  9.71|
|    202203|                  6.91|                  9.95|
|    202204|                  7.26|                 10.21|
|    202205|                  7.88|                  10.5|
|    202206|                  8.13|                 10.92|
|    202207|                  8.31|                 11.11|
|    202208|                  8.43|                 11.23|
|    202209|                  8.51|                 11.31|
|    202210|                  8.32|                 11.09|
+----------+----------------------+----------------------+
only showing top 20 rows

将该包含中文标题的DataFrame写入Hive表中的代码如下:

// 写入DWS
df
    .write
    .format("parquet")
    .mode("overwrite") 
    .saveAsTable("dws.price_data")    

写入过程很顺利。然后为了测试写入是否成功,执行下面的代码,将刚刚写入的数据从Hive表中读取出来并显示,代码如下:

// 查看
spark.table("dws.price_data").show   

不幸的是,读取过程并不顺利,而是抛出了如下的异常信息:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `?????`, `????`, `???`
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:120)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:75)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:441)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.$anonfun$readDataSourceTable$1(DataSourceStrategy.scala:261)
  at org.sparkproject.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:155)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:249)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:288)   
...

虽然上面的异常全是英文,看上去很吓人,我也看不懂,但不要怕,就看第一行AynlysisException这个单词后面的信息:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `?????`, `????`, `???`

还好,我认识duplicate column(s)这两个单词,意思是“有重复的列”。而后面若干的问号(?),很熟悉,中文乱码时不就是这样显示吗?所以我猜测,应该是在读取的时候,因为编码的原因,Spark SQL获取的中文列名都是问号(?)这样的乱码,因此不同的中文列名,有些却都是相同的乱码问号,因此就造成了重复列名的错误。究其根本,还是中文编码的问题。

那么像列名这样的信息,是存储在哪里呢?学过小白学苑Hive大数据课程的同学都清楚,列名这样的元数据信息,肯定是存在Hive的metastore中,具体来说,就是配置Hive时创建的MySQL相关元数据表。因此,经过到MySQL中名为Hive的数据库(即Hive的元数据存储数据库)中一番探索,发现像列名和列数据类型这样的元数据信息,存储在MySQL的名为Hive的数据库中的名为TABLE_PARAMS表中。好,就是它了!

接下来,查看TABLE_PARAMS表的定义语句。在mysql中,执行如下语句:

use hive;

show create table TABLE_PARAMS;

然后可以看到创建TABLE_PARAMS表的语句如下:

 CREATE TABLE `TABLE_PARAMS` (
  `TBL_ID` bigint(20) NOT NULL,
  `PARAM_KEY` varchar(256) CHARACTER SET latin1 COLLATE latin1_bin NOT NULL,
  `PARAM_VALUE` mediumtext CHARACTER SET latin1 COLLATE latin1_bin,
  PRIMARY KEY (`TBL_ID`,`PARAM_KEY`),
  KEY `TABLE_PARAMS_N49` (`TBL_ID`),
  CONSTRAINT `TABLE_PARAMS_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 

注意到,其中的`PARAM_VALUE`字段是用来存储列的定义的,定的编码是latin1。因此中文列名就显示为问号(?)。

既然找到了原因,那我们把`PARAM_VALUE`字段的编码修改为utf8即可。在mysql中执行如下语句修改:

ALTER TABLE TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;   

执行完该修改语句,再次调用"show create table TABLE_PARAMS;"语句查看,表定义语句如下:

 CREATE TABLE `TABLE_PARAMS` (
  `TBL_ID` bigint(20) NOT NULL,
  `PARAM_KEY` varchar(256) CHARACTER SET latin1 COLLATE latin1_bin NOT NULL,
  `PARAM_VALUE` mediumtext CHARACTER SET utf8,
  PRIMARY KEY (`TBL_ID`,`PARAM_KEY`),
  KEY `TABLE_PARAMS_N49` (`TBL_ID`),
  CONSTRAINT `TABLE_PARAMS_FK1` FOREIGN KEY (`TBL_ID`) REFERENCES `TBLS` (`TBL_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

然后,重启MySQL服务器,命令如下:

$ sudo service mysql restart

接下来,记得先把之前写入的Hive表删除,使用的Spark SQL代码如下:

spark.sql("drop table dws.price_data")

最后,重新执行Spark SQL写入Hive表并读取的代码:

// 写入DWS
df
    .write
    .format("parquet")
    .mode("overwrite") 
    .saveAsTable("dws.price_data")    

// 查看
spark.table("dws.price_data").show   

此时,会欣喜地发现,读取带有中文列名的Hive表数据是如此地丝滑,毫无异常!