Add Rust modules: paths, jsonl, usage, watch, commands, state, settings

This commit is contained in:
megaproxy 2026-05-09 00:05:12 +01:00
parent 8abb0599f6
commit 14ffcf4bd3
9 changed files with 1226 additions and 0 deletions

72
src-tauri/src/commands.rs Normal file
View file

@ -0,0 +1,72 @@
//! Tauri command surface — every JS-callable function lives here.
use crate::paths::{list_wsl_distros, resolve_roots, ResolvedRoots};
use crate::settings::{save as save_settings, Settings};
use crate::state::SharedState;
use crate::usage::{build_snapshot, UsageSnapshot};
use crate::watch::refresh_and_emit;
#[tauri::command]
pub async fn get_snapshot(state: tauri::State<'_, SharedState>) -> Result<UsageSnapshot, String> {
let mut events = state.collect_events();
events.sort_by_key(|e| e.ts);
let caps = state.settings.read().caps.clone();
Ok(build_snapshot(&events, &caps, chrono::Utc::now()))
}
#[tauri::command]
pub async fn get_settings(state: tauri::State<'_, SharedState>) -> Result<Settings, String> {
Ok(state.settings.read().clone())
}
#[tauri::command]
pub async fn set_settings(
state: tauri::State<'_, SharedState>,
app: tauri::AppHandle,
new: Settings,
) -> Result<(), String> {
{
*state.settings.write() = new.clone();
}
save_settings(&new).map_err(|e| e.to_string())?;
// If roots-related settings changed, force a re-resolve + rescan.
{
let mut roots = state.roots.write();
roots.clear();
}
refresh_and_emit(&app, state.inner(), None)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
#[tauri::command]
pub async fn list_distros() -> Result<Vec<String>, String> {
list_wsl_distros().map_err(|e| e.to_string())
}
#[tauri::command]
pub async fn get_roots(state: tauri::State<'_, SharedState>) -> Result<ResolvedRoots, String> {
let s = state.settings.read().clone();
Ok(resolve_roots(s.wsl_distro_override.as_deref(), s.include_native))
}
#[tauri::command]
pub async fn force_rescan(
state: tauri::State<'_, SharedState>,
app: tauri::AppHandle,
) -> Result<(), String> {
// Wipe caches so every file is reparsed from offset 0.
state.files.write().clear();
state.seen_ids.write().clear();
refresh_and_emit(&app, state.inner(), None)
.await
.map_err(|e| e.to_string())
}
#[tauri::command]
pub async fn quit_app(app: tauri::AppHandle) -> Result<(), String> {
app.exit(0);
Ok(())
}

235
src-tauri/src/jsonl.rs Normal file
View file

@ -0,0 +1,235 @@
//! Stream-parse Claude Code JSONL transcripts and normalize them into
//! [`UsageEvent`]s the rest of the app can sum.
//!
//! Field shape (verified against actual transcripts on 2026-05-08):
//!
//! { type:"assistant", timestamp:ISO8601, sessionId, requestId, uuid,
//! message: { model: "claude-opus-4-7",
//! usage: { input_tokens, output_tokens,
//! cache_creation_input_tokens,
//! cache_read_input_tokens } } }
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use walkdir::WalkDir;
/// Raw shape we care about. Everything else is ignored — the format gains
/// fields between Claude Code releases and we don't want to break on those.
#[derive(Debug, Deserialize)]
struct RawLine {
#[serde(rename = "type")]
kind: Option<String>,
timestamp: Option<DateTime<Utc>>,
#[serde(rename = "sessionId")]
_session_id: Option<String>,
#[serde(rename = "requestId")]
request_id: Option<String>,
uuid: Option<String>,
message: Option<RawMessage>,
}
#[derive(Debug, Deserialize)]
struct RawMessage {
model: Option<String>,
usage: Option<RawUsage>,
}
#[derive(Debug, Deserialize, Default)]
struct RawUsage {
#[serde(default)]
input_tokens: u64,
#[serde(default)]
output_tokens: u64,
#[serde(default)]
cache_creation_input_tokens: u64,
#[serde(default)]
cache_read_input_tokens: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
pub enum ModelFamily {
Opus,
Sonnet,
Haiku,
Other,
}
#[derive(Debug, Clone, Serialize)]
pub struct UsageEvent {
pub ts: DateTime<Utc>,
pub model_raw: String,
pub model_label: String,
pub model_family: ModelFamily,
pub input: u64,
pub output: u64,
pub cache_create: u64,
pub cache_read: u64,
pub total: u64,
pub dedupe_key: String,
pub source_file: PathBuf,
}
/// "claude-opus-4-7" → ("Opus 4.7", Opus)
/// "claude-sonnet-4-6" → ("Sonnet 4.6", Sonnet)
/// "claude-haiku-4-5-20251001" → ("Haiku 4.5", Haiku)
pub fn normalize_model(raw: &str) -> (String, ModelFamily) {
let lower = raw.to_lowercase();
let family = if lower.contains("opus") {
ModelFamily::Opus
} else if lower.contains("sonnet") {
ModelFamily::Sonnet
} else if lower.contains("haiku") {
ModelFamily::Haiku
} else {
ModelFamily::Other
};
// Pull the first major.minor segment (e.g. "4-7" → "4.7"). If we can't
// find one, just title-case the family name.
let mut version: Option<String> = None;
let parts: Vec<&str> = lower.split('-').collect();
for i in 0..parts.len().saturating_sub(1) {
if parts[i].parse::<u32>().is_ok() && parts[i + 1].parse::<u32>().is_ok() {
version = Some(format!("{}.{}", parts[i], parts[i + 1]));
break;
}
}
let family_str = match family {
ModelFamily::Opus => "Opus",
ModelFamily::Sonnet => "Sonnet",
ModelFamily::Haiku => "Haiku",
ModelFamily::Other => "Model",
};
let label = match version {
Some(v) => format!("{family_str} {v}"),
None => family_str.to_string(),
};
(label, family)
}
/// Walk a root and return every `*.jsonl` (including nested `subagents/`).
pub fn enumerate_jsonl(root: &Path) -> Vec<PathBuf> {
WalkDir::new(root)
.follow_links(false)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| {
e.path()
.extension()
.and_then(|s| s.to_str())
.map(|s| s.eq_ignore_ascii_case("jsonl"))
.unwrap_or(false)
})
.map(|e| e.into_path())
.collect()
}
/// Tail-read from `start_offset`. Returns `(new_events, new_offset)`.
///
/// Only advances past fully-terminated lines — a half-flushed last line is
/// reparsed on the next call once it gets its trailing newline.
///
/// Caller is responsible for deduping returned events against a global
/// `seen_ids` set (subagent transcripts overlap with their parent).
pub fn parse_jsonl_from(
path: &Path,
start_offset: u64,
) -> anyhow::Result<(Vec<UsageEvent>, u64)> {
let mut f = File::open(path)?;
let len = f.metadata()?.len();
// File rotated / truncated: start over.
let start = if start_offset > len { 0 } else { start_offset };
f.seek(SeekFrom::Start(start))?;
let mut reader = BufReader::new(f);
let mut events: Vec<UsageEvent> = Vec::new();
let mut offset = start;
loop {
let mut buf: Vec<u8> = Vec::new();
let n = reader.read_until(b'\n', &mut buf)?;
if n == 0 {
break;
}
// No newline → partial flush, leave it for next call.
if !buf.ends_with(b"\n") {
break;
}
offset += n as u64;
let line = std::str::from_utf8(&buf).map(str::trim).unwrap_or("");
if line.is_empty() {
continue;
}
let parsed: RawLine = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue, // Skip malformed lines silently — the file is
// continually appended; partial writes happen.
};
if parsed.kind.as_deref() != Some("assistant") {
continue;
}
let (Some(ts), Some(msg)) = (parsed.timestamp, parsed.message) else {
continue;
};
let Some(usage) = msg.usage else { continue };
let model_raw = msg.model.unwrap_or_else(|| "unknown".to_string());
let (model_label, model_family) = normalize_model(&model_raw);
let total = usage.input_tokens
+ usage.output_tokens
+ usage.cache_creation_input_tokens
+ usage.cache_read_input_tokens;
if total == 0 {
continue;
}
let dedupe_key = parsed
.request_id
.or(parsed.uuid)
.unwrap_or_else(|| format!("{}:{}", path.display(), offset));
events.push(UsageEvent {
ts,
model_raw,
model_label,
model_family,
input: usage.input_tokens,
output: usage.output_tokens,
cache_create: usage.cache_creation_input_tokens,
cache_read: usage.cache_read_input_tokens,
total,
dedupe_key,
source_file: path.to_path_buf(),
});
}
Ok((events, offset))
}
/// Read the file's mtime (in nanos) and size — used to skip unchanged files
/// during the 60s poll fallback.
pub fn stat(path: &Path) -> Option<(i128, u64)> {
let m = std::fs::metadata(path).ok()?;
let mtime = m
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_nanos() as i128)
.unwrap_or(0);
Some((mtime, m.len()))
}

