我的编程空间,编程开发者的网络收藏夹
学习永远不晚

sqoop脚本批量生成

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

sqoop脚本批量生成

  • 通过all_tab_columnss字典表生成hive的建表语句

create or replace view create_sql as
--通过all_tab_columnss字典表生成hive的建表语句
select owner,table_name, case

     when nm = 1 then
      'create table ' || owner || '.' || TABLE_NAME || ' (' ||
      COLUMN_NAME || ' ' || DATA_TYPE || ','
     when np = 1 then
      COLUMN_NAME || ' ' || DATA_TYPE || ') partitioned by (dt string);'
     else
      COLUMN_NAME || ' ' || DATA_TYPE || ','
   end create_sql

from (

    SELECT OWNER,
            TABLE_NAME,
            COLUMN_NAME,
            CASE
              WHEN DATA_TYPE IN ('VARCHAR2','LONG','CLOB','CHAR','NCHAR','BLOB','VARCHAR2') THEN
               'STRING'
              WHEN DATA_TYPE IN ('NUMBER') then
               'DECIMAL' || '(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
              WHEN DATA_TYPE IN ('DATE', 'TIMESTAMP(6)') THEN
               'STRING'
              ELSE
               DATA_TYPE||'(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
            END DATA_TYPE,
            COLUMN_ID,
            NM,
            NP
      FROM (select t.*,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID) nm,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID desc) np
             
               from all_tab_columns t
              ))

ORDER BY OWNER, TABLE_NAME, COLUMN_ID;


二、生成sqoop任务的配置表
select
owner||table_name job_name, --任务名称
'' table_type, --表类型dim 维表,fact_i 只增,Fact_iud 增删改
'' partition, --hive表是否是分区表 0 非,1 是
'BS' source_db, --来源业务系统,一般用源表用户名或库名
owner||'.'||table_name source_table, --源表名称
'' datatime_cloumn, --增量时间戳字段
'APPEND' incremental, --增量接入方式,append:增量接入,overwrite:全量接入
'' SPLIT_BY, --并行字段,选择重复数据较少的字段
'APP_NO' row_key, --主键字段
'fz_bs' hive_db, --指定接入的hive库
table_name hive_table, --指定接入的hive表,必须是无数据库名的存表名
'' check_column, --指定接入的源表字段
columns
from (
select owner,table_name,wm_concat(column_name)over(partition by owner,table_name order by column_id) columns,rn from (
select owner,table_name,column_name,column_id,row_number()over(partition by owner,table_name order by column_id desc) rn from all_tab_column
where owner||'_'||table_name in(
'BS_S_FAULT_RPT',
'BS_ARC_S_FAULT_RPT',
'HIS_ARC_S_FAULT_RPT',
'BS_S_FAULT_HANDLE',
'BS_ARC_S_FAULT_HANDLE',
'HIS_ARC_S_FAULT_HANDLE',
'BS_S_RETVISIT',
'BS_ARC_S_RETVISIT',
'HIS_ARC_S_RETVISIT',
'BS_S_95598_WKST_RELA',
'HIS_S_95598_WKST_RELA')
order by owner,table_name,column_id
))
where rn=1
order by owner,table_name;


  • 下面是自动生成sqoop配置任务的的python脚本

