题目要求:
资产管理系统
利用H5规范,CSS样式与JS脚本独立于HTML页面,Javascript调用jQuery库,CRUD后端使用FastAPI封装,前端页面在Nginx中运行,调用API模块, 实现CURD的课设总结
基本设计:
后端:
前后端的对接可以通过 FastAPI 框架提供的 API 接口实现。
第一,在后端部分,使用 FastAPI 构建了一个封装了 CRUD 操作的 API 服务器。这个服务器可以接收来自前端的请求,并根据请求中的操作类型(如 POST、GET、PUT、DELETE 等)和数据来执行相应的操作。前端部分通过使用 HTML、CSS 和 JS 等技术构建了用户界面,包括 CRUD 操作的界面。这些界面可以通过浏览器或 Nginx 服务器提供给用户使用。
在前后端对接的过程中,前端页面可以通过 AJAX 或 Fetch API 等技术向 API 服务器发送请求。API 服务器接收请求后,根据请求中的操作类型和数据执行相应的操作,然后将结果返回给前端。前端接收到结果后,可以根据结果更新页面或做出相应的响应。
综上,通过 FastAPI 框架提供的 API 接口实现了前后端的对接。前端发送请求给 API 服务器,API 服务器处理请求并返回结果给前端,从而实现了前后端的交互和数据管理。
数据库:
资产表(Assets):
资产ID(AssetID):主键,自增长,类型为整数
资产名称(AssetName):资产名称,类型为字符串,长度为255
资产类型(AssetType):资产类型,类型为字符串,长度为50
购买日期(PurchaseDate):购买日期,类型为日期
购买价格(PurchasePrice):购买价格,类型为数值型,精度为两位小数
资产状态(AssetStatus):资产状态,类型为字符串,长度为50
其他相关字段...
用户表(Users):
用户ID(UserID):主键,自增长,类型为整数
用户名(Username):用户名,类型为字符串,长度为255
密码(Password):密码,类型为字符串,长度为255
姓名(Name):姓名,类型为字符串,长度为100
邮箱(Email):邮箱,类型为字符串,长度为100
其他相关字段...
操作记录表(OperationRecords):
记录ID(RecordID):主键,自增长,类型为整数
用户ID(UserID):操作用户ID,类型为整数
操作类型(OperationType):操作类型,类型为字符串,长度为50
操作时间(OperationTime):操作时间,类型为时间戳
其他相关字段...
main.py:
# -*-coding:utf-8-*- from fastapi import FastAPI, HTTPException, Depends, Form from starlette.middleware.cors import CORSMiddleware from user import user_router from asset import asset_router from operation import op_router import uvicorn app = FastAPI() #三个路由,模块化,对应各自的路由 app.include_router(user_router, prefix='/user') app.include_router(asset_router, prefix='/asset') app.include_router(op_router, prefix='/operation') # 添加跨域访问中间件 app.add_middleware( CORSMiddleware, allow_origins=["http://127.0.0.1"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) if __name__ == '__main__': uvicorn.run(app)
user.py:
# -*-coding:utf-8-*- from datetime import datetime from fastapi import APIRouter, Query, Depends, Response from pydantic import BaseModel from sqlalchemy import select, func from sqlalchemy.orm import Session from tool import * from model import User #验证请求参数是否正确 class UserParam(BaseModel): username: str phone: str birth: str email: str asset_count: int user_router = APIRouter() #请求到user/list则执行该函数 @user_router.get('/list')#分页: def get_user_list(page: int = Query(1), keyword: str = Query(None), limit: int = Query(15), dbs: Session = Depends(get_db)): #以下通过dbs操作数据库 if keyword is None: query = dbs.query(User)#是空,则声明一个查询 else: query = dbs.query(User).filter((User.username.ilike(f"%{keyword}%"))) total_count = query.with_entities(func.count(User.user_id)).scalar() #统计数据经行分页 return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count} #线偏移,限制和所有结果 #请求post路径时,参数和数据在UserParam请求体中,相对get来说更安全一些,且数据没有上限,而get有上限 @user_router.post('/add') def add_user(user: UserParam, dbs: Session = Depends(get_db)): try: new_user = User(**user.model_dump()) #**代表解包,user.model_dump()整体代表字典 #解包:a(**{'a':1})==a(a==1) #创建一个新操作保存操作记录 new_operation = OperationRecord( **{"user_id": new_user.user_id, "username": new_user.username, "operation_type": "添加用户", "operation_time": datetime.now()}) for key, value in user.model_dump().items(): #判断是否是日期的格式,如果是则转换成datatime对象 if key == 'birth': setattr(new_user, key, datetime.strptime(value, '%Y-%m-%d')) else: setattr(new_user, key, value) dbs.add(new_user) dbs.flush()#刷新 dbs.add(new_operation) dbs.commit() return {'code': 0}#正确 except Exception: return Response(status_code=404) #路径/user/modify?user_id=2 #此处与之前相比多了一个user_id参数 @user_router.post('/modify') def modify_user(user: UserParam, user_id: int = Query(...), dbs: Session = Depends(get_db)): try: user_modify = dbs.query(User).filter(User.user_id == user_id).first() #用user_id查询 new_operation = OperationRecord( **{"user_id": user_modify.user_id, "username": user_modify.username, "operation_type": "修改用户", "operation_time": datetime.now()}) if user_modify is None: raise HTTPException(status_code=404, detail="User not found") for key, value in user.model_dump().items(): if 'birth' == key: setattr(user_modify, key, datetime.strptime(value, '%Y-%m-%d')) else: setattr(user_modify, key, value) dbs.add(new_operation) dbs.commit()#直接提交到数据库 return {'code': 0}#正确结束 except Exception: return Response(status_code=404) @user_router.get('/del') #...代表必要要有query,否则返回错误,即:不可以/user/del直接请求,而是/user/del?user_id= def del_user(user_id: int = Query(...), dbs: Session = Depends(get_db)): db_user = dbs.query(User).filter(User.user_id == user_id).first() new_operation = OperationRecord( **{"user_id": db_user.user_id, "username": db_user.username, "operation_type": "删除用户", "operation_time": datetime.now()}) if db_user is None: raise HTTPException(status_code=404, detail="User not found") dbs.delete(db_user) dbs.add(new_operation) dbs.commit() return {'code': 0}
tool.py:
# -*-coding:utf-8-*- from datetime import timedelta, datetime from typing import Optional from fastapi import Cookie, HTTPException, Header from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from model import * # 在数据库中创建表 Base.metadata.create_all(engine)#engine相当于数据库 session_maker = sessionmaker(autocommit=False, autoflush=False, bind=engine) #返回一个数据库操作接口 def get_db(): db = session_maker()#数据库库接口 try: yield db finally: db.close()
model.py:
# -*-coding:utf-8-*- from sqlalchemy import create_engine, Column, Integer, String, Numeric, Date, DateTime, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship #该orm框架应用很广泛 # 创建数据库连接 engine = create_engine('sqlite:///shiyan.db') #创建引擎 # 创建一个基类 Base = declarative_base() #创建表的类后不需要用sql语句创建 # 定义用户表(Users) class User(Base):#三个表继承基类创建 __tablename__ = 'users' user_id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(255)) phone = Column(String(255)) birth = Column(Date) email = Column(String(100)) asset_count = Column(Integer) # 定义资产表(Assets) class Asset(Base): __tablename__ = 'assets' asset_id = Column(Integer, primary_key=True, autoincrement=True) asset_name = Column(String(255)) asset_type = Column(String(50)) purchase_date = Column(Date) purchase_price = Column(Numeric(10, 2)) asset_status = Column(String(50)) user_id = Column(Integer, ForeignKey('users.user_id'))#外键 # 定义操作记录表(OperationRecords) class OperationRecord(Base): __tablename__ = 'operation_records' record_id = Column(Integer, primary_key=True, autoincrement=True) user_id = Column(Integer, nullable=True) username = Column(String(255), nullable=True) asset_id = Column(Integer, nullable=True) asset_name = Column(String(255), nullable=True) operation_type = Column(String(50)) operation_time = Column(DateTime)
asset.py:
# -*-coding:utf-8-*- from datetime import datetime from fastapi import APIRouter, Query, Depends, Response from pydantic import BaseModel from sqlalchemy import select, func from sqlalchemy.orm import Session from tool import * from model import Asset class AssetParam(BaseModel): asset_name: str asset_type: str purchase_date: str purchase_price: str asset_status: str user_id: int asset_router = APIRouter() @asset_router.get('/list') def get_asset_list(page: int = Query(1), limit: int = Query(15), keyword: str = Query(None), dbs: Session = Depends(get_db)): if keyword is None: query = dbs.query(Asset.asset_id, Asset.asset_name, Asset.asset_type, Asset.asset_status, Asset.purchase_date, Asset.purchase_price, User.user_id, User.username).where(Asset.user_id == User.user_id) else: query = dbs.query(Asset.asset_id, Asset.asset_name, Asset.asset_type, Asset.asset_status, Asset.purchase_date, Asset.purchase_price, User.user_id, User.username).where( Asset.user_id == User.user_id).filter((Asset.asset_name.ilike(f"%{keyword}%")))#ilike模糊查询 total_count = query.with_entities(func.count(Asset.asset_id)).scalar() return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count} @asset_router.post('/add') def add_user(asset: AssetParam, dbs: Session = Depends(get_db)): try: new_asset = Asset(**asset.model_dump()) user_modify = dbs.query(User).filter(User.user_id == asset.user_id).first() if user_modify is not None: for key, value in asset.model_dump().items(): if 'date' in key: setattr(new_asset, key, datetime.strptime(value, '%Y-%m-%d')) else: setattr(new_asset, key, value) new_operation = OperationRecord( **{"user_id": user_modify.user_id, "username": user_modify.username, "asset_id": new_asset.asset_id, "asset_name": new_asset.asset_name, "operation_type": "添加资产", "operation_time": datetime.now()}) user_modify.asset_count += 1#资产增加了对应的用户的资产数量也增加 dbs.add(new_asset) dbs.add(new_operation) dbs.commit() return {'code': 0} else: return Response(status_code=404) except Exception: return Response(status_code=404) @asset_router.post('/modify') def modify_asset(asset: AssetParam, asset_id: int = Query(...), dbs: Session = Depends(get_db)): try: asset_modify = dbs.query(Asset).filter(Asset.asset_id == asset_id).first() old_user_modify = dbs.query(User).filter(User.user_id == asset_modify.user_id).first() new_user_modify = dbs.query(User).filter(User.user_id == asset.user_id).first() if asset_modify is None or new_user_modify is None: raise HTTPException(status_code=404, detail="Asset not found") for key, value in asset.model_dump().items(): if 'date' in key: setattr(asset_modify, key, datetime.strptime(value, '%Y-%m-%d')) else: setattr(asset_modify, key, value) new_operation = OperationRecord( **{"user_id": old_user_modify.user_id, "username": old_user_modify.username, "asset_id": asset_modify.asset_id, "asset_name": asset_modify.asset_name, "operation_type": "修改资产", "operation_time": datetime.now()}) old_user_modify.asset_count -= 1#资产拥有者改变,相应的拥有者资产数量改变 new_user_modify.asset_count += 1 dbs.add(new_operation) dbs.commit() return {'code': 0} except Exception: return Response(status_code=404) @asset_router.get('/sell') def sell_user(asset_id: int = Query(...), dbs: Session = Depends(get_db)): db_asset = dbs.query(Asset).filter(Asset.asset_id == asset_id).first() user_modify = dbs.query(User).filter(User.user_id == db_asset.user_id).first() if db_asset is None: raise HTTPException(status_code=404, detail="Asset not found") new_operation = OperationRecord( **{"user_id": user_modify.user_id, "username": user_modify.username, "asset_id": db_asset.asset_id, "asset_name": db_asset.asset_name, "operation_type": "售出资产", "operation_time": datetime.now()}) user_modify.asset_count -= 1 dbs.delete(db_asset) dbs.add(new_operation) dbs.commit() return {'code': 0}
operation.py:
# -*-coding:utf-8-*- from fastapi import APIRouter, Query, Depends from sqlalchemy import select, func from sqlalchemy.orm import Session from tool import * from model import * op_router = APIRouter() @op_router.get('/list') def get_op_list(page: int = Query(1), limit: int = Query(15), dbs: Session = Depends(get_db)): query = dbs.query(OperationRecord) total_count = query.with_entities(func.count(OperationRecord.record_id)).scalar() return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count}
资产表页面:
layui
资产编辑页面:
layui //用webkit引擎渲染 //提示浏览器用最新的渲染模式
操作页面:
layui
Uvicon服务器正在监听127.0.0.1的8000端口
127.0.0.1也称为localhost,表示计算机本身地址
效果图
上一篇:打印日期c++