案例:服务器故障检测报警程序
对于Hadoop运维人员,需要监控生产环境大数据各个组件的状态信息,由于Regionserver经常挂掉,这里模拟监控Regionserver的状态信息,每次Regionserver上线和下线都会发送一个状态信息。这里如果Regionserver挂掉后,服务自启动功能在30秒以内没有将Regionserver拉起(这里只是为了测试使用30秒),那么就进行持续的告警,直到收到上线消息,告警取消。
1)这里消息模拟从socket接收服务器告警消息,消息格式包含三个字段分别是:主机名hostname,告警时间time,状态status(RUNNING服务正常,DEAD服务停止);
2)接收消息之后对数据流按照主机名进行分组,对于状态为DEAD的消息,设置定时器30秒以内如果状态不恢复为RUNNING,则定时进行告警,如果30秒内恢复RUNNING状态,则认为上一条消息是误报,则删除定时器,取消报警。
Scala代码:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * 服务器故障检测报警程序 */ object KeyedProcessFunDemo2 { // 服务器信息类型:主机名,时间,状态(RUNNING-正常,DEAD-宕机) case class MessageInfo(hostname: String, msgTime: Stri ............
抱歉,只有登录会员才可浏览!会员登录