1. 数据库读写
数据库是最常用的通过socket连接的软件,多数时候我们写服务,做分析写算法的数据来源都来自数据库,而结果也往往需要放入数据库.最常见的数据库是关系数据库,像标准库自带的sqlite,常见的postgresql,mysql就是关系数据库他们使用统一的操作语言SQL语言进行操作,但不同的数据库对SQL语言的支持并不完全一样.而像hive这样的实现了部分SQL语句的数仓也可以看做是这类数据库的一个扩展.
另一类是非关系数据库,那就比较多样了,比较常见的大致3类:
- 以redis为代表的键值数据库
- 以mongodb为代表文档数据库
- 以influxdb为代表的时间序列数库
- 以neo4j为代表的图数据库
这些数据库一般并不通用,而是在特定情境下有较大作用,我会介绍我用过的,没用过的也就不介绍了.
1.1. 关系数据库
关系数据库本身接口几乎是一致的,这边以postgresql为例介绍,本文测试的pg使用docker部署.
1.1.1. 同步接口的关系数据库
常见的同步接口关系数据库如下:
| 对应数据库 | 包 | 
|---|---|
| sqlite | sqlite3标准库 | 
| postgresql | psycopg2 | 
| mysql | pymysql | 
| mssql | pymssql | 
| hive | pyhive | 
同步接口的关系数据库都是差不多的使用方法
- 先创建连接
- 创建一个游标
- 使用游标对象的.execute(sql)接口写入SQL语句
- 使用连接对象的.commit()接口提交sql语句
- 使用游标对象的.fetchall()接口获取结果
- 使用连接对象的.close()方法关闭连接
import psycopg2 dsn = "host=localhost port=5432 dbname=test user=postgres password=postgres" sql = '''     SELECT          column_name, table_name, data_type      FROM information_schema.columns     WHERE table_schema='public' and table_name='company';''' with psycopg2.connect(dsn) as conn:     c = conn.cursor()     c.execute('''CREATE TABLE IF NOT EXISTS company            (id INT PRIMARY KEY     NOT NULL,            name           TEXT    NOT NULL,            age            INT     NOT NULL,            address        CHAR(50),            salary         REAL);''')     conn.commit()     c.execute(sql)     result = c.fetchall() print(result) [('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 使用peewee做orm
orm是工程上常用的关系型数据库使用方式,使用orm可以让数据库访问这个动作面相对象,获得的数据以及针对数据的操作更加直观,但同时因为毕竟是一层包装,所以也会损失一些性能,而且因为是面向对象操作所以牺牲了灵活性.因此比较适合在业务逻辑上使用(OLTP),对于数据处理的场景(OLAP),
通常我个人比较喜欢使用peewee这个orm.我常用的特性有:
- 使用数据库的url访问数据库
- 在未知数据库路径配置的情况下使用代理对象建立映射
- 在未知表结构只知道表名的情况下获取表对象
- 在未知表是否存在的情况下安全的建表
- 使用上下文语法定义事务
- 使用迭代器访问多条数据
peewee支持的数据有:
- mysql
- postgresql
- sqlite
使用playhouse.db_url.connect的schema可以是:
- apsw: APSWDatabase
- mysql: MySQLDatabase
- mysql+pool: PooledMySQLDatabase
- postgres: PostgresqlDatabase
- postgres+pool: PooledPostgresqlDatabase
- postgresext: PostgresqlExtDatabase
- postgresext+pool: PooledPostgresqlExtDatabase
- sqlite: SqliteDatabase
- sqliteext: SqliteExtDatabase
- sqlite+pool: PooledSqliteDatabase
- sqliteext+pool: PooledSqliteExtDatabase
定义表对象
from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 连接数据库
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 使用sql语句
with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) [('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 映射Person并创建表
db.initialize(database) db.create_tables([Person],safe=True) with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='person';         """) as cursor:         result = cursor.fetchall()     print(result) [('id', 'person', 'integer'), ('name', 'person', 'character varying'), ('birthday', 'person', 'date')] 未知表结构的情况下获取表对象
from playhouse.reflection import generate_models COMPANY = generate_models(database).get("company") [('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 0[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 1插入多条数据
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 2读取多条数据
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 3[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 41.1.2. 异步接口的关系数据库
常见的异步接口关系数据库如下:
| 对应数据库 | 包 | 
|---|---|
| sqlite | aiosqlite | 
| postgresql | aiopg | 
| mysql | aiomysql | 
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 5[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 使用peewee_async将peewee变成异步orm
peewee是基于同步接口的,而异步语法具有传染性,如果使用peewee就会阻塞,好在有一个包peewee_async为我们做好了将其异步化的工作,需要注意的是目前这个包默认安装使用的是peewee 2,而要使用peewee3需要指定版本安装,0.6.0a是一个可以使用的版本
这个包支持的数据库有:
- mysql
- postgresql
使用playhouse.db_url.connect的schema可以是:
- postgres+async
- postgres+pool+async
- mysql+async
- mysql+pool+async
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 7[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 8[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')] 9from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 0peewee-async的一处bug
至少在在0.6.0a版本peewee-async有一处bug,就是无法设置connect_timeout这个参数无法设置,我们可以为其打个猴子补丁
from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 1from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 2from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 31.2. 键值对内存数据库Redis
除了传统关系型数据库,业务上最常见的恐怕就是redis了.redis实际上分为两种:
- 单机模式 其默认端口为6379
- 集群模式
这两者在使用上并不完全一样,集群模式无法使用需要全局扫key的操作,比如keys这种.
Redis的命令很多这边不做过多介绍,可以看官方文档.redis支持5种数据结构
- 字符串
- 列表
- 哈希表(python中的字典)
- 集合
- 有序集合
他们具体的操作可以看这个文档
redis因为其带着数据结构所以有不少邪道用法,具体的可以看我的这篇博客
1.2.1. Redis的同步接口
单机版本Redis使用包redis-py来连接,它自带一个连接池.需要注意的是从redis中取出的值时bytes类型
from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 4from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 5from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 6from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 7集群版本的redis需要使用redis-py-cluster来访问,需要注意的是目前它依赖于2.0版本的redis-py
from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 81.2.2. Redis异步接口
在异步接口方面redis有两个比较好的包:
- aioredis 用的最多的一个包,但目前只支持单机redis 
- aredis 一个用C包 - aredis封装的异步redis客户端,接口很丰富性能也强,作者是个国人,支持单机redis和集群,但用的人相对少而且由于是个人开发所以更新不算频繁
aioredis
from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 9from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 0from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 7aredis
aredis使用StrictRedis类连接单机redis,使用StrictRedisCluster连接redis集群,其他的操作都是一样的
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 2from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 0from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 71.3. 时间序列数库influxdb
influxdb是目前最流行的时间序列数据库,它支持类似sql语言InfluxQL的特殊语法进行操作,也可以使用http接口发起请求,因此简单好用.
influxdb默认端口为8086
influxdb的同步接口
influxdb同步接口可以使用包influxdb
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 5from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 6from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 7influxdb的异步接口
异步接口使用aioinflux它其实只是封装了influxdb的RESTful接口.但个人认为用起来更好用
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 8from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 9from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test") 6with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) 11.4. 图数据库ArangoDB
arangodb是一个开源的图数据库,它支持一种类似SQL的语法AQL同时也可以使用RESTful接口请求.
ArangoDB默认端口为8529,自带一个相当美观好用的web服务,我们可以在其上进行很多操作.
arangodb的同步接口
arangodb只有封装好的同步接口python-arango
with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) 2from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 5arangodb可以像一般文档数据库一样使用
with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) 4with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) 5with database:         # with samh.execute_sql("DESC cartoon") as cursor:         #     scheme = cursor.fetchall()         #names = [i[0] for i in  scheme]     with database.execute_sql("""         SELECT column_name, table_name, data_type          FROM information_schema.columns         WHERE table_schema='public' and table_name='company';         """) as cursor:         result = cursor.fetchall()     print(result) 6from peewee import Proxy,Model,CharField,DateField  db = Proxy()  class Person(Model):     name = CharField()     birthday = DateField()      class Meta:         database = db # This model uses the "people.db" database. 5


 
		 
		 
		 
		

还没有评论,来说两句吧...