"""OPC Manager API: a FastAPI service backed by a local SQLite database.

Importing this module has side effects: it creates the data/upload
directories, creates the database tables, seeds demo data when the
``sales_leads`` table is empty, and builds the FastAPI ``app``.
"""

from contextlib import closing
from datetime import date, datetime, timezone
from pathlib import Path
import shutil
import sqlite3
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base, sessionmaker

ROOT = Path(__file__).resolve().parents[2]
DATA_DIR = ROOT / "data"
UPLOAD_DIR = DATA_DIR / "uploads"
DB_PATH = DATA_DIR / "opc.sqlite"
WEIXIN_BASE = Path("/Users/mac/天机阁/地阁/慰心斋")
OLD_FINANCE_DB = WEIXIN_BASE / "5、财务管理/mananger/data/finance.sqlite"

# Month reported by /api/summary when the caller does not pass ``month``.
DEFAULT_SUMMARY_MONTH = "2026-05"

DATA_DIR.mkdir(parents=True, exist_ok=True)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()


def now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated; this produces the same naive value
    so stored timestamps and their ISO output keep the same shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class TimestampMixin:
    """Adds ``created_at`` / ``updated_at`` columns filled in by ``now``."""

    created_at = Column(DateTime, default=now)
    updated_at = Column(DateTime, default=now, onupdate=now)


class SalesLead(Base, TimestampMixin):
    """A sales lead for a target customer."""

    __tablename__ = "sales_leads"

    id = Column(Integer, primary_key=True)
    target_customer = Column(String, nullable=False)
    priority = Column(String, default="P1")
    status = Column(String, default="待跟进")


class FollowUpRecord(Base, TimestampMixin):
    """A follow-up note attached to another record via (target_type, target_id)."""

    __tablename__ = "follow_up_records"

    id = Column(Integer, primary_key=True)
    target_type = Column(String, nullable=False)
    target_id = Column(Integer, nullable=False)
    # Dates are stored as ISO strings, not as DateTime columns.
    followed_at = Column(String, default=lambda: date.today().isoformat())
    follower = Column(String, default="慰心")
    follow_up_method = Column(String, default="记录")
    content = Column(Text, default="")
    next_action = Column(Text, default="")
    next_follow_up_at = Column(String, default="")


class BusinessProposal(Base, TimestampMixin):
    """A versioned business proposal for a customer or project."""

    __tablename__ = "business_proposals"

    id = Column(Integer, primary_key=True)
    customer_or_project_name = Column(String, nullable=False)
    version = Column(String, nullable=False)
    description = Column(Text, default="")
    status = Column(String, default="草稿")
    created_date = Column(String, default=lambda: date.today().isoformat())


class OperationProject(Base, TimestampMixin):
    """An operation project; ``project_type`` is seeded as "opportunity" or "execution"."""

    __tablename__ = "operation_projects"

    id = Column(Integer, primary_key=True)
    project_name = Column(String, nullable=False)
    project_version = Column(String, default="v1.0")
    project_type = Column(String, default="opportunity")
    project_status = Column(String, default="线索发现")
    current_stage = Column(String, default="")
    owner = Column(String, default="慰心")
    start_date = Column(String, default="")
    end_date = Column(String, default="")
    target_customer = Column(String, default="")
    customer_need = Column(Text, default="")
    expected_contract_amount = Column(Float, default=0)
    expected_sign_date = Column(String, default="")
    sign_probability = Column(Float, default=0)
    next_action = Column(Text, default="")
    related_business_proposal_id = Column(Integer, nullable=True)
    sop_file_id = Column(Integer, nullable=True)
    sop_stage = Column(String, default="")
    execution_progress = Column(Float, default=0)
    current_deliverable = Column(Text, default="")
    risks = Column(Text, default="")
    notes = Column(Text, default="")


class ProductVersion(Base, TimestampMixin):
    """A planned or released version of a product."""

    __tablename__ = "product_versions"

    id = Column(Integer, primary_key=True)
    product_name = Column(String, nullable=False)
    version = Column(String, nullable=False)
    version_goal = Column(Text, default="")
    feature_list = Column(Text, default="")
    launch_date = Column(String, default="")
    status = Column(String, default="规划中")
    notes = Column(Text, default="")


