Files
opc-manager/backend/routes.py
mac caebf90438 重构:flask_app.py 拆分为 db/helpers/routes/seed_data + Blueprint
- flask_app.py 1166行→33行纯入口
- 新建 db.py(配置+连接+SQL工具)
- 新建 helpers.py(attach_common/monthly_finance/add_file_index)
- 新建 routes.py(全路由 Blueprint + 装饰器 + TABLES)
- 新建 migrations/seed_data.py(seed_db 搬迁)
- migrations/{tables,columns,data_fixes,seed}.py 改 import 为 from db
- 删除死代码 init_db(228行)+ latest_followup(10行)
- 反向依赖消除:migrations 不再 import flask_app
- 前端零改动,URL 不变
2026-07-02 18:30:24 +08:00

613 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# routes.py — 全部 HTTP 路由Blueprint
# 依赖flask, db.py, helpers.py, werkzeug
import os
import json
from datetime import date
from pathlib import Path
from flask import Blueprint, jsonify, render_template, request, send_file, session, redirect
from werkzeug.security import generate_password_hash, check_password_hash
from db import db, now, _exec, rows, one, UPLOAD_DIR
from helpers import add_file_index, attach_common, monthly_finance
bp = Blueprint("api", __name__)
ALL_TENANTS = ["总工作台", "科普·无界", "科研·无界", "医患·无界", "MCN·无界", "学会·无界"]
TABLES = {
"sales": ("sales_leads", ["target_customer", "priority", "status", "tenant"]),
"proposals": ("business_proposals", ["customer_or_project_name", "version", "description", "status", "created_date", "proposal_type", "notes", "tenant"]),
"operations": ("operation_projects", ["project_name", "project_version", "project_type", "project_status", "current_stage", "owner", "target_customer", "customer_need", "expected_contract_amount", "expected_sign_date", "sign_probability", "next_action", "sop_stage", "execution_progress", "current_deliverable", "risks", "notes", "tenant"]),
"products": ("product_versions", ["product_name", "version", "version_goal", "priority", "start_date", "plan_date", "dev_done_date", "test_date", "launch_date", "status", "notes", "tenant"]),
"finance": ("finance_records", ["month", "project_name", "record_type", "category", "amount", "occurred_date", "notes", "tenant"]),
"tasks": ("project_tasks", ["project_id", "phase", "milestone", "task", "owner", "due_date", "blockers", "notes", "status", "sort_order", "priority", "tenant"]),
"projectFinances": ("project_finances", ["project_id", "tenant", "business_type", "customer_name", "sign_amount", "sign_month", "status", "sales_person", "owner", "total_rev", "total_gross", "total_payment", "total_cost", "total_paid", "budget_data"]),
}
# ---------- 鉴权装饰器 ----------
def login_required(f):
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
if "user_id" not in session:
return jsonify({"error": "未登录"}), 401
return f(*args, **kwargs)
return decorated
def admin_required(f):
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
if "user_id" not in session:
return jsonify({"error": "未登录"}), 401
if session.get("role") != "admin":
return jsonify({"error": "无权限"}), 403
return f(*args, **kwargs)
return decorated
# ---------- 认证路由 ----------
@bp.route("/login")
def login_page():
return render_template("login.html")
@bp.route("/api/auth/login", methods=["POST"])
def auth_login():
data = request.get_json(force=True) or {}
username = data.get("username", "").strip()
password = data.get("password", "")
conn = db()
try:
user = one(conn, "SELECT * FROM users WHERE username=?", (username,))
if not user or not check_password_hash(user["password_hash"], password):
return jsonify({"error": "用户名或密码错误"}), 401
session["user_id"] = user["id"]
session["username"] = user["username"]
session["display_name"] = user["display_name"]
session["role"] = user["role"]
if user["role"] == "admin":
session["tenants"] = ["总工作台", "科普·无界", "科研·无界", "医患·无界", "MCN·无界", "学会·无界"]
else:
ut = rows(conn, "SELECT tenant FROM user_tenants WHERE user_id=?", (user["id"],))
session["tenants"] = [x["tenant"] for x in ut]
return jsonify({
"ok": True,
"user": {"id": user["id"], "username": user["username"], "display_name": user["display_name"], "role": user["role"]},
"tenants": session["tenants"],
})
finally:
conn.close()
@bp.route("/api/auth/logout", methods=["POST"])
def auth_logout():
session.clear()
return jsonify({"ok": True})
@bp.route("/api/auth/me")
def auth_me():
if "user_id" not in session:
return jsonify({"logged_in": False})
tenants = session.get("tenants", [])
if "总工作台" not in tenants:
tenants = ["总工作台"] + tenants
return jsonify({
"logged_in": True,
"user": {"id": session["user_id"], "username": session["username"], "display_name": session["display_name"], "role": session["role"]},
"tenants": tenants,
})
# ---------- 账号管理 ----------
@bp.route("/api/users")
@admin_required
def list_users():
conn = db()
try:
users = rows(conn, "SELECT id, username, display_name, role, created_at FROM users ORDER BY id")
ut_rows = rows(conn, "SELECT user_id, tenant FROM user_tenants")
tenant_map = {}
for r in ut_rows:
tenant_map.setdefault(r["user_id"], []).append(r["tenant"])
for u in users:
u["tenants"] = tenant_map.get(u["id"], [])
return jsonify(users)
finally:
conn.close()
@bp.route("/api/users", methods=["POST"])
@admin_required
def create_user():
data = request.get_json(force=True)
username = (data.get("username") or "").strip()
display_name = (data.get("display_name") or "").strip()
password = data.get("password") or ""
role = data.get("role") or "opc_owner"
tenants = data.get("tenants") or []
if not username or not password or not display_name:
return jsonify({"error": "用户名/密码/显示名不能为空"}), 400
if role not in ("admin", "opc_owner"):
return jsonify({"error": "角色非法"}), 400
conn = db()
try:
if one(conn, "SELECT id FROM users WHERE username=?", (username,)):
return jsonify({"error": "用户名已存在"}), 400
_exec(conn, "INSERT INTO users (username, password_hash, display_name, role, created_at) VALUES (?,?,?,?,?)",
(username, generate_password_hash(password, "pbkdf2:sha256"), display_name, role, date.today().isoformat()))
u = one(conn, "SELECT id FROM users WHERE username=?", (username,))
for t in tenants:
if t in ALL_TENANTS:
_exec(conn, "INSERT IGNORE INTO user_tenants (user_id, tenant) VALUES (?,?)", (u["id"], t))
conn.commit()
return jsonify({"ok": True, "id": u["id"]})
finally:
conn.close()
@bp.route("/api/users/<int:uid>", methods=["PUT"])
@admin_required
def update_user(uid):
data = request.get_json(force=True)
conn = db()
try:
u = one(conn, "SELECT * FROM users WHERE id=?", (uid,))
if not u:
return jsonify({"error": "用户不存在"}), 404
display_name = (data.get("display_name") or "").strip() or u["display_name"]
role = data.get("role") or u["role"]
if role not in ("admin", "opc_owner"):
return jsonify({"error": "角色非法"}), 400
password = data.get("password") or ""
if password:
_exec(conn, "UPDATE users SET display_name=?, role=?, password_hash=? WHERE id=?",
(display_name, role, generate_password_hash(password, "pbkdf2:sha256"), uid))
else:
_exec(conn, "UPDATE users SET display_name=?, role=? WHERE id=?", (display_name, role, uid))
if "tenants" in data:
_exec(conn, "DELETE FROM user_tenants WHERE user_id=?", (uid,))
for t in data["tenants"]:
if t in ALL_TENANTS:
_exec(conn, "INSERT IGNORE INTO user_tenants (user_id, tenant) VALUES (?,?)", (uid, t))
if role != "admin":
admin_count = one(conn, "SELECT COUNT(*) AS c FROM users WHERE role='admin'")["c"]
if admin_count == 0:
return jsonify({"error": "至少保留一个管理员"}), 400
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
@bp.route("/api/users/<int:uid>", methods=["DELETE"])
@admin_required
def delete_user(uid):
if uid == session.get("user_id"):
return jsonify({"error": "不能删除当前登录账号"}), 400
conn = db()
try:
u = one(conn, "SELECT * FROM users WHERE id=?", (uid,))
if not u:
return jsonify({"error": "用户不存在"}), 404
if u["role"] == "admin":
admin_count = one(conn, "SELECT COUNT(*) AS c FROM users WHERE role='admin'")["c"]
if admin_count <= 1:
return jsonify({"error": "至少保留一个管理员"}), 400
_exec(conn, "DELETE FROM user_tenants WHERE user_id=?", (uid,))
_exec(conn, "DELETE FROM users WHERE id=?", (uid,))
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
@bp.route("/api/tenants")
def list_tenants():
return jsonify(ALL_TENANTS)
# ---------- 首页 + bootstrap ----------
@bp.route("/")
def index():
if "user_id" not in session:
return redirect("/login")
return render_template("index.html")
@bp.route("/api/bootstrap")
def bootstrap():
if "user_id" not in session:
return jsonify({"error": "未登录"}), 401
tenant = request.args.get("tenant", session.get("tenants", ["科普·无界"])[0])
allowed = session.get("tenants", [])
if "总工作台" not in allowed:
allowed = ["总工作台"] + allowed
if tenant not in allowed:
tenant = allowed[0]
conn = db()
try:
# 总工作台:聚合所有工作台的首页数据
if tenant == "总工作台":
real_tenants = [t for t in allowed if t != "总工作台"]
all_metrics = []
all_monthly = []
all_recent = []
for t in real_tenants:
t_pfs = rows(conn, "SELECT * FROM project_finances WHERE tenant=? ORDER BY id DESC", [t])
t_ops = attach_common(conn, "operations", rows(conn, "SELECT * FROM operation_projects WHERE tenant=? ORDER BY id ASC", [t]))
t_sales = attach_common(conn, "sales", rows(conn, "SELECT * FROM sales_leads WHERE tenant=? ORDER BY id DESC", [t]))
t_products = attach_common(conn, "products", rows(conn, "SELECT * FROM product_versions WHERE tenant=? ORDER BY id DESC", [t]))
t_proposals = attach_common(conn, "proposals", rows(conn, "SELECT * FROM business_proposals WHERE tenant=? ORDER BY id DESC", [t]))
t_signed_pfs = [x for x in t_pfs if x["status"] == "已签约"]
def t_parse_budget(pf):
try:
budget = json.loads(pf.get("budget_data") or "[]")
except (json.JSONDecodeError, TypeError):
budget = []
return {(b.get("month") or "").replace("-", "_"): b for b in budget}
t_bm = {pf["id"]: t_parse_budget(pf) for pf in t_pfs}
def t_sum_budget(field, months_range):
total = 0
for pf in t_pfs:
bm = t_bm.get(pf["id"], {})
for m in months_range:
b = bm.get(f"2026_{m:02d}")
if b:
total += float(b.get(field) or 0)
return total
_now_month = date.today().month
_q_start = ((_now_month - 1) // 3) * 3 + 1
_q_range = range(_q_start, _q_start + 3)
_q_months = [f"2026-{m:02d}" for m in _q_range]
all_metrics.append({
"total_projects": len(t_signed_pfs),
"total_proposals": len(t_ops),
"total_products": len(t_proposals),
"upcoming_products": len(t_products),
"signed_amount": sum(x["sign_amount"] or 0 for x in t_pfs if x["status"] == "已签约"),
"signed_annual": sum(x["sign_amount"] or 0 for x in t_pfs if x["status"] == "已签约"),
"signed_q2": sum(x["sign_amount"] or 0 for x in t_pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] in _q_months),
"signed_month": sum(x["sign_amount"] or 0 for x in t_pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] == f"2026-{_now_month:02d}"),
"revenue_annual": t_sum_budget("rev", range(1, 13)),
"revenue_q2": t_sum_budget("rev", _q_range),
"monthly_revenue": t_sum_budget("rev", [_now_month]),
"gross_annual": t_sum_budget("gross", range(1, 13)),
"gross_q2": t_sum_budget("gross", _q_range),
"monthly_net_profit": t_sum_budget("gross", [_now_month]),
"payment_annual": t_sum_budget("payment", range(1, 13)),
"payment_q2": t_sum_budget("payment", _q_range),
"payment_month": t_sum_budget("payment", [_now_month]),
"cost_annual": t_sum_budget("cost", range(1, 13)),
"cost_q2": t_sum_budget("cost", _q_range),
"cost_month": t_sum_budget("cost", [_now_month]),
})
all_monthly.append(monthly_finance(conn, t))
all_recent.extend(rows(conn, "SELECT * FROM follow_up_records WHERE tenant=? ORDER BY id DESC LIMIT 4", [t]))
agg = {}
for key in ["total_projects","total_proposals","total_products","upcoming_products","signed_amount","signed_annual","signed_q2","signed_month","revenue_annual","revenue_q2","monthly_revenue","gross_annual","gross_q2","monthly_net_profit","payment_annual","payment_q2","payment_month","cost_annual","cost_q2","cost_month"]:
agg[key] = sum(m.get(key, 0) for m in all_metrics)
merged_monthly = []
for i in range(12):
m = {"month": all_monthly[0][i]["month"] if all_monthly and len(all_monthly[0]) > i else f"2026-{i+1:02d}"}
for field in ["revenue","gross","payment","cost","sign"]:
m[field] = sum(tl[i][field] if i < len(tl) else 0 for tl in all_monthly)
merged_monthly.append(m)
summary = {
"project_name": "总工作台",
"metrics": agg,
"recent": sorted(all_recent, key=lambda x: x.get("id", 0), reverse=True)[:8],
"risks": [],
}
return jsonify({"summary": summary, "sales": [], "proposals": [], "operations": [], "products": [], "finance": [], "projectFinances": [], "financeMonthly": merged_monthly, "tasks": [], "tenant": tenant, "tenants": allowed})
def q(sql, *args):
return rows(conn, sql, args)
sales = attach_common(conn, "sales", q("SELECT * FROM sales_leads WHERE tenant=? ORDER BY id DESC", tenant))
proposals = attach_common(conn, "proposals", q("SELECT * FROM business_proposals WHERE tenant=? ORDER BY id DESC", tenant))
operations = attach_common(conn, "operations", q("SELECT * FROM operation_projects WHERE tenant=? ORDER BY id ASC", tenant))
products = attach_common(conn, "products", q("SELECT * FROM product_versions WHERE tenant=? ORDER BY id DESC", tenant))
finance = q("SELECT * FROM finance_records WHERE tenant=? ORDER BY month DESC, id DESC", tenant)
tasks = q("SELECT * FROM project_tasks WHERE tenant=? ORDER BY phase, sort_order, id", tenant)
pfs = q("SELECT * FROM project_finances WHERE tenant=? ORDER BY id DESC", tenant)
signed_pfs = [x for x in pfs if x["status"] == "已签约"]
def parse_budget(pf):
try:
budget = json.loads(pf.get("budget_data") or "[]")
except (json.JSONDecodeError, TypeError):
budget = []
return {(b.get("month") or "").replace("-", "_"): b for b in budget}
budget_maps = [(pf, parse_budget(pf)) for pf in signed_pfs]
def sum_budget(field, months_range):
total = 0
for pf, bm in budget_maps:
for m in months_range:
b = bm.get(f"2026_{m:02d}")
if b:
total += float(b.get(field) or 0)
return total
_now_month = date.today().month
_q_start = ((_now_month - 1) // 3) * 3 + 1
_q_range = range(_q_start, _q_start + 3)
rev_annual = sum_budget("rev", range(1, 13))
gross_annual = sum_budget("gross", range(1, 13))
rev_q2 = sum_budget("rev", _q_range)
gross_q2 = sum_budget("gross", _q_range)
rev_month = sum_budget("rev", [_now_month])
gross_month = sum_budget("gross", [_now_month])
payment_annual = sum_budget("payment", range(1, 13))
cost_annual = sum_budget("cost", range(1, 13))
payment_q2 = sum_budget("payment", _q_range)
cost_q2 = sum_budget("cost", _q_range)
payment_month = sum_budget("payment", [_now_month])
cost_month = sum_budget("cost", [_now_month])
def pf_status_sum(status):
return sum(x["sign_amount"] or 0 for x in pfs if x["status"] == status)
signed_amount = pf_status_sum("已签约")
signed_annual = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约")
_q_months = [f"2026-{m:02d}" for m in _q_range]
signed_q2 = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] in _q_months)
signed_month = sum(x["sign_amount"] or 0 for x in pfs if x["status"] == "已签约" and (x.get("sign_month") or "")[:7] == f"2026-{_now_month:02d}")
pipeline_amount = sum(x["expected_contract_amount"] or 0 for x in operations if x["project_status"] not in ["已签约","已丢单","已归档","已完成"])
signed_not_executed = sum(x["expected_contract_amount"] or 0 for x in operations if x["project_type"] == "execution" and x["execution_progress"] < 100)
summary = {
"project_name": "科普(慰心斋)",
"metrics": {
"p0_customers": len([x for x in sales if x["priority"] == "P0"]),
"active_sales": len([x for x in sales if x["status"] in ["待跟进", "跟进中", "方案中", "商务谈判"]]),
"execution_projects": len([x for x in operations if x["project_type"] == "execution"]),
"risk_projects": len([x for x in operations if x["project_status"] == "有风险" or x["risks"]]),
"monthly_revenue": rev_month,
"monthly_net_profit": gross_month,
"monthly_gross": gross_month,
"upcoming_products": len(products),
"total_projects": len(signed_pfs),
"total_proposals": len(operations),
"total_products": len(proposals),
"signed_amount": signed_amount,
"signed_annual": signed_annual,
"signed_q2": signed_q2,
"signed_month": signed_month,
"pipeline_amount": pipeline_amount,
"revenue_annual": rev_annual,
"revenue_q2": rev_q2,
"gross_annual": gross_annual,
"gross_q2": gross_q2,
"payment_annual": payment_annual,
"payment_q2": payment_q2,
"payment_month": payment_month,
"cost_annual": cost_annual,
"cost_q2": cost_q2,
"cost_month": cost_month,
"signed_not_executed": signed_not_executed,
},
"recent": q("SELECT * FROM follow_up_records WHERE tenant=? ORDER BY id DESC LIMIT 8", tenant),
"risks": [{"title": "执行提醒", "content": x["next_action"]} for x in operations if x["next_action"]][:5],
}
return jsonify({"summary": summary, "sales": sales, "proposals": proposals, "operations": operations, "products": products, "finance": finance, "projectFinances": pfs, "financeMonthly": monthly_finance(conn, tenant), "tasks": tasks, "tenant": tenant, "tenants": allowed})
finally:
conn.close()
# ---------- 通用 CRUD ----------
@bp.route("/api/<resource>", methods=["POST"])
@login_required
def create_resource(resource):
if resource not in TABLES:
return jsonify({"error": "unknown resource"}), 404
table, cols = TABLES[resource]
payload = request.get_json(force=True).get("data", {})
if resource == "tasks":
valid_statuses = ["未开始", "进行中", "已结束"]
if not payload.get("status") or payload["status"] not in valid_statuses:
payload["status"] = "未开始"
conn = db()
try:
type_cur = conn.cursor()
type_cur.execute(f"DESCRIBE {table}")
col_types = {r[0]: r[1].upper() for r in type_cur.fetchall()}
type_cur.close()
values = []
for col in cols:
val = payload.get(col, "")
if val == "" and ("DOUBLE" in col_types.get(col, "") or "INT" in col_types.get(col, "")):
val = 0
values.append(val)
cur = _exec(conn, f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['?'] * len(cols))})", values)
conn.commit()
return jsonify({"id": cur.lastrowid})
finally:
conn.close()
@bp.route("/api/<resource>/<int:item_id>", methods=["PUT", "DELETE"])
@login_required
def update_resource(resource, item_id):
if resource not in TABLES:
return jsonify({"error": "unknown resource"}), 404
table, cols = TABLES[resource]
conn = db()
try:
if request.method == "DELETE":
_exec(conn, f"DELETE FROM {table} WHERE id=?", (item_id,))
conn.commit()
return jsonify({"ok": True})
payload = request.get_json(force=True).get("data", {})
if resource == "tasks" and "status" in payload:
valid_statuses = ["未开始", "进行中", "已结束"]
if not payload["status"] or payload["status"] not in valid_statuses:
payload["status"] = "未开始"
if resource == "products":
cur = _exec(conn, f"SELECT start_date FROM {table} WHERE id=?", (item_id,))
row = cur.fetchone()
cur.close()
current_start = (row or {}).get("start_date", "") or ""
new_start = payload.get("start_date", current_start)
if "start_date" in payload and not new_start:
return jsonify({"error": "启动时间为必填项"}), 400
date_fields = ["plan_date", "dev_done_date", "test_date", "launch_date"]
for f in date_fields:
if f in payload and payload[f] and new_start and payload[f] < new_start:
labels = {"plan_date": "产品方案", "dev_done_date": "研发完成", "test_date": "测试完成", "launch_date": "上线时间"}
return jsonify({"error": f"{labels[f]}不能早于启动时间({new_start}"}), 400
update_cols = [col for col in cols if col in payload]
if update_cols:
_exec(conn,
f"UPDATE {table} SET {','.join([col + '=?' for col in update_cols])}, updated_at=? WHERE id=?",
[payload[col] for col in update_cols] + [now(), item_id],
)
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
# ---------- followups ----------
@bp.route("/api/followups/<target_type>/<int:target_id>", methods=["POST"])
@login_required
def add_followup(target_type, target_id):
payload = request.get_json(force=True).get("data", {})
conn = db()
try:
_exec(conn,
"""INSERT INTO follow_up_records
(target_type,target_id,followed_at,follower,follow_up_method,content,next_action,next_follow_up_at,tenant)
VALUES (?,?,?,?,?,?,?,?,?)""",
(
target_type,
target_id,
payload.get("followed_at") or date.today().isoformat(),
payload.get("follower") or "慰心",
payload.get("follow_up_method") or "记录",
payload.get("content") or "",
payload.get("next_action") or "",
payload.get("next_follow_up_at") or "",
payload.get("tenant") or "科普·无界",
),
)
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
@bp.route("/api/followups/<int:followup_id>", methods=["DELETE"])
@login_required
def delete_followup(followup_id):
conn = db()
try:
cur = _exec(conn, "DELETE FROM follow_up_records WHERE id=?", (followup_id,))
conn.commit()
if cur.rowcount == 0:
return jsonify({"error": "not found"}), 404
return jsonify({"ok": True})
finally:
conn.close()
# ---------- batch sort ----------
@bp.route("/api/tasks/batch-sort", methods=["POST"])
@login_required
def batch_sort_tasks():
conn = db()
try:
items = request.get_json(force=True).get("items", [])
for item in items:
_exec(conn, "UPDATE project_tasks SET sort_order=? WHERE id=?", (item["sort_order"], item["id"]))
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
@bp.route("/api/operations/batch-sort", methods=["POST"])
@login_required
def batch_sort_operations():
conn = db()
try:
items = request.get_json(force=True).get("items", [])
for item in items:
_exec(conn, "UPDATE operation_projects SET sort_order=? WHERE id=?", (item["sort_order"], item["id"]))
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
# ---------- files ----------
@bp.route("/api/files/upload", methods=["POST"])
@login_required
def upload_file():
file = request.files["file"]
module = request.form["module"]
owner_id = int(request.form["owner_id"])
owner_version = request.form.get("owner_version", "")
category = request.form.get("file_category", "")
folder = UPLOAD_DIR / module / str(owner_id)
folder.mkdir(parents=True, exist_ok=True)
target = folder / file.filename
file.save(target)
conn = db()
try:
add_file_index(conn, module, owner_id, owner_version, category, target, external=False)
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
@bp.route("/api/files/<int:file_id>/content")
def file_content(file_id):
conn = db()
try:
asset = one(conn, "SELECT * FROM file_assets WHERE id=?", (file_id,))
if not asset:
return jsonify({"error": "not found"}), 404
path = Path(asset["file_path"])
if not path.exists():
return jsonify({"error": "missing"}), 404
return send_file(path, as_attachment=request.args.get("inline") == "false", download_name=asset["file_name"])
finally:
conn.close()
@bp.route("/api/files/<int:file_id>", methods=["DELETE"])
@login_required
def delete_file(file_id):
conn = db()
try:
asset = one(conn, "SELECT * FROM file_assets WHERE id=?", (file_id,))
if not asset:
return jsonify({"error": "not found"}), 404
path = Path(asset["file_path"])
if path.exists() and str(UPLOAD_DIR) in str(path.resolve()):
path.unlink(missing_ok=True)
_exec(conn, "DELETE FROM file_assets WHERE id=?", (file_id,))
conn.commit()
return jsonify({"ok": True})
finally:
conn.close()
# ---------- health ----------
@bp.route("/api/health")
def health():
return jsonify({"ok": True, "service": "opc-manager"})