# 一.基础介绍
SQLAlchemy 是一个 Python 的 SQL 工具包和对象关系映射(ORM)工具,它提供了一个高层的 ORM 以及底层的 SQL 表达式语言。SQLAlchemy 是开源的,并且可以在商业和非商业项目中免费使用。它支持多种数据库系统,包括 PostgreSQL、MySQL、SQLite 等。
# 1. SQLAlchemy 的起源
SQLAlchemy 最初由 Michael Bayer 在 2005 年创建,目的是提供一个全面的 SQL 工具包和 ORM 解决方案,以满足 Python 社区的需求。随着时间的推移,SQLAlchemy 不断发展和完善,成为了 Python 数据库编程领域中最受欢迎的库之一。
# 2. SQLAlchemy 的核心组件
# 2.1 核心 SQL 工具包
SQLAlchemy 的核心 SQL 工具包提供了构建 SQL 查询的功能,它允许开发者以 Pythonic 的方式编写 SQL 语句。这包括了对数据库表的创建、数据的增删改查等操作。
# 2.2 ORM 层
ORM(Object-Relational Mapping)层是 SQLAlchemy 的另一个重要组成部分,它允许开发者使用 Python 类和对象来表示数据库中的表和行。ORM 层抽象了数据库操作,使得开发者可以不必编写 SQL 语句,而是通过操作 Python 对象来间接地与数据库交互。
# 3. SQLAlchemy 的优势
# 3.1 灵活性
SQLAlchemy 提供了灵活的 SQL 构建工具,开发者可以自由地编写 SQL 语句,同时也可以利用 ORM 层提供的抽象来简化数据库操作。
# 3.2 跨数据库支持
SQLAlchemy 支持多种数据库系统,这意味着开发者可以使用相同的代码库来操作不同的数据库,而不需要为每种数据库编写特定的代码。
# 3.3 强大的社区支持
由于 SQLAlchemy 的流行,它拥有一个活跃的社区,开发者可以在社区中找到大量的资源和帮助,包括文档、教程和第三方库。
# 二.实战步骤
# 1.数据库配置
# 数据库
database:
TYPE: mysql
DATABASE_URL: mysql://root:xxx@xxxx:9306/test?serverTimezone=Asia/Shanghai
USERNAME: root
PASSWORD: xxx
HOST: xxxx
PORT: 9306
DBNAME: test
MAX_OVERFLOW: 60
POOL_TIMEOUT: 120
POOL_SIZE: 30
URL_PROPERTY: ?charset=utf8
ECHO: True
2
3
4
5
6
7
8
9
10
11
12
13
14
# 2.model
from datetime import datetime
import pytz
from sqlalchemy import String, Column, Text, DateTime, JSON
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, attributes
def get_beijing_now():
# 获取当前系统时区
return datetime.now(pytz.timezone('Asia/Shanghai'))
# 基类
class Base(AsyncAttrs, DeclarativeBase):
id: Mapped[int] = mapped_column(primary_key=True)
create_time = Column(DateTime, default=get_beijing_now, nullable=False)
update_time = Column(DateTime, default=get_beijing_now, onupdate=get_beijing_now, nullable=False)
def to_dict(self):
"""
转为字典输出
:return:
"""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
@repr_generator
class AlchemyEntitySchemas(Base):
__tablename__ = "entity_schemas"
name = Column(String(255), nullable=False, comment='名称')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 3.连接配置
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import text
from base.config import get_config_key
from urllib.parse import quote_plus as urlquote
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker
class Database:
def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):
try:
self.engine = create_engine(url, poolclass=QueuePool, pool_size=pool_size,
max_overflow=max_overflow, pool_timeout=pool_timeout,
echo=echo, pool_recycle=7200, pool_pre_ping=True, echo_pool=echo)
self.Session = sessionmaker(bind=self.engine, expire_on_commit=False, autocommit=False, autoflush=False)
print("Database connected successfully.")
except SQLAlchemyError as e:
print(f"Error connecting to the database: {e}")
def get_session(self):
return self.Session()
@staticmethod
def close_session(_session):
_session.close()
@staticmethod
def execute_query(query, _session):
try:
result = _session.execute(query)
return result.fetchall()
except SQLAlchemyError as e:
print(f"Error executing query: {e}")
return None
finally:
Database.close_session(_session)
class SyncDatabase:
async_engine: AsyncEngine = None
async_session = None
def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):
self.url = url
self.max_overflow = max_overflow
self.pool_timeout = pool_timeout
self.pool_size = pool_size
self.echo = echo
self.connect()
def connect(self):
"""创建数据库引擎和会话类"""
try:
self.async_engine = create_async_engine(self.url, echo=self.echo, pool_size=pool_size,
max_overflow=max_overflow, pool_timeout=pool_timeout,
pool_recycle=7200,
pool_pre_ping=True, echo_pool=self.echo)
self.async_session = async_sessionmaker(bind=self.async_engine, class_=AsyncSession, expire_on_commit=False,
autocommit=False, autoflush=False)
print("Database connected successfully.")
except SQLAlchemyError as e:
print(f"Error connecting to the database: {e}")
def get_db_url():
userName = get_config_key("database", "USERNAME")
password = get_config_key("database", "PASSWORD")
dbHost = get_config_key("database", "HOST")
dbPort = get_config_key("database", "PORT")
dbName = get_config_key("database", "DBNAME")
urlProperty = get_config_key("database", "URL_PROPERTY")
if dbName is None:
return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'
else:
return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'
def get_sync_db_url():
userName = get_config_key("database", "USERNAME")
password = get_config_key("database", "PASSWORD")
dbHost = get_config_key("database", "HOST")
dbPort = get_config_key("database", "PORT")
dbName = get_config_key("database", "DBNAME")
urlProperty = get_config_key("database", "URL_PROPERTY")
if dbName is None:
return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'
else:
return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'
url = get_db_url()
max_overflow = get_config_key("database", "MAX_OVERFLOW")
pool_timeout = get_config_key("database", "POOL_TIMEOUT")
pool_size = get_config_key("database", "POOL_SIZE")
echo = get_config_key("database", "ECHO")
# sqlalchemy实际操作对象,导入的时候应该导入这个对象
get_sqlalchemy_db = Database(url, pool_size=pool_size, pool_timeout=pool_timeout, max_overflow=max_overflow, echo=echo)
# 异步的
SYNC_DB_URI = get_sync_db_url()
_async_engine = create_async_engine(SYNC_DB_URI, echo=echo, pool_size=pool_size,
max_overflow=max_overflow, pool_timeout=pool_timeout, pool_recycle=7200,
pool_pre_ping=True, echo_pool=echo)
# 异步IO的 sqlalchemy实际操作对象,导入的时候应该导入这个对象
async_session_factory = async_sessionmaker(bind=_async_engine, class_=AsyncSession, expire_on_commit=False,
autocommit=False, autoflush=False)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# 4.调用 SQL
@staticmethod
async def find_by_name(name: str):
"""
根据名称查询
"""
db = get_sqlalchemy_db
try:
with Session(db.engine) as session:
stmt = select(AlchemySchemas)
if name:
stmt = stmt.where(AlchemySchemas.name == name)
schemas_infos = session.scalars(stmt).all()
return [schemas_info.to_dict() for schemas_info in schemas_infos] if schemas_infos else None
except SQLAlchemyError as e:
print(f"An error occurred: {e}")
return None
finally:
db.close_session(session)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 5.只更新非空
@staticmethod
def get_update_values(model: AlchemySchemas, **fields: Optional[Any]) -> Dict[str, Any]:
"""
生成更新字典,只有当字段非空时才包含该字段。
"""
return {k: v for k, v in fields.items() if v is not None}
2
3
4
5
6
@staticmethod
async def update_schema(schemas_model: SchemasUpdateModel):
"""
更新schema
"""
db = get_sqlalchemy_db
try:
with Session(db.engine) as session:
update_values = SchemasManager.get_update_values(
model=AlchemySchemas,
desc=schemas_model.desc,
name=schemas_model.name,
)
stmt = (
update(AlchemySchemas)
.where(AlchemySchemas.id == schemas_model.id)
.values(update_values)
)
session.execute(stmt)
session.commit()
except SQLAlchemyError as e:
logger.error(f"find schemas by id An error occurred: {e}")
finally:
db.close_session(session)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 6.非空判断
sqlalchemy查询字段非空
@staticmethod
def get_all_run_id():
db = get_sqlalchemy_db
try:
with (Session(db.engine) as session):
stmt = select(AlchemyTaskManage).where(AlchemyTaskManage.es_index_prefix.isnot(None))
list_res = session.scalars(stmt).all()
return [task_info.es_index_prefix for task_info in list_res] if list_res else []
except Exception as e:
logger.error(f"find task by name An error occurred: {e}")
return None
finally:
db.close_session(session)
2
3
4
5
6
7
8
9
10
11
12
13
← 02-路线规划 04-debug启动 →