前言: 最近做内部运维数据的数据仓库,最终将Hive中的数据清洗后需要业务决策相关的数据进行结构化处理,并存储到关系型数据库MySQL中,以供后期对外接口使用。本篇简单记录下使用Python操作MySQL数据库的简单操作。

MySQL数据库环境准备

注意:在当前容器化基础设施已经全面覆盖的时代,为了快速验证效果,我们及其推荐使用以Docker为代表的容器化基础设施来快速构建你的基础环境。

DockerHub上有丰富的基础中间件的镜像,我们可以使用Docker快速的构建我们的MySQL基础环境,而不必每次重新安装各种复杂的中间件环境,因为我们只是使用者,我相信每个团队都会有专门的中间件维护者。好吧,如果没有,那你依然可以自己根据实际的需求和标准进行构建Docker镜像,这样就为我们创造了一个未来很长一段时间可复用的组件。总之,想说的一件事就是,下面的MySQL环境是用Docker容器跑的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 确保docker环境正常
$ docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES

# 下载MySQL指定版本的镜像
$ docker pull mysql:5.6
$ docker images | grep mysql
mysql                                     5.6                 d1f491b20727        2 days ago          256 MB

# 创建一个mysql实例[需要指定至少一个环境变量:MYSQL_ROOT_PASSWORD, MYSQL_ALLOW_EMPTY_PASSWORD and MYSQL_RANDOM_ROOT_PASSWORD]
$ docker run -itd --name mysql -e MYSQL_ROOT_PASSWORD="123456"  -P mysql:5.6
6c4428b341516c7eeec48cbc5b658a464f76b5f7d42b3e689151392f5cd8ac56

# MySQL密码为123456,端口为32773
$ docker ps
CONTAINER ID        IMAGE                                          COMMAND                  CREATED             STATUS              PORTS                                               NAMES
6c4428b34151        mysql:5.6                                      "docker-entrypoint.sh"   5 seconds ago       Up 3 seconds        0.0.0.0:32773->3306/tcp                             mysql

# mysql数据库登录测试
$ mysql -h 127.0.0.1 -uroot -P 32773 -p123456
...
Server version: 5.6.42 MySQL Community Server (GPL)
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
...
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
+--------------------+
3 rows in set (0.00 sec)

# 查看数据库字符编码格式
mysql> show variables like '%char%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | utf8                       |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | utf8                       |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)

至此,一个MySQL数据库已经准备好,需要我们注意的是,因为使用的是官方的Docker image,我们需要进行相关配置的检查和设置,否则可能会为后期的操作造成一定麻烦,比如设置数据库的字符编码.
让MySQL支持中文,一般而言需要关注以下几个点:

    1. 修改MySQL配置中客户端和服务端的字符编码为utf8,分别为[mysqld的default-character-set和character-set-server参数以及client的default-character-set参数]
    1. 创建表时指定表的字符编码(default charset=utf8;)
    1. 链接数据库的时候指定链接字符编码(charset=utf8)
    1. 使用Python操作数据库时需要对Python文件进行utf8支持(#encoding=utf-8和sys.setdefaultencoding(utf-8))
    1. 使用 show variables like '%char%';命令检查mysql字符集是否为utf8格式,并使用SET NAMES UTF8; 或者set character_set_server = utf8;进行设置

使用Python进行操作MySQL

首先,在使用之前我们需要对Python版的MySQL库有一个了解,当前主流的库有MySQLdb,PyMySQLSQLAlchemy.

  • MySQLdb:一般是Linux系统发行版中默认支持的,通常包名为Python-MySQL,核心由C语言打造,接口精炼,性能最棒,缺点是环境依赖较多,安装复杂,近两年已停止更新,只支持Python2,不支持Python3
  • PyMySQL:纯python打造,接口与Python-MySQL兼容,安装方便,支持Python3
  • SQLAlchemy: 一个ORM框架,它并不提供底层的数据库操作,而是要借助于MySQLdb、PyMySQL等第三方库来完成,目前SQLAlchemy在Web编程领域应用广泛 备注:其实还有类似mysqlclient之类的库,主要集成在一些web框架中作为依赖

由于为了快速实现业务逻辑,在接下来的操作中主要使用PyMySQL库进行操作数据库,虽然性能不及MySQLdb,但是可以使用pymysql.install_as_MySQLdb()来兼容MySQLdb,在业务正式上线时可以不改变业务代码逻辑而平滑的使用MySQLdb库。

安装pymysql库

Linux环境下,大多数系统工具使用Python语言进行编写,因此在安装额外的Python模块时,通常会有几种选择:

    1. 使用系统自带工具安装apt-get install or yum install,该种方式会将模块默认安装的系统环境,可能会影响系统环境
    1. 使用Python原声的包管理工具pip install ,该种方式会默认安装到pip命令所在的Python解释环境下,因此取决于Python环境是否独立于系统环境的Python,通常情况下会使用pyenv之类的工具进行环境隔离
    1. 使用包管理工具conda相关工具进行管理python,可以有效管理python多环境依赖,并且可以很方便构建数据科学相关环境.conda使用指南
1
2
3
4
# 安装pymysql库
$ pip install pymysql
or 
$ conda install pymysql

python链接MySQL以及基本使用

使用pymysql库操作mysql

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ cat pytest_mysql.py
import pymysql
# 获取一个mysql链接对象
conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset='utf8')
# 使用数据库链接的cursor()方法获取一个游标对象
cursor = conn.cursor()
# 使用游标对象的execute()方法进行执行sql语句
cursor.execute("SELECT VERSION()")
## execute方法的定义如下,其中args可以是tuple, list or dict,如果是list or tuple的话,%s会被当做查询的一个占位符;如果是dict的话%(name)s会被当做一个占位符
## execute(self, query, args=None) 