import json,os
def json_make(input_file='./tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hive-bin')
    os.system('mkdir  sqoopjob/impala-bin')
    os.system('mkdir  sqoopjob/partition-bin')
    os.system('mkdir  sqoopjob/job-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
hcf = open('./sqoopjob/hql_corntab.cron', 'w')
imcf=open('./sqoopjob/imql_corntab.cron', 'w')
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
#scmd=open('./sqoopjob/sqoopcmd.sh', 'w')
#scmd.write('''yesdate=`date -d last-day +%Y-%m-%d`;todday=`date +%Y-%m-%d`''')
#cf=open('./sqoopjob/cron.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            hivetruncate='''hive -e"use {hive_db};truncate table {hive_table}_new"\n'''.format(hive_db=job['hive_db'],hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-overwrite \\
                    --hive-table {hive_db}.{hive_table}_NEW \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    hive_db=job['hive_db'],
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper()
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            hql = '''hive -e "use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new union all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                                    hive_db=job['hive_db'],
                                    tmp_table=job['source_table'].replace('.', '_')+'_tmp',
                                    columns=job['columns'],
                                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                    order_by=job['datatime_cloumn'],
                                    table_name=job['source_table'].replace('.', '_'),
                                    ).replace('    ','')
            imql = '''impala-shell -i bigdata-w-001 -q "invalidate metadata;use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new union all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                                    hive_db=job['hive_db'],
                                    tmp_table=job['source_table'].replace('.', '_') + '_tmp',
                                    columns=job['columns'],
                                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                    order_by=job['datatime_cloumn'],
                                    table_name=job['source_table'].replace('.', '_'),
                                ).replace('    ','')
            #print(sjf)
            sjf.write(hivetruncate+createjob)
            df.write(deledrop)
            open('./sqoopjob/hive-bin/' + job['job_name'].lower() + '_hql.sh', 'w').write(kerboros + execjob  + hql)
            open('./sqoopjob/impala-bin/' + job['job_name'].lower() + '_imql.sh', 'w').write(kerboros + execjob  + imql)
            hcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()
                    )
                    )
            imcf.write(
            '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_imql.sh >>../logs/{job_name}_imql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(
                job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        elif job['table_type'].lower()=='fact_iud'and job['partition']=='0':
            # 处理增量事实表,有增删改查的事实表
            hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                hive_db=job['hive_db'], hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                                            job_name=job['job_name'].lower(),
                                            sqoopmeta=sqoopmeta,
                                            jdbc=jdbc,
                                            hive_db=job['hive_db'],
                                            source_table=job['source_table'].upper(),
                                            split_by=job['split_by'].upper(),
                                            hive_table=job['hive_table'].upper(),
                                        ).replace('    ','')
            sjf.write(hivetruncate+createjob)
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ',
                                                                                                             '')
            open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
        elif job['table_type'].lower()=='fact_i'and job['partition']=='0':
            # 处理在线事实表,只有写入事务的事实表
            hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                hive_db=job['hive_db'], hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                                    -- import --connect {jdbc} \\
                                    --table {source_table} \\
                                    --incremental append \\
                                    --split-by {split_by} \\
                                    --hive-import \\
                                    --hive-drop-import-delims \\
                                    --hive-table {hive_db}.{hive_table} \\
                                    --check-column  {check_column} \\
                                    --last-value '2011-01-01 11:00:00' \\
                                    -m 2;\n'''.format(
                                                                job_name=job['job_name'].lower(),
                                                                sqoopmeta=sqoopmeta,
                                                                jdbc=jdbc,
                                                                hive_db=job['hive_db'],
                                                                source_table=job['source_table'].upper(),
                                                                split_by=job['split_by'].upper(),
                                                                hive_table=job['hive_table'].upper(),
                                                                check_column=job['datatime_cloumn'].upper()
                                                            ).replace('    ', '')
            sjf.write(hivetruncate+createjob)
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
            open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))


        elif job['partition']=='1' and job['table_type'] in ['fact_i','fact_iud']:
            #处理带有where条件查询数据
            shell_cmd='''if [ $# -gt 1 ]; then
                        yesdate=$1
                        today=$2
                        var_len1=`echo ${yesdate} |wc -L`
                        var_len2=`echo ${today} |wc -L`
                        if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                            echo 'vars is wrong'
                            echo 'var input like:2017-01-01 2017-01-21'
                            exit
                        fi
                    else
                        yesdate=`date -d "today -1 day " +%Y-%m-%d`
                        today=`date -d today +%Y-%m-%d`
                    fi

                echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
            createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                    alter table {hive_table} add partition(dt='$yesdate') "
                    sqoop  import --connect {jdbc} \\
                    --table {source_table} \\
                    --where "{where} >= date'$yesdate' and {where}<date'$today' " \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-partition-key dt  \\
                    --hive-partition-value  $yesdate \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                                            job_name=job['job_name'].lower(),
                                            hive_db=job['hive_db'],
                                            sqoopmeta=sqoopmeta,
                                            jdbc=jdbc,
                                            source_table=job['source_table'].upper(),
                                            where=job['datatime_cloumn'],
                                            split_by=job['split_by'].upper(),
                                            hive_table=job['hive_table'].upper(),
                                        ).replace('    ','')
            #scmd.write(createjob)
            open('./sqoopjob/partition-bin/'+job['job_name'].lower()+'.sh', 'w').write(shell_cmd+createjob)
            open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
            crontabf.write(
                '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
        elif job['partition'] == '1' and job['table_type'] in ['dim']:
            # 处理带有where条件查询数据
            shell_cmd = '''if [ $# -gt 1 ]; then
                    yesdate=$1
                    today=$2
                    var_len1=`echo ${yesdate} |wc -L`
                    var_len2=`echo ${today} |wc -L`
                    if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                        echo 'vars is wrong'
                        echo 'var input like:2017-01-01 2017-01-21'
                        exit
                    fi
                else
                    yesdate=`date -d "today -1 day " +%Y-%m-%d`
                    today=`date -d today +%Y-%m-%d`
                fi

            echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
            createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                       alter table {hive_table} add partition(dt='$yesdate') "
                       sqoop  import --connect {jdbc} \\
                       --table {source_table} \\
                       --split-by {split_by} \\
                       --hive-import \\
                       --hive-partition-key dt  \\
                       --hive-partition-value  $yesdate \\
                       --hive-drop-import-delims \\
                       --hive-table {hive_db}.{hive_table} \\
                       --delete-target-dir \\
                       -m 2;\n'''.format(
                                                job_name=job['job_name'].lower(),
                                                hive_db=job['hive_db'],
                                                sqoopmeta=sqoopmeta,
                                                jdbc=jdbc,
                                                source_table=job['source_table'].upper(),
                                                where=job['datatime_cloumn'],
                                                split_by=job['split_by'].upper(),
                                                hive_table=job['hive_table'].upper(),
                                                ).replace('    ', '')
            #scmd.write(createjob)
            open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd+createjob)
            open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
