# Hudi

# 一、编译安装

组件 版本
Hadoop 3.2.4
Hive 3.1.3
Flink 1.15.3 scala-2.12
Spark 3.2.3,scala-2.12

# 1.1 安装Maven

# 解压放置
[tpxcer@hadoop1 ~]$ wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz
[root@hadoop1 ~]# mkdir /opt/module
[root@hadoop1 ~]# chmod 777 /opt/module/
[tpxcer@hadoop1 ~]$ tar -zxvf apache-maven-3.8.6-bin.tar.gz -C /opt/module/
[tpxcer@hadoop1 module]$ ln -s apache-maven-3.8.6/ maven

# 添加环境变量
sudo vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/maven
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
mvn -v

# maven修改为阿里镜像
[tpxcer@hadoop1 ~]$ vim /opt/module/maven/conf/settings.xml

<!-- 添加阿里云镜像-->
<mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 1.2 编译Hudi

# 2.2.1 上传源码包

[tpxcer@hadoop1 ~]$ wget https://dlcdn.apache.org/hudi/0.12.1/hudi-0.12.1.src.tgz
[tpxcer@hadoop1 ~]$ tar -zxvf hudi-0.12.1.src.tgz -C /opt/soft
1
2

# 2.2.2 修改pom文件

vim hudi/pom.xml

新增repository加速依赖下载

<repository>
        <id>nexus-aliyun</id>
        <name>nexus-aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
</repository>
1
2
3
4
5
6
7
8
9
10
11

修改依赖的组件版本

<hadoop.version>3.2.4</hadoop.version>
<hive.version>3.1.3</hive.version>
1
2

# 2.2.3 修改源码兼容hadoop3

Hudi默认依赖的hadoop2,要兼容hadoop3,除了修改版本,还需要修改如下代码:

[tpxcer@hadoop1 soft]$ vim /opt/soft/hudi/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java

// 第一百行
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
改为
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
1
2
3
4
5
6

# 2.2.4 解决spark模块依赖冲突

修改了Hive版本为3.1.2,其携带的jetty是0.9.3,hudi本身用的0.9.4,存在依赖冲突。

java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
1
  1. 修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty: vim /opt/soft/hudi/packaging/hudi-spark-bundle/pom.xml







 
 
 
 
 
 
 
 
 
 
 
 















 
 
 
 
 
 
 
 
 
 
 
 








 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 








 
 
 
 
 
 
 
 
 
 


 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

