# -*- coding: utf-8 -*-
"""ziwei-power Flask application: a daily check-in ("magnetic field management") tracker.

Provides session-based login, check-in CRUD endpoints, aggregated statistics,
a wish-list API and a DingTalk calendar sync that can pre-fill morning items.
"""

import json
import os
import urllib.request
from datetime import datetime, timedelta, timezone
from functools import wraps

from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash

from database import (
    init_db, get_checkin, save_checkin, delete_checkin, get_all_checkins,
    get_wishes, save_wish, update_wish, delete_wish, reorder_wishes,
)

app = Flask(__name__)
# A fixed key keeps sessions readable by every gunicorn worker.
# SECURITY: the fallback value is visible in source; set SECRET_KEY in production.
app.secret_key = os.environ.get('SECRET_KEY', 'ziwei-power-secret-2026')
# Persistent ("remember me") sessions last 30 days.
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)

# ── User store ────────────────────────────────────────
USERS = {
    'qiukai': {
        # SECURITY: the plaintext password is embedded in source and re-hashed
        # on every import. Prefer storing only a precomputed hash outside the
        # repository.
        'password_hash': generate_password_hash('AiAlex2018$'),
        'name': '凯哥',
    }
}

# Error returned when a JSON endpoint receives something other than an object.
_BAD_BODY_ERROR = '请求体必须是 JSON 对象'


def _json_body():
    """Return the request body parsed as a JSON object, or None.

    None is returned both for unparsable bodies and for valid JSON that is not
    an object (list, string, number), so callers can answer with one 400.
    """
    body = request.get_json(force=True, silent=True)
    return body if isinstance(body, dict) else None


def _bad_body_response():
    """Build the 400 response used when `_json_body` returns None."""
    return jsonify({'ok': False, 'error': _BAD_BODY_ERROR}), 400


def _safe_next_url(target):
    """Return *target* when it is a same-site path, otherwise '/'.

    Rejects absolute URLs and scheme-relative forms ('//host', '/\\host') so the
    post-login redirect cannot be pointed at another site.
    """
    if not target or not target.startswith('/'):
        return '/'
    if target.startswith(('//', '/\\')):
        return '/'
    return target


# ── Login-check decorator ─────────────────────────────
def login_required(f):
    """Redirect to the login page unless the session is marked as logged in."""
    @wraps(f)
    def decorated(*args, **kwargs):
        if not session.get('logged_in'):
            return redirect(url_for('login_page', next=request.path))
        return f(*args, **kwargs)
    return decorated


# ── Login routes ──────────────────────────────────────
@app.route('/login', methods=['GET', 'POST'])
def login_page():
    """Render the login form (GET) or authenticate form credentials (POST).

    POST answers with JSON: ``{'ok': True, 'next': <path>}`` on success, or
    ``{'ok': False, 'error': ...}`` with status 401 on failure.
    """
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        remember = request.form.get('remember') == 'on'
        user = USERS.get(username)
        if user and check_password_hash(user['password_hash'], password):
            session.permanent = remember
            session['logged_in'] = True
            session['username'] = username
            session['display_name'] = user['name']
            # Only same-site paths are echoed back, to avoid an open redirect.
            next_url = _safe_next_url(request.args.get('next'))
            return jsonify({'ok': True, 'next': next_url})
        return jsonify({'ok': False, 'error': '账号或密码错误'}), 401

    # GET: an already logged-in user goes straight to the main page.
    if session.get('logged_in'):
        return redirect(url_for('index'))
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for('login_page'))


# ── Statistics ────────────────────────────────────────
# Keyword → pillar auto-classification rules. Pillars are tried in this
# declaration order and the first one with a matching keyword wins, so the
# order is significant (e.g. '患者管理' is tested before '患者').
_PILLAR_KEYWORDS = {
    '医药营销': ['科普', '科研', '学术会议', '学术', '会议', '患者管理', '临床推广', '临床', '药企', '处方', 'KOL', '代表', '拜访', '荣昌', '恒瑞', '信达', '鲁银', 'OPC', '项目', '标杆', '客户'],
    '医疗服务': ['诊疗', '治疗', '医生', '医院', '科室', '手术', '门诊', '患者', '病', '护理', '诊断', '影像', '检验'],
    '医疗支付': ['医保', '商保', '理赔', '支付', '保险', '费用', '报销', '商业保险', '保单'],
    'AI 智能': ['AI', '数字人', '智能', '算法', '自动化', '平台', '系统', '网站', '开发', '代码', '程序', '模型'],
    '公司治理': ['团队', '组织', '管理', '招聘', 'HR', '制度', '流程', '财务', '考核', 'KPI', '规划', '人才', '架构', '预算'],
    '个人修养': ['早起', '睡觉', '俯卧撑', '运动', '冥想', '阅读', '复盘', '写作', '习惯', '修身', '立志', '责善', '改过', '勤学'],
}

