我们如何在SQOOP中自动化增量导入



我们如何在SQoop中自动化增量导入?

在增量导入中,我们需要给--last-value从上一个值开始导入,但我的工作是经常从RDBMS导入,我不想手动给最后一个值,有什么方法可以自动化这个过程吗?

@Durga Viswanath Gadiraju答案的另一种方法。

如果要将数据导入配置单元表,可以从配置单元表中查询上次更新的值,并将该值传递给sqoop导入查询。您可以使用shell脚本或oozie操作来实现这一点。

外壳脚本:

lastupdatedvalue=`hive -e 'select last_value from table` #tweak the selection query based on the logic.
sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${lastupdatedvalue}

Oozie方法:

  1. 基于检索的逻辑的选择查询的配置单元操作上次更新的值
  2. Sqoop操作,用于从先前配置单元操作的捕获输出中进行增量加载

PFB a sudo工作流:

<workflow-app name="sqoop-to-hive" xmlns="uri:oozie:workflow:0.4">
<start to="hiveact"/>
<action name="hiveact">
    <hive xmlns="uri:oozie:hive-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>script.sql</script>
<capture-output/>
    </hive>    
    <ok to="sqoopact"/>
    <error to="kill"/>
<action name="sqoopact">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <command>import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${wf:actionData('hiveact')}</command>
     </sqoop>
    <ok to="end"/>
    <error to="kill"/>
</action>
<kill name="kill">
    <message>Action failed</message>
</kill>
<end name="end"/>

希望这能有所帮助。

您可以利用内置的Sqoop元存储

您可以使用以下内容创建一个简单的增量导入作业命令:

sqoop作业\--创建<>\--\进口\--连接<>\--用户名<>\--密码<>\--表格<>\--增量追加\--检查-<>\--最后值0

并以--exec参数开始:

sqoop job --exec <<Job Name>>

Sqoop将自动将上次导入的值序列化回每次成功增量作业后的元存储

这可以通过sqoop作业轻松实现

1.创建一个sqoop作业(在"导入"之前有一个空格)

sqoop job     --create JobName6 
           -- import  
                --connect jdbc:mysql://localhost:3306/retail_db 
                --username=username 
                --password-file /user/sqoop/password 
                --table departments 
                --target-dir /user/hive/warehouse/test.db/departments 
                --table departments 
                --split-by department_id 
                --check-column department_id 
                --incremental append 
               --last-value 0;

2.运行sqoop作业sqoop作业--exec JobName6;检查HDFS 中位置的值

3.在源表(mysql)中插入一些数据插入部门值(9,‘新数据1’),(10,‘新的数据2’);

2.再次运行sqoop作业sqoop作业--exec JobName6;再次检查HDFS中该位置的值。

类似于Hive Import

sqoop job     --create JobName1 
           -- import  
                --connect jdbc:mysql://localhost:3306/retail_db 
                --username=username
                --password-file /user/sqoop/password 
                --table departments 
                --hive-import 
                --hive-table department 
                --split-by department_id 
                --check-column department_id 
                --incremental append 
               --last-value 0; 

获得它的一种方法:

在数据库中创建日志表,并按照以下开发增量导入

Query the log table using sqoop eval command with the last value from last run
Run the sqoop import
Update the log table with the latest valueusing sqoop eval command

您需要自动处理sqoop evalsqoop importsqoop eval。您可以使用sqoop eval向任何具有连接的数据库提交任何有效查询。因此,您可以在导入之前运行select query以获取上次运行的最后一个值,并运行update query以使用当前运行的最后值更新日志表。

相关内容

  • 没有找到相关文章

最新更新