<!-- Hive -->
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service-rpc</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
    </dependency>
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-common</artifactId>
      <version>${hive.version}</version>
      <scope>${spark.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.orbit</groupId>
          <artifactId>javax.servlet</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
</dependency>

    <!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
  1. 修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty:

vim /opt/soft/hudi-0.12.1/packaging/hudi-utilities-bundle/pom.xml







 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 














 
 
 
 
 
 
 
 
 
 
 
 
 
 







 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 







 
 
 
 
 
 
 
 
 
 


 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

<!-- Hive -->
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
		<exclusion>
          <artifactId>servlet-api</artifactId>
          <groupId>javax.servlet</groupId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.pentaho</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service-rpc</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-common</artifactId>
      <version>${hive.version}</version>
      <scope>${utilities.bundle.hive.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.orbit</groupId>
          <artifactId>javax.servlet</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
</dependency>

    <!-- 增加hudi配置版本的jetty -->
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-webapp</artifactId>
      <version>${jetty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-http</artifactId>
      <version>${jetty.version}</version>
    </dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

# 2.2.6 执行编译命令

mvn clean package -DskipTests -Dspark3.2 -Dflink1.15 -Dscala-2.12 -Dhadoop.version=3.2.4 -Pflink-bundle-shade-hive3
1

# 2.2.7 编译成功

[tpxcer@hadoop1 hudi]$ hudi-cli/hudi-cli.sh
===================================================================
*         ___                          ___                        *
*        /\__\          ___           /\  \           ___         *
*       / /  /         /\__\         /  \  \         /\  \        *
*      / /__/         / /  /        / /\ \  \        \ \  \       *
*     /  \  \ ___    / /  /        / /  \ \__\       /  \__\      *
*    / /\ \  /\__\  / /__/  ___   / /__/ \ |__|     / /\/__/      *
*    \/  \ \/ /  /  \ \  \ /\__\  \ \  \ / /  /  /\/ /  /         *
*         \  /  /    \ \  / /  /   \ \  / /  /   \  /__/          *
*         / /  /      \ \/ /  /     \ \/ /  /     \ \__\          *
*        / /  /        \  /  /       \  /  /       \/__/          *
*        \/__/          \/__/         \/__/    Apache Hudi CLI    *
*                                                                 *
===================================================================
1491 [main] INFO  org.apache.hudi.cli.Main [] - Starting Main v0.12.1 using Java 1.8.0_311 on hadoop1 with PID 9566 (/opt/soft/hudi-0.12.1/hudi-cli/target/hudi-cli-0.12.1.jar started by tpxcer in /opt/soft/hudi-0.12.1)
1503 [main] INFO  org.apache.hudi.cli.Main [] - No active profile set, falling back to 1 default profile: "default"
Table command getting loaded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/soft/hudi-0.12.1/hudi-cli/target/lib/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/soft/hudi-0.12.1/hudi-cli/target/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2842 [main] WARN  org.jline [] - The Parser of class org.springframework.shell.jline.ExtendedDefaultParser does not support the CompletingParsedLine interface. Completion with escaped or quoted words won't work correctly.
2916 [main] INFO  org.apache.hudi.cli.Main [] - Started Main in 1.873 seconds (JVM running for 2.969)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

编译完成后,相关的包在packaging目录的各个模块中

[tpxcer@hadoop1 hudi]$ ls packaging/
hudi-aws-bundle           hudi-flink-bundle  hudi-hadoop-mr-bundle  hudi-integ-test-bundle     hudi-presto-bundle  hudi-timeline-server-bundle  hudi-utilities-bundle       README.md
hudi-datahub-sync-bundle  hudi-gcp-bundle    hudi-hive-sync-bundle  hudi-kafka-connect-bundle  hudi-spark-bundle   hudi-trino-bundle            hudi-utilities-slim-bundle
1
2
3

# 二、集成 Spark

Hudi官方Spark文档Spark Quick Start Guide (opens new window)

# 2.1 安装Spark

# 2.1.1 安装准备

[tpxcer@node01 ~]$ wget https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
[tpxcer@node01 ~]$ tar -zxvf spark-3.2.3-bin-hadoop3.2.tgz -C /opt/module/
[tpxcer@node01 ~]$ cd /opt/module/
[tpxcer@node01 module]$ ln -s spark-3.2.3-bin-hadoop3.2/ spark
1
2
3
4

# 2.1.2 配置环境变量

sudo vim /etc/profile.d/my_env.sh

#SPARK_HOME
export SPARK_HOME=/opt/module/spark
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/my_env.sh
1
2
3
4
5
6
7

# 2.1.3 拷贝编译好的包到spark的jars目录

[tpxcer@node01 target]$ cp /opt/soft/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.1.jar /opt/module/spark/jars/
1

# 2.2 Spark Shell

# 2.2.1 启动 spark-shell

启动命令

#针对Spark 3.2
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
1
2
3
4
5

设置表名,基本路径和数据生成器

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
1
2
3
4
5
6
7
8
9
10

# 2.2.2 插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
1
2
3
4
5
6
7
8
9
10

查看生成的数据

cd /tmp/hudi_trips_cow/
ls
1
2

# 2.2.3 查询数据

  1. 转换成DF
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
1
2
3
4
5

注意:该表有三级分区(区域/国家/城市),在0.9.0版本以前的hudi,在load中的路径需要按照分区目录拼接"",如:load(basePath + "////*"),当前版本不需要。

  1. 查询
scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1673128628106|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1673106003304|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1673191167365|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1672667908175|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1673050451069|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1673184204562|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1672972246078|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1672713875654|
+------------------+-------------------+-------------------+-------------+
1
2
3
4
5
6
7
8
9
10
11
12
13
  1. 时间旅行查询
spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108100").
  load(basePath)

spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14:11:08.200").
  load(basePath)

// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2.2.4 更新数据

类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
1
2
3
4
5
6
7
8
9
10

注意:保存模式现在是Append。通常,除非是第一次创建表,否则请始终使用追加模式。现在再次查询数据将显示更新的行程数据。每个写操作都会生成一个用时间戳表示的新提交。查找以前提交中相同的_hoodie_record_keys在该表的_hoodie_commit_time、rider、driver字段中的变化。

查询更新后的数据,要重新加载该hudi表:

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
1
2
3
4
5
6
7

# 2.2.5 增量查询

Hudi还提供了增量查询的方式,可以获取从给定提交时间戳以来更改的数据流。需要指定增量查询的beginTime,选择性指定endTime。如果我们希望在给定提交之后进行所有更改,则不需要指定endTime(这是常见的情况)

  1. 重新加载数据
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")
1
2
3
4
5
  1. 获取指定beginTime
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) 
1
2
  1. 创建增量查询的表
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
1
2
3
4
5
  1. 查询增量表
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
1

这将过滤出beginTime之后提交且fare>20的数据。 利用增量查询,我们能在批处理数据上创建streaming pipelines。

# 2.2.6 指定时间点查询

查询特定时间点的数据,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)

  1. 指定beginTime和endTime
val beginTime = "000" 
val endTime = commits(commits.length - 2) 
1
2
  1. 根据指定时间创建表
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
1
2
3
4
5
6
  1. 查询
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
1

# 2.2.7 删除数据

根据传入的HoodieKeys来删除(uuid + partitionpath),只有append模式,才支持删除功能。

  1. 获取总行数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
1
  1. 取其中2条用来删除
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
1
  1. 将待删除的2条数据构建DF
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
1
2
  1. 执行删除
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
1
2
3
4
5
6
7
8
9
  1. 统计删除数据后的行数,验证删除是否成功
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")

// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
1
2
3
4
5
6
7
8

# 2.2.8 覆盖数据

  1. 查看当前表的key
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)
1
2
3
4
5
6
  1. 生成一些新的行程数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")