# Pillars reported in `pillar_breakdown`, in output order.
_PILLARS = ['医疗服务', '医药营销', '医疗支付', 'AI 智能', '公司治理', '个人修养']


def _classify_auto(text):
    """Return the first pillar whose keyword occurs in *text*, or '' if none.

    Matching is a case-sensitive substring test.
    """
    if not text or not text.strip():
        return ''
    haystack = text.strip()
    for pillar, keywords in _PILLAR_KEYWORDS.items():
        if any(kw in haystack for kw in keywords):
            return pillar
    return ''  # no keyword matched


def _clean(value):
    """Return *value* stripped when it is a string, otherwise ''."""
    return value.strip() if isinstance(value, str) else ''


def _entry_text(entry, key):
    """Return the stripped text of a check-in entry.

    Entries are either plain strings or dicts holding the text under *key*;
    anything else yields ''.
    """
    if isinstance(entry, dict):
        entry = entry.get(key)
    return _clean(entry)


def compute_stats():
    """Aggregate all check-ins into the statistics shared by the API and template.

    Returns:
        dict with keys ``total_days``, ``total_morning``, ``total_evening``,
        ``total_study``, ``calendar`` (date → 'pass'/'fail'), ``morning_items``,
        ``evening_items``, ``study_items`` and ``pillar_breakdown``.
    """
    rows = get_all_checkins()
    total_morning = total_evening = total_study = 0
    calendar = {}
    morning_items, evening_items, study_items = [], [], []

    for row in rows:
        data = row['data'] or {}
        morning = data.get('morning') or []
        evening = data.get('evening') or []
        study = data.get('study') or []

        # Morning: every entry with non-empty text counts and is collected.
        morning_count = 0
        for entry in morning:
            text = _entry_text(entry, 'text')
            if not text:
                continue
            explicit = entry.get('pillar', '') if isinstance(entry, dict) else ''
            morning_items.append({'text': text, 'pillar': explicit or _classify_auto(text)})
            morning_count += 1

        # Evening: an entry is collected when it has a mistake OR an
        # improvement, but only entries with a mistake count towards the total.
        evening_count = 0
        for entry in evening:
            if isinstance(entry, dict):
                mistake = _clean(entry.get('mistake'))
                improvement = _clean(entry.get('improvement'))
                if mistake or improvement:
                    pillar = entry.get('pillar', '') or _classify_auto(mistake + ' ' + improvement)
                    evening_items.append({'mistake': mistake, 'improvement': improvement, 'pillar': pillar})
                if mistake:
                    evening_count += 1
            elif isinstance(entry, str) and entry.strip():
                evening_items.append({
                    'mistake': entry.strip(),
                    'improvement': '',
                    'pillar': _classify_auto(entry),
                })
                evening_count += 1

        # Study: named entries are collected; only dict entries flagged
        # 'done' count. Plain-string entries are never "done".
        study_count = 0
        for entry in study:
            is_dict = isinstance(entry, dict)
            raw_name = entry.get('name') if is_dict else entry
            name = '' if raw_name is None else str(raw_name).strip()
            if name:
                explicit = entry.get('pillar', '') if is_dict else ''
                study_items.append({
                    'name': name,
                    'done': entry.get('done', False) if is_dict else False,
                    'note': entry.get('note', '') if is_dict else '',
                    'pillar': explicit or _classify_auto(name),
                })
            if is_dict and entry.get('done'):
                study_count += 1

        total_morning += morning_count
        total_evening += evening_count
        total_study += study_count

        # A day passes only when all three sections have at least one item.
        all_three = morning_count > 0 and evening_count > 0 and study_count > 0
        calendar[row['date']] = 'pass' if all_three else 'fail'

    # Per-pillar breakdown; items whose pillar is unknown or empty are skipped.
    pillar_breakdown = {
        pillar: {
            'morning': 0, 'evening': 0, 'study': 0,
            'morning_items': [], 'evening_items': [], 'study_items': [],
        }
        for pillar in _PILLARS
    }
    for kind, items in (('morning', morning_items),
                        ('evening', evening_items),
                        ('study', study_items)):
        for item in items:
            pillar = item['pillar']
            if pillar in pillar_breakdown:
                pillar_breakdown[pillar][kind] += 1
                pillar_breakdown[pillar][kind + '_items'].append(item)

    return dict(
        total_days=len(rows),
        total_morning=total_morning,
        total_evening=total_evening,
        total_study=total_study,
        calendar=calendar,
        morning_items=morning_items,
        evening_items=evening_items,
        study_items=study_items,
        pillar_breakdown=pillar_breakdown,
    )