sjf.close()
hcf.close()
imcf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./tablelist')
#生成sqoop脚本
create_job(tablelist='tablelist.json')




import json,os
def json_make(input_file='./hbase_tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hbase-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hbase-create-table \\
                    --hbase-table {hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    --hbase-row-key {row_key} \\
                    --column-family cf \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper(),
                    row_key=job['row_key']
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            createtable=''' CREATE EXTERNAL TABLE {hive_table}({columns})

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:LAST_ANALYZED,cf:SAMPLE_SIZE,cf:CHARACTER_SET_NAME")
TBLPROPERTIES("hbase.table.name" = "{hive_table}", "hbase.mapred.output.outputtable" = "{hive_table}");
'''.format(hive_table=job['hive_table'],

       columns=job['colums'].split(',').join())
            #print(sjf)
            sjf.write(createjob)

            df.write(deledrop)
            open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        else :pass

sjf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./hbase_tablelist')
#生成sqoop脚本
create_job(tablelist='./hbase_tablelist.json')


import json,os
def json_make(input_file='./hbase_tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hbase-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
createtablef=open('./sqoopjob/createtable.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hbase-create-table \\
                    --hbase-table {hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    --hbase-row-key {row_key} \\
                    --column-family cf \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper(),
                    row_key=job['row_key']
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            createtable='''hive -e "use{hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
                        STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
                        WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
                        TBLPROPERTIES('hbase.table.name' = '{hive_table}',
                        'hbase.mapred.output.outputtable' = '{hive_table}')";\n '''.format(
                                    hive_table=job['hive_table'],
                                    hive_db=job['hive_db'],
                                    columns_hive=' string,'.join(job['columns'].split(','))+' string',
                                    columns_hbase=':key,cf:'+',cf:'.join(job['columns'].split(','))
                                    ).replace('    ','')
            sjf.write(createjob)
            createtablef.write(createtable)
            df.write(deledrop)
            open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        else :pass

sjf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./hbase_tablelist')
#生成sqoop脚本
create_job(tablelist='./hbase_tablelist.json')

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

sqoop脚本批量生成

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

sqoop脚本批量生成

通过all_tab_columnss字典表生成hive的建表语句create or replace view create_sql as--通过all_tab_columnss字典表生成hive的建表语句select owner,table
2023-01-31

如何自动生成批量执行SQL脚本的批处理

这篇文章主要介绍如何自动生成批量执行SQL脚本的批处理,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!场景: DBA那边给我导出了所有的存储、函数等等对象的创建脚本,有上千个文件. 现在需要将这些对象创建脚本导入到另外
2023-06-08

Shell脚本实现批量生成nagios配置文件

如果管理的站点和服务器较多的情况下,每次修改配置文件都相当痛苦。因而想到了用shell脚本来批量生成配置文件和配置数据。下面这个脚本是为了批量生成nagios监控配置文件的一个shell脚本程序。其原理是事先定义一个shell脚本模板,然后
2022-06-04

Shell脚本怎么实现批量生成nagios配置文件

这篇文章主要讲解了“Shell脚本怎么实现批量生成nagios配置文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Shell脚本怎么实现批量生成nagios配置文件”吧!如果管理的站点和服
2023-06-09

批量管理python脚本

新出炉的脚本, 有错的地方还望指出,谢谢。 #!/usr/bin/env python # -*- coding: utf-8 -*- # #  Syscloud Operation platform.py #
2023-01-31

python脚本生成html

#-*- coding: utf-8 -*-from pyh import *CONST_LIST = [ ['1','AAA','100','100','100','300'],               ['2','BBB','99'
2023-01-31

有哪些BAT批处理一键生成APK包脚本

这篇文章主要介绍“有哪些BAT批处理一键生成APK包脚本”,在日常操作中,相信很多人在有哪些BAT批处理一键生成APK包脚本问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”有哪些BAT批处理一键生成APK包脚本
2023-06-08

wpsvba如何批量生成word

这篇文章主要介绍“wpsvba如何批量生成word”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“wpsvba如何批量生成word”文章能帮助大家解决问题。wpsvba批量生成word的方法:1、将以
2023-07-04

python3 拼接并批量生成sql

#coding=utf-8from openpyxl import load_workbook#读取excel的数据def read_excel(): #打开一个workbook wb = load_workbook(filen
2023-01-31

Java如何批量执行datax脚本

Java批量执行DataX脚本的方法包括:使用DataX命令行工具通过JavaAPI加载和执行脚本借助第三方工具(如ApacheAirflow、Luigi)以命令行工具批量执行为例,通过datax命令加上多个脚本文件,可并行执行多个脚本。优化脚本性能、单元测试和监控报警等措施有助于提升执行效率和可靠性。
Java如何批量执行datax脚本
2024-04-02

python 批量压缩图片的脚本

目录简介需要 Needs用法 Usage代码实现效果另外一种图片压缩实现方式简介 用Python批量压缩图片,把文件夹或图片直接拖入即可 需要 Needs Python 3 Pillow (用pip install pillow来安装即可)
2022-06-02

shell如何批量curl接口脚本

这篇文章主要介绍了shell如何批量curl接口脚本,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。最近,刚接一需求,如下: DBA会将一些服务规则的数据导出,然后一条条手动去
2023-06-09

Linux怎么批量执行redis脚本

要批量执行redis脚本,你可以使用redis-cli工具和Shell脚本来实现。以下是一个示例的Shell脚本,用于批量执行redis脚本:#!/bin/bash# 读取redis命令脚本文件列表script_files=("scr
2023-10-27

MySQL8 批量修改字符集脚本

目录1. 批量修改库字符集2. 批量修改表字符集3. 批量修改列字符集从低版本迁移到mysql 8后,可能由于字符集问题出现 Illegal mix of collations (utf8mb4_general_ci,IMPLICIT) a
2023-03-24

MySQL8批量修改字符集脚本

本文主要介绍了MySQL8批量修改字符集脚本,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-24

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录