from datetime import date, datetime from pathlib import Path import os import sys import json import shutil import sqlite3 # 保留用于数据迁移 import logging import mysql.connector # 确保 backend 目录在 sys.path 中(兼容 gunicorn --preload 模式) _backend_dir = os.path.dirname(os.path.abspath(__file__)) if _backend_dir not in sys.path: sys.path.insert(0, _backend_dir) from flask import Flask, jsonify, render_template, request, send_file, session, redirect from werkzeug.security import generate_password_hash, check_password_hash logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") ROOT = Path(__file__).resolve().parents[1] DATA_DIR = ROOT / "data" UPLOAD_DIR = DATA_DIR / "uploads" DB_PATH = DATA_DIR / "opc.sqlite" try: from dotenv import load_dotenv load_dotenv(ROOT / ".env") except ImportError: pass WEIXIN_BASE = Path(os.environ.get("WEIXIN_BASE", "/Users/mac/天机阁/地阁/慰心斋")) DATA_DIR.mkdir(parents=True, exist_ok=True) UPLOAD_DIR.mkdir(parents=True, exist_ok=True) app = Flask( __name__, template_folder=str(ROOT / "templates"), static_folder=str(ROOT / "static"), ) app.secret_key = os.environ.get("SECRET_KEY", "opc-dev-secret-2026") # ---------- 鉴权 ---------- def login_required(f): from functools import wraps @wraps(f) def decorated(*args, **kwargs): if "user_id" not in session: return jsonify({"error": "未登录"}), 401 return f(*args, **kwargs) return decorated def admin_required(f): from functools import wraps @wraps(f) def decorated(*args, **kwargs): if "user_id" not in session: return jsonify({"error": "未登录"}), 401 if session.get("role") != "admin": return jsonify({"error": "无权限"}), 403 return f(*args, **kwargs) return decorated ALL_TENANTS = ["科普·无界", "科研·无界", "医患·无界", "MCN·无界", "学会·无界"] @app.route("/login") def login_page(): return render_template("login.html") @app.route("/api/auth/login", methods=["POST"]) def auth_login(): data = request.get_json(force=True) or {} username = data.get("username", "").strip() password = data.get("password", "") conn = db() try: user = one(conn, "SELECT * FROM users WHERE username=?", (username,)) if not user or not check_password_hash(user["password_hash"], password): return jsonify({"error": "用户名或密码错误"}), 401 session["user_id"] = user["id"] session["username"] = user["username"] session["display_name"] = user["display_name"] session["role"] = user["role"] # 管理员可看所有工作台,OPC负责人看分配的工作台 if user["role"] == "admin": session["tenants"] = ["科普·无界", "科研·无界", "医患·无界", "MCN·无界", "学会·无界"] else: ut = rows(conn, "SELECT tenant FROM user_tenants WHERE user_id=?", (user["id"],)) session["tenants"] = [x["tenant"] for x in ut] return jsonify({ "ok": True, "user": {"id": user["id"], "username": user["username"], "display_name": user["display_name"], "role": user["role"]}, "tenants": session["tenants"], }) finally: conn.close() @app.route("/api/auth/logout", methods=["POST"]) def auth_logout(): session.clear() return jsonify({"ok": True}) @app.route("/api/auth/me") def auth_me(): if "user_id" not in session: return jsonify({"logged_in": False}) return jsonify({ "logged_in": True, "user": {"id": session["user_id"], "username": session["username"], "display_name": session["display_name"], "role": session["role"]}, "tenants": session.get("tenants", []), }) # ---------- 账号管理 API ---------- @app.route("/api/users") @admin_required def list_users(): conn = db() try: users = rows(conn, "SELECT id, username, display_name, role, created_at FROM users ORDER BY id") ut_rows = rows(conn, "SELECT user_id, tenant FROM user_tenants") tenant_map = {} for r in ut_rows: tenant_map.setdefault(r["user_id"], []).append(r["tenant"]) for u in users: u["tenants"] = tenant_map.get(u["id"], []) return jsonify(users) finally: conn.close() @app.route("/api/users", methods=["POST"]) @admin_required def create_user(): data = request.get_json(force=True) username = (data.get("username") or "").strip() display_name = (data.get("display_name") or "").strip() password = data.get("password") or "" role = data.get("role") or "opc_owner" tenants = data.get("tenants") or [] if not username or not password or not display_name: return jsonify({"error": "用户名/密码/显示名不能为空"}), 400 if role not in ("admin", "opc_owner"): return jsonify({"error": "角色非法"}), 400 conn = db() try: if one(conn, "SELECT id FROM users WHERE username=?", (username,)): return jsonify({"error": "用户名已存在"}), 400 _exec(conn, "INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)", (username, generate_password_hash(password, "pbkdf2:sha256"), display_name, role, date.today().isoformat())) u = one(conn, "SELECT id FROM users WHERE username=?", (username,)) for t in tenants: if t in ALL_TENANTS: _exec(conn, "INSERT IGNORE INTO user_tenants (user_id, tenant) VALUES (?,?)", (u["id"], t)) conn.commit() return jsonify({"ok": True, "id": u["id"]}) finally: conn.close() @app.route("/api/users/", methods=["PUT"]) @admin_required def update_user(uid): data = request.get_json(force=True) conn = db() try: u = one(conn, "SELECT * FROM users WHERE id=?", (uid,)) if not u: return jsonify({"error": "用户不存在"}), 404 display_name = (data.get("display_name") or "").strip() or u["display_name"] role = data.get("role") or u["role"] if role not in ("admin", "opc_owner"): return jsonify({"error": "角色非法"}), 400 password = data.get("password") or "" if password: _exec(conn, "UPDATE users SET display_name=?, role=?, password_hash=? WHERE id=?", (display_name, role, generate_password_hash(password, "pbkdf2:sha256"), uid)) else: _exec(conn, "UPDATE users SET display_name=?, role=? WHERE id=?", (display_name, role, uid)) # 更新工作台权限 if "tenants" in data: _exec(conn, "DELETE FROM user_tenants WHERE user_id=?", (uid,)) for t in data["tenants"]: if t in ALL_TENANTS: _exec(conn, "INSERT IGNORE INTO user_tenants (user_id, tenant) VALUES (?,?)", (uid, t)) # 不允许删除最后一个 admin if role != "admin": admin_count = one(conn, "SELECT COUNT(*) AS c FROM users WHERE role='admin'")["c"] if admin_count == 0: return jsonify({"error": "至少保留一个管理员"}), 400 conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/users/", methods=["DELETE"]) @admin_required def delete_user(uid): if uid == session.get("user_id"): return jsonify({"error": "不能删除当前登录账号"}), 400 conn = db() try: u = one(conn, "SELECT * FROM users WHERE id=?", (uid,)) if not u: return jsonify({"error": "用户不存在"}), 404 # 不允许删除最后一个 admin if u["role"] == "admin": admin_count = one(conn, "SELECT COUNT(*) AS c FROM users WHERE role='admin'")["c"] if admin_count <= 1: return jsonify({"error": "至少保留一个管理员"}), 400 _exec(conn, "DELETE FROM user_tenants WHERE user_id=?", (uid,)) _exec(conn, "DELETE FROM users WHERE id=?", (uid,)) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/tenants") def list_tenants(): return jsonify(ALL_TENANTS) # ---------- 业务 API ---------- def db(): return mysql.connector.connect( host=os.environ.get("DB_HOST", "127.0.0.1"), port=int(os.environ.get("DB_PORT", "3306")), user=os.environ.get("DB_USER", "opc"), password=os.environ.get("DB_PASSWORD", "opc123456"), database=os.environ.get("DB_NAME", "opc"), charset="utf8mb4", collation="utf8mb4_unicode_ci", ) def now(): return datetime.utcnow().isoformat() def _exec(conn, sql, args=()): """执行 SQL,自动将 ? 转为 MySQL 的 %s""" cur = conn.cursor(dictionary=True) cur.execute(sql.replace("?", "%s"), args) return cur def rows(conn, sql, args=()): cur = _exec(conn, sql, args) rows = cur.fetchall() cur.close() return rows def one(conn, sql, args=()): cur = _exec(conn, sql, args) row = cur.fetchone() cur.close() return row def init_db(): conn = db() _exec(conn, """CREATE TABLE IF NOT EXISTS sales_leads ( id INT AUTO_INCREMENT PRIMARY KEY, target_customer VARCHAR(1000) NOT NULL, priority VARCHAR(1000) NOT NULL DEFAULT 'P1', status VARCHAR(1000) NOT NULL DEFAULT '待跟进', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS follow_up_records ( id INT AUTO_INCREMENT PRIMARY KEY, target_type VARCHAR(1000) NOT NULL, target_id INT NOT NULL, followed_at VARCHAR(1000) NOT NULL DEFAULT '', follower VARCHAR(1000) NOT NULL DEFAULT '慰心', follow_up_method VARCHAR(1000) NOT NULL DEFAULT '记录', content VARCHAR(1000) NOT NULL DEFAULT '', next_action VARCHAR(1000) NOT NULL DEFAULT '', next_follow_up_at VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS business_proposals ( id INT AUTO_INCREMENT PRIMARY KEY, customer_or_project_name VARCHAR(1000) NOT NULL, version VARCHAR(1000) NOT NULL, description VARCHAR(1000) NOT NULL DEFAULT '', status VARCHAR(1000) NOT NULL DEFAULT '草稿', created_date VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS operation_projects ( id INT AUTO_INCREMENT PRIMARY KEY, project_name VARCHAR(1000) NOT NULL, project_version VARCHAR(1000) NOT NULL DEFAULT 'v1.0', project_type VARCHAR(1000) NOT NULL DEFAULT 'opportunity', project_status VARCHAR(1000) NOT NULL DEFAULT '', current_stage VARCHAR(1000) NOT NULL DEFAULT '', owner VARCHAR(1000) NOT NULL DEFAULT '慰心', start_date VARCHAR(1000) NOT NULL DEFAULT '', end_date VARCHAR(1000) NOT NULL DEFAULT '', target_customer VARCHAR(1000) NOT NULL DEFAULT '', customer_need VARCHAR(1000) NOT NULL DEFAULT '', expected_contract_amount DOUBLE NOT NULL DEFAULT 0, expected_sign_date VARCHAR(1000) NOT NULL DEFAULT '', sign_probability DOUBLE NOT NULL DEFAULT 0, next_action VARCHAR(1000) NOT NULL DEFAULT '', related_business_proposal_id INTEGER, sop_file_id INTEGER, sop_stage VARCHAR(1000) NOT NULL DEFAULT '', execution_progress DOUBLE NOT NULL DEFAULT 0, current_deliverable VARCHAR(1000) NOT NULL DEFAULT '', risks VARCHAR(1000) NOT NULL DEFAULT '', notes VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS product_versions ( id INT AUTO_INCREMENT PRIMARY KEY, product_name VARCHAR(1000) NOT NULL, version VARCHAR(1000) NOT NULL, version_goal VARCHAR(1000) NOT NULL DEFAULT '', feature_list VARCHAR(1000) NOT NULL DEFAULT '', launch_date VARCHAR(1000) NOT NULL DEFAULT '', status VARCHAR(1000) NOT NULL DEFAULT '规划中', notes VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS finance_records ( id INT AUTO_INCREMENT PRIMARY KEY, month VARCHAR(1000) NOT NULL, project_name VARCHAR(1000) NOT NULL DEFAULT '科普(慰心斋)', record_type VARCHAR(1000) NOT NULL, category VARCHAR(1000) NOT NULL DEFAULT '', amount DOUBLE NOT NULL DEFAULT 0, occurred_date VARCHAR(1000) NOT NULL DEFAULT '', notes VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS file_assets ( id INT AUTO_INCREMENT PRIMARY KEY, module VARCHAR(1000) NOT NULL, owner_id INT NOT NULL, owner_version VARCHAR(1000) NOT NULL DEFAULT '', file_category VARCHAR(1000) NOT NULL DEFAULT '', file_name VARCHAR(1000) NOT NULL, file_type VARCHAR(1000) NOT NULL DEFAULT '', file_size INTEGER NOT NULL DEFAULT 0, file_path VARCHAR(1000) NOT NULL, is_external INTEGER NOT NULL DEFAULT 0, notes VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() _exec(conn, """CREATE TABLE IF NOT EXISTS project_tasks ( id INT AUTO_INCREMENT PRIMARY KEY, project_id INTEGER NOT NULL, phase VARCHAR(1000) NOT NULL DEFAULT '', milestone VARCHAR(1000) NOT NULL DEFAULT '', task VARCHAR(1000) NOT NULL DEFAULT '', owner VARCHAR(1000) NOT NULL DEFAULT '', due_date VARCHAR(1000) NOT NULL DEFAULT '', blockers VARCHAR(1000) NOT NULL DEFAULT '', notes VARCHAR(1000) NOT NULL DEFAULT '', created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") conn.commit() # 用户表 try: _exec(conn, """CREATE TABLE IF NOT EXISTS users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(100) NOT NULL UNIQUE, password_hash VARCHAR(255) NOT NULL, display_name VARCHAR(100) NOT NULL, role VARCHAR(50) NOT NULL DEFAULT 'opc_owner', created_at VARCHAR(30) NOT NULL DEFAULT '' )""") except mysql.connector.Error as e: logger.debug(f"users table: {e}") conn.commit() # 用户-工作台关联表 try: _exec(conn, """CREATE TABLE IF NOT EXISTS user_tenants ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, tenant VARCHAR(100) NOT NULL, UNIQUE KEY (user_id, tenant) )""") except mysql.connector.Error as e: logger.debug(f"user_tenants table: {e}") conn.commit() # project_finances 表(月度预算 + 签约信息) try: _exec(conn, """CREATE TABLE IF NOT EXISTS project_finances ( id INT AUTO_INCREMENT PRIMARY KEY, tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界', project_id VARCHAR(100) NOT NULL DEFAULT '', business_type VARCHAR(100) NOT NULL DEFAULT '', customer_name VARCHAR(200) NOT NULL DEFAULT '', sign_amount DOUBLE NOT NULL DEFAULT 0, sign_month VARCHAR(20) NOT NULL DEFAULT '', status VARCHAR(50) NOT NULL DEFAULT '待签约', sales_person VARCHAR(100) NOT NULL DEFAULT '', total_rev DOUBLE NOT NULL DEFAULT 0, total_gross DOUBLE NOT NULL DEFAULT 0, budget_data TEXT, created_at VARCHAR(30) NOT NULL DEFAULT '', updated_at VARCHAR(30) NOT NULL DEFAULT '' )""") except mysql.connector.Error as e: logger.debug(f"project_finances table: {e}") conn.commit() # Schema migrations — 添加后续迁移的列(幂等) migrations = [ "ALTER TABLE sales_leads ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE follow_up_records ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE business_proposals ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE business_proposals ADD COLUMN proposal_type VARCHAR(100) NOT NULL DEFAULT '业务方案'", "ALTER TABLE business_proposals ADD COLUMN notes VARCHAR(2000) NOT NULL DEFAULT ''", "ALTER TABLE operation_projects ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE product_versions ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE product_versions ADD COLUMN platform VARCHAR(100) NOT NULL DEFAULT ''", "ALTER TABLE finance_records ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE project_tasks ADD COLUMN tenant VARCHAR(100) NOT NULL DEFAULT '科普·无界'", "ALTER TABLE project_tasks ADD COLUMN status VARCHAR(50) NOT NULL DEFAULT '未开始'", "ALTER TABLE project_tasks ADD COLUMN sort_order INT NOT NULL DEFAULT 0", "ALTER TABLE project_tasks ADD COLUMN priority VARCHAR(10) NOT NULL DEFAULT 'P2'", # 12 月字段(确收/毛利/回款/费用/月度现金流) ] for m in ["01","02","03","04","05","06","07","08","09","10","11","12"]: migrations.append(f"ALTER TABLE project_finances ADD COLUMN rev_2026_{m} DOUBLE NOT NULL DEFAULT 0") migrations.append(f"ALTER TABLE project_finances ADD COLUMN gross_2026_{m} DOUBLE NOT NULL DEFAULT 0") migrations.append(f"ALTER TABLE project_finances ADD COLUMN payment_2026_{m} DOUBLE NOT NULL DEFAULT 0") migrations.append(f"ALTER TABLE project_finances ADD COLUMN cost_2026_{m} DOUBLE NOT NULL DEFAULT 0") for mig in migrations: try: _exec(conn, mig) except mysql.connector.Error as e: logger.debug(f"migration skipped: {e}") conn.commit() # 数据修正:status 为空或 'done' 的任务修正为合法值 try: _exec(conn, "UPDATE project_tasks SET status='未开始' WHERE status='' OR status IS NULL") _exec(conn, "UPDATE project_tasks SET status='已结束' WHERE status='done'") _exec(conn, "UPDATE project_tasks SET status='进行中' WHERE status='验收中'") conn.commit() except mysql.connector.Error as e: logger.warning(f"task status fix failed: {e}") # 初始化默认用户(只执行一次) if not one(conn, "SELECT id FROM users LIMIT 1"): _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("qiukai", generate_password_hash("yxcowork2026", "pbkdf2:sha256"), "qiukai", "admin", date.today().isoformat())) _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("kepu", generate_password_hash("kepu123", "pbkdf2:sha256"), "科普负责人", "opc_owner", date.today().isoformat())) _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("keyan", generate_password_hash("keyan123", "pbkdf2:sha256"), "科研负责人", "opc_owner", date.today().isoformat())) _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("yihuan", generate_password_hash("yihuan123", "pbkdf2:sha256"), "医患负责人", "opc_owner", date.today().isoformat())) _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("mcn", generate_password_hash("mcn123", "pbkdf2:sha256"), "MCN负责人", "opc_owner", date.today().isoformat())) _exec(conn, """INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)""", ("wuji", generate_password_hash("wuji123", "pbkdf2:sha256"), "无界负责人", "opc_owner", date.today().isoformat())) # 各 OPC 负责人绑定工作台 for uname, tenant in [("kepu","科普·无界"),("keyan","科研·无界"),("yihuan","医患·无界"),("mcn","MCN·无界"),("wuji","学会·无界")]: u = one(conn, "SELECT id FROM users WHERE username=?", (uname,)) if u: _exec(conn, "INSERT INTO user_tenants (user_id, tenant) VALUES (?,?)", (u["id"], tenant)) conn.commit() conn.close() conn.close() def seed_db(): """填充初始示例数据(仅在空库时执行一次)""" conn = db() try: if one(conn, "SELECT id FROM sales_leads LIMIT 1"): return # 已有数据,跳过 sales = [ ("齐鲁制药", "P0", "跟进中", "多产品线科普年度框架,需推进高层沟通。"), ("百利天恒", "P0", "方案中", "BL-B01D1 上市前医生教育机会,准备方案。"), ("信达生物", "P0", "已签约", "现有科普项目升级/续约,重点保障执行。"), ("三生制药", "P1", "待跟进", "多科室医生教育+患者科普机会。"), ("天广实生物", "P1", "待跟进", "血液肿瘤医生教育机会。"), ] for customer, priority, status, note in sales: cur = _exec(conn, "INSERT INTO sales_leads (target_customer, priority, status) VALUES (?,?,?)", (customer, priority, status), ) _exec(conn, "INSERT INTO follow_up_records (target_type,target_id,followed_at,content,next_action) VALUES (?,?,?,?,?)", ("sales", cur.lastrowid, date.today().isoformat(), note, "明确下一次沟通人和时间"), ) cur = _exec(conn, "INSERT INTO business_proposals (customer_or_project_name,version,description,status,created_date) VALUES (?,?,?,?,?)", ("信达生物", "v1.5", "信达科普项目续约与报价方案", "已提交客户", "2026-05-28"), ) proposal_id = cur.lastrowid proposal_dir = WEIXIN_BASE / "2、业务方案/信达/v1.5" for category, names in { "方案": ["整体方案.pptx", "整体方案.pdf"], "成本": ["业务报价-2亿方案.xlsx", "业务报价-5250万方案.xlsx", "5、最新报价.xlsx"], "SOP": ["SOP.docx"], "财务流程": ["财务流程.docx"], }.items(): for name in names: add_file_index(conn, "proposal", proposal_id, "v1.5", category, proposal_dir / name, external=True) projects = [ ("圆心科技 科普文章项目", "v2026-文章", "execution", "SOP 执行中", "内容生产", 55, "文章内容生产与审核执行中"), ("圆心科技 科普视频项目", "v2026-视频", "execution", "SOP 执行中", "内容生产", 45, "视频脚本、拍摄与审核推进"), ("圆心科技 科普专访项目", "v2026-专访", "opportunity", "方案已提交", "商务推进", 0, "专访项目推动签约"), ] op_dir = WEIXIN_BASE / "3、运营方案" for name, version, kind, status, stage, progress, note in projects: cur = _exec(conn, """INSERT INTO operation_projects (project_name,project_version,project_type,project_status,current_stage,target_customer,customer_need, expected_contract_amount,expected_sign_date,sign_probability,next_action,sop_stage,execution_progress,current_deliverable) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (name, version, kind, status, stage, "圆心科技", "科普内容项目执行与管理", 0 if kind == "execution" else 200, "2026-06", 100 if kind == "execution" else 70, "补齐版本要求文件并更新下一节点", stage, progress, note), ) _exec(conn, "INSERT INTO follow_up_records (target_type,target_id,followed_at,content,next_action) VALUES (?,?,?,?,?)", ("operation", cur.lastrowid, date.today().isoformat(), note, "补齐版本要求文件并更新下一节点"), ) file_map = [ (1, "v2026-文章", "项目方案", "圆心科技--科普文章项目(1).pptx"), (2, "v2026-视频", "项目方案", "圆心科技-科普视频项目(1).pptx"), (3, "v2026-专访", "项目方案", "圆心科技-科普专访项目-2026年(1).pdf"), (1, "v2026-文章", "项目管理手册", "圆心科技《项目管理手册》-2026年.pdf"), (2, "v2026-视频", "审核标准", "科普项目-审核标准(文章-视频-音频).pdf"), ] for project_id, version, category, filename in file_map: add_file_index(conn, "operation", project_id, version, category, op_dir / filename, external=True) products = [ ("妙手医生服务小程序", "v1.1", "视频任务增强 + 积分商城", "草稿箱、批量上传、积分商城、消息通知", "2026-Q3", "规划中", "科普平台"), ("数字化营销后台管理系统", "v1.2", "运营数据看板 + 智能审核", "医生活跃、任务完成率、AI 预审、渠道数据上报", "2026-Q3", "设计中", "真研平台"), ("妙手患者服务", "v0.5", "科普浏览 + 医生主页 MVP", "科普文章/视频浏览、医生主页、搜索", "2026-Q3", "规划中", "科普平台"), ("数字人内容平台", "v0.1", "基础数字人视频生成 MVP", "预设形象、AI 配音、脚本驱动、简单模板", "2026-Q3", "规划中", "科普平台"), ("渠道分发引擎", "v1.0", "六渠道统一分发", "分发 API、内容适配、分发排期、效果追踪", "2027-Q1", "规划中", "科普平台"), ] for product in products: cur = _exec(conn, "INSERT INTO product_versions (product_name,version,version_goal,feature_list,launch_date,status,platform) VALUES (?,?,?,?,?,?,?)", product, ) _exec(conn, "INSERT INTO follow_up_records (target_type,target_id,followed_at,content,next_action) VALUES (?,?,?,?,?)", ("product", cur.lastrowid, date.today().isoformat(), f"{product[0]} {product[1]}:{product[2]}", "按路线图推进"), ) for month, record_type, category, amount, notes in [ ("2026-05", "revenue", "信达生物续约确认收入", 120, "信达项目阶段确收"), ("2026-06", "revenue", "信达生物续约确认收入", 80, "信达项目尾款预估"), ("2026-05", "cost_expense", "内容生产", 32, "医生劳务与内容制作"), ("2026-05", "cost_expense", "运营管理", 16, "项目管理与渠道协同"), ("2026-06", "cost_expense", "渠道分发", 24, "投放与分发费用"), ]: _exec(conn, "INSERT INTO finance_records (month,record_type,category,amount,occurred_date,notes) VALUES (?,?,?,?,?,?)", (month, record_type, category, amount, f"{month}-01", notes), ) tasks_seed = [ ("阶段1:渠道与商务确认", "商务对接", "合同签订", "Anna", "2026-06-30", "法务审核中", "合同签订后开始执行"), ("阶段1:渠道与商务确认", "官媒渠道确认", "沟通官媒确定", "段丽华", "2026-06-30", "官媒尽力推,以先达成合作为准", "集团支持"), ("阶段1:渠道与商务确认", "官媒渠道确认", "官媒合作签约", "段丽华", "2026-06-18", "", "官媒确认细节"), ("阶段2:系统与标准搭建", "系统开发上线", "音频专访系统开发上线", "戴敏/梁军营", "2026-06-18", "客户比较着急执行,需要技术的资源", ""), ("阶段2:系统与标准搭建", "系统开发上线", "精品视频系统开发上线", "戴敏/梁军营", "2026-06-25", "", ""), ("阶段2:系统与标准搭建", "标准与培训", "业务执行手册SOP", "胡龙飞", "2026-06-12", "", "系统开发上线"), ("阶段3:人员与审核入驻", "团队组建", "医学审核人员到位", "胡龙飞", "2026-06-15", "", "审核人员招聘"), ("阶段3:人员与审核入驻", "团队组建", "视频制作人员到位", "胡龙飞", "2026-06-18", "", "项目经理招聘"), ("阶段4:供应链与制作", "供应商准入", "准入拍摄/剪辑/主持人", "胡龙飞/侯亚凤", "2026-06-18", "", ""), ("阶段2:系统与标准搭建", "脚本生产及审核", "生产脚本", "军营", "2026-06-12", "脚本目前生产比较机械,需要提前准备", "细分标签领域完成"), ] for phase, milestone, task, owner, due_date, blockers, notes in tasks_seed: _exec(conn, "INSERT INTO project_tasks (project_id,phase,milestone,task,owner,due_date,blockers,notes) VALUES (?,?,?,?,?,?,?,?)", (1, phase, milestone, task, owner, due_date, blockers, notes), ) conn.commit() logger.info("Seed data inserted successfully") finally: conn.close() def add_file_index(conn, module, owner_id, owner_version, category, path, external=True): path = Path(path) if not path.exists(): return _exec(conn, """INSERT INTO file_assets (module,owner_id,owner_version,file_category,file_name,file_type,file_size,file_path,is_external) VALUES (?,?,?,?,?,?,?,?,?)""", (module, owner_id, owner_version, category, path.name, path.suffix.lower().lstrip("."), path.stat().st_size, str(path), 1 if external else 0), ) def latest_followup(conn, target_type, target_id): row = one( conn, "SELECT content FROM follow_up_records WHERE target_type=? AND target_id=? ORDER BY followed_at DESC, id DESC LIMIT 1", (target_type, target_id), ) return row["content"] if row else "" def attach_common(conn, resource, items): """批量加载 followups 和 files,避免 N+1 查询""" if not items: return items target_map = {"sales": "sales", "proposals": "proposal", "operations": "operation", "products": "product"} target_type = target_map.get(resource) ids = [item["id"] for item in items] # 批量查 followups(一次性 IN 查询) if target_type: placeholders = ",".join(["?"] * len(ids)) all_followups = rows( conn, f"SELECT * FROM follow_up_records WHERE target_type=? AND target_id IN ({placeholders}) ORDER BY followed_at DESC, id DESC", [target_type] + ids, ) # 按目标 id 分组 followups_by_id = {} for fu in all_followups: followups_by_id.setdefault(fu["target_id"], []).append(fu) for item in items: item["followups"] = followups_by_id.get(item["id"], []) item["latest_follow_up_record"] = item["followups"][0]["content"] if item["followups"] else "" # 批量查 files(proposals + operations) file_modules = {"proposals": "proposal", "operations": "operation"} if resource in file_modules: module = file_modules[resource] placeholders = ",".join(["?"] * len(ids)) all_files = rows( conn, f"SELECT * FROM file_assets WHERE module=? AND owner_id IN ({placeholders}) ORDER BY id DESC", [module] + ids, ) files_by_id = {} for f in all_files: files_by_id.setdefault(f["owner_id"], []).append(f) for item in items: item["files"] = files_by_id.get(item["id"], []) return items def monthly_finance(conn, tenant="科普·无界"): months = [f"2026-{m:02d}" for m in range(1, 13)] pfs = rows(conn, "SELECT sign_amount, sign_month, status, budget_data FROM project_finances WHERE tenant=? AND status='已签约'", [tenant]) # 预解析 budget_data:{pf_index: {month_key: {rev, gross, payment, cost}}} parsed_budgets = [] for pf in pfs: try: budget = json.loads(pf.get("budget_data") or "[]") except (json.JSONDecodeError, TypeError): budget = [] budget_map = {} for b in budget: key = (b.get("month") or "").replace("-", "_") budget_map[key] = { "rev": float(b.get("rev") or 0), "gross": float(b.get("gross") or 0), "payment": float(b.get("payment") or 0), "cost": float(b.get("cost") or 0), } parsed_budgets.append((pf, budget_map)) data = [] for month in months: key = month.replace("-", "_") revenue = gross = payment = cost = sign = 0 for pf, budget_map in parsed_budgets: if pf["status"] == "已签约" and (pf.get("sign_month") or "") == month: sign += float(pf["sign_amount"] or 0) b = budget_map.get(key) if b: revenue += b["rev"] gross += b["gross"] payment += b["payment"] cost += b["cost"] data.append({ "month": month, "revenue": revenue, "labor": 0, "expense": 0, "purchase": 0, "gross": gross, "sign": sign, "payment": payment, "cost": cost, }) return data @app.route("/") def index(): if "user_id" not in session: return redirect("/login") return render_template("index.html") @app.route("/api/bootstrap") def bootstrap(): if "user_id" not in session: return jsonify({"error": "未登录"}), 401 tenant = request.args.get("tenant", session.get("tenants", ["科普·无界"])[0]) # 验证用户是否有权限访问该 workbench allowed = session.get("tenants", []) if tenant not in allowed: tenant = allowed[0] conn = db() try: def q(sql, *args): return rows(conn, sql, args) sales = attach_common(conn, "sales", q("SELECT * FROM sales_leads WHERE tenant=? ORDER BY id DESC", tenant)) proposals = attach_common(conn, "proposals", q("SELECT * FROM business_proposals WHERE tenant=? ORDER BY id DESC", tenant)) operations = attach_common(conn, "operations", q("SELECT * FROM operation_projects WHERE tenant=? ORDER BY id ASC", tenant)) products = attach_common(conn, "products", q("SELECT * FROM product_versions WHERE tenant=? ORDER BY id DESC", tenant)) finance = q("SELECT * FROM finance_records WHERE tenant=? ORDER BY month DESC, id DESC", tenant) tasks = q("SELECT * FROM project_tasks WHERE tenant=? ORDER BY phase, sort_order, id", tenant) pfs = q("SELECT * FROM project_finances WHERE tenant=? ORDER BY id DESC", tenant) current_month = "2026-06" signed_pfs = [x for x in pfs if x["status"] == "已签约"] # 预解析 budget_data(避免重复 JSON 解析) def parse_budget(pf): try: budget = json.loads(pf.get("budget_data") or "[]") except (json.JSONDecodeError, TypeError): budget = [] return {(b.get("month") or "").replace("-", "_"): b for b in budget} budget_maps = [(pf, parse_budget(pf)) for pf in signed_pfs] def sum_budget(field, months_range): total = 0 for pf, bm in budget_maps: for m in months_range: b = bm.get(f"2026_{m:02d}") if b: total += float(b.get(field) or 0) return total # 本季度月份范围(Q1=1-3, Q2=4-6, Q3=7-9, Q4=10-12),基于当前月 _now_month = date.today().month _q_start = ((_now_month - 1) // 3) * 3 + 1 _q_range = range(_q_start, _q_start + 3) rev_annual = sum_budget("rev", range(1, 13)) gross_annual = sum_budget("gross", range(1, 13)) rev_q2 = sum_budget("rev", _q_range) gross_q2 = sum_budget("gross", _q_range) rev_month = sum_budget("rev", [_now_month]) gross_month = sum_budget("gross", [_now_month]) payment_annual = sum_budget("payment", range(1, 13)) cost_annual = sum_budget("cost", range(1, 13)) payment_q2 = sum_budget("payment", _q_range) cost_q2 = sum_budget("cost", _q_range) payment_month = sum_budget("payment", [_now_month]) cost_month = sum_budget("cost", [_now_month]) # Contract aggregates — from project_finances (经营管理项目) def pf_status_sum(status): return sum(x["sign_amount"] or 0 for x in pfs if x["status"] == status) signed_amount = pf_status_sum("已签约") # 年度签约 = 所有已签约项目 2026 年的签约金额 signed_annual = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约") # 本季度签约 = 签约月份在当前季度的已签约项目 _q_months = [f"2026-{m:02d}" for m in _q_range] signed_q2 = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] in _q_months) # 本月签约 = 签约月份为当月的已签约项目 signed_month = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] == f"2026-{_now_month:02d}") pipeline_amount = sum(x["expected_contract_amount"] or 0 for x in operations if x["project_status"] not in ["已签约","已丢单","已归档","已完成"]) signed_not_executed = sum(x["expected_contract_amount"] or 0 for x in operations if x["project_type"] == "execution" and x["execution_progress"] < 100) summary = { "project_name": "科普(慰心斋)", "metrics": { "p0_customers": len([x for x in sales if x["priority"] == "P0"]), "active_sales": len([x for x in sales if x["status"] in ["待跟进", "跟进中", "方案中", "商务谈判"]]), "execution_projects": len([x for x in operations if x["project_type"] == "execution"]), "risk_projects": len([x for x in operations if x["project_status"] == "有风险" or x["risks"]]), "monthly_revenue": rev_month, "monthly_net_profit": gross_month, "monthly_gross": gross_month, "upcoming_products": len(products), "total_projects": len(signed_pfs), "total_proposals": len(operations), "total_products": len(proposals), # Extended finance metrics "signed_amount": signed_amount, "signed_annual": signed_annual, "signed_q2": signed_q2, "signed_month": signed_month, "pipeline_amount": pipeline_amount, "revenue_annual": rev_annual, "revenue_q2": rev_q2, "gross_annual": gross_annual, "gross_q2": gross_q2, "payment_annual": payment_annual, "payment_q2": payment_q2, "payment_month": payment_month, "cost_annual": cost_annual, "cost_q2": cost_q2, "cost_month": cost_month, "signed_not_executed": signed_not_executed, }, "recent": q("SELECT * FROM follow_up_records WHERE tenant=? ORDER BY id DESC LIMIT 8", tenant), "risks": [{"title": "执行提醒", "content": x["next_action"]} for x in operations if x["next_action"]][:5], } return jsonify({"summary": summary, "sales": sales, "proposals": proposals, "operations": operations, "products": products, "finance": finance, "projectFinances": pfs, "financeMonthly": monthly_finance(conn, tenant), "tasks": tasks, "tenant": tenant, "tenants": allowed}) finally: conn.close() TABLES = { "sales": ("sales_leads", ["target_customer", "priority", "status", "tenant"]), "proposals": ("business_proposals", ["customer_or_project_name", "version", "description", "status", "created_date", "proposal_type", "notes", "tenant"]), "operations": ("operation_projects", ["project_name", "project_version", "project_type", "project_status", "current_stage", "owner", "target_customer", "customer_need", "expected_contract_amount", "expected_sign_date", "sign_probability", "next_action", "sop_stage", "execution_progress", "current_deliverable", "risks", "notes", "tenant"]), "products": ("product_versions", ["product_name", "version", "version_goal", "feature_list", "launch_date", "status", "platform", "notes", "tenant"]), "finance": ("finance_records", ["month", "project_name", "record_type", "category", "amount", "occurred_date", "notes", "tenant"]), "tasks": ("project_tasks", ["project_id", "phase", "milestone", "task", "owner", "due_date", "blockers", "notes", "status", "sort_order", "priority", "tenant"]), "projectFinances": ("project_finances", ["project_id", "tenant", "business_type", "customer_name", "sign_amount", "sign_month", "status", "sales_person", "owner", "total_rev", "total_gross", "budget_data"]), } @app.route("/api/", methods=["POST"]) @login_required def create_resource(resource): if resource not in TABLES: return jsonify({"error": "unknown resource"}), 404 table, cols = TABLES[resource] payload = request.get_json(force=True).get("data", {}) # 任务状态校验:空值或非法值修正为"未开始" if resource == "tasks": valid_statuses = ["未开始", "进行中", "已结束"] if not payload.get("status") or payload["status"] not in valid_statuses: payload["status"] = "未开始" conn = db() try: # 获取列类型,数值列空字符串转 0 避免 MySQL 严格模式报错 type_cur = conn.cursor() type_cur.execute(f"DESCRIBE {table}") col_types = {r[0]: r[1].upper() for r in type_cur.fetchall()} type_cur.close() values = [] for col in cols: val = payload.get(col, "") if val == "" and ("DOUBLE" in col_types.get(col, "") or "INT" in col_types.get(col, "")): val = 0 values.append(val) cur = _exec(conn, f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['?'] * len(cols))})", values) conn.commit() return jsonify({"id": cur.lastrowid}) finally: conn.close() @app.route("/api//", methods=["PUT", "DELETE"]) @login_required def update_resource(resource, item_id): if resource not in TABLES: return jsonify({"error": "unknown resource"}), 404 table, cols = TABLES[resource] conn = db() try: if request.method == "DELETE": _exec(conn, f"DELETE FROM {table} WHERE id=?", (item_id,)) conn.commit() return jsonify({"ok": True}) payload = request.get_json(force=True).get("data", {}) # 任务状态校验:空值或非法值修正为"未开始" if resource == "tasks" and "status" in payload: valid_statuses = ["未开始", "进行中", "已结束"] if not payload["status"] or payload["status"] not in valid_statuses: payload["status"] = "未开始" update_cols = [col for col in cols if col in payload] if update_cols: _exec(conn, f"UPDATE {table} SET {','.join([col + '=?' for col in update_cols])}, updated_at=? WHERE id=?", [payload[col] for col in update_cols] + [now(), item_id], ) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/followups//", methods=["POST"]) @login_required def add_followup(target_type, target_id): payload = request.get_json(force=True).get("data", {}) conn = db() try: _exec(conn, """INSERT INTO follow_up_records (target_type,target_id,followed_at,follower,follow_up_method,content,next_action,next_follow_up_at,tenant) VALUES (?,?,?,?,?,?,?,?,?)""", ( target_type, target_id, payload.get("followed_at") or date.today().isoformat(), payload.get("follower") or "慰心", payload.get("follow_up_method") or "记录", payload.get("content") or "", payload.get("next_action") or "", payload.get("next_follow_up_at") or "", payload.get("tenant") or "科普·无界", ), ) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/followups/", methods=["DELETE"]) @login_required def delete_followup(followup_id): conn = db() try: cur = _exec(conn, "DELETE FROM follow_up_records WHERE id=?", (followup_id,)) conn.commit() if cur.rowcount == 0: return jsonify({"error": "not found"}), 404 return jsonify({"ok": True}) finally: conn.close() @app.route("/api/tasks/batch-sort", methods=["POST"]) @login_required def batch_sort_tasks(): conn = db() try: items = request.get_json(force=True).get("items", []) for item in items: _exec(conn, "UPDATE project_tasks SET sort_order=? WHERE id=?", (item["sort_order"], item["id"])) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/operations/batch-sort", methods=["POST"]) @login_required def batch_sort_operations(): conn = db() try: items = request.get_json(force=True).get("items", []) for item in items: _exec(conn, "UPDATE operation_projects SET sort_order=? WHERE id=?", (item["sort_order"], item["id"])) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/files/upload", methods=["POST"]) @login_required def upload_file(): file = request.files["file"] module = request.form["module"] owner_id = int(request.form["owner_id"]) owner_version = request.form.get("owner_version", "") category = request.form.get("file_category", "") folder = UPLOAD_DIR / module / str(owner_id) folder.mkdir(parents=True, exist_ok=True) target = folder / file.filename file.save(target) conn = db() try: add_file_index(conn, module, owner_id, owner_version, category, target, external=False) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/files//content") def file_content(file_id): conn = db() try: asset = one(conn, "SELECT * FROM file_assets WHERE id=?", (file_id,)) if not asset: return jsonify({"error": "not found"}), 404 path = Path(asset["file_path"]) if not path.exists(): return jsonify({"error": "missing"}), 404 return send_file(path, as_attachment=request.args.get("inline") == "false", download_name=asset["file_name"]) finally: conn.close() @app.route("/api/files/", methods=["DELETE"]) @login_required def delete_file(file_id): conn = db() try: asset = one(conn, "SELECT * FROM file_assets WHERE id=?", (file_id,)) if not asset: return jsonify({"error": "not found"}), 404 # Remove physical file from uploads/ if it was uploaded to our dir path = Path(asset["file_path"]) if path.exists() and str(UPLOAD_DIR) in str(path.resolve()): path.unlink(missing_ok=True) _exec(conn, "DELETE FROM file_assets WHERE id=?", (file_id,)) conn.commit() return jsonify({"ok": True}) finally: conn.close() @app.route("/api/health") def health(): return jsonify({"ok": True, "service": "opc-manager"}) from migrations import run_migrations run_migrations() if __name__ == "__main__": app.run( host="127.0.0.1", port=5177, debug=os.environ.get("FLASK_DEBUG", "false").lower() in ("true", "1", "yes"), )