Flink中的Watermark机制

标签:无 1705人阅读 评论(0)
分类:

Watermark的概念

   通常情况下由于网络或者系统等外部因素影响下,事件数据往往不能及时传输至FLink系统中,导致系统的不稳定而造成数据乱序到达或者延迟达到等问题,因此需要有一种机制能够控制数据处理的进度。

   具体来讲,在创建一个基于时间的window后,需要确定属于该window的数据元素是否已经全部到达,确定后才可以对window中的所有数据做计算处理(如汇总、分组),如果数据并没有全部到达,则继续等待该窗口的数据全部到达后再开始计算。 

   但是对于但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。在这种情况下就需要用到水位线 (Watermark) 机制。

Watermark的作用

    它能够衡量数据处理进度,保证事件数据全部到达Flink系统,即使数据乱序或者延迟到达,也能够像预期一样计算出正确和连续的结果。通常watermark是结合window来实现。

Watermark的原理

   在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。

   这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达Flink系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。

   当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。 

计算 Watermark 的值

Watermark = 进入 Flink 的最大的事件产生时间(maxEventTime)- 指定的乱序时间(t)

有 Watermark 的 Window 是如何触发窗口函数

(1) watermark >= window的结束时间

(2) 该窗口必须有数据  注意:[window_start_time,window_end_time) 中有数据存在,前闭后开区间

Watermark 本质可以理解成一个延迟触发机制。

Watermark 的使用存在三种情况:

(1)有序的数据流中的watermark

      如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延迟了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。

(2)乱序的数据流watermark

      现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。

(3)并行数据流中的 Watermark 

      在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。


watermark和eventtime

1 有序数据流中引入 Watermark 和 EventTime

将数据中的timestamp根据指定的字段提取得到Eventtime,然后使用Eventtime作为最新的watermark, 这种适合于事件按顺序生成,没有乱序事件的情况。

* 对于有序的数据,代码比较简洁,主要需要从源 Event 中抽取 EventTime。

* 代码:对socket中有序(按照时间递增)的数据流,进行每5s处理一次

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.flink.api.common.functions.{MapFunction}

import org.apache.flink.api.java.tuple.Tuple

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.util.Collector

object OrderedStreamWaterMark {

  def main(args: Array[String]): Unit = {

      //todo:1.构建流式处理环境

      val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

      import org.apache.flink.api.scala._

      environment.setParallelism(1)

     //todo:2.设置时间类型

     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //todo:3.获取数据源

      val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)

    //todo:4. 数据处理

      val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))

    //todo: 5.从源Event中抽取eventTime

        val watermarkStream: DataStream[(String, Long)] = mapStream.assignAscendingTimestamps(x=>x._2)

    //todo:6. 数据计算

     watermarkStream.keyBy(0)

                    .timeWindow(Time.seconds(5))

                    .process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {

                        override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {

                          val value: String = key.getField[String](0)

                          //窗口的开始时间

                          val startTime: Long = context.window.getStart

                          //窗口的结束时间

                          val startEnd: Long = context.window.getEnd

                          //获取当前的 watermark

                          val watermark: Long = context.currentWatermark

                          var sum:Long = 0

                          val toList: List[(String, Long)] = elements.toList

                          for(eachElement <-  toList){

                            sum +=1

                          }

                          println("窗口的数据条数:"+sum+

                            " |窗口的第一条数据:"+toList.head+

                            " |窗口的最后一条数据:"+toList.last+

                            " |窗口的开始时间: "+ startTime+

                            " |窗口的结束时间: "+ startEnd+

                            " |当前的watermark:"+ watermark)

                          out.collect((value,sum))

                        }

              }).print()

    environment.execute()

  }

}



2 乱序数据流中引入 Watermark 和 EventTime

对于乱序数据流,有两种常见的引入方法:周期性和间断性

