Files
ziwei-power/app.py
mac 350c9fb877 v1.5.3 — 日历同步按日期自动填充
- 后端 api_calendar_sync_all 查询后自动去重保存到各日期 checkin
- 不再只填充当前选中的日期
- 弹窗显示已填充天数
2026-06-05 12:19:22 +08:00

450 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ziwei-power Flask 应用 — 磁场管理打卡系统"""
import os
from datetime import timedelta
from functools import wraps
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from database import init_db, get_checkin, save_checkin, delete_checkin, get_all_checkins, get_wishes, save_wish, update_wish, delete_wish, reorder_wishes
app = Flask(__name__)
# A fixed secret key lets every gunicorn worker validate the same session
# cookie.
# NOTE(review): the fallback literal is committed to the repository, so
# sessions can be forged by anyone who reads it unless SECRET_KEY is set in
# the environment.
app.secret_key = os.environ.get('SECRET_KEY', 'ziwei-power-secret-2026')
# Permanent ("remember me") sessions last 30 days.
app.config.update(PERMANENT_SESSION_LIFETIME=timedelta(days=30))
# ── User store ────────────────────────────────────────
# Maps login name -> password hash and display name.
# NOTE(review): the plaintext password is embedded in source and re-hashed on
# every import; consider storing only a precomputed hash outside the repo.
USERS = dict(
    qiukai=dict(
        password_hash=generate_password_hash('AiAlex2018$'),
        name='凯哥',
    ),
)
# ── 登录检查装饰器 ────────────────────────────────────
def login_required(f):
    """Decorate a view so that anonymous visitors are sent to the login page.

    When the session has no truthy ``logged_in`` flag the request is
    redirected to ``login_page`` with the requested path passed as the
    ``next`` query parameter; otherwise the wrapped view runs unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        login_url = url_for('login_page', next=request.path)
        return redirect(login_url)
    return guarded_view
# ── 登录路由 ──────────────────────────────────────────
def _safe_next_url(target):
    """Return *target* when it is a same-site path, otherwise ``'/'``.

    Only values that start with a single ``/`` are accepted. Scheme-relative
    URLs (``//host``), backslashes and control characters are rejected because
    browsers may resolve them to a different origin.
    """
    if not isinstance(target, str) or not target.startswith('/'):
        return '/'
    if target.startswith('//'):
        return '/'
    if any(ch == '\\' or ord(ch) < 0x20 for ch in target):
        return '/'
    return target


@app.route('/login', methods=['GET', 'POST'])
def login_page():
    """Show the login form (GET) or authenticate a user (POST).

    POST reads ``username``, ``password`` and ``remember`` from the form and
    answers with JSON: ``{'ok': True, 'next': <path>}`` on success, or
    ``{'ok': False, 'error': ...}`` with status 401 on failure.
    GET redirects users who are already logged in to the index page and
    renders ``login.html`` for everyone else.
    """
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        remember = request.form.get('remember') == 'on'
        user = USERS.get(username)
        if user and check_password_hash(user['password_hash'], password):
            # A permanent session uses PERMANENT_SESSION_LIFETIME.
            session.permanent = remember
            session['logged_in'] = True
            session['username'] = username
            session['display_name'] = user['name']
            # `next` comes from the query string and is therefore attacker
            # controlled; restrict it to local paths so the value handed back
            # to the client cannot point at another site (open redirect).
            next_url = _safe_next_url(request.args.get('next', '/'))
            return jsonify({'ok': True, 'next': next_url})
        return jsonify({'ok': False, 'error': '账号或密码错误'}), 401
    # GET: users who are already logged in go straight to the index page.
    if session.get('logged_in'):
        return redirect(url_for('index'))
    return render_template('login.html')
@app.route('/logout')
def logout():
    """Drop every value stored in the session and return to the login page."""
    session.clear()
    login_url = url_for('login_page')
    return redirect(login_url)
