Flink-SQL


Flink 为日期和时间提供了丰富的数据类型, 包括 DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ、INTERVAL YEAR TO MONTH、INTERVAL DAY TO SECOND,对多种时间类型和时区的支持使得跨时区的数据处理变得非常容易。

TIMESTAMP

  • TIMESTAMP(p) 是 TIMESTAMP(p) WITHOUT TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
  • TIMESTAMP 用于描述年, 月, 日, 小时, 分钟, 秒 和 小数秒对应的时间戳。
package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrintTableDemo {

    /**
     * Demo: generate rows with the bounded 'datagen' connector and copy them
     * into a 'print' sink table so they show up on stdout.
     *
     * @param args unused command-line arguments
     * @throws Exception if the INSERT job fails or waiting for it is interrupted
     */
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: f_sequence is a bounded sequence (1..9), so the job can
        // finish; ts is a computed column evaluating LOCALTIMESTAMP per row.
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                "    f_sequence INT,\n" +
                "    f_random INT,\n" +
                "    f_random_str STRING,\n" +
                "    ts as  localtimestamp\n" +
                "  ) WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    -- optional options --\n" +
                "    'rows-per-second'='5',\n" +
                "    'fields.f_sequence.kind'='sequence',\n" +
                "    'fields.f_sequence.start'='1',\n" +
                "    'fields.f_sequence.end'='9',\n" +
                "    'fields.f_random.min'='1',\n" +
                "    'fields.f_random.max'='9',\n" +
                "    'fields.f_random_str.length'='20'\n" +
                "  )");

        // Sink table: the 'print' connector writes every received row to stdout.
        tableEnv.executeSql("  CREATE TABLE print_table (\n" +
                "    f_sequence INT,\n" +
                "    f_random INT,\n" +
                "    f_random_str STRING,\n" +
                "    ts   TIMESTAMP\n" +
                "    ) WITH (\n" +
                "    'connector' = 'print'\n" +
                "  )");

        // Show the sink table's schema.
        tableEnv.executeSql("desc   print_table").print();

        // FIX: executeSql() submits an INSERT job asynchronously; without
        // await() main() may return before the bounded job has finished.
        tableEnv.executeSql("  INSERT INTO print_table select f_sequence,f_random,f_random_str,ts from datagen")
                .await();
    }
}

TIMESTAMP_LTZ 类型

  • TIMESTAMP_LTZ(p) 是 TIMESTAMP(p) WITH LOCAL TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
  • TIMESTAMP_LTZ 用于描述时间线上的绝对时间点, 使用 long 保存从 epoch 至今的毫秒数, 使用int保存毫秒中的纳秒数。 epoch 时间是从 java 的标准 epoch 时间 1970-01-01T00:00:00Z 开始计算。 在计算和可视化时, 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session (会话)中配置的时区。
  • TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定, 可以通过一个 long 类型的 epoch 时间来转化(例如: 通过 Java 来产生一个 long 类型的 epoch 时间 System.currentTimeMillis())
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
DESC MyView1;

SET 'table.local-time-zone' = 'UTC';
SELECT * FROM MyView1;

SET 'table.local-time-zone' = 'Asia/Shanghai';
SELECT * FROM MyView1;

目前仅 1.13.2 版本之后支持,且 TIMESTAMP_LTZ 转为 String 类型时时区设置会失效。这一版本很好地修复了之前 PROCTIME() 函数返回类型是 TIMESTAMP、返回值是 UTC 时区下的 TIMESTAMP 的问题,不需要再对时间进行处理。

SQL Demo

了解了上述的时间相关概念我们可以学习如何使用Flink SQL 完成DataStream API的操作

PrintDemo