72
src-tauri/src/lib.rs Normal file
View file

@ -0,0 +1,72 @@
//! Library entry point. `main.rs` calls `run()`.
//!
//! Wires Tauri builder, plugins, app state, and the watcher.
mod commands;
mod jsonl;
mod paths;
mod settings;
mod state;
mod usage;
mod watch;
use tauri::Manager;
pub fn run() {
// Logs go to stderr; in release on Windows there's no console attached,
// which is fine — RUST_LOG=debug from a console launch will surface them.
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.try_init();
tauri::Builder::default()
.plugin(tauri_plugin_autostart::init(
// MacosLauncher is ignored on Windows but the constructor requires it.
tauri_plugin_autostart::MacosLauncher::LaunchAgent,
Some(vec![]),
))
.plugin(tauri_plugin_store::Builder::new().build())
.plugin(tauri_plugin_dialog::init())
.setup(|app| {
let settings = settings::load();
let shared = state::AppState::new(settings);
app.manage(shared.clone());
let handle = app.handle().clone();
let state_for_task = shared.clone();
tauri::async_runtime::spawn(async move {
// Initial scan + emit.
if let Err(e) =
watch::refresh_and_emit(&handle, &state_for_task, None).await
{
tracing::warn!("initial refresh failed: {e:#}");
}
// Start the watcher; keep it alive for the lifetime of the app.
match watch::start(handle.clone(), state_for_task.clone()) {
Ok(w) => {
// Move into a long-lived store on the AppState so it
// isn't dropped (which would unregister the watcher).
state_for_task.set_watcher(w);
}
Err(e) => tracing::warn!("watcher start failed: {e:#}"),
}
});
Ok(())
})
.invoke_handler(tauri::generate_handler![
commands::get_snapshot,
commands::get_settings,
commands::set_settings,
commands::list_distros,
commands::get_roots,
commands::force_rescan,
commands::quit_app,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");
}

6
src-tauri/src/main.rs Normal file
View file

@ -0,0 +1,6 @@
// Hide the console window in release builds. In debug we keep it for tracing logs.
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
fn main() {
claude_usage_widget_lib::run();
}

155
src-tauri/src/paths.rs Normal file
View file

@ -0,0 +1,155 @@
//! WSL distro detection and Claude Code projects-root resolution.
//!
//! Discovery order:
//! 1. settings.wsl_distro_override → \\wsl$\<distro>\home\<user>\.claude\projects
//! 2. settings.include_native (true) → %USERPROFILE%\.claude\projects
//! 3. autodetect via `wsl.exe -l -q` → first hit per distro
//!
//! All hits are unioned, canonicalized, and deduped.
use serde::Serialize;
use std::path::{Path, PathBuf};
#[derive(Debug, Clone, Serialize)]
pub struct ResolvedRoots {
pub roots: Vec<PathBuf>,
pub wsl_distro: Option<String>,
pub native_present: bool,
}
/// `wsl.exe -l -q` lists installed distros, one per line, in UTF-16LE.
/// On non-Windows platforms (or when wsl.exe isn't on PATH) returns Ok(empty).
pub fn list_wsl_distros() -> anyhow::Result<Vec<String>> {
if !cfg!(windows) {
return Ok(Vec::new());
}
let out = match std::process::Command::new("wsl.exe")
.args(["-l", "-q"])
.output()
{
Ok(o) => o,
Err(e) => {
tracing::debug!("wsl.exe not available: {e}");
return Ok(Vec::new());
}
};
if !out.status.success() {
return Ok(Vec::new());
}
// UTF-16LE with BOM. Decode lossily and strip nulls / BOM / CR.
let raw_u16: Vec<u16> = out
.stdout
.chunks_exact(2)
.map(|b| u16::from_le_bytes([b[0], b[1]]))
.collect();
let decoded = String::from_utf16_lossy(&raw_u16);
let distros: Vec<String> = decoded
.lines()
.map(|l| l.trim_matches(|c: char| c == '\u{FEFF}' || c.is_whitespace()).to_string())
.filter(|l| !l.is_empty())
.collect();
Ok(distros)
}
/// `\\wsl$\<distro>\home\<user>\.claude\projects` if it exists.
///
/// Tries `$USERNAME` first, then falls back to scanning `\\wsl$\<distro>\home\`
/// for the first subdirectory that contains a readable `.claude/projects/`.
pub fn wsl_projects_path(distro: &str) -> Option<PathBuf> {
let home_root = PathBuf::from(format!(r"\\wsl$\{}\home", distro));
if !dir_exists(&home_root) {
return None;
}
// 1. Try $USERNAME (which on Windows usually matches the WSL user).
if let Ok(name) = std::env::var("USERNAME") {
let candidate = home_root.join(&name).join(".claude").join("projects");
if dir_exists(&candidate) {
return Some(candidate);
}
}
// 2. Scan home dirs.
let entries = std::fs::read_dir(&home_root).ok()?;
for entry in entries.flatten() {
let p = entry.path().join(".claude").join("projects");
if dir_exists(&p) {
return Some(p);
}
}
None
}
/// `%USERPROFILE%\.claude\projects` if it exists.
pub fn native_projects_path() -> Option<PathBuf> {
let home = dirs::home_dir()?;
let p = home.join(".claude").join("projects");
dir_exists(&p).then_some(p)
}
pub fn dir_exists(p: &Path) -> bool {
std::fs::metadata(p).map(|m| m.is_dir()).unwrap_or(false)
}
/// Resolve all roots that should be scanned right now.
///
/// `override_distro` short-circuits the autodetect: if Some, only that distro
/// is consulted (in addition to the native path if `include_native`).
pub fn resolve_roots(
override_distro: Option<&str>,
include_native: bool,
) -> ResolvedRoots {
let mut roots: Vec<PathBuf> = Vec::new();
let mut chosen_distro: Option<String> = None;
let native = native_projects_path();
let native_present = native.is_some();
if include_native {
if let Some(p) = native.clone() {
roots.push(p);
}
}
match override_distro {
Some(d) => {
if let Some(p) = wsl_projects_path(d) {
roots.push(p);
chosen_distro = Some(d.to_string());
}
}
None => {
if let Ok(distros) = list_wsl_distros() {
for d in &distros {
if let Some(p) = wsl_projects_path(d) {
roots.push(p);
if chosen_distro.is_none() {
chosen_distro = Some(d.clone());
}
}
}
}
}
}
// Canonicalize + dedupe. canonicalize() can fail on UNC paths on some
// Windows builds; if so, fall back to the raw path so we don't drop hits.
let mut canon: Vec<PathBuf> = Vec::new();
for p in roots {
let c = std::fs::canonicalize(&p).unwrap_or(p);
if !canon.contains(&c) {
canon.push(c);
}
}
ResolvedRoots {
roots: canon,
wsl_distro: chosen_distro,
native_present,
}
}

87
src-tauri/src/settings.rs Normal file
View file

@ -0,0 +1,87 @@
//! On-disk settings: caps, distro override, autostart toggle, window pos.
//!
//! Stored as plain JSON at `%APPDATA%\claude-widget\config.json` on Windows,
//! `~/.config/claude-widget/config.json` elsewhere.
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Caps {
pub block_tokens: u64,
pub weekly_tokens: u64,
}
impl Default for Caps {
fn default() -> Self {
// Placeholder values — user tunes them in Settings once they have
// a few weeks of real data. There is no authoritative local source
// for the actual Anthropic-side caps.
Self {
block_tokens: 200_000,
weekly_tokens: 2_000_000,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Settings {
pub caps: Caps,
/// Force a specific WSL distro instead of auto-detecting. None = auto.
pub wsl_distro_override: Option<String>,
/// Whether to also scan `%USERPROFILE%\.claude\projects\` (native install).
pub include_native: bool,
/// Last-saved window position so it survives a restart.
pub window_pos: Option<(i32, i32)>,
/// Mirror of the Tauri autostart plugin state for UI display.
pub autostart: bool,
}
impl Default for Settings {
fn default() -> Self {
Self {
caps: Caps::default(),
wsl_distro_override: None,
include_native: true,
window_pos: None,
autostart: false,
}
}
}
pub fn config_dir() -> PathBuf {
// `dirs::config_dir()` returns %APPDATA% on Windows and ~/.config on Linux.
let base = dirs::config_dir().unwrap_or_else(|| PathBuf::from("."));
base.join("claude-widget")
}
pub fn config_path() -> PathBuf {
config_dir().join("config.json")
}
pub fn load() -> Settings {
let path = config_path();
let Ok(bytes) = std::fs::read(&path) else {
return Settings::default();
};
match serde_json::from_slice::<Settings>(&bytes) {
Ok(s) => s,
Err(e) => {
tracing::warn!("settings file at {} unreadable ({}), using defaults", path.display(), e);
Settings::default()
}
}
}
pub fn save(s: &Settings) -> anyhow::Result<()> {
let dir = config_dir();
std::fs::create_dir_all(&dir)?;
let path = config_path();
let bytes = serde_json::to_vec_pretty(s)?;
// Atomic-ish write: tmp file + rename. Avoids corruption on crash.
let tmp = path.with_extension("json.tmp");
std::fs::write(&tmp, bytes)?;
std::fs::rename(&tmp, &path)?;
Ok(())
}

58
src-tauri/src/state.rs Normal file
View file

@ -0,0 +1,58 @@
//! Process-global mutable state.
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use crate::jsonl::UsageEvent;
use crate::settings::Settings;
use crate::watch::WatcherHandle;
/// Per-file parse cache. We resume reading from `byte_offset` on the next
/// poll, so we never re-parse already-seen lines.
pub struct FileCache {
pub mtime_ns: i128,
pub size: u64,
pub byte_offset: u64,
pub messages: Vec<UsageEvent>,
}
pub struct AppState {
pub roots: RwLock<Vec<PathBuf>>,
pub files: RwLock<HashMap<PathBuf, FileCache>>,
pub seen_ids: RwLock<HashSet<String>>,
pub settings: RwLock<Settings>,
/// Boxed so we can keep the watcher alive across the whole app lifetime
/// without polluting Tauri's setup hook.
pub watcher: Mutex<Option<WatcherHandle>>,
}
pub type SharedState = Arc<AppState>;
impl AppState {
pub fn new(settings: Settings) -> SharedState {
Arc::new(Self {
roots: RwLock::new(Vec::new()),
files: RwLock::new(HashMap::new()),
seen_ids: RwLock::new(HashSet::new()),
settings: RwLock::new(settings),
watcher: Mutex::new(None),
})
}
pub fn set_watcher(&self, w: WatcherHandle) {
*self.watcher.lock() = Some(w);
}
/// Snapshot all events across all cached files in one allocation.
/// Caller is responsible for sorting if needed.
pub fn collect_events(&self) -> Vec<UsageEvent> {
let files = self.files.read();
let mut out = Vec::with_capacity(files.values().map(|f| f.messages.len()).sum());
for f in files.values() {
out.extend(f.messages.iter().cloned());
}
out
}
}

308
src-tauri/src/usage.rs Normal file
View file

@ -0,0 +1,308 @@
//! Aggregation: events → 5-hour blocks + 7-day rolling window + per-model.
//!
//! The block algorithm is the same one ccusage uses:
//! - block_start = floor_to_hour(first event's ts)
//! - block_end = block_start + 5h
//! - new block when no previous, OR ts >= prev.block_end, OR
//! gap (ts - prev_event.ts) >= 5h
//!
//! Weekly is "past 7 days from now" — Anthropic's actual Max-plan reset day
//! is buggy and shifts, so a calendar anchor would lie.
use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
use serde::Serialize;
use crate::jsonl::{ModelFamily, UsageEvent};
use crate::settings::Caps;
const BLOCK_HOURS: i64 = 5;
#[derive(Debug, Clone, Serialize)]
pub struct ModelBreakdown {
pub opus: u64,
pub sonnet: u64,
pub haiku: u64,
pub other: u64,
}
impl Default for ModelBreakdown {
fn default() -> Self {
Self { opus: 0, sonnet: 0, haiku: 0, other: 0 }
}
}
impl ModelBreakdown {
fn add(&mut self, family: ModelFamily, n: u64) {
match family {
ModelFamily::Opus => self.opus += n,
ModelFamily::Sonnet => self.sonnet += n,
ModelFamily::Haiku => self.haiku += n,
ModelFamily::Other => self.other += n,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct BlockSummary {
pub block_start: DateTime<Utc>,
pub block_end: DateTime<Utc>,
pub now: DateTime<Utc>,
pub seconds_remaining: i64,
pub total_tokens: u64,
pub by_family: ModelBreakdown,
pub message_count: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct DayBucket {
/// `YYYY-MM-DD` in the user's local timezone.
pub date_local: String,
pub total_tokens: u64,
}
#[derive(Debug, Clone, Serialize)]
pub struct WeeklySummary {
pub window_start: DateTime<Utc>,
pub window_end: DateTime<Utc>,
pub total_tokens: u64,
pub by_day: [DayBucket; 7],
pub by_family: ModelBreakdown,
}
#[derive(Debug, Clone, Serialize)]
pub struct UsageSnapshot {
pub block: Option<BlockSummary>,
pub weekly: WeeklySummary,
pub caps: Caps,
pub generated_at: DateTime<Utc>,
}
pub fn floor_to_hour(ts: DateTime<Utc>) -> DateTime<Utc> {
Utc.with_ymd_and_hms(ts.year(), ts.month(), ts.day(), ts.hour(), 0, 0)
.single()
.unwrap_or(ts)
}
/// Walk a chronologically-sorted slice of events and produce one block per
/// detected gap or 5h-end boundary. Events MUST be sorted by `ts` ascending.
pub fn compute_blocks(events: &[UsageEvent]) -> Vec<BlockSummary> {
let mut out: Vec<BlockSummary> = Vec::new();
if events.is_empty() {
return out;
}
let block_window = Duration::hours(BLOCK_HOURS);
let now = Utc::now();
// Working accumulator for the current block.
struct Acc {
start: DateTime<Utc>,
end: DateTime<Utc>,
total: u64,
breakdown: ModelBreakdown,
msgs: u64,
}
let close = |a: Acc, now: DateTime<Utc>| -> BlockSummary {
BlockSummary {
block_start: a.start,
block_end: a.end,
now,
seconds_remaining: (a.end - now).num_seconds(),
total_tokens: a.total,
by_family: a.breakdown,
message_count: a.msgs,
}
};
let mut acc: Option<Acc> = None;
let mut prev_ts: Option<DateTime<Utc>> = None;
for ev in events {
let starts_new_block = match (&acc, prev_ts) {
(None, _) => true,
(Some(a), Some(prev)) => ev.ts >= a.end || (ev.ts - prev) >= block_window,
(Some(a), None) => ev.ts >= a.end,
};
if starts_new_block {
if let Some(a) = acc.take() {
out.push(close(a, now));
}
let start = floor_to_hour(ev.ts);
acc = Some(Acc {
start,
end: start + block_window,
total: 0,
breakdown: ModelBreakdown::default(),
msgs: 0,
});
}
let a = acc.as_mut().expect("acc just initialized");
a.total += ev.total;
a.breakdown.add(ev.model_family, ev.total);
a.msgs += 1;
prev_ts = Some(ev.ts);
}
if let Some(a) = acc {
out.push(close(a, now));
}
out
}
/// Pick the block that contains `now`. Falls back to the most recent block
/// whose end is in the future. Returns `None` if there are no blocks.
pub fn active_block(blocks: &[BlockSummary], now: DateTime<Utc>) -> Option<BlockSummary> {
blocks
.iter()
.rev()
.find(|b| now >= b.block_start && now < b.block_end)
.or_else(|| blocks.iter().rev().find(|b| b.block_end > now))
.cloned()
}
/// Rolling 7-day window from `now`. `by_day[0]` is the oldest day,
/// `by_day[6]` is today. Day boundaries are in the user's local timezone.
pub fn compute_weekly(events: &[UsageEvent], now: DateTime<Utc>) -> WeeklySummary {
let window_start = now - Duration::days(7);
let mut total: u64 = 0;
let mut breakdown = ModelBreakdown::default();
let today_local = chrono::Local::now().date_naive();
let mut buckets: [DayBucket; 7] = std::array::from_fn(|i| {
let date = today_local - chrono::Duration::days(6 - i as i64);
DayBucket {
date_local: date.format("%Y-%m-%d").to_string(),
total_tokens: 0,
}
});
for ev in events {
if ev.ts < window_start {
continue;
}
total += ev.total;
breakdown.add(ev.model_family, ev.total);
let ev_local = ev.ts.with_timezone(&chrono::Local).date_naive();
let day_diff = (today_local - ev_local).num_days();
if (0..=6).contains(&day_diff) {
let idx = (6 - day_diff) as usize;
buckets[idx].total_tokens += ev.total;
}
}
WeeklySummary {
window_start,
window_end: now,
total_tokens: total,
by_day: buckets,
by_family: breakdown,
}
}
/// One-shot: events → snapshot for the frontend.
/// `events` must be sorted by `ts` ascending.
pub fn build_snapshot(
events: &[UsageEvent],
caps: &Caps,
now: DateTime<Utc>,
) -> UsageSnapshot {
let blocks = compute_blocks(events);
let block = active_block(&blocks, now);
let weekly = compute_weekly(events, now);
UsageSnapshot {
block,
weekly,
caps: caps.clone(),
generated_at: now,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::jsonl::{ModelFamily, UsageEvent};
use std::path::PathBuf;
fn ev(ts: &str, total: u64, family: ModelFamily) -> UsageEvent {
let ts: DateTime<Utc> = ts.parse().unwrap();
UsageEvent {
ts,
model_raw: "test".into(),
model_label: "Test".into(),
model_family: family,
input: 0,
output: 0,
cache_create: 0,
cache_read: 0,
total,
dedupe_key: format!("k-{}", ts.timestamp_nanos_opt().unwrap_or(0)),
source_file: PathBuf::new(),
}
}
#[test]
fn block_starts_at_hour_floor() {
let events = vec![ev("2026-05-08T22:37:21Z", 100, ModelFamily::Opus)];
let blocks = compute_blocks(&events);
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].block_start.hour(), 22);
assert_eq!(blocks[0].block_start.minute(), 0);
assert_eq!((blocks[0].block_end - blocks[0].block_start).num_hours(), 5);
assert_eq!(blocks[0].total_tokens, 100);
}
#[test]
fn five_hour_gap_starts_new_block() {
let events = vec![
ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus),
ev("2026-05-08T15:30:00Z", 200, ModelFamily::Sonnet), // 5h30m gap
];
let blocks = compute_blocks(&events);
assert_eq!(blocks.len(), 2);
assert_eq!(blocks[0].total_tokens, 100);
assert_eq!(blocks[1].total_tokens, 200);
assert_eq!(blocks[1].by_family.sonnet, 200);
}
#[test]
fn within_block_aggregates() {
let events = vec![
ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus),
ev("2026-05-08T11:00:00Z", 200, ModelFamily::Opus),
ev("2026-05-08T12:30:00Z", 300, ModelFamily::Sonnet),
];
let blocks = compute_blocks(&events);
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].total_tokens, 600);
assert_eq!(blocks[0].by_family.opus, 300);
assert_eq!(blocks[0].by_family.sonnet, 300);
assert_eq!(blocks[0].message_count, 3);
}
#[test]
fn block_end_starts_new_block() {
// Gap < 5h but the previous block_end was reached.
let events = vec![
ev("2026-05-08T10:00:00Z", 100, ModelFamily::Opus), // block_end = 15:00
ev("2026-05-08T15:30:00Z", 200, ModelFamily::Opus), // > 15:00 → new block
];
let blocks = compute_blocks(&events);
assert_eq!(blocks.len(), 2);
}
#[test]
fn weekly_buckets_today_oldest_first() {
// We can't pin "today" in tests deterministically, but we can at least
// assert the array shape and that a recent event lands in the last bucket.
let now = Utc::now();
let events = vec![ev(&now.to_rfc3339(), 500, ModelFamily::Opus)];
let w = compute_weekly(&events, now);
assert_eq!(w.by_day.len(), 7);
assert_eq!(w.total_tokens, 500);
assert_eq!(w.by_day[6].total_tokens, 500);
}
}

233
src-tauri/src/watch.rs Normal file
View file

@ -0,0 +1,233 @@
//! File watcher + 60s tokio poll fallback.
//!
//! `notify_debouncer_full` provides recursive watching with burst smoothing.
//! ReadDirectoryChangesW on the WSL 9P mount (`\\wsl$\...`) is unreliable, so
//! a 60s tokio interval also runs `refresh_and_emit` to backstop missed events.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::Context;
use notify::{RecommendedWatcher, RecursiveMode};
use notify_debouncer_full::{new_debouncer, DebounceEventResult, FileIdMap, Debouncer};
use tauri::{AppHandle, Emitter, Manager};
use tokio::sync::mpsc;
use crate::jsonl::{enumerate_jsonl, parse_jsonl_from, stat as file_stat, UsageEvent};
use crate::paths::resolve_roots;
use crate::state::{AppState, FileCache, SharedState};
use crate::usage::build_snapshot;
/// Lives for the lifetime of the app (held by AppState). Drop unregisters.
pub struct WatcherHandle {
_debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
_poll_task: tokio::task::JoinHandle<()>,
_consumer_task: tokio::task::JoinHandle<()>,
}
pub fn start(app: AppHandle, state: SharedState) -> anyhow::Result<WatcherHandle> {
// Channel from the debouncer thread → async consumer task.
let (tx, mut rx) = mpsc::unbounded_channel::<Vec<PathBuf>>();
let tx_for_handler = tx.clone();
let mut debouncer = new_debouncer(
Duration::from_millis(250),
None,
move |result: DebounceEventResult| {
let Ok(events) = result else { return };
let mut paths: Vec<PathBuf> = events
.into_iter()
.flat_map(|e| e.event.paths)
.filter(|p| {
p.extension()
.and_then(|s| s.to_str())
.map(|s| s.eq_ignore_ascii_case("jsonl"))
.unwrap_or(false)
})
.collect();
paths.sort();
paths.dedup();
if paths.is_empty() {
return;
}
let _ = tx_for_handler.send(paths);
},
)
.context("create debouncer")?;
// Watch every currently-resolved root.
let roots: Vec<PathBuf> = state.roots.read().clone();
for root in &roots {
if let Err(e) = debouncer
.watcher()
.watch(root, RecursiveMode::Recursive)
{
tracing::warn!("could not watch {}: {e}", root.display());
}
}
// Consumer task: reparses changed files and emits.
let app_for_consumer = app.clone();
let state_for_consumer = state.clone();
let consumer_task = tauri::async_runtime::spawn(async move {
while let Some(paths) = rx.recv().await {
if let Err(e) =
refresh_and_emit(&app_for_consumer, &state_for_consumer, Some(&paths))
.await
{
tracing::warn!("incremental refresh failed: {e:#}");
}
}
});
// 60s poll fallback.
let app_for_poll = app.clone();
let state_for_poll = state.clone();
let poll_task = tauri::async_runtime::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
// First tick fires immediately — skip it; `setup` already did a full scan.
interval.tick().await;
loop {
interval.tick().await;
if let Err(e) = refresh_and_emit(&app_for_poll, &state_for_poll, None).await {
tracing::warn!("poll refresh failed: {e:#}");
}
}
});
Ok(WatcherHandle {
_debouncer: debouncer,
_poll_task: poll_task,
_consumer_task: consumer_task,
})
}
/// Reparse the given files (or all roots if `changed` is None), update the
/// cache, rebuild a snapshot, and emit `usage-updated` to the frontend.
pub async fn refresh_and_emit(
app: &AppHandle,
state: &SharedState,
changed: Option<&[PathBuf]>,
) -> anyhow::Result<()> {
// Re-resolve roots if we don't have any yet (cold start) or on full poll.
if state.roots.read().is_empty() || changed.is_none() {
let settings = state.settings.read().clone();
let resolved = resolve_roots(
settings.wsl_distro_override.as_deref(),
settings.include_native,
);
let mut roots = state.roots.write();
*roots = resolved.roots;
}
let files_to_check: Vec<PathBuf> = match changed {
Some(paths) => paths.to_vec(),
None => {
let roots = state.roots.read().clone();
let mut all = Vec::new();
for r in &roots {
all.extend(enumerate_jsonl(r));
}
all
}
};
let mut new_events: Vec<UsageEvent> = Vec::new();
for path in files_to_check {
let Some((mtime, size)) = file_stat(&path) else { continue };
// Skip unchanged files on full poll. On targeted refresh we always
// re-stat, but if the cached size+mtime match, we can still skip.
let cached_offset = {
let files = state.files.read();
files.get(&path).cloned_offset()
};
if let Some((cached_mtime, cached_size, cached_offset)) = cached_offset {
if cached_mtime == mtime && cached_size == size {
continue;
}
// Truncation: reparse from start.
let start = if size < cached_offset { 0 } else { cached_offset };
let (events, new_offset) = match parse_jsonl_from(&path, start) {
Ok(v) => v,
Err(e) => {
tracing::debug!("parse_jsonl_from {}: {e}", path.display());
continue;
}
};
let deduped = dedupe(&state, events);
update_cache(state, &path, mtime, size, new_offset, &deduped);
new_events.extend(deduped);
} else {
// Brand new file.
let (events, new_offset) = match parse_jsonl_from(&path, 0) {
Ok(v) => v,
Err(e) => {
tracing::debug!("parse_jsonl_from {}: {e}", path.display());
continue;
}
};
let deduped = dedupe(&state, events);
update_cache(state, &path, mtime, size, new_offset, &deduped);
new_events.extend(deduped);
}
}
// Rebuild snapshot from the full event set.
let mut all_events = state.collect_events();
all_events.sort_by_key(|e| e.ts);
let caps = state.settings.read().caps.clone();
let snapshot = build_snapshot(&all_events, &caps, chrono::Utc::now());
if app.get_webview_window("main").is_some() {
app.emit("usage-updated", &snapshot)
.context("emit usage-updated")?;
}
// (If the window isn't up yet, the cache is already populated; the
// frontend pulls the initial snapshot via `get_snapshot` on mount.)
let _ = new_events;
Ok(())
}
fn dedupe(state: &SharedState, events: Vec<UsageEvent>) -> Vec<UsageEvent> {
let mut seen = state.seen_ids.write();
events
.into_iter()
.filter(|e| seen.insert(e.dedupe_key.clone()))
.collect()
}
fn update_cache(
state: &SharedState,
path: &Path,
mtime: i128,
size: u64,
new_offset: u64,
new_events: &[UsageEvent],
) {
let mut files = state.files.write();
let entry = files.entry(path.to_path_buf()).or_insert(FileCache {
mtime_ns: mtime,
size,
byte_offset: new_offset,
messages: Vec::new(),
});
entry.mtime_ns = mtime;
entry.size = size;
entry.byte_offset = new_offset;
entry.messages.extend(new_events.iter().cloned());
}
// Helper to extract (mtime, size, offset) from an Optional FileCache.
trait CachedOffset {
fn cloned_offset(self) -> Option<(i128, u64, u64)>;
}
impl CachedOffset for Option<&FileCache> {
fn cloned_offset(self) -> Option<(i128, u64, u64)> {
self.map(|f| (f.mtime_ns, f.size, f.byte_offset))
}
}