# ── 主页 ──────────────────────────────────────────────
# Keyword -> pillar rules used for automatic classification.  Pillars are
# tried in insertion order and the first keyword found in the text wins, so
# more specific entries (e.g. '患者管理') must stay ahead of generic ones
# (e.g. '患者').  Keywords must be non-empty: an empty string is a substring
# of every text and would make its pillar swallow everything after it.
_PILLAR_KEYWORDS = {
    '医药营销': ['科普', '科研', '学术会议', '学术', '会议', '患者管理', '临床推广', '临床', '药企', '处方', 'KOL', '代表', '拜访', '荣昌', '恒瑞', '信达', '鲁银', 'OPC', '项目', '标杆', '客户'],
    '医疗服务': ['诊疗', '治疗', '医生', '医院', '科室', '手术', '门诊', '患者', '护理', '诊断', '影像', '检验'],
    '医疗支付': ['医保', '商保', '理赔', '支付', '保险', '费用', '报销', '商业保险', '保单'],
    'AI 智能': ['AI', '数字人', '智能', '算法', '自动化', '平台', '系统', '网站', '开发', '代码', '程序', '模型'],
    '公司治理': ['团队', '组织', '管理', '招聘', 'HR', '制度', '流程', '财务', '考核', 'KPI', '规划', '人才', '架构', '预算'],
    '个人修养': ['早起', '睡觉', '俯卧撑', '运动', '冥想', '阅读', '复盘', '写作', '习惯', '修身', '立志', '责善', '改过', '勤学'],
}

# Pillars that get an entry in ``pillar_breakdown``, in result order.
_PILLARS = ['医疗服务', '医药营销', '医疗支付', 'AI 智能', '公司治理', '个人修养']


def _stripped(value):
    """Return *value* stripped when it is a string, otherwise ``''``."""
    return value.strip() if isinstance(value, str) else ''


def _classify_pillar(text):
    """Return the first pillar whose keyword occurs in *text*, or ``''``."""
    candidate = _stripped(text)
    if not candidate:
        return ''
    for pillar, keywords in _PILLAR_KEYWORDS.items():
        # `kw and ...` ignores empty keywords, which would match any text.
        if any(kw and kw in candidate for kw in keywords):
            return pillar
    return ''  # no keyword matched


def compute_stats(rows=None):
    """Aggregate check-in statistics shared by the API and the index template.

    Args:
        rows: optional iterable of check-in rows, each a mapping with a
            ``'date'`` and a ``'data'`` mapping holding ``morning``,
            ``evening`` and ``study`` lists.  When omitted the rows are
            loaded with ``get_all_checkins()``.

    Returns:
        dict with ``total_days``, ``total_morning``, ``total_evening``,
        ``total_study``, ``calendar`` (date -> ``'pass'``/``'fail'``), the
        flattened ``morning_items`` / ``evening_items`` / ``study_items``
        lists and a per-pillar ``pillar_breakdown``.
    """
    if rows is None:
        rows = get_all_checkins()

    total_morning = 0
    total_evening = 0
    total_study = 0
    calendar = {}
    all_morning_items = []
    all_evening_items = []
    all_study_items = []

    for row in rows:
        day = row['date']
        data = row['data'] or {}
        # `or []` also covers fields that were stored as null.
        morning = data.get('morning') or []
        evening = data.get('evening') or []
        study = data.get('study') or []

        # Morning items are plain strings or {'text': ..., 'pillar': ...}.
        morning_count = 0
        for item in morning:
            is_dict = isinstance(item, dict)
            text = _stripped(item.get('text', '') if is_dict else item)
            if not text:
                continue
            pillar = (item.get('pillar', '') if is_dict else '') or _classify_pillar(text)
            all_morning_items.append({'text': text, 'pillar': pillar})
            morning_count += 1

        # Evening items are strings or {'mistake', 'improvement', 'pillar'}.
        # An entry is listed when either field is filled in, but it only
        # counts toward the daily total when a mistake is recorded.
        evening_count = 0
        for item in evening:
            if isinstance(item, dict):
                mistake = _stripped(item.get('mistake'))
                improvement = _stripped(item.get('improvement'))
                if mistake or improvement:
                    pillar = item.get('pillar', '') or _classify_pillar(mistake + ' ' + improvement)
                    all_evening_items.append({'mistake': mistake, 'improvement': improvement, 'pillar': pillar})
                if mistake:
                    evening_count += 1
            elif isinstance(item, str) and item.strip():
                all_evening_items.append({'mistake': item.strip(), 'improvement': '', 'pillar': _classify_pillar(item)})
                evening_count += 1

        # Study items are {'name', 'done', 'note', 'pillar'}; anything else is
        # treated as a bare name that is not done.
        study_count = 0
        for item in study:
            if isinstance(item, dict):
                name = item.get('name') or ''
                if not isinstance(name, str):
                    name = str(name)
                done = item.get('done', False)
                note = item.get('note', '')
                pillar = item.get('pillar', '')
                if done:
                    study_count += 1
            else:
                name, done, note, pillar = str(item), False, '', ''
            if name.strip():
                all_study_items.append({
                    'name': name.strip(),
                    'done': done,
                    'note': note,
                    'pillar': pillar or _classify_pillar(name),
                })

        total_morning += morning_count
        total_evening += evening_count
        total_study += study_count
        # A day passes only when all three sections have at least one entry.
        all_three = morning_count > 0 and evening_count > 0 and study_count > 0
        calendar[day] = 'pass' if all_three else 'fail'

    # Per-pillar counters and item lists; items whose pillar is empty or
    # unknown are left out of the breakdown.
    pillar_breakdown = {
        pillar: {
            'morning': 0, 'evening': 0, 'study': 0,
            'morning_items': [], 'evening_items': [], 'study_items': [],
        }
        for pillar in _PILLARS
    }
    sections = (
        ('morning', all_morning_items),
        ('evening', all_evening_items),
        ('study', all_study_items),
    )
    for section, items in sections:
        for item in items:
            pillar = item['pillar']
            if pillar in pillar_breakdown:
                pillar_breakdown[pillar][section] += 1
                pillar_breakdown[pillar][section + '_items'].append(item)

    return dict(
        total_days=len(rows), total_morning=total_morning,
        total_evening=total_evening, total_study=total_study,
        calendar=calendar,
        morning_items=all_morning_items,
        evening_items=all_evening_items,
        study_items=all_study_items,
        pillar_breakdown=pillar_breakdown,
    )
