Flink完全分布式集群安装

Flink支持完全分布式模式,这时它由一个master节点和多个worker节点构成。在本节,我们将搭建一个如下的三个节点的Flink集群。

Flink完全分布式集群搭建步骤如下:

1、配置从master到worker节点的SSH无密登录,并保持保节点上相同的目录结构。

(1) 在每台机器上,执行如下命令:

$ ssh localhost
$ ssh exit			# 记得最后通过这个命令退出ssh连接

(2)在master上,使用如下命令生成公私钥:

$ cd .ssh
$ ssh-keygen -t rsa

然后一路回车,在.ssh下生成公私钥。

(3)将master上的公钥分别加入master、worker1和worker2机器的授权文件中。

在master机器上,执行如下命令:

$ ssh-copy-id hduser@master
$ ssh-copy-id hduser@worker1
$ ssh-copy-id hduser@worker2

(4)测试。在master机器上,使用ssh分别连接master、worker1和worker2:

$ ssh master
$ ssh worker1
$ ssh worker2

这时会发现不需要输入密码,直接就ssh连接上了这两台机器。

2、Flink要求在主节点和所有工作节点上设置JAVA_HOME环境变量,并指向Java安装的目录。

使用如下命令检查Java的安装和版本信息:

$ java -version

3、下载Flink安装包。下载地址:下载Flink安装包 。可以选择任何喜欢的Hadoop/Scala组合。

4、将下载的最新版本的Flink压缩包拷贝到master节点的"~/software/"目录下,并解压缩到"~/bigdata/"目录下。

步骤如下:

$ cd ~/bigdata/
$ tar xzf ~/software/flink-1.10.0-bin-scala_2.11.tgz
$ cd flink-1.10.0

5、在master节点上配置Flink

所有的配置都在"conf/flink-conf.yaml"文件中。在实际应用中,以下几个配置项是非常重要的:

  • jobmanager.heap.mb:每个JobManager的可用内存量,以MB为单位。
  • taskmanager.heap.mb:每个TaskManager的可用内存量,以MB为单位。
  • taskmanager.numberOfTaskSlots:每台机器上可用的cpu数量,默认为1。
  • parallelism.default:集群中cpu的总数。
  • io.tmp.dirs:临时目录。

首先用编辑器nano打开该配置文件(你也可以用任何你喜欢的编辑器,如vim,都可以)。

$ nano conf/flink-conf.yaml

编辑如下内容(注意,冒号后面一定要有一个空格):

jobmanager.rpc.address: master		// 指向master节点
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m		// 定义允许JVM在每个节点上分配的最大主内存量
taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1

6、每个节点下的Flink必须保持相同的目录内容。因此将配置好的Flink拷贝到集群中的另外两个节点work01和work02,使用如下的命令:

$ scp -r ~/bigdata/flink-1.10.0  hduser@worker01:~/bigdata/
$ scp -r ~/bigdata/flink-1.10.0  hduser@worker02:~/bigdata/

7、最后,必须提供集群中所有用作worker节点的列表。在"conf/slaves"文件中添加每个slave节点信息(IP或hostname均可),每个节点一行,如下所示。每个工作节点稍后将运行一个TaskManager:

master
worker1
worker2

8、启动集群:

$ ./bin/start-cluster.sh

这个脚本会在本地节点启动一个JobManager并通过SSH连接到所有的worker节点(在slaves文件中列出的) 以启动每个节点上的TaskManager。注意观察启动过程中的输出信息,如下:

Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.
Starting taskexecutor daemon on host worker1.
Starting taskexecutor daemon on host worker2.

可以看出,Flink先在master上启动standalonesession进程,然后依次在master、worker1和worker2上启动taskexecutor进程。

启动以后,分别在master、worker1和worker2节点上执行jps命令,查看各节点上的进程是否正常启动了。

9、关闭集群

$ ./bin/stop-cluster.sh

也可以分别停止JobManager和TaskManager。

执行以下命令,停止单个的Job Manager:

$ ./bin/jobmanager.sh stop cluster

执行以下命令,停止单个的Task Manager:

$ ./bin/taskmanager.sh stop cluster

执行Flink自带的流处理程序-单词计数

1、首先,启动netcat服务器,运行在9000端口:

$ nc -l 9000

2、在另一个终端,启动Flink示例程序,监听netcat服务器。它将从套接字中读取文本,并每5秒打印前5秒内每个不同单词出现的次数,即处理时间的滚动窗口。

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname master --port 9000

3、回到第一个终端窗口,在正在运行的netcat终端窗口,随意输入一些内容,单词之间用空格分隔,Flink将会处理它。

good good study
day day up

4、分别使用ssh登录master、worker01和worker02节点,并执行以下命令,查看日志中的输出:

$ cd ~/bigdata/flink-1.10.0
$ tail -f  log/flink-*-taskexecutor-*.out

可以看到如下输出结果:

good : 2
study : 1
day : 2
up : 1

5、还可以检查Flink Web UI来查看Job是怎样执行的。

打开浏览器,输入地址:http://master:8081 ,可查看检查调度程序的web前端。web前端应该报告有三个可用的TaskManager实例,以及正在执行的作业。Flink WebUI包含许多关于Flink集群及其作业(JobGraph、指标、检查点统计、TaskManager状态等)的有用而有趣的信息。

点击正在运行的作业,查看作业运行的详细信息,如下图所示:

运行Flink自带的批处理程序-单词计数程序

Flink安装包自带了一个以文本文件作为数据源的单词计数程序,位于Flink下的"example/batch/"目录下的WordCount.jar包中。可以执行下面的命令来在Flink集群上执行该程序,读取HDFS上的输入数据文件进行处理,并输出计算结果到HDFS上。

注:从flink 1.8开始,Hadoop不再包含在Flink的安装包中,所以需要单独下载并拷贝到Flink的lib目录下。请从Flink官网下载 flink-shaded-hadoop2-uber-2.7.5-1.10.0.jar并拷贝到Flink的lib目录下。

$ start-dfs.sh
$ ./bin/flink run ./examples/batch/WordCount.jar \
--input  hdfs://master:8020/flink_data/wc.txt \
--output hdfs://master:8020/flink_data/result

上面的命令是在HDFS中运行WordCount,其中--input参数指定要处理的输入文件,--output指定计算结果输出到的结果目录(事先要不存在)。(注:如果不加hdfs://前缀,默认使用本地文件系统)

执行以下命令查询输出结果:

$ hdfs dfs -cat hdfs://master:8020/flink_data/result/*

可以看到以下计算结果:

day 2
good 2
study 1
up 1

《Flink原理深入与编程实战》