# 使用游标对象的fetch类方法获取数据
## fetchone返回一条数据,fetchall返回查询的所有数据。fetch类方法会返回一个list类型的tuple结构类型对象.[(),()...]
onedata = cursor.fetchone()
alldata = cursor.fetchall()

# 提交数据库操作[一般在更新数据库操作时需要注意执行]
conn.commit()

# 及时关闭数据库链接以及打开的游标[以防止在并发情况下系统打开连接数过多]
cursor.close()
conn.close()

尝试用python脚本进行数据库操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
$ cat test_show_table.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import pymysql

class mysqlObj:
    def __init__(self,host,dbname,user,passwd,port=3306):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.passwd = passwd
        self.port = port
    def mysqlConIns(self):
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.dbname, charset='utf8')
        return conn
    def querydata(self,sql,args=None):
        conn = self.mysqlConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        for data in alldata:
            print(data)
        conn.close()


if __name__ == '__main__':
    mysqlobj = mysqlObj('localhost','mysql','root','123456',32773)
    mysqlobj.querydata("show tables;")

# 对mysql库进行查看tables操作,返回的是一个tuple
$ python test_show_table.py
(u'columns_priv',)
(u'db',)
(u'event',)
(u'func',)
(u'general_log',)
(u'help_category',)
(u'help_keyword',)
(u'help_relation',)
(u'help_topic',)
(u'innodb_index_stats',)
(u'innodb_table_stats',)
(u'ndb_binlog_index',)
(u'plugin',)
(u'proc',)
(u'procs_priv',)
(u'proxies_priv',)
(u'servers',)
(u'slave_master_info',)
(u'slave_relay_log_info',)
(u'slave_worker_info',)
(u'slow_log',)
(u'tables_priv',)
(u'time_zone',)
(u'time_zone_leap_second',)
(u'time_zone_name',)
(u'time_zone_transition',)
(u'time_zone_transition_type',)
(u'user',)

MySQL数据库常用的一些操作

注意:在之前我们创建的MySQL实例中仅是一个空的数据库,在实际使用之前,我们需要进行数据库的库表结构创建,以及相关的数据库授权,而这一部分操作通常会由专业的数据库管理员(DBA)进行操作和处理

接下来对一个website数据库和use表进行操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
mysql> create database website;
Query OK, 1 row affected (0.00 sec)
mysql> CREATE TABLE  IF NOT EXISTS useinfo (userid int(10) primary key not null auto_increment,username varchar(20) not null,usersite varchar(50),other varchar(50)) DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.02 sec)
mysql> describe useinfo;
+----------+-------------+------+-----+---------+----------------+
| Field    | Type        | Null | Key | Default | Extra          |
+----------+-------------+------+-----+---------+----------------+
| userid   | int(10)     | NO   | PRI | NULL    | auto_increment |
| username | varchar(20) | NO   |     | NULL    |                |
| usersite | varchar(50) | YES  |     | NULL    |                |
| other    | varchar(50) | YES  |     | NULL    |                |
+----------+-------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)

# 数据库授权[授权所有的主机可以以root用户,123456的密码去操作website库]
mysql> grant all on website.* to root@'%' identified by "123456";
Query OK, 0 rows affected (0.00 sec)

对指定数据库进行相关查询操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# 依然是上面那个test_show_table.py脚本,不过我们改变一下`__main__`
$ cat test_show_table.py
.....
if __name__ == '__main__':
    mysqlobj = mysqlObj('localhost','website','root','123456',32773)
    args = ["show tables;","describe useinfo;"]
    for arg in args:
        mysqlobj.querydata(arg)

# 可以看到我们的website库下有useinfo一张表,并且该表包含userid,username,usersite,other4个字段
$ python test_show_table.py
(u'useinfo',)
(u'userid', u'int(10)', u'NO', u'PRI', None, u'auto_increment')
(u'username', u'varchar(20)', u'NO', u'', None, u'')
(u'usersite', u'varchar(20)', u'YES', u'', None, u'')
(u'other', u'varchar(20)', u'YES', u'', None, u'')

