//! Stream-parse Claude Code JSONL transcripts and normalize them into //! [`UsageEvent`]s the rest of the app can sum. //! //! Field shape (verified against actual transcripts on 2026-05-08): //! //! { type:"assistant", timestamp:ISO8601, sessionId, requestId, uuid, //! message: { model: "claude-opus-4-7", //! usage: { input_tokens, output_tokens, //! cache_creation_input_tokens, //! cache_read_input_tokens } } } use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fs::File; use std::io::{BufRead, BufReader, Seek, SeekFrom}; use std::path::{Path, PathBuf}; use walkdir::WalkDir; /// Raw shape we care about. Everything else is ignored — the format gains /// fields between Claude Code releases and we don't want to break on those. #[derive(Debug, Deserialize)] struct RawLine { #[serde(rename = "type")] kind: Option, timestamp: Option>, #[serde(rename = "sessionId")] _session_id: Option, #[serde(rename = "requestId")] request_id: Option, uuid: Option, message: Option, } #[derive(Debug, Deserialize)] struct RawMessage { model: Option, usage: Option, } #[derive(Debug, Deserialize, Default)] struct RawUsage { #[serde(default)] input_tokens: u64, #[serde(default)] output_tokens: u64, #[serde(default)] cache_creation_input_tokens: u64, #[serde(default)] cache_read_input_tokens: u64, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "lowercase")] pub enum ModelFamily { Opus, Sonnet, Haiku, Other, } #[derive(Debug, Clone, Serialize)] pub struct UsageEvent { pub ts: DateTime, pub model_raw: String, pub model_label: String, pub model_family: ModelFamily, pub input: u64, pub output: u64, pub cache_create: u64, pub cache_read: u64, pub total: u64, pub dedupe_key: String, pub source_file: PathBuf, } /// "claude-opus-4-7" → ("Opus 4.7", Opus) /// "claude-sonnet-4-6" → ("Sonnet 4.6", Sonnet) /// "claude-haiku-4-5-20251001" → ("Haiku 4.5", Haiku) pub fn normalize_model(raw: &str) -> (String, ModelFamily) { let lower = raw.to_lowercase(); let family = if lower.contains("opus") { ModelFamily::Opus } else if lower.contains("sonnet") { ModelFamily::Sonnet } else if lower.contains("haiku") { ModelFamily::Haiku } else { ModelFamily::Other }; // Pull the first major.minor segment (e.g. "4-7" → "4.7"). If we can't // find one, just title-case the family name. let mut version: Option = None; let parts: Vec<&str> = lower.split('-').collect(); for i in 0..parts.len().saturating_sub(1) { if parts[i].parse::().is_ok() && parts[i + 1].parse::().is_ok() { version = Some(format!("{}.{}", parts[i], parts[i + 1])); break; } } let family_str = match family { ModelFamily::Opus => "Opus", ModelFamily::Sonnet => "Sonnet", ModelFamily::Haiku => "Haiku", ModelFamily::Other => "Model", }; let label = match version { Some(v) => format!("{family_str} {v}"), None => family_str.to_string(), }; (label, family) } /// Walk a root and return every `*.jsonl` (including nested `subagents/`). pub fn enumerate_jsonl(root: &Path) -> Vec { WalkDir::new(root) .follow_links(false) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.file_type().is_file()) .filter(|e| { e.path() .extension() .and_then(|s| s.to_str()) .map(|s| s.eq_ignore_ascii_case("jsonl")) .unwrap_or(false) }) .map(|e| e.into_path()) .collect() } /// Tail-read from `start_offset`. Returns `(new_events, new_offset)`. /// /// Only advances past fully-terminated lines — a half-flushed last line is /// reparsed on the next call once it gets its trailing newline. /// /// Caller is responsible for deduping returned events against a global /// `seen_ids` set (subagent transcripts overlap with their parent). pub fn parse_jsonl_from( path: &Path, start_offset: u64, ) -> anyhow::Result<(Vec, u64)> { let mut f = File::open(path)?; let len = f.metadata()?.len(); // File rotated / truncated: start over. let start = if start_offset > len { 0 } else { start_offset }; f.seek(SeekFrom::Start(start))?; let mut reader = BufReader::new(f); let mut events: Vec = Vec::new(); let mut offset = start; loop { let mut buf: Vec = Vec::new(); let n = reader.read_until(b'\n', &mut buf)?; if n == 0 { break; } // No newline → partial flush, leave it for next call. if !buf.ends_with(b"\n") { break; } offset += n as u64; let line = std::str::from_utf8(&buf).map(str::trim).unwrap_or(""); if line.is_empty() { continue; } let parsed: RawLine = match serde_json::from_str(line) { Ok(v) => v, Err(_) => continue, // Skip malformed lines silently — the file is // continually appended; partial writes happen. }; if parsed.kind.as_deref() != Some("assistant") { continue; } let (Some(ts), Some(msg)) = (parsed.timestamp, parsed.message) else { continue; }; let Some(usage) = msg.usage else { continue }; let model_raw = msg.model.unwrap_or_else(|| "unknown".to_string()); let (model_label, model_family) = normalize_model(&model_raw); let total = usage.input_tokens + usage.output_tokens + usage.cache_creation_input_tokens + usage.cache_read_input_tokens; if total == 0 { continue; } let dedupe_key = parsed .request_id .or(parsed.uuid) .unwrap_or_else(|| format!("{}:{}", path.display(), offset)); events.push(UsageEvent { ts, model_raw, model_label, model_family, input: usage.input_tokens, output: usage.output_tokens, cache_create: usage.cache_creation_input_tokens, cache_read: usage.cache_read_input_tokens, total, dedupe_key, source_file: path.to_path_buf(), }); } Ok((events, offset)) } /// Read the file's mtime (in nanos) and size — used to skip unchanged files /// during the 60s poll fallback. pub fn stat(path: &Path) -> Option<(i128, u64)> { let m = std::fs::metadata(path).ok()?; let mtime = m .modified() .ok() .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) .map(|d| d.as_nanos() as i128) .unwrap_or(0); Some((mtime, m.len())) }