1
2
3
4
  1. 覆盖指定分区
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)
1
2
3
4
5
6
7
8
9
  1. 查询覆盖后的key,发生了变化
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)
1
2
3
4
5
6

# 2.3 Spark SQL方式

# 2.3.1 创建表

  1. 启动Hive的Metastore
nohup hive --service metastore & 
1
  1. 启动spark-sql

如果没有配置hive环境变量,手动拷贝hive-site.xml到spark的conf下 把mysql-connector-java-8.0.30.jar放到/opt/module/spark/jars

#针对Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
1
2
3
4
5
  1. 建表参数
参数名 默认值 说明
primaryKey uuid 表的主键名,多个字段用逗号分隔。 同 hoodie.datasource.write.recordkey.field
preCombineField 表的预合并字段。 同 hoodie.datasource.write.precombine.field
type cow 创建的表类型: type = 'cow' type = 'mor' 同hoodie.datasource.write.table.type
  1. 创建非分区表
-- 创建一个cow表,默认primaryKey 'uuid',不提供preCombineField
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;

-- 创建一个mor非分区表
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  1. 创建分区表
-- 创建一个cow分区外部表,指定primaryKey和preCombineField
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  1. 在已有的hudi表上创建新表

不需要指定模式和非分区列(如果存在)之外的任何属性,Hudi可以自动识别模式和配置。

-- 非分区表
create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';

-- 分区表
create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';
1
2
3
4
5
6
7
8
  1. 通过CTAS (Create Table As Select)建表

为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作。

-- 通过CTAS创建cow非分区表,不指定preCombineField 
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;
1
2
3
4
5
6
-- 通过CTAS创建cow分区表,指定preCombineField
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
1
2
3
4
5
6
7

通过CTAS从其他表加载数据

-- 创建内部表
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

-- 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (datestr) as select * from parquet_mngd;
1
2
3
4
5
6
7
8
9
10

# 2.3.2 插入数据

默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

  1. 向非分区表插入数据
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
1
2
  1. 向分区表动态分区插入数据
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
1
2
  1. 向分区表静态分区插入数据
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') 
select 2, 'a2', 1000;
1
2
  1. 使用bulk_insert插入数据

