# -*- coding: utf-8 -*-
"""ziwei-power Flask application: a daily check-in ("energy management") tracker.

Provides session-based login, a dashboard page, and JSON APIs for daily
check-ins and a wish list. Persistence is delegated to the project-local
``database`` module.
"""

import json
import os
from datetime import timedelta
from functools import wraps

from flask import Flask, render_template, request, jsonify, session, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash

from database import (
    init_db,
    get_checkin,
    save_checkin,
    delete_checkin,
    get_all_checkins,
    get_wishes,
    save_wish,
    update_wish,
    delete_wish,
    reorder_wishes,
)

app = Flask(__name__)

# A fixed key keeps sessions valid across multiple gunicorn workers.
# NOTE(review): the fallback value is committed to source control; set
# SECRET_KEY in the environment for any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'ziwei-power-secret-2026')

# Persistent ("remember me") sessions last 30 days.
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=30)

# ── User store ────────────────────────────────────────
# NOTE(review): the plaintext password lives in source and is re-hashed on
# every import; consider storing only a precomputed hash outside the repo.
USERS = {
    'qiukai': {
        'password_hash': generate_password_hash('AiAlex2018$'),
        'name': '凯哥',
    }
}

# Error returned when a JSON endpoint receives a body that is not an object.
_BAD_JSON_ERROR = '请求体必须是 JSON 对象'


# ── Helpers ───────────────────────────────────────────
def _safe_next_url(target):
    """Return *target* if it is a same-site absolute path, otherwise ``'/'``.

    The ``next`` parameter is attacker-controllable, so anything that could
    point at another host is rejected to avoid an open redirect.
    """
    if not target or not target.startswith('/'):
        return '/'
    # Browsers treat '//host' and '/\host' as scheme-relative URLs.
    if target.startswith(('//', '/\\')):
        return '/'
    return target


def _json_object_body():
    """Return the request body parsed as a JSON object, or ``None``.

    ``None`` is returned both when the body is not valid JSON and when it is
    valid JSON of another type (list, string, number, null).
    """
    body = request.get_json(force=True, silent=True)
    return body if isinstance(body, dict) else None


def _bad_json_response():
    """Build the 400 response used when the JSON body is missing or malformed."""
    return jsonify({'ok': False, 'error': _BAD_JSON_ERROR}), 400


def _has_text(value):
    """Return True when *value* is a string containing non-whitespace text."""
    return isinstance(value, str) and bool(value.strip())


def _evening_item_filled(item):
    """Return True when an evening entry carries text.

    An entry is either a plain string or a dict whose ``mistake`` field
    holds the text.
    """
    if isinstance(item, dict):
        return _has_text(item.get('mistake'))
    return _has_text(item)


# ── Login-check decorator ─────────────────────────────
def login_required(f):
    """Redirect to the login page unless the session is marked as logged in."""
    @wraps(f)
    def decorated(*args, **kwargs):
        if not session.get('logged_in'):
            return redirect(url_for('login_page', next=request.path))
        return f(*args, **kwargs)
    return decorated


# ── Login routes ──────────────────────────────────────
@app.route('/login', methods=['GET', 'POST'])
def login_page():
    """Render the login form (GET) or authenticate form credentials (POST).

    POST answers with JSON: ``{'ok': True, 'next': <path>}`` on success, or
    ``{'ok': False, 'error': ...}`` with status 401 on failure.
    """
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        remember = request.form.get('remember') == 'on'

        user = USERS.get(username)
        if user and check_password_hash(user['password_hash'], password):
            session.permanent = remember
            session['logged_in'] = True
            session['username'] = username
            session['display_name'] = user['name']
            # Only same-site paths are echoed back to the client.
            next_url = _safe_next_url(request.args.get('next'))
            return jsonify({'ok': True, 'next': next_url})
        return jsonify({'ok': False, 'error': '账号或密码错误'}), 401

    # GET: an already-authenticated user goes straight to the dashboard.
    if session.get('logged_in'):
        return redirect(url_for('index'))
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for('login_page'))


# ── Dashboard ─────────────────────────────────────────
def compute_stats():
    """Aggregate check-in statistics shared by the stats API and the template.

    Returns:
        dict with ``total_days``, ``total_morning``, ``total_evening``,
        ``total_study`` and ``calendar`` (date -> ``'pass'`` or ``'fail'``).
        A day passes when the morning, evening and study sections each have
        at least one filled/completed entry.
    """
    rows = get_all_checkins()
    total_morning = 0
    total_evening = 0
    total_study = 0
    calendar = {}

    for row in rows:
        data = row['data']
        # `or []` also covers sections stored as an explicit null.
        morning = data.get('morning') or []
        evening = data.get('evening') or []
        study = data.get('study') or []

        morning_count = sum(1 for item in morning if _has_text(item))
        evening_count = sum(1 for item in evening if _evening_item_filled(item))
        # Ignore malformed (non-dict) study items instead of crashing.
        study_count = sum(
            1 for item in study if isinstance(item, dict) and item.get('done')
        )

        total_morning += morning_count
        total_evening += evening_count
        total_study += study_count

        # Only the pass/fail calendar state is derived here; no numeric
        # per-day score is computed in this function.
        all_three = morning_count > 0 and evening_count > 0 and study_count > 0
        calendar[row['date']] = 'pass' if all_three else 'fail'

    return dict(
        total_days=len(rows),
        total_morning=total_morning,
        total_evening=total_evening,
        total_study=total_study,
        calendar=calendar,
    )


