背景:在整个运维内部数据仓库构建中,我们使用了Hadoop大数据生态圈中的组件来支撑运维数据的数据仓库构建。我们使用了Hive作为数据仓库工具,同时使用Hue来对整个运维数据进行管理和查询,最终根据部门需求生成结构化数据存入关系型或K/V型数据库,以供其他部门进行商业化决策。但是在使用command-line方式和hue上操作hive时,经常会有些许问题,并且灵活性交差,因此为了改善数据到Hive的加载过程以及对Hive库中数据的操作,借此机会使用PyHive库进行操作管理Hive.

本篇简单记录下使用Python操作Hive。

前言

我们当前整个ETL过程大概如下: - 1.使用Python程序对各个维度的运维数据进行采集、加工和初步的清洗处理,按照一定的数据标准和数仓模型进行数据存储 - 2.将上述抽取出来的相对结构化的数据存储到HDFS集群中 - 3.使用Hive定期从HDFS集群中加载数据,并根据已有数据进行再次加工处理,并提取价值信息

ETL的质量问题具体表现为正确性、完整性、一致性、完备性、有效性、时效性和可获取性等几个特性。由于我们构建的运维数据仓库需要涉及到不同的数据源,且数据源的数据模型各不相同,为保证数据的正确性和完整性,数据加工过程选择在源数据库端执行,进行初步清洗加工后再进行转储到目标数据仓库中。

Python操作Hive

注意:想要使用hive,必须要有一个可用的hive集群,同时为了保证可用使用API操作hive,我们需要要求提供hiveserver2服务

假设我们的hiveserver2地址为10.0.1.18:10000,且用户为hdfs.使用PyHive库链接Hive.

安装pyhive模块

1
2
# 过程中可能需要依赖sasl,thrift等相关服务,如有需要可以使用系统的包管理器安装(apt-get或yum)
pip install sasl thrift thrift-sasl PyHive

Python链接Hive以及基本使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
$ cat pytest_hive.py
# 导入hive模块
from pyhive import hive

# 获取一个hive链接对象(链接到HiveServer2上)
## Connection类的__init__方法:__init__(self, host=None, port=None, username=None, database=u'default', auth=None, configuration=None, kerberos_service_name=None, password=None, thrift_transport=None)
hiveconn = hive.Connection(host='10.0.1.18', port=10000, username='hdfs', database='aiops')

# 使用连接的cursor()方法获取一个游标对象
hivecur = hiveconn.cursor()

# 使用游标对象的execute()方法进行执行hivesql语句
## execute(self, operation, parameters=None, **kwargs)
hivecur.execute("show databases")
## executemany(self, operation, seq_of_parameters) method of pyhive.hive.Cursor instance 参数是一个序列
hivecur.executemany()

# 使用游标对象的fetch类方法获取执行结果(fetchone和fetchall以及fetchmany)
onedata = hivecur.fetchone()
alldata = hivecur.fetchall()
## fetchmany(self, size=None) method of pyhive.hive.Cursor instance
manydata = hivecur.fetchmany()

# 关闭cursor游标对象和conn连接对象
hivecur.close()
hiveconn.close()

# hive的回滚操作
hiveconn.rollback()

尝试用python脚本进行数据库操作

1. 数据库查询操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 首先我们使用pyhive库链接hive并查看指定数据库下的表
$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive

class hiveObj:
    def __init__(self,host,user,dbname=u'default',port=10000):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.port = port
    def hiveConIns(self):
        conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
        return conn
    #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
    def querydata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        #cur.fetch类方法返回一个[tuple,tuple]
        for data in alldata:
            print(data)
        conn.close()

if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    #查询数据
    sql = '''show tables'''
    hiveobj.querydata(sql)

$ python pyhive_test.py
(u'asset',)

2. 数据库更新操作

