Files
ziwei-power/app.py
mac 6e1b793b40 v1.4.0 — 蓝图六板块体系
- 立志项新增板块选择器(医疗服务/医药营销/医疗支付/AI智能/公司治理/个人修养)
- 数据格式从纯字符串升级为 {text, pillar},向后兼容旧数据
- 新增蓝图面板:展示使命 + 六板块卡片 + 各板块立志统计
- stats API 新增 morning_items 返回全部立志数据
2026-06-04 15:43:53 +08:00

347 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ziwei-power Flask 应用 — 磁场管理打卡系统"""
import os
from datetime import timedelta
from functools import wraps
from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from database import init_db, get_checkin, save_checkin, delete_checkin, get_all_checkins, get_wishes, save_wish, update_wish, delete_wish, reorder_wishes
app = Flask(__name__)
# 固定密钥确保 gunicorn 多 worker 下 session 可互通
app.secret_key = os.environ.get('SECRET_KEY', 'ziwei-power-secret-2026')
# 会话持久化30 天
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)
# ── 用户库 ────────────────────────────────────────────
USERS = {
'qiukai': {
'password_hash': generate_password_hash('AiAlex2018$'),
'name': '凯哥'
}
}
# ── 登录检查装饰器 ────────────────────────────────────
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('login_page', next=request.path))
return f(*args, **kwargs)
return decorated
# ── 登录路由 ──────────────────────────────────────────
@app.route('/login', methods=['GET', 'POST'])
def login_page():
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
remember = request.form.get('remember') == 'on'
user = USERS.get(username)
if user and check_password_hash(user['password_hash'], password):
session.permanent = remember
session['logged_in'] = True
session['username'] = username
session['display_name'] = user['name']
next_url = request.args.get('next', '/')
return jsonify({'ok': True, 'next': next_url})
else:
return jsonify({'ok': False, 'error': '账号或密码错误'}), 401
# GET如果已登录直接跳转
if session.get('logged_in'):
return redirect(url_for('index'))
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login_page'))
# ── 主页 ──────────────────────────────────────────────
def compute_stats():
"""计算统计数据,供 API 和模板共用"""
rows = get_all_checkins()
total_days = len(rows)
total_morning = 0
total_evening = 0
total_study = 0
calendar = {}
all_morning_items = [] # 收集所有立志项用于蓝图统计
for row in rows:
d = row['date']
data = row['data']
morning = data.get('morning', [])
evening = data.get('evening', [])
study = data.get('study', [])
# 收集 morning items
for mi in morning:
if isinstance(mi, dict) and (mi.get('text', '') or '').strip():
all_morning_items.append(mi)
elif isinstance(mi, str) and mi.strip():
all_morning_items.append({'text': mi, 'pillar': ''})
morning_count = sum(1 for x in morning if (
isinstance(x, str) and x.strip() or
isinstance(x, dict) and (x.get('text', '') or '').strip()
))
evening_count = sum(1 for x in evening if (
isinstance(x, str) and x.strip() or
isinstance(x, dict) and (x.get('mistake', '') or '').strip()
))
study_count = sum(1 for x in study if x.get('done'))
total_morning += morning_count
total_evening += evening_count
total_study += study_count
# 新评分3 项都有 → 达标 60 分 + 额外加分
all_three = morning_count > 0 and evening_count > 0 and study_count > 0
if all_three:
extra = (morning_count - 1) + (evening_count - 1) + (study_count - 1)
# day_score = 60 + extra * 5 (不在此处使用,仅用于日历状态)
calendar[d] = 'pass' if all_three else 'fail'
return dict(
total_days=total_days, total_morning=total_morning,
total_evening=total_evening, total_study=total_study,
calendar=calendar,
morning_items=all_morning_items
)
@app.route('/')
@login_required
def index():
import json
stats = compute_stats()
wishes = [dict(w) for w in get_wishes()]
return render_template('index.html',
username=session.get('display_name', session.get('username', '')),
initial_stats=json.dumps(stats, ensure_ascii=False),
initial_wishes=json.dumps(wishes, ensure_ascii=False))
# ── API ──────────────────────────────────────────────
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
date = request.args.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
row = get_checkin(date)
return jsonify({'ok': True, 'data': row})
@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
body = request.get_json(force=True)
date = body.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400
data = body.get('data', {})
save_checkin(date, data)
return jsonify({'ok': True})
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
delete_checkin(date)
return jsonify({'ok': True})
@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
rows = get_all_checkins()
return jsonify({'ok': True, 'data': rows})
@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
stats = compute_stats()
return jsonify({'ok': True, **stats})
@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
return jsonify({
'ok': True,
'username': session.get('username'),
'display_name': session.get('display_name')
})
# ── 心愿清单 API ──────────────────────────────
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
wishes = [dict(w) for w in get_wishes()]
return jsonify({'ok': True, 'data': wishes})
@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
body = request.get_json(force=True)
name = body.get('name', '').strip()
if not name:
return jsonify({'ok': False, 'error': '名称不能为空'}), 400
quadrant = body.get('quadrant', '重要不紧急')
deadline = body.get('deadline', '')
wid = save_wish(name, quadrant, deadline)
return jsonify({'ok': True, 'id': wid})
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
body = request.get_json(force=True)
update_wish(wish_id, **body)
return jsonify({'ok': True})
@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
delete_wish(wish_id)
return jsonify({'ok': True})
@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
body = request.get_json(force=True)
order = body.get('order', [])
if not isinstance(order, list):
return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400
reorder_wishes(order)
return jsonify({'ok': True})
# ── 日历同步 API ────────────────────────────
CALENDAR_CACHE = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power', 'calendar_cache.json')
DING_MCP_URL = 'https://mcp-gw.dingtalk.com/server/95959163f85d8b58a167f65cd8bd3d22690b16c75ebacd0fa095016396a10e0d?key=0993e4d4e44c50ea25a0db840cc5815e'
def _fetch_dingtalk_events(date_str):
"""调用钉钉 MCP 实时查询指定日期的日程"""
import json as _json, urllib.request as _ur
from datetime import datetime, timezone, timedelta
tz = timezone(timedelta(hours=8))
try:
d = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=tz)
except ValueError:
return []
start_ts = int(d.replace(hour=0, minute=0, second=0).timestamp() * 1000)
end_ts = int(d.replace(hour=23, minute=59, second=59).timestamp() * 1000)
body = _json.dumps({
'jsonrpc': '2.0', 'method': 'tools/call', 'id': 1,
'params': {'name': 'list_calendar_events', 'arguments': {
'calendarId': 'primary', 'startTime': start_ts, 'endTime': end_ts, 'limit': 20
}}
}).encode('utf-8')
req = _ur.Request(DING_MCP_URL, data=body, headers={
'Content-Type': 'application/json', 'Accept': 'application/json'
})
try:
with _ur.urlopen(req, timeout=10) as resp:
data = _json.loads(resp.read())
events = data.get('result', {}).get('structuredContent', {}).get('result', {}).get('events', [])
except Exception:
return None # 网络错误返回 None调用方回退缓存
results = []
for e in events:
summary = (e.get('summary') or '').strip()
if not summary:
continue
start = e.get('start', {}).get('dateTime', '')
end = e.get('end', {}).get('dateTime', '')
time_str = ''
if start and end:
time_str = start[11:16] + '-' + end[11:16]
results.append({
'date': date_str, 'summary': summary,
'time': time_str, 'location': e.get('location') or ''
})
# 写入缓存
try:
all_cached = []
try:
with open(CALENDAR_CACHE, 'r') as f:
all_cached = _json.load(f)
except (FileNotFoundError, _json.JSONDecodeError):
pass
# 替换同一天的旧缓存
all_cached = [c for c in all_cached if c.get('date') != date_str] + results
with open(CALENDAR_CACHE, 'w') as f:
_json.dump(all_cached, f, ensure_ascii=False, indent=2)
except Exception:
pass
return results
@app.route('/api/calendar-sync', methods=['GET'])
@login_required
def api_calendar_sync():
import json as _json
date = request.args.get('date', '')
if not date:
return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
# 优先实时查询钉钉 MCP
events = _fetch_dingtalk_events(date)
if events is None:
# MCP 不可用,回退缓存
try:
with open(CALENDAR_CACHE, 'r') as f:
all_cached = _json.load(f)
events = [e for e in all_cached if e.get('date') == date]
except (FileNotFoundError, _json.JSONDecodeError):
events = []
return jsonify({'ok': True, 'data': events})
@app.route('/api/calendar-sync-all', methods=['GET'])
@login_required
def api_calendar_sync_all():
"""批量查询过去15天~未来15天的钉钉日程按日期分组返回"""
import json as _json
from datetime import datetime, timedelta
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
results = []
for delta in range(-15, 16):
d = today + timedelta(days=delta)
ds = d.strftime('%Y-%m-%d')
events = _fetch_dingtalk_events(ds)
if events:
results.append({'date': ds, 'events': events})
return jsonify({'ok': True, 'data': results})
# ── 启动 ──────────────────────────────────────────────
if __name__ == '__main__':
init_db()
app.config['TEMPLATES_AUTO_RELOAD'] = True
port = int(os.environ.get('PORT', 5058))
app.run(host='0.0.0.0', port=port, debug=False)