package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrintTableDemo {

    /**
     * Demo: generate rows with the bounded 'datagen' connector and copy them
     * into a 'print' sink table so they show up on stdout.
     *
     * @param args unused command-line arguments
     * @throws Exception if the INSERT job fails or waiting for it is interrupted
     */
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: f_sequence is a bounded sequence (1..9), so the job can
        // finish; ts is a computed column evaluating LOCALTIMESTAMP per row.
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                "    f_sequence INT,\n" +
                "    f_random INT,\n" +
                "    f_random_str STRING,\n" +
                "    ts as  localtimestamp\n" +
                "  ) WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    -- optional options --\n" +
                "    'rows-per-second'='5',\n" +
                "    'fields.f_sequence.kind'='sequence',\n" +
                "    'fields.f_sequence.start'='1',\n" +
                "    'fields.f_sequence.end'='9',\n" +
                "    'fields.f_random.min'='1',\n" +
                "    'fields.f_random.max'='9',\n" +
                "    'fields.f_random_str.length'='20'\n" +
                "  )");

        // Sink table: the 'print' connector writes every received row to stdout.
        tableEnv.executeSql("  CREATE TABLE print_table (\n" +
                "    f_sequence INT,\n" +
                "    f_random INT,\n" +
                "    f_random_str STRING,\n" +
                "    ts   TIMESTAMP\n" +
                "    ) WITH (\n" +
                "    'connector' = 'print'\n" +
                "  )");

        // Show the sink table's schema.
        tableEnv.executeSql("desc   print_table").print();

        // FIX: executeSql() submits an INSERT job asynchronously; without
        // await() main() may return before the bounded job has finished.
        tableEnv.executeSql("  INSERT INTO print_table select f_sequence,f_random,f_random_str,ts from datagen")
                .await();
    }
}

随机生成相关的数据,并将数据写入到print_table中

KafkaTable2FileDemo