思考:其实数据库可以分为两种操作(读和写),一种为单纯的查询操作,不会对库表结构和数据造成变更,也即为读操作;另外一种为写操作,会对库表结构和数据造成的变更操作,也即写操作.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 给我们的hiveObj类增加一个写数据操作方法
$ cat pyhive_test.py
....
....
    def changedata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
            #conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()



# 使用创建表来模拟库表变更(实际上库的变更操作应该由专业的管理员进行审核后操作)
if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    #查询数据
    sql = '''show tables'''
    hiveobj.querydata(sql)

    #hive库表变更操作
    tabledesc = '''
     create table appinfo (
        appname string,
        level string,
        leader string,
        dep string,
        ips  array<string>)
     ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '|'
     COLLECTION ITEMS TERMINATED BY ','
    '''
    print("creating a table....")
    hiveobj.changedata(tabledesc)
    hiveobj.querydata(sql)

$ python pyhive_test.py
(u'asset',)
creating a table....
(u'appinfo',)
(u'asset',) 

3. 进行数据加载和读取操作

注意:上面其实我们已经封装了两个抽象的读写方法,可以对hive表进行数据加载和读取操作了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 假如我们的hdfs上已经存在一份如下结构化的数据
$ hdfs dfs -cat /ips.txt;
data-web|p0|bgbiao|ops1|10.0.0.1,10.0.0.2
data-api|p0|biaoge|sre1|192.168.0.1,192.168.0.2
data-models|p1|xxbandy|sre1|10.0.0.3,192.168.0.3

$ cat pyhive_test.py
...
...
if __name__ == '__main__':
		#首先进行将hdfs中的数据加载到appinfo表中,加载完成后查询appinfo表
    sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
    hiveobj.changedata(sql1)
    hiveobj.querydata('select * from appinfo')

$ python pyhive_test.py
(u'data-web', u'p0', u'bgbiao', u'ops1', u'["10.0.0.1","10.0.0.2"]')
(u'data-api', u'p0', u'biaoge', u'sre1', u'["192.168.0.1","192.168.0.2"]')
(u'data-models', u'p1', u'xxbandy', u'sre1', u'["10.0.0.3","192.168.0.3"]')

# 接下来我们对上述表进行一个拆分查询
$ cat pyhive_test.py
...
...
if __name__ == '__main__':
    #对array对象中的元素进行遍历查询
    sql = "select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.querydata(sql)

# 这样子我们就知道每个ip对应的关联关系了
$ python pyhive_test.py
(u'10.0.0.1', u'data-web', u'bgbiao', u'ops1')
(u'10.0.0.2', u'data-web', u'bgbiao', u'ops1')
(u'192.168.0.1', u'data-api', u'biaoge', u'sre1')
(u'192.168.0.2', u'data-api', u'biaoge', u'sre1')
(u'10.0.0.3', u'data-models', u'xxbandy', u'sre1')
(u'192.168.0.3', u'data-models', u'xxbandy', u'sre1')

# 临时表的创建和使用
    #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
    #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.changedata(sql)
    hiveobj.querydata('select * from tmpapp limit 1')

4. 源码文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive

class hiveObj:
    def __init__(self,host,user,dbname=u'default',port=10000):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.port = port
    def hiveConIns(self):
        conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
        return conn
    #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
    def querydata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        #cur.fetch类方法返回一个[tuple,tuple]
        for data in alldata:
            print(data)
        conn.close()
    #注意:hivesql的execute类方法的args是执行过程的参数,而不是sql的参数.比如cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async=True)表示异步执行
    def changedata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
            #conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()

if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    '''
    #查询数据
    sql = "show tables"
    hiveobj.querydata(sql)

    #hive创建表
    tabledesc = "create table appinfo (appname string,level string,leader string,dep string,ips  array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' "
    print("creating a table....")
    hiveobj.changedata(tabledesc)
    hiveobj.querydata(sql)

    #插入数据
    sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
    hiveobj.changedata(sql1)
    hiveobj.querydata('select * from appinfo')
    '''
    #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
    #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.changedata(sql)
    hiveobj.querydata('select * from tmpapp limit 1')