Files
opc-manager/backend/app/main.py
mac 8dc69f8bd6 feat: OPC 工作台 — 科普(慰心斋)单项目管理系统
Flask + Tailwind CSS + Trix + Chart.js + Lucide Icons + SQLite

- 首页概览:关键指标卡片、财务趋势图、风险提醒、近期动态
- 销售管理:客户表格 + 抽屉详情(自动保存 + 评论)
- 业务方案:版本表格 + 抽屉(文件上传/预览/删除 + 评论)
- 运营管理:项目表格(业务机会/执行项目分类)+ 抽屉
- 产品研发:版本表格 + 抽屉
- 财务管理:月度收入/毛利/成本/净利曲线图 + 明细表
- 所有抽屉:Plane 风格紧凑布局、字段失焦自动保存、Trix 富文本评论框、点击遮罩关闭
2026-05-30 00:08:28 +08:00

554 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import date, datetime
from pathlib import Path
import shutil
import sqlite3
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine, select
from sqlalchemy.orm import Session, declarative_base, sessionmaker
# parents[2] is the directory three levels above this file; all runtime data
# (database file and uploads) is kept under its "data" sub-directory.
ROOT = Path(__file__).resolve().parents[2]
DATA_DIR = ROOT / "data"
UPLOAD_DIR = DATA_DIR / "uploads"  # files received by the upload endpoint are written here
DB_PATH = DATA_DIR / "opc.sqlite"  # SQLite file the engine below connects to
# NOTE(review): absolute, machine-specific path. seed_data() only registers
# files / reads the legacy database when they actually exist on disk.
WEIXIN_BASE = Path("/Users/mac/天机阁/地阁/慰心斋")
OLD_FINANCE_DB = WEIXIN_BASE / "5、财务管理/mananger/data/finance.sqlite"
# Import-time side effect: make sure the data directories exist before the
# engine is created against DB_PATH.
DATA_DIR.mkdir(parents=True, exist_ok=True)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# check_same_thread=False turns off sqlite3's same-thread check so a
# connection may be used from a thread other than the one that opened it.
engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
def now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12. The value is built
    from an aware UTC timestamp and the tzinfo is then dropped, so callers
    (the ``created_at`` / ``updated_at`` column defaults) keep receiving the
    same naive-UTC values as before.
    """
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)
class TimestampMixin:
    """Mixin that adds ``created_at`` / ``updated_at`` columns to a model."""

    # Both columns default to now() when a row is inserted.
    created_at = Column(DateTime, default=now)
    # onupdate=now refreshes the value whenever the row is updated.
    updated_at = Column(DateTime, default=now, onupdate=now)
class SalesLead(Base, TimestampMixin):
    """Sales lead row stored in the ``sales_leads`` table."""

    __tablename__ = "sales_leads"
    id = Column(Integer, primary_key=True)
    target_customer = Column(String, nullable=False)  # required customer name
    priority = Column(String, default="P1")  # the summary endpoint counts rows with "P0"
    status = Column(String, default="待跟进")  # status label; compared as a plain string
class FollowUpRecord(Base, TimestampMixin):
    """Follow-up / comment entry attached to another record.

    The owning record is identified by the ``target_type`` / ``target_id``
    pair; no foreign-key constraint is declared for it.
    """

    __tablename__ = "follow_up_records"
    id = Column(Integer, primary_key=True)
    # Values used elsewhere in this file: "sales", "operation", "product".
    target_type = Column(String, nullable=False)
    target_id = Column(Integer, nullable=False)
    # Stored as an ISO date string; defaults to today's date at insert time.
    followed_at = Column(String, default=lambda: date.today().isoformat())
    follower = Column(String, default="慰心")
    follow_up_method = Column(String, default="记录")
    content = Column(Text, default="")
    next_action = Column(Text, default="")
    next_follow_up_at = Column(String, default="")
class BusinessProposal(Base, TimestampMixin):
    """One version of a business proposal, stored in ``business_proposals``."""

    __tablename__ = "business_proposals"
    id = Column(Integer, primary_key=True)
    customer_or_project_name = Column(String, nullable=False)
    version = Column(String, nullable=False)  # free-form version label, e.g. "v1.5" in the seed data
    description = Column(Text, default="")
    status = Column(String, default="草稿")
    # Stored as an ISO date string; defaults to today's date at insert time.
    created_date = Column(String, default=lambda: date.today().isoformat())
class OperationProject(Base, TimestampMixin):
    """Operations project row stored in ``operation_projects``.

    Dates are kept as plain strings; the two ``*_id`` reference columns are
    plain integers with no foreign-key constraint declared.
    """

    __tablename__ = "operation_projects"
    id = Column(Integer, primary_key=True)
    project_name = Column(String, nullable=False)
    project_version = Column(String, default="v1.0")
    # Seed data uses "opportunity" and "execution"; the summary endpoint
    # counts rows whose type is "execution".
    project_type = Column(String, default="opportunity")
    project_status = Column(String, default="线索发现")
    current_stage = Column(String, default="")
    owner = Column(String, default="慰心")
    start_date = Column(String, default="")
    end_date = Column(String, default="")
    target_customer = Column(String, default="")
    customer_need = Column(Text, default="")
    expected_contract_amount = Column(Float, default=0)
    expected_sign_date = Column(String, default="")
    sign_probability = Column(Float, default=0)  # seed data stores 70 / 100 here
    next_action = Column(Text, default="")
    related_business_proposal_id = Column(Integer, nullable=True)
    sop_file_id = Column(Integer, nullable=True)
    sop_stage = Column(String, default="")
    execution_progress = Column(Float, default=0)  # seed data stores values such as 55 / 45 / 0
    current_deliverable = Column(Text, default="")
    # A non-empty value makes the summary endpoint list the project as a risk.
    risks = Column(Text, default="")
    notes = Column(Text, default="")
class ProductVersion(Base, TimestampMixin):
    """One planned or released product version, stored in ``product_versions``."""

    __tablename__ = "product_versions"
    id = Column(Integer, primary_key=True)
    product_name = Column(String, nullable=False)
    version = Column(String, nullable=False)
    version_goal = Column(Text, default="")
    feature_list = Column(Text, default="")
    launch_date = Column(String, default="")  # free-form string; seed data uses values like "2026-Q3"
    status = Column(String, default="规划中")
    notes = Column(Text, default="")
class FinanceRecord(Base, TimestampMixin):
    """Single finance entry stored in ``finance_records``."""

    __tablename__ = "finance_records"
    id = Column(Integer, primary_key=True)
    month = Column(String, nullable=False)  # seed data uses "YYYY-MM" strings; reports group by this value
    project_name = Column(String, default="科普(慰心斋)")
    # The report endpoints only aggregate "revenue" and "cost_expense" rows.
    record_type = Column(String, nullable=False)
    category = Column(String, default="")
    amount = Column(Float, default=0)
    occurred_date = Column(String, default="")
    notes = Column(Text, default="")
class FileAsset(Base, TimestampMixin):
    """Metadata for a file attached to another record, stored in ``file_assets``.

    The owning record is identified by ``module`` + ``owner_id``; no
    foreign-key constraint is declared.
    """

    __tablename__ = "file_assets"
    id = Column(Integer, primary_key=True)
    module = Column(String, nullable=False)  # list_resource() looks up "proposal" and "operation"
    owner_id = Column(Integer, nullable=False)
    owner_version = Column(String, default="")
    file_category = Column(String, default="")
    file_name = Column(String, nullable=False)
    file_type = Column(String, default="")  # lower-cased extension without the leading dot
    file_size = Column(Integer, default=0)  # taken from stat().st_size where rows are created in this file
    file_path = Column(String, nullable=False)  # path the content endpoint reads the file from
    # 1 for rows made by create_file_asset() with its default external=True,
    # 0 for files written by the upload endpoint.
    is_external = Column(Integer, default=0)
    notes = Column(Text, default="")
# Import-time side effect: issue CREATE TABLE for the models declared above
# (create_all skips tables that already exist by default).
Base.metadata.create_all(bind=engine)
def to_dict(row: Any) -> Dict[str, Any]:
    """Convert a mapped row into a plain ``{column_name: value}`` dictionary.

    Truthy ``created_at`` / ``updated_at`` values are replaced by the result
    of their ``isoformat()``; every other value is passed through unchanged.
    """
    timestamp_fields = ("created_at", "updated_at")
    result: Dict[str, Any] = {}
    for column in row.__table__.columns:
        value = getattr(row, column.name)
        if column.name in timestamp_fields and value:
            value = value.isoformat()
        result[column.name] = value
    return result
def latest_followup(db: Session, target_type: str, target_id: int) -> str:
    """Return the content of the most recent follow-up for a target.

    Follow-ups are ordered by ``followed_at`` and then ``id``, newest first.

    Args:
        db: Open session used for the query.
        target_type: Value matched against ``FollowUpRecord.target_type``.
        target_id: Value matched against ``FollowUpRecord.target_id``.

    Returns:
        The newest record's ``content``, or ``""`` when the target has none.
    """
    stmt = (
        select(FollowUpRecord)
        .where(FollowUpRecord.target_type == target_type, FollowUpRecord.target_id == target_id)
        .order_by(FollowUpRecord.followed_at.desc(), FollowUpRecord.id.desc())
        # Only the newest row is wanted. Without the limit the previous
        # scalar_one_or_none() raised MultipleResultsFound as soon as a
        # target had two or more follow-ups.
        .limit(1)
    )
    row = db.execute(stmt).scalars().first()
    return row.content if row else ""
def list_files(db: Session, module: str, owner_id: int) -> List[Dict[str, Any]]:
    """Return the file assets of one owner as dictionaries, highest id first.

    Args:
        db: Open session used for the query.
        module: Value matched against ``FileAsset.module``.
        owner_id: Value matched against ``FileAsset.owner_id``.
    """
    query = (
        select(FileAsset)
        .where(FileAsset.module == module, FileAsset.owner_id == owner_id)
        .order_by(FileAsset.id.desc())
    )
    assets = []
    for asset in db.execute(query).scalars():
        assets.append(to_dict(asset))
    return assets
def create_file_asset(db: Session, module: str, owner_id: int, category: str, path: Path, version: str = "", external: bool = True):
    """Add a ``FileAsset`` row describing ``path`` to the session.

    Nothing is added when ``path`` does not exist. The row is only added to
    the session; flushing or committing is left to the caller.

    Args:
        db: Session the new row is added to.
        module: Stored as ``FileAsset.module``.
        owner_id: Stored as ``FileAsset.owner_id``.
        category: Stored as ``FileAsset.file_category``.
        path: File on disk; name, extension, size and path are read from it.
        version: Stored as ``FileAsset.owner_version``.
        external: Stored as 1 when truthy, otherwise 0.
    """
    if path.exists():
        extension = path.suffix.lower().lstrip(".")
        size_in_bytes = path.stat().st_size
        external_flag = 1 if external else 0
        db.add(
            FileAsset(
                module=module,
                owner_id=owner_id,
                owner_version=version,
                file_category=category,
                file_name=path.name,
                file_type=extension,
                file_size=size_in_bytes,
                file_path=str(path),
                is_external=external_flag,
            )
        )
def seed_data() -> None:
    """Populate an empty database with the initial demo records.

    Does nothing when ``sales_leads`` already contains rows. Otherwise it
    inserts sales leads, one business proposal, operation projects, product
    versions and finance records (each with follow-ups / file assets where
    applicable) and commits them in a single transaction. Files are only
    registered when they exist under ``WEIXIN_BASE``, and revenue rows are
    imported from ``OLD_FINANCE_DB`` only when that file exists.
    """
    from contextlib import closing

    db = SessionLocal()
    try:
        # Seeding is keyed on the sales table: any existing lead means "already seeded".
        if db.query(SalesLead).count() > 0:
            return

        # --- sales leads, each with one initial follow-up -------------------
        sales_rows = [
            ("齐鲁制药", "P0", "跟进中", "多产品线科普年度框架,需推进高层沟通。"),
            ("百利天恒", "P0", "方案中", "BL-B01D1 上市前医生教育机会,准备方案。"),
            ("信达生物", "P0", "已签约", "现有科普项目升级/续约,重点保障执行。"),
            ("三生制药", "P1", "待跟进", "多科室医生教育+患者科普机会。"),
            ("天广实生物", "P1", "待跟进", "血液肿瘤医生教育机会。"),
        ]
        for name, priority, status, note in sales_rows:
            lead = SalesLead(target_customer=name, priority=priority, status=status)
            db.add(lead)
            db.flush()  # assigns lead.id so the follow-up can reference it
            db.add(FollowUpRecord(target_type="sales", target_id=lead.id, content=note, next_action="明确下一次沟通人和时间"))

        # --- one business proposal plus its files ---------------------------
        proposal = BusinessProposal(
            customer_or_project_name="信达生物",
            version="v1.5",
            description="信达科普项目续约与报价方案",
            status="已提交客户",
            created_date="2026-05-28",
        )
        db.add(proposal)
        db.flush()
        proposal_dir = WEIXIN_BASE / "2、业务方案/信达/v1.5"
        for category, names in {
            "方案": ["整体方案.pptx", "整体方案.pdf"],
            "成本": ["业务报价-2亿方案.xlsx", "业务报价-5250万方案.xlsx", "5、最新报价.xlsx"],
            "SOP": ["SOP.docx"],
            "财务流程": ["财务流程.docx"],
        }.items():
            for filename in names:
                create_file_asset(db, "proposal", proposal.id, category, proposal_dir / filename, proposal.version)

        # --- operation projects, follow-ups and files -----------------------
        ops = [
            ("圆心科技 科普文章项目", "v2026-文章", "execution", "SOP 执行中", "内容生产", 55, "文章内容生产与审核执行中"),
            ("圆心科技 科普视频项目", "v2026-视频", "execution", "SOP 执行中", "内容生产", 45, "视频脚本、拍摄与审核推进"),
            ("圆心科技 科普专访项目", "v2026-专访", "opportunity", "方案已提交", "商务推进", 0, "专访项目推动签约"),
        ]
        op_dir = WEIXIN_BASE / "3、运营方案"
        seeded_projects = []
        for name, version, kind, status, stage, progress, note in ops:
            project = OperationProject(
                project_name=name,
                project_version=version,
                project_type=kind,
                project_status=status,
                current_stage=stage,
                target_customer="圆心科技",
                customer_need="科普内容项目执行与管理",
                expected_contract_amount=0 if kind == "execution" else 200,
                expected_sign_date="2026-06",
                sign_probability=70 if kind == "opportunity" else 100,
                sop_stage=stage,
                execution_progress=progress,
                current_deliverable=note,
                next_action="补齐版本要求文件并更新下一节点",
            )
            db.add(project)
            db.flush()
            seeded_projects.append(project)
            db.add(FollowUpRecord(target_type="operation", target_id=project.id, content=note, next_action=project.next_action))
        # The first element is the 1-based position of the project in ``ops``.
        # Files are attached to the rows created above rather than looked up
        # by primary key 1/2/3, which pointed at unrelated rows whenever
        # operation_projects was not empty before seeding.
        file_map = [
            (1, "项目方案", "圆心科技--科普文章项目(1).pptx"),
            (2, "项目方案", "圆心科技-科普视频项目(1).pptx"),
            (3, "项目方案", "圆心科技-科普专访项目-2026年(1).pdf"),
            (1, "项目管理手册", "圆心科技《项目管理手册》-2026年.pdf"),
            (2, "审核标准", "科普项目-审核标准(文章-视频-音频).pdf"),
        ]
        for position, category, filename in file_map:
            project = seeded_projects[position - 1]
            create_file_asset(db, "operation", project.id, category, op_dir / filename, project.project_version)

        # --- product versions, each with one follow-up ----------------------
        products = [
            ("妙手医生服务小程序", "v1.1", "视频任务增强 + 积分商城", "草稿箱、批量上传、积分商城、消息通知", "2026-Q3", "规划中"),
            ("数字化营销后台管理系统", "v1.2", "运营数据看板 + 智能审核", "医生活跃、任务完成率、AI 预审、渠道数据上报", "2026-Q3", "设计中"),
            ("妙手患者服务", "v0.5", "科普浏览 + 医生主页 MVP", "科普文章/视频浏览、医生主页、搜索", "2026-Q3", "规划中"),
            ("数字人内容平台", "v0.1", "基础数字人视频生成 MVP", "预设形象、AI 配音、脚本驱动、简单模板", "2026-Q3", "规划中"),
            ("渠道分发引擎", "v1.0", "六渠道统一分发", "分发 API、内容适配、分发排期、效果追踪", "2027-Q1", "规划中"),
        ]
        for row in products:
            product = ProductVersion(
                product_name=row[0], version=row[1], version_goal=row[2], feature_list=row[3], launch_date=row[4], status=row[5]
            )
            db.add(product)
            db.flush()
            db.add(FollowUpRecord(target_type="product", target_id=product.id, content=f"{row[0]} {row[1]}{row[2]}", next_action="按路线图推进"))

        # --- finance records ------------------------------------------------
        finance_seed = [
            ("2026-05", "revenue", "信达生物续约确认收入", 120, "信达项目阶段确收"),
            ("2026-06", "revenue", "信达生物续约确认收入", 80, "信达项目尾款预估"),
            ("2026-05", "cost_expense", "内容生产", 32, "医生劳务与内容制作"),
            ("2026-05", "cost_expense", "运营管理", 16, "项目管理与渠道协同"),
            ("2026-06", "cost_expense", "渠道分发", 24, "投放与分发费用"),
        ]
        for month, record_type, category, amount, notes in finance_seed:
            db.add(FinanceRecord(month=month, record_type=record_type, category=category, amount=amount, occurred_date=f"{month}-01", notes=notes))

        # --- optional import from the legacy finance database ---------------
        if OLD_FINANCE_DB.exists():
            # closing() guarantees the connection is released even when the
            # query fails (for example if the customers table is missing).
            with closing(sqlite3.connect(OLD_FINANCE_DB)) as conn:
                conn.row_factory = sqlite3.Row
                for row in conn.execute("SELECT name, expected_revenue FROM customers WHERE expected_revenue > 0").fetchall():
                    db.add(
                        FinanceRecord(
                            month="2026-07",
                            record_type="revenue",
                            category=f"{row['name']} 预计确收",
                            amount=float(row["expected_revenue"] or 0),
                            occurred_date="2026-07-01",
                            notes="由原财务 manager 客户预算迁移",
                        )
                    )
        db.commit()
    finally:
        db.close()
# Import-time side effect: insert the demo data. seed_data() returns early
# once the sales_leads table already has rows.
seed_data()
app = FastAPI(title="OPC Manager API")
# Cross-origin requests are accepted only from port 5173 on localhost /
# 127.0.0.1 (presumably the front-end dev server — confirm), with every
# method and header allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class Payload(BaseModel):
    """Request body wrapper used by the create / update / follow-up endpoints."""

    # Free-form mapping of field name to value. apply_payload() copies only
    # the keys that match a column of the target model.
    data: Dict[str, Any]
def db_session():
    """Yield a new session and close it once the consumer is finished.

    Written as a generator so it can be used as a dependency-style provider;
    the session is closed whether or not the consumer raised.
    """
    # Session's context manager calls close() on exit, which is what the
    # explicit try/finally form does.
    with SessionLocal() as session:
        yield session
def get_model(name: str):
    """Map a URL resource name to its model class.

    Raises:
        HTTPException: 404 ``"unknown resource"`` when ``name`` is not one of
            the registered resource names.
    """
    registry = dict(
        sales=SalesLead,
        followups=FollowUpRecord,
        proposals=BusinessProposal,
        operations=OperationProject,
        products=ProductVersion,
        finance=FinanceRecord,
    )
    model = registry.get(name)
    if model is None:
        raise HTTPException(404, "unknown resource")
    return model
def followup_target(resource: str) -> str:
    """Translate a URL resource name into the ``target_type`` used by follow-ups.

    ``"operations"`` and ``"products"`` map to their singular form; any other
    name (including ``"sales"``) is returned unchanged.
    """
    singular = {"operations": "operation", "products": "product"}
    try:
        return singular[resource]
    except KeyError:
        return resource
def apply_payload(obj: Any, payload: Dict[str, Any]):
    """Copy matching payload entries onto ``obj`` as attributes.

    Only keys that name a column of ``obj``'s table are applied; ``id``,
    ``created_at`` and ``updated_at`` are never written. Other keys are
    ignored silently.
    """
    protected = {"id", "created_at", "updated_at"}
    writable = {column.name for column in obj.__table__.columns} - protected
    for field in payload:
        if field in writable:
            setattr(obj, field, payload[field])
@app.get("/api/summary")
def summary():
    """Return the dashboard overview: headline metrics, risks and recent activity.

    Returns:
        A dict with ``project_name``, ``metrics`` (counts plus revenue and
        net profit for the current month), ``risks`` (at most five entries)
        and ``recent`` (the six follow-ups with the highest ids).
    """
    db = SessionLocal()
    try:
        sales = db.query(SalesLead).all()
        ops = db.query(OperationProject).all()
        products = db.query(ProductVersion).all()
        finance = db.query(FinanceRecord).all()
        # Use the server's current month instead of a hard-coded "2026-05",
        # so the "monthly" figures keep tracking the calendar. The format
        # matches the "YYYY-MM" strings stored in FinanceRecord.month.
        current_month = date.today().strftime("%Y-%m")
        # ``amount`` can be NULL when a client stored null; count it as 0.
        revenue = sum(x.amount or 0 for x in finance if x.month == current_month and x.record_type == "revenue")
        cost_expense = sum(x.amount or 0 for x in finance if x.month == current_month and x.record_type == "cost_expense")
        net_profit = revenue - cost_expense
        # A project is at risk when flagged by status or when it has risk notes.
        risk_ops = [x for x in ops if x.project_status == "有风险" or x.risks]
        recent = [to_dict(record) for record in db.query(FollowUpRecord).order_by(FollowUpRecord.id.desc()).limit(6)]
        return {
            "project_name": "科普(慰心斋)",
            "metrics": {
                "p0_customers": len([x for x in sales if x.priority == "P0"]),
                "active_sales": len([x for x in sales if x.status in ["待跟进", "跟进中", "方案中", "商务谈判"]]),
                "execution_projects": len([x for x in ops if x.project_type == "execution"]),
                "risk_projects": len(risk_ops),
                "monthly_revenue": revenue,
                "monthly_net_profit": net_profit,
                "upcoming_products": len([x for x in products if x.status in ["规划中", "设计中", "开发中", "测试中"]]),
            },
            "risks": [
                {"title": "执行项目风险", "content": x.risks or f"{x.project_name} 需要按 SOP 更新下一节点"}
                for x in risk_ops[:5]
            ],
            "recent": recent,
        }
    finally:
        db.close()
@app.get("/api/health")
def health():
    """Report liveness together with the path of the SQLite database file."""
    return dict(ok=True, db=str(DB_PATH))
@app.get("/api/{resource}")
def list_resource(resource: str):
    """List every row of a resource, highest id first.

    Sales, operations and products rows also carry their ``followups``
    (newest first) and ``latest_follow_up_record``; proposals and operations
    rows also carry their ``files``.

    Raises:
        HTTPException: 404 for an unknown resource name (from ``get_model``).
    """
    db = SessionLocal()
    try:
        model = get_model(resource)
        with_followups = resource in ("sales", "operations", "products")
        # Resource name -> FileAsset.module for the resources that own files.
        file_module = {"proposals": "proposal", "operations": "operation"}.get(resource)
        items = []
        for row in db.query(model).order_by(model.id.desc()).all():
            item = to_dict(row)
            if with_followups:
                history_query = (
                    db.query(FollowUpRecord)
                    .filter(FollowUpRecord.target_type == followup_target(resource), FollowUpRecord.target_id == row.id)
                    .order_by(FollowUpRecord.followed_at.desc(), FollowUpRecord.id.desc())
                )
                history = [to_dict(entry) for entry in history_query]
                item["followups"] = history
                item["latest_follow_up_record"] = history[0]["content"] if history else ""
            if file_module is not None:
                item["files"] = list_files(db, file_module, row.id)
            items.append(item)
        return items
    finally:
        db.close()
@app.post("/api/{resource}")
def create_resource(resource: str, payload: Payload):
    """Create a row of the given resource from ``payload.data``.

    Returns:
        The stored row as a dict, including generated id and timestamps.

    Raises:
        HTTPException: 404 for an unknown resource name; 400 when the
            database rejects the row because a constraint is violated
            (for example a required field is missing or null).
    """
    from sqlalchemy.exc import IntegrityError

    db = SessionLocal()
    try:
        model = get_model(resource)
        obj = model()
        apply_payload(obj, payload.data)
        db.add(obj)
        try:
            db.commit()
        except IntegrityError as exc:
            # Previously this surfaced as an unhandled 500; it is a client
            # error, so report it as one and leave the session clean.
            db.rollback()
            raise HTTPException(400, "missing or invalid required field") from exc
        db.refresh(obj)
        return to_dict(obj)
    finally:
        db.close()
@app.put("/api/{resource}/{item_id}")
def update_resource(resource: str, item_id: int, payload: Payload):
    """Apply ``payload.data`` to an existing row and return the updated row.

    Raises:
        HTTPException: 404 for an unknown resource name or a missing row;
            400 when the database rejects the change because a constraint is
            violated (for example a required field set to null).
    """
    from sqlalchemy.exc import IntegrityError

    db = SessionLocal()
    try:
        model = get_model(resource)
        obj = db.get(model, item_id)
        if not obj:
            raise HTTPException(404, "not found")
        apply_payload(obj, payload.data)
        try:
            db.commit()
        except IntegrityError as exc:
            # Previously this surfaced as an unhandled 500; it is a client
            # error, so report it as one and leave the session clean.
            db.rollback()
            raise HTTPException(400, "missing or invalid required field") from exc
        db.refresh(obj)
        return to_dict(obj)
    finally:
        db.close()
@app.delete("/api/{resource}/{item_id}")
def delete_resource(resource: str, item_id: int):
    """Delete a row together with the follow-ups and file rows that belong to it.

    Follow-ups and file assets reference their owner only by type + id, with
    no foreign key. SQLite may hand the id of a deleted row to the next
    insert, so leftovers would show up on an unrelated new record; they are
    therefore removed in the same transaction. Only database rows are
    removed — files on disk are left untouched, since external assets point
    at documents this application does not own.

    Raises:
        HTTPException: 404 for an unknown resource name or a missing row.
    """
    db = SessionLocal()
    try:
        model = get_model(resource)
        obj = db.get(model, item_id)
        if not obj:
            raise HTTPException(404, "not found")
        # Same resource sets that list_resource() decorates with followups / files.
        if resource in ("sales", "operations", "products"):
            db.query(FollowUpRecord).filter(
                FollowUpRecord.target_type == followup_target(resource),
                FollowUpRecord.target_id == item_id,
            ).delete(synchronize_session=False)
        file_module = {"proposals": "proposal", "operations": "operation"}.get(resource)
        if file_module is not None:
            db.query(FileAsset).filter(
                FileAsset.module == file_module,
                FileAsset.owner_id == item_id,
            ).delete(synchronize_session=False)
        db.delete(obj)
        db.commit()
        return {"ok": True}
    finally:
        db.close()
@app.get("/api/finance/summary/monthly")
def finance_monthly():
    """Return one totals entry per month present in the finance records.

    Months are sorted ascending. ``gross_profit`` and ``net_profit`` are both
    computed as revenue minus cost/expense.
    """
    db = SessionLocal()
    try:
        # Bucket the records by month once, keeping their original order.
        by_month = {}
        for record in db.query(FinanceRecord).all():
            by_month.setdefault(record.month, []).append(record)
        report = []
        for month in sorted(by_month):
            records = by_month[month]
            revenue = sum(r.amount for r in records if r.record_type == "revenue")
            cost_expense = sum(r.amount for r in records if r.record_type == "cost_expense")
            profit = revenue - cost_expense
            report.append(
                {
                    "month": month,
                    "revenue": revenue,
                    "gross_profit": profit,
                    "cost_expense": cost_expense,
                    "net_profit": profit,
                }
            )
        return report
    finally:
        db.close()
@app.post("/api/followups/{target_type}/{target_id}")
def add_followup(target_type: str, target_id: int, payload: Payload):
    """Create a follow-up record for the target named in the URL.

    Args:
        target_type: Stored as the record's ``target_type``.
        target_id: Stored as the record's ``target_id``.
        payload: Remaining field values (content, next action, ...).

    Returns:
        The stored follow-up as a dict.
    """
    db = SessionLocal()
    try:
        record = FollowUpRecord()
        apply_payload(record, payload.data)
        # The URL is authoritative: assign the target after the payload so a
        # body containing "target_type" / "target_id" cannot redirect the
        # record to a different owner.
        record.target_type = target_type
        record.target_id = target_id
        db.add(record)
        db.commit()
        db.refresh(record)
        return to_dict(record)
    finally:
        db.close()
@app.post("/api/files/upload")
async def upload_file(
    module: str = Form(...),
    owner_id: int = Form(...),
    owner_version: str = Form(""),
    file_category: str = Form(""),
    file: UploadFile = File(...),
):
    """Store an uploaded file under ``UPLOAD_DIR/<module>/<owner_id>/`` and record it.

    Args:
        module: Owning module; must be a single path component.
        owner_id: Id of the owning record.
        owner_version: Stored as ``FileAsset.owner_version``.
        file_category: Stored as ``FileAsset.file_category``.
        file: The uploaded file.

    Returns:
        The created ``FileAsset`` row as a dict.

    Raises:
        HTTPException: 400 when the file name or module is empty or would
            escape the upload directory.
    """
    # ``module`` and ``file.filename`` are client-controlled and become part
    # of a filesystem path. Keep only the final component of the file name
    # and require ``module`` to be exactly one component, otherwise values
    # such as "../../x" or "/etc" would write outside UPLOAD_DIR.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(400, "invalid file name")
    if module in ("", ".", "..") or Path(module).name != module:
        raise HTTPException(400, "invalid module")

    db = SessionLocal()
    try:
        folder = UPLOAD_DIR / module / str(owner_id)
        folder.mkdir(parents=True, exist_ok=True)
        target = folder / safe_name
        with target.open("wb") as out:
            shutil.copyfileobj(file.file, out)
        asset = FileAsset(
            module=module,
            owner_id=owner_id,
            owner_version=owner_version,
            file_category=file_category,
            file_name=safe_name,
            file_type=Path(safe_name).suffix.lower().lstrip("."),
            file_size=target.stat().st_size,
            file_path=str(target),
            is_external=0,  # uploaded copy owned by this application
        )
        db.add(asset)
        db.commit()
        db.refresh(asset)
        return to_dict(asset)
    finally:
        db.close()
@app.get("/api/files/{file_id}/content")
def file_content(file_id: int, inline: bool = True):
    """Send the stored file of a ``FileAsset`` back to the client.

    Args:
        file_id: Primary key of the file asset.
        inline: When true the response uses an ``inline`` content
            disposition, otherwise ``attachment``.

    Raises:
        HTTPException: 404 ``"file not found"`` when no such asset exists,
            404 ``"file missing"`` when its path is not on disk.
    """
    db = SessionLocal()
    try:
        asset = db.get(FileAsset, file_id)
        if not asset:
            raise HTTPException(404, "file not found")
        stored = Path(asset.file_path)
        if stored.exists():
            disposition = "attachment"
            if inline:
                disposition = "inline"
            return FileResponse(stored, filename=asset.file_name, content_disposition_type=disposition)
        raise HTTPException(404, "file missing")
    finally:
        db.close()