# ── Main page ─────────────────────────────────────────
@app.route('/')
@login_required
def index():
    """Render the main page with statistics and wishes embedded as JSON."""
    stats = compute_stats()
    wishes = [dict(w) for w in get_wishes()]
    return render_template(
        'index.html',
        username=session.get('display_name', session.get('username', '')),
        initial_stats=json.dumps(stats, ensure_ascii=False),
        initial_wishes=json.dumps(wishes, ensure_ascii=False),
    )


# ── Check-in API ──────────────────────────────────────
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
    """Return the check-in stored for the ``date`` query parameter."""
    date = request.args.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
    row = get_checkin(date)
    return jsonify({'ok': True, 'data': row})


@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
    """Save the check-in given as ``{'date': ..., 'data': {...}}``."""
    body = _json_body()
    if body is None:
        return _bad_body_response()
    date = body.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400
    data = body.get('data', {})
    save_checkin(date, data)
    return jsonify({'ok': True})


# The URL variable must be declared in the rule, otherwise Flask has nothing
# to pass as the view's `date` argument.
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
    """Delete the check-in for *date*."""
    delete_checkin(date)
    return jsonify({'ok': True})


@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
    """Return every stored check-in."""
    rows = get_all_checkins()
    return jsonify({'ok': True, 'data': rows})


@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
    """Return the aggregated statistics from `compute_stats`."""
    stats = compute_stats()
    return jsonify({'ok': True, **stats})


@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
    """Return the username and display name stored in the session."""
    return jsonify({
        'ok': True,
        'username': session.get('username'),
        'display_name': session.get('display_name'),
    })


# ── Wish-list API ─────────────────────────────────────
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
    """Return all wishes as a list of dicts."""
    wishes = [dict(w) for w in get_wishes()]
    return jsonify({'ok': True, 'data': wishes})


@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
    """Create a wish from ``name`` (required), ``quadrant`` and ``deadline``."""
    body = _json_body()
    if body is None:
        return _bad_body_response()
    # `name` may be missing or null; both are treated as empty.
    name = str(body.get('name') or '').strip()
    if not name:
        return jsonify({'ok': False, 'error': '名称不能为空'}), 400
    quadrant = body.get('quadrant', '重要不紧急')
    deadline = body.get('deadline', '')
    wid = save_wish(name, quadrant, deadline)
    return jsonify({'ok': True, 'id': wid})


# NOTE(review): the `int` converter assumes wish ids are integers — confirm
# against database.save_wish / update_wish.
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
    """Update the wish *wish_id* with the fields of the JSON body."""
    body = _json_body()
    if body is None:
        return _bad_body_response()
    # NOTE(review): client-supplied keys are forwarded as keyword arguments
    # unchanged; verify that update_wish only accepts an explicit set of
    # field names.
    update_wish(wish_id, **body)
    return jsonify({'ok': True})


@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
    """Delete the wish *wish_id*."""
    delete_wish(wish_id)
    return jsonify({'ok': True})


@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
    """Apply a new wish ordering given as ``{'order': [...]}``."""
    body = _json_body()
    if body is None:
        return _bad_body_response()
    order = body.get('order', [])
    if not isinstance(order, list):
        return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400
    reorder_wishes(order)
    return jsonify({'ok': True})


# ── Calendar sync API ─────────────────────────────────
CALENDAR_CACHE = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power', 'calendar_cache.json')
# SECURITY: this URL embeds an access key; consider loading it from the
# environment instead of committing it to source.
DING_MCP_URL = 'https://mcp-gw.dingtalk.com/server/95959163f85d8b58a167f65cd8bd3d22690b16c75ebacd0fa095016396a10e0d?key=0993e4d4e44c50ea25a0db840cc5815e'

# Calendar days are computed in UTC+8, both when querying and when choosing
# the sync window, so the two always agree on what "today" is.
_CALENDAR_TZ = timezone(timedelta(hours=8))


def _load_calendar_cache():
    """Return the cached events as a list of dicts; [] if missing or unreadable."""
    try:
        with open(CALENDAR_CACHE, 'r', encoding='utf-8') as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        return []
    if not isinstance(cached, list):
        return []
    return [entry for entry in cached if isinstance(entry, dict)]


def _store_calendar_cache(date_str, events):
    """Replace the cached events of *date_str* with *events* (best effort)."""
    kept = [entry for entry in _load_calendar_cache() if entry.get('date') != date_str]
    try:
        os.makedirs(os.path.dirname(CALENDAR_CACHE), exist_ok=True)
        with open(CALENDAR_CACHE, 'w', encoding='utf-8') as f:
            json.dump(kept + events, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError):
        # The cache is only an offline fallback, so a failed write is logged
        # rather than raised.
        app.logger.warning('Could not write calendar cache %s', CALENDAR_CACHE, exc_info=True)


