Flink-SQL Window


关于窗口这里不再展开论述,之前已经写到过了,本文重点关注SQL如何实现Flink Window的效果。本次使用的kafka数据JSON格式如下:

     "{\"Id\":9990,\"Name\":\"Nmae_9990\",\"Operation\":\"favourite\",\"t\":1630396020,\"opt\":2}"
package com.hph.bean;

/**
 * Plain bean describing a single user-action event that is serialized to
 * JSON (via Gson) and published to Kafka.
 *
 * <p>The capitalized field names ({@code Id}, {@code Name}, {@code Operation})
 * are intentional: Gson serializes by field name, and the topic's JSON schema
 * uses exactly these keys (e.g. {"Id":9990,"Name":"Nmae_9990",...}).
 */
public class UserAction {
    private int Id;            // event id (monotonically increasing in the demo producer)
    private String Name;       // user name
    private String Operation;  // action type, e.g. "click" / "favourite" / "add"
    private Long t;            // event time, epoch seconds
    private int opt;           // small numeric payload

    public UserAction(int id, String name, String operation, Long t, int opt) {
        this.Id = id;
        this.Name = name;
        this.Operation = operation;
        this.t = t;
        this.opt = opt;
    }

    public int getId() {
        return this.Id;
    }

    public void setId(int id) {
        this.Id = id;
    }

    public String getName() {
        return this.Name;
    }

    public void setName(String name) {
        this.Name = name;
    }

    public String getOperation() {
        return this.Operation;
    }

    public void setOperation(String operation) {
        this.Operation = operation;
    }

    public Long getT() {
        return this.t;
    }

    public void setT(Long t) {
        this.t = t;
    }

    public int getOpt() {
        return this.opt;
    }

    public void setOpt(int opt) {
        this.opt = opt;
    }

    @Override
    public String toString() {
        return "UserAction{Id=" + Id
                + ", Name='" + Name + "'"
                + ", Operation='" + Operation + "'"
                + ", t=" + t
                + ", opt=" + opt
                + "}";
    }
}

测试发送kafka数据程序如下。

package com.hph.producer;

import com.google.gson.Gson;
import com.hph.bean.UserAction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;
import java.util.Random;

/**
 * Demo producer that generates {@link UserAction} events, serializes them to
 * JSON with Gson, and publishes them to the Kafka topic "us_ac" at roughly
 * one record per second.
 */
public class UserActionProducer {

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties props = new Properties();
        // Kafka broker host:port list.
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Wait for acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // Maximum send retries.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // Linger before sending a batch (ms).
        props.put("linger.ms", 1);
        // Producer buffer memory in bytes.
        props.put("buffer.memory", 33554432);
        // Key serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Gson gson = new Gson();
        // One shared Random — the original allocated a new Random per field per iteration.
        Random random = new Random();

        String[] action = {"click", "favourite", "add", "commit", "post"};

        // try-with-resources guarantees the producer is flushed and closed even
        // on exception (the original `producer.close()` was unreachable because
        // the loop runs to Integer.MAX_VALUE).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                // Event time in epoch seconds (matches the table schema's BIGINT `t`).
                long timestamp = System.currentTimeMillis() / 1000;
                // NOTE(review): nextInt(3) only ever selects the first three
                // actions ("click", "favourite", "add"); use action.length if
                // all five should appear.
                UserAction userAction = new UserAction(i, "Nmae_" + i,
                        action[random.nextInt(3)], timestamp, random.nextInt(3));
                String json = gson.toJson(userAction);
                System.out.println(json);
                producer.send(new ProducerRecord<>("us_ac", json));
                // One record per second — implied by the declared
                // InterruptedException and the article's "1 record per second".
                Thread.sleep(1000);
            }
        }
    }

}

从Kafka监控中我们可以看到数据总条数为142006条。

下面主要演示以处理时间为时间属性的 Window SQL。