hudi支持使用bulk_insert作为写操作的类型,只需要设置两个配置:hoodie.sql.bulk.insert.enablehoodie.sql.insert.mode

-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001
1   a1_2    20.0    1002
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.3.3 查询数据

  1. 查询
select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0
1
  1. 时间旅行查询

Hudi从0.9.0开始就支持时间旅行查询。Spark SQL方式要求Spark版本 3.2及以上。

-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';


-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 2.3.4 更新数据

  1. update

更新操作需要指定preCombineField。

-- 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

-- 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';
1
2
3
4
5
6
7
8
9
10
  1. MergeInto
-- 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

-- 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;


-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 2.3.5 删除数据

  1. 语法
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
1
  1. 案例
delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';
1
2
3
4
5
6

...........

# 3.1 环境准备

  1. 拷贝编译好的jar包到Flink的lib目录下
cp /opt/soft/hudi/packaging/hudi-flink-bundle/target/hudi-flink1.15-bundle-0.12.1.jar /opt/module/flink/lib/
1
  1. 拷贝guava包,解决依赖冲突
cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink/lib/
1
  1. 配置Hadoop环境变量
sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile.d/my_env.sh
1
2
3
4
5
6

# 3.2 sql-client方式

  1. 修改flink-conf.yaml配置
vim /opt/module/flink/conf/flink-conf.yaml

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://node01:8020/ckps
state.backend.incremental: true
rest.bind-address: 0.0.0.0
1
2
3
4
5
6
7
8
9
10
  1. local模式

修改workers

vim /opt/module/flink/conf/workers

#表示:会在本地启动3个TaskManager的 local集群
localhost
localhost
localhost
1
2
3
4
5
6

启动Flink

[tpxcer@node01 module]$ /opt/module/flink/bin/start-cluster.sh
[tpxcer@node01 module]$ jps
3680 TaskManagerRunner
3284 TaskManagerRunner
2518 StandaloneSessionClusterEntrypoint
2894 TaskManagerRunner
1
2
3
4
5
6

查看webui http://node01:8081

启动Flink的sql-client

[tpxcer@node01 module]$ /opt/module/flink/bin/sql-client.sh embedded
1
  1. yarn-session模式

解决依赖问题

[tpxcer@node01 module]$ cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.2.4.jar /opt/module/flink/lib/
1

启动yarn-session

/opt/module/flink/bin/yarn-session.sh -d
1

通过http://node01:8088/cluster http://node01:8088/proxy/application_1671429969753_0004/#/overview查看部署情况

启动sql-client

/opt/module/flink/bin/sql-client.sh embedded -s yarn-session
1

# 3.3 插入数据

set sql-client.execution.result-mode=tableau;

-- 创建hudi表
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node01:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'- 默认是COW
);
或如下写法
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node01:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'
);


-- 插入数据
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 3.4 查询数据

select * from t1;
1

# 3.5 更新数据

insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
1
2

# 3.6 流式插入

  1. 创建测试表
CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  1. 执行插入
insert into t2 select * from sourceT;
[INFO] Submitting SQL update statement to the cluster...
2023-01-31 11:39:14,715 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.50.223:8032
2023-01-31 11:39:14,715 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-01-31 11:39:14,722 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:33271 of application 'application_1675069792410_0001'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: dbf339339c2741082172d6a1df61e3d1
1
2
3
4
5
6
7
  1. 通过上文中的node03:33271查看运行状态

# 3.7 IDEA编码方式

除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。flink-hudi-demo

  1. 提交运行

将代码打成jar包,上传到目录myjars,执行提交命令:

bin/flink run -t yarn-per-job \
-c com.xxxx.hudi.flink.HudiDemo \
./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar
1
2
3

# 3.8 类型映射

Flink SQL Type Hudi Type Avro logical type
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY bytes
DECIMAL fixed decimal
TINYINT int
SMALLINT int
INT int
BIGINT long
FLOAT float
DOUBLE double
DATE int date
TIME int time-millis
TIMESTAMP long timestamp-millis
ARRAY array
MAP (key must be string/char/varchar type) map
MULTISET (element must be string/char/varchar type) map
ROW record
更新时间: 2/7/2023, 11:37:46 AM