读取Socket数据源
DataStream API支持从Socket套接字读取数据。我们只需要指定要从其中读取数据的主机和端口号即可。读取Socket套接字的数据源函数定义如下:
- socketTextStream(hostName, port)
- socketTextStream(hostName, port, delimiter):可指定分隔符。
- socketTextStream(hostName, port, delimiter, maxRetry):还可以指定API应该尝试获取数据的最大次数。
【示例】Socket数据源: 流应用程序示例,它接收来自web套接字的单词。
Scala实现:
import org.apache.flink.streaming.api.scala._ object SocketSourceDemo { val HOST = "192.168.190.133" // host主机 val PORT = 9999 // 端口号 def main(args: Array[String]): Unit = { // 设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 连接Socket数据源 val input = env.socketTextStream(HOST, PORT) // 流数据转换,并打印结果 input .map(_.toLowerCase) .flatMap(_.split("\\W+")) .print // 触发流程序执行 env.execute("Flink Socket Source") } }
Java实现:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketSourceDemo { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 源数据流 String host = "xueai8"; // host主机或IP地址 int port = 9999; // 端口号 DataStream<String> input = env.socketTextStream(host, port); // 对DataStream进行转换,向map和flatMap传入匿名内容类 input.map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { return s.toLowerCase(); // 先转小写 } }).flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for(String word : s.split("\\W+")){ // 再分词 collector.collect(word); // 向下游发送 } } }).print(); // 执行流程序 env.execute("Flink Socket Source"); } }
执行过程
1)打开一个终端窗口,执行如下的命令,启动一个netcat服务器,运行在9999端口:
$ nc -lk 9999
2)运行流程序
3)在netcat运行窗口,输入以下内容,并回车:
Good good study day day up
4)在程序执行窗口,可以看到如下计算输出:
5> good 5> good 5> study 6> day 6> day 6> up