Flink中的Watermark机制
Watermark的概念
通常情况下由于网络或者系统等外部因素影响下,事件数据往往不能及时传输至FLink系统中,导致系统的不稳定而造成数据乱序到达或者延迟达到等问题,因此需要有一种机制能够控制数据处理的进度。
具体来讲,在创建一个基于时间的window后,需要确定属于该window的数据元素是否已经全部到达,确定后才可以对window中的所有数据做计算处理(如汇总、分组),如果数据并没有全部到达,则继续等待该窗口的数据全部到达后再开始计算。
但是对于但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。在这种情况下就需要用到水位线 (Watermark) 机制。
Watermark的作用
它能够衡量数据处理进度,保证事件数据全部到达Flink系统,即使数据乱序或者延迟到达,也能够像预期一样计算出正确和连续的结果。通常watermark是结合window来实现。
Watermark的原理
在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。
这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。
当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。
计算 Watermark 的值
Watermark = 进入 Flink 的最大的事件产生时间(maxEventTime)- 指定的乱序时间(t)
有 Watermark 的 Window 是如何触发窗口函数
(1) watermark >= window的结束时间
(2) 该窗口必须有数据 注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间
Watermark 本质可以理解成一个延迟触发机制。
Watermark 的使用存在三种情况:
(1)有序的数据流中的watermark
如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延迟了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。
(2)乱序的数据流watermark
现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。
(3)并行数据流中的 Watermark
在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。
watermark和eventtime
1 有序数据流中引入 Watermark 和 EventTime
将数据中的timestamp根据指定的字段提取得到Eventtime,然后使用Eventtime作为最新的watermark, 这种适合于事件按顺序生成,没有乱序事件的情况。
* 对于有序的数据,代码比较简洁,主要需要从源 Event 中抽取 EventTime。
* 代码:对socket中有序(按照时间递增)的数据流,进行每5s处理一次
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.{MapFunction}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object OrderedStreamWaterMark {
def main(args: Array[String]): Unit = {
//todo:1.构建流式处理环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
environment.setParallelism(1)
//todo:2.设置时间类型
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//todo:3.获取数据源
val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)
//todo:4. 数据处理
val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))
//todo: 5.从源Event中抽取eventTime
val watermarkStream: DataStream[(String, Long)] = mapStream.assignAscendingTimestamps(x=>x._2)
//todo:6. 数据计算
watermarkStream.keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {
override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
val value: String = key.getField[String](0)
//窗口的开始时间
val startTime: Long = context.window.getStart
//窗口的结束时间
val startEnd: Long = context.window.getEnd
//获取当前的 watermark
val watermark: Long = context.currentWatermark
var sum:Long = 0
val toList: List[(String, Long)] = elements.toList
for(eachElement <- toList){
sum +=1
}
println("窗口的数据条数:"+sum+
" |窗口的第一条数据:"+toList.head+
" |窗口的最后一条数据:"+toList.last+
" |窗口的开始时间: "+ startTime+
" |窗口的结束时间: "+ startEnd+
" |当前的watermark:"+ watermark)
out.collect((value,sum))
}
}).print()
environment.execute()
}
}
2 乱序数据流中引入 Watermark 和 EventTime
对于乱序数据流,有两种常见的引入方法:周期性和间断性
1、With Periodic(周期性的) Watermark
周期性地生成 Watermark 的生成,默认是 100ms。每隔 N 毫秒自动向流里注入一个 Watermark,时间间隔由 streamEnv.getConfig.setAutoWatermarkInterval()决定。
* 代码:对socket中无序数据流,进行每5s处理一次,数据中会有延迟
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
//对无序的数据流周期性的添加水印
object OutOfOrderStreamPeriodicWaterMark {
def main(args: Array[String]): Unit = {
//todo:1.构建流式处理环境
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
environment.setParallelism(1)
//todo:2.设置时间类型
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//todo:3.获取数据源
val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)
//todo:4. 数据处理
val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))
//todo:5. 添加水位线
mapStream.assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks[(String, Long)] {
//定义延迟时间长度
//表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件
val maxOutOfOrderness=5000L
//历史最大事件时间
var currentMaxTimestamp:Long=_
var watermark:Watermark=_
//周期性的生成水位线watermark
override def getCurrentWatermark: Watermark ={
watermark = new Watermark(currentMaxTimestamp -maxOutOfOrderness)
watermark
}
//抽取事件时间
override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long ={
//获取事件时间
val currentElementEventTime: Long = element._2
//对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestamp
currentMaxTimestamp=Math.max(currentMaxTimestamp,currentElementEventTime)
println("接受到的事件:"+element+" |事件时间: "+currentElementEventTime)
currentElementEventTime
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {
override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
val value: String = key.getField[String](0)
//窗口的开始时间
val startTime: Long = context.window.getStart
//窗口的结束时间
val startEnd: Long = context.window.getEnd
//获取当前的 watermark
val watermark: Long = context.currentWatermark
var sum:Long = 0
val toList: List[(String, Long)] = elements.toList
for(eachElement <- toList){
sum +=1
}
//窗口的开始时间
val startTime: Long = context.window.getStart
//窗口的结束时间
val startEnd: Long = context.window.getEnd
//获取当前的 watermark
val watermark: Long = context.currentWatermark
var sum:Long = 0
val toList: List[(String, Long)] = elements.toList
for(eachElement <- toList){
sum +=1
}
println("窗口的数据条数:"+sum+
" |窗口的第一条数据:"+toList.head+
" |窗口的最后一条数据:"+toList.last+
" |窗口的开始时间: "+ startTime +
" |窗口的结束时间: "+ startEnd+
" |当前的watermark:"+watermark)
out.collect((value,sum))
}
}).print()
environment.execute()
}
2、With Punctuated(间断性的) Watermark
间断性的生成 Watermark 一般是基于某些事件触发 Watermark 的生成和发送。
比如说只给用户id为000001的添加watermark,其他的用户就不添加
Window 的allowedLateness处理延迟太大的数据
基于 Event-Time 的窗口处理流式数据,虽然提供了 Watermark 机制,却只能在一定程度上解决了数据乱序的问题。但在某些情况下数据可能延时会非常严重,即使通过 Watermark 机制也无法等到数据全部进入窗口再进行处理。
Flink 中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟到达的情况下,也能够正常按照流程处理并输出结果,此时就需要使用 Allowed Lateness 机制来对迟到的数据进行额外的处理。
* 迟到数据的处理机制:
* 1、直接丢弃
* 2、指定允许再次迟到的时间
assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) // 允许事件再迟到2秒
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
//注意:
//(1). 当我们设置允许迟到2秒的事件,第一次 window 触发的条件是 watermark >= window_end_time
//(2). 第二次(或者多次)触发的条件是watermark < window_end_time + allowedLateness
* 3、收集迟到太多的数据
assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) //允许事件再迟到2秒
.sideOutputLateData(outputTag) //收集迟到太多的数据
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
3 多并行度下的WaterMark
本地测试的过程中,如果不设置并行度的话,默认读取本机CPU数量设置并行度,可以手动设置并行度environment.setParallelism(1),每一个线程都会有一个watermark.
多并行度的情况下,一个window可能会接受到多个不同线程waterMark,
* watermark对齐会取所有channel最小的watermark,以最小的watermark为准。