相关推荐recommended
资产信息管理系统-前后端开发
作者:mmseoamin日期:2024-01-30

题目要求:

资产管理系统

利用H5规范,CSS样式与JS脚本独立于HTML页面,Javascript调用jQuery库,CRUD后端使用FastAPI封装,前端页面在Nginx中运行,调用API模块, 实现CURD的课设总结

基本设计:

后端:

前后端的对接可以通过 FastAPI 框架提供的 API 接口实现。

第一,在后端部分,使用 FastAPI 构建了一个封装了 CRUD 操作的 API 服务器。这个服务器可以接收来自前端的请求,并根据请求中的操作类型(如 POST、GET、PUT、DELETE 等)和数据来执行相应的操作。前端部分通过使用 HTML、CSS 和 JS 等技术构建了用户界面,包括 CRUD 操作的界面。这些界面可以通过浏览器或 Nginx 服务器提供给用户使用。

在前后端对接的过程中,前端页面可以通过 AJAX 或 Fetch API 等技术向 API 服务器发送请求。API 服务器接收请求后,根据请求中的操作类型和数据执行相应的操作,然后将结果返回给前端。前端接收到结果后,可以根据结果更新页面或做出相应的响应。


综上,通过 FastAPI 框架提供的 API 接口实现了前后端的对接。前端发送请求给 API 服务器,API 服务器处理请求并返回结果给前端,从而实现了前后端的交互和数据管理。

数据库:

资产表(Assets):

资产ID(AssetID):主键,自增长,类型为整数

资产名称(AssetName):资产名称,类型为字符串,长度为255

资产类型(AssetType):资产类型,类型为字符串,长度为50

购买日期(PurchaseDate):购买日期,类型为日期

购买价格(PurchasePrice):购买价格,类型为数值型,精度为两位小数

资产状态(AssetStatus):资产状态,类型为字符串,长度为50

其他相关字段...

用户表(Users):

用户ID(UserID):主键,自增长,类型为整数

用户名(Username):用户名,类型为字符串,长度为255

密码(Password):密码,类型为字符串,长度为255

姓名(Name):姓名,类型为字符串,长度为100

邮箱(Email):邮箱,类型为字符串,长度为100

其他相关字段...

操作记录表(OperationRecords):

记录ID(RecordID):主键,自增长,类型为整数

用户ID(UserID):操作用户ID,类型为整数

操作类型(OperationType):操作类型,类型为字符串,长度为50

操作时间(OperationTime):操作时间,类型为时间戳

其他相关字段...

main.py:

# -*-coding:utf-8-*-
from fastapi import FastAPI, HTTPException, Depends, Form
from starlette.middleware.cors import CORSMiddleware
from user import user_router
from asset import asset_router
from operation import op_router
import uvicorn
app = FastAPI()
#三个路由,模块化,对应各自的路由
app.include_router(user_router, prefix='/user')
app.include_router(asset_router, prefix='/asset')
app.include_router(op_router, prefix='/operation')
# 添加跨域访问中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://127.0.0.1"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
if __name__ == '__main__':
    uvicorn.run(app)

user.py:

# -*-coding:utf-8-*-
from datetime import datetime
from fastapi import APIRouter, Query, Depends, Response
from pydantic import BaseModel
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from tool import *
from model import User
#验证请求参数是否正确
class UserParam(BaseModel):
    username: str
    phone: str
    birth: str
    email: str
    asset_count: int
user_router = APIRouter()
#请求到user/list则执行该函数
@user_router.get('/list')#分页:
def get_user_list(page: int = Query(1), keyword: str = Query(None), limit: int = Query(15),
                  dbs: Session = Depends(get_db)):
#以下通过dbs操作数据库
    if keyword is None:
        query = dbs.query(User)#是空,则声明一个查询
    else:
        query = dbs.query(User).filter((User.username.ilike(f"%{keyword}%")))
    total_count = query.with_entities(func.count(User.user_id)).scalar()
#统计数据经行分页
    return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count}
#线偏移,限制和所有结果
#请求post路径时,参数和数据在UserParam请求体中,相对get来说更安全一些,且数据没有上限,而get有上限
@user_router.post('/add')
def add_user(user: UserParam, dbs: Session = Depends(get_db)):
    try:
        new_user = User(**user.model_dump())
#**代表解包,user.model_dump()整体代表字典
#解包:a(**{'a':1})==a(a==1)
#创建一个新操作保存操作记录
        new_operation = OperationRecord(
            **{"user_id": new_user.user_id, "username": new_user.username, "operation_type": "添加用户",
               "operation_time": datetime.now()})
        for key, value in user.model_dump().items():
