我最近接触到了一个极为出色的Flask后台库——pear-admin-flask,这个库具有很高的二次开发价值。借此机会学习并吸收其中Flask开发的一些高级技巧。
from flask.cli import AppGroup # 导入库 admin_cli = AppGroup("admin") # 定义子命令 admin ... @admin_cli.command("init") # 定义子命令操作 init def init_db(): # 函数 db.session.add_all(userdata) ...
命令行调用
flask admin init
a. 数据库定义权限 @ pear-admin-flask/applications/common/script/admin.py
# 作者提供了非常有效的权限管理过程 powerdata = [ Power( id=1, name='系统管理', type='0', code='', url=None, open_type=None, parent_id='0', icon='layui-icon layui-icon-set-fill', sort=1, create_time=now_time, enable=1, ), ... ]
b. 定义权限拦截函数 @ pear-admin-flask/applications/common/utils/rights.py
def authorize(power: str, log: bool = False): """用户权限判断,用于判断目前会话用户是否拥有访问权限 :param power: 权限标识 :type power: str :param log: 是否记录日志, defaults to False :type log: bool, optional """ def decorator(func): @login_required @wraps(func) def wrapper(*args, **kwargs): # 定义管理员的id为1 if current_user.username == current_app.config.get("SUPERADMIN"): return func(*args, **kwargs) if not power in session.get('permissions'): if log: admin_log(request=request, is_access=False) if request.method == 'GET': abort(403) else: return jsonify(success=False, msg="权限不足!") if log: admin_log(request=request, is_access=True) return func(*args, **kwargs) return wrapper return decorator
c. 加载权限 @ pear-admin-flask/applications/view/system/passport.py
session['permissions'] = user_power
d. 使用过程 @ 多个文件中都有使用, 例如: pear-admin-flask/applications/view/system/dept.py
@bp.get('/') @authorize("system:dept:main", log=True) def main(): return render_template('system/dept/main.html')
# 导入 Blueprint from flask import Blueprint, render_template, request, jsonify # 初始化 bp = Blueprint('dept', __name__, url_prefix='/dept') # 使用修饰器实现接口 @bp.get('/') @authorize("system:dept:main", log=True) def main(): return render_template('system/dept/main.html')
然后 统一初始化 api. @ /pear-admin-flask/applications/view/system/__init__.py
from flask import Flask, Blueprint from applications.view.system.dict import bp as dict_bp ... system_bp = Blueprint('system', __name__, url_prefix='/system') def register_system_bps(app: Flask): # 在admin_bp下注册子蓝图 system_bp.register_blueprint(user_bp) ...
接下来就可以通过浏览器 GET http://ip:port/system/dept 访问到 pear-admin-flask/applications/view/system/dept.py 中接口函数.
a. 定义用户登陆函数 @ pear-admin-flask/applications/extensions/init_login.py
@login_manager.user_loader def load_user(user_id): from applications.models import User user = User.query.get(int(user_id)) return user
b. 登陆登出 @pear-admin-flask/applications/view/system/passport.py
from flask_login import current_user, login_user, login_required, logout_user # 登录 @bp.post('/login') def login_post(): ... login_user(user, remember=remember) ... # 登出 @bp.post('/logout') @login_required def logout(): logout_user() session.pop('permissions') return success_api(msg="注销成功")
c. 会话管理, 验证是否登陆,没有登陆就禁止访问, @pear-admin-flask/applications/common/utils/rights.py
from flask_login import login_required, current_user @login_required # 登陆许可修饰器 @wraps(func) def wrapper(*args, **kwargs):
# 导入 from flask_uploads import UploadSet, IMAGES # 定义集合与路径 photos = UploadSet('photos', IMAGES) def init_upload(app: Flask): # 关联 app configure_uploads(app, photos)
使用 photos 在上传接口接收文件 @ pear-admin-flask/applications/view/system/file.py
# 上传接口 @bp.post('/upload') @authorize("system:file:add", log=True) def upload_api(): if 'file' in request.files: photo = request.files['file'] mime = request.files['file'].content_type file_url = upload_curd.upload_one(photo=photo, mime=mime) res = { "msg": "上传成功", "code": 0, "success": True, "data": {"src": file_url} } return jsonify(res) return fail_api()
这个可以单独写一节, 主要是感觉数据库的内容挺多的,这里看看库的样例, 大致样式如下 @ pear-admin-flask/applications/models/admin_user_role.py
from applications.extensions import db # 创建中间表 user_role = db.Table( "admin_user_role", # 中间表名称 db.Column("id", db.Integer, primary_key=True, autoincrement=True, comment='标识'), # 主键 db.Column("user_id", db.Integer, db.ForeignKey("admin_user.id"), comment='用户编号'), # 属性 外键 db.Column("role_id", db.Integer, db.ForeignKey("admin_role.id"), comment='角色编号'), # 属性 外键 )
不明白的可以留言讨论或者自行在百度即可.
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy.query import Query as BaseQuery from flask_marshmallow import Marshmallow ... # 重写 BaseQuery class Query(BaseQuery): ... def all_json(self, schema: Marshmallow().Schema): return schema(many=True).dump(self.all()) def layui_paginate(self): return self.paginate(page=request.args.get('page', type=int), per_page=request.args.get('limit', type=int), error_out=False) def layui_paginate_json(self, schema: Marshmallow().Schema): """ 返回dict """ _res = self.paginate( page=request.args.get('page', type=int), per_page=request.args.get('limit', type=int), error_out=False ) return schema(many=True).dump(_res.items), _res.total, _res.page, _res.per_page db = SQLAlchemy(query_class=Query) ma = Marshmallow()
from flask import Flask from flask_restx import Api, Resource api = Api() app = Flask(__name__) api.init_app(app) @api.route('/hello',strict_slashes=False) class HelloWorld(Resource): def get(self): # 如果使用模板的块,需要使用 make_response # return make_response(render_template('index.html', data=res), 200) # 使用 jsonify 是为了返回json数据的同时,相比于 json.dumps() 其会自动修改 content-type 为 application/json # 另外,如果使用 jsonify()的同时,还想自定义返回状态码,可以使用 make_response(jsonify(data=data), 201) return jsonify({'hello': 'world'}) def post(self): pass def put(self): pass def delete(self): pass if __name__ == '__main__': app.run(debug=True)
也可以使用 flask_restx 提供的参数校验.
from flask_restx import reqparse def create_reqparse(args): rp = reqparse.RequestParser() for i in args: rp.add_argument(i[0],**i[1]) return rp create_annotation = create_reqparse([ ['image_id',{'type':int,'required':True,'location':'json'}], ['category_id',{'type':int,'location':'json'}]]) @api.route('/') class AnnotatorData(Resource): @api.expect(create_annotation) def post(self): args = create_annotation.parse_args() ...