# Sqoop

# Importing MySQL Data into HDFS

# Sample Data

Sample database: http://dev.mysql.com/doc/employee/en/

```bash
wget https://launchpadlibrarian.net/24493586/employees_db-full-1.0.6.tar.bz2
tar -xjf employees_db-full-1.0.6.tar.bz2
cd employees_db
mysql -u root -p -t < employees.sql
```

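To sanity-check the load, you can list the tables that were created (a hypothetical session; the table names ship with the employees sample schema):

```bash
# The sample database should contain tables such as employees,
# departments, dept_emp, salaries, and titles.
mysql -u root -p -e "SHOW TABLES;" employees
```
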
# Installing the MySQL Driver

See the CDH documentation: Installing the JDBC Drivers.

A quick test: list the databases and tables.

```bash
$ sqoop list-databases --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username airawat --password myPassword
```

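A password given on the command line ends up in your shell history and the process list. Sqoop can instead prompt for it with `-P`, or read it from a file via `--password-file`; for example:

```bash
# -P reads the password from the console instead of the command line.
$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username airawat -P
```
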
# Examples

# 1. Create an Options File

Typing the full set of parameters for every run is tedious, so we can put the common connection options into an options file, like so:

```
$ vi SqoopImportOptions.txt
#
# Options file for sqoop import
#

import
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPwd

#
# All other options should be specified on the command line
#
```
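
Sqoop expands the file's contents in place of the `--options-file` argument, so the two invocations below should be equivalent (an illustrative sketch; the table arguments are just an example):

```bash
# Using the options file...
$ sqoop --options-file SqoopImportOptions.txt --table departments -m 1 --target-dir departments

# ...versus spelling everything out:
$ sqoop import --connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID --password myPwd \
--table departments -m 1 --target-dir departments
```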

# 2. Import a Whole Table

`-m` sets the number of map tasks.

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--table departments \
-m 1 \
--target-dir departments
```

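To inspect what landed in HDFS (a hypothetical check; the part-file name assumes the single map task requested with `-m 1`):

```bash
# List the output directory, then peek at the first few records.
$ hadoop fs -ls departments
$ hadoop fs -cat departments/part-m-00000 | head -5
```
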
# 3. Import a Table, Selecting Specific Columns

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--table dept_emp \
--columns EMP_NO,DEPT_NO,FROM_DATE \
--as-textfile \
-m 1 \
--target-dir dept_emp
```

# 4. Import All Columns, Filtering Rows

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--table employees \
--where "emp_no > 499948" \
--as-textfile \
-m 1 \
--target-dir employees
```

# 5. Import the Results of a Free-Form Query (with a WHERE clause)

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-m 1 \
--target-dir employeeFrfrmQrySmpl2
```

WARNING

Note that a free-form query must include the `$CONDITIONS` token, and that running it in parallel also requires `--split-by` so the data can be partitioned. The official documentation explains:

$CONDITIONS

If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.

Controlling parallelism

Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism greater than that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallelism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id. Note: Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.
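
As a concrete illustration (a hypothetical command; the mapper count and split column are chosen for this sample schema), a parallel version of the query import might look like:

```bash
# Four mappers; Sqoop first runs SELECT MIN(EMP_NO), MAX(EMP_NO) and then
# assigns each mapper a contiguous, roughly equal EMP_NO range.
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
-m 4 \
--target-dir ParallelImportSample
```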

# 6. Free-Form Query with a WHERE Filter

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--target-dir employeeFrfrmQry1
```

# 7. Direct Mode

By default, imports go through JDBC, but a database's own bulk tools usually move data faster. When you pass `--direct`, Sqoop attempts a direct import instead; for MySQL this uses mysqldump, which must therefore be available on the worker nodes. Note: direct mode currently does not support large object columns (BLOB or CLOB).

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--direct \
--target-dir employeeUsingDirect
```

# 8. Split By

Partition the records on a chosen column for parallel execution.

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir SplitByExampleImport
```

# 9. Specify a Boundary Query

Still about parallelism: by default, Sqoop queries the minimum and maximum of the `--split-by` column and partitions that range. Occasionally that doesn't produce usable splits, in which case you can supply the bounds yourself with `--boundary-query`.

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--boundary-query "SELECT MIN(EMP_NO), MAX(EMP_NO) from employees" \
--split-by EMP_NO \
--direct \
--target-dir BoundaryQuerySample
```

# 10. Set the Fetch Size

`--fetch-size` sets the number of entries to read from the database at once.

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fetch-size=50000 \
--split-by EMP_NO \
--direct \
--target-dir FetchSize
```

# 11. Compression

`-z` (alias `--compress`) compresses the imported files. Gzip is the default codec; `--compression-codec` selects a different one.

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-z \
--split-by EMP_NO \
--direct \
--target-dir CompressedSample
```
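
A hypothetical way to confirm and read back the compressed output (the `.gz` extension assumes the default gzip codec):

```bash
# hadoop fs -text decompresses known codecs automatically.
$ hadoop fs -ls CompressedSample
$ hadoop fs -text CompressedSample/part-m-00000.gz | head -5
```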

# 12. Incremental Imports

Preparation

The command:

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 15000 AND $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/IncrementalImports
```

The number of records imported: 4999. Verify the count with:

```bash
$ hadoop fs -ls -R sqoop-mysql/IncrementalImports | grep 'part-' | awk '{print $8}' | xargs hadoop fs -cat | wc -l
```

Running the incremental import

Three parameters:

  • --check-column: the column to examine when determining which rows are new
  • --incremental: the mode, either append or lastmodified
  • --last-value: the maximum value of the check column from the previous import

The command:

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--check-column EMP_NO \
--incremental append \
--last-value 14999 \
--split-by EMP_NO \
--direct \
--target-dir IncrementalImports
```
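
Tracking `--last-value` by hand is error-prone. Sqoop's saved jobs can record it and update it after every run; a minimal sketch, with a hypothetical job name (depending on metastore settings, the database password may be prompted for at execution time):

```bash
# Create a saved job; the metastore updates last-value after each run.
$ sqoop job --create incrementalEmployees -- import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--table employees \
--check-column EMP_NO \
--incremental append \
--last-value 14999 \
--target-dir IncrementalImports

# Run it; each execution imports only rows added since the previous run.
$ sqoop job --exec incrementalEmployees
```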

# 13. Output Formatting Options

```bash
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--fields-terminated-by , \
--escaped-by \\ \
--enclosed-by '\"' \
--split-by EMP_NO \
--direct \
--target-dir LineFormattingOptions
```
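
With those options, every field should come out wrapped in double quotes and comma-delimited. A hypothetical peek at the result:

```bash
# Expect lines shaped like: "10001","Georgi","Facello"
$ hadoop fs -cat LineFormattingOptions/part-m-00000 | head -3
```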

# 14. Import All Tables

```bash
$ sqoop import-all-tables --options-file SqoopImportAllTablesOptions.txt \
--direct \
--warehouse-dir sqoop-mysql/EmployeeDatabase
```
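
Note the different options file: because the tool name (`import-all-tables`) is given on the command line here, the file must not begin with `import`. A hypothetical `SqoopImportAllTablesOptions.txt`:

```
#
# Options file for sqoop import-all-tables
# (connection settings only; no tool name)
#
--connect
jdbc:mysql://airawat-mySqlServer-node/employees
--username
myUID
--password
myPwd
```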

# Other References

See the official documentation: Sqoop User Guide (v1.4.4-cdh5.0.0).
