# (removed: stray line-number residue "1 2 3 ... 64" from text extraction; not part of the source)
import logging

from impala.dbapi import connect
from impala.util import as_pandas

# The original file used ``logger`` without ever defining it; define the
# conventional module-level logger here so the module is importable.
logger = logging.getLogger(__name__)
class HiveConnect(object):
    """Kerberos-authenticated Hive/Impala connection for table downloads.

    Wraps an impyla connection and remembers the incremental-update
    window (date column plus start/end dates) taken from ``data_info``.
    """

    def __init__(self, data_info, keytab_info, local_tmp):
        """Open the connection and cache the query parameters.

        Args:
            data_info: dict with keys ``databases``, ``table`` and the
                optional window keys ``date_col``/``start_date``/``end_date``.
            keytab_info: dict with connection/Kerberos settings
                (``host``, ``hive_port``, ``keytab_file``, ``principal``).
            local_tmp: local directory where CSV downloads are written.
        """
        self.client = connect(
            database=data_info['databases'],
            host=keytab_info['host'],
            port=keytab_info['hive_port'],
            auth_mechanism='GSSAPI',
            kerberos_service_name='hive',
        )
        # BUGFIX: the original never stored keytab_info, although
        # ``self.keytab_info`` is referenced elsewhere in this file.
        self.keytab_info = keytab_info
        self.database = data_info.get('databases')
        self.table = data_info.get('table')
        self.date_col = data_info.get('date_col')
        self.start_date = data_info.get('start_date')
        self.end_date = data_info.get('end_date')
        self.local_tmp = local_tmp
def get_all_key(self, key_str): """获取增量更新的所有数据的所有key""" if self.date_col is not None and self.start_date is not None and self.end_date is not None: sql_str = "select {key_str} from {table} where {col}>={start_date} and {col}<={end_date} group by {key_str}".format( table=self.table, col=self.date_col, start_date=self.start_date, end_date=self.end_date, key_str=key_str) else: sql_str = "select {key_str} from {table} group by {key_str}".format( table=self.table, key_str=key_str) logger.info("get_all_keySQL:"+sql_str) cursor = self.client.cursor() cursor.execute(sql_str) key_df = as_pandas(cursor) return key_df
def download(self, key_name, key_values): where = "" logger.info("date_column: " + str(self.date_col)) logger.info("start_date: " + str(self.start_date)) logger.info("end_date: " + str(self.end_date)) if self.date_col is not None and self.start_date is not None and self.end_date is not None: logger.info("###where 条件触发###") where += " where %s >= '%s' and %s <= '%s'" % (self.date_col, self.start_date, self.date_col, self.end_date) result_file = self.local_tmp + "/%s.%s.csv" % (self.database, self.table) if key_name != None and key_values != None: where_id = "" # for key_value_list in key_value: # index = "(" + " and ".join(["{}='{}'".format(key_name[i], key_value_list[i]) for i in range(len(key_name))]) + ")" # where_id += index +" or " # where_id = where_id[:-3] list(map(list, zip(*key_values))) for key, value in zip(key_name, list(map(list, zip(*key_values)))): index = "{} in ('{}')".format(key, "', '".join(list(set(value)))) where_id += index + " and " where_id = where_id[:-5] if where!="": where += " and " + where_id else: where = "where " + where_id sql_str = "select * from " + self.table + " " + where logger.info(sql_str) cursor = self.client.cursor() cursor.execute(sql_str) data = as_pandas(cursor) data.columns = [i.split('.')[-1] for i in data.columns] data.to_csv(result_file, index=False) return result_file with krbContext(using_keytab=True, keytab_file=self.keytab_info['keytab_file'],principal=self.keytab_info['principal'], ccache_file='./tmp/cache'): HiveConnect(info, keytab_info, path)
# (removed: stray "|" margin artifact from text extraction)