@app.route('/')
@login_required
def index():
    """Render the dashboard with stats and wishes embedded as JSON strings."""
    stats = compute_stats()
    wishes = [dict(w) for w in get_wishes()]
    return render_template(
        'index.html',
        username=session.get('display_name', session.get('username', '')),
        initial_stats=json.dumps(stats, ensure_ascii=False),
        initial_wishes=json.dumps(wishes, ensure_ascii=False),
    )


# ── Check-in API ──────────────────────────────────────
@app.route('/api/checkin', methods=['GET'])
@login_required
def api_get_checkin():
    """Return the check-in stored for the ``date`` query parameter."""
    date = request.args.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 参数'}), 400
    row = get_checkin(date)
    return jsonify({'ok': True, 'data': row})


@app.route('/api/checkin', methods=['POST'])
@login_required
def api_save_checkin():
    """Save the check-in from a JSON body of the form ``{'date', 'data'}``."""
    body = _json_object_body()
    if body is None:
        return _bad_json_response()
    date = body.get('date', '')
    if not date:
        return jsonify({'ok': False, 'error': '缺少 date 字段'}), 400
    data = body.get('data', {})
    save_checkin(date, data)
    return jsonify({'ok': True})


# The <date> URL variable supplies the view's required `date` argument.
@app.route('/api/checkin/<date>', methods=['DELETE'])
@login_required
def api_delete_checkin(date):
    """Delete the check-in for *date*."""
    delete_checkin(date)
    return jsonify({'ok': True})


@app.route('/api/history', methods=['GET'])
@login_required
def api_history():
    """Return every stored check-in."""
    rows = get_all_checkins()
    return jsonify({'ok': True, 'data': rows})


@app.route('/api/stats', methods=['GET'])
@login_required
def api_stats():
    """Return the aggregated statistics from :func:`compute_stats`."""
    stats = compute_stats()
    return jsonify({'ok': True, **stats})


@app.route('/api/user', methods=['GET'])
@login_required
def api_user():
    """Return the logged-in user's username and display name."""
    return jsonify({
        'ok': True,
        'username': session.get('username'),
        'display_name': session.get('display_name'),
    })


# ── Wish-list API ─────────────────────────────────────
@app.route('/api/wishes', methods=['GET'])
@login_required
def api_get_wishes():
    """Return all wishes as a list of dicts."""
    wishes = [dict(w) for w in get_wishes()]
    return jsonify({'ok': True, 'data': wishes})


@app.route('/api/wishes', methods=['POST'])
@login_required
def api_create_wish():
    """Create a wish from JSON ``{'name', 'quadrant'?, 'deadline'?}``."""
    body = _json_object_body()
    if body is None:
        return _bad_json_response()
    raw_name = body.get('name', '')
    # A missing, null or non-string name is treated the same as an empty one.
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({'ok': False, 'error': '名称不能为空'}), 400
    quadrant = body.get('quadrant', '重要不紧急')
    deadline = body.get('deadline', '')
    wid = save_wish(name, quadrant, deadline)
    return jsonify({'ok': True, 'id': wid})


# NOTE(review): wish ids are assumed to be integers; confirm against the
# `database` module and drop the `int:` converter if they are not.
@app.route('/api/wishes/<int:wish_id>', methods=['PUT'])
@login_required
def api_update_wish(wish_id):
    """Update wish *wish_id* with the fields of the JSON object body."""
    body = _json_object_body()
    if body is None:
        return _bad_json_response()
    # NOTE(review): client-supplied keys are forwarded as keyword arguments;
    # confirm update_wish() validates field names.
    update_wish(wish_id, **body)
    return jsonify({'ok': True})


@app.route('/api/wishes/<int:wish_id>', methods=['DELETE'])
@login_required
def api_delete_wish(wish_id):
    """Delete wish *wish_id*."""
    delete_wish(wish_id)
    return jsonify({'ok': True})


@app.route('/api/wishes/reorder', methods=['PUT'])
@login_required
def api_reorder_wishes():
    """Reorder wishes according to the ``order`` list in the JSON body."""
    body = _json_object_body()
    if body is None:
        return _bad_json_response()
    order = body.get('order', [])
    if not isinstance(order, list):
        return jsonify({'ok': False, 'error': 'order 必须是列表'}), 400
    reorder_wishes(order)
    return jsonify({'ok': True})


# ── Entry point ───────────────────────────────────────
if __name__ == '__main__':
    # NOTE(review): init_db() only runs when this file is executed directly;
    # if the app is served by a WSGI server importing `app`, confirm the
    # database is initialised elsewhere.
    init_db()
    app.config['TEMPLATES_AUTO_RELOAD'] = True
    port = int(os.environ.get('PORT', 5058))
    app.run(host='0.0.0.0', port=port, debug=False)