Flink集成Hudi


公司业务需要需要调研hudi相关数据湖开源组件,下面简单记录下对应的踩坑记录

基础环境准备

组件 版本
Flink 1.17.0
Hudi 0.14.0
Hive 2.3.1
CDH 6.3.2
Kafka 2.2.1
Spark 3.2.1

添加 cdh 对应的pom依赖

<repository>
  <id>cloudera</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>

修改Hadoop版本为3.0.0

运行编译命令

mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dhadoop.version=3.0.0  -Pflink-bundle-shade-hive2

image-20230802103049946

修改对应的代码为

      zkServer.shutdown();

编译master分支成功

image-20230802165932622

安装flink1.17.1 stanlone模式

image-20230802172444029

需要添加对应的jar包依赖

wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
rz hudi-flink1.17-bundle-0.14.0-SNAPSHOT.jar

启动sql-client 添加下面的作业

CREATE TABLE  source_orders (
    id bigint,
    order_number BIGINT,
    price        DECIMAL(5,2),
    buyer        string,
    order_time   TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second'= '1'
);


set execution.checkpointing.interval=1000;
set state.checkpoints.dir=file:///tmp/checkpoints
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;




CREATE TABLE  hudi_orders  (
    id bigint,
    order_number BIGINT,
    price        DECIMAL(5,2),
    buyer        string,
    order_time   TIMESTAMP_LTZ(3)
)
WITH (
'connector' = 'hudi'
, 'path' = 'hdfs://node0/tmp/hudi/hudi_orders'
, 'hoodie.datasource.write.recordkey.field' = 'id'    -- 主键
, 'write.precombine.field' = 'order_time'             -- 相同的键值时,取此字段最大值,默认ts字段
, 'write.tasks' = '1'
, 'compaction.tasks' = '1'
, 'write.rate.limit' = '2000'                          -- 限制每秒多少条
, 'table.type' = 'MERGE_ON_READ'                       -- 默认COPY_ON_WRITE
, 'compaction.async.enabled' = 'true'                  -- 在线压缩
, 'compaction.trigger.strategy' = 'num_commits'        -- 按次数压缩
, 'compaction.delta_commits' = '1'                     --  默认为5
, 'hive_sync.enable' = 'true'                          -- 启用hive同步
, 'hive_sync.mode' = 'hms'                             -- 启用hive hms同步,默认jdbc
, 'hive_sync.metastore.uris' = 'thrift://node0:9083'    -- required, metastore的端口
, 'hive_sync.jdbc_url' = 'jdbc:hive2://node0:10000'     -- required, hiveServer地址
, 'hive_sync.table' = 'hudi_table'                      -- required, hive 新建的表名
, 'hive_sync.db' = 'hudi'                               -- required, hive 新建的数据库名
, 'hive_sync.username' = 'hdfs'                         -- required, HMS 用户名
, 'hive_sync.password' = ''                             -- required, HMS Password
, 'hive_sync.skip_ro_suffix' = 'true'                   -- 去除ro后缀
);

insert into hudi_orders select * from source_orders;

image-20230803193908785

页面上已经出现任务了,但是在查询hive 的时候数据和表结构还是没有同步过去,需要进行下面的操作。

1.将 下面的包需要放到flink 的lib包下

image-20230804094839048

2.安装 YARN MapReduce 框架 JAR

image-20230804095706247

3.配置Hive的辅助jar包

image-20230804100313300

image-20230804100329916

对应的jar包如图所示

4.由于CDH-6.3.2对应的hive版本为 hive 2.1.1,修改hive依赖需要改的源码比较多,印次我们这边需要将hive版本从2.1.1 升级到2.3.1

解压对应版本的hive tar包

tar -zxvf apache-hive-2.3.1-bin.tar.gz
cp -r /opt/soft/apache-hive-2.3.1-bin/lib/  /opt/cloudera/parcels/CDH/lib/hive/lib231
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/bin/
vim hive 

image-20230804110635097

修改为上图所示

备份元数据

mysqldump -u root   -p  metastore  > /home/hive_meatastore.sql

升级对应的hive,逐步升级

source /opt/soft/apache-hive-2.3.1-bin/scripts/metastore/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql;
source /opt/soft/apache-hive-2.3.1-bin/scripts/metastore/upgrade/mysql/upgrade-2.2.0-to-2.3.0.mysql.sql;

文章作者: 清风笑丶
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 清风笑丶 !
 上一篇
Nginx日志监控可视化 Nginx日志监控可视化
简介自己的博客是部署在腾讯云上的,除了部署博客和相关的学习软件之外,服务器并没有很好的利用起,于想使用grafanan对nginx的数据进行可视化展示以及监控服务器的基本情况。 监控服务器的相关配置使用dokcer-file对grafana
2023-11-15
下一篇 
Spark 集成Hudi Spark 集成Hudi
Hudi 支持Spark 版本 Hudi Supported Spark 3 version 0.12.x 3.3.x,3.2.x,3.1.x 0.11.x 3.2.x(default build, Spark bundle
2022-12-15
  目录