//! Drive `claude /usage` via a pseudo-TTY and parse the rendered output //! into structured percentages. //! //! This is the only way to get the *real* subscription-side numbers (the //! same ones the user sees when typing `/usage` interactively). Anthropic //! doesn't expose `/usage` via `claude --print`, and the percentages aren't //! cached anywhere on disk between invocations. //! //! Output we parse (whitespace-collapsed, ANSI stripped): //! //! Current session //! ████████████▌ 67% used //! Resets 2:50am (Europe/London) //! //! Current week (all models) //! ████ 8% used //! Resets May 9, 12pm (Europe/London) //! //! Current week (Sonnet only) //! ██▌ 5% used //! Resets May 9, 12pm (Europe/London) //! //! The "Sonnet only" section is not always present (depends on plan tier //! and recent usage). We treat it as optional. use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use serde::Serialize; use std::io::{Read, Write}; use std::time::{Duration, Instant}; #[derive(Debug, Clone, Serialize)] pub struct UsageBar { /// e.g. "Current session", "Current week (all models)" pub label: String, /// 0..=100 pub percent: u8, /// Free text describing reset, e.g. "2:50am (Europe/London)" or /// "May 9, 12pm (Europe/London)". Anthropic's renderer is the source /// of truth here; we don't try to re-format. pub resets_at_text: String, } #[derive(Debug, Clone, Serialize)] pub struct CliUsage { pub session: Option, pub week_all: Option, pub week_sonnet: Option, pub fetched_at: DateTime, /// True if at least one bar parsed. False on parse failure / timeout /// (UI can fall back to JSONL-derived numbers). pub ok: bool, /// Raw stripped text, kept for diagnostics in Settings. pub raw_text: String, } /// Spawn `claude`, send `/usage`, capture output, kill, parse. /// /// `command_override` lets the caller supply an alternate program (e.g. /// `wsl.exe` with args) for the Windows-host case. None means just `claude`. pub fn fetch_blocking(command_override: Option<&str>) -> Result { let raw = drive_claude_usage(command_override, Duration::from_secs(20))?; let stripped = strip_ansi_collapse(&raw); parse_usage_text(&stripped, raw) } /// Pick a sensible default command line for invoking `claude`. /// /// Order: /// 1. Native `claude` (Windows: `claude.exe` on PATH; Unix: `claude`). /// 2. On Windows: enumerate WSL distros via `wsl.exe -l -q` and probe /// each by running `bash -lc 'command -v claude'`. First hit wins. /// 3. Fallback: bare `claude` (will fail, but at least with a clear error). /// /// This is called fresh on every `/usage` fetch, but each probe is cheap /// (<200ms typical) and only runs when no override is set. fn default_command() -> CommandBuilder { if let Some(parts) = autodetect_command() { let mut c = CommandBuilder::new(&parts[0]); for a in &parts[1..] { c.arg(a); } return c; } CommandBuilder::new("claude") } /// Returns the auto-detected argv (program + args) for invoking claude, or /// None if nothing reachable was found. pub fn autodetect_command() -> Option> { // 1. Native claude. if which_exists("claude") { return Some(vec!["claude".to_string()]); } // 2. WSL distros (Windows only). if cfg!(windows) && which_exists("wsl.exe") { for distro in list_wsl_distros() { if probe_claude_in_wsl(&distro) { return Some(vec![ "wsl.exe".to_string(), "-d".to_string(), distro, "bash".to_string(), "-lc".to_string(), "claude".to_string(), ]); } } } None } fn which_exists(name: &str) -> bool { let probe = if cfg!(windows) { "where" } else { "which" }; crate::paths::quiet_command(probe) .arg(name) .output() .map(|o| o.status.success() && !o.stdout.is_empty()) .unwrap_or(false) } fn list_wsl_distros() -> Vec { let Ok(out) = crate::paths::quiet_command("wsl.exe").args(["-l", "-q"]).output() else { return Vec::new(); }; if !out.status.success() { return Vec::new(); } // wsl.exe outputs UTF-16LE. let raw_u16: Vec = out .stdout .chunks_exact(2) .map(|b| u16::from_le_bytes([b[0], b[1]])) .collect(); String::from_utf16_lossy(&raw_u16) .lines() .map(|l| l.trim_matches(|c: char| c == '\u{FEFF}' || c.is_whitespace()).to_string()) .filter(|l| !l.is_empty()) .collect() } fn probe_claude_in_wsl(distro: &str) -> bool { crate::paths::quiet_command("wsl.exe") .args(["-d", distro, "bash", "-lc", "command -v claude"]) .output() .map(|o| o.status.success() && !o.stdout.is_empty()) .unwrap_or(false) } /// Spawn the CLI in a PTY, send `/usage`, capture stdout for `total_timeout`, /// then send `/exit` and return raw bytes (still containing ANSI escapes). fn drive_claude_usage(command_override: Option<&str>, total_timeout: Duration) -> Result> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, // wide so /usage doesn't wrap awkwardly pixel_width: 0, pixel_height: 0, }) .context("openpty")?; let mut cmd = match command_override { Some(s) if !s.trim().is_empty() => { // Allow simple "wsl.exe -d Ubuntu bash -lc claude" style strings. let parts: Vec<&str> = s.split_whitespace().collect(); let mut c = CommandBuilder::new(parts[0]); for arg in &parts[1..] { c.arg(arg); } c } _ => default_command(), }; // claude inspects $TERM; give it something reasonable. cmd.env("TERM", "xterm-256color"); let mut child = pair.slave.spawn_command(cmd).context("spawn claude")?; drop(pair.slave); // close our copy of the slave let mut writer = pair.master.take_writer().context("take_writer")?; let mut reader = pair.master.try_clone_reader().context("clone_reader")?; // Read in a background thread; communicate via a channel. let (tx, rx) = std::sync::mpsc::channel::>(); let reader_handle = std::thread::spawn(move || { let mut buf = [0u8; 8192]; loop { match reader.read(&mut buf) { Ok(0) | Err(_) => break, Ok(n) => { if tx.send(buf[..n].to_vec()).is_err() { break; } } } } }); let mut output: Vec = Vec::with_capacity(64 * 1024); // 1. Wait ~3.5s for the TUI to initialize (welcome banner, prompt, etc). drain(&rx, &mut output, Duration::from_millis(3500)); // 2. Send /usage. writer.write_all(b"/usage\r").context("write /usage")?; writer.flush().ok(); // 3. Drain output until quiet period or total deadline. let deadline = Instant::now() + total_timeout; let mut last_growth = Instant::now(); let prev_len = output.len(); let _ = prev_len; loop { let pre = output.len(); drain(&rx, &mut output, Duration::from_millis(700)); if output.len() > pre { last_growth = Instant::now(); } // Quiet for >1.2s → assume render finished. if last_growth.elapsed() > Duration::from_millis(1200) { break; } if Instant::now() > deadline { break; } } // 4. Best-effort exit. If it doesn't shut down, we'll kill the child. let _ = writer.write_all(b"/exit\r"); let _ = writer.flush(); drain(&rx, &mut output, Duration::from_millis(500)); drop(writer); let _ = child.kill(); let _ = child.wait(); drop(pair.master); let _ = reader_handle.join(); Ok(output) } fn drain( rx: &std::sync::mpsc::Receiver>, out: &mut Vec, timeout: Duration, ) { let deadline = Instant::now() + timeout; while let Some(remaining) = deadline.checked_duration_since(Instant::now()) { match rx.recv_timeout(remaining) { Ok(chunk) => out.extend_from_slice(&chunk), Err(_) => break, } } } /// Strip CSI / OSC / DCS escape sequences and collapse runs of identical /// re-rendered lines (TUIs paint the same content many times during scan). pub fn strip_ansi_collapse(raw: &[u8]) -> String { // CSI: ESC [ ... // OSC: ESC ] ... BEL // DCS / SOS / PM / APC: ESC P / X / ^ / _ ... ESC \ static CSI_RE: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { regex::bytes::Regex::new( r"\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b\][^\x07]*\x07|\x1b[PX^_].*?\x1b\\", ) .unwrap() }); let cleaned = CSI_RE.replace_all(raw, &b""[..]); // Drop stray BEL / lone ESC. let cleaned: Vec = cleaned .iter() .copied() .filter(|&b| b != 0x07 && b != 0x1b) .collect(); let text = String::from_utf8_lossy(&cleaned).to_string(); // Collapse repeated lines (TUI redraws same line hundreds of times). let mut out: Vec<&str> = Vec::new(); for line in text.lines() { let trimmed = line.trim_end(); if out.last().map(|p| *p == trimmed).unwrap_or(false) { continue; } out.push(trimmed); } // Keep blank lines for parser hints, but collapse runs of >1 blank. let mut compressed: Vec<&str> = Vec::with_capacity(out.len()); let mut prev_blank = false; for line in out { let blank = line.is_empty(); if blank && prev_blank { continue; } compressed.push(line); prev_blank = blank; } compressed.join("\n") } /// Parse the cleaned text into a [`CliUsage`]. /// /// Strategy: scan for the headings ("Current session", "Current week (all models)", /// "Current week (Sonnet only)") then look for `NN% used` and `Resets …` on the /// next few lines after each heading. pub fn parse_usage_text(stripped: &str, raw_for_diag: Vec) -> Result { let lines: Vec<&str> = stripped.lines().collect(); let find_section = |label: &str| -> Option { let idx = lines.iter().position(|l| { let t = l.trim(); // Some renderings squash spaces ("Currentsession"). Match both. t == label || t.replace(' ', "") == label.replace(' ', "") })?; // Look at the next 6 lines for "X% used" and "Resets ...". let mut percent: Option = None; let mut resets: Option = None; for line in &lines[idx + 1..(idx + 7).min(lines.len())] { if percent.is_none() { if let Some(cap) = PCT_RE.captures(line) { if let Ok(n) = cap[1].parse::() { percent = Some(n); } } } if resets.is_none() { if let Some(cap) = RESET_RE.captures(line) { resets = Some(cap[1].trim().to_string()); } } if percent.is_some() && resets.is_some() { break; } } Some(UsageBar { label: label.to_string(), percent: percent?, resets_at_text: resets.unwrap_or_default(), }) }; let session = find_section("Current session"); let week_all = find_section("Current week (all models)"); let week_sonnet = find_section("Current week (Sonnet only)"); let ok = session.is_some() || week_all.is_some(); let _ = raw_for_diag; // accepted for API compat; we surface the cleaned form Ok(CliUsage { session, week_all, week_sonnet, fetched_at: Utc::now(), ok, raw_text: stripped.to_string(), }) } static PCT_RE: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| regex::Regex::new(r"(\d{1,3})\s*%\s*used").unwrap()); static RESET_RE: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| regex::Regex::new(r"Resets?\s+(.+)$").unwrap()); #[cfg(test)] mod tests { use super::*; const SAMPLE: &str = r"Settings Status Config Usage Stats Session Total cost: $0.0000 Total duration (API): 0s Total duration (wall): 4s Current session █████████████████████████████████▌ 67% used Resets 2:50am (Europe/London) Current week (all models) ████ 8% used Resets May 9, 12pm (Europe/London) Current week (Sonnet only) ██▌ 5% used Resets May 9, 12pm (Europe/London) What's contributing to your limits usage? "; #[test] fn parses_three_bars() { let u = parse_usage_text(SAMPLE, Vec::new()).unwrap(); assert!(u.ok); let s = u.session.expect("session"); assert_eq!(s.percent, 67); assert!(s.resets_at_text.contains("2:50am")); let w = u.week_all.expect("week_all"); assert_eq!(w.percent, 8); assert!(w.resets_at_text.contains("May 9")); let so = u.week_sonnet.expect("week_sonnet"); assert_eq!(so.percent, 5); } #[test] fn parser_tolerates_missing_sonnet_section() { let s = SAMPLE .split("Current week (Sonnet only)") .next() .unwrap(); let u = parse_usage_text(s, Vec::new()).unwrap(); assert!(u.session.is_some()); assert!(u.week_all.is_some()); assert!(u.week_sonnet.is_none()); } }