@app.route('/')
@login_required
def index():
    """Render the home page with stats and wishes embedded as JSON strings."""
    import json
    stats_json = json.dumps(compute_stats(), ensure_ascii=False)
    wish_rows = list(map(dict, get_wishes()))
    wishes_json = json.dumps(wish_rows, ensure_ascii=False)
    # Fall back to the login name when no display name is stored.
    shown_name = session.get('display_name', session.get('username', ''))
    return render_template(
        'index.html',
        username=shown_name,
        initial_stats=stats_json,
        initial_wishes=wishes_json,
    )
# ── API ──────────────────────────────────────────────
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
    """Return the check-in stored for the ``date`` query parameter.

    Responds with status 400 when ``date`` is missing or empty.
    """
    requested_date = request.args.get('date', '')
    if requested_date:
        return jsonify({'ok': True, 'data': get_checkin(requested_date)})
    return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
    """Store the check-in payload for one date.

    Expects a JSON object ``{'date': <str>, 'data': <object>}``.  Responds
    with status 400 when the body is not an object, when ``date`` is missing
    or when ``data`` is not an object.
    """
    body = request.get_json(force=True)
    if not isinstance(body, dict):
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400
    date = body.get('date', '')
    if not date or not isinstance(date, str):
        return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400
    data = body.get('data', {})
    if data is None:
        # Treat an explicit null like a missing field.
        data = {}
    if not isinstance(data, dict):
        # compute_stats() calls data.get(...) on every stored row, so a
        # non-object payload must never be persisted.
        return jsonify({'ok': False, 'error': 'data 必须是对象'}), 400
    save_checkin(date, data)
    return jsonify({'ok': True})
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
    """Delete the check-in for the date given in the URL path."""
    delete_checkin(date)
    return jsonify(ok=True)
@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
    """Return every stored check-in row."""
    return jsonify(ok=True, data=get_all_checkins())
@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
    """Return the aggregated statistics merged into the ``ok`` envelope."""
    payload = {'ok': True}
    payload.update(compute_stats())
    return jsonify(payload)
@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
    """Return the login name and display name stored in the session."""
    login_name = session.get('username')
    shown_name = session.get('display_name')
    return jsonify(ok=True, username=login_name, display_name=shown_name)
