235 lines
7 KiB
Rust
235 lines
7 KiB
Rust
//! Stream-parse Claude Code JSONL transcripts and normalize them into
//! [`UsageEvent`]s the rest of the app can sum.
//!
//! Field shape (verified against actual transcripts on 2026-05-08):
//!
//! ```text
//! { type:"assistant", timestamp:ISO8601, sessionId, requestId, uuid,
//!   message: { model: "claude-opus-4-7",
//!              usage: { input_tokens, output_tokens,
//!                       cache_creation_input_tokens,
//!                       cache_read_input_tokens } } }
//! ```
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::fs::File;
|
|
use std::io::{BufRead, BufReader, Seek, SeekFrom};
|
|
use std::path::{Path, PathBuf};
|
|
use walkdir::WalkDir;
|
|
|
|
/// Raw shape we care about. Everything else is ignored — the format gains
|
|
/// fields between Claude Code releases and we don't want to break on those.
|
|
#[derive(Debug, Deserialize)]
|
|
struct RawLine {
|
|
#[serde(rename = "type")]
|
|
kind: Option<String>,
|
|
timestamp: Option<DateTime<Utc>>,
|
|
#[serde(rename = "sessionId")]
|
|
_session_id: Option<String>,
|
|
#[serde(rename = "requestId")]
|
|
request_id: Option<String>,
|
|
uuid: Option<String>,
|
|
message: Option<RawMessage>,
|
|
}
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
struct RawMessage {
|
|
model: Option<String>,
|
|
usage: Option<RawUsage>,
|
|
}
|
|
|
|
#[derive(Debug, Deserialize, Default)]
|
|
struct RawUsage {
|
|
#[serde(default)]
|
|
input_tokens: u64,
|
|
#[serde(default)]
|
|
output_tokens: u64,
|
|
#[serde(default)]
|
|
cache_creation_input_tokens: u64,
|
|
#[serde(default)]
|
|
cache_read_input_tokens: u64,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
|
#[serde(rename_all = "lowercase")]
|
|
pub enum ModelFamily {
|
|
Opus,
|
|
Sonnet,
|
|
Haiku,
|
|
Other,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct UsageEvent {
|
|
pub ts: DateTime<Utc>,
|
|
pub model_raw: String,
|
|
pub model_label: String,
|
|
pub model_family: ModelFamily,
|
|
pub input: u64,
|
|
pub output: u64,
|
|
pub cache_create: u64,
|
|
pub cache_read: u64,
|
|
pub total: u64,
|
|
pub dedupe_key: String,
|
|
pub source_file: PathBuf,
|
|
}
|
|
|
|
/// "claude-opus-4-7" → ("Opus 4.7", Opus)
|
|
/// "claude-sonnet-4-6" → ("Sonnet 4.6", Sonnet)
|
|
/// "claude-haiku-4-5-20251001" → ("Haiku 4.5", Haiku)
|
|
pub fn normalize_model(raw: &str) -> (String, ModelFamily) {
|
|
let lower = raw.to_lowercase();
|
|
let family = if lower.contains("opus") {
|
|
ModelFamily::Opus
|
|
} else if lower.contains("sonnet") {
|
|
ModelFamily::Sonnet
|
|
} else if lower.contains("haiku") {
|
|
ModelFamily::Haiku
|
|
} else {
|
|
ModelFamily::Other
|
|
};
|
|
|
|
// Pull the first major.minor segment (e.g. "4-7" → "4.7"). If we can't
|
|
// find one, just title-case the family name.
|
|
let mut version: Option<String> = None;
|
|
let parts: Vec<&str> = lower.split('-').collect();
|
|
for i in 0..parts.len().saturating_sub(1) {
|
|
if parts[i].parse::<u32>().is_ok() && parts[i + 1].parse::<u32>().is_ok() {
|
|
version = Some(format!("{}.{}", parts[i], parts[i + 1]));
|
|
break;
|
|
}
|
|
}
|
|
|
|
let family_str = match family {
|
|
ModelFamily::Opus => "Opus",
|
|
ModelFamily::Sonnet => "Sonnet",
|
|
ModelFamily::Haiku => "Haiku",
|
|
ModelFamily::Other => "Model",
|
|
};
|
|
|
|
let label = match version {
|
|
Some(v) => format!("{family_str} {v}"),
|
|
None => family_str.to_string(),
|
|
};
|
|
|
|
(label, family)
|
|
}
|
|
|
|
/// Walk a root and return every `*.jsonl` (including nested `subagents/`).
|
|
pub fn enumerate_jsonl(root: &Path) -> Vec<PathBuf> {
|
|
WalkDir::new(root)
|
|
.follow_links(false)
|
|
.into_iter()
|
|
.filter_map(|e| e.ok())
|
|
.filter(|e| e.file_type().is_file())
|
|
.filter(|e| {
|
|
e.path()
|
|
.extension()
|
|
.and_then(|s| s.to_str())
|
|
.map(|s| s.eq_ignore_ascii_case("jsonl"))
|
|
.unwrap_or(false)
|
|
})
|
|
.map(|e| e.into_path())
|
|
.collect()
|
|
}
|
|
|
|
/// Tail-read from `start_offset`. Returns `(new_events, new_offset)`.
|
|
///
|
|
/// Only advances past fully-terminated lines — a half-flushed last line is
|
|
/// reparsed on the next call once it gets its trailing newline.
|
|
///
|
|
/// Caller is responsible for deduping returned events against a global
|
|
/// `seen_ids` set (subagent transcripts overlap with their parent).
|
|
pub fn parse_jsonl_from(
|
|
path: &Path,
|
|
start_offset: u64,
|
|
) -> anyhow::Result<(Vec<UsageEvent>, u64)> {
|
|
let mut f = File::open(path)?;
|
|
let len = f.metadata()?.len();
|
|
|
|
// File rotated / truncated: start over.
|
|
let start = if start_offset > len { 0 } else { start_offset };
|
|
f.seek(SeekFrom::Start(start))?;
|
|
|
|
let mut reader = BufReader::new(f);
|
|
let mut events: Vec<UsageEvent> = Vec::new();
|
|
let mut offset = start;
|
|
|
|
loop {
|
|
let mut buf: Vec<u8> = Vec::new();
|
|
let n = reader.read_until(b'\n', &mut buf)?;
|
|
if n == 0 {
|
|
break;
|
|
}
|
|
// No newline → partial flush, leave it for next call.
|
|
if !buf.ends_with(b"\n") {
|
|
break;
|
|
}
|
|
offset += n as u64;
|
|
|
|
let line = std::str::from_utf8(&buf).map(str::trim).unwrap_or("");
|
|
if line.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let parsed: RawLine = match serde_json::from_str(line) {
|
|
Ok(v) => v,
|
|
Err(_) => continue, // Skip malformed lines silently — the file is
|
|
// continually appended; partial writes happen.
|
|
};
|
|
|
|
if parsed.kind.as_deref() != Some("assistant") {
|
|
continue;
|
|
}
|
|
|
|
let (Some(ts), Some(msg)) = (parsed.timestamp, parsed.message) else {
|
|
continue;
|
|
};
|
|
let Some(usage) = msg.usage else { continue };
|
|
|
|
let model_raw = msg.model.unwrap_or_else(|| "unknown".to_string());
|
|
let (model_label, model_family) = normalize_model(&model_raw);
|
|
|
|
let total = usage.input_tokens
|
|
+ usage.output_tokens
|
|
+ usage.cache_creation_input_tokens
|
|
+ usage.cache_read_input_tokens;
|
|
if total == 0 {
|
|
continue;
|
|
}
|
|
|
|
let dedupe_key = parsed
|
|
.request_id
|
|
.or(parsed.uuid)
|
|
.unwrap_or_else(|| format!("{}:{}", path.display(), offset));
|
|
|
|
events.push(UsageEvent {
|
|
ts,
|
|
model_raw,
|
|
model_label,
|
|
model_family,
|
|
input: usage.input_tokens,
|
|
output: usage.output_tokens,
|
|
cache_create: usage.cache_creation_input_tokens,
|
|
cache_read: usage.cache_read_input_tokens,
|
|
total,
|
|
dedupe_key,
|
|
source_file: path.to_path_buf(),
|
|
});
|
|
}
|
|
|
|
Ok((events, offset))
|
|
}
|
|
|
|
/// Return `(mtime_nanos, size_bytes)` for `path`, or `None` if its metadata
/// cannot be read. Used to skip unchanged files during the 60s poll fallback.
///
/// The modification time is nanoseconds since the Unix epoch. It is reported
/// as `0` when the platform cannot supply an mtime or the mtime is earlier
/// than the epoch.
pub fn stat(path: &Path) -> Option<(i128, u64)> {
    let meta = match std::fs::metadata(path) {
        Ok(meta) => meta,
        Err(_) => return None,
    };

    let mtime_nanos = match meta.modified() {
        Ok(modified) => match modified.duration_since(std::time::UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_nanos() as i128,
            Err(_) => 0,
        },
        Err(_) => 0,
    };

    Some((mtime_nanos, meta.len()))
}
|
|
|