#判断是否是日期的格式,如果是则转换成datatime对象
            if key == 'birth':
                setattr(new_user, key, datetime.strptime(value, '%Y-%m-%d'))
            else:
                setattr(new_user, key, value)
        dbs.add(new_user)
        dbs.flush()#刷新
        dbs.add(new_operation)
        dbs.commit()
        return {'code': 0}#正确
    except Exception:
        return Response(status_code=404)
#路径/user/modify?user_id=2
#此处与之前相比多了一个user_id参数
@user_router.post('/modify')
def modify_user(user: UserParam, user_id: int = Query(...), dbs: Session = Depends(get_db)):
    try:
        user_modify = dbs.query(User).filter(User.user_id == user_id).first()
#用user_id查询
        new_operation = OperationRecord(
            **{"user_id": user_modify.user_id, "username": user_modify.username, "operation_type": "修改用户",
               "operation_time": datetime.now()})
        if user_modify is None:
            raise HTTPException(status_code=404, detail="User not found")
        for key, value in user.model_dump().items():
            if 'birth' == key:
                setattr(user_modify, key, datetime.strptime(value, '%Y-%m-%d'))
            else:
                setattr(user_modify, key, value)
        dbs.add(new_operation)
        dbs.commit()#直接提交到数据库
        return {'code': 0}#正确结束
    except Exception:
        return Response(status_code=404)
@user_router.get('/del')
#...代表必要要有query,否则返回错误,即:不可以/user/del直接请求,而是/user/del?user_id=
def del_user(user_id: int = Query(...), dbs: Session = Depends(get_db)):
    db_user = dbs.query(User).filter(User.user_id == user_id).first()
    new_operation = OperationRecord(
        **{"user_id": db_user.user_id, "username": db_user.username, "operation_type": "删除用户",
           "operation_time": datetime.now()})
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    dbs.delete(db_user)
    dbs.add(new_operation)
    dbs.commit()
    return {'code': 0}

 tool.py:

# -*-coding:utf-8-*-
from datetime import timedelta, datetime
from typing import Optional
from fastapi import Cookie, HTTPException, Header
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from model import *
# 在数据库中创建表
Base.metadata.create_all(engine)#engine相当于数据库
session_maker = sessionmaker(autocommit=False, autoflush=False, bind=engine)
#返回一个数据库操作接口
def get_db():
    db = session_maker()#数据库库接口
    try:
        yield db
    finally:
        db.close()

 model.py:

# -*-coding:utf-8-*-
from sqlalchemy import create_engine, Column, Integer, String, Numeric, Date, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
#该orm框架应用很广泛
# 创建数据库连接
engine = create_engine('sqlite:///shiyan.db')
#创建引擎
# 创建一个基类
Base = declarative_base()
#创建表的类后不需要用sql语句创建
# 定义用户表(Users)
class User(Base):#三个表继承基类创建
    __tablename__ = 'users'
    user_id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(255))
    phone = Column(String(255))
    birth = Column(Date)
    email = Column(String(100))
    asset_count = Column(Integer)
# 定义资产表(Assets)
class Asset(Base):
    __tablename__ = 'assets'
    asset_id = Column(Integer, primary_key=True, autoincrement=True)
    asset_name = Column(String(255))
    asset_type = Column(String(50))
    purchase_date = Column(Date)
    purchase_price = Column(Numeric(10, 2))
    asset_status = Column(String(50))
    user_id = Column(Integer, ForeignKey('users.user_id'))#外键
# 定义操作记录表(OperationRecords)
class OperationRecord(Base):
    __tablename__ = 'operation_records'
    record_id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, nullable=True)
    username = Column(String(255), nullable=True)
    asset_id = Column(Integer, nullable=True)
    asset_name = Column(String(255), nullable=True)
    operation_type = Column(String(50))
    operation_time = Column(DateTime)

asset.py:

# -*-coding:utf-8-*-
from datetime import datetime
from fastapi import APIRouter, Query, Depends, Response
from pydantic import BaseModel
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from tool import *
from model import Asset
class AssetParam(BaseModel):
    asset_name: str
    asset_type: str
    purchase_date: str
    purchase_price: str
    asset_status: str
    user_id: int
asset_router = APIRouter()
@asset_router.get('/list')
def get_asset_list(page: int = Query(1), limit: int = Query(15), keyword: str = Query(None),
                   dbs: Session = Depends(get_db)):
    if keyword is None:
        query = dbs.query(Asset.asset_id, Asset.asset_name, Asset.asset_type, Asset.asset_status, Asset.purchase_date,
                          Asset.purchase_price, User.user_id, User.username).where(Asset.user_id == User.user_id)
    else:
        query = dbs.query(Asset.asset_id, Asset.asset_name, Asset.asset_type, Asset.asset_status, Asset.purchase_date,
                          Asset.purchase_price, User.user_id, User.username).where(
            Asset.user_id == User.user_id).filter((Asset.asset_name.ilike(f"%{keyword}%")))#ilike模糊查询
    total_count = query.with_entities(func.count(Asset.asset_id)).scalar()
    return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count}
