我的编程空间,编程开发者的网络收藏夹
学习永远不晚

python怎么结合shell自动创建kafka的连接器

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

python怎么结合shell自动创建kafka的连接器

这篇“python怎么结合shell自动创建kafka的连接器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python怎么结合shell自动创建kafka的连接器”文章吧。

环境

cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) [root@localhost ~]# uname -aLinux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linuxpython -VPython 2.7.5

安装连接oracle的python包

pip install cx_Oracle==7.3

获取oracle表信息

cat query_oracle.py #!/usr/bin/env pythonimport cx_Oracleimport sysimport osimport csvimport tracebackfile = open("oracle.txt", 'w').close()user = "test"passwd = "test"listener = '10.0.2.15:1521/orcl'conn = cx_Oracle.connect(user, passwd, listener)cursor = conn.cursor()sql = "select table_name from user_tables" cursor.execute(sql)LIST1=[]while True:    row = cursor.fetchone()    if row == None:        break    for table in row:        #print table        LIST1.append(table)LIST2=[]for i in LIST1:    sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i    cursor.execute(sql3)    cursor.execute(sql3)    row3 = cursor.fetchall()    for data in row3:        #LIST2.append(i)        LIST2.extend(list(data))    LIST2.append(i)    f=open('oracle.txt','a+')    print >> f,LIST2    LIST2=[]#f=open('test.txt','a+')#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student'); #select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');

去掉多余部分

cat auto.sh #!/bin/bash#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "#>oracle.txt>oracle_tables.txtcat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
cat oracle_tables.txt SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIMESNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE

shell 脚本处理表信息文件

cat connect.sh #!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)#清空自动创建连接器的脚本>create-connect.sh#循环临时文件每一行for i in `seq $FILE_NUM`do     FILE_LINE=$(sed -n ${i}p oracle_tables.txt)    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}')    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')    REAL_COL_NUM=`expr $COL_NUM - 2`    #清空临时文件    >${TABLE_NAME}.txt    >${TABLE_NAME}.sql    #循环临时文件每行列名所在的列    for j in `seq 1 4 $REAL_COL_NUM`    do        k=`expr $j + 1`        m=`expr $j + 2`        n=`expr $j + 3`        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})        #判断列的数据类型是否是NUMBER        if [ "$COL_DATA_TYPE" = "NUMBER" ]        then        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt        else        #循环拼接SQL查询中的列名部分,追加到临时文件中            echo "$COL_NAME" >> ${TABLE_NAME}.txt        fi    done    #拼接完整的SQL语句,追加到临时文件中    echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ${TABLE_NAME}.sql#循环追加每个表对应的连接器到自动创建连接器的脚本中cat >> create-connect.sh << EOFcurl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_source_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_$TABLE_NAME","mode": "{{ CONNECT_MODE }}","query": "$(cat ${TABLE_NAME}.sql)"}}' >/dev/null 2>&1EOFdone

说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。

增强版处理表信息脚本

#!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)#清空创建连接器的脚本并追加echos函数> create-jdbc-connect.shcat >> create-jdbc-connect.sh << EOF#!/bin/bashechos(){case \$1 inred)    echo -e "\033[31m \$2 \033[0m";;green)  echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue)   echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*)      echo "\$2";;esac}EOF> create-jdbc-connect-time.shcat >> create-jdbc-connect-time.sh << EOF#!/bin/bashechos(){case \$1 inred)    echo -e "\033[31m \$2 \033[0m";;green)  echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue)   echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*)      echo "\$2";;esac}EOF#创建表相关文件目录mkdir -p ./TABLE_TIME#循环临时文件每一行for i in `seq $FILE_NUM`do     FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt)    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}')    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')    REAL_COL_NUM=`expr $COL_NUM - 2`    #清空临时文件    >./TABLE_TIME/${TABLE_NAME}_time.txt    >./TABLE_TIME/${TABLE_NAME}_time.sql    >./TABLE_TIME/${TABLE_NAME}.sql    #循环临时文件每行列名所在的列    for j in `seq 1 4 $REAL_COL_NUM`    do        k=`expr $j + 1`        m=`expr $j + 2`        n=`expr $j + 3`        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})        #判断列的数据类型是否是NUMBER        if [ "$COL_DATA_TYPE" = "NUMBER" ]        then        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt        else        #循环拼接SQL查询中的列名部分,追加到临时文件中            echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt        fi        #判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中        TIME_COL=({{ TABLE_TIME_COL }})        for TIME in ${TIME_COL[@]}        do            if [ "$COL_NAME" = "$TIME" ]            then                echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt            fi        done    done    #拼接完整的SQL语句,追加到临时文件中    if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]    then    #echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql    else        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql    fi#循环追加每个表对应的连接器到自动创建连接器的脚本中if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]thencat >> create-jdbc-connect-time.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_time_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"else    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"fiEOFelsecat >> create-jdbc-connect.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"else    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"fiEOFfidone

以上就是关于“python怎么结合shell自动创建kafka的连接器”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

python怎么结合shell自动创建kafka的连接器

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

python怎么结合shell自动创建kafka的连接器

这篇“python怎么结合shell自动创建kafka的连接器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python怎么
2023-06-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录