ProcessTime

TumbleWindowsSQL

package com.hph.sql.window.processtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * 滚动窗口Demo
 */

/**
 * Tumbling-window demo on processing time: counts records per 1-minute
 * window read from the Kafka topic "us_ac".
 */
public class TumbleWindowsSQL {
    public static void main(String[] args) {

        EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment env = TableEnvironment.create(envSettings);

        // Sample record: {"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}
        // Register the Kafka source; `t` is a processing-time attribute.
        String sourceDdl = "CREATE TABLE KafkaTable (\n" +
                "  `Id` INT,\n" +
                "  `Name` STRING,\n" +
                "  `Operation` STRING,\n" +
                "   `opt`       STRING,\n" +
                "  `t` AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'tumble',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(sourceDdl);

        // SQL 1 — legacy group-window syntax, kept for reference:
        // GROUP BY TUMBLE(t, INTERVAL '1' MINUTES) buckets rows into 1-minute windows;
        // TUMBLE_START / TUMBLE_END expose the window bounds.
  /*      env.executeSql("SELECT TUMBLE_START(t, INTERVAL '1' MINUTES) AS window_start," +
                "TUMBLE_END(t, INTERVAL '1' MINUTES) AS window_end, count(Id) as One_PV FROM   KafkaTable"
                + " GROUP BY TUMBLE(t, INTERVAL '1' MINUTES)").print();
*/
        // SQL 2 — windowing TVF syntax: TUMBLE(data, timecol, size).
        String tumbleQuery = " SELECT window_start, window_end,  count(Id) as Two_PV\n" +
                "  FROM TABLE(\n" +
                "    TUMBLE(TABLE KafkaTable, DESCRIPTOR(t), INTERVAL '1' MINUTES))\n" +
                "  GROUP BY window_start, window_end";
        env.executeSql(tumbleQuery).print();

    }
}

CumulateWindowSQL

package com.hph.sql.window.processtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


/**
 * Cumulative (CUMULATE) window demo on processing time.
 *
 * <p>NOTE(review): the step and the max size are both 2 seconds here, so each
 * window fires exactly once and the output matches a plain 2-second tumble;
 * choose a step smaller than the max size to observe accumulation — confirm
 * this is the intended demo shape.
 */
public class CumulateWindowSQL {
    public static void main(String[] args) {
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment env = TableEnvironment.create(envSettings);

        // Sample record: {"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}
        // Register the Kafka source; `t` is a processing-time attribute.
        String sourceDdl = "CREATE TABLE KafkaTable (\n" +
                "  `Id` INT,\n" +
                "  `Name` STRING,\n" +
                "  `Operation` STRING,\n" +
                "   `opt`       STRING,\n" +
                "  `t` AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'cumulate',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(sourceDdl);

        // Windowing TVF syntax: CUMULATE(data, timecol, step, max size).
        String cumulateQuery = "SELECT window_start, window_end, COUNT(Id) AS PV\n" +
                "  FROM TABLE(\n" +
                "    CUMULATE(TABLE KafkaTable, DESCRIPTOR(t), INTERVAL '2' SECOND, INTERVAL '2' SECOND))\n" +
                "  GROUP BY window_start, window_end";
        env.executeSql(cumulateQuery).print();
    }
}

从图中我们可以看到us_ac中已经存在684条数据,在Flink中累计窗口

Cumulate window 就是累计窗口,简单来说,以上图里面时间轴上的一个区间为窗口步长。

第一个 window 统计的是一个区间的数据;

第二个 window 统计的是第一区间和第二个区间的数据;

第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。

累积计算在业务场景中非常常见,如累积 UV 场景。累计PV场景等。

HopWindowsSQL

package com.hph.sql.window.processtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Sliding (HOP) window demo on processing time: counts records in 1-minute
 * windows sliding every 1 minute from the Kafka topic "us_ac".
 *
 * <p>NOTE(review): with slide == size (1 minute each) this behaves exactly
 * like a tumbling window; use a slide smaller than the size to get
 * overlapping windows.
 */
