# -*- coding: utf-8 -*-
"""SQLite persistence layer for ziwei-power check-in records.

The database file lives under ``~/.workbuddy/data/ziwei-power``; the
directory is created when this module is imported. Call :func:`init_db`
once before using the read/write helpers so the tables exist.
"""

import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Any, Dict, List, Optional

DB_DIR = os.path.join(os.path.expanduser('~'), '.workbuddy', 'data', 'ziwei-power')
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, 'ziwei_power.db')

# Current database schema version. Whenever the table structure changes,
# bump this by 1 and add the matching migration step to init_db().
CURRENT_SCHEMA_VERSION = 1


def get_conn():
    """Open a new connection to the database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller is responsible for closing the connection.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _get_schema_version(conn):
    """Return the schema version stored in the database.

    Creates the ``schema_version`` table if it is missing and returns 0
    when the table holds no row (i.e. a brand-new database).
    """
    conn.execute('''
        CREATE TABLE IF NOT EXISTS schema_version (
            version INTEGER NOT NULL
        )
    ''')
    row = conn.execute('SELECT version FROM schema_version').fetchone()
    return row['version'] if row else 0


def _set_schema_version(conn, version):
    """Replace the stored schema version with ``version``.

    The table is emptied first so it always holds exactly one row.
    The caller commits.
    """
    conn.execute('DELETE FROM schema_version')
    conn.execute('INSERT INTO schema_version (version) VALUES (?)', (version,))


def _row_to_dict(row) -> Dict[str, Any]:
    """Convert a ``checkins`` row into a plain dict, decoding the JSON payload."""
    return {
        'id': row['id'],
        'date': row['date'],
        'data': json.loads(row['data']),
        'created_at': row['created_at'],
        'updated_at': row['updated_at'],
    }


def init_db() -> None:
    """Create the tables and run any pending schema migrations.

    Migration steps run in ascending version order. The stored version is
    only ever moved forward: a database written by a newer release keeps
    its higher version number instead of being stamped back down, which
    would make that newer release re-run migrations it already applied.
    """
    with closing(get_conn()) as conn:
        # `with conn` commits on success and rolls back on error.
        with conn:
            current = _get_schema_version(conn)

            # -- Migration steps (ascending version order) --
            if current < 1:
                # v1: initial table layout.
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS checkins (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        date TEXT UNIQUE NOT NULL,
                        data TEXT NOT NULL,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL
                    )
                ''')

            # -- Extend here when adding columns / altering tables, e.g.:
            # if current < 2:
            #     conn.execute('ALTER TABLE checkins ADD COLUMN tags TEXT DEFAULT ""')
            #     # Optional: backfill existing rows.
            #     conn.execute("UPDATE checkins SET tags = '[]' WHERE tags IS NULL")

            # Record the new version only when we actually migrated forward.
            if current < CURRENT_SCHEMA_VERSION:
                _set_schema_version(conn, CURRENT_SCHEMA_VERSION)


def get_checkin(date_str) -> Optional[Dict[str, Any]]:
    """Return the check-in record for ``date_str``, or ``None`` if absent.

    The returned dict has the keys ``id``, ``date``, ``data`` (decoded from
    JSON), ``created_at`` and ``updated_at``.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            'SELECT * FROM checkins WHERE date = ?', (date_str,)
        ).fetchone()
    return _row_to_dict(row) if row else None


def save_checkin(date_str, data_dict) -> None:
    """Insert or update the check-in record for ``date_str``.

    ``data_dict`` is stored as JSON text (non-ASCII characters are kept
    as-is). An existing row keeps its ``id`` and ``created_at``; only
    ``data`` and ``updated_at`` change.
    """
    now = datetime.now().isoformat()
    json_data = json.dumps(data_dict, ensure_ascii=False)
    with closing(get_conn()) as conn:
        with conn:
            # Try the update first; rowcount tells us whether the date
            # already existed, so no separate SELECT is needed.
            cursor = conn.execute(
                'UPDATE checkins SET data = ?, updated_at = ? WHERE date = ?',
                (json_data, now, date_str),
            )
            if cursor.rowcount == 0:
                conn.execute(
                    'INSERT INTO checkins (date, data, created_at, updated_at) VALUES (?, ?, ?, ?)',
                    (date_str, json_data, now, now),
                )


def delete_checkin(date_str) -> None:
    """Delete the check-in record for ``date_str``; a missing date is a no-op."""
    with closing(get_conn()) as conn:
        with conn:
            conn.execute('DELETE FROM checkins WHERE date = ?', (date_str,))


def get_all_checkins() -> List[Dict[str, Any]]:
    """Return every check-in record as a list of dicts, newest date first."""
    with closing(get_conn()) as conn:
        rows = conn.execute('SELECT * FROM checkins ORDER BY date DESC').fetchall()
    return [_row_to_dict(row) for row in rows]