package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTable2FileDemo {

    /**
     * Demo: read iris records from a Kafka topic and stream them into a
     * filesystem table, written as JSON and partitioned by iris type.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {

        final EnvironmentSettings envSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().build();
        final TableEnvironment env = TableEnvironment.create(envSettings);

        // Kafka source: iris features parsed from JSON, plus the record
        // timestamp exposed from the Kafka message metadata.
        final String kafkaSourceDdl =
                "CREATE TABLE KafkaTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'iris',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'test',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(kafkaSourceDdl);

        // Filesystem sink under /tmp/iris_data, partitioned by type.
        // NOTE(review): in streaming mode the filesystem sink presumably needs
        // checkpointing enabled to make files visible — confirm on the cluster.
        final String fileSinkDdl =
                "CREATE TABLE MyUserTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING\n" +
                ") PARTITIONED BY (type) WITH (\n" +
                "  'connector' = 'filesystem',        \n" +
                "  'path' = '/tmp/iris_data',  \n" +
                "  'format' = 'json',                                    \n" +
                "  'partition.default-name' = 'type'\n" +
                ")";
        env.executeSql(fileSinkDdl);

        // Continuously copy every Kafka row into the filesystem table.
        env.executeSql("INSERT INTO MyUserTable SELECT  sepalLength,sepalWidth,petalLength,petalWidth,type from KafkaTable");
    }
}

文件已经写入

KafkaTable2MysqlDemo

package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTable2MysqlDemo {

    /**
     * Demo: read iris records from a Kafka topic and write them into a MySQL
     * table via the JDBC connector, flushing after every row.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {

        final EnvironmentSettings envSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().build();
        final TableEnvironment env = TableEnvironment.create(envSettings);

        // Kafka source: iris features parsed from JSON, plus the record
        // timestamp exposed from the Kafka message metadata.
        final String kafkaSourceDdl =
                "CREATE TABLE KafkaTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'iris',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'test',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(kafkaSourceDdl);

        // JDBC sink keyed on type; max-rows=1 flushes each row immediately.
        // NOTE(review): credentials are hard-coded — acceptable for a demo,
        // but move them to external configuration for anything real.
        final String mysqlSinkDdl =
                "CREATE TABLE MysqlTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth`  DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING,\n" +
                "  PRIMARY KEY (type) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://hadoop102:3306/kafkasink',\n" +
                "  'driver'='com.mysql.jdbc.Driver' ,\n" +
                "  'username' = 'root',\n" +
                "  'table-name'='MysqlTable' ,\n" +
                "  'password' = '123456',\n" +
                "  'sink.buffer-flush.max-rows' = '1'\n" +
                ")";
        env.executeSql(mysqlSinkDdl);

        // Continuously copy every Kafka row into the MySQL table.
        env.executeSql("INSERT INTO MysqlTable SELECT  sepalLength,sepalWidth,petalLength,petalWidth,type from KafkaTable");

    }
}

从kafka的监控看到iris中的数据为300条,这里为了效果设置为1条就写入Mysql。

数据已经写入

KafkaTable2HbaseDemo

package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTable2HbaseDemo {

    /**
     * Demo: read iris records from a Kafka topic and write them into an HBase
     * table, packing the four numeric features into column family f1.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {

        final EnvironmentSettings envSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().build();
        final TableEnvironment env = TableEnvironment.create(envSettings);

        // Kafka source: iris features parsed from JSON, plus the record
        // timestamp exposed from the Kafka message metadata.
        final String kafkaSourceDdl =
                "CREATE TABLE KafkaTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'iris',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'test',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(kafkaSourceDdl);

        // HBase sink: first column (type) is the row key, the ROW field f1
        // maps to a column family; max-rows=1 flushes each row immediately.
        final String hbaseSinkDdl =
                "CREATE TABLE hTable (\n" +
                " type VARCHAR,\n" +
                " f1 ROW<sepalLength DOUBLE, sepalWidth DOUBLE,petalLength DOUBLE, petalWidth DOUBLE>\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-2.2',\n" +
                " 'table-name' = 'kafka2hbase',\n" +
                " 'sink.buffer-flush.max-rows'='1' , \n" +
                " 'zookeeper.quorum' = 'hadoop102:2181,hadoop103:2181,hadoop104:2181'\n" +
                ")\n";
        env.executeSql(hbaseSinkDdl);

        // Continuously copy every Kafka row into the HBase table.
        env.executeSql("INSERT INTO hTable SELECT  type,ROW(sepalLength,sepalWidth,petalLength,petalWidth) from KafkaTable");

    }
}

这里之所以只保留3条,是因为数据本身的 ROW 为 Iris 数据集的类型,该数据集中只有3个类型,因此实际保留的数据只有3条。

KafkaTable2EsDemo

package com.hph.sql.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTable2EsDemo {

    /**
     * Demo: read iris records from a Kafka topic and index them into an
     * Elasticsearch 7 index named 'iris'.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {

        final EnvironmentSettings envSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().build();
        final TableEnvironment env = TableEnvironment.create(envSettings);

        // Kafka source: iris features parsed from JSON, plus the record
        // timestamp exposed from the Kafka message metadata.
        final String kafkaSourceDdl =
                "CREATE TABLE KafkaTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'iris',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'properties.group.id' = 'test',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";
        env.executeSql(kafkaSourceDdl);

        // Elasticsearch sink: all feature columns plus the type string.
        final String esSinkDdl =
                "CREATE TABLE esTable (\n" +
                "  `sepalLength` DOUBLE,\n" +
                "  `sepalWidth` DOUBLE,\n" +
                "  `petalLength` DOUBLE,\n" +
                "  `petalWidth`  DOUBLE,\n" +
                "   `type`       STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'elasticsearch-7',\n" +
                "    'hosts' = 'http://192.168.2.123:9200',\n" +
                "    'index' = 'iris')";
        env.executeSql(esSinkDdl);

        // Continuously copy every Kafka row into the Elasticsearch index.
        env.executeSql("INSERT INTO esTable SELECT  sepalLength,sepalWidth,petalLength,petalWidth,type from KafkaTable");

    }
}


文章作者: 清风笑丶
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风笑丶 !
 上一篇
Flink-SQL Window Flink-SQL Window
关于窗口这里不再展开论述,之前已经写到过了,本文重点关注SQL如何实现Flink Window的效果。本次使用的kafka数据JSON格式如下: "{\"Id\":9990,\"Name\":\"Nmae_9990\",\"Ope
2021-09-04
下一篇 
Flink-Table Flink-Table
简介FlinkTable API和DataStream相似,有相同的编程模型,需要构建相应的TableEnviroment环境,才能够使用相应API。 开发环境使用Table需要引入相关的依赖这里以Flink 1.13.2为例,所需部分依赖
2021-08-25
  目录