1、With Periodic(周期性的) Watermark

      周期性地生成 Watermark 的生成,默认是 100ms。每隔 N 毫秒自动向流里注入一个 Watermark,时间间隔由 streamEnv.getConfig.setAutoWatermarkInterval()决定。

  * 代码:对socket中无序数据流,进行每5s处理一次,数据中会有延迟

  import org.apache.flink.api.java.tuple.Tuple

  import org.apache.flink.streaming.api.TimeCharacteristic

  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks

  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

  import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction

  import org.apache.flink.streaming.api.watermark.Watermark

  import org.apache.flink.streaming.api.windowing.time.Time

  import org.apache.flink.streaming.api.windowing.windows.TimeWindow

  import org.apache.flink.util.Collector

    //对无序的数据流周期性的添加水印

    object OutOfOrderStreamPeriodicWaterMark {

      def main(args: Array[String]): Unit = {

           //todo:1.构建流式处理环境

      val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

      import org.apache.flink.api.scala._

      environment.setParallelism(1)

           //todo:2.设置时间类型

     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //todo:3.获取数据源

      val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)

    //todo:4. 数据处理

      val mapStream: DataStream[(String, Long)] = sourceStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong))

          //todo:5. 添加水位线

    mapStream.assignTimestampsAndWatermarks(

        new AssignerWithPeriodicWatermarks[(String, Long)] {

        //定义延迟时间长度

        //表示在5秒以内的数据延时有效,超过5秒的数据被认定为迟到事件

        val maxOutOfOrderness=5000L

        //历史最大事件时间

        var currentMaxTimestamp:Long=_

        var watermark:Watermark=_

         //周期性的生成水位线watermark

        override def getCurrentWatermark: Watermark ={

          watermark =  new Watermark(currentMaxTimestamp -maxOutOfOrderness)

          watermark

        }

        //抽取事件时间

        override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long ={

            //获取事件时间

            val currentElementEventTime: Long = element._2

            //对比当前事件时间和历史最大事件时间, 将较大值重新赋值给currentMaxTimestamp

            currentMaxTimestamp=Math.max(currentMaxTimestamp,currentElementEventTime)

            println("接受到的事件:"+element+" |事件时间: "+currentElementEventTime)

            currentElementEventTime

        }

      })

          .keyBy(0)

          .timeWindow(Time.seconds(5))

          .process(new ProcessWindowFunction[(String, Long),(String,Long),Tuple,TimeWindow] {

            override def process(key: Tuple, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {

         val value: String = key.getField[String](0)

             //窗口的开始时间

              val startTime: Long = context.window.getStart

              //窗口的结束时间

              val startEnd: Long = context.window.getEnd

              //获取当前的 watermark

              val watermark: Long = context.currentWatermark

              var sum:Long = 0

              val toList: List[(String, Long)] = elements.toList

              for(eachElement <-  toList){

                sum +=1

              }

              //窗口的开始时间

              val startTime: Long = context.window.getStart

              //窗口的结束时间

              val startEnd: Long = context.window.getEnd

              //获取当前的 watermark

              val watermark: Long = context.currentWatermark

              var sum:Long = 0

              val toList: List[(String, Long)] = elements.toList

              for(eachElement <-  toList){

                sum +=1

              }  

              println("窗口的数据条数:"+sum+

                " |窗口的第一条数据:"+toList.head+

                " |窗口的最后一条数据:"+toList.last+

                " |窗口的开始时间: "+  startTime +

                " |窗口的结束时间: "+ startEnd+

                " |当前的watermark:"+watermark)

              out.collect((value,sum))

            }

          }).print()       

         environment.execute() 

      }  


2、With Punctuated(间断性的) Watermark

  间断性的生成 Watermark 一般是基于某些事件触发 Watermark 的生成和发送。

  比如说只给用户id为000001的添加watermark,其他的用户就不添加

 


    Window 的allowedLateness处理延迟太大的数据

    基于 Event-Time 的窗口处理流式数据,虽然提供了 Watermark 机制,却只能在一定程度上解决了数据乱序的问题。但在某些情况下数据可能延时会非常严重,即使通过 Watermark 机制也无法等到数据全部进入窗口再进行处理。

    Flink 中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟到达的情况下,也能够正常按照流程处理并输出结果,此时就需要使用 Allowed Lateness 机制来对迟到的数据进行额外的处理。

* 迟到数据的处理机制:

  * 1、直接丢弃

  * 2、指定允许再次迟到的时间

    assignTimestampsAndWatermarks(new EventTimeExtractor() )

                    .keyBy(0)

                    .timeWindow(Time.seconds(3))

                    .allowedLateness(Time.seconds(2)) // 允许事件再迟到2秒

                    .process(new SumProcessWindowFunction())

                    .print().setParallelism(1);

    //注意:

    //(1). 当我们设置允许迟到2秒的事件,第一次 window 触发的条件是 watermark >= window_end_time

    //(2). 第二次(或者多次)触发的条件是watermark < window_end_time + allowedLateness

  * 3、收集迟到太多的数据

    assignTimestampsAndWatermarks(new EventTimeExtractor() )

                    .keyBy(0)

                    .timeWindow(Time.seconds(3))

                    .allowedLateness(Time.seconds(2)) //允许事件再迟到2秒

                    .sideOutputLateData(outputTag)    //收集迟到太多的数据

                    .process(new SumProcessWindowFunction())

                    .print().setParallelism(1);

 

3 多并行度下的WaterMark

    本地测试的过程中,如果不设置并行度的话,默认读取本机CPU数量设置并行度,可以手动设置并行度environment.setParallelism(1),每一个线程都会有一个watermark.

    多并行度的情况下,一个window可能会接受到多个不同线程waterMark,

* watermark对齐会取所有channel最小的watermark,以最小的watermark为准。

查看评论

暂无评论

发表评论
  • 评论内容:
      
首页
团队介绍
发展历史
组织结构
MESA大事记
新闻中心
通知
组内动态
科研成果
专利
论文
项目
获奖
软著
人才培养
MESA毕业生
MESA在读生
MESA员工
招贤纳士
走进MESA
学长分享
招聘通知
招生宣传
知识库
文章
地址:北京市朝阳区华严北里甲22号楼五层 | 邮编:100029
邮箱:nelist@iie.ac.cn
京ICP备15019404号-1