public class HopWindowsSQL {

    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // Sample record: {"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}

        // Register the Kafka source; `t` is a processing-time attribute.
        tableEnv.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `Id` INT,\n" +
                "  `Name` STRING,\n" +
                "  `Operation` STRING,\n" +
                "   `opt`       STRING,\n" +
                "  `t` AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'hop_process_time',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Legacy group-window syntax, kept once for reference (the original
        // file contained an exact duplicate of this commented-out statement,
        // which has been removed):
//        tableEnv.executeSql("SELECT HOP_START(t, INTERVAL '2' SECOND, INTERVAL '2' SECOND) AS window_start,"
//                + "HOP_END(t, INTERVAL '2' SECOND, INTERVAL '2' SECOND) AS window_end, COUNT(Id) FROM "
//                + "KafkaTable" + " GROUP BY HOP(t, INTERVAL '2' SECOND, INTERVAL '2' SECOND)").print();

        // Windowing TVF syntax: HOP(data, timecol, slide, size).
        tableEnv.executeSql("SELECT window_start, window_end, COUNT(Id)  \n" +
                "  FROM TABLE(\n" +
                "    HOP(TABLE KafkaTable, DESCRIPTOR(t), INTERVAL '1' MINUTES,INTERVAL '1' MINUTES))\n" +
                "  GROUP BY window_start, window_end").print();

    }
}

从结果我们可以看到在1分钟之内滑动窗口结果为684,包含了对累计历史累计数据的计算。

每隔1s发送到kafka 1条数据,我们可以看到1分钟的滑动窗口计算结果符合我们的预期。

EventTime

EventTimeCumulateWindowSQL

package com.hph.sql.window.eventtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Cumulative (CUMULATE) window demo on event time. The BIGINT `t` column
 * (epoch seconds) is converted to a TIMESTAMP_LTZ event-time attribute via
 * TO_TIMESTAMP_LTZ(t, 0), with a zero-delay watermark.
 *
 * <p>NOTE(review): step and max size are both 1 minute, so each window fires
 * once — equivalent to a 1-minute tumble; confirm this is intended.
 */
public class EventTimeCumulateWindowSQL {
    public static void main(String[] args) {
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment env = TableEnvironment.create(envSettings);

        // Sample record: {"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}
        // Register the Kafka source with an event-time attribute and watermark.
        String sourceDdl = "CREATE TABLE KafkaTable (\n" +
                "  `Id` INT,\n" +
                "  `Name` STRING,\n" +
                "  `Operation` STRING,\n" +
                "  `t`           BIGINT,   \n" +
                "   `opt`       STRING,\n" +
                "   time_ltz AS TO_TIMESTAMP_LTZ(t, 0),\n" +
                "  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '0' SECOND \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'cumulate',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(sourceDdl);

        // Windowing TVF syntax: CUMULATE(data, timecol, step, max size).
        String cumulateQuery = "SELECT window_start, window_end, COUNT(Id) AS cnt\n" +
                "  FROM TABLE(\n" +
                "    CUMULATE(TABLE KafkaTable, DESCRIPTOR(time_ltz), INTERVAL '1' MINUTES, INTERVAL '1' MINUTES))\n" +
                "  GROUP BY window_start, window_end";
        env.executeSql(cumulateQuery).print();
    }
}

由于是EventTime,当满足窗口时便会触发计算。

EventTimeHopWindowsSQL

package com.hph.sql.window.eventtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Sliding (HOP) window demo on event time: 2-minute windows sliding every
 * 1 minute. The BIGINT `t` column (epoch seconds) becomes the event-time
 * attribute via TO_TIMESTAMP_LTZ(t, 0), with a 2-second watermark delay.
 */
