读取Socket数据源

DataStream API支持从Socket套接字读取数据。我们只需要指定要从其中读取数据的主机和端口号即可。读取Socket套接字的数据源函数定义如下:

  • socketTextStream(hostName, port)
  • socketTextStream(hostName, port, delimiter):可指定分隔符。
  • socketTextStream(hostName, port, delimiter, maxRetry):还可以指定API应该尝试获取数据的最大次数。

【示例】Socket数据源: 流应用程序示例,它接收来自web套接字的单词。

Scala实现:

import org.apache.flink.streaming.api.scala._

object SocketSourceDemo {

  val HOST = "192.168.190.133"      // host主机
  val PORT = 9999                   // 端口号

  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 连接Socket数据源
    val input = env.socketTextStream(HOST, PORT)

    // 流数据转换,并打印结果
    input
      .map(_.toLowerCase)
      .flatMap(_.split("\\W+"))
      .print

    // 触发流程序执行
    env.execute("Flink Socket Source")
  }
}

Java实现:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketSourceDemo {                         

	public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 源数据流
        String host = "xueai8";						// host主机或IP地址
        int port = 9999;							// 端口号
        DataStream<String> input = env.socketTextStream(host, port);

        // 对DataStream进行转换,向map和flatMap传入匿名内容类
        input.map(new MapFunction<String, String>() {
			@Override
			public String map(String s) throws Exception {
				return s.toLowerCase();			// 先转小写
			}
        }).flatMap(new FlatMapFunction<String, String>() {
			@Override
			public void flatMap(String s, Collector<String> collector) throws Exception {
				for(String word : s.split("\\W+")){	// 再分词
					collector.collect(word);		// 向下游发送
				}
			}
		}).print();

		// 执行流程序
		env.execute("Flink Socket Source");
	}
}

执行过程

1)打开一个终端窗口,执行如下的命令,启动一个netcat服务器,运行在9999端口:

  $ nc -lk 9999

2)运行流程序

3)在netcat运行窗口,输入以下内容,并回车:

Good good study
day day up

4)在程序执行窗口,可以看到如下计算输出:

5> good
5> good
5> study
6> day
6> day
6> up

《PySpark原理深入与编程实战》