From 14ffcf4bd3f6ed6d7af6f8171d46ff81d4f17e83 Mon Sep 17 00:00:00 2001 From: megaproxy Date: Sat, 9 May 2026 00:05:12 +0100 Subject: [PATCH] Add Rust modules: paths, jsonl, usage, watch, commands, state, settings --- src-tauri/src/commands.rs | 72 +++++++++ src-tauri/src/jsonl.rs | 235 +++++++++++++++++++++++++++++ src-tauri/src/lib.rs | 72 +++++++++ src-tauri/src/main.rs | 6 + src-tauri/src/paths.rs | 155 +++++++++++++++++++ src-tauri/src/settings.rs | 87 +++++++++++ src-tauri/src/state.rs | 58 +++++++ src-tauri/src/usage.rs | 308 ++++++++++++++++++++++++++++++++++++++ src-tauri/src/watch.rs | 233 ++++++++++++++++++++++++++++ 9 files changed, 1226 insertions(+) create mode 100644 src-tauri/src/commands.rs create mode 100644 src-tauri/src/jsonl.rs create mode 100644 src-tauri/src/lib.rs create mode 100644 src-tauri/src/main.rs create mode 100644 src-tauri/src/paths.rs create mode 100644 src-tauri/src/settings.rs create mode 100644 src-tauri/src/state.rs create mode 100644 src-tauri/src/usage.rs create mode 100644 src-tauri/src/watch.rs diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs new file mode 100644 index 0000000..8209e48 --- /dev/null +++ b/src-tauri/src/commands.rs @@ -0,0 +1,72 @@ +//! Tauri command surface — every JS-callable function lives here. + +use crate::paths::{list_wsl_distros, resolve_roots, ResolvedRoots}; +use crate::settings::{save as save_settings, Settings}; +use crate::state::SharedState; +use crate::usage::{build_snapshot, UsageSnapshot}; +use crate::watch::refresh_and_emit; + +#[tauri::command] +pub async fn get_snapshot(state: tauri::State<'_, SharedState>) -> Result { + let mut events = state.collect_events(); + events.sort_by_key(|e| e.ts); + let caps = state.settings.read().caps.clone(); + Ok(build_snapshot(&events, &caps, chrono::Utc::now())) +} + +#[tauri::command] +pub async fn get_settings(state: tauri::State<'_, SharedState>) -> Result { + Ok(state.settings.read().clone()) +} + +#[tauri::command] +pub async fn set_settings( + state: tauri::State<'_, SharedState>, + app: tauri::AppHandle, + new: Settings, +) -> Result<(), String> { + { + *state.settings.write() = new.clone(); + } + save_settings(&new).map_err(|e| e.to_string())?; + + // If roots-related settings changed, force a re-resolve + rescan. + { + let mut roots = state.roots.write(); + roots.clear(); + } + refresh_and_emit(&app, state.inner(), None) + .await + .map_err(|e| e.to_string())?; + Ok(()) +} + +#[tauri::command] +pub async fn list_distros() -> Result, String> { + list_wsl_distros().map_err(|e| e.to_string()) +} + +#[tauri::command] +pub async fn get_roots(state: tauri::State<'_, SharedState>) -> Result { + let s = state.settings.read().clone(); + Ok(resolve_roots(s.wsl_distro_override.as_deref(), s.include_native)) +} + +#[tauri::command] +pub async fn force_rescan( + state: tauri::State<'_, SharedState>, + app: tauri::AppHandle, +) -> Result<(), String> { + // Wipe caches so every file is reparsed from offset 0. + state.files.write().clear(); + state.seen_ids.write().clear(); + refresh_and_emit(&app, state.inner(), None) + .await + .map_err(|e| e.to_string()) +} + +#[tauri::command] +pub async fn quit_app(app: tauri::AppHandle) -> Result<(), String> { + app.exit(0); + Ok(()) +} diff --git a/src-tauri/src/jsonl.rs b/src-tauri/src/jsonl.rs new file mode 100644 index 0000000..fa89afd --- /dev/null +++ b/src-tauri/src/jsonl.rs @@ -0,0 +1,235 @@ +//! Stream-parse Claude Code JSONL transcripts and normalize them into +//! [`UsageEvent`]s the rest of the app can sum. +//! +//! Field shape (verified against actual transcripts on 2026-05-08): +//! +//! { type:"assistant", timestamp:ISO8601, sessionId, requestId, uuid, +//! message: { model: "claude-opus-4-7", +//! usage: { input_tokens, output_tokens, +//! cache_creation_input_tokens, +//! cache_read_input_tokens } } } + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::fs::File; +use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; +use walkdir::WalkDir; + +/// Raw shape we care about. Everything else is ignored — the format gains +/// fields between Claude Code releases and we don't want to break on those. +#[derive(Debug, Deserialize)] +struct RawLine { + #[serde(rename = "type")] + kind: Option, + timestamp: Option>, + #[serde(rename = "sessionId")] + _session_id: Option, + #[serde(rename = "requestId")] + request_id: Option, + uuid: Option, + message: Option, +} + +#[derive(Debug, Deserialize)] +struct RawMessage { + model: Option, + usage: Option, +} + +#[derive(Debug, Deserialize, Default)] +struct RawUsage { + #[serde(default)] + input_tokens: u64, + #[serde(default)] + output_tokens: u64, + #[serde(default)] + cache_creation_input_tokens: u64, + #[serde(default)] + cache_read_input_tokens: u64, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "lowercase")] +pub enum ModelFamily { + Opus, + Sonnet, + Haiku, + Other, +} + +#[derive(Debug, Clone, Serialize)] +pub struct UsageEvent { + pub ts: DateTime, + pub model_raw: String, + pub model_label: String, + pub model_family: ModelFamily, + pub input: u64, + pub output: u64, + pub cache_create: u64, + pub cache_read: u64, + pub total: u64, + pub dedupe_key: String, + pub source_file: PathBuf, +} + +/// "claude-opus-4-7" → ("Opus 4.7", Opus) +/// "claude-sonnet-4-6" → ("Sonnet 4.6", Sonnet) +/// "claude-haiku-4-5-20251001" → ("Haiku 4.5", Haiku) +pub fn normalize_model(raw: &str) -> (String, ModelFamily) { + let lower = raw.to_lowercase(); + let family = if lower.contains("opus") { + ModelFamily::Opus + } else if lower.contains("sonnet") { + ModelFamily::Sonnet + } else if lower.contains("haiku") { + ModelFamily::Haiku + } else { + ModelFamily::Other + }; + + // Pull the first major.minor segment (e.g. "4-7" → "4.7"). If we can't + // find one, just title-case the family name. + let mut version: Option = None; + let parts: Vec<&str> = lower.split('-').collect(); + for i in 0..parts.len().saturating_sub(1) { + if parts[i].parse::().is_ok() && parts[i + 1].parse::().is_ok() { + version = Some(format!("{}.{}", parts[i], parts[i + 1])); + break; + } + } + + let family_str = match family { + ModelFamily::Opus => "Opus", + ModelFamily::Sonnet => "Sonnet", + ModelFamily::Haiku => "Haiku", + ModelFamily::Other => "Model", + }; + + let label = match version { + Some(v) => format!("{family_str} {v}"), + None => family_str.to_string(), + }; + + (label, family) +} + +/// Walk a root and return every `*.jsonl` (including nested `subagents/`). +pub fn enumerate_jsonl(root: &Path) -> Vec { + WalkDir::new(root) + .follow_links(false) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| { + e.path() + .extension() + .and_then(|s| s.to_str()) + .map(|s| s.eq_ignore_ascii_case("jsonl")) + .unwrap_or(false) + }) + .map(|e| e.into_path()) + .collect() +} + +/// Tail-read from `start_offset`. Returns `(new_events, new_offset)`. +/// +/// Only advances past fully-terminated lines — a half-flushed last line is +/// reparsed on the next call once it gets its trailing newline. +/// +/// Caller is responsible for deduping returned events against a global +/// `seen_ids` set (subagent transcripts overlap with their parent). +pub fn parse_jsonl_from( + path: &Path, + start_offset: u64, +) -> anyhow::Result<(Vec, u64)> { + let mut f = File::open(path)?; + let len = f.metadata()?.len(); + + // File rotated / truncated: start over. + let start = if start_offset > len { 0 } else { start_offset }; + f.seek(SeekFrom::Start(start))?; + + let mut reader = BufReader::new(f); + let mut events: Vec = Vec::new(); + let mut offset = start; + + loop { + let mut buf: Vec = Vec::new(); + let n = reader.read_until(b'\n', &mut buf)?; + if n == 0 { + break; + } + // No newline → partial flush, leave it for next call. + if !buf.ends_with(b"\n") { + break; + } + offset += n as u64; + + let line = std::str::from_utf8(&buf).map(str::trim).unwrap_or(""); + if line.is_empty() { + continue; + } + + let parsed: RawLine = match serde_json::from_str(line) { + Ok(v) => v, + Err(_) => continue, // Skip malformed lines silently — the file is + // continually appended; partial writes happen. + }; + + if parsed.kind.as_deref() != Some("assistant") { + continue; + } + + let (Some(ts), Some(msg)) = (parsed.timestamp, parsed.message) else { + continue; + }; + let Some(usage) = msg.usage else { continue }; + + let model_raw = msg.model.unwrap_or_else(|| "unknown".to_string()); + let (model_label, model_family) = normalize_model(&model_raw); + + let total = usage.input_tokens + + usage.output_tokens + + usage.cache_creation_input_tokens + + usage.cache_read_input_tokens; + if total == 0 { + continue; + } + + let dedupe_key = parsed + .request_id + .or(parsed.uuid) + .unwrap_or_else(|| format!("{}:{}", path.display(), offset)); + + events.push(UsageEvent { + ts, + model_raw, + model_label, + model_family, + input: usage.input_tokens, + output: usage.output_tokens, + cache_create: usage.cache_creation_input_tokens, + cache_read: usage.cache_read_input_tokens, + total, + dedupe_key, + source_file: path.to_path_buf(), + }); + } + + Ok((events, offset)) +} + +/// Read the file's mtime (in nanos) and size — used to skip unchanged files +/// during the 60s poll fallback. +pub fn stat(path: &Path) -> Option<(i128, u64)> { + let m = std::fs::metadata(path).ok()?; + let mtime = m + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_nanos() as i128) + .unwrap_or(0); + Some((mtime, m.len())) +} + diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs new file mode 100644 index 0000000..36cc7f4 --- /dev/null +++ b/src-tauri/src/lib.rs @@ -0,0 +1,72 @@ +//! Library entry point. `main.rs` calls `run()`. +//! +//! Wires Tauri builder, plugins, app state, and the watcher. + +mod commands; +mod jsonl; +mod paths; +mod settings; +mod state; +mod usage; +mod watch; + +use tauri::Manager; + +pub fn run() { + // Logs go to stderr; in release on Windows there's no console attached, + // which is fine — RUST_LOG=debug from a console launch will surface them. + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_writer(std::io::stderr) + .try_init(); + + tauri::Builder::default() + .plugin(tauri_plugin_autostart::init( + // MacosLauncher is ignored on Windows but the constructor requires it. + tauri_plugin_autostart::MacosLauncher::LaunchAgent, + Some(vec![]), + )) + .plugin(tauri_plugin_store::Builder::new().build()) + .plugin(tauri_plugin_dialog::init()) + .setup(|app| { + let settings = settings::load(); + let shared = state::AppState::new(settings); + app.manage(shared.clone()); + + let handle = app.handle().clone(); + let state_for_task = shared.clone(); + tauri::async_runtime::spawn(async move { + // Initial scan + emit. + if let Err(e) = + watch::refresh_and_emit(&handle, &state_for_task, None).await + { + tracing::warn!("initial refresh failed: {e:#}"); + } + // Start the watcher; keep it alive for the lifetime of the app. + match watch::start(handle.clone(), state_for_task.clone()) { + Ok(w) => { + // Move into a long-lived store on the AppState so it + // isn't dropped (which would unregister the watcher). + state_for_task.set_watcher(w); + } + Err(e) => tracing::warn!("watcher start failed: {e:#}"), + } + }); + + Ok(()) + }) + .invoke_handler(tauri::generate_handler![ + commands::get_snapshot, + commands::get_settings, + commands::set_settings, + commands::list_distros, + commands::get_roots, + commands::force_rescan, + commands::quit_app, + ]) + .run(tauri::generate_context!()) + .expect("error while running tauri application"); +} diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs new file mode 100644 index 0000000..6641b69 --- /dev/null +++ b/src-tauri/src/main.rs @@ -0,0 +1,6 @@ +// Hide the console window in release builds. In debug we keep it for tracing logs. +#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] + +fn main() { + claude_usage_widget_lib::run(); +} diff --git a/src-tauri/src/paths.rs b/src-tauri/src/paths.rs new file mode 100644 index 0000000..0ee0bf1 --- /dev/null +++ b/src-tauri/src/paths.rs @@ -0,0 +1,155 @@ +//! WSL distro detection and Claude Code projects-root resolution. +//! +//! Discovery order: +//! 1. settings.wsl_distro_override → \\wsl$\\home\\.claude\projects +//! 2. settings.include_native (true) → %USERPROFILE%\.claude\projects +//! 3. autodetect via `wsl.exe -l -q` → first hit per distro +//! +//! All hits are unioned, canonicalized, and deduped. + +use serde::Serialize; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone, Serialize)] +pub struct ResolvedRoots { + pub roots: Vec, + pub wsl_distro: Option, + pub native_present: bool, +} + +/// `wsl.exe -l -q` lists installed distros, one per line, in UTF-16LE. +/// On non-Windows platforms (or when wsl.exe isn't on PATH) returns Ok(empty). +pub fn list_wsl_distros() -> anyhow::Result> { + if !cfg!(windows) { + return Ok(Vec::new()); + } + + let out = match std::process::Command::new("wsl.exe") + .args(["-l", "-q"]) + .output() + { + Ok(o) => o, + Err(e) => { + tracing::debug!("wsl.exe not available: {e}"); + return Ok(Vec::new()); + } + }; + + if !out.status.success() { + return Ok(Vec::new()); + } + + // UTF-16LE with BOM. Decode lossily and strip nulls / BOM / CR. + let raw_u16: Vec = out + .stdout + .chunks_exact(2) + .map(|b| u16::from_le_bytes([b[0], b[1]])) + .collect(); + let decoded = String::from_utf16_lossy(&raw_u16); + + let distros: Vec = decoded + .lines() + .map(|l| l.trim_matches(|c: char| c == '\u{FEFF}' || c.is_whitespace()).to_string()) + .filter(|l| !l.is_empty()) + .collect(); + + Ok(distros) +} + +/// `\\wsl$\\home\\.claude\projects` if it exists. +/// +/// Tries `$USERNAME` first, then falls back to scanning `\\wsl$\\home\` +/// for the first subdirectory that contains a readable `.claude/projects/`. +pub fn wsl_projects_path(distro: &str) -> Option { + let home_root = PathBuf::from(format!(r"\\wsl$\{}\home", distro)); + if !dir_exists(&home_root) { + return None; + } + + // 1. Try $USERNAME (which on Windows usually matches the WSL user). + if let Ok(name) = std::env::var("USERNAME") { + let candidate = home_root.join(&name).join(".claude").join("projects"); + if dir_exists(&candidate) { + return Some(candidate); + } + } + + // 2. Scan home dirs. + let entries = std::fs::read_dir(&home_root).ok()?; + for entry in entries.flatten() { + let p = entry.path().join(".claude").join("projects"); + if dir_exists(&p) { + return Some(p); + } + } + + None +} + +/// `%USERPROFILE%\.claude\projects` if it exists. +pub fn native_projects_path() -> Option { + let home = dirs::home_dir()?; + let p = home.join(".claude").join("projects"); + dir_exists(&p).then_some(p) +} + +pub fn dir_exists(p: &Path) -> bool { + std::fs::metadata(p).map(|m| m.is_dir()).unwrap_or(false) +} + +/// Resolve all roots that should be scanned right now. +/// +/// `override_distro` short-circuits the autodetect: if Some, only that distro +/// is consulted (in addition to the native path if `include_native`). +pub fn resolve_roots( + override_distro: Option<&str>, + include_native: bool, +) -> ResolvedRoots { + let mut roots: Vec = Vec::new(); + let mut chosen_distro: Option = None; + + let native = native_projects_path(); + let native_present = native.is_some(); + if include_native { + if let Some(p) = native.clone() { + roots.push(p); + } + } + + match override_distro { + Some(d) => { + if let Some(p) = wsl_projects_path(d) { + roots.push(p); + chosen_distro = Some(d.to_string()); + } + } + None => { + if let Ok(distros) = list_wsl_distros() { + for d in &distros { + if let Some(p) = wsl_projects_path(d) { + roots.push(p); + if chosen_distro.is_none() { + chosen_distro = Some(d.clone()); + } + } + } + } + } + } + + // Canonicalize + dedupe. canonicalize() can fail on UNC paths on some + // Windows builds; if so, fall back to the raw path so we don't drop hits. + let mut canon: Vec = Vec::new(); + for p in roots { + let c = std::fs::canonicalize(&p).unwrap_or(p); + if !canon.contains(&c) { + canon.push(c); + } + } + + ResolvedRoots { + roots: canon, + wsl_distro: chosen_distro, + native_present, + } +} diff --git a/src-tauri/src/settings.rs b/src-tauri/src/settings.rs new file mode 100644 index 0000000..fd80c92 --- /dev/null +++ b/src-tauri/src/settings.rs @@ -0,0 +1,87 @@ +//! On-disk settings: caps, distro override, autostart toggle, window pos. +//! +//! Stored as plain JSON at `%APPDATA%\claude-widget\config.json` on Windows, +//! `~/.config/claude-widget/config.json` elsewhere. + +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Caps { + pub block_tokens: u64, + pub weekly_tokens: u64, +} + +impl Default for Caps { + fn default() -> Self { + // Placeholder values — user tunes them in Settings once they have + // a few weeks of real data. There is no authoritative local source + // for the actual Anthropic-side caps. + Self { + block_tokens: 200_000, + weekly_tokens: 2_000_000, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct Settings { + pub caps: Caps, + /// Force a specific WSL distro instead of auto-detecting. None = auto. + pub wsl_distro_override: Option, + /// Whether to also scan `%USERPROFILE%\.claude\projects\` (native install). + pub include_native: bool, + /// Last-saved window position so it survives a restart. + pub window_pos: Option<(i32, i32)>, + /// Mirror of the Tauri autostart plugin state for UI display. + pub autostart: bool, +} + +impl Default for Settings { + fn default() -> Self { + Self { + caps: Caps::default(), + wsl_distro_override: None, + include_native: true, + window_pos: None, + autostart: false, + } + } +} + +pub fn config_dir() -> PathBuf { + // `dirs::config_dir()` returns %APPDATA% on Windows and ~/.config on Linux. + let base = dirs::config_dir().unwrap_or_else(|| PathBuf::from(".")); + base.join("claude-widget") +} + +pub fn config_path() -> PathBuf { + config_dir().join("config.json") +} + +pub fn load() -> Settings { + let path = config_path(); + let Ok(bytes) = std::fs::read(&path) else { + return Settings::default(); + }; + match serde_json::from_slice::(&bytes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("settings file at {} unreadable ({}), using defaults", path.display(), e); + Settings::default() + } + } +} + +pub fn save(s: &Settings) -> anyhow::Result<()> { + let dir = config_dir(); + std::fs::create_dir_all(&dir)?; + let path = config_path(); + let bytes = serde_json::to_vec_pretty(s)?; + // Atomic-ish write: tmp file + rename. Avoids corruption on crash. + let tmp = path.with_extension("json.tmp"); + std::fs::write(&tmp, bytes)?; + std::fs::rename(&tmp, &path)?; + Ok(()) +} diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs new file mode 100644 index 0000000..508cbea --- /dev/null +++ b/src-tauri/src/state.rs @@ -0,0 +1,58 @@ +//! Process-global mutable state. + +use parking_lot::{Mutex, RwLock}; +use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::jsonl::UsageEvent; +use crate::settings::Settings; +use crate::watch::WatcherHandle; + +/// Per-file parse cache. We resume reading from `byte_offset` on the next +/// poll, so we never re-parse already-seen lines. +pub struct FileCache { + pub mtime_ns: i128, + pub size: u64, + pub byte_offset: u64, + pub messages: Vec, +} + +pub struct AppState { + pub roots: RwLock>, + pub files: RwLock>, + pub seen_ids: RwLock>, + pub settings: RwLock, + /// Boxed so we can keep the watcher alive across the whole app lifetime + /// without polluting Tauri's setup hook. + pub watcher: Mutex>, +} + +pub type SharedState = Arc; + +impl AppState { + pub fn new(settings: Settings) -> SharedState { + Arc::new(Self { + roots: RwLock::new(Vec::new()), + files: RwLock::new(HashMap::new()), + seen_ids: RwLock::new(HashSet::new()), + settings: RwLock::new(settings), + watcher: Mutex::new(None), + }) + } + + pub fn set_watcher(&self, w: WatcherHandle) { + *self.watcher.lock() = Some(w); + } + + /// Snapshot all events across all cached files in one allocation. + /// Caller is responsible for sorting if needed. + pub fn collect_events(&self) -> Vec { + let files = self.files.read(); + let mut out = Vec::with_capacity(files.values().map(|f| f.messages.len()).sum()); + for f in files.values() { + out.extend(f.messages.iter().cloned()); + } + out + } +} diff --git a/src-tauri/src/usage.rs b/src-tauri/src/usage.rs new file mode 100644 index 0000000..7d0ce73 --- /dev/null +++ b/src-tauri/src/usage.rs @@ -0,0 +1,308 @@ +//! Aggregation: events → 5-hour blocks + 7-day rolling window + per-model. +//! +//! The block algorithm is the same one ccusage uses: +//! - block_start = floor_to_hour(first event's ts) +//! - block_end = block_start + 5h +//! - new block when no previous, OR ts >= prev.block_end, OR +//! gap (ts - prev_event.ts) >= 5h +//! +//! Weekly is "past 7 days from now" — Anthropic's actual Max-plan reset day +//! is buggy and shifts, so a calendar anchor would lie. + +use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc}; +use serde::Serialize; + +use crate::jsonl::{ModelFamily, UsageEvent}; +use crate::settings::Caps; + +const BLOCK_HOURS: i64 = 5; + +#[derive(Debug, Clone, Serialize)] +pub struct ModelBreakdown { + pub opus: u64, + pub sonnet: u64, + pub haiku: u64, + pub other: u64, +} + +impl Default for ModelBreakdown { + fn default() -> Self { + Self { opus: 0, sonnet: 0, haiku: 0, other: 0 } + } +} + +impl ModelBreakdown { + fn add(&mut self, family: ModelFamily, n: u64) { + match family { + ModelFamily::Opus => self.opus += n, + ModelFamily::Sonnet => self.sonnet += n, + ModelFamily::Haiku => self.haiku += n, + ModelFamily::Other => self.other += n, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct BlockSummary { + pub block_start: DateTime, + pub block_end: DateTime, + pub now: DateTime, + pub seconds_remaining: i64, + pub total_tokens: u64, + pub by_family: ModelBreakdown, + pub message_count: u64, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DayBucket { + /// `YYYY-MM-DD` in the user's local timezone. + pub date_local: String, + pub total_tokens: u64, +} + +#[derive(Debug, Clone, Serialize)] +pub struct WeeklySummary { + pub window_start: DateTime, + pub window_end: DateTime, + pub total_tokens: u64, + pub by_day: [DayBucket; 7], + pub by_family: ModelBreakdown, +} + +#[derive(Debug, Clone, Serialize)] +pub struct UsageSnapshot { + pub block: Option, + pub weekly: WeeklySummary, + pub caps: Caps, + pub generated_at: DateTime, +} + +pub fn floor_to_hour(ts: DateTime) -> DateTime { + Utc.with_ymd_and_hms(ts.year(), ts.month(), ts.day(), ts.hour(), 0, 0) + .single() + .unwrap_or(ts) +} + +/// Walk a chronologically-sorted slice of events and produce one block per +/// detected gap or 5h-end boundary. Events MUST be sorted by `ts` ascending. +pub fn compute_blocks(events: &[UsageEvent]) -> Vec { + let mut out: Vec = Vec::new(); + if events.is_empty() { + return out; + } + + let block_window = Duration::hours(BLOCK_HOURS); + let now = Utc::now(); + + // Working accumulator for the current block. + struct Acc { + start: DateTime, + end: DateTime, + total: u64, + breakdown: ModelBreakdown, + msgs: u64, + } + + let close = |a: Acc, now: DateTime| -> BlockSummary { + BlockSummary { + block_start: a.start, + block_end: a.end, + now, + seconds_remaining: (a.end - now).num_seconds(), + total_tokens: a.total, + by_family: a.breakdown, + message_count: a.msgs, + } + }; + + let mut acc: Option = None; + let mut prev_ts: Option> = None; + + for ev in events { + let starts_new_block = match (&acc, prev_ts) { + (None, _) => true, + (Some(a), Some(prev)) => ev.ts >= a.end || (ev.ts - prev) >= block_window, + (Some(a), None) => ev.ts >= a.end, + }; + + if starts_new_block { + if let Some(a) = acc.take() { + out.push(close(a, now)); + } + let start = floor_to_hour(ev.ts); + acc = Some(Acc { + start, + end: start + block_window, + total: 0, + breakdown: ModelBreakdown::default(), + msgs: 0, + }); + } + + let a = acc.as_mut().expect("acc just initialized"); + a.total += ev.total; + a.breakdown.add(ev.model_family, ev.total); + a.msgs += 1; + prev_ts = Some(ev.ts); + } + + if let Some(a) = acc { + out.push(close(a, now)); + } + out +} + +/// Pick the block that contains `now`. Falls back to the most recent block +/// whose end is in the future. Returns `None` if there are no blocks. +pub fn active_block(blocks: &[BlockSummary], now: DateTime) -> Option { + blocks + .iter() + .rev() + .find(|b| now >= b.block_start && now < b.block_end) + .or_else(|| blocks.iter().rev().find(|b| b.block_end > now)) + .cloned() +} + +/// Rolling 7-day window from `now`. `by_day[0]` is the oldest day, +/// `by_day[6]` is today. Day boundaries are in the user's local timezone. +pub fn compute_weekly(events: &[UsageEvent], now: DateTime) -> WeeklySummary { + let window_start = now - Duration::days(7); + let mut total: u64 = 0; + let mut breakdown = ModelBreakdown::default(); + + let today_local = chrono::Local::now().date_naive(); + let mut buckets: [DayBucket; 7] = std::array::from_fn(|i| { + let date = today_local - chrono::Duration::days(6 - i as i64); + DayBucket { + date_local: date.format("%Y-%m-%d").to_string(), + total_tokens: 0, + } + }); + + for ev in events { + if ev.ts < window_start { + continue; + } + total += ev.total; + breakdown.add(ev.model_family, ev.total); + + let ev_local = ev.ts.with_timezone(&chrono::Local).date_naive(); + let day_diff = (today_local - ev_local).num_days(); + if (0..=6).contains(&day_diff) { + let idx = (6 - day_diff) as usize; + buckets[idx].total_tokens += ev.total; + } + } + + WeeklySummary { + window_start, + window_end: now, + total_tokens: total, + by_day: buckets, + by_family: breakdown, + } +} + +/// One-shot: events → snapshot for the frontend. +/// `events` must be sorted by `ts` ascending. +pub fn build_snapshot( + events: &[UsageEvent], + caps: &Caps, + now: DateTime, +) -> UsageSnapshot { + let blocks = compute_blocks(events); + let block = active_block(&blocks, now); + let weekly = compute_weekly(events, now); + UsageSnapshot { + block, + weekly, + caps: caps.clone(), + generated_at: now, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::jsonl::{ModelFamily, UsageEvent}; + use std::path::PathBuf; + + fn ev(ts: &str, total: u64, family: ModelFamily) -> UsageEvent { + let ts: DateTime = ts.parse().unwrap(); + UsageEvent { + ts, + model_raw: "test".into(), + model_label: "Test".into(), + model_family: family, + input: 0, + output: 0, + cache_create: 0, + cache_read: 0, + total, + dedupe_key: format!("k-{}", ts.timestamp_nanos_opt().unwrap_or(0)), + source_file: PathBuf::new(), + } + } + + #[test] + fn block_starts_at_hour_floor() { + let events = vec![ev("2026-05-08T22:37:21Z", 100, ModelFamily::Opus)]; + let blocks = compute_blocks(&events); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].block_start.hour(), 22); + assert_eq!(blocks[0].block_start.minute(), 0); + assert_eq!((blocks[0].block_end - blocks[0].block_start).num_hours(), 5); + assert_eq!(blocks[0].total_tokens, 100); + } + + #[test] + fn five_hour_gap_starts_new_block() { + let events = vec![ + ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus), + ev("2026-05-08T15:30:00Z", 200, ModelFamily::Sonnet), // 5h30m gap + ]; + let blocks = compute_blocks(&events); + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].total_tokens, 100); + assert_eq!(blocks[1].total_tokens, 200); + assert_eq!(blocks[1].by_family.sonnet, 200); + } + + #[test] + fn within_block_aggregates() { + let events = vec![ + ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus), + ev("2026-05-08T11:00:00Z", 200, ModelFamily::Opus), + ev("2026-05-08T12:30:00Z", 300, ModelFamily::Sonnet), + ]; + let blocks = compute_blocks(&events); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].total_tokens, 600); + assert_eq!(blocks[0].by_family.opus, 300); + assert_eq!(blocks[0].by_family.sonnet, 300); + assert_eq!(blocks[0].message_count, 3); + } + + #[test] + fn block_end_starts_new_block() { + // Gap < 5h but the previous block_end was reached. + let events = vec![ + ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus), // block_end = 15:00 + ev("2026-05-08T15:30:00Z", 200, ModelFamily::Opus), // > 15:00 → new block + ]; + let blocks = compute_blocks(&events); + assert_eq!(blocks.len(), 2); + } + + #[test] + fn weekly_buckets_today_oldest_first() { + // We can't pin "today" in tests deterministically, but we can at least + // assert the array shape and that a recent event lands in the last bucket. + let now = Utc::now(); + let events = vec![ev(&now.to_rfc3339(), 500, ModelFamily::Opus)]; + let w = compute_weekly(&events, now); + assert_eq!(w.by_day.len(), 7); + assert_eq!(w.total_tokens, 500); + assert_eq!(w.by_day[6].total_tokens, 500); + } +} diff --git a/src-tauri/src/watch.rs b/src-tauri/src/watch.rs new file mode 100644 index 0000000..9aad267 --- /dev/null +++ b/src-tauri/src/watch.rs @@ -0,0 +1,233 @@ +//! File watcher + 60s tokio poll fallback. +//! +//! `notify_debouncer_full` provides recursive watching with burst smoothing. +//! ReadDirectoryChangesW on the WSL 9P mount (`\\wsl$\...`) is unreliable, so +//! a 60s tokio interval also runs `refresh_and_emit` to backstop missed events. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::Context; +use notify::{RecommendedWatcher, RecursiveMode}; +use notify_debouncer_full::{new_debouncer, DebounceEventResult, FileIdMap, Debouncer}; +use tauri::{AppHandle, Emitter, Manager}; +use tokio::sync::mpsc; + +use crate::jsonl::{enumerate_jsonl, parse_jsonl_from, stat as file_stat, UsageEvent}; +use crate::paths::resolve_roots; +use crate::state::{AppState, FileCache, SharedState}; +use crate::usage::build_snapshot; + +/// Lives for the lifetime of the app (held by AppState). Drop unregisters. +pub struct WatcherHandle { + _debouncer: Debouncer, + _poll_task: tokio::task::JoinHandle<()>, + _consumer_task: tokio::task::JoinHandle<()>, +} + +pub fn start(app: AppHandle, state: SharedState) -> anyhow::Result { + // Channel from the debouncer thread → async consumer task. + let (tx, mut rx) = mpsc::unbounded_channel::>(); + + let tx_for_handler = tx.clone(); + let mut debouncer = new_debouncer( + Duration::from_millis(250), + None, + move |result: DebounceEventResult| { + let Ok(events) = result else { return }; + let mut paths: Vec = events + .into_iter() + .flat_map(|e| e.event.paths) + .filter(|p| { + p.extension() + .and_then(|s| s.to_str()) + .map(|s| s.eq_ignore_ascii_case("jsonl")) + .unwrap_or(false) + }) + .collect(); + paths.sort(); + paths.dedup(); + if paths.is_empty() { + return; + } + let _ = tx_for_handler.send(paths); + }, + ) + .context("create debouncer")?; + + // Watch every currently-resolved root. + let roots: Vec = state.roots.read().clone(); + for root in &roots { + if let Err(e) = debouncer + .watcher() + .watch(root, RecursiveMode::Recursive) + { + tracing::warn!("could not watch {}: {e}", root.display()); + } + } + + // Consumer task: reparses changed files and emits. + let app_for_consumer = app.clone(); + let state_for_consumer = state.clone(); + let consumer_task = tauri::async_runtime::spawn(async move { + while let Some(paths) = rx.recv().await { + if let Err(e) = + refresh_and_emit(&app_for_consumer, &state_for_consumer, Some(&paths)) + .await + { + tracing::warn!("incremental refresh failed: {e:#}"); + } + } + }); + + // 60s poll fallback. + let app_for_poll = app.clone(); + let state_for_poll = state.clone(); + let poll_task = tauri::async_runtime::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + // First tick fires immediately — skip it; `setup` already did a full scan. + interval.tick().await; + loop { + interval.tick().await; + if let Err(e) = refresh_and_emit(&app_for_poll, &state_for_poll, None).await { + tracing::warn!("poll refresh failed: {e:#}"); + } + } + }); + + Ok(WatcherHandle { + _debouncer: debouncer, + _poll_task: poll_task, + _consumer_task: consumer_task, + }) +} + +/// Reparse the given files (or all roots if `changed` is None), update the +/// cache, rebuild a snapshot, and emit `usage-updated` to the frontend. +pub async fn refresh_and_emit( + app: &AppHandle, + state: &SharedState, + changed: Option<&[PathBuf]>, +) -> anyhow::Result<()> { + // Re-resolve roots if we don't have any yet (cold start) or on full poll. + if state.roots.read().is_empty() || changed.is_none() { + let settings = state.settings.read().clone(); + let resolved = resolve_roots( + settings.wsl_distro_override.as_deref(), + settings.include_native, + ); + let mut roots = state.roots.write(); + *roots = resolved.roots; + } + + let files_to_check: Vec = match changed { + Some(paths) => paths.to_vec(), + None => { + let roots = state.roots.read().clone(); + let mut all = Vec::new(); + for r in &roots { + all.extend(enumerate_jsonl(r)); + } + all + } + }; + + let mut new_events: Vec = Vec::new(); + + for path in files_to_check { + let Some((mtime, size)) = file_stat(&path) else { continue }; + + // Skip unchanged files on full poll. On targeted refresh we always + // re-stat, but if the cached size+mtime match, we can still skip. + let cached_offset = { + let files = state.files.read(); + files.get(&path).cloned_offset() + }; + + if let Some((cached_mtime, cached_size, cached_offset)) = cached_offset { + if cached_mtime == mtime && cached_size == size { + continue; + } + // Truncation: reparse from start. + let start = if size < cached_offset { 0 } else { cached_offset }; + let (events, new_offset) = match parse_jsonl_from(&path, start) { + Ok(v) => v, + Err(e) => { + tracing::debug!("parse_jsonl_from {}: {e}", path.display()); + continue; + } + }; + let deduped = dedupe(&state, events); + update_cache(state, &path, mtime, size, new_offset, &deduped); + new_events.extend(deduped); + } else { + // Brand new file. + let (events, new_offset) = match parse_jsonl_from(&path, 0) { + Ok(v) => v, + Err(e) => { + tracing::debug!("parse_jsonl_from {}: {e}", path.display()); + continue; + } + }; + let deduped = dedupe(&state, events); + update_cache(state, &path, mtime, size, new_offset, &deduped); + new_events.extend(deduped); + } + } + + // Rebuild snapshot from the full event set. + let mut all_events = state.collect_events(); + all_events.sort_by_key(|e| e.ts); + + let caps = state.settings.read().caps.clone(); + let snapshot = build_snapshot(&all_events, &caps, chrono::Utc::now()); + + if app.get_webview_window("main").is_some() { + app.emit("usage-updated", &snapshot) + .context("emit usage-updated")?; + } + // (If the window isn't up yet, the cache is already populated; the + // frontend pulls the initial snapshot via `get_snapshot` on mount.) + let _ = new_events; + Ok(()) +} + +fn dedupe(state: &SharedState, events: Vec) -> Vec { + let mut seen = state.seen_ids.write(); + events + .into_iter() + .filter(|e| seen.insert(e.dedupe_key.clone())) + .collect() +} + +fn update_cache( + state: &SharedState, + path: &Path, + mtime: i128, + size: u64, + new_offset: u64, + new_events: &[UsageEvent], +) { + let mut files = state.files.write(); + let entry = files.entry(path.to_path_buf()).or_insert(FileCache { + mtime_ns: mtime, + size, + byte_offset: new_offset, + messages: Vec::new(), + }); + entry.mtime_ns = mtime; + entry.size = size; + entry.byte_offset = new_offset; + entry.messages.extend(new_events.iter().cloned()); +} + +// Helper to extract (mtime, size, offset) from an Optional FileCache. +trait CachedOffset { + fn cloned_offset(self) -> Option<(i128, u64, u64)>; +} +impl CachedOffset for Option<&FileCache> { + fn cloned_offset(self) -> Option<(i128, u64, u64)> { + self.map(|f| (f.mtime_ns, f.size, f.byte_offset)) + } +}