450 lines
17 KiB
Python
450 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""ziwei-power Flask 应用 — 磁场管理打卡系统"""
|
||
|
||
import os
|
||
from datetime import timedelta
|
||
from functools import wraps
|
||
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
from database import init_db, get_checkin, save_checkin, delete_checkin, get_all_checkins, get_wishes, save_wish, update_wish, delete_wish, reorder_wishes
|
||
|
||
app = Flask(__name__)

# A fixed secret key lets every gunicorn worker sign and verify the same
# session cookies. The fallback below is committed to source control, so it is
# only suitable for local development.
_DEFAULT_SECRET_KEY = 'ziwei-power-secret-2026'
app.secret_key = os.environ.get('SECRET_KEY', _DEFAULT_SECRET_KEY)
if app.secret_key == _DEFAULT_SECRET_KEY:
    # Anyone who knows the key can forge a session with logged_in=True, so
    # make the misconfiguration visible instead of falling back silently.
    app.logger.warning(
        'SECRET_KEY is not set; using the built-in development key. '
        'Set the SECRET_KEY environment variable in production.'
    )

# Permanent ("remember me") sessions stay valid for 30 days.
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)
|
||
|
||
# ── 用户库 ────────────────────────────────────────────
|
||
|
||
# In-memory account table: username -> password hash and display name.
# NOTE(review): the plaintext password is committed to source control and is
# re-hashed on every import; consider loading a pre-computed hash from
# configuration instead.
USERS = {
    'qiukai': dict(
        password_hash=generate_password_hash('AiAlex2018$'),
        name='凯哥',
    ),
}
|
||
|
||
# ── 登录检查装饰器 ────────────────────────────────────
|
||
|
||
def login_required(f):
    """Decorator that only lets logged-in sessions reach the wrapped view.

    When the session has no truthy ``logged_in`` flag the client is
    redirected to ``login_page`` with the requested path passed along as the
    ``next`` query parameter; otherwise the view is called unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('login_page', next=request.path))

    return guarded_view
|
||
|
||
|
||
# ── 登录路由 ──────────────────────────────────────────
|
||
|
||
def _safe_next_url(target):
    """Return *target* when it is a same-site path, otherwise ``'/'``.

    The value comes straight from the query string, so anything that a
    browser could interpret as an absolute or protocol-relative URL is
    rejected to avoid an open redirect after login.
    """
    if not isinstance(target, str) or not target.startswith('/'):
        return '/'
    # "//host" is protocol-relative; browsers also normalise "/\host" and
    # strip tabs/newlines, which would turn e.g. "/\t/host" into "//host".
    if target.startswith('//') or any(ch in target for ch in ('\\', '\t', '\r', '\n')):
        return '/'
    return target


@app.route('/login', methods=['GET', 'POST'])
def login_page():
    """Show the login form (GET) or authenticate a user (POST).

    POST reads ``username``, ``password`` and ``remember`` from the form.
    On success the session is populated and a JSON body
    ``{'ok': True, 'next': <path>}`` is returned, where ``next`` is the
    sanitised ``next`` query parameter. On failure a JSON error with
    status 401 is returned.

    GET redirects an already logged-in session to ``index`` and otherwise
    renders ``login.html``.
    """
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        remember = request.form.get('remember') == 'on'

        user = USERS.get(username)
        if not (user and check_password_hash(user['password_hash'], password)):
            return jsonify({'ok': False, 'error': '账号或密码错误'}), 401

        # Only "remember me" logins get the long-lived permanent session.
        session.permanent = remember
        session['logged_in'] = True
        session['username'] = username
        session['display_name'] = user['name']
        next_url = _safe_next_url(request.args.get('next', '/'))
        return jsonify({'ok': True, 'next': next_url})

    # GET: an already authenticated session goes straight to the home page.
    if session.get('logged_in'):
        return redirect(url_for('index'))
    return render_template('login.html')
