Files
ziwei-power/app.py
mac f015b72ad8 v1.3.1 — 日历同步改为实时查询钉钉 MCP
- 后端直接调用钉钉 MCP Gateway (JSON-RPC)
- 返回结果同时写入缓存,MCP 不可用时回退缓存
- 无需定时任务,点击即查
2026-06-04 12:46:00 +08:00

318 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ziwei-power Flask 应用 — 磁场管理打卡系统"""
import os
from datetime import timedelta
from functools import wraps
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from database import init_db, get_checkin, save_checkin, delete_checkin, get_all_checkins, get_wishes, save_wish, update_wish, delete_wish, reorder_wishes
app = Flask(__name__)
# 固定密钥确保 gunicorn 多 worker 下 session 可互通
app.secret_key = os.environ.get('SECRET_KEY', 'ziwei-power-secret-2026')
# 会话持久化30 天
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)
# ── 用户库 ────────────────────────────────────────────
USERS = {
'qiukai': {
'password_hash': generate_password_hash('AiAlex2018$'),
'name': '凯哥'
}
}
# ── 登录检查装饰器 ────────────────────────────────────
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('login_page', next=request.path))
return f(*args, **kwargs)
return decorated
# ── 登录路由 ──────────────────────────────────────────
@app.route('/login', methods=['GET', 'POST'])
def login_page():
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
remember = request.form.get('remember') == 'on'
user = USERS.get(username)
if user and check_password_hash(user['password_hash'], password):
session.permanent = remember
session['logged_in'] = True
session['username'] = username
session['display_name'] = user['name']
next_url = request.args.get('next', '/')
return jsonify({'ok': True, 'next': next_url})
else:
return jsonify({'ok': False, 'error': '账号或密码错误'}), 401
# GET如果已登录直接跳转
if session.get('logged_in'):
return redirect(url_for('index'))
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login_page'))
# ── 主页 ──────────────────────────────────────────────
def compute_stats():
"""计算统计数据,供 API 和模板共用"""
rows = get_all_checkins()
total_days = len(rows)
total_morning = 0
total_evening = 0
total_study = 0
calendar = {}
for row in rows:
d = row['date']
data = row['data']
morning = data.get('morning', [])
evening = data.get('evening', [])
study = data.get('study', [])
morning_count = sum(1 for x in morning if isinstance(x, str) and x.strip())
evening_count = sum(1 for x in evening if (
isinstance(x, str) and x.strip() or
isinstance(x, dict) and (x.get('mistake', '') or '').strip()
))
study_count = sum(1 for x in study if x.get('done'))
total_morning += morning_count
total_evening += evening_count
total_study += study_count
# 新评分3 项都有 → 达标 60 分 + 额外加分
all_three = morning_count > 0 and evening_count > 0 and study_count > 0
if all_three:
extra = (morning_count - 1) + (evening_count - 1) + (study_count - 1)
# day_score = 60 + extra * 5 (不在此处使用,仅用于日历状态)
calendar[d] = 'pass' if all_three else 'fail'
return dict(
total_days=total_days, total_morning=total_morning,
total_evening=total_evening, total_study=total_study,
calendar=calendar
)
@app.route('/')
@login_required
def index():
import json
stats = compute_stats()
wishes = [dict(w) for w in get_wishes()]
return render_template('index.html',
username=session.get('display_name', session.get('username', '')),
initial_stats=json.dumps(stats, ensure_ascii=False),
initial_wishes=json.dumps(wishes, ensure_ascii=False))
# ── API ──────────────────────────────────────────────
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
date = request.args.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
row = get_checkin(date)
return jsonify({'ok': True, 'data': row})
@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
body = request.get_json(force=True)
date = body.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400
data = body.get('data', {})
save_checkin(date, data)
return jsonify({'ok': True})
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
delete_checkin(date)
return jsonify({'ok': True})
@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
rows = get_all_checkins()
return jsonify({'ok': True, 'data': rows})
@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
stats = compute_stats()
return jsonify({'ok': True, **stats})
@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
return jsonify({
'ok': True,
'username': session.get('username'),
'display_name': session.get('display_name')
})
# ── 心愿清单 API ──────────────────────────────
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
wishes = [dict(w) for w in get_wishes()]
return jsonify({'ok': True, 'data': wishes})
@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
body = request.get_json(force=True)
name = body.get('name', '').strip()
if not name:
return jsonify({'ok': False, 'error': '名称不能为空'}), 400
quadrant = body.get('quadrant', '重要不紧急')
deadline = body.get('deadline', '')
wid = save_wish(name, quadrant, deadline)
return jsonify({'ok': True, 'id': wid})
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
body = request.get_json(force=True)
update_wish(wish_id, **body)
return jsonify({'ok': True})
@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
delete_wish(wish_id)
return jsonify({'ok': True})
@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
body = request.get_json(force=True)
order = body.get('order', [])
if not isinstance(order, list):
return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400
reorder_wishes(order)
return jsonify({'ok': True})
# ── 日历同步 API ────────────────────────────
CALENDAR_CACHE = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power', 'calendar_cache.json')
DING_MCP_URL = 'https://mcp-gw.dingtalk.com/server/95959163f85d8b58a167f65cd8bd3d22690b16c75ebacd0fa095016396a10e0d?key=0993e4d4e44c50ea25a0db840cc5815e'
def _fetch_dingtalk_events(date_str):
"""调用钉钉 MCP 实时查询指定日期的日程"""
import json as _json, urllib.request as _ur
from datetime import datetime, timezone, timedelta
tz = timezone(timedelta(hours=8))
try:
d = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=tz)
except ValueError:
return []
start_ts = int(d.replace(hour=0, minute=0, second=0).timestamp() * 1000)
end_ts = int(d.replace(hour=23, minute=59, second=59).timestamp() * 1000)
body = _json.dumps({
'jsonrpc': '2.0', 'method': 'tools/call', 'id': 1,
'params': {'name': 'list_calendar_events', 'arguments': {
'calendarId': 'primary', 'startTime': start_ts, 'endTime': end_ts, 'limit': 20
}}
}).encode('utf-8')
req = _ur.Request(DING_MCP_URL, data=body, headers={
'Content-Type': 'application/json', 'Accept': 'application/json'
})
try:
with _ur.urlopen(req, timeout=10) as resp:
data = _json.loads(resp.read())
events = data.get('result', {}).get('structuredContent', {}).get('result', {}).get('events', [])
except Exception:
return None # 网络错误返回 None调用方回退缓存
results = []
for e in events:
summary = (e.get('summary') or '').strip()
if not summary:
continue
start = e.get('start', {}).get('dateTime', '')
end = e.get('end', {}).get('dateTime', '')
time_str = ''
if start and end:
time_str = start[11:16] + '-' + end[11:16]
results.append({
'date': date_str, 'summary': summary,
'time': time_str, 'location': e.get('location') or ''
})
# 写入缓存
try:
all_cached = []
try:
with open(CALENDAR_CACHE, 'r') as f:
all_cached = _json.load(f)
except (FileNotFoundError, _json.JSONDecodeError):
pass
# 替换同一天的旧缓存
all_cached = [c for c in all_cached if c.get('date') != date_str] + results
with open(CALENDAR_CACHE, 'w') as f:
_json.dump(all_cached, f, ensure_ascii=False, indent=2)
except Exception:
pass
return results
@app.route('/api/calendar-sync', methods=['GET'])
@login_required
def api_calendar_sync():
import json as _json
date = request.args.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
# 优先实时查询钉钉 MCP
events = _fetch_dingtalk_events(date)
if events is None:
# MCP 不可用,回退缓存
try:
with open(CALENDAR_CACHE, 'r') as f:
all_cached = _json.load(f)
events = [e for e in all_cached if e.get('date') == date]
except (FileNotFoundError, _json.JSONDecodeError):
events = []
return jsonify({'ok': True, 'data': events})
# ── 启动 ──────────────────────────────────────────────
if __name__ == '__main__':
init_db()
app.config['TEMPLATES_AUTO_RELOAD'] = True
port = int(os.environ.get('PORT', 5058))
app.run(host='0.0.0.0', port=port, debug=False)