def _fetch_dingtalk_events(date_str):
    """Query the DingTalk MCP gateway for the events of one day.

    Args:
        date_str: day in ``YYYY-MM-DD`` form.

    Returns:
        A list of ``{'date', 'summary', 'time', 'location'}`` dicts; ``[]``
        when *date_str* is not a valid date; ``None`` when the request or
        response parsing fails, so the caller can fall back to the cache.
    """
    try:
        day = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=_CALENDAR_TZ)
    except ValueError:
        return []

    # The query window is the whole day, in epoch milliseconds.
    start_ts = int(day.replace(hour=0, minute=0, second=0).timestamp() * 1000)
    end_ts = int(day.replace(hour=23, minute=59, second=59).timestamp() * 1000)
    payload = json.dumps({
        'jsonrpc': '2.0',
        'method': 'tools/call',
        'id': 1,
        'params': {'name': 'list_calendar_events', 'arguments': {
            'calendarId': 'primary',
            'startTime': start_ts,
            'endTime': end_ts,
            'limit': 20,
        }},
    }).encode('utf-8')
    req = urllib.request.Request(DING_MCP_URL, data=payload, headers={
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    })

    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
        events = data.get('result', {}).get('structuredContent', {}).get('result', {}).get('events', [])
    except Exception:
        # Any network or response-shape failure means "unavailable".
        app.logger.warning('DingTalk calendar query failed for %s', date_str, exc_info=True)
        return None
    if not isinstance(events, list):
        events = []

    results = []
    for event in events:
        if not isinstance(event, dict):
            continue
        summary = (event.get('summary') or '').strip()
        if not summary:
            continue
        start = (event.get('start') or {}).get('dateTime', '')
        end = (event.get('end') or {}).get('dateTime', '')
        time_str = ''
        if start and end:
            # Characters 11..16 are taken as HH:MM — presumably the values are
            # ISO-8601 timestamps; confirm against the gateway's responses.
            time_str = start[11:16] + '-' + end[11:16]
        results.append({
            'date': date_str,
            'summary': summary,
            'time': time_str,
            'location': event.get('location') or '',
        })

    _store_calendar_cache(date_str, results)
    return results


@app.route('/api/calendar-sync', methods=['GET'])
@login_required
def api_calendar_sync():
    """Return the events of one day, live if possible, otherwise from cache."""
    date = request.args.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400

    # Prefer a live query; None means the gateway was unavailable.
    events = _fetch_dingtalk_events(date)
    if events is None:
        events = [e for e in _load_calendar_cache() if e.get('date') == date]
    return jsonify({'ok': True, 'data': events})


@app.route('/api/calendar-sync-all', methods=['GET'])
@login_required
def api_calendar_sync_all():
    """Sync events from 15 days back to 15 days ahead into the check-ins.

    Each day is queried separately. New events are appended, de-duplicated by
    text, to that day's ``morning`` list. Returns the events grouped by date
    plus the number of days whose check-in was modified.
    """
    today = datetime.now(_CALENDAR_TZ).replace(hour=0, minute=0, second=0, microsecond=0)
    results = []
    saved_days = 0

    for delta in range(-15, 16):
        ds = (today + timedelta(days=delta)).strftime('%Y-%m-%d')
        events = _fetch_dingtalk_events(ds)
        if not events:
            continue
        results.append({'date': ds, 'events': events})

        # Merge into the existing check-in without duplicating entries.
        existing = get_checkin(ds)
        data = (existing['data'] if existing else None) or {'morning': [], 'evening': [], 'study': []}
        morning = data.get('morning') or []
        existing_texts = {t for t in (_entry_text(mi, 'text') for mi in morning) if t}

        added = False
        for evt in events:
            summary = (evt.get('summary') or '').strip()
            if not summary:
                continue
            text = f"【{evt['time']}】{summary}" if evt.get('time') else summary
            loc = evt.get('location', '')
            if loc:
                text += f" @{loc}"
            text = text.strip()
            if text not in existing_texts:
                morning.append(text)
                existing_texts.add(text)
                added = True

        if added:
            data['morning'] = morning
            save_checkin(ds, data)
            saved_days += 1

    return jsonify({'ok': True, 'data': results, 'saved_days': saved_days})


# ── Entry point ───────────────────────────────────────
if __name__ == '__main__':
    # NOTE(review): init_db() only runs when this file is executed directly;
    # under a WSGI server it presumably has to be called elsewhere — confirm.
    init_db()
    app.config['TEMPLATES_AUTO_RELOAD'] = True
    port = int(os.environ.get('PORT', 5058))
    app.run(host='0.0.0.0', port=port, debug=False)