#!/usr/bin/env python3 """Extract the conversation skeleton from a Claude Code, Codex, or Cursor JSONL session file. Usage: cat | python3 extract-skeleton.py cat | python3 extract-skeleton.py --output PATH Auto-detects platform (Claude Code, Codex, or Cursor) from the JSONL structure. Extracts: - User messages (text only, no tool results) - Assistant text (no thinking/reasoning blocks) - Collapsed tool call summaries (consecutive same-tool calls grouped) Consecutive tool calls of the same type are collapsed: 3+ Read calls -> "[tools] 3x Read (file1, file2, +1 more) -> all ok" Codex call/result pairs are deduplicated (only the result with status is kept). When --output PATH is given, the extracted skeleton is written to PATH and stdout receives only a one-line JSON status (_meta with wrote/bytes/stats). This lets callers route bulk content to a scratch file without round-tripping extraction bytes through orchestrator tool results. Without --output, extracted content goes to stdout and ends with a _meta line. """ import argparse import io import os import sys import json import re parser = argparse.ArgumentParser(add_help=True) parser.add_argument( "--output", metavar="PATH", help="Write extracted skeleton to PATH instead of stdout. Stdout receives a one-line _meta status.", ) args = parser.parse_args() # Capture-and-redirect when --output is set: prints in the rest of the script # go to the buffer; at the end the buffer is written to PATH and a status # line is emitted to the real stdout. _original_stdout = sys.stdout if args.output: sys.stdout = io.StringIO() stats = {"lines": 0, "parse_errors": 0, "user": 0, "assistant": 0, "tool": 0} # Claude Code wrapper tags to strip from user message content. # Strip entirely (tag + content): framework noise and raw command output. # Strip tags only (keep content): command-message, command-name, command-args, user_query. _STRIP_BLOCK = re.compile( r"<(?:task-notification|local-command-caveat|local-command-stdout|local-command-stderr|system-reminder)[^>]*>.*?", re.DOTALL, ) _STRIP_TAG = re.compile( r"]*>" ) def clean_text(text): """Strip framework wrapper tags from message text (Claude and Cursor).""" text = _STRIP_BLOCK.sub("", text) text = _STRIP_TAG.sub("", text) text = re.sub(r"\n{3,}", "\n\n", text).strip() return text # Buffer for pending tool entries: [{"ts", "name", "target", "status"}] pending_tools = [] def flush_tools(): """Print buffered tool entries, collapsing consecutive same-name groups.""" if not pending_tools: return # Group consecutive entries by tool name groups = [] for entry in pending_tools: if groups and groups[-1][0]["name"] == entry["name"]: groups[-1].append(entry) else: groups.append([entry]) for group in groups: name = group[0]["name"] if len(group) <= 2: # Print individually for e in group: status = f" -> {e['status']}" if e.get("status") else "" ts_prefix = f"[{e['ts']}] " if e.get("ts") else "" print(f"{ts_prefix}[tool] {name} {e['target']}{status}") stats["tool"] += 1 else: # Collapse ts = group[0].get("ts", "") targets = [e["target"] for e in group if e.get("target")] ok = sum(1 for e in group if e.get("status") == "ok") err = sum(1 for e in group if e.get("status") and e["status"] != "ok") no_status = len(group) - ok - err # Show first 2 targets, then "+N more" if len(targets) > 2: target_str = ", ".join(targets[:2]) + f", +{len(targets) - 2} more" elif targets: target_str = ", ".join(targets) else: target_str = "" if no_status == len(group): status_str = "" elif err == 0: status_str = " -> all ok" else: status_str = f" -> {ok} ok, {err} error" ts_prefix = f"[{ts}] " if ts else "" print(f"{ts_prefix}[tools] {len(group)}x {name} ({target_str}){status_str}") stats["tool"] += len(group) pending_tools.clear() def _safe_slice(value, n): """Slice value if it is a string; otherwise return ''. Some Claude Code / MCP tool inputs put structured data (dicts, lists) in fields like `query` or `prompt`. `dict[:N]` raises TypeError, so guard every slice with an isinstance check. """ return value[:n] if isinstance(value, str) else "" def summarize_claude_tool(block): """Extract name and target from a Claude Code tool_use block.""" name = block.get("name", "unknown") inp = block.get("input", {}) fp = inp.get("file_path") p = inp.get("path") target = ( (fp if isinstance(fp, str) else None) or (p if isinstance(p, str) else None) or _safe_slice(inp.get("command"), 120) or _safe_slice(inp.get("pattern"), 200) or _safe_slice(inp.get("query"), 80) or _safe_slice(inp.get("prompt"), 80) or "" ) if isinstance(target, str) and len(target) > 120: target = target[:120] return name, target def handle_claude(obj): msg_type = obj.get("type") ts = obj.get("timestamp", "")[:19] if msg_type == "user": msg = obj.get("message", {}) content = msg.get("content", "") if isinstance(content, list): for block in content: if block.get("type") == "tool_result": is_error = block.get("is_error", False) status = "error" if is_error else "ok" tool_use_id = block.get("tool_use_id") matched = False if tool_use_id: for entry in pending_tools: if entry.get("id") == tool_use_id: entry["status"] = status matched = True break if not matched: # Fallback: assign to earliest pending entry without a status for entry in pending_tools: if not entry.get("status"): entry["status"] = status break texts = [ c.get("text", "") for c in content if c.get("type") == "text" and len(c.get("text", "")) > 10 ] content = " ".join(texts) if isinstance(content, str): content = clean_text(content) if len(content) > 15: flush_tools() print(f"[{ts}] [user] {content[:800]}") print("---") stats["user"] += 1 elif msg_type == "assistant": msg = obj.get("message", {}) content = msg.get("content", []) if isinstance(content, list): has_text = False for block in content: if block.get("type") == "text": text = clean_text(block.get("text", "")) if len(text) > 20: if not has_text: flush_tools() has_text = True print(f"[{ts}] [assistant] {text[:800]}") print("---") stats["assistant"] += 1 elif block.get("type") == "tool_use": name, target = summarize_claude_tool(block) entry = {"ts": ts, "name": name, "target": target} tool_id = block.get("id") if tool_id: entry["id"] = tool_id pending_tools.append(entry) def handle_codex(obj): msg_type = obj.get("type") ts = obj.get("timestamp", "")[:19] if msg_type == "event_msg": p = obj.get("payload", {}) if p.get("type") == "user_message": text = p.get("message", "") if isinstance(text, str) and len(text) > 15: parts = text.split("") user_text = parts[-1].strip() if parts else text if len(user_text) > 15: flush_tools() print(f"[{ts}] [user] {user_text[:800]}") print("---") stats["user"] += 1 elif p.get("type") == "exec_command_end": # This is the deduplicated result — has status info command = p.get("command", []) cmd_str = command[-1] if command else "" output = p.get("aggregated_output", "") status = "ok" if "Process exited with code " in output: try: code = int(output.split("Process exited with code ")[1].split("\n")[0]) if code != 0: status = f"error(exit {code})" except (IndexError, ValueError): pass if cmd_str: # Shorten common patterns for readability short_cmd = cmd_str[:120] pending_tools.append({"ts": ts, "name": "exec", "target": short_cmd, "status": status}) elif msg_type == "response_item": p = obj.get("payload", {}) if p.get("type") == "message" and p.get("role") == "assistant": for block in p.get("content", []): if block.get("type") == "output_text" and len(block.get("text", "")) > 20: flush_tools() print(f"[{ts}] [assistant] {block['text'][:800]}") print("---") stats["assistant"] += 1 # Skip function_call — exec_command_end is the deduplicated version with status def handle_cursor(obj): """Cursor agent transcripts: role-based, no timestamps, same content structure as Claude.""" role = obj.get("role") content = obj.get("message", {}).get("content", []) if role == "user": texts = [] for block in (content if isinstance(content, list) else []): if block.get("type") == "text": texts.append(block.get("text", "")) text = clean_text(" ".join(texts)) if len(text) > 15: flush_tools() # No timestamps available in Cursor transcripts print(f"[user] {text[:800]}") print("---") stats["user"] += 1 elif role == "assistant": has_text = False for block in (content if isinstance(content, list) else []): if block.get("type") == "text": text = block.get("text", "") # Skip [REDACTED] placeholder blocks if len(text) > 20 and text.strip() != "[REDACTED]": if not has_text: flush_tools() has_text = True print(f"[assistant] {text[:800]}") print("---") stats["assistant"] += 1 elif block.get("type") == "tool_use": name = block.get("name", "unknown") inp = block.get("input", {}) p = inp.get("path") fp = inp.get("file_path") target = ( (p if isinstance(p, str) else None) or (fp if isinstance(fp, str) else None) or _safe_slice(inp.get("command"), 120) or _safe_slice(inp.get("pattern"), 200) or _safe_slice(inp.get("glob_pattern"), 200) or _safe_slice(inp.get("target_directory"), 200) or "" ) if isinstance(target, str) and len(target) > 120: target = target[:120] # No status info available — Cursor doesn't log tool results pending_tools.append({"ts": "", "name": name, "target": target}) # Auto-detect platform from first few lines, then process all detected = None buffer = [] for line in sys.stdin: line = line.strip() if not line: continue buffer.append(line) stats["lines"] += 1 if not detected and len(buffer) <= 10: try: obj = json.loads(line) if obj.get("type") in ("user", "assistant"): detected = "claude" elif obj.get("type") in ("session_meta", "turn_context", "response_item", "event_msg"): detected = "codex" elif obj.get("role") in ("user", "assistant") and "type" not in obj: detected = "cursor" except (json.JSONDecodeError, KeyError): pass handlers = {"claude": handle_claude, "codex": handle_codex, "cursor": handle_cursor} handler = handlers.get(detected, handle_codex) for line in buffer: try: handler(json.loads(line)) except (json.JSONDecodeError, KeyError): stats["parse_errors"] += 1 # Flush any remaining buffered tools flush_tools() print(json.dumps({"_meta": True, **stats})) if args.output: body = sys.stdout.getvalue() sys.stdout = _original_stdout with open(args.output, "w") as f: f.write(body) bytes_written = os.path.getsize(args.output) print(json.dumps({"_meta": True, "wrote": args.output, "bytes": bytes_written, **stats}))