app.config[
'SQLALCHEMY_DATABASE_URI'
] =
'mysql+pymysql://root:123456@127.0.0.1:3306/model'
app.config[
'SQLALCHEMY_TRACK_MODIFICATIONS'
] =
False
db = SQLAlchemy(app)
class
User
(db.Model)
:
id = db.Column(db.Integer, primary_key=
True
, autoincrement=
True
)
name = db.Column(db.String(
20
), unique=
True
, nullable=
False
)
email = db.Column(db.String(
32
), unique=
True
, nullable=
False
)
CURD操作
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
@app.route('/insert/')
def insert():
wenming = User(name='wenming', email='wenming@163.com')
kunyu = User(name='kunyu', email='kunyu@163.com')
linfei = User(name='linfei', email='linfei@163.com')
luoyu = User(name='luoyu', email='luoyu@163.com')
yuhang = User(name='yuhang', email='yuhang@163.com')
peilong = User(name='peilong', email='peilong@163.com')
db.session.add_all([wenming, kunyu, linfei, luoyu, yuhang, peilong])
return '数据已添加'
@app.route('/select/<int:uid>/')
def select(uid):
user = User.query.get(uid)
if user:
return user.name
return '查无此人'
@app.route('/update/<int:uid>/')
def update(uid):
user = User.query.get(uid)
if user:
user.email = 'xxx@163.com'
db.session.add(user)
return '数据已更改'
return '查无此人'
@app.route('/delete/<int:uid>/')
def delete(uid):
user = User.query.get(uid)
if user:
db.session.delete(user)
return '数据已删除'
return '查无此人'
模型设计参考
常见字段类型
users = User.query.filter(User.id > 5).all()
# 与上式等价
users = User.query.filter(User.id.__gt__(5)).all()
>=, __ge__
<, __lt__
<=, __le__
==, __eq__
!=, __ne__
users = User.query.filter(User.id.between(5, 8)).all()
users = User.query.filter(User.id.in_((1, 3, 5, 7))).all()
users = User.query.filter(User.id.notin_((1, 3, 5, 7))).all()
users = User.query.filter(User.name.startswith('l')).all()
users = User.query.filter(User.name.endswith('ng')).all()
users = User.query.filter(User.name.contains('n')).all()
users = User.query.filter(User.name.like('%an%')).all()
users = User.query.filter(User.name.notlike('%an%')).all()
users = User.query.filter(User.name.ilike('%an%')).all()
users = User.query.filter(User.name.notilike('%an%')).all()
users = User.query.filter(User.id > 3, User.id < 8).all()
users = User.query.filter(and_(User.id > 3, User.id < 8)).all()
users = User.query.filter(or_(User.id < 3, User.id > 8)).all()
@app.route('/dengjia/')
def dengjia():
user = User.query.get(1)
user = db.session.query(User).get(1)
if user:
return user.name
return '查无此人'
from sqlalchemy import func
@app.route('/juhe/')
def juhe():
max_id = db.session.query(func.max(User.id)).scalar()
return str(max_id)
@app.route('/group/')
def group():
ret = db.session.query(User.sex, func.count(User.sex)).group_by(User.sex).all()
print(ret)
return '分组统计'
@app.route('/fields/')
def fields():
ret = User.query.with_entities(User.name, User.sex).all()
print(ret)
return '指定字段查询'
执行SQL语句
@app.route('/rawsql/')
def rawsql():
users = db.session.execute('select * from user where id > :id', params={'id': 5})
user_all = users.fetchall()
return ','.join(u.name for u in user_all)
@app.route('/paginate/')
def paginate():
pagination = User.query.paginate(page=1, per_page=3, error_out=False)
users = pagination.items
return ', '.join(u.name for u in users)
分页查询说明
方法:paginate
page:当前页码,默认为1
per_page:每页大小,默认为20
error_out:有错时是否报错
Pagination对象,其中包含了所有的分页信息
Pagination:
page:当前页码
per_page:页大小
pages:总页数
total:数据总量
prev_num:上一页页码
next_num:下一页页码
has_prev:是否有上一页
has_next:是否有下一页
items:当前页的数据
iter_pages:返回一个迭代器,分页导航条上页码,显示不完的返回None
prev:上一页的分页对象
next:下一页的分页对象
SQL日志:查看执行的SQL语句
app.config['DEBUG'] = True
app.config['TESTING'] = True
app.config['SQLALCHEMY_RECORD_QUERIES'] = True
from flask_sqlalchemy import get_debug_queries
for query in get_debug_queries():
print(query)
一对多(使用最多)
一:学生(Student)
- 添加反向引用:
articles=db.relationship('Article',backref='stu',lazy='dynamic')
- 多:文章(Article)
- 添加外键关联:
sid = db.Column(db.Integer, db.ForeignKey('student.id'))
lazy='dynamic',
secondary='selection',
backref=db.backref('students', lazy='dynamic')
多:课程(Course)
中间关联表(选课表),不需要手动维护
selection = db.Table('selection',
db.Column('student_id', db.Integer, db.ForeignKey('student.id')),
db.Column('course_id', db.Integer, db.ForeignKey('course.id'))
from flask import Flask
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import MigrateCommand, Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/model3'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
db = SQLAlchemy(app)
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
class Student(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(20), unique=True, nullable=False)
添加反向引用:不会影响表结构,无需迁移
backref:反向引用的字段名
lazy:关联数据的加载时机
'select' / True:首次使用时自动查询,默认选项
'joined' / False:关联查询时
'subquery':子查询时
'dynamic':不加载数据,提供了关联数据的查询(不能在一的一侧使用)
articles = db.relationship('Article', backref='stu', lazy='dynamic')
profile = db.relationship('Profile', backref='stu', uselist=False)
courses = db.relationship('Course', lazy='dynamic', secondary='selection',
backref=db.backref('students', lazy='dynamic'))
class Article(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(20), unique=True, nullable=False)
content = db.Column(db.Text)
sid = db.Column(db.Integer, db.ForeignKey('student.id'))
class Profile(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
realname = db.Column(db.String(20), unique=True, nullable=False)
sid = db.Column(db.Integer, db.ForeignKey('student.id'))
class Course(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(20), unique=True, nullable=False)
selection = db.Table('selection',
db.Column('student_id', db.Integer, db.ForeignKey('student.id')),
db.Column('course_id', db.Integer, db.ForeignKey('course.id'))
@app.route('/many_many/')
def many_many():
''''''
student = Student.query.get(1)
course = Course.query.get(3)
student.courses.remove(course)
return student.name + '取消选择了' + course.name
# 根据学生找课程
student = Student.query.get(1)
courses = student.courses.all()
return ', '.join(c.name for c in courses)
# 根据课程找学生
course = Course.query.get(3)
students = course.students.all()
return ', '.join(s.name for s in students)
@app.route('/one_one/<int:xid>/')
def one_one(xid):
profile = Profile.query.get(xid)
return profile.stu.name
@app.route('/one_many/<int:xid>/')
def one_many(xid):
# 原生查询
student = Student.query.get(xid)
articles = Article.query.filter(Article.sid == xid).all()
return ', '.join(a.title for a in articles)
# 使用join
students = Student.query.join(Article, Student.id == Article.sid).all()
return ', '.join(s.name for s in students)
article = Article.query.get(xid)
return article.stu.name
@app.route('/')
def index():
return '模型使用'
if __name__ == '__main__':
manager.run()