我的编程空间,编程开发者的网络收藏夹
学习永远不晚

「Flink」事件时间与水印

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

「Flink」事件时间与水印

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。

获取窗口开始时间Flink源代码

获取窗口的开始时间为以下代码:

org.apache.flink.streaming.api.windowing.windows.TimeWindow

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

这一段代码,我们可以认为Flink并不是把时间戳直接作为窗口的开始时间,而是做了一些“对齐”操作,确保时间能够整除8。

不同时间类型的窗口时间计算

1、当TimeCharacteristic为ProcessingTime时

窗口的开始时间:与窗口接收到的第一条消息的处理时间有关。例如:window operator是2020-02-06 22:02:33接收到的第一条消息,那么窗口的开始时间就是2020-02-06 22:02:33。

窗口的结束时间:一旦窗口的开始时间确定了,因为窗口的长度是固定的。那么窗口的结束时间就确定下来了,例如:假设这里的时间窗口是3秒,那么窗口的结束时间就是2020-02-06 22:02:36。

窗口的触发计算时间:假设有一条新的消息到达window operator,此时如果对应operator的系统时间,大于结束时间,就会触发计算。

一旦窗口的开始时间确定了,那么后续窗口的开始时间,也就都确定下来了。

问题:

假设某个时间窗口,2020-2-6 22:12:20 - 2020-2-6 22:12:23,之间没有任何一条数据进来。Flink会如何处理?

Flink会直接抛弃掉这个时间窗口,新来的事件消息会到其他的时间窗口中计算。


2、当TimeCharacteristic为IngestionTime时

窗口的开始时间:与source operator接收到的第一条消息有关。例如:source接收到这条消息的时间是2020-2-6 22:14:50,那么窗口的开始时间就是2020-2-6 22:14:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达source operator,那么此时的时间如果大于结束时间,就会触发计算。


除了窗口的开始时间、触发时间都是与source operator算子有关,其他与Processing Time是类似的。


3、但TimeCharacteristic为EventTime时

窗口的开始时间:与window operator接收到的第一条消息的事件时间有关,例如:如果这条消息的水印时间是2020-2-6 22:17:50,那么窗口的的开始时间就是2020-2-6 22:17:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达window operator,如果该事件的水印时间大于窗口的结束时间,就会触发计算。