# ── 心愿清单 API ──────────────────────────────
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
    """Return all wishes, each row converted to a plain dict."""
    wish_rows = list(map(dict, get_wishes()))
    return jsonify({'ok': True, 'data': wish_rows})
@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
    """Create a wish from a JSON object and return its id.

    Reads ``name`` (required, non-empty string), ``quadrant`` (defaults to
    ``'重要不紧急'``) and ``deadline`` (defaults to ``''``).  Responds with
    status 400 when the body is not an object or the name is empty.
    """
    body = request.get_json(force=True)
    if not isinstance(body, dict):
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400
    raw_name = body.get('name', '')
    # A null or non-string name is treated as empty instead of raising.
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'ok': False, 'error': '名称不能为空'}), 400
    quadrant = body.get('quadrant', '重要不紧急')
    deadline = body.get('deadline', '')
    wid = save_wish(name, quadrant, deadline)
    return jsonify({'ok': True, 'id': wid})
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
    """Update fields of one wish from a JSON object.

    The object's members are forwarded to ``update_wish`` as keyword
    arguments.  Responds with status 400 when the body is not an object.
    """
    body = request.get_json(force=True)
    if not isinstance(body, dict):
        # `**body` requires a mapping; reject lists, strings and null here
        # rather than failing with a TypeError.
        return jsonify({'ok': False, 'error': '请求体必须是 JSON 对象'}), 400
    # NOTE(review): client-supplied keys reach update_wish unfiltered; the
    # accepted field names are not visible here — confirm update_wish
    # whitelists them before using them as column names.
    update_wish(wish_id, **body)
    return jsonify({'ok': True})
@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
    """Delete the wish identified by the integer id in the URL path."""
    delete_wish(wish_id)
    return jsonify(ok=True)
@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
    """Apply a new wish ordering taken from the ``order`` list in the body.

    Responds with status 400 when ``order`` is present but not a list.
    """
    payload = request.get_json(force=True)
    new_order = payload.get('order', [])
    if isinstance(new_order, list):
        reorder_wishes(new_order)
        return jsonify({'ok': True})
    return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400
# ── 日历同步 API ────────────────────────────
# JSON file holding the most recently fetched calendar events; it is read as
# a fallback when the DingTalk gateway cannot be reached.
CALENDAR_CACHE = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power', 'calendar_cache.json')
# DingTalk MCP gateway endpoint.  The URL embeds an access key, so it can be
# overridden through the DING_MCP_URL environment variable (same pattern as
# SECRET_KEY); the literal is kept only as a backward-compatible default.
# NOTE(review): the default exposes a credential in source control — rotate
# the key and supply it through the environment.
DING_MCP_URL = os.environ.get('DING_MCP_URL') or 'https://mcp-gw.dingtalk.com/server/95959163f85d8b58a167f65cd8bd3d22690b16c75ebacd0fa095016396a10e0d?key=0993e4d4e44c50ea25a0db840cc5815e'
def _fetch_dingtalk_events(date_str):
"""调用钉钉 MCP 实时查询指定日期的日程"""
import json as _json, urllib.request as _ur
from datetime import datetime, timezone, timedelta
tz = timezone(timedelta(hours=8))
try:
d = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=tz)
except ValueError:
return []
start_ts = int(d.replace(hour=0, minute=0, second=0).timestamp() * 1000)
end_ts = int(d.replace(hour=23, minute=59, second=59).timestamp() * 1000)
body = _json.dumps({
'jsonrpc': '2.0', 'method': 'tools/call', 'id': 1,
'params': {'name': 'list_calendar_events', 'arguments': {
'calendarId': 'primary', 'startTime': start_ts, 'endTime': end_ts, 'limit': 20
}}
}).encode('utf-8')
req = _ur.Request(DING_MCP_URL, data=body, headers={
'Content-Type': 'application/json', 'Accept': 'application/json'
})
try:
with _ur.urlopen(req, timeout=10) as resp:
data = _json.loads(resp.read())
events = data.get('result', {}).get('structuredContent', {}).get('result', {}).get('events', [])
except Exception:
return None # 网络错误返回 None调用方回退缓存
results = []
for e in events:
summary = (e.get('summary') or '').strip()
if not summary:
continue
start = e.get('start', {}).get('dateTime', '')
end = e.get('end', {}).get('dateTime', '')
time_str = ''
if start and end:
time_str = start[11:16] + '-' + end[11:16]
results.append({
'date': date_str, 'summary': summary,
'time': time_str, 'location': e.get('location') or ''
})
# 写入缓存
try:
all_cached = []
try:
with open(CALENDAR_CACHE, 'r') as f:
all_cached = _json.load(f)
except (FileNotFoundError, _json.JSONDecodeError):
pass
# 替换同一天的旧缓存
all_cached = [c for c in all_cached if c.get('date') != date_str] + results
with open(CALENDAR_CACHE, 'w') as f:
_json.dump(all_cached, f, ensure_ascii=False, indent=2)
except Exception:
pass
return results
@app.route('/api/calendar-sync', methods=['GET'])
@login_required
def api_calendar_sync():
    """Return the calendar events of the ``date`` query parameter.

    The live DingTalk query is tried first; when it returns ``None`` the
    events are taken from the cache file instead.  Responds with status 400
    when ``date`` is missing.
    """
    import json as _json
    date = request.args.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
    live_events = _fetch_dingtalk_events(date)
    if live_events is not None:
        return jsonify({'ok': True, 'data': live_events})
    # Gateway unavailable: fall back to whatever the cache holds for the day.
    try:
        with open(CALENDAR_CACHE, 'r') as f:
            cached = _json.load(f)
    except (FileNotFoundError, _json.JSONDecodeError):
        return jsonify({'ok': True, 'data': []})
    same_day = [entry for entry in cached if entry.get('date') == date]
    return jsonify({'ok': True, 'data': same_day})