对MySQL数据库进行插入操作

注意:插入操作分为单条记录插入和批量插入,一般数据库都支持批量插入方法,在pysql中为cursor.executemany(sql,args) 为我们的mysqlObj类增加一个插入操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
$ cat test_show_table.py
....
....
    #注意插入数据时单条记录使用tuple()类型;批量插入数据时使用list()类型
    def changedata(self,sql,args=None):
        conn = self.mysqlConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()
....
...
if __name__ == '__main__':
    mysqlobj = mysqlObj('localhost','website','root','123456',32773)
    '''
    args = ["show tables;","describe useinfo;"]
    for arg in args:
        mysqlobj.querydata(arg)
    '''
    #插入一条数据
    sql = "insert into  useinfo (username) values(%s)"
    arg = "彪哥"
    mysqlobj.changedata(sql,arg)
    sql1 = "insert into  useinfo (username,usersite) values(%s,%s)"
    arg1 = ("xxbandy","http://xxbandy.github.io")
    mysqlobj.changedata(sql1,arg1)
    #批量插入数据

    argslist = [("彪哥","http://xxbandy.github.io"),("bgbiao","https://www.jianshu.com/u/9c46ece5b7bd")]
    mysqlobj.changedata(sql1,argslist)
    #查询数据
    mysqlobj.querydata("select * from useinfo")
	
		print("updating the data")
		#更新数据[需要注意的是指定了字段之后由于usersite是varchar类型,占位符必须是"%s",如果是'%s'会有问题]
    data = "https://my.oschina.net/xxbAndy"
    mysqlobj.changedata('update useinfo set usersite=%s where userid = 1',data)
    mysqlobj.querydata("select * from useinfo")

# 插入数据并查看数据
$ python /tmp/abc.py
(1, u'\u5f6a\u54e5', None, None)
(2, u'xxbandy', u'http://xxbandy.github.io', None)
(3, u'\u5f6a\u54e5', u'http://xxbandy.github.io', None)
(4, u'bgbiao', u'https://www.jianshu.com/u/9c46ece5b7bd', None)
updating the data
(1, u'\u5f6a\u54e5', u'https://my.oschina.net/xxbAndy', None)
(2, u'xxbandy', u'http://xxbandy.github.io', None)
(3, u'\u5f6a\u54e5', u'http://xxbandy.github.io', None)
(4, u'bgbiao', u'https://www.jianshu.com/u/9c46ece5b7bd', None)

# 数据库查询记录
mysql> select * from website.useinfo;
+--------+----------+----------------------------------------+-------+
| userid | username | usersite                               | other |
+--------+----------+----------------------------------------+-------+
|      1 | 彪哥   | https://my.oschina.net/xxbAndy         | NULL  |
|      2 | xxbandy  | http://xxbandy.github.io               | NULL  |
|      3 | 彪哥   | http://xxbandy.github.io               | NULL  |
|      4 | bgbiao   | https://www.jianshu.com/u/9c46ece5b7bd | NULL  |
+--------+----------+----------------------------------------+-------+

源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import pymysql

class mysqlObj:
    def __init__(self,host,dbname,user,passwd,port=3306):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.passwd = passwd
        self.port = port
    def mysqlConIns(self):
        conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.dbname, charset='utf8')
        return conn
    def querydata(self,sql,args=None):
        conn = self.mysqlConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        for data in alldata:
            print(data)
        conn.close()
    #注意插入数据时单条记录使用tuple()类型;批量插入数据时使用list()类型
    def changedata(self,sql,args=None):
        conn = self.mysqlConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()



if __name__ == '__main__':

    mysqlobj = mysqlObj('localhost','website','root','123456',32773)
    '''
    args = ["show tables;","describe useinfo;"]
    for arg in args:
        mysqlobj.querydata(arg)
    '''
    #插入一条数据
    sql = "insert into  useinfo (username) values(%s)"
    arg = "彪哥"
    mysqlobj.changedata(sql,arg)
    sql1 = "insert into  useinfo (username,usersite) values(%s,%s)"
    arg1 = ("xxbandy","http://xxbandy.github.io")
    mysqlobj.changedata(sql1,arg1)
    #批量插入数据

    argslist = [("彪哥","http://xxbandy.github.io"),("bgbiao","https://www.jianshu.com/u/9c46ece5b7bd")]
    mysqlobj.changedata(sql1,argslist)


    #查询数据
    mysqlobj.querydata("select * from useinfo")
    print("updating the data")
    #更新数据
    data = "https://my.oschina.net/xxbAndy"
    mysqlobj.changedata('''update useinfo set usersite=%s where userid = 1''',data)
    mysqlobj.querydata("select * from useinfo")