public class EventTimeHopWindowsSQL {

    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // Sample record: {"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}

        // Register the Kafka source with an event-time attribute and watermark.
        tableEnv.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `Id`          INT,\n" +
                "  `Name`        STRING,\n" +
                "  `Operation`   STRING,\n" +
                "   `opt`        STRING,\n" +
                "  `t`           BIGINT,   \n" +
                "   time_ltz AS TO_TIMESTAMP_LTZ(t, 0),\n" +
                "  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '2' SECOND \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'hop_process_time',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");

        // Removed two identical commented-out legacy HOP_START/HOP_END queries
        // carried over from the processing-time demo: they referenced `t`,
        // which is a plain BIGINT in this table (not a time attribute), so
        // they would not run here anyway.

        // Windowing TVF syntax: HOP(data, timecol, slide, size) —
        // slide 1 minute, window size 2 minutes.
        tableEnv.executeSql("SELECT window_start, window_end, COUNT(Id)  \n" +
                "  FROM TABLE(\n" +
                "    HOP(TABLE KafkaTable, DESCRIPTOR(time_ltz), INTERVAL '1' MINUTES,INTERVAL '2' MINUTES))\n" +
                "  GROUP BY window_start, window_end").print();

    }
}

步长为1分钟,滑动窗口为2分钟时,计算结果如图所示。

EventTimeTumbleWindowsSQL

package com.hph.sql.window.eventtime;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Tumbling-window demo on event time: counts records per 4-second window.
 * The BIGINT `t` column (epoch seconds) becomes the event-time attribute via
 * TO_TIMESTAMP_LTZ(t, 0), with a 4-second watermark delay.
 */
public class EventTimeTumbleWindowsSQL {
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);


        //{"Id":455006,"Name":"Nmae_455006","Operation":"favourite","timestamp":1629293081,"opt":0}
        // Register the Kafka source with an event-time attribute and watermark.
        tableEnv.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `Id` INT,\n" +
                "  `Name` STRING,\n" +
                "  `Operation` STRING,\n" +
                "  `opt`       STRING,\n" +
                "  `t`         BIGINT,\n" +
                "   time_ltz AS TO_TIMESTAMP_LTZ(t, 0),\n" +
                "  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '4' SECOND \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'us_ac',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'tumble',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");


        // TUMBLE(timecol, INTERVAL '4' SECOND) buckets rows into 4-second windows;
        // TUMBLE_START(...) returns the window start and TUMBLE_END(...) the window end.
        // Legacy group-window syntax below, kept commented out for reference —
        // note it uses 2-SECOND intervals and the raw `t` column, unlike the
        // active 4-second TVF query on `time_ltz` that follows.
//        tableEnv.executeSql("SELECT TUMBLE_START(t, INTERVAL '2' SECOND) AS window_start," +
//                "TUMBLE_END(t, INTERVAL '2' SECOND) AS window_end, count(Id) as PV FROM   KafkaTable"
//                + " GROUP BY TUMBLE(t, INTERVAL '2' SECOND)").print();

        // Windowing TVF syntax: TUMBLE(data, timecol, size).
        tableEnv.executeSql(" SELECT window_start, window_end,  count(Id) as PV\n" +
                "  FROM TABLE(\n" +
                "    TUMBLE(TABLE KafkaTable, DESCRIPTOR(time_ltz), INTERVAL '4' SECOND))\n" +
                "  GROUP BY window_start, window_end").print();

    }
}


文章作者: 清风笑丶
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风笑丶 !
 上一篇
Flink Metrics任务监控 Flink Metrics任务监控
监控指标Flink任务提交得集群后,需要对任务进行有效监控,对Flink得监控指标可以分为系统指标和用户指标。Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。由于集
2021-11-14
下一篇 
Flink-SQL Flink-SQL
Flink 为日期和时间提供了丰富的数据类型, 包括 DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND ,对多种时间类型
2021-08-26
  目录