通常,我们会让水印时间比事件时间允许延迟几秒钟。这样,如果是因为网络延迟消息晚到了几秒,也不会影响到统计结果了。

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式运行环境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 设置时间处理类型,这里设置的方式处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 3. 定义数据源,每秒发送一个hadoop单词
        SingleOutputStreamOperator> wordDSWithWaterMark = env.addSource(new RichSourceFunction>() {

            private boolean isCanaled = false;
            private int TOTAL_NUM = 20;

            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect(Tuple2.of("hadooop", System.currentTimeMillis()));

                    // 打印窗口开始、结束时间
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.println("事件发送时间:" + sdf.format(System.currentTimeMillis()));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple2 element) {
                return element.f1;
            }
        });

        // 4. 每5秒进行一次,分组统计
        // 4.1 转换为元组
        wordDSWithWaterMark.map(word -> {
            return Tuple2.of(word.f0, 1);

        })
                // 指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照单词进行分组
                .keyBy(t -> t.f0)
                // 滚动窗口,3秒计算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction>() {
                    @Override
                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction, Tuple2, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable> input, Collector> out) throws Exception {

                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                + " 窗口结束时间:" + sdf.format(window.getEnd())
                                + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

输出结果如下:

事件发送时间:2020-02-06 22:35:08
事件发送时间:2020-02-06 22:35:09
事件发送时间:2020-02-06 22:35:10
事件发送时间:2020-02-06 22:35:11
事件发送时间:2020-02-06 22:35:12
事件发送时间:2020-02-06 22:35:13
事件发送时间:2020-02-06 22:35:14
窗口开始时间:2020-02-06 22:35:06 窗口结束时间:2020-02-06 22:35:09 窗口计算时间:2020-02-06 22:35:14
4> (hadooop,1)

事件发送时间:2020-02-06 22:35:15
事件发送时间:2020-02-06 22:35:16
事件发送时间:2020-02-06 22:35:17
窗口开始时间:2020-02-06 22:35:09 窗口结束时间:2020-02-06 22:35:12 窗口计算时间:2020-02-06 22:35:17
4> (hadooop,3)


参考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/event_time.html

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

「Flink」事件时间与水印

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。获取窗口开始时间Flink源代码获取窗口的开始时间为以下代码:org.apache.flink.streaming.api.windowing.windows.Time
「Flink」事件时间与水印
2019-07-13

2021下半年软件水平考试准考证打印时间

  很多考生关注“2021下半年软件水平考试准考证打印时间”,准备参加2021下半年软考的考生赶紧来看下软件水平考试准考证打印时间及入口吧!  各省2021下半年软件水平考试准考证打印时间会有所不同,大部分地区在考前一周打印,考生可以关注2021下半年软考准考证打印时间及入口,查看当地软件水平考
2021下半年软件水平考试准考证打印时间
2024-04-19

Android视频处理之动态时间水印效果

最近的项目中遇到一个非常头痛的需求,在Android端录制视频的时候动态添加像监控画面一样的精确到秒的时间信息,关键是,并不是说只在播放器的界面显示时间就可以了,而是录制到视频里面去,这个MP4在电脑上播放也能看到每个画面的时间。 最后想
2022-06-06

2021下半年软件水平考试准考证打印时间及入口

  很多考生关注“2021下半年软件水平考试准考证打印时间及入口”,准备参加2021下半年软考的考生赶紧来看下软件水平考试准考证打印时间及入口吧!  各省2021下半年软件水平考试准考证打印时间会有所不同,大部分地区在考前一周打印,考生可以关注2021下半年软考准考证打印时间及入口,查看当地软件
2021下半年软件水平考试准考证打印时间及入口
2024-04-19

2020软件水平考试报名时间

  很多考生准备报名2020软件水平考试,那么什么时候报名呢?一起来看看2020年软件水平考试报名时间吧!今天编程学习网小编来为大家解答。  2020软件水平考试报名时间是什么时候?软件水平考试每年有两次考试,上半年报名时间和下半年报名时间有所不同,各省报名时间也有所不同。  2020上半年软件水平考试报名时间  上半年软
2020软件水平考试报名时间
2024-04-18

浙江2019下半年计算机软件水平考试准考证打印时间

      浙江2019下半年计算机软件水平考试准考证打印时间已公布,更多关于计算机软件水平考试动态  根据《关于2019年下半年计算机技术与软件专业技术资格(水平)考试浙江考区考务工作有关事宜的通知》得知,浙江2019下半年计算机软件水平考试准考证打印时间如下:  参加考试  全省各市政
浙江2019下半年计算机软件水平考试准考证打印时间
2024-04-18

2024上半年计算机软件水平考试准考证打印时间汇总

2024上半年计算机软件水平考试准考证打印时间5月20日开始,各地区2024上半年计算机软件水平考试准考证打印时间具体不同,编程学习网小编将各地区2024上半年计算机软件水平考试准考证打印时间进行了汇总,详见正文。
2024上半年计算机软件水平考试准考证打印时间汇总
2024-04-23

2023年软件水平考试报名时间

  2023年软件水平考试报名时间是什么时候?不少考生准备报名2023年软件水平考试,赶紧来看下2023年软件水平考试什么时候可以报名吧!  软件水平考试每年有两次考试,根据往年各省软件水平考试报名时间安排来看,编程学习网小编预计2023年上半年软件水平考试报名时间在3月份开始;2023年下半年软件水平考试报名时间在8月份
2023年软件水平考试报名时间
2024-04-19

2019年软件水平考试报名时间

软件水平考试每年有两次考试,分别安排在上半年和下半年,准备报名的考生跟编程学习网小编一起来看看那么2019年软件水平考试报名时间是什么时候呢?根据往年软件水平考试报名时间来看,2019上半年报名预计各省从2月底陆续开始;2019下半年报名预计各省从7月底陆续开始。各地区软件水平考试报名入口有所不同,部分地区是在全国软考办官
2019年软件水平考试报名时间
2024-04-18

2021年软件水平考试报名时间

  2021年软件水平考试报名时间是什么时候?不少考生准备报名2021年软件水平考试,那么什么时候报名呢?赶紧来看下2021年软件水平考试报名时间吧!今天编程学习网小编就给大家详细来介绍一下。  2021年软件水平考试时间已经公布,考试时间分别安排在5月29日、30日和11月6日、7日。  2021年软件水平考试报名时间 
2021年软件水平考试报名时间
2024-04-18

2020年计算机软件水平考试时间

  2020年计算机软件水平考试时间是什么时候?准备报名2020年计算机软考的考生赶紧来看下考试时间吧!今天编程学习网小编来为大家解答。  计算机软件水平考试每年有两次考试,根据人力资源社会保障部办公厅关于2020年度专业技术人员职业资格考试计划及有关事项的通知,2020上半年软考考试时间:5月23、24日,2020下半年
2020年计算机软件水平考试时间
2024-04-18

2019年江苏软件水平考试报名时间

2019年江苏软件水平考试报名时间呢?准备报名的考生跟编程学习网小编一起来看看2019年江苏软件水平考试报名时间吧?根据往年江苏软件水平考试报名时间来看,2019上半年江苏软件水平考试预计3月中旬开始报名,下半年预计8月初开始报名。考生可以登录中国计算机技术职业资格网,点击考生报名入口,进行网上报名。网上报名后,等待报名审
2019年江苏软件水平考试报名时间
2024-04-18

python获取文件修改时间与创建时间

转载自:  http://blog.csdn.net/liyuan_669/article/details/25347037import osimport time y = time.ctie(os.stat("D:\\test\\1.tx
2023-01-31

阿里云服务器泡水事件回顾与解决方案

阿里云服务器是一种高可用性的云计算产品,提供稳定可靠的计算能力。然而,近日有一份关于阿里云服务器泡水的报告,引起了人们的关注。这篇文章将详细回顾该事件,分析其原因,并提出解决方案。事件回顾:近日,一份关于阿里云服务器泡水的报告在网络上流传。报告称,一家名为“蓝天科技”的企业,在购买阿里云服务器后,由于服务器未安装
阿里云服务器泡水事件回顾与解决方案
2023-11-02

2020年全国计算机软件水平考试时间

  不少考生在关注2020年全国计算机软件水平考试时间,那么2020年软件水平考试是什么时候呢?今天编程学习网小编来为大家解答。  根据人力资源社会保障部办公厅发布的《2020年度专业技术人员职业资格考试工作计划》,2020年上半年全国计算机软件水平考试时间为5月23日,2020年下半年全国计算机软件水平考试时间为11月7
2020年全国计算机软件水平考试时间
2024-04-18

编程热搜

目录