将流计算结果保存到MySQL数据库中
Flink还支持使用自定义的Sink来满足多样化的输出需求。
想要实现自定义的Sink,需要直接或者间接实现SinkFunction接口。通常情况下,我们都是实现其抽象类RichSinkFunction,相比于 SinkFunction,其提供了更多的与生命周期相关的方法。
【示例】改写上一示例,将流计算结果保存到MySQL数据库中。
下面我们自定义一个 FlinkToMySQLSink,将流计算结果写出到指定的MySQL数据库中。
1、在MySQL的xueai8数据库中,创建一个数据表wc,用来存储Flink的计算结果。创建wc表的脚本代码如下:
mysql> create table wc(word varchar(30), cnt int);
2、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
3、设置依赖。在pom.xml中添加依赖。
4、创建流应用程序类。代码如下:
Scala代码:
import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context import org.apache.flink.streaming.api.scala._ /** * Created by www.xueai8.com * Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink */ object DataSinkToMysql { // 自定义的 Sink类,继承自 RichSinkFunction class FlinkToMySQLSink extends RichSinkFunction[(String, Int)] { // 定义mysql数据库连接url和驱动程序及账号、密码 val url = "jdbc:mysql://192.168.190.133:3306/xueai8?characterEncoding=UTF-8&useSSL=false" val driver = "com.mysql.jdbc.Driver" val username = "root" val userpwd = "admin" // 声明数据库连接对象和SQL预编译语句 var conn: Connection = _ var stmt: PreparedStatement = _ // 重写open方法:加载驱动,建立连接,预编译SQL语句 // 先于invoker方法用,仅执行一次 override def open(parameters: Configuration): Unit = { super.open(parameters) // 加载驱动程序 Class.forName(driver) // 连接数据库 conn = DriverManager.getConnection(url, username, userpwd) // 执行SQL语句 val sql = "insert into wc(word, cnt) values(?, ?)" stmt = conn.prepareStatement(sql) } // 重写事件驱动方法调用 override def invoke(value: (String, Int), context: Context): Unit = { stmt.setString(1, value._1) stmt.setInt(2, value._2) stmt.executeUpdate } // 关闭资源 // 后于invoke方法调用,仅执行一次 override def close(): Unit = { super.close() if (stmt != null) stmt.close() if (conn != null) conn.close() } } // 测试 def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 为流数据启用检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE) // 得到输入数据,进行转换: val input = env.fromElements("Good good study", "Day day up") .map(_.toLowerCase) // 转小写 .flatMap(_.split("\\W+")) // 相当于先map,再flatten .map((_,1)) // 转换为元组 // 使用自定义的Sink input.addSink(new FlinkToMySQLSink) // 写出到数据库中 // 触发流程序执行 env.execute("Data Sink Demo") } }
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.util.Collector; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink */ public class DataSinkToMysql { // 自定义的 Sink类,继承自 RichSinkFunction public static class FlinkToMySQLSink extends RichSinkFunction<Tuple2<String,Integer>> { // 定义mysql数据库连接url和驱动程序及账号、密码 private static String JDBC_URL = "jdbc:mysql://localhost:3306/xueai8" + "?characterEncoding=UTF-8&useSSL=false"; private static String USER_NAME = "root"; private static String USER_PWD = "1234"; // 声明数据库连接对象和SQL预编译语句 private PreparedStatement stmt; private Connection conn; // 重写open方法:加载驱动,建立连接,预编译SQL语句 @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(JDBC_URL, USER_NAME, USER_PWD); String sql = "insert into wc(word, cnt) values(?, ?)"; stmt = conn.prepareStatement(sql); } // 重写事件驱动方法调用 @Override public void invoke(Tuple2<String,Integer> t, Context context) throws Exception { stmt.setString(1, t.f0); stmt.setInt(2, t.f1); stmt.executeUpdate(); } // 关闭资源 @Override public void close() throws Exception { super.close(); if (stmt != null) { stmt.close(); } if (conn != null) { conn.close(); } } } public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 为流数据启用检查点 env.enableCheckpointing(2000); // 获得数据,执行map和flatMap转换 DataStream<Tuple2<String,Integer>> stream = env .fromElements("Good good study","Day day up") .map(String::toLowerCase) .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception { for(String word : s.split("\\W+")){ out.collect(new Tuple2<>(word,1)); } } }); // 指定使用自定义的Sink stream.addSink(new FlinkToMySQLSink()); // 执行流程序 env.execute("Data Sink Demo"); } }
执行以上程序,然后在MySQL中查看表内容:
mysql> select * from wc;
可以看到,流程序计算的结果已经保存到数据库中了。
在上面的代码中,我们自定义Sink是继承自RichSinkFunction类,它是SinkFunction的“Rich”变体。与SinkFunction相比,它有一些额外的方法,包括:
- open(Configuration c)
- close()
- getRuntimeContext()
在操作符初始化期间调用Open()一次。例如,这是加载一些静态数据或打开到外部服务的连接的机会。而getRuntimeContext()提供了对底层环境的访问能力,例如创建和访问由Flink管理的状态。