|
||
|
||
|
||
@app.route('/logout')
def logout():
    """Discard all session data and redirect to the login page."""
    session.clear()
    login_url = url_for('login_page')
    return redirect(login_url)
|
||
|
||
|
||
# ── 主页 ──────────────────────────────────────────────
|
||
|
||
# Keyword -> pillar rules used for automatic classification. Pillars are tried
# in this order and the first one with a matching keyword wins.
_PILLAR_KEYWORDS = {
    '医药营销': ['科普', '科研', '学术会议', '学术', '会议', '患者管理', '临床推广', '临床', '药企', '处方', 'KOL', '代表', '拜访', '荣昌', '恒瑞', '信达', '鲁银', 'OPC', '项目', '标杆', '客户'],
    '医疗服务': ['诊疗', '治疗', '医生', '医院', '科室', '手术', '门诊', '患者', '病', '护理', '诊断', '影像', '检验'],
    '医疗支付': ['医保', '商保', '理赔', '支付', '保险', '费用', '报销', '商业保险', '保单'],
    'AI 智能': ['AI', '数字人', '智能', '算法', '自动化', '平台', '系统', '网站', '开发', '代码', '程序', '模型'],
    '公司治理': ['团队', '组织', '管理', '招聘', 'HR', '制度', '流程', '财务', '考核', 'KPI', '规划', '人才', '架构', '预算'],
    '个人修养': ['早起', '睡觉', '俯卧撑', '运动', '冥想', '阅读', '复盘', '写作', '习惯', '修身', '立志', '责善', '改过', '勤学'],
}

# Pillars reported in ``pillar_breakdown``, in the order they are emitted.
_PILLARS = ('医疗服务', '医药营销', '医疗支付', 'AI 智能', '公司治理', '个人修养')


def _classify_pillar(text):
    """Return the first pillar whose keyword occurs in *text*, or ``''``.

    Matching is a case-sensitive substring test.
    """
    if not isinstance(text, str):
        return ''
    stripped = text.strip()
    if not stripped:
        return ''
    for pillar, keywords in _PILLAR_KEYWORDS.items():
        if any(kw in stripped for kw in keywords):
            return pillar
    return ''  # no keyword matched


def _entry_text(entry, key):
    """Return the stripped text of an entry that is a plain string or a dict.

    For dicts the text is read from ``entry[key]``; anything that is not a
    non-empty string yields ``''``.
    """
    if isinstance(entry, str):
        return entry.strip()
    if isinstance(entry, dict):
        value = entry.get(key)
        return value.strip() if isinstance(value, str) else ''
    return ''


def compute_stats(rows=None):
    """Aggregate check-in statistics for the API and the index template.

    Args:
        rows: Optional sequence of check-in rows, each with a ``'date'`` and
            a ``'data'`` dict holding ``morning`` / ``evening`` / ``study``
            lists. Defaults to ``get_all_checkins()``.

    Returns:
        A dict with ``total_days``, ``total_morning``, ``total_evening``,
        ``total_study``, ``calendar`` (date -> ``'pass'`` / ``'fail'``),
        the flattened ``morning_items`` / ``evening_items`` /
        ``study_items`` lists, and ``pillar_breakdown`` (per-pillar counts
        and items).
    """
    if rows is None:
        rows = get_all_checkins()

    total_morning = total_evening = total_study = 0
    calendar = {}
    morning_items, evening_items, study_items = [], [], []

    for row in rows:
        data = row['data']
        morning = data.get('morning', [])
        evening = data.get('evening', [])
        study = data.get('study', [])

        # Morning entries: plain strings or {'text': ..., 'pillar': ...}.
        morning_count = 0
        for entry in morning:
            text = _entry_text(entry, 'text')
            if not text:
                continue
            morning_count += 1
            pillar = entry.get('pillar', '') if isinstance(entry, dict) else ''
            morning_items.append({'text': text, 'pillar': pillar or _classify_pillar(text)})

        # Evening entries: plain strings or {'mistake': ..., 'improvement': ...}.
        # An entry is listed when either field is filled, but only entries
        # with a non-empty mistake count towards the evening total.
        evening_count = 0
        for entry in evening:
            mistake = _entry_text(entry, 'mistake')
            if isinstance(entry, dict):
                improvement = _entry_text(entry, 'improvement')
                if mistake or improvement:
                    pillar = entry.get('pillar', '') or _classify_pillar(mistake + ' ' + improvement)
                    evening_items.append({'mistake': mistake, 'improvement': improvement, 'pillar': pillar})
            elif mistake:
                evening_items.append({'mistake': mistake, 'improvement': '', 'pillar': _classify_pillar(mistake)})
            if mistake:
                evening_count += 1

        # Study entries: dicts with name/done/note/pillar, or bare values that
        # are treated as a not-yet-done item named by their string form.
        study_count = 0
        for entry in study:
            is_dict = isinstance(entry, dict)
            # Only dict entries can be marked done; bare strings never are.
            if is_dict and entry.get('done'):
                study_count += 1
            if is_dict:
                name = str(entry.get('name') or '')
            else:
                name = '' if entry is None else str(entry)
            name = name.strip()
            if not name:
                continue
            pillar = (entry.get('pillar', '') if is_dict else '') or _classify_pillar(name)
            study_items.append({
                'name': name,
                'done': entry.get('done', False) if is_dict else False,
                'note': entry.get('note', '') if is_dict else '',
                'pillar': pillar,
            })

        total_morning += morning_count
        total_evening += evening_count
        total_study += study_count

        # A day passes only when all three sections have at least one
        # counted entry (for study that means at least one *done* item).
        all_three = morning_count > 0 and evening_count > 0 and study_count > 0
        calendar[row['date']] = 'pass' if all_three else 'fail'

    # Per-pillar breakdown; items whose pillar is not in _PILLARS are skipped.
    pillar_breakdown = {
        pillar: {
            'morning': 0, 'evening': 0, 'study': 0,
            'morning_items': [], 'evening_items': [], 'study_items': [],
        }
        for pillar in _PILLARS
    }
    sections = (
        ('morning', morning_items),
        ('evening', evening_items),
        ('study', study_items),
    )
    for section, items in sections:
        for item in items:
            bucket = pillar_breakdown.get(item['pillar'])
            if bucket is not None:
                bucket[section] += 1
                bucket[section + '_items'].append(item)

    return dict(
        total_days=len(rows), total_morning=total_morning,
        total_evening=total_evening, total_study=total_study,
        calendar=calendar,
        morning_items=morning_items,
        evening_items=evening_items,
        study_items=study_items,
        pillar_breakdown=pillar_breakdown,
    )
|
||
|
||
|
||
@app.route('/')
@login_required
def index():
    """Render the home page with statistics and wishes embedded as JSON."""
    import json

    stats = compute_stats()
    wishes = list(map(dict, get_wishes()))
    # Prefer the display name; fall back to the raw username, then ''.
    shown_name = session.get('display_name', session.get('username', ''))
    stats_json = json.dumps(stats, ensure_ascii=False)
    wishes_json = json.dumps(wishes, ensure_ascii=False)
    return render_template(
        'index.html',
        username=shown_name,
        initial_stats=stats_json,
        initial_wishes=wishes_json,
    )
|
||
|
||
|
||
# ── API ──────────────────────────────────────────────
|
||
|
||
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
    """Return the check-in stored for the ``date`` query parameter.

    Responds with 400 when ``date`` is missing or empty.
    """
    requested = request.args.get('date', '')
    if requested:
        return jsonify({'ok': True, 'data': get_checkin(requested)})
    return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
