Add MCP server (v1 read-only): toggle, per-pane gate, panel UI
This commit is contained in:
parent
6068522ee3
commit
83d8932c98
15 changed files with 1235 additions and 7 deletions
|
|
@ -22,6 +22,22 @@ tauri-plugin-opener = "2"
|
|||
keyring-core = "1"
|
||||
windows-native-keyring-store = "1"
|
||||
|
||||
# Embedded MCP server: lets a Claude session drive the workspace
|
||||
# (list panes, read scrollback, etc.). Streamable HTTP transport mounted
|
||||
# on an Axum router so we can add a bearer-auth middleware in front.
|
||||
rmcp = { version = "=1.7.0", features = [
|
||||
"server",
|
||||
"macros",
|
||||
"schemars",
|
||||
"transport-streamable-http-server",
|
||||
] }
|
||||
schemars = "1"
|
||||
axum = { version = "0.8", default-features = false, features = ["http1", "tokio"] }
|
||||
tower = "0.5"
|
||||
tokio-util = { version = "0.7", features = ["rt"] }
|
||||
rand = "0.9"
|
||||
hex = "0.4"
|
||||
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,14 @@
|
|||
//! Tauri command surface. Every JS-callable function lives here.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::{engine::general_purpose::STANDARD as B64, Engine as _};
|
||||
use tauri::{AppHandle, Manager};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::creds;
|
||||
use crate::hosts::{self, SshHost, SshHostView};
|
||||
use crate::mcp::{self, McpMirror, McpServerHandle, McpState, RunningServer};
|
||||
use crate::pty::{list_wsl_distros, PaneId, PtyManager, SpawnSpec};
|
||||
|
||||
const WORKSPACE_FILE: &str = "workspace.json";
|
||||
|
|
@ -17,7 +21,7 @@ pub async fn list_distros() -> Result<Vec<String>, String> {
|
|||
#[tauri::command]
|
||||
pub async fn spawn_pane(
|
||||
app: AppHandle,
|
||||
manager: tauri::State<'_, PtyManager>,
|
||||
manager: tauri::State<'_, Arc<PtyManager>>,
|
||||
spec: SpawnSpec,
|
||||
cols: u16,
|
||||
rows: u16,
|
||||
|
|
@ -29,7 +33,7 @@ pub async fn spawn_pane(
|
|||
/// strings; the frontend encodes before sending).
|
||||
#[tauri::command]
|
||||
pub async fn write_to_pane(
|
||||
manager: tauri::State<'_, PtyManager>,
|
||||
manager: tauri::State<'_, Arc<PtyManager>>,
|
||||
id: PaneId,
|
||||
data_b64: String,
|
||||
) -> Result<(), String> {
|
||||
|
|
@ -41,7 +45,7 @@ pub async fn write_to_pane(
|
|||
|
||||
#[tauri::command]
|
||||
pub async fn resize_pane(
|
||||
manager: tauri::State<'_, PtyManager>,
|
||||
manager: tauri::State<'_, Arc<PtyManager>>,
|
||||
id: PaneId,
|
||||
cols: u16,
|
||||
rows: u16,
|
||||
|
|
@ -51,7 +55,7 @@ pub async fn resize_pane(
|
|||
|
||||
#[tauri::command]
|
||||
pub async fn kill_pane(
|
||||
manager: tauri::State<'_, PtyManager>,
|
||||
manager: tauri::State<'_, Arc<PtyManager>>,
|
||||
id: PaneId,
|
||||
) -> Result<(), String> {
|
||||
manager.kill(id).map_err(|e| e.to_string())
|
||||
|
|
@ -137,3 +141,80 @@ pub async fn delete_host_password(host_id: String) -> Result<(), String> {
|
|||
pub async fn has_host_password(host_id: String) -> Result<bool, String> {
|
||||
Ok(creds::has(&host_id))
|
||||
}
|
||||
|
||||
// ---- MCP server lifecycle --------------------------------------------------
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
pub struct McpStatus {
|
||||
pub running: bool,
|
||||
pub url: Option<String>,
|
||||
pub token: Option<String>,
|
||||
}
|
||||
|
||||
fn server_status(handle: &McpServerHandle) -> McpStatus {
|
||||
let g = handle.0.lock();
|
||||
match g.as_ref() {
|
||||
Some(srv) => McpStatus {
|
||||
running: true,
|
||||
url: Some(format!("http://{}/mcp", srv.addr)),
|
||||
token: Some(srv.token.clone()),
|
||||
},
|
||||
None => McpStatus {
|
||||
running: false,
|
||||
url: None,
|
||||
token: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn mcp_start(
|
||||
ptys: tauri::State<'_, Arc<PtyManager>>,
|
||||
state: tauri::State<'_, Arc<RwLock<McpState>>>,
|
||||
handle: tauri::State<'_, McpServerHandle>,
|
||||
) -> Result<McpStatus, String> {
|
||||
{
|
||||
let g = handle.0.lock();
|
||||
if g.is_some() {
|
||||
return Ok(server_status(&handle));
|
||||
}
|
||||
}
|
||||
let ptys_arc: Arc<PtyManager> = (*ptys).clone();
|
||||
let state_arc: Arc<RwLock<McpState>> = (*state).clone();
|
||||
let running: RunningServer = mcp::start_server(ptys_arc, state_arc)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
{
|
||||
let mut g = handle.0.lock();
|
||||
*g = Some(running);
|
||||
}
|
||||
Ok(server_status(&handle))
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn mcp_stop(
|
||||
handle: tauri::State<'_, McpServerHandle>,
|
||||
) -> Result<McpStatus, String> {
|
||||
mcp::stop_server(&handle);
|
||||
Ok(server_status(&handle))
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn mcp_status(
|
||||
handle: tauri::State<'_, McpServerHandle>,
|
||||
) -> Result<McpStatus, String> {
|
||||
Ok(server_status(&handle))
|
||||
}
|
||||
|
||||
/// Frontend pushes the gated mirror after every tree/host change. Backend
|
||||
/// caches it for MCP responses — the MCP server only ever sees what the
|
||||
/// frontend chose to mirror (default-deny per-leaf gate).
|
||||
#[tauri::command]
|
||||
pub async fn mcp_update_state(
|
||||
state: tauri::State<'_, Arc<RwLock<McpState>>>,
|
||||
mirror: McpMirror,
|
||||
) -> Result<(), String> {
|
||||
let mut g = state.write().await;
|
||||
g.mirror = mirror;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,8 +3,12 @@
|
|||
mod commands;
|
||||
mod creds;
|
||||
mod hosts;
|
||||
mod mcp;
|
||||
mod pty;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::mcp::{McpServerHandle, McpState};
|
||||
use crate::pty::PtyManager;
|
||||
|
||||
pub fn run() {
|
||||
|
|
@ -26,10 +30,19 @@ pub fn run() {
|
|||
Err(e) => tracing::warn!("keyring store init failed: {e}"),
|
||||
}
|
||||
|
||||
// PtyManager and McpState are shared with the MCP server, so register
|
||||
// them as Arc<T> rather than the plain T. Tauri commands access them
|
||||
// via `tauri::State<'_, Arc<T>>` and deref / clone as needed.
|
||||
let ptys: Arc<PtyManager> = Arc::new(PtyManager::new());
|
||||
let mcp_state: Arc<tokio::sync::RwLock<McpState>> =
|
||||
Arc::new(tokio::sync::RwLock::new(McpState::default()));
|
||||
|
||||
tauri::Builder::default()
|
||||
.plugin(tauri_plugin_clipboard_manager::init())
|
||||
.plugin(tauri_plugin_opener::init())
|
||||
.manage(PtyManager::new())
|
||||
.manage(ptys)
|
||||
.manage(mcp_state)
|
||||
.manage(McpServerHandle::default())
|
||||
.invoke_handler(tauri::generate_handler![
|
||||
commands::list_distros,
|
||||
commands::spawn_pane,
|
||||
|
|
@ -43,6 +56,10 @@ pub fn run() {
|
|||
commands::set_host_password,
|
||||
commands::delete_host_password,
|
||||
commands::has_host_password,
|
||||
commands::mcp_start,
|
||||
commands::mcp_stop,
|
||||
commands::mcp_status,
|
||||
commands::mcp_update_state,
|
||||
])
|
||||
.run(tauri::generate_context!())
|
||||
.expect("error while running tauri application");
|
||||
|
|
|
|||
464
src-tauri/src/mcp.rs
Normal file
464
src-tauri/src/mcp.rs
Normal file
|
|
@ -0,0 +1,464 @@
|
|||
//! Embedded MCP server. Lets a Claude session running anywhere on the
|
||||
//! same machine — including inside one of tiletopia's own panes — inspect
|
||||
//! the workspace via Model Context Protocol.
|
||||
//!
|
||||
//! V1 surface (read-only):
|
||||
//! resources: tiletopia://layout, tiletopia://panes, tiletopia://hosts
|
||||
//! tools: read_pane(leaf_id, last_lines?, after_seq?)
|
||||
//! wait_for_idle(leaf_id, idle_ms?, timeout_ms?)
|
||||
//!
|
||||
//! Per-pane `mcpAllow` gate (default-deny) lives in the frontend tree;
|
||||
//! the frontend mirrors the gated subset into {@link McpState} via the
|
||||
//! `mcp_update_state` Tauri command. The MCP server only sees what the
|
||||
//! mirror exposes — no peeking around it.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Result;
|
||||
use axum::{
|
||||
body::Body,
|
||||
http::{HeaderMap, HeaderValue, Request, StatusCode},
|
||||
middleware::{self, Next},
|
||||
response::Response,
|
||||
Router,
|
||||
};
|
||||
use parking_lot::Mutex as PlMutex;
|
||||
use rmcp::{
|
||||
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
|
||||
model::*,
|
||||
schemars, tool, tool_handler, tool_router,
|
||||
service::RequestContext,
|
||||
transport::streamable_http_server::{
|
||||
session::local::LocalSessionManager, tower::StreamableHttpService,
|
||||
},
|
||||
ErrorData as McpError, RoleServer, ServerHandler,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::pty::{PaneId, PtyManager};
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Shared state mirrored from the frontend.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
pub type LeafId = String;
|
||||
|
||||
/// Cached snapshot the frontend pushes via `mcp_update_state` whenever the
|
||||
/// tree or hosts change. Source of truth for everything except scrollback,
|
||||
/// which the backend collects directly via {@link PtyManager}.
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct McpMirror {
|
||||
/// Serialised layout tree (full structure, post-filtering happens
|
||||
/// per-resource — see read_resource).
|
||||
#[serde(default)]
|
||||
pub layout_json: String,
|
||||
/// Map of leaf id → pane metadata. Includes only leaves with
|
||||
/// `mcpAllow === true` (frontend gates before mirroring).
|
||||
#[serde(default)]
|
||||
pub leaves: HashMap<LeafId, MirroredLeaf>,
|
||||
/// Saved SSH hosts, password fields stripped.
|
||||
#[serde(default)]
|
||||
pub hosts: Vec<MirroredHost>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MirroredLeaf {
|
||||
pub pane_id: Option<PaneId>,
|
||||
pub label: Option<String>,
|
||||
pub shell_kind: String,
|
||||
pub distro: Option<String>,
|
||||
pub ssh_host_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub broadcast: bool,
|
||||
#[serde(default)]
|
||||
pub active: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MirroredHost {
|
||||
pub id: String,
|
||||
pub label: String,
|
||||
pub hostname: String,
|
||||
pub user: Option<String>,
|
||||
pub port: Option<u16>,
|
||||
#[serde(default)]
|
||||
pub has_password: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct McpState {
|
||||
pub bearer_token: String,
|
||||
pub mirror: McpMirror,
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// MCP service: tools + resources.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TileService {
|
||||
ptys: Arc<PtyManager>,
|
||||
state: Arc<RwLock<McpState>>,
|
||||
tool_router: ToolRouter<Self>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, schemars::JsonSchema)]
|
||||
pub struct ReadPaneArgs {
|
||||
/// Stable leaf id from the tree (uuid-shaped). Must belong to a pane
|
||||
/// the user has allow-listed for MCP access.
|
||||
pub leaf_id: LeafId,
|
||||
/// Return only the last N lines (default 200, hard cap 3000).
|
||||
#[serde(default)]
|
||||
pub last_lines: Option<usize>,
|
||||
/// Only return bytes whose seq > this. Pair with the `__seq__` value
|
||||
/// returned in a prior call for incremental polling.
|
||||
#[serde(default)]
|
||||
pub after_seq: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, schemars::JsonSchema)]
|
||||
pub struct WaitForIdleArgs {
|
||||
pub leaf_id: LeafId,
|
||||
/// Required quiet window before declaring idle (default 500 ms).
|
||||
#[serde(default)]
|
||||
pub idle_ms: Option<u64>,
|
||||
/// Hard timeout in ms; returns timeout=true after this (default 30s,
|
||||
/// hard cap 5 min).
|
||||
#[serde(default)]
|
||||
pub timeout_ms: Option<u64>,
|
||||
}
|
||||
|
||||
const READ_PANE_HARD_CAP_LINES: usize = 3000;
|
||||
const WAIT_TIMEOUT_HARD_CAP_MS: u64 = 5 * 60 * 1000;
|
||||
|
||||
#[tool_router]
|
||||
impl TileService {
|
||||
pub fn new(ptys: Arc<PtyManager>, state: Arc<RwLock<McpState>>) -> Self {
|
||||
Self {
|
||||
ptys,
|
||||
state,
|
||||
tool_router: Self::tool_router(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a leaf_id → pane_id under the MCP-allow gate.
|
||||
async fn resolve_pane(&self, leaf_id: &str) -> Result<PaneId, McpError> {
|
||||
let st = self.state.read().await;
|
||||
let leaf = st.mirror.leaves.get(leaf_id).ok_or_else(|| {
|
||||
McpError::invalid_params(
|
||||
"unknown leaf_id (not visible to MCP; user may need to allow it)",
|
||||
Some(json!({ "leaf_id": leaf_id })),
|
||||
)
|
||||
})?;
|
||||
leaf.pane_id.ok_or_else(|| {
|
||||
McpError::invalid_params(
|
||||
"leaf has no live pane",
|
||||
Some(json!({ "leaf_id": leaf_id })),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[tool(description = "Read the recent scrollback of a terminal pane. \
|
||||
Returns text plus a __seq__=N marker that can be passed back as \
|
||||
after_seq for incremental polling.")]
|
||||
async fn read_pane(
|
||||
&self,
|
||||
Parameters(args): Parameters<ReadPaneArgs>,
|
||||
) -> Result<CallToolResult, McpError> {
|
||||
let pane_id = self.resolve_pane(&args.leaf_id).await?;
|
||||
|
||||
let ring = self.ptys.ring(pane_id).ok_or_else(|| {
|
||||
McpError::internal_error(
|
||||
"pane ring missing (pane may have just exited)",
|
||||
Some(json!({ "leaf_id": args.leaf_id })),
|
||||
)
|
||||
})?;
|
||||
let (bytes, seq) = {
|
||||
let g = ring.lock();
|
||||
g.snapshot()
|
||||
};
|
||||
|
||||
// Trim by after_seq if provided: bytes in the ring beyond
|
||||
// `after_seq` is `seq - after_seq`, clamped against ring size.
|
||||
let start = match args.after_seq {
|
||||
Some(prev) if seq > prev => {
|
||||
let new_bytes = (seq - prev) as usize;
|
||||
bytes.len().saturating_sub(new_bytes)
|
||||
}
|
||||
Some(_) => bytes.len(),
|
||||
None => 0,
|
||||
};
|
||||
let tail = &bytes[start..];
|
||||
|
||||
let text = String::from_utf8_lossy(tail);
|
||||
let cap = args
|
||||
.last_lines
|
||||
.map(|n| n.min(READ_PANE_HARD_CAP_LINES))
|
||||
.unwrap_or(200);
|
||||
let limited: String = if cap == 0 {
|
||||
String::new()
|
||||
} else {
|
||||
let lines: Vec<&str> = text.lines().collect();
|
||||
let start_line = lines.len().saturating_sub(cap);
|
||||
lines[start_line..].join("\n")
|
||||
};
|
||||
|
||||
Ok(CallToolResult::success(vec![
|
||||
Content::text(limited),
|
||||
Content::text(format!("__seq__={seq}")),
|
||||
]))
|
||||
}
|
||||
|
||||
#[tool(description = "Block until a pane has been quiet (no output) \
|
||||
for idle_ms, or timeout_ms elapses. Useful for command-completion \
|
||||
synchronisation. Returns {idle:bool, seq:u64, elapsed_ms:u64}.")]
|
||||
async fn wait_for_idle(
|
||||
&self,
|
||||
Parameters(args): Parameters<WaitForIdleArgs>,
|
||||
) -> Result<CallToolResult, McpError> {
|
||||
let pane_id = self.resolve_pane(&args.leaf_id).await?;
|
||||
let ring = self.ptys.ring(pane_id).ok_or_else(|| {
|
||||
McpError::internal_error("pane ring missing", None)
|
||||
})?;
|
||||
|
||||
let idle_target = Duration::from_millis(args.idle_ms.unwrap_or(500));
|
||||
let timeout = Duration::from_millis(
|
||||
args.timeout_ms
|
||||
.unwrap_or(30_000)
|
||||
.min(WAIT_TIMEOUT_HARD_CAP_MS),
|
||||
);
|
||||
let start = Instant::now();
|
||||
let mut last_seq = ring.lock().snapshot().1;
|
||||
let mut last_change = Instant::now();
|
||||
|
||||
loop {
|
||||
// Sleep in small slices so we notice both incoming data and
|
||||
// the overall timeout promptly.
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
let now_seq = ring.lock().snapshot().1;
|
||||
if now_seq != last_seq {
|
||||
last_seq = now_seq;
|
||||
last_change = Instant::now();
|
||||
}
|
||||
if last_change.elapsed() >= idle_target {
|
||||
return Ok(CallToolResult::success(vec![Content::text(
|
||||
json!({
|
||||
"idle": true,
|
||||
"seq": last_seq,
|
||||
"elapsed_ms": start.elapsed().as_millis() as u64,
|
||||
})
|
||||
.to_string(),
|
||||
)]));
|
||||
}
|
||||
if start.elapsed() >= timeout {
|
||||
return Ok(CallToolResult::success(vec![Content::text(
|
||||
json!({
|
||||
"idle": false,
|
||||
"seq": last_seq,
|
||||
"elapsed_ms": start.elapsed().as_millis() as u64,
|
||||
})
|
||||
.to_string(),
|
||||
)]));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tool_handler]
|
||||
impl ServerHandler for TileService {
|
||||
fn get_info(&self) -> ServerInfo {
|
||||
ServerInfo::new(
|
||||
ServerCapabilities::builder()
|
||||
.enable_tools()
|
||||
.enable_resources()
|
||||
.build(),
|
||||
)
|
||||
.with_server_info(Implementation::from_build_env())
|
||||
.with_protocol_version(ProtocolVersion::V_2024_11_05)
|
||||
.with_instructions(
|
||||
"Tiletopia MCP (read-only v1). Resources: tiletopia://layout, \
|
||||
tiletopia://panes, tiletopia://hosts. Tools: read_pane, \
|
||||
wait_for_idle. Only panes the user has allow-listed are \
|
||||
visible.",
|
||||
)
|
||||
}
|
||||
|
||||
async fn list_resources(
|
||||
&self,
|
||||
_r: Option<PaginatedRequestParams>,
|
||||
_: RequestContext<RoleServer>,
|
||||
) -> Result<ListResourcesResult, McpError> {
|
||||
Ok(ListResourcesResult {
|
||||
resources: vec![
|
||||
RawResource::new("tiletopia://layout", "layout").no_annotation(),
|
||||
RawResource::new("tiletopia://panes", "panes").no_annotation(),
|
||||
RawResource::new("tiletopia://hosts", "hosts").no_annotation(),
|
||||
],
|
||||
next_cursor: None,
|
||||
meta: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_resource(
|
||||
&self,
|
||||
req: ReadResourceRequestParams,
|
||||
_: RequestContext<RoleServer>,
|
||||
) -> Result<ReadResourceResult, McpError> {
|
||||
let state = self.state.read().await;
|
||||
let body = match req.uri.as_str() {
|
||||
"tiletopia://layout" => state.mirror.layout_json.clone(),
|
||||
"tiletopia://panes" => {
|
||||
serde_json::to_string(&state.mirror.leaves).unwrap_or_default()
|
||||
}
|
||||
"tiletopia://hosts" => {
|
||||
serde_json::to_string(&state.mirror.hosts).unwrap_or_default()
|
||||
}
|
||||
other => {
|
||||
return Err(McpError::resource_not_found(
|
||||
"resource_not_found",
|
||||
Some(json!({ "uri": other })),
|
||||
));
|
||||
}
|
||||
};
|
||||
Ok(ReadResourceResult {
|
||||
contents: vec![ResourceContents::text(body, req.uri)],
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_resource_templates(
|
||||
&self,
|
||||
_r: Option<PaginatedRequestParams>,
|
||||
_: RequestContext<RoleServer>,
|
||||
) -> Result<ListResourceTemplatesResult, McpError> {
|
||||
Ok(ListResourceTemplatesResult {
|
||||
resource_templates: vec![],
|
||||
next_cursor: None,
|
||||
meta: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// HTTP wiring + bearer auth.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
async fn bearer_auth(
|
||||
axum::extract::State(expected): axum::extract::State<Arc<String>>,
|
||||
headers: HeaderMap,
|
||||
req: Request<Body>,
|
||||
next: Next,
|
||||
) -> Result<Response, Response> {
|
||||
let supplied = headers
|
||||
.get(axum::http::header::AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|s| s.strip_prefix("Bearer "));
|
||||
let ok = supplied
|
||||
.map(|t| constant_time_eq(t.as_bytes(), expected.as_bytes()))
|
||||
.unwrap_or(false);
|
||||
if ok {
|
||||
return Ok(next.run(req).await);
|
||||
}
|
||||
|
||||
let mut resp = Response::builder()
|
||||
.status(StatusCode::UNAUTHORIZED)
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
resp.headers_mut().insert(
|
||||
axum::http::header::WWW_AUTHENTICATE,
|
||||
HeaderValue::from_static(r#"Bearer realm="tiletopia""#),
|
||||
);
|
||||
Err(resp)
|
||||
}
|
||||
|
||||
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
|
||||
if a.len() != b.len() {
|
||||
return false;
|
||||
}
|
||||
let mut d = 0u8;
|
||||
for (x, y) in a.iter().zip(b) {
|
||||
d |= x ^ y;
|
||||
}
|
||||
d == 0
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Lifecycle.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
pub struct RunningServer {
|
||||
pub addr: SocketAddr,
|
||||
pub token: String,
|
||||
pub cancel: CancellationToken,
|
||||
pub task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct McpServerHandle(pub PlMutex<Option<RunningServer>>);
|
||||
|
||||
pub async fn start_server(
|
||||
ptys: Arc<PtyManager>,
|
||||
state: Arc<RwLock<McpState>>,
|
||||
) -> Result<RunningServer> {
|
||||
// 256-bit bearer token, hex-encoded.
|
||||
use rand::RngCore;
|
||||
let mut buf = [0u8; 32];
|
||||
rand::rng().fill_bytes(&mut buf);
|
||||
let token = hex::encode(buf);
|
||||
state.write().await.bearer_token = token.clone();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Fresh service per session; cheap because we share state via Arcs.
|
||||
let ptys_f = ptys.clone();
|
||||
let state_f = state.clone();
|
||||
let mcp_service = StreamableHttpService::new(
|
||||
move || Ok(TileService::new(ptys_f.clone(), state_f.clone())),
|
||||
LocalSessionManager::default().into(),
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
let app = Router::new()
|
||||
.nest_service("/mcp", mcp_service)
|
||||
.layer(middleware::from_fn_with_state(
|
||||
Arc::new(token.clone()),
|
||||
bearer_auth,
|
||||
));
|
||||
|
||||
// Port 0 → OS picks. Recover via local_addr() before serving.
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
|
||||
let cancel_inner = cancel.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
let _ = axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async move {
|
||||
cancel_inner.cancelled().await;
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
||||
tracing::info!("MCP server listening on http://{addr}/mcp");
|
||||
Ok(RunningServer {
|
||||
addr,
|
||||
token,
|
||||
cancel,
|
||||
task,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn stop_server(handle: &McpServerHandle) {
|
||||
if let Some(srv) = handle.0.lock().take() {
|
||||
srv.cancel.cancel();
|
||||
srv.task.abort();
|
||||
tracing::info!("MCP server stopped");
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@
|
|||
//! through portable-pty, reads its output on a background thread, and
|
||||
//! forwards chunks to the frontend as `pane://{id}/data` events.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
|
@ -52,6 +52,41 @@ pub enum SpawnSpec {
|
|||
/// the SSH prompt.
|
||||
type SharedWriter = Arc<Mutex<Box<dyn Write + Send>>>;
|
||||
|
||||
/// Per-pane scrollback ring exposed to the MCP server. Capped — we drop the
|
||||
/// oldest bytes when full. `seq` is a monotonic byte counter that wraps at
|
||||
/// u64; the MCP `read_pane` tool uses it for incremental polling and the
|
||||
/// `wait_for_idle` tool uses it to detect silence.
|
||||
pub const PANE_RING_CAPACITY: usize = 256 * 1024;
|
||||
|
||||
pub struct PaneRing {
|
||||
buf: VecDeque<u8>,
|
||||
seq: u64,
|
||||
}
|
||||
|
||||
impl PaneRing {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
buf: VecDeque::with_capacity(PANE_RING_CAPACITY),
|
||||
seq: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, bytes: &[u8]) {
|
||||
for &b in bytes {
|
||||
if self.buf.len() == PANE_RING_CAPACITY {
|
||||
self.buf.pop_front();
|
||||
}
|
||||
self.buf.push_back(b);
|
||||
}
|
||||
self.seq = self.seq.wrapping_add(bytes.len() as u64);
|
||||
}
|
||||
|
||||
/// Snapshot: current contents (oldest-first) + the seq counter.
|
||||
pub fn snapshot(&self) -> (Vec<u8>, u64) {
|
||||
(self.buf.iter().copied().collect(), self.seq)
|
||||
}
|
||||
}
|
||||
|
||||
/// What we keep alive for each spawned PTY.
|
||||
///
|
||||
/// `master` stays in scope to keep the PTY alive; we never write through it
|
||||
|
|
@ -63,6 +98,9 @@ struct PaneHandle {
|
|||
writer: SharedWriter,
|
||||
#[allow(dead_code)]
|
||||
child: Box<dyn portable_pty::Child + Send + Sync>,
|
||||
/// Same Arc the reader thread appends into; the MCP server reads via
|
||||
/// {@link PtyManager::ring}.
|
||||
ring: Arc<Mutex<PaneRing>>,
|
||||
}
|
||||
|
||||
pub struct PtyManager {
|
||||
|
|
@ -127,6 +165,7 @@ impl PtyManager {
|
|||
.take_writer()
|
||||
.context("take_writer failed")?;
|
||||
let writer: SharedWriter = Arc::new(Mutex::new(writer_raw));
|
||||
let ring: Arc<Mutex<PaneRing>> = Arc::new(Mutex::new(PaneRing::new()));
|
||||
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
|
|
@ -136,14 +175,18 @@ impl PtyManager {
|
|||
master: pair.master,
|
||||
writer: writer.clone(),
|
||||
child,
|
||||
ring: ring.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
// Reader thread: pump bytes -> base64 -> emit. Also handles the
|
||||
// password-prompt autotype state machine if `saved_password` is set.
|
||||
// password-prompt autotype state machine if `saved_password` is set,
|
||||
// and pushes raw bytes into the per-pane scrollback ring for the
|
||||
// MCP server to read.
|
||||
let app_for_reader = app.clone();
|
||||
let event_name = format!("pane://{id}/data");
|
||||
let writer_for_reader = writer.clone();
|
||||
let ring_for_reader = ring.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut buf = [0u8; 8192];
|
||||
let mut pw_state = PasswordState::from(saved_password);
|
||||
|
|
@ -159,6 +202,9 @@ impl PtyManager {
|
|||
// on the renderer; pw_state mutates here.
|
||||
pw_state.observe(&buf[..n], &writer_for_reader, id);
|
||||
|
||||
// Mirror bytes into the scrollback ring (MCP source).
|
||||
ring_for_reader.lock().push(&buf[..n]);
|
||||
|
||||
let chunk_b64 = B64.encode(&buf[..n]);
|
||||
if let Err(e) =
|
||||
app_for_reader.emit(&event_name, DataChunk { b64: chunk_b64 })
|
||||
|
|
@ -217,6 +263,13 @@ impl PtyManager {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Borrow the per-pane scrollback ring. Returns None if the pane has
|
||||
/// been killed. The Arc lets callers hold the ring even after the
|
||||
/// PaneHandle is dropped (reader thread will stop pushing into it).
|
||||
pub fn ring(&self, id: PaneId) -> Option<Arc<Mutex<PaneRing>>> {
|
||||
self.panes.lock().get(&id).map(|p| p.ring.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue