Hive connection

  1. Connecting to Hive via impala
  2. A shell-based approach is recommended for stability, or pyspark

Connecting to Hive via impala
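
The snippet below wraps impyla's connect() in a small HiveConnect helper: it authenticates to HiveServer2 over Kerberos (GSSAPI), lists the distinct keys of an incremental window with get_all_key(), and dumps filtered rows to a local CSV with download(). The Kerberos ticket comes from a keytab via krbContext.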

import logging

from impala.dbapi import connect
from impala.util import as_pandas
from krbcontext import krbContext

logger = logging.getLogger(__name__)


class HiveConnect(object):
    def __init__(self, data_info, keytab_info, local_tmp):
        # Kerberos (GSSAPI) connection to HiveServer2 via impyla.
        self.client = connect(database=data_info['databases'], host=keytab_info['host'],
                              port=keytab_info['hive_port'],
                              auth_mechanism='GSSAPI', kerberos_service_name='hive')
        self.database = data_info.get('databases')
        self.table = data_info.get('table')
        self.date_col = data_info.get('date_col')
        self.start_date = data_info.get('start_date')
        self.end_date = data_info.get('end_date')
        self.local_tmp = local_tmp

    def get_all_key(self, key_str):
        """Fetch all distinct keys for the incremental-update window."""
        if self.date_col is not None and self.start_date is not None and self.end_date is not None:
            sql_str = ("select {key_str} from {table} where {col}>={start_date} and {col}<={end_date} "
                       "group by {key_str}").format(
                table=self.table, col=self.date_col, start_date=self.start_date,
                end_date=self.end_date, key_str=key_str)
        else:
            sql_str = "select {key_str} from {table} group by {key_str}".format(
                table=self.table, key_str=key_str)
        logger.info("get_all_key SQL: " + sql_str)
        cursor = self.client.cursor()
        cursor.execute(sql_str)
        key_df = as_pandas(cursor)
        return key_df

    def download(self, key_name, key_values):
        """Dump the selected rows to a local CSV file and return its path."""
        where = ""
        logger.info("date_column: " + str(self.date_col))
        logger.info("start_date: " + str(self.start_date))
        logger.info("end_date: " + str(self.end_date))
        if self.date_col is not None and self.start_date is not None and self.end_date is not None:
            logger.info("### date-range where clause enabled ###")
            where += " where %s >= '%s' and %s <= '%s'" % (
                self.date_col, self.start_date, self.date_col, self.end_date)
        result_file = self.local_tmp + "/%s.%s.csv" % (self.database, self.table)
        if key_name is not None and key_values is not None:
            where_id = ""
            # key_values is a list of rows; transpose it so each column of values
            # lines up with its column name and becomes an IN (...) filter.
            for key, value in zip(key_name, list(map(list, zip(*key_values)))):
                index = "{} in ('{}')".format(key, "', '".join(list(set(value))))
                where_id += index + " and "
            where_id = where_id[:-5]
            if where != "":
                where += " and " + where_id
            else:
                where = "where " + where_id
        sql_str = "select * from " + self.table + " " + where
        logger.info(sql_str)
        cursor = self.client.cursor()
        cursor.execute(sql_str)
        data = as_pandas(cursor)
        # Hive returns "table.column" headers; keep only the column part.
        data.columns = [i.split('.')[-1] for i in data.columns]
        data.to_csv(result_file, index=False)
        return result_file


# Usage: acquire a Kerberos ticket from the keytab, then open the connection.
with krbContext(using_keytab=True, keytab_file=keytab_info['keytab_file'],
                principal=keytab_info['principal'], ccache_file='./tmp/cache'):
    conn = HiveConnect(info, keytab_info, path)

A shell-based approach is recommended for stability, or pyspark; a pyspark sketch follows.
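
As a sketch of the pyspark route (assuming Spark is available on the cluster with Hive support enabled; the database, table, column, and paths below are placeholders), the same export could look like this:

from pyspark.sql import SparkSession

# Minimal pyspark sketch; all names here are placeholders, not from this post.
spark = (SparkSession.builder
         .appName("hive_export")
         .enableHiveSupport()      # read Hive tables through the metastore
         .getOrCreate())

df = spark.sql(
    "select * from my_db.my_table where dt >= '20230101' and dt <= '20230131'"
)

# toPandas() pulls everything to the driver; fine for modest extracts,
# otherwise write out directly with df.write.csv(...).
df.toPandas().to_csv("/tmp/my_db.my_table.csv", index=False)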