|
||
|
||
|
||
@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
    """Create or overwrite the check-in for one date.

    Expects a JSON object ``{'date': <str>, 'data': <object>}``. Responds
    with 400 when the body is not a JSON object, when ``date`` is missing,
    or when ``data`` is not an object.
    """
    # silent=True turns unparsable bodies into None so that every rejection
    # uses the same JSON error shape as the rest of the API.
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400

    date = body.get('date', '')
    if not isinstance(date, str) or not date:
        return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400

    # compute_stats() reads every stored record with data.get(...), so a
    # non-object payload must never be persisted.
    data = body.get('data', {})
    if not isinstance(data, dict):
        return jsonify({'ok': False, 'error': 'data 必须是对象'}), 400

    save_checkin(date, data)
    return jsonify({'ok': True})
|
||
|
||
|
||
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
    """Delete the check-in for the date given in the URL path."""
    delete_checkin(date)
    outcome = {'ok': True}
    return jsonify(outcome)
|
||
|
||
|
||
@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
    """Return every stored check-in as returned by ``get_all_checkins``."""
    return jsonify({'ok': True, 'data': get_all_checkins()})
|
||
|
||
|
||
@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
    """Return the output of ``compute_stats`` merged into an ``ok`` envelope."""
    payload = {'ok': True}
    payload.update(compute_stats())
    return jsonify(payload)
|
||
|
||
|
||
@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
    """Return the username and display name stored in the session."""
    profile = dict(
        ok=True,
        username=session.get('username'),
        display_name=session.get('display_name'),
    )
    return jsonify(profile)
|
||
|
||
|
||
# ── 心愿清单 API ──────────────────────────────
|
||
|
||
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
    """Return all wishes, each converted to a plain dict."""
    as_dicts = list(map(dict, get_wishes()))
    return jsonify({'ok': True, 'data': as_dicts})
|
||
|
||
|
||
@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
    """Create a wish from a JSON object and return its id.

    Reads ``name`` (required, non-blank string), ``quadrant`` (defaults to
    ``'重要不紧急'``) and ``deadline`` (defaults to ``''``). Responds with
    400 when the body is not a JSON object or the name is missing.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400

    # A null or non-string name is treated the same as a missing one instead
    # of raising AttributeError on .strip().
    raw_name = body.get('name', '')
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'ok': False, 'error': '名称不能为空'}), 400

    quadrant = body.get('quadrant', '重要不紧急')
    deadline = body.get('deadline', '')
    wid = save_wish(name, quadrant, deadline)
    return jsonify({'ok': True, 'id': wid})
|
||
|
||
|
||
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
    """Update a wish with the fields supplied in the JSON object body.

    Responds with 400 when the body is not a JSON object.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        # ``**body`` below requires a mapping; reject anything else up front.
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400

    # NOTE(review): every client-supplied key is forwarded verbatim as a
    # keyword argument. Confirm that update_wish() restricts which fields it
    # accepts, or whitelist the allowed keys here.
    update_wish(wish_id, **body)
    return jsonify({'ok': True})
|
||
|
||
|
||
@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
    """Delete the wish identified by the integer id in the URL path."""
    delete_wish(wish_id)
    outcome = {'ok': True}
    return jsonify(outcome)