def _event_to_morning_text(evt):
    """Format one calendar event as a morning entry, or ``''`` if it has no summary."""
    summary = (evt.get('summary') or '').strip()
    if not summary:
        return ''
    text = f"{evt['time']}{summary}" if evt.get('time') else summary
    loc = evt.get('location', '')
    if loc:
        text += f" @{loc}"
    return text.strip()


@app.route('/api/calendar-sync-all', methods=['GET'])
@login_required
def api_calendar_sync_all():
    """Sync DingTalk events for the 15 days before through 15 days after today.

    Each day's events are fetched, and every event text not already present
    in that day's ``morning`` list is appended and saved.  Returns the events
    grouped by date plus ``saved_days``, the number of check-ins modified.
    """
    from datetime import datetime, timedelta, timezone
    # _fetch_dingtalk_events interprets date strings in UTC+8, so "today"
    # uses the same zone rather than the server's local clock.
    today = datetime.now(timezone(timedelta(hours=8))).date()
    results = []
    saved_days = 0
    for delta in range(-15, 16):
        ds = (today + timedelta(days=delta)).strftime('%Y-%m-%d')
        events = _fetch_dingtalk_events(ds)
        if not events:
            # None (gateway unavailable) or no events for that day.
            continue
        results.append({'date': ds, 'events': events})
        # Merge into the day's check-in, skipping texts already present.
        existing = get_checkin(ds)
        data = existing['data'] if existing else None
        if not isinstance(data, dict):
            data = {'morning': [], 'evening': [], 'study': []}
        morning = data.get('morning') or []
        existing_texts = set()
        for mi in morning:
            if isinstance(mi, str):
                t = mi
            elif isinstance(mi, dict):
                t = mi.get('text', '')
            else:
                t = ''
            if isinstance(t, str) and t.strip():
                existing_texts.add(t.strip())
        added = False
        for evt in events:
            text = _event_to_morning_text(evt)
            if text and text not in existing_texts:
                morning.append(text)
                existing_texts.add(text)
                added = True
        if added:
            data['morning'] = morning
            save_checkin(ds, data)
            saved_days += 1
    return jsonify({'ok': True, 'data': results, 'saved_days': saved_days})
# ── 启动 ──────────────────────────────────────────────
if __name__ == '__main__':
    # Direct launch: initialise the database, then serve on every interface
    # on $PORT (5058 when unset) with the debugger switched off.
    init_db()
    app.config.update(TEMPLATES_AUTO_RELOAD=True)
    listen_port = int(os.environ.get('PORT', 5058))
    app.run(host='0.0.0.0', port=listen_port, debug=False)