@asset_router.post('/add')
def add_user(asset: AssetParam, dbs: Session = Depends(get_db)):
    try:
        new_asset = Asset(**asset.model_dump())
        user_modify = dbs.query(User).filter(User.user_id == asset.user_id).first()
        if user_modify is not None:
            for key, value in asset.model_dump().items():
                if 'date' in key:
                    setattr(new_asset, key, datetime.strptime(value, '%Y-%m-%d'))
                else:
                    setattr(new_asset, key, value)
            new_operation = OperationRecord(
                **{"user_id": user_modify.user_id, "username": user_modify.username, "asset_id": new_asset.asset_id,
                   "asset_name": new_asset.asset_name, "operation_type": "添加资产", "operation_time": datetime.now()})
            user_modify.asset_count += 1#资产增加了对应的用户的资产数量也增加
            dbs.add(new_asset)
            dbs.add(new_operation)
            dbs.commit()
            return {'code': 0}
        else:
            return Response(status_code=404)
    except Exception:
        return Response(status_code=404)
@asset_router.post('/modify')
def modify_asset(asset: AssetParam, asset_id: int = Query(...), dbs: Session = Depends(get_db)):
    try:
        asset_modify = dbs.query(Asset).filter(Asset.asset_id == asset_id).first()
        old_user_modify = dbs.query(User).filter(User.user_id == asset_modify.user_id).first()
        new_user_modify = dbs.query(User).filter(User.user_id == asset.user_id).first()
        if asset_modify is None or new_user_modify is None:
            raise HTTPException(status_code=404, detail="Asset not found")
        for key, value in asset.model_dump().items():
            if 'date' in key:
                setattr(asset_modify, key, datetime.strptime(value, '%Y-%m-%d'))
            else:
                setattr(asset_modify, key, value)
        new_operation = OperationRecord(
            **{"user_id": old_user_modify.user_id, "username": old_user_modify.username,
               "asset_id": asset_modify.asset_id,
               "asset_name": asset_modify.asset_name, "operation_type": "修改资产", "operation_time": datetime.now()})
        old_user_modify.asset_count -= 1#资产拥有者改变,相应的拥有者资产数量改变
        new_user_modify.asset_count += 1
        dbs.add(new_operation)
        dbs.commit()
        return {'code': 0}
    except Exception:
        return Response(status_code=404)
@asset_router.get('/sell')
def sell_user(asset_id: int = Query(...), dbs: Session = Depends(get_db)):
    db_asset = dbs.query(Asset).filter(Asset.asset_id == asset_id).first()
    user_modify = dbs.query(User).filter(User.user_id == db_asset.user_id).first()
    if db_asset is None:
        raise HTTPException(status_code=404, detail="Asset not found")
    new_operation = OperationRecord(
        **{"user_id": user_modify.user_id, "username": user_modify.username, "asset_id": db_asset.asset_id,
           "asset_name": db_asset.asset_name, "operation_type": "售出资产", "operation_time": datetime.now()})
    user_modify.asset_count -= 1
    dbs.delete(db_asset)
    dbs.add(new_operation)
    dbs.commit()
    return {'code': 0}

operation.py:

# -*-coding:utf-8-*-
from fastapi import APIRouter, Query, Depends
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from tool import *
from model import *
op_router = APIRouter()
@op_router.get('/list')
def get_op_list(page: int = Query(1), limit: int = Query(15), dbs: Session = Depends(get_db)):
    query = dbs.query(OperationRecord)
    total_count = query.with_entities(func.count(OperationRecord.record_id)).scalar()
    return {'code': 0, 'data': query.offset((page - 1) * limit).limit(limit).all(), 'count': total_count}

资产表页面:




    
    layui
    
    
    
    
    


    
        
搜索信息

资产编辑页面:




    
    layui
    //用webkit引擎渲染
    
//提示浏览器用最新的渲染模式
    
    
    
    


    
        
            
        
    
    
        
        
            
        
    
    
        
        
            
        
    
    
        
        
            
        
    
    
        
        
            
        
    
    
        
        
            
        
    
    
        
        
            
        
    
    
        
            
        
    


操作页面:




    
    layui
    
    
    
    
    


    
        

 Uvicon服务器正在监听127.0.0.1的8000端口资产信息管理系统-前后端开发,第1张

 127.0.0.1也称为localhost,表示计算机本身地址

效果图资产信息管理系统-前后端开发,第2张

 

上一篇:打印日期c++

下一篇:STM32-创建工程模板