|
||
|
||
|
||
@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
    """Apply a new wish ordering given as ``{'order': [...]}``.

    Responds with 400 when the body is not a JSON object or ``order`` is
    not a list.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400

    order = body.get('order', [])
    if not isinstance(order, list):
        return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400

    reorder_wishes(order)
    return jsonify({'ok': True})
|
||
|
||
|
||
# ── 日历同步 API ────────────────────────────
|
||
|
||
# JSON file under the user's home directory that caches fetched calendar
# events so they can be served when the live query fails.
CALENDAR_CACHE = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power', 'calendar_cache.json')

# DingTalk MCP gateway endpoint. The built-in default embeds an access key in
# its query string, so deployments should supply their own URL through the
# DING_MCP_URL environment variable instead of relying on the committed one.
DING_MCP_URL = os.environ.get('DING_MCP_URL') or (
    'https://mcp-gw.dingtalk.com/server/95959163f85d8b58a167f65cd8bd3d22690b16c75ebacd0fa095016396a10e0d?key=0993e4d4e44c50ea25a0db840cc5815e'
)
|
||
|
||
|
||
def _parse_dingtalk_events(raw_events, date_str):
    """Convert raw DingTalk event dicts into the app's event records.

    Entries that are not dicts or have no non-blank string ``summary`` are
    skipped. Each record has ``date``, ``summary``, ``time`` (``HH:MM-HH:MM``
    or ``''``) and ``location``.
    """
    def stamp(event, key):
        # event[key] is expected to look like {'dateTime': '<ISO string>'};
        # null or malformed values degrade to '' instead of raising.
        bound = event.get(key)
        value = bound.get('dateTime', '') if isinstance(bound, dict) else ''
        return value if isinstance(value, str) else ''

    results = []
    for event in raw_events or []:
        if not isinstance(event, dict):
            continue
        summary = event.get('summary')
        if not isinstance(summary, str) or not summary.strip():
            continue
        start = stamp(event, 'start')
        end = stamp(event, 'end')
        # Characters 11..15 of the timestamp are taken as the HH:MM part.
        time_str = start[11:16] + '-' + end[11:16] if start and end else ''
        results.append({
            'date': date_str, 'summary': summary.strip(),
            'time': time_str, 'location': event.get('location') or '',
        })
    return results


def _fetch_dingtalk_events(date_str):
    """Query the DingTalk MCP endpoint for the events of one day.

    Args:
        date_str: Day to query, formatted ``YYYY-MM-DD``; the day boundaries
            are taken in UTC+8.

    Returns:
        ``[]`` when *date_str* is not a valid date, ``None`` when the remote
        call fails (callers fall back to the cache), otherwise the list of
        parsed event records. Successful results also replace that day's
        entries in the ``CALENDAR_CACHE`` file on a best-effort basis.
    """
    import json as _json
    import urllib.request as _ur
    from datetime import datetime, timezone, timedelta

    tz = timezone(timedelta(hours=8))
    try:
        day = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=tz)
    except (TypeError, ValueError):
        return []

    # The query window is given in epoch milliseconds.
    start_ts = int(day.replace(hour=0, minute=0, second=0).timestamp() * 1000)
    end_ts = int(day.replace(hour=23, minute=59, second=59).timestamp() * 1000)
    body = _json.dumps({
        'jsonrpc': '2.0', 'method': 'tools/call', 'id': 1,
        'params': {'name': 'list_calendar_events', 'arguments': {
            'calendarId': 'primary', 'startTime': start_ts, 'endTime': end_ts, 'limit': 20
        }}
    }).encode('utf-8')
    req = _ur.Request(DING_MCP_URL, data=body, headers={
        'Content-Type': 'application/json', 'Accept': 'application/json'
    })
    try:
        with _ur.urlopen(req, timeout=10) as resp:
            data = _json.loads(resp.read())
        events = data.get('result', {}).get('structuredContent', {}).get('result', {}).get('events', [])
    except Exception:
        # Deliberately broad: any transport, decoding or shape error means
        # "live data unavailable" and is signalled to the caller as None.
        return None

    results = _parse_dingtalk_events(events, date_str)

    # Refresh the cache. This is best-effort: a cache failure must never
    # turn a successful live query into an error.
    try:
        # The cache directory may not exist yet on a fresh machine.
        os.makedirs(os.path.dirname(CALENDAR_CACHE), exist_ok=True)
        try:
            with open(CALENDAR_CACHE, 'r') as f:
                cached = _json.load(f)
        except (FileNotFoundError, _json.JSONDecodeError):
            cached = []
        if not isinstance(cached, list):
            cached = []
        # Drop this day's stale entries (and anything malformed), then append.
        cached = [c for c in cached if isinstance(c, dict) and c.get('date') != date_str] + results
        with open(CALENDAR_CACHE, 'w') as f:
            _json.dump(cached, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError):
        pass
    return results
|
||
|
||
|
||
@app.route('/api/calendar-sync', methods=['GET'])
@login_required
def api_calendar_sync():
    """Return the calendar events for the ``date`` query parameter.

    The live DingTalk query is tried first; when it reports failure (``None``)
    the events for that date are read from the ``CALENDAR_CACHE`` file, and a
    missing or unparsable cache yields an empty list. Responds with 400 when
    ``date`` is missing.
    """
    import json as _json

    requested = request.args.get('date', '')
    if not requested:
        return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400

    live = _fetch_dingtalk_events(requested)
    if live is not None:
        return jsonify({'ok': True, 'data': live})

    # Live source unavailable: serve whatever the cache holds for this date.
    try:
        with open(CALENDAR_CACHE, 'r') as fh:
            cached = _json.load(fh)
        fallback = [entry for entry in cached if entry.get('date') == requested]
    except (FileNotFoundError, _json.JSONDecodeError):
        fallback = []
    return jsonify({'ok': True, 'data': fallback})
|
||
|
||
|
||
@app.route('/api/calendar-sync-all', methods=['GET'])
@login_required
def api_calendar_sync_all():
    """Fetch DingTalk events for the past 15 to the next 15 days and merge them.

    For every day that has events, each event is rendered as a text line and
    appended to that day's ``morning`` list unless an identical line is
    already present; days that gained at least one line are saved.

    Returns a JSON body with ``data`` (list of ``{'date', 'events'}``) and
    ``saved_days`` (number of check-ins written).

    NOTE(review): this is a GET handler that writes check-ins and performs
    up to 31 sequential remote calls; confirm that is acceptable.
    """
    from datetime import datetime, timedelta, timezone

    # _fetch_dingtalk_events() interprets dates in UTC+8, so "today" is
    # anchored to the same zone rather than the server's local clock.
    today = datetime.now(timezone(timedelta(hours=8))).date()
    results = []
    saved_days = 0
    for delta in range(-15, 16):
        ds = (today + timedelta(days=delta)).strftime('%Y-%m-%d')
        events = _fetch_dingtalk_events(ds)
        if not events:
            # None (fetch failed) and [] (no events) are both skipped.
            continue
        results.append({'date': ds, 'events': events})

        # Merge into the existing check-in for that day, without duplicates.
        existing = get_checkin(ds)
        data = existing['data'] if existing else {'morning': [], 'evening': [], 'study': []}
        if not isinstance(data, dict):
            data = {'morning': [], 'evening': [], 'study': []}
        morning = data.get('morning') or []

        existing_texts = set()
        for mi in morning:
            # Entries are plain strings or dicts with a 'text' field.
            if isinstance(mi, str):
                t = mi
            elif isinstance(mi, dict):
                t = mi.get('text')
            else:
                t = None
            if isinstance(t, str) and t.strip():
                existing_texts.add(t.strip())

        added = False
        for evt in events:
            summary = (evt.get('summary') or '').strip()
            if not summary:
                continue
            text = f"【{evt['time']}】{summary}" if evt.get('time') else summary
            loc = evt.get('location', '')
            if loc:
                text += f" @{loc}"
            text = text.strip()
            if text not in existing_texts:
                morning.append(text)
                existing_texts.add(text)
                added = True

        if added:
            data['morning'] = morning
            save_checkin(ds, data)
            saved_days += 1
    return jsonify({'ok': True, 'data': results, 'saved_days': saved_days})
|
||
|
||
|
||
# ── 启动 ──────────────────────────────────────────────
|
||
|
||
if __name__ == '__main__':
    # Direct-execution entry point: initialise the database, enable template
    # auto-reload and serve on all interfaces.
    # NOTE(review): this block does not run when the module is imported by a
    # WSGI server, so init_db() must then be invoked elsewhere — confirm.
    init_db()
    app.config.update(TEMPLATES_AUTO_RELOAD=True)
    listen_port = int(os.getenv('PORT', 5058))
    app.run(host='0.0.0.0', port=listen_port, debug=False)
|