博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark SQL将数据写入Mysql表的一些坑
阅读量:2396 次
发布时间:2019-05-10

本文共 1306 字,大约阅读时间需要 4 分钟。

最近,在使用Spark SQL分析一些数据,要求将分析之后的结果数据存入到相应的MySQL表中。

    但是将数据处理完了之后,存入Mysql时,报错了:

    

   代码的基本形式为: 

[java] 
 
  1. val r1: Dataset[Row]  = data.groupBy(***)...  
  2.   
  3. r1.write.jdbc(url,"iptimecount",prop)  
    根据图片中的报错,搜索资料,得知是由于Spark SQL 中的Save Mode导致的,中对Savemode进行了说明:

    

    默认情况下,使用SaveMode.ErrorIfExists,也就是说,当从Spark中插入到MySQL表中的时候,如果表已经存在,则直接报错,想想真觉得这默认值有点坑。

 于是修改Savemode,将代码改成:

   

[java] 
 
  1. r1.write.mode(SaveMode.Append).jdbc(url,"iptimecount",prop)  

   再次执行,本以为应该会顺利存入到数据库中了,没想到还是报错:


    看到这个错误,我有点茫然,不清楚是哪里的问题。

    后来,在一次测试中,将MySQL中将原来的表格删除后,再次提交任务,能顺利将数据存入到MySQL中,但是,使用desc查看表的结构,发现在Spark SQL中列类型为String类型的,在MySQL中对应为Text类型,于是我猜测应该是我之前创建的表格中,将列的类型定义为char和varchar导致的。

    于是,我删除表格,重新创建表格,将char和varchar类型改为Text,再次执行,顺利的将数据从Spark SQL中存入到了Mysql.

         另附一些注意事项:

数据存入Mysql注意事项
A. 尽量先设置好存储模式
默认为
SaveMode.ErrorIfExists
模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库.另外三种模式如下:
SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。
B. 设置存储模式的步骤为:
org.apache.spark.sql.SaveMode
......
df.write.
mode(SaveMode.Append)
C. 若提前在数据库中手动创建表,需要注意
列名称和数据类型
下面的源码说明了,需要
保证Spark SQL中schema中的field name与Mysql中的列名称一致!
若提前手动创建Mysql表,需要注意Spark SQL 中Schema中的数据类型与Mysql中的数据类型的对应关系,如下图所示:
特别注意: Scala中的String类型,在MySQL中对应的是Text类型(经过亲自测试所知)
上面是本人在Spark SQL 读取与写入Mysql方面的遇到的一些坑,特在此备忘。

你可能感兴趣的文章
Oracle--锁(概述、分类)
查看>>
Oracle--加锁的方法
查看>>
Lambda表达式语法
查看>>
Lambda函数式接口
查看>>
Lambda方法引用、构造器引用、闭包
查看>>
Lambda表达式使用场景及实例
查看>>
Docker概述
查看>>
Docker--基本组成
查看>>
数据结构--循环双链表实现、详解
查看>>
数据结构--优先队列实现、模拟线程调度
查看>>
Java并发--Java中的13个原子操作类详解
查看>>
Java并发--数据依赖性、as-if-aerial、程序顺序规则、重排序对多线程的影响
查看>>
数据结构--数组、使用数组表示矩阵
查看>>
数据结构--稀疏矩阵常用的三种压缩存储(顺序、行单链表、十字链表)
查看>>
Java并发--监视器(monitor)、等待/通知机制
查看>>
Zookeeper--数据初始化过程
查看>>
Zookeeper--数据同步
查看>>
Zookeeper--配置详解
查看>>
Swagger2注解详细说明
查看>>
使用Turbine聚合监控
查看>>