class FinanceRecord(Base, TimestampMixin):
    """A finance entry; the aggregations below read "revenue" and "cost_expense" types."""

    __tablename__ = "finance_records"

    id = Column(Integer, primary_key=True)
    month = Column(String, nullable=False)
    project_name = Column(String, default="科普(慰心斋)")
    record_type = Column(String, nullable=False)
    category = Column(String, default="")
    amount = Column(Float, default=0)
    occurred_date = Column(String, default="")
    notes = Column(Text, default="")


class FileAsset(Base, TimestampMixin):
    """A file attached to a record identified by (module, owner_id)."""

    __tablename__ = "file_assets"

    id = Column(Integer, primary_key=True)
    module = Column(String, nullable=False)
    owner_id = Column(Integer, nullable=False)
    owner_version = Column(String, default="")
    file_category = Column(String, default="")
    file_name = Column(String, nullable=False)
    file_type = Column(String, default="")
    file_size = Column(Integer, default=0)
    file_path = Column(String, nullable=False)
    # 1 when the file lives outside UPLOAD_DIR (seeded references), 0 for uploads.
    is_external = Column(Integer, default=0)
    notes = Column(Text, default="")


Base.metadata.create_all(bind=engine)


def to_dict(row: Any) -> Dict[str, Any]:
    """Convert an ORM row to a plain dict keyed by column name.

    ``created_at`` / ``updated_at`` are rendered as ISO strings when set.
    """
    data = {c.name: getattr(row, c.name) for c in row.__table__.columns}
    for key in ("created_at", "updated_at"):
        if data.get(key):
            data[key] = data[key].isoformat()
    return data


def latest_followup(db: Session, target_type: str, target_id: int) -> str:
    """Return the content of the newest follow-up for a target, or "" if none.

    "Newest" means highest ``followed_at``, ties broken by highest ``id``.
    """
    stmt = (
        select(FollowUpRecord)
        .where(FollowUpRecord.target_type == target_type, FollowUpRecord.target_id == target_id)
        .order_by(FollowUpRecord.followed_at.desc(), FollowUpRecord.id.desc())
        # Without a limit, a one-row accessor raises as soon as a target has
        # more than one follow-up; fetch only the first row instead.
        .limit(1)
    )
    row = db.execute(stmt).scalars().first()
    return row.content if row else ""


def list_files(db: Session, module: str, owner_id: int) -> List[Dict[str, Any]]:
    """Return the file assets of one owner as dicts, newest id first."""
    rows = db.execute(
        select(FileAsset)
        .where(FileAsset.module == module, FileAsset.owner_id == owner_id)
        .order_by(FileAsset.id.desc())
    ).scalars()
    return [to_dict(x) for x in rows]


def create_file_asset(
    db: Session,
    module: str,
    owner_id: int,
    category: str,
    path: Path,
    version: str = "",
    external: bool = True,
):
    """Add a ``FileAsset`` for ``path`` to the session if the file exists.

    Does nothing when ``path`` is missing. The asset is only added to the
    session; committing is left to the caller.
    """
    if not path.exists():
        return
    asset = FileAsset(
        module=module,
        owner_id=owner_id,
        owner_version=version,
        file_category=category,
        file_name=path.name,
        file_type=path.suffix.lower().lstrip("."),
        file_size=path.stat().st_size,
        file_path=str(path),
        is_external=1 if external else 0,
    )
    db.add(asset)


def seed_data() -> None:
    """Populate demo data once; a no-op when any sales lead already exists.

    Everything is written in one transaction and committed at the end, so a
    failure part-way through leaves the database unseeded.
    """
    with SessionLocal() as db:
        if db.query(SalesLead).count() > 0:
            return

        sales_rows = [
            ("齐鲁制药", "P0", "跟进中", "多产品线科普年度框架,需推进高层沟通。"),
            ("百利天恒", "P0", "方案中", "BL-B01D1 上市前医生教育机会,准备方案。"),
            ("信达生物", "P0", "已签约", "现有科普项目升级/续约,重点保障执行。"),
            ("三生制药", "P1", "待跟进", "多科室医生教育+患者科普机会。"),
            ("天广实生物", "P1", "待跟进", "血液肿瘤医生教育机会。"),
        ]
        for name, priority, status, note in sales_rows:
            lead = SalesLead(target_customer=name, priority=priority, status=status)
            db.add(lead)
            db.flush()  # assigns lead.id for the follow-up below
            db.add(
                FollowUpRecord(
                    target_type="sales",
                    target_id=lead.id,
                    content=note,
                    next_action="明确下一次沟通人和时间",
                )
            )

        proposal = BusinessProposal(
            customer_or_project_name="信达生物",
            version="v1.5",
            description="信达科普项目续约与报价方案",
            status="已提交客户",
            created_date="2026-05-28",
        )
        db.add(proposal)
        db.flush()
        proposal_dir = WEIXIN_BASE / "2、业务方案/信达/v1.5"
        proposal_files = {
            "方案": ["整体方案.pptx", "整体方案.pdf"],
            "成本": ["业务报价-2亿方案.xlsx", "业务报价-5250万方案.xlsx", "5、最新报价.xlsx"],
            "SOP": ["SOP.docx"],
            "财务流程": ["财务流程.docx"],
        }
        for category, names in proposal_files.items():
            for filename in names:
                create_file_asset(
                    db, "proposal", proposal.id, category, proposal_dir / filename, proposal.version
                )

        ops = [
            ("圆心科技 科普文章项目", "v2026-文章", "execution", "SOP 执行中", "内容生产", 55, "文章内容生产与审核执行中"),
            ("圆心科技 科普视频项目", "v2026-视频", "execution", "SOP 执行中", "内容生产", 45, "视频脚本、拍摄与审核推进"),
            ("圆心科技 科普专访项目", "v2026-专访", "opportunity", "方案已提交", "商务推进", 0, "专访项目推动签约"),
        ]
        op_dir = WEIXIN_BASE / "3、运营方案"
        seeded_projects: List[OperationProject] = []
        for name, version, kind, status, stage, progress, note in ops:
            project = OperationProject(
                project_name=name,
                project_version=version,
                project_type=kind,
                project_status=status,
                current_stage=stage,
                target_customer="圆心科技",
                customer_need="科普内容项目执行与管理",
                expected_contract_amount=0 if kind == "execution" else 200,
                expected_sign_date="2026-06",
                sign_probability=70 if kind == "opportunity" else 100,
                sop_stage=stage,
                execution_progress=progress,
                current_deliverable=note,
                next_action="补齐版本要求文件并更新下一节点",
            )
            db.add(project)
            db.flush()
            db.add(
                FollowUpRecord(
                    target_type="operation",
                    target_id=project.id,
                    content=note,
                    next_action=project.next_action,
                )
            )
            seeded_projects.append(project)

        # The first element is the 1-based position in ``ops``. Files are
        # attached to the projects created above rather than looked up by a
        # literal primary key, which is only right on an empty table.
        file_map = [
            (1, "项目方案", "圆心科技--科普文章项目(1).pptx"),
            (2, "项目方案", "圆心科技-科普视频项目(1).pptx"),
            (3, "项目方案", "圆心科技-科普专访项目-2026年(1).pdf"),
            (1, "项目管理手册", "圆心科技《项目管理手册》-2026年.pdf"),
            (2, "审核标准", "科普项目-审核标准(文章-视频-音频).pdf"),
        ]
        for position, category, filename in file_map:
            project = seeded_projects[position - 1]
            create_file_asset(
                db, "operation", project.id, category, op_dir / filename, project.project_version
            )

        products = [
            ("妙手医生服务小程序", "v1.1", "视频任务增强 + 积分商城", "草稿箱、批量上传、积分商城、消息通知", "2026-Q3", "规划中"),
            ("数字化营销后台管理系统", "v1.2", "运营数据看板 + 智能审核", "医生活跃、任务完成率、AI 预审、渠道数据上报", "2026-Q3", "设计中"),
            ("妙手患者服务", "v0.5", "科普浏览 + 医生主页 MVP", "科普文章/视频浏览、医生主页、搜索", "2026-Q3", "规划中"),
            ("数字人内容平台", "v0.1", "基础数字人视频生成 MVP", "预设形象、AI 配音、脚本驱动、简单模板", "2026-Q3", "规划中"),
            ("渠道分发引擎", "v1.0", "六渠道统一分发", "分发 API、内容适配、分发排期、效果追踪", "2027-Q1", "规划中"),
        ]
        for product_name, version, goal, features, launch_date, status in products:
            product = ProductVersion(
                product_name=product_name,
                version=version,
                version_goal=goal,
                feature_list=features,
                launch_date=launch_date,
                status=status,
            )
            db.add(product)
            db.flush()
            db.add(
                FollowUpRecord(
                    target_type="product",
                    target_id=product.id,
                    content=f"{product_name} {version}:{goal}",
                    next_action="按路线图推进",
                )
            )

        finance_seed = [
            ("2026-05", "revenue", "信达生物续约确认收入", 120, "信达项目阶段确收"),
            ("2026-06", "revenue", "信达生物续约确认收入", 80, "信达项目尾款预估"),
            ("2026-05", "cost_expense", "内容生产", 32, "医生劳务与内容制作"),
            ("2026-05", "cost_expense", "运营管理", 16, "项目管理与渠道协同"),
            ("2026-06", "cost_expense", "渠道分发", 24, "投放与分发费用"),
        ]
        for month, record_type, category, amount, notes in finance_seed:
            db.add(
                FinanceRecord(
                    month=month,
                    record_type=record_type,
                    category=category,
                    amount=amount,
                    occurred_date=f"{month}-01",
                    notes=notes,
                )
            )

        if OLD_FINANCE_DB.exists():
            # ``closing`` releases the legacy connection even if the query fails.
            with closing(sqlite3.connect(OLD_FINANCE_DB)) as conn:
                conn.row_factory = sqlite3.Row
                legacy_rows = conn.execute(
                    "SELECT name, expected_revenue FROM customers WHERE expected_revenue > 0"
                ).fetchall()
            for row in legacy_rows:
                db.add(
                    FinanceRecord(
                        month="2026-07",
                        record_type="revenue",
                        category=f"{row['name']} 预计确收",
                        amount=float(row["expected_revenue"] or 0),
                        occurred_date="2026-07-01",
                        notes="由原财务 manager 客户预算迁移",
                    )
                )

        db.commit()


seed_data()

app = FastAPI(title="OPC Manager API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class Payload(BaseModel):
    """Request body wrapper: ``{"data": {column: value, ...}}``."""

    data: Dict[str, Any]


def db_session():
    """Yield a session and close it afterwards (generator-style dependency)."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def get_model(name: str):
    """Map a URL resource name to its ORM model.

    Raises:
        HTTPException: 404 when ``name`` is not a known resource.
    """
    models = {
        "sales": SalesLead,
        "followups": FollowUpRecord,
        "proposals": BusinessProposal,
        "operations": OperationProject,
        "products": ProductVersion,
        "finance": FinanceRecord,
    }
    if name not in models:
        raise HTTPException(404, "unknown resource")
    return models[name]


def followup_target(resource: str) -> str:
    """Translate a resource name to the ``target_type`` used on follow-ups.

    Unmapped names are returned unchanged.
    """
    return {"sales": "sales", "operations": "operation", "products": "product"}.get(resource, resource)


def apply_payload(obj: Any, payload: Dict[str, Any]):
    """Copy payload values onto ``obj`` for keys that are real columns.

    ``id``, ``created_at`` and ``updated_at`` are never written; unknown keys
    are ignored.
    """
    cols = {c.name for c in obj.__table__.columns}
    for key, value in payload.items():
        if key in cols and key not in {"id", "created_at", "updated_at"}:
            setattr(obj, key, value)


def _commit_or_400(db: Session) -> None:
    """Commit, turning a constraint violation into HTTP 400 instead of a 500."""
    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(400, "payload violates a database constraint") from exc


def _safe_component(value: Optional[str], what: str) -> str:
    """Reduce a client-supplied name to a single safe path component.

    Directory parts (with either slash style) are dropped so the result can
    never escape the folder it is joined to.

    Raises:
        HTTPException: 400 when nothing usable remains.
    """
    name = Path((value or "").replace("\\", "/")).name
    if name in {"", ".", ".."}:
        raise HTTPException(400, f"invalid {what}")
    return name


@app.get("/api/summary")
def summary(month: str = DEFAULT_SUMMARY_MONTH):
    """Return dashboard metrics, risk projects and the six newest follow-ups.

    Args:
        month: Month whose revenue / net profit is reported; defaults to
            ``DEFAULT_SUMMARY_MONTH``.
    """
    with SessionLocal() as db:
        sales = db.query(SalesLead).all()
        ops = db.query(OperationProject).all()
        products = db.query(ProductVersion).all()
        finance = db.query(FinanceRecord).all()

        # ``amount`` is nullable; count a NULL as 0 instead of crashing the sum.
        revenue = sum(x.amount or 0 for x in finance if x.month == month and x.record_type == "revenue")
        cost_expense = sum(
            x.amount or 0 for x in finance if x.month == month and x.record_type == "cost_expense"
        )
        net_profit = revenue - cost_expense

        risk_ops = [x for x in ops if x.project_status == "有风险" or x.risks]
        recent = [
            to_dict(record)
            for record in db.query(FollowUpRecord).order_by(FollowUpRecord.id.desc()).limit(6)
        ]
        return {
            "project_name": "科普(慰心斋)",
            "metrics": {
                "p0_customers": len([x for x in sales if x.priority == "P0"]),
                "active_sales": len([x for x in sales if x.status in ["待跟进", "跟进中", "方案中", "商务谈判"]]),
                "execution_projects": len([x for x in ops if x.project_type == "execution"]),
                "risk_projects": len(risk_ops),
                "monthly_revenue": revenue,
                "monthly_net_profit": net_profit,
                "upcoming_products": len(
                    [x for x in products if x.status in ["规划中", "设计中", "开发中", "测试中"]]
                ),
            },
            "risks": [
                {"title": "执行项目风险", "content": x.risks or f"{x.project_name} 需要按 SOP 更新下一节点"}
                for x in risk_ops[:5]
            ],
            "recent": recent,
        }


@app.get("/api/health")
def health():
    """Liveness probe; also reports the database path."""
    return {"ok": True, "db": str(DB_PATH)}


@app.get("/api/{resource}")
def list_resource(resource: str):
    """List all rows of a resource, newest id first.

    Sales, operations and products rows carry their follow-ups plus the
    newest follow-up's content; proposals and operations rows carry their
    files.
    """
    with SessionLocal() as db:
        model = get_model(resource)
        rows = db.query(model).order_by(model.id.desc()).all()

        with_followups = resource in {"sales", "operations", "products"}
        followups_by_target: Dict[int, List[Dict[str, Any]]] = {}
        if with_followups:
            # One query for the whole resource instead of one per row; the
            # ordering is kept, so each group is still newest-first.
            records = (
                db.query(FollowUpRecord)
                .filter(FollowUpRecord.target_type == followup_target(resource))
                .order_by(FollowUpRecord.followed_at.desc(), FollowUpRecord.id.desc())
            )
            for record in records:
                followups_by_target.setdefault(record.target_id, []).append(to_dict(record))

        file_module = {"proposals": "proposal", "operations": "operation"}.get(resource)

        data = []
        for row in rows:
            item = to_dict(row)
            if with_followups:
                followups = followups_by_target.get(row.id, [])
                item["followups"] = followups
                item["latest_follow_up_record"] = followups[0]["content"] if followups else ""
            if file_module:
                item["files"] = list_files(db, file_module, row.id)
            data.append(item)
        return data


@app.post("/api/{resource}")
def create_resource(resource: str, payload: Payload):
    """Create a row of ``resource`` from the payload and return it.

    Raises:
        HTTPException: 404 for an unknown resource; 400 when the payload
            violates a database constraint (e.g. a required column is missing).
    """
    with SessionLocal() as db:
        model = get_model(resource)
        obj = model()
        apply_payload(obj, payload.data)
        db.add(obj)
        _commit_or_400(db)
        db.refresh(obj)
        return to_dict(obj)


@app.put("/api/{resource}/{item_id}")
def update_resource(resource: str, item_id: int, payload: Payload):
    """Update one row of ``resource`` from the payload and return it.

    Raises:
        HTTPException: 404 for an unknown resource or id; 400 when the
            payload violates a database constraint.
    """
    with SessionLocal() as db:
        model = get_model(resource)
        obj = db.get(model, item_id)
        if not obj:
            raise HTTPException(404, "not found")
        apply_payload(obj, payload.data)
        _commit_or_400(db)
        db.refresh(obj)
        return to_dict(obj)


@app.delete("/api/{resource}/{item_id}")
def delete_resource(resource: str, item_id: int):
    """Delete one row of ``resource``.

    Raises:
        HTTPException: 404 for an unknown resource or id.
    """
    with SessionLocal() as db:
        model = get_model(resource)
        obj = db.get(model, item_id)
        if not obj:
            raise HTTPException(404, "not found")
        db.delete(obj)
        db.commit()
        return {"ok": True}


@app.get("/api/finance/summary/monthly")
def finance_monthly():
    """Return revenue, cost and profit totals per month, sorted by month.

    ``gross_profit`` and ``net_profit`` are both revenue minus cost_expense.
    """
    with SessionLocal() as db:
        rows = db.query(FinanceRecord).all()

        # Single pass over the rows instead of re-scanning them for each month.
        totals: Dict[str, Dict[str, float]] = {}
        for record in rows:
            bucket = totals.setdefault(record.month, {"revenue": 0, "cost_expense": 0})
            if record.record_type in bucket:
                bucket[record.record_type] += record.amount or 0

        result = []
        for month in sorted(totals):
            revenue = totals[month]["revenue"]
            cost_expense = totals[month]["cost_expense"]
            result.append(
                {
                    "month": month,
                    "revenue": revenue,
                    "gross_profit": revenue - cost_expense,
                    "cost_expense": cost_expense,
                    "net_profit": revenue - cost_expense,
                }
            )
        return result


@app.post("/api/followups/{target_type}/{target_id}")
def add_followup(target_type: str, target_id: int, payload: Payload):
    """Create a follow-up for the target named in the URL and return it."""
    with SessionLocal() as db:
        record = FollowUpRecord()
        apply_payload(record, payload.data)
        # The URL identifies the target; set it last so payload keys of the
        # same name cannot redirect the record to another target.
        record.target_type = target_type
        record.target_id = target_id
        db.add(record)
        db.commit()
        db.refresh(record)
        return to_dict(record)


@app.post("/api/files/upload")
async def upload_file(
    module: str = Form(...),
    owner_id: int = Form(...),
    owner_version: str = Form(""),
    file_category: str = Form(""),
    file: UploadFile = File(...),
):
    """Store an uploaded file under UPLOAD_DIR/module/owner_id and record it.

    Raises:
        HTTPException: 400 when ``module`` or the file name is empty or
            contains directory parts that would escape the upload folder.
    """
    # Both values come from the client and end up in a filesystem path.
    if _safe_component(module, "module") != module:
        raise HTTPException(400, "invalid module")
    file_name = _safe_component(file.filename, "file name")

    with SessionLocal() as db:
        folder = UPLOAD_DIR / module / str(owner_id)
        folder.mkdir(parents=True, exist_ok=True)
        target = folder / file_name
        with target.open("wb") as out:
            shutil.copyfileobj(file.file, out)
        asset = FileAsset(
            module=module,
            owner_id=owner_id,
            owner_version=owner_version,
            file_category=file_category,
            file_name=file_name,
            file_type=Path(file_name).suffix.lower().lstrip("."),
            file_size=target.stat().st_size,
            file_path=str(target),
            is_external=0,
        )
        db.add(asset)
        db.commit()
        db.refresh(asset)
        return to_dict(asset)


@app.get("/api/files/{file_id}/content")
def file_content(file_id: int, inline: bool = True):
    """Serve a stored file, inline by default or as an attachment.

    Raises:
        HTTPException: 404 when the asset row or the file on disk is missing.
    """
    with SessionLocal() as db:
        asset = db.get(FileAsset, file_id)
        if not asset:
            raise HTTPException(404, "file not found")
        path = Path(asset.file_path)
        if not path.exists():
            raise HTTPException(404, "file missing")
        disposition = "inline" if inline else "attachment"
        return FileResponse(path, filename=asset.file_name, content_disposition_type=disposition)