SQLAlchemy 内容简要

0 评论
/ /
1379 阅读
/
9449 字
01 2018-11

1. Introduction

1.1 create engine

from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:' \
                       '5432/mydb')
  • pool_recycle
    多久后recycle连接, 一般mysql在检测到一个连接8小时没有活动时自动断开. 默认-1, 不recycle.
  • pool_size
    连接池大小, 默认5. 不使用连接池, 可以把pool_class设置sqlalchemy.pool.NullPool
  • pool_class
    a. QueuePool
    非sqllite情况下, 默认的连接池. 实现对打开连接数的限制.
    b. SingletonThreadPool
    为每一个线程维护一个连接. 当连接数量超过pool_size时, 可能在任意时刻调用close方法. 时间不确定, 且不论连接线程是否在使用当前连接.
    因此建议只用于使用sqllite内存db的测试环境. 不建议用于生产环境.
  • echo
    打印engine处理日志

1.2 engine connect

connection = engine.connect()

2. CORE & ORM

SQLAlchemy core更偏向数据库表级别的操作
而ORM更面向程序使用的模型

2.1 ORM

2.1.1 定义Schema

定义一个orm的class

  • 继承declarative_base
  • 包含tablename熟悉, 定义表名
  • 包含至少一个Column对象属性
  • 保证至少一个属性组成主键

__repr__方法: 定义对象被如何表达

2.1.2 Model关系

比如CustomAudienceAdRef表可以关联对应的CustomAudience.
通过backref. 查询CustomAudience时也能查到对应的CustomAudienceAdRef.

class CustomAudienceAdRef(Base, JSONDataMixin):
    '''
    自定义人群包与广告计划关联关系
    '''
    __tablename__ = 'custom_audience_ad_ref'
    __model_version__ = '2016-12-02'

    id = Column(BigInteger, primary_key=True)
    custom_audience_id = Column(BigInteger, ForeignKey('custom_audience.id'))
    data = Column(Text)
    create_time = Column(DateTime)
    modify_time = Column(DateTime)

    custom_audience = relationship("CustomAudience", backref=backref('custom_audience_ad_ref', order_by=id))

查询时, 需要加上joinedload_all. 否则会报错

DetachedInstanceError: Parent instance <CustomAudience at 0x7f8a714bf1d0> is not bound to a Session; lazy load operation of attribute 'custom_audience_ad_ref' cannot proceed

https://stackoverflow.com/questions/27701573/detachedinstanceerror-parent-instance-car-is-not-bound-to-a-session-lazy-lo

from sqlalchemy.orm import joinedload_all
audience = rs.query(CustomAudience).options(joinedload_all('*')) \
            .filter(CustomAudience.id == custom_audience_id).first()
print audience.custom_audience_ad_ref[0].id

通过, 可以实现一对一关系

relationship("CustomAudience", uselist=False)
Base.metadata.create_all(engine)可以根据定义的model创建对应的表
 

2.1.3 Session

session 是ORM和db交互的方式

  • 包装了engine的连接
  • 包装了对象的identity map(类似cache, 保存了由表名和主键确定的唯一对象列表)
  • 包装了transaction

使用sessionmaker创建session
sessionmaker类保证创建出同样配置的session. 应当全局被使用一次.

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///:memory:')
Session = sessionmaker(bind=engine)
session = Session()

Session状态
https://stackoverflow.com/questions/8645250/how-to-close-sqlalchemy-connection-in-mysql

http://docs.sqlalchemy.org/en/latest/orm/session_state_management.html
一个实例在session中可能有以下状态:

  • Transient
  • Pending
  • Persiste
  • Deleted
  • Detached

session.identity_map保存了一个WeakInstanceDict
key是database_identity(model的class, primary_key); value是InstanceState

Expunge: 将对象从session中移除,
sending persistent instances to the detached state, and pending instances to the transient state

对象实例会有一个dict对象保存属性, 这些属性类似缓存.
对象过期的时候(比如commit之后, 由于不知道事务的执行情况), dict内容会作废, 再次访问对象实例的属性, 会触发lazy load. 从数据库加载这些属性. 这个时候如果session已经close就会报错.

可以在commit之前, 用expunge, 将对象实例从session移除. 这是对象实例可以正常使用. 但是无法加载未加载的属性, 也无法加载expired的属性.

mysql gone away
http://docs.sqlalchemy.org/en/latest/faq/connections.html?highlight=gone%20away
http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

The primary cause of this error is that the MySQL connection has timed out and has been closed by the server. The MySQL server closes connections which have been idle a period of time which defaults to eight hours. To accommodate this, the immediate setting is to enable the create_engine.pool_recycle setting

For the more general case of accommodating database restarts and other temporary loss of connectivity due to network issues, connections that are in the pool may be recycled in response to more generalized disconnect detection techniques.
这时有两种方式解决
悲观方式: 每次执行语句前先执行select 1. 或者通过pre_ping参数

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

乐观方式: 执行失败后重试

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")