我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python读取Hive数据库代码怎么写

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python读取Hive数据库代码怎么写

今天小编给大家分享一下Python读取Hive数据库代码怎么写的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

实际业务读取hive数据库的代码

import loggingimport pandas as pdfrom impala.dbapi import connectimport sqlalchemyfrom sqlalchemy.orm import sessionmakerimport osimport timeimport osimport datetimefrom dateutil.relativedelta import relativedeltafrom typing import Dict, Listimport loggingimport threadingimport pandas as pdimport pickleclass HiveHelper(object):    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        logger:logging.Logger=None        ):        self.host = host        self.port = port        self.database = database        self.auth_mechanism = auth_mechanism        self.user = user        self.password = password        self.logger = logger        self.impala_conn = None        self.conn = None        self.cursor = None        self.engine = None        self.session = None    def create_table_code(self, file_name):        '''创建表类代码'''        os.system(f'sqlacodegen {self.connection_str} > {file_name}')        return self.conn    def get_conn(self):        '''创建连接或获取连接'''        if self.conn is None:            engine = self.get_engine()            self.conn = engine.connect()        return self.conn    def get_impala_conn(self):        '''创建连接或获取连接'''        if self.impala_conn is None:            self.impala_conn = connect(                host=self.host,                port=self.port,                database=self.database,                auth_mechanism=self.auth_mechanism,                user=self.user,                password=self.password                )        return self.impala_conn    def get_engine(self):        '''创建连接或获取连接'''        if self.engine is None:            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)        return self.engine    def get_cursor(self):        '''创建连接或获取连接'''        if self.cursor is None:            self.cursor = self.conn.cursor()        return self.cursor    def get_session(self) -> sessionmaker:        '''创建连接或获取连接'''        if self.session is None:            engine = self.get_engine()            Session = sessionmaker(bind=engine)            self.session = Session()        return self.session    def close_conn(self):        '''关闭连接'''        if self.conn is not None:            self.conn.close()            self.conn = None        self.dispose_engine()        self.close_impala_conn()    def close_impala_conn(self):        '''关闭impala连接'''        if self.impala_conn is not None:            self.impala_conn.close()            self.impala_conn = None    def close_session(self):        '''关闭连接'''        if self.session is not None:            self.session.close()            self.session = None        self.dispose_engine()    def dispose_engine(self):        '''释放engine'''        if self.engine is not None:            # self.engine.dispose(close=False)            self.engine.dispose()            self.engine = None    def close_cursor(self):        '''关闭cursor'''        if self.cursor is not None:            self.cursor.close()            self.cursor = None    def get_data(self, sql, auto_close=True) -> pd.DataFrame:        '''查询数据'''        conn = self.get_conn()        data = None        try:            # 异常重试3次            for i in range(3):                try:                    data = pd.read_sql(sql, conn)                    break                except Exception as ex:                    if i == 2:                        raise ex # 往外抛出异常                    time.sleep(60) # 一分钟后重试        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            if auto_close:                self.close_conn()        return datapassclass VarsHelper():    def __init__(self, save_dir, auto_save=True):        self.save_dir = save_dir        self.auto_save = auto_save        self.values = {}        if not os.path.exists(os.path.dirname(self.save_dir)):            os.makedirs(os.path.dirname(self.save_dir))        if os.path.exists(self.save_dir):            with open(self.save_dir, 'rb') as f:                self.values = pickle.load(f)                f.close()    def set_value(self, key, value):        self.values[key] = value        if self.auto_save:            self.save_file()    def get_value(self, key):        return self.values[key]    def has_key(self, key):        return key in self.values.keys()    def save_file(self):        with open(self.save_dir, 'wb') as f:            pickle.dump(self.values, f)            f.close()passclass GlobalShareArgs():    args = {        "debug": False    }    def get_args():        return GlobalShareArgs.args    def set_args(args):        GlobalShareArgs.args = args    def set_args_value(key, value):        GlobalShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return GlobalShareArgs.args.get(key, default_value)    def contain_key(key):        return key in GlobalShareArgs.args.keys()    def update(args):        GlobalShareArgs.args.update(args)passclass ShareArgs():    args = {        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共        "only_predict": False, # 只识别,不训练        "delete_model": True, # 先删除模型,仅在训练时使用        "export_excel": False, # 导出excel        "classes": 12, # 聚类数        "batch_size": 16,        "hidden_size": 32,        "max_nrof_epochs": 100,        "learning_rate": 0.0005,        "loss_type": "categorical_crossentropy",        "avg_model_num": 10,        "steps_per_epoch": 4.0, # 4.0        "lr_callback_patience": 4,         "lr_callback_cooldown": 1,        "early_stopping_callback_patience": 6,        "get_data": True,    }    def get_args():        return ShareArgs.args    def set_args(args):        ShareArgs.args = args    def set_args_value(key, value):        ShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return ShareArgs.args.get(key, default_value)    def contain_key(key):        return key in ShareArgs.args.keys()    def update(args):        ShareArgs.args.update(args)passclass UrBiGetDatasBase():    # 线程锁列表,同保存路径共用锁    lock_dict:Dict[str, threading.Lock] = {}    # 时间列表,用于判断是否超时    time_dict:Dict[str, datetime.datetime] = {}    # 用于记录是否需要更新超时时间    get_data_timeout_dict:Dict[str, bool] = {}    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        save_dir=None,        logger:logging.Logger=None,        ):        self.save_dir = save_dir        self.logger = logger        self.db_helper = HiveHelper(            host=host,            port=port,            database=database,            auth_mechanism=auth_mechanism,            user=user,            password=password,            logger=logger            )        # 创建子目录        if self.save_dir is not None and not os.path.exists(self.save_dir):            os.makedirs(self.save_dir)        self.vars_helper = None        if GlobalShareArgs.get_args_value('debug'):            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')     def close(self):        '''关闭连接'''        self.db_helper.close_conn()    def get_last_time(self, key_name) -> bool:        '''获取是否超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')        timeout = 12 # 12小时        if GlobalShareArgs.get_args_value('debug'):            timeout = 24 # 24小时        get_data_timeout = False        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()            get_data_timeout = True        else:            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)        # if self.vars_helper is not None :        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout        return get_data_timeout    def save_last_time(self, key_name):        '''更新状态超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()        if self.vars_helper is not None :            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)    def get_lock(self, key_name) -> threading.Lock:        '''获取锁'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if key_name not in UrBiGetDatasBase.lock_dict.keys():            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()        return UrBiGetDatasBase.lock_dict[key_name]    def get_data_of_date(        self,        save_dir,        sql,        sort_columns:List[str],        del_index_list=[-1], # 删除最后下标        start_date = datetime.datetime(2017, 1, 1), # 开始时间        offset = relativedelta(months=3), # 时间间隔        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化        stop_date = '20700101', # 超过时间则停止        data_format_fun = None, # 格式化数据        ):        '''分时间增量读取数据'''        # 创建文件夹        if not os.path.exists(save_dir):            os.makedirs(save_dir)        else:            #删除最后一个文件            file_list = os.listdir(save_dir)            if len(file_list)>0:                file_list.sort()                for del_index in del_index_list:                    os.remove(os.path.join(save_dir,file_list[del_index]))                    print('删除最后一个文件:', file_list[del_index])        select_index = -1        # start_date = datetime.datetime(2017, 1, 1)        while True:            end_date = start_date + offset            start_date_str = date_format_fun(start_date)            end_date_str = date_format_fun(end_date)            self.logger.info('date: %s-%s', start_date_str, end_date_str)            file_path = os.path.join(save_dir, filename_format_fun(start_date))            # self.logger.info('file_path: %s', file_path)            if not os.path.exists(file_path):                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))                if data is None:                    break                self.logger.info('data: %d', len(data))                # self.logger.info('data: %d', data.columns)                if len(data)>0:                    select_index+=1                    if data_format_fun is not None:                        data = data_format_fun(data)                    # 排序                    data = data.sort_values(sort_columns)                    data.to_csv(file_path)                elif select_index!=-1:                    break                elif stop_date < start_date_str:                    raise Exception("读取数据异常,时间超出最大值!")            start_date = end_datepassclass UrBiGetDatas(UrBiGetDatasBase):    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        save_dir='./hjx/data/ur_bi_dw_data',        logger:logging.Logger=None        ):        self.save_dir = save_dir        self.logger = logger        super().__init__(            host=host,            port=port,            database=database,            auth_mechanism=auth_mechanism,            user=user,            password=password,            save_dir=save_dir,            logger=logger            )    def get_dim_date(self):        '''日期数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = 'SELECT * FROM ur_bi_dw.dim_date'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:'dim_date.'+c for c in columns}            data = data.rename(columns=columns)            data = data.sort_values(['dim_date.date_key'])            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_shop(self):        '''店铺数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = 'SELECT * FROM ur_bi_dw.dim_shop'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:'dim_shop.'+c for c in columns}            data = data.rename(columns=columns)            data = data.sort_values(['dim_shop.shop_no'])            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_vip(self):        '''会员数据'''        sub_dir = os.path.join(self.save_dir,'vip_no')        now_lock = self.get_lock(sub_dir)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(sub_dir):                return            sql = '''SELECT dv.*, dd.date_key, dd.date_name2             FROM ur_bi_dw.dim_vip as dv            INNER JOIN ur_bi_dw.dim_date as dd            ON dv.card_create_date=dd.date_name2             where dd.date_key >= %s            and dd.date_key < %s'''            # data:pd.DataFrame = self.db_helper.get_data(sql)            sort_columns = ['dv.vip_no']            # TODO:            self.get_data_of_date(                save_dir=sub_dir,                sql=sql,                sort_columns=sort_columns,                start_date=datetime.datetime(2017, 1, 1), # 开始时间                offset=relativedelta(years=1)            )            # 更新超时时间            self.save_last_time(sub_dir)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_weather(self):        '''天气数据'''        sub_dir = os.path.join(self.save_dir,'weather')        now_lock = self.get_lock(sub_dir)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(sub_dir):                return            sql = """            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather            where weather.date_key>=%s and weather.date_key<%s            """            sort_columns = ['weather.date_key','weather.areaid']            def data_format_fun(data):                columns = list(data.columns)                columns = {c:'weather.'+c for c in columns}                data = data.rename(columns=columns)                return data            self.get_data_of_date(                save_dir=sub_dir,                sql=sql,                sort_columns=sort_columns,                del_index_list=[-2, -1], # 删除最后下标                data_format_fun=data_format_fun,            )            # 更新超时时间            self.save_last_time(sub_dir)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_weather_city(self):        '''天气城市数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:'weather_city.'+c for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_goods(self):        '''货品数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = 'SELECT * FROM ur_bi_dw.dim_goods'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:'dim_goods.'+c for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_goods_market_shop_date(self):        '''店铺商品生命周期数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'            sql = '''            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days            FROM ur_bi_dw.dim_goods_market_shop_date            where lifecycle_end_date is not null            '''            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:c.replace('lifecycle_end_date.','') for c in columns}            data = data.rename(columns=columns)            data = data.sort_values(['shop_market_date'])            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_goods_market_date(self):        '''全国商品生命周期数据'''        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = '''            select * FROM ur_bi_dw.dim_goods_market_date            '''            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:'dim_goods_market_date.'+c for c in columns}            data = data.rename(columns=columns)            data = data.sort_values(['dim_goods_market_date.sku_no'])            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_goods_color_dev_sizes(self):        '''商品开发码数数据'''        file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dwd_daily_sales_size(self):        '''实际销售金额'''        sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')        now_lock = self.get_lock(sub_dir)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(sub_dir):                return            sql = """            select shop_no,sku_no,date_key,`size`,                sum(tag_price) as `tag_price`,                sum(sales_qty) as `sales_qty`,                sum(sales_tag_amt) as `sales_tag_amt`,                sum(sales_amt) as `sales_amt`,                count(0) as `sales_count`            from ur_bi_dw.dwd_daily_sales_size as sales            where sales.date_key>=%s and sales.date_key<%s                and sales.currency_code='CNY'            group by shop_no,sku_no,date_key,`size`            """            sort_columns = ['date_key','shop_no','sku_no']            self.get_data_of_date(                save_dir=sub_dir,                sql=sql,                sort_columns=sort_columns,                start_date=datetime.datetime(2017, 1, 1), # 开始时间            )            # 更新超时时间            self.save_last_time(sub_dir)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dwd_daily_delivery_size(self):        '''实际配货金额'''        sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')        now_lock = self.get_lock(sub_dir)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(sub_dir):                return            sql = """            select shop_no,sku_no,date_key,`size`,                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,                sum(delivery.pr_received_qty) as `pr_received_qty`,                count(0) as `delivery_count`            from ur_bi_dw.dwd_daily_delivery_size as delivery            where delivery.date_key>=%s and delivery.date_key<%s                and delivery.currency_code='CNY'            group by shop_no,sku_no,date_key,`size`            """            sort_columns = ['date_key','shop_no','sku_no']            self.get_data_of_date(                save_dir=sub_dir,                sql=sql,                sort_columns=sort_columns,                start_date=datetime.datetime(2017, 1, 1), # 开始时间            )            # 更新超时时间            self.save_last_time(sub_dir)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_v_last_nation_sales_status(self):        '''商品畅滞销数据'''        file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dwd_daily_finacial_goods(self):        '''商品成本价数据'''        file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = """            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1            inner join (                select sku_no,`size`,max(date_key) as date_key                from ur_bi_dw.dwd_daily_finacial_goods                where currency_code='CNY' and country_code='CN'                group by sku_no,`size`            ) as t2            on t2.sku_no=t1.sku_no                and t2.`size`=t1.`size`                and t2.date_key=t1.date_key            where t1.currency_code='CNY' and t1.country_code='CN'            """            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:c.replace('t1.','') for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_dim_size_group(self):        '''尺码映射数据'''        file_path = os.path.join(self.save_dir,'dim_size_group.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = """select * from ur_bi_dw.dim_size_group"""            data:pd.DataFrame = self.db_helper.get_data(sql)            columns = list(data.columns)            columns = {c:c.replace('dim_size_group.','') for c in columns}            data = data.rename(columns=columns)            data.to_csv(file_path, index=False)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁passdef get_common_datas(    host='10.2.32.22',    port=21051,    database='ur_ai_dw',    auth_mechanism='LDAP',    user='urbi',    password='Ur#730xd',    logger:logging.Logger=None):    # 共用文件    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')    ur_bi_get_datas = UrBiGetDatas(        host=host,        port=port,        database=database,        auth_mechanism=auth_mechanism,        user=user,        password=password,        save_dir=common_ur_bi_dir,        logger=logger    )    try:        logger.info('正在查询日期数据...')        ur_bi_get_datas.get_dim_date()        logger.info('查询日期数据完成!')        logger.info('正在查询店铺数据...')        ur_bi_get_datas.get_dim_shop()        logger.info('查询店铺数据完成!')        logger.info('正在查询天气数据...')        ur_bi_get_datas.get_weather()        logger.info('查询天气数据完成!')        logger.info('正在查询天气城市数据...')        ur_bi_get_datas.get_weather_city()        logger.info('查询天气城市数据完成!')        logger.info('正在查询货品数据...')        ur_bi_get_datas.get_dim_goods()        logger.info('查询货品数据完成!')        logger.info('正在查询实际销量数据...')        ur_bi_get_datas.get_dwd_daily_sales_size()        logger.info('查询实际销量数据完成!')    except Exception as ex:        logger.exception(ex)        raise ex # 往外抛出异常    finally:        ur_bi_get_datas.close()passclass CustomUrBiGetDatas(UrBiGetDatasBase):    def __init__(        self,        host='10.2.32.22',        port=21051,        database='ur_ai_dw',        auth_mechanism='LDAP',        user='urbi',        password='Ur#730xd',        save_dir='./hjx/data/ur_bi_data',        logger:logging.Logger=None        ):        self.save_dir = save_dir        self.logger = logger        super().__init__(            host=host,            port=port,            database=database,            auth_mechanism=auth_mechanism,            user=user,            password=password,            save_dir=save_dir,            logger=logger            )    def get_sales_goal_amt(self):        '''销售目标金额'''        file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = '''            select sales_goal.shop_no,                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,                dates.month_of_year,                sum(sales_goal.sales_goal_amt) as sales_goal_amt            from ur_bi_dw.dwd_sales_goal_west as sales_goal            inner join ur_bi_dw.dim_date as dates                on sales_goal.date_key = dates.date_key            group by sales_goal.shop_no,                if(sales_goal.serial='Y','W',sales_goal.serial),                dates.month_of_year            '''            data:pd.DataFrame = self.db_helper.get_data(sql)            data = data.rename(columns={                'shop_no':'sales_goal.shop_no',                'serial':'sales_goal.serial',                'month_of_year':'dates.month_of_year',            })            # 排序            data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁    def get_shop_serial_area(self):        '''店-系列面积'''        file_path = os.path.join(self.save_dir,'shop_serial_area.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            if not self.get_last_time(file_path):                return            sql = '''            select shop_serial_area.shop_no,                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,                shop_serial_area.month_of_year,                sum(shop_serial_area.area) as `shop_serial_area.area`            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area            where shop_serial_area.area is not null            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year            '''            data:pd.DataFrame = self.db_helper.get_data(sql)            data = data.rename(columns={                'shop_no':'shop_serial_area.shop_no',                'serial':'shop_serial_area.serial',                'month_of_year':'shop_serial_area.month_of_year',                'area':'shop_serial_area.area',            })            # 排序            data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])            data.to_csv(file_path)            # 更新超时时间            self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁passdef get_datas(    host='10.2.32.22',    port=21051,    database='ur_ai_dw',    auth_mechanism='LDAP',    user='urbi',    password='Ur#730xd',    save_dir='./data/sales_forecast/ur_bi_dw_data',    logger:logging.Logger=None):    ur_bi_get_datas = CustomUrBiGetDatas(        host=host,        port=port,        database=database,        auth_mechanism=auth_mechanism,        user=user,        password=password,        save_dir=save_dir,        logger=logger    )    try:        # 店,系列,品类,年月,销售目标金额        logger.info('正在查询年月销售目标金额数据...')        ur_bi_get_datas.get_sales_goal_amt()        logger.info('查询年月销售目标金额数据完成!')    except Exception as ex:        logger.exception(ex)        raise ex # 往外抛出异常    finally:        ur_bi_get_datas.close()passdef getdata_ur_bi_dw(    host='10.2.32.22',    port=21051,    database='ur_ai_dw',    auth_mechanism='LDAP',    user='urbi',    password='Ur#730xd',    save_dir='./data/sales_forecast/ur_bi_dw_data',    logger=None):    get_common_datas(        host=host,        port=port,        database=database,        auth_mechanism=auth_mechanism,        user=user,        password=password,        logger=logger    )    get_datas(        host=host,        port=port,        database=database,        auth_mechanism=auth_mechanism,        user=user,        password=password,        save_dir=save_dir,        logger=logger    )pass# 代码入口# getdata_ur_bi_dw(#     host=ur_bi_dw_host,#     port=ur_bi_dw_port,#     database=ur_bi_dw_database,#     auth_mechanism=ur_bi_dw_auth_mechanism,#     user=ur_bi_dw_user,#     password=ur_bi_dw_password,#     save_dir=ur_bi_dw_save_dir,#     logger=logger#     )

代码说明和领悟

每个类的具体作用说明,代码需要根据下面的文字说明进行“食用”:

(第一层)HiveHelper完成了连接数据库、关闭数据库连接、生成事务、执行、引擎、连接等功能

VarsHelper提供了一个简单的持久化功能,可以将对象以文件的形式存放在磁盘上。并提供设置值、获取值、判断值是否存在的方法

GlobalShareArgs提供了一个字典,并且提供了获取字典、设置字典、设置字典键值对、设置字典键的值、判断键是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs类似,只是一开始字典的初始化的键值对比较多

(第二层)UrBiGetDataBase类,提供了线程锁字典、时间字典、超时判断字典,都是类变量;使用了HiveHelper类,但注意,不是继承。在具体的sql读数时,提供了线程固定和时间判断

(第三层)UrBiGetDatas类,获取hive数据库那边的日期数据、店铺数据、会员数据、天气数据、天气城市数据、商品数据、店铺生命周期数据、全国商品生命周期数据、商品开发码数数据、实际销售金额、实际配货金额、商品畅滞销数据、商品成本价数据、尺码映射数据等。

(第四层)get_common_data函数,使用URBiGetData类读取日期、店铺、天气、天气城市、货品、实际销量数据,并缓存到文件夹./yongjian/data/ur_bi_data下面

CustomUrBiGetData类,继承了UrBiGetDatasBase类,读取销售目标金额、点系列面积数据。

(这个也是第四层)get_datas函数,通过CustomUrBiGetData类,读取年月销售目标金额。

总的函数:(这个是总的调用入口函数)get_data_ur_bi_dw函数,调用了get_common_data和get_datas函数进行读取数据,然后将数据保存到某个文件夹目录下面。

举一反三,如果你不是hive数据库,你可以将第一层这个底层更换成mysql。主页有解释如果进行更换。第二层不需要改变,第三层就是你想要进行读取的数据表,不同的数据库你想要读取的数据表也不同,所以sql需要你在这里写,套用里面的方法即可,基本上就是修改sql就好了。

这种方法的好处在于,数据不会重复读取,并且读取的数据都可以得到高效的使用。

后续附上修改成mysql的一个例子代码

import loggingimport pandas as pdfrom impala.dbapi import connectimport sqlalchemyfrom sqlalchemy.orm import sessionmakerimport osimport timeimport osimport datetimefrom dateutil.relativedelta import relativedeltafrom typing import Dict, Listimport loggingimport threadingimport pandas as pdimport pickleclass MySqlHelper(object):    def __init__(        self,        host='192.168.15.144',        port=3306,        database='test_ims',        user='spkjz_writer',        password='7cmoP3QDtueVJQj2q4Az',        logger:logging.Logger=None        ):        self.host = host        self.port = port        self.database = database        self.user = user        self.password = password        self.logger = logger        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(            self.user, self.password, self.host, self.port, self.database        )        self.conn = None        self.cursor = None        self.engine = None        self.session = None    def create_table_code(self, file_name):        '''创建表类代码'''        os.system(f'sqlacodegen {self.connection_str} > {file_name}')        return self.conn    def get_conn(self):        '''创建连接或获取连接'''        if self.conn is None:            engine = self.get_engine()            self.conn = engine.connect()        return self.conn    def get_engine(self):        '''创建连接或获取连接'''        if self.engine is None:            self.engine = sqlalchemy.create_engine(self.connection_str)        return self.engine    def get_cursor(self):        '''创建连接或获取连接'''        if self.cursor is None:            self.cursor = self.conn.cursor()        return self.cursor    def get_session(self) -> sessionmaker:        '''创建连接或获取连接'''        if self.session is None:            engine = self.get_engine()            Session = sessionmaker(bind=engine)            self.session = Session()        return self.session    def close_conn(self):        '''关闭连接'''        if self.conn is not None:            self.conn.close()            self.conn = None        self.dispose_engine()    def close_session(self):        '''关闭连接'''        if self.session is not None:            self.session.close()            self.session = None        self.dispose_engine()    def dispose_engine(self):        '''释放engine'''        if self.engine is not None:            # self.engine.dispose(close=False)            self.engine.dispose()            self.engine = None    def close_cursor(self):        '''关闭cursor'''        if self.cursor is not None:            self.cursor.close()            self.cursor = None    def get_data(self, sql, auto_close=True) -> pd.DataFrame:        '''查询数据'''        conn = self.get_conn()        data = None        try:            # 异常重试3次            for i in range(3):                try:                    data = pd.read_sql(sql, conn)                    break                except Exception as ex:                    if i == 2:                        raise ex # 往外抛出异常                    time.sleep(60) # 一分钟后重试        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            if auto_close:                self.close_conn()        return datapassclass VarsHelper():    def __init__(self, save_dir, auto_save=True):        self.save_dir = save_dir        self.auto_save = auto_save        self.values = {}        if not os.path.exists(os.path.dirname(self.save_dir)):            os.makedirs(os.path.dirname(self.save_dir))        if os.path.exists(self.save_dir):            with open(self.save_dir, 'rb') as f:                self.values = pickle.load(f)                f.close()    def set_value(self, key, value):        self.values[key] = value        if self.auto_save:            self.save_file()    def get_value(self, key):        return self.values[key]    def has_key(self, key):        return key in self.values.keys()    def save_file(self):        with open(self.save_dir, 'wb') as f:            pickle.dump(self.values, f)            f.close()passclass GlobalShareArgs():    args = {        "debug": False    }    def get_args():        return GlobalShareArgs.args    def set_args(args):        GlobalShareArgs.args = args    def set_args_value(key, value):        GlobalShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return GlobalShareArgs.args.get(key, default_value)    def contain_key(key):        return key in GlobalShareArgs.args.keys()    def update(args):        GlobalShareArgs.args.update(args)passclass ShareArgs():    args = {        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共        "only_predict": False, # 只识别,不训练        "delete_model": True, # 先删除模型,仅在训练时使用        "export_excel": False, # 导出excel        "classes": 12, # 聚类数        "batch_size": 16,        "hidden_size": 32,        "max_nrof_epochs": 100,        "learning_rate": 0.0005,        "loss_type": "categorical_crossentropy",        "avg_model_num": 10,        "steps_per_epoch": 4.0, # 4.0        "lr_callback_patience": 4,         "lr_callback_cooldown": 1,        "early_stopping_callback_patience": 6,        "get_data": True,    }    def get_args():        return ShareArgs.args    def set_args(args):        ShareArgs.args = args    def set_args_value(key, value):        ShareArgs.args[key] = value    def get_args_value(key, default_value=None):        return ShareArgs.args.get(key, default_value)    def contain_key(key):        return key in ShareArgs.args.keys()    def update(args):        ShareArgs.args.update(args)passclass IMSGetDatasBase():    # 线程锁列表,同保存路径共用锁    lock_dict:Dict[str, threading.Lock] = {}    # 时间列表,用于判断是否超时    time_dict:Dict[str, datetime.datetime] = {}    # 用于记录是否需要更新超时时间    get_data_timeout_dict:Dict[str, bool] = {}    def __init__(        self,        host='192.168.15.144',        port=3306,        database='test_ims',        user='spkjz_writer',        password='Ur#7cmoP3QDtueVJQj2q4Az',        save_dir=None,        logger:logging.Logger=None,        ):        self.save_dir = save_dir        self.logger = logger        self.db_helper = MySqlHelper(            host=host,            port=port,            database=database,            user=user,            password=password,            logger=logger            )        # 创建子目录        if self.save_dir is not None and not os.path.exists(self.save_dir):            os.makedirs(self.save_dir)        self.vars_helper = None        if GlobalShareArgs.get_args_value('debug'):            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试    def close(self):        '''关闭连接'''        self.db_helper.close_conn()    def get_last_time(self, key_name) -> bool:        '''获取是否超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')        timeout = 12 # 12小时        if GlobalShareArgs.get_args_value('debug'):            timeout = 24 # 24小时        get_data_timeout = False        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()            get_data_timeout = True        else:            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)        # if self.vars_helper is not None :        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout        return get_data_timeout    def save_last_time(self, key_name):        '''更新状态超时'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if IMSGetDatasBase.get_data_timeout_dict[key_name]:            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()        if self.vars_helper is not None :            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)    def get_lock(self, key_name) -> threading.Lock:        '''获取锁'''        # 转静态路径,确保唯一性        key_name = os.path.abspath(key_name)        if key_name not in IMSGetDatasBase.lock_dict.keys():            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()        return IMSGetDatasBase.lock_dict[key_name]    def get_data_of_date(        self,        save_dir,        sql,        sort_columns:List[str],        del_index_list=[-1], # 删除最后下标        start_date = datetime.datetime(2017, 1, 1), # 开始时间        offset = relativedelta(months=3), # 时间间隔        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化        stop_date = '20700101', # 超过时间则停止        ):        '''分时间增量读取数据'''        # 创建文件夹        if not os.path.exists(save_dir):            os.makedirs(save_dir)        else:            #删除最后一个文件            file_list = os.listdir(save_dir)            if len(file_list)>0:                file_list.sort()                for del_index in del_index_list:                    os.remove(os.path.join(save_dir,file_list[del_index]))                    print('删除最后一个文件:', file_list[del_index])        select_index = -1        # start_date = datetime.datetime(2017, 1, 1)        while True:            end_date = start_date + offset            start_date_str = date_format_fun(start_date)            end_date_str = date_format_fun(end_date)            self.logger.info('date: %s-%s', start_date_str, end_date_str)            file_path = os.path.join(save_dir, filename_format_fun(start_date))            # self.logger.info('file_path: %s', file_path)            if not os.path.exists(file_path):                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))                if data is None:                    break                self.logger.info('data: %d', len(data))                # self.logger.info('data: %d', data.columns)                if len(data)>0:                    select_index+=1                    # 排序                    data = data.sort_values(sort_columns)                    data.to_csv(file_path)                elif select_index!=-1:                    break                elif stop_date < start_date_str:                    raise Exception("读取数据异常,时间超出最大值!")            start_date = end_datepassclass CustomIMSGetDatas(IMSGetDatasBase):    def __init__(        self,        host='192.168.13.134',        port=4000,        database='test_ims',        user='root',        password='rootimmsadmin',        save_dir='./hjx/data/export_ims_data',        logger:logging.Logger=None        ):        self.save_dir = save_dir        self.logger = logger        super().__init__(            host=host,            port=port,            database=database,            user=user,            password=password,            save_dir=save_dir,            logger=logger            )    def get_ims_w_amt_pro(self):        '''年月系列占比数据'''        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')        now_lock = self.get_lock(file_path)        now_lock.acquire() # 加锁        try:            # 设置超时4小时才重新查数据            # if not self.get_last_time(file_path):            #     return            sql = 'SELECT * FROM ims_w_amt_pro'            data:pd.DataFrame = self.db_helper.get_data(sql)            data = data.rename(columns={                'serial_forecast_proportion': 'forecast_proportion',            })            data.to_csv(file_path)            # # 更新超时时间            # self.save_last_time(file_path)        except Exception as ex:            self.logger.exception(ex)            raise ex # 往外抛出异常        finally:            now_lock.release() # 释放锁passdef get_datas(    host='192.168.13.134',    port=4000,    database='test_ims',    user='root',    password='rootimmsadmin',    save_dir='./hjx/data/export_ims_data',    logger:logging.Logger=None    ):    ur_bi_get_datas = CustomIMSGetDatas(        host=host,        port=port,        database=database,        user=user,        password=password,        save_dir=save_dir,        logger=logger    )    try:        # 年月系列占比数据        logger.info('正在查询年月系列占比数据...')        ur_bi_get_datas.get_ims_w_amt_pro()        logger.info('查询年月系列占比数据完成!')    except Exception as ex:        logger.exception(ex)        raise ex # 往外抛出异常    finally:        ur_bi_get_datas.close()passdef getdata_export_ims(    host='192.168.13.134',    port=4000,    database='test_ims',    user='root',    password='rootimmsadmin',    save_dir='./hjx/data/export_ims_data',    logger:logging.Logger=None    ):    get_datas(        host=host,        port=port,        database=database,        user=user,        password=password,        save_dir=save_dir,        logger=logger    )pass

以上就是“Python读取Hive数据库代码怎么写”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python读取Hive数据库代码怎么写

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Python读取Hive数据库代码怎么写

今天小编给大家分享一下Python读取Hive数据库代码怎么写的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。实际业务读取hi
2023-07-05

Python读取Hive数据库实现代码详解

这篇文章主要介绍了Python读取Hive数据库实现代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-03-01

python怎么读取oracle数据库数据

要从Oracle数据库中读取数据,可以使用Python中的cx_Oracle模块。首先,确保已经安装了cx_Oracle模块。可以使用以下命令进行安装:pip install cx_Oracle然后,使用以下代码示例从Oracle数据库
python怎么读取oracle数据库数据
2024-04-09

python怎么读取oracle数据库数据

使用Python读取Oracle数据库数据本指南介绍了使用Python读取Oracle数据库数据的多种方法,包括使用cx_Oracle、PyOracle、SQLAlchemy和Pandas。通过这些方法,可以与Oracle数据库交互,执行查询,检索结果,以及处理和分析数据。文中还提供了最佳实践建议,以提高性能、确保安全性并释放资源。
python怎么读取oracle数据库数据
2024-04-14

oracle数据库怎么写代码

要使用 oracle 数据库编写代码,需要遵循以下步骤:1. 安装 oracle 数据库管理系统 (dbms) 和集成开发环境 (ide)。2. 创建数据库连接。3. 使用 sql 编写代码。4. 执行 sql 代码。5. 使用编程语言与数
oracle数据库怎么写代码
2024-04-19

怎么用Python读取千万级数据自动写入MySQL数据库

这篇文章主要介绍“怎么用Python读取千万级数据自动写入MySQL数据库”,在日常操作中,相信很多人在怎么用Python读取千万级数据自动写入MySQL数据库问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎
2023-07-02

Java读取数据库表的示例代码

这篇文章主要介绍了Java读取数据库表,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-05-18

Android sd卡读取数据库实例代码

Android sd卡读取数据库实例代码 前言: 本文主要给大家讲解如何利用Android SD卡读取数据库,提供一些代码如下。先在 Manifest 里添加权限:
2022-06-06

mongodb创建数据库代码怎么写

在MongoDB中,可以使用以下代码来创建数据库:```javascript// 引入MongoDB驱动程序const MongoClient = require(mongodb).MongoClient;// 定义MongoDB的连接UR
2023-08-24

Python怎么实现Excel数据读取和写入

这篇“Python怎么实现Excel数据读取和写入”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python怎么实现Exce
2023-06-30

redis怎么读取数据库

如何通过 redis 读取数据库直接连接到数据库Redis 提供了 EVAL 命令,允许在 Redis 服务器上直接执行 Lua 脚本。您可以使用该脚本连接到数据库并执行查询。例如:local redis = require("red
redis怎么读取数据库
2024-06-03

vb.net连接数据库的代码怎么写

以下是使用VB.NET连接数据库的基本代码示例:1. 首先需要在程序中引用 System.Data.SqlClient 命名空间。2. 在代码中定义连接字符串,指定要连接的数据库服务器、数据库名称、用户名和密码等信息。例如:Dim conn
2023-06-04

mysql创建数据库的代码怎么写

要在MySQL中创建一个数据库,可以使用以下SQL语句:CREATE DATABASE `数据库名`;例如,要在MySQL中创建一个名为mydatabase的数据库,可以使用以下代码:CREATE DATABASE `mydatabas
mysql创建数据库的代码怎么写
2024-03-02

SQLite5中怎么使用Python来读写数据库

本篇内容介绍了“SQLite5中怎么使用Python来读写数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!SQLite支持多种编程语言的
2023-06-22

vb怎么读取sql数据库数据

在VB中,可以使用ADO.NET来读取SQL数据库的数据。ADO.NET提供了一系列的类和方法来与数据库进行交互。下面是一个读取数据库数据的示例代码:```vbImports System.Data.SqlClientModule Modu
2023-09-08

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录