summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--.gitignore1
-rw-r--r--README.md92
-rw-r--r--cs2pov/automation.py50
-rw-r--r--cs2pov/cli.py712
-rw-r--r--cs2pov/config.py8
-rw-r--r--cs2pov/parser.py8
-rw-r--r--cs2pov/preprocessor.py411
-rw-r--r--cs2pov/trim.py87
8 files changed, 1160 insertions, 209 deletions
diff --git a/.gitignore b/.gitignore
index 1bb3ca1..4b93ea3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@
*egg-info*
*__pycache__*
.venv/
+test_demos/
diff --git a/README.md b/README.md
index c78bffb..db98e41 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Counter-Strike 2 POV recording automation. Largely AI-assisted and personalized
- **GPU** with Vulkan support (NVIDIA or AMD)
```bash
-##Install Python dependencies:
+## Install Python dependencies:
pip install -e .
## Gentoo (based)
@@ -26,41 +26,74 @@ apt install ffmpeg xdotool pulseaudio-utils
## Quick Start
```bash
-# List players in a demo
-cs2pov list /path/to/demo.dem
+# Show demo information (players, deaths, spawns)
+cs2pov info /path/to/demo.dem
-# Record a player's POV
-cs2pov -d /path/to/demo.dem -p "PlayerName" -o recording.mp4
+# Record a player's POV (full pipeline: record + trim)
+cs2pov pov -d /path/to/demo.dem -p "PlayerName" -o recording.mp4
+
+# Raw recording only (no trimming)
+cs2pov record -d /path/to/demo.dem -p "PlayerName" -o raw.mp4
+
+# Trim an existing recording
+cs2pov trim raw.mp4 -d /path/to/demo.dem -p "PlayerName"
```
-## Usage
+## Commands
+
+### `cs2pov info` - Show Demo Information
+
+Display demo metadata, player list, and death/spawn statistics for each player.
+
+```bash
+cs2pov info demo.dem # Human-readable output
+cs2pov info demo.dem --json # JSON output
+cs2pov info demo.dem -v # Verbose (includes death period details)
+```
+
+### `cs2pov pov` - Full Recording Pipeline
+
+Record a player's POV and automatically trim death periods.
+```bash
+cs2pov pov -d demo.dem -p "PlayerName" -o recording.mp4
+cs2pov pov -d demo.dem -p "PlayerName" -o recording.mp4 --no-trim # Skip trimming
```
-cs2pov [-h] --demo DEMO --player PLAYER --output OUTPUT [options]
+
+### `cs2pov record` - Raw Recording Only
+
+Record without post-processing. Useful for batch recording then trimming later.
+
+```bash
+cs2pov record -d demo.dem -p "PlayerName" -o raw.mp4
```
-### Required Arguments
+### `cs2pov trim` - Trim Existing Recording
+
+Remove death periods from a previously recorded video.
+
+```bash
+cs2pov trim raw.mp4 -d demo.dem -p "PlayerName"
+cs2pov trim raw.mp4 -d demo.dem -p "PlayerName" -o trimmed.mp4
+```
-| Argument | Short | Description |
-|----------|-------|-------------|
-| `--demo` | `-d` | Path to demo file (.dem) |
-| `--player` | `-p` | Player to record (name or SteamID) |
-| `--output` | `-o` | Output video file path |
+## Common Options
-### Optional Arguments
+### Recording Options (pov, record)
| Argument | Short | Default | Description |
|----------|-------|---------|-------------|
+| `--demo` | `-d` | required | Path to demo file (.dem) |
+| `--player` | `-p` | required | Player to record (name or SteamID) |
+| `--output` | `-o` | required | Output video file path |
| `--resolution` | `-r` | 1920x1080 | Recording resolution |
| `--framerate` | `-f` | 60 | Recording framerate |
-| `--no-hud` | | off | Hide HUD elements (except killfeed) |
+| `--no-hud` | | off | Hide HUD elements |
| `--no-audio` | | off | Disable audio recording |
| `--audio-device` | | auto | PulseAudio device for audio capture |
-| `--no-trim` | | off | Skip post-processing, keep full recording |
-| `--display` | | 0 | X display number (0 = real display) |
-| `--cs2-path` | | auto | Path to CS2 installation (can also be CS2_PATH envvar) |
+| `--display` | | 0 | X display number |
+| `--cs2-path` | | auto | Path to CS2 installation |
| `--verbose` | `-v` | off | Verbose output |
-| `--version` | | | Show version |
### Player Identification
@@ -74,16 +107,17 @@ The `--player` argument accepts multiple formats:
## How It Works
1. **Parse demo** - Extract player list and metadata using demoparser2
-2. **Generate config** - Create CS2 CFG file with spectator settings
-3. **Copy demo** - Place demo in CS2's replays directory
-4. **Launch CS2** - Start CS2 via Steam with the generated config
-5. **Wait for map load** - Monitor console.log for map load completion
-6. **Hide demo UI** - Send Shift+F2 to hide playback controls
-7. **Start capture** - Launch FFmpeg to record display + audio (PulseAudio)
-8. **Recording loop** - Send F5 periodically to keep spectator locked on target player
-9. **Wait for demo end** - Monitor console.log for demo completion
-10. **Finalize** - Stop capture and terminate CS2
-11. **Post-process** - Trim start (until POV selected) and death periods from video
+2. **Preprocess timeline** - Extract death/spawn events for accurate trimming
+3. **Generate config** - Create CS2 CFG file with spectator settings
+4. **Copy demo** - Place demo in CS2's replays directory
+5. **Launch CS2** - Start CS2 via Steam with the generated config
+6. **Wait for first spawn** - Monitor console.log for player spawn
+7. **Hide demo UI** - Send Shift+F2 to hide playback controls
+8. **Start capture** - Launch FFmpeg to record display + audio (PulseAudio)
+9. **Recording loop** - Send F5 periodically to keep spectator locked on target player
+10. **Wait for demo end** - Monitor console.log for demo completion
+11. **Finalize** - Stop capture and terminate CS2
+12. **Post-process** - Trim start and death periods from video using timeline data
## Noteworthy Issues/Workarounds
diff --git a/cs2pov/automation.py b/cs2pov/automation.py
index 843b2ee..317ecad 100644
--- a/cs2pov/automation.py
+++ b/cs2pov/automation.py
@@ -145,6 +145,56 @@ def wait_for_map_load(log_path: Path, timeout: float = 120, poll_interval: float
return False
+def wait_for_first_spawn(
+ log_path: Path,
+ player_name: str,
+ timeout: float = 180,
+ poll_interval: float = 0.5
+) -> bool:
+ """Wait for first spawn of the target player by watching console.log.
+
+ Looks for: <player> spawned
+
+ This indicates the player has spawned and the POV camera is ready.
+
+ Args:
+ log_path: Path to CS2 console.log
+ player_name: Name of the player to watch for
+ timeout: Maximum time to wait in seconds
+ poll_interval: Time between checks
+
+ Returns:
+ True if first spawn detected, False if timeout
+ """
+ import time
+
+ spawn_pattern = re.compile(
+ rf"{re.escape(player_name)} spawned\b"
+ )
+ start = time.time()
+ last_position = 0
+
+ while time.time() - start < timeout:
+ if not log_path.exists():
+ time.sleep(poll_interval)
+ continue
+
+ try:
+ with open(log_path, 'r', errors='ignore') as f:
+ f.seek(last_position)
+ content = f.read()
+ last_position = f.tell()
+
+ if spawn_pattern.search(content):
+ return True
+ except Exception:
+ pass
+
+ time.sleep(poll_interval)
+
+ return False
+
+
def wait_for_cs2_window(display: str = ":0", timeout: float = 120, poll_interval: float = 2.0) -> Optional[str]:
"""Wait for CS2 window to appear.
diff --git a/cs2pov/cli.py b/cs2pov/cli.py
index 1793be8..09eff51 100644
--- a/cs2pov/cli.py
+++ b/cs2pov/cli.py
@@ -1,16 +1,16 @@
#!/usr/bin/env python3
"""CS2 POV Recorder - Record player POV from CS2 demo files.
-Simplified architecture with linear flow:
-1. Setup (parse demo, generate config, launch CS2)
-2. Recording loop (single-threaded, blocking)
-3. Cleanup (stop FFmpeg, kill CS2)
-4. Post-processing (trim death periods)
+Subcommand-based CLI:
+ pov - Full pipeline (record + trim)
+ info - Show demo information
+ record - Raw recording only
+ trim - Post-process existing video
"""
import argparse
+import json
import shutil
-import subprocess
import sys
import time
from dataclasses import dataclass
@@ -18,15 +18,20 @@ from pathlib import Path
from typing import Optional
from . import __version__
-from .automation import check_xdotool, send_key, check_demo_ended, wait_for_cs2_window, wait_for_map_load
+from .automation import send_key, check_demo_ended, wait_for_cs2_window, wait_for_first_spawn
from .capture import FFmpegCapture, get_default_audio_monitor
from .config import RecordingConfig, generate_recording_cfg
from .exceptions import CS2POVError
from .game import CS2Process, find_cs2_path, get_cfg_dir, get_demo_dir
-from .parser import DemoInfo, find_player, get_player_index, parse_demo
-from .trim import extract_death_periods, trim_death_periods
+from .parser import DemoInfo, PlayerInfo, find_player, get_player_index, parse_demo
+from .preprocessor import DemoTimeline, preprocess_demo, get_trim_periods
+from .trim import extract_death_periods, TrimPeriod, trim_video_with_periods
+# =============================================================================
+# Data Classes
+# =============================================================================
+
@dataclass
class RecordingResult:
"""Result of recording phase with metadata for post-processing."""
@@ -36,7 +41,12 @@ class RecordingResult:
recording_start_time: float
player_slot: int
exit_reason: str # "demo_ended", "timeout", "ffmpeg_stopped", "interrupted"
+ timeline: Optional[DemoTimeline] = None
+
+# =============================================================================
+# Shared Utilities
+# =============================================================================
def check_dependencies() -> list[str]:
"""Check for required system dependencies."""
@@ -57,17 +67,9 @@ def parse_resolution(resolution_str: str) -> tuple[int, int]:
raise ValueError(f"Invalid resolution format: {resolution_str}. Use WxH (e.g., 1920x1080)")
-def print_demo_info(demo_info: DemoInfo):
- """Print parsed demo information."""
- print(f" Map: {demo_info.map_name}")
- print(f" Ticks: {demo_info.total_ticks}")
- print(f" Tick rate: {demo_info.tick_rate}")
- print(f" Players: {len(demo_info.players)}")
- for p in demo_info.players:
- team_str = f" [{p.team}]" if p.team else ""
- print(f" - {p.name}{team_str} ({p.steamid})")
- print(f" Rounds: {len(demo_info.rounds)}")
-
+# =============================================================================
+# Recording Functions
+# =============================================================================
def recording_loop(
display: str,
@@ -79,14 +81,6 @@ def recording_loop(
) -> str:
"""Main recording loop - single-threaded, blocking.
- Args:
- display: X display string
- console_log_path: Path to CS2 console.log
- cs2_process: CS2 process manager
- ffmpeg: FFmpeg capture instance
- timeout: Maximum recording time in seconds
- verbose: Print verbose output
-
Returns:
Exit reason: "demo_ended", "cs2_exited", "timeout", "ffmpeg_stopped"
"""
@@ -102,22 +96,18 @@ def recording_loop(
while True:
elapsed = time.time() - start_time
- # Check timeout
if elapsed > timeout:
print(f" Timeout reached ({timeout/60:.1f} min)")
return "timeout"
- # Check if CS2 still running
if not cs2_process.is_running():
print(" CS2 exited")
return "cs2_exited"
- # Check if FFmpeg still running
if not ffmpeg.is_running():
print(" FFmpeg stopped unexpectedly")
return "ffmpeg_stopped"
- # Check for demo end in console.log
demo_ended, log_position = check_demo_ended(console_log_path, log_position)
if demo_ended:
print(" Demo end detected in console.log")
@@ -135,9 +125,10 @@ def recording_loop(
if window_id:
send_key("F5", display, window_id)
+ if verbose:
+ print(f" Sending F5 (spec_lock)")
last_spec_lock = elapsed
- # Print status every 60 seconds
if elapsed - last_status_time >= 60:
print(f" Still recording... ({elapsed/60:.1f} min)")
last_status_time = elapsed
@@ -158,11 +149,7 @@ def record_demo(
enable_audio: bool = True,
audio_device: str | None = None,
) -> RecordingResult:
- """Record a player's POV from a demo file.
-
- This is a blocking function that handles the entire recording process.
- Cleanup is handled in finally block - always runs.
- """
+ """Record a player's POV from a demo file."""
# Find CS2 installation
print("Finding CS2 installation...")
cs2_path = find_cs2_path(cs2_path_override)
@@ -171,16 +158,29 @@ def record_demo(
# Parse demo
print(f"Parsing demo: {demo_path.name}")
demo_info = parse_demo(demo_path)
- if verbose:
- print_demo_info(demo_info)
- else:
- print(f" Map: {demo_info.map_name}, {len(demo_info.players)} players")
- print(f" Ticks: {demo_info.total_ticks}, Rate: {demo_info.tick_rate}/s")
+ print(f" Map: {demo_info.map_name}, {len(demo_info.players)} players")
+ print(f" Ticks: {demo_info.total_ticks}, Rate: {demo_info.tick_rate}/s")
# Find target player
player = find_player(demo_info, player_identifier)
player_index = get_player_index(demo_info, player)
- print(f"Recording POV: {player.name} (SteamID: {player.steamid}, index: {player_index})")
+ # player_slot is user_id + 1 for spec_player command
+ player_slot = (player.user_id + 1) if player.user_id is not None else (player_index + 1)
+ print(f"Recording POV: {player.name} (SteamID: {player.steamid}, slot: {player_slot})")
+
+ # Preprocess demo for timeline data
+ print("Preprocessing demo for timeline data...")
+ try:
+ timeline = preprocess_demo(demo_path, player.steamid, player.name)
+ print(f" Found {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, "
+ f"{len(timeline.rounds)} rounds")
+ if verbose and timeline.death_periods:
+ total_dead = sum(p.duration_seconds for p in timeline.death_periods)
+ print(f" Total dead time: {total_dead:.1f}s across {len(timeline.death_periods)} death periods")
+ except Exception as e:
+ print(f" Warning: Preprocessing failed: {e}")
+ print(f" Will fall back to console.log parsing for trim")
+ timeline = None
# Prepare directories
demo_dir = get_demo_dir(cs2_path)
@@ -204,6 +204,7 @@ def record_demo(
player_index=player_index,
player_name=player.name,
player_steamid=player.steamid,
+ player_slot=player_slot,
resolution=resolution,
hide_hud=hide_hud,
)
@@ -223,20 +224,19 @@ def record_demo(
estimated_duration = demo_info.total_ticks / demo_info.tick_rate
else:
estimated_duration = 3600
- timeout = estimated_duration + 600 # Add 10 min buffer
+ timeout = estimated_duration + 600
print(f"Starting recording (timeout: {timeout/60:.1f} min)...")
print(f" Display: {display_str}")
print(f" Output: {output_path}")
- # Initialize objects
cs2_process: Optional[CS2Process] = None
ffmpeg: Optional[FFmpegCapture] = None
recording_start_time = 0.0
exit_reason = "unknown"
try:
- # Delete old console.log to ensure fresh log for this recording
+ # Delete old console.log
if console_log_path.exists():
console_log_path.unlink()
if verbose:
@@ -247,7 +247,7 @@ def record_demo(
cs2_process.launch("cs2pov_recording.cfg")
print(" CS2 launching via Steam...")
- # Wait for CS2 window to appear
+ # Wait for CS2 window
print(" Waiting for CS2 window...")
window_id = wait_for_cs2_window(display_str, timeout=120)
if window_id:
@@ -255,19 +255,20 @@ def record_demo(
else:
print(" Warning: CS2 window not detected, continuing anyway")
- # Wait for map to load, then hide demo UI with Shift+F2
- print(" Waiting for map to load...")
- if wait_for_map_load(console_log_path, timeout=120):
+ # Wait for first player spawn
+ print(f" Waiting for first spawn ({player.name})...")
+ if wait_for_first_spawn(console_log_path, player.name, timeout=180):
+ print(" First spawn detected")
if verbose:
- print(" Map loaded, waiting 10s before hiding UI...")
- time.sleep(10)
+ print(" Waiting 15s for game to stabilize...")
+ time.sleep(15)
if window_id and send_key("shift+F2", display_str, window_id):
if verbose:
print(" Sent Shift+F2 to hide demo UI")
elif window_id:
print(" Warning: Failed to send Shift+F2 to hide demo UI")
else:
- print(" Warning: Map load not detected, continuing anyway")
+ print(" Warning: First spawn not detected, continuing anyway")
# Determine audio source
audio_source = None
@@ -279,7 +280,7 @@ def record_demo(
else:
print(" Warning: Could not detect audio device, recording video only")
- # Start FFmpeg capture (full display + audio)
+ # Start FFmpeg capture
ffmpeg = FFmpegCapture(
display=display_str,
output_path=output_path,
@@ -294,10 +295,8 @@ def record_demo(
else:
print(f" FFmpeg capture started (video only)")
- # Record start time for death period extraction
recording_start_time = time.time()
- # Run main recording loop (blocking)
exit_reason = recording_loop(
display=display_str,
console_log_path=console_log_path,
@@ -312,10 +311,8 @@ def record_demo(
exit_reason = "interrupted"
finally:
- # Cleanup - always runs
print("\nCleaning up...")
- # Stop FFmpeg first (so video file is finalized)
if ffmpeg:
print(" Stopping FFmpeg...")
graceful = ffmpeg.stop(timeout=15)
@@ -326,22 +323,18 @@ def record_demo(
if ffmpeg.stderr_path and ffmpeg.stderr_path.exists():
print(f" FFmpeg log: {ffmpeg.stderr_path}")
- # Then stop CS2
if cs2_process and cs2_process.is_running():
print(" Terminating CS2...")
cs2_process.terminate()
print(" Cleanup complete")
- # Save console.log with demo-matching name for retention
if console_log_path.exists():
saved_log_path = output_path.parent / f"console_{demo_path.stem}.log"
shutil.copy2(console_log_path, saved_log_path)
print(f" Console log saved: {saved_log_path.name}")
- # Update path for post-processing to use the saved copy
console_log_path = saved_log_path
- # Return result with metadata for post-processing
success = exit_reason in ("demo_ended", "cs2_exited")
return RecordingResult(
success=success,
@@ -350,6 +343,7 @@ def record_demo(
recording_start_time=recording_start_time,
player_slot=player_index,
exit_reason=exit_reason,
+ timeline=timeline,
)
@@ -359,11 +353,9 @@ def postprocess_video(
player_slot: int,
recording_start_time: float,
verbose: bool = False,
+ timeline: Optional[DemoTimeline] = None,
) -> Path:
- """Post-process a recorded video to trim death periods.
-
- This is a separate phase that can be run independently.
- """
+ """Post-process a recorded video to trim death periods."""
if not video_path.exists():
print(f"Error: Video file not found: {video_path}")
return video_path
@@ -371,38 +363,65 @@ def postprocess_video(
raw_size_mb = video_path.stat().st_size / (1024 * 1024)
print(f"\nRaw recording: {video_path} ({raw_size_mb:.1f} MB)")
- print("\nPost-processing: extracting death periods...")
- if verbose:
- print(f" Console log: {console_log_path}")
- print(f" Player slot: {player_slot}")
- print(f" Recording start: {recording_start_time}")
+ print("\nPost-processing: extracting trim periods...")
- if not console_log_path.exists():
- print(f" Console log not found, skipping trim")
- return video_path
+ trim_periods: list[TrimPeriod] = []
- death_periods = extract_death_periods(
- log_path=console_log_path,
- player_slot=player_slot,
- recording_start_time=recording_start_time,
- verbose=verbose
- )
+ # Prefer timeline data
+ if timeline is not None and timeline.spawns:
+ print(" Using preprocessed timeline data (demoparser2)")
+ if verbose:
+ print(f" Deaths: {len(timeline.deaths)}, Spawns: {len(timeline.spawns)}")
- if death_periods:
- print(f" Found {len(death_periods)} death periods")
- total_dead_time = sum(p.duration for p in death_periods)
- print(f" Total dead time: {total_dead_time:.1f}s")
+ demo_trim_periods = get_trim_periods(timeline)
+
+ if demo_trim_periods:
+ for start, end in demo_trim_periods:
+ trim_periods.append(TrimPeriod(start_time=start, end_time=end))
+ if verbose:
+ print(f" Period: {start:.2f}s - {end:.2f}s ({end - start:.2f}s)")
+
+ # Fall back to console.log parsing
+ if not trim_periods:
+ if timeline is not None:
+ print(" Timeline has no spawn data, falling back to console.log")
+ else:
+ print(" Using console.log parsing (legacy method)")
+
+ if verbose:
+ print(f" Console log: {console_log_path}")
+ print(f" Player slot: {player_slot}")
+ print(f" Recording start: {recording_start_time}")
+
+ if not console_log_path.exists():
+ print(f" Console log not found, skipping trim")
+ print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
+ return video_path
+
+ death_periods = extract_death_periods(
+ log_path=console_log_path,
+ player_slot=player_slot,
+ recording_start_time=recording_start_time,
+ verbose=verbose
+ )
+
+ for dp in death_periods:
+ trim_periods.append(TrimPeriod(start_time=dp.death_time, end_time=dp.respawn_time))
+
+ if trim_periods:
+ print(f" Found {len(trim_periods)} periods to trim")
+ total_trim_time = sum(p.duration for p in trim_periods)
+ print(f" Total trim time: {total_trim_time:.1f}s")
- # Rename original to _raw
raw_path = video_path.parent / f"{video_path.stem}_raw{video_path.suffix}"
video_path.rename(raw_path)
print(f" Raw recording moved to: {raw_path.name}")
- print(" Trimming death periods...")
- success = trim_death_periods(
+ print(" Trimming periods from video...")
+ success = trim_video_with_periods(
input_path=raw_path,
output_path=video_path,
- death_periods=death_periods,
+ trim_periods=trim_periods,
verbose=verbose
)
@@ -414,62 +433,287 @@ def postprocess_video(
if not video_path.exists() and raw_path.exists():
raw_path.rename(video_path)
else:
- print(" No death periods found, keeping original")
+ print(" No periods to trim, keeping original")
print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
return video_path
-def main():
- """Main entry point."""
+# =============================================================================
+# Info Output Functions
+# =============================================================================
+
+def print_demo_info_extended(
+ demo_info: DemoInfo,
+ timelines: dict[int, DemoTimeline],
+ verbose: bool = False
+):
+ """Print comprehensive demo information."""
+ # Get rounds and duration from timeline data (more reliable than header)
+ rounds_count = 0
+ max_tick = 0
+ tickrate = demo_info.tick_rate or 64
+
+ for timeline in timelines.values():
+ if timeline.rounds:
+ rounds_count = max(rounds_count, len(timeline.rounds))
+ # Find max tick from spawns/deaths to calculate duration
+ for spawn in timeline.spawns:
+ max_tick = max(max_tick, spawn.tick)
+ for death in timeline.deaths:
+ max_tick = max(max_tick, death.tick)
+ if timeline.tickrate > 0:
+ tickrate = timeline.tickrate
+
+ # Calculate duration from max tick
+ if max_tick > 0 and tickrate > 0:
+ duration = max_tick / tickrate
+ duration_str = f"{duration:.1f}s ({duration/60:.1f} min)"
+ elif demo_info.total_ticks > 0 and tickrate > 0:
+ duration = demo_info.total_ticks / tickrate
+ duration_str = f"{duration:.1f}s ({duration/60:.1f} min)"
+ else:
+ duration_str = "unknown"
+
+ print(f"Demo: {demo_info.path.name}")
+ print(f" Map: {demo_info.map_name}")
+ print(f" Tick rate: {tickrate}/s")
+ print(f" Duration: {duration_str}")
+ print(f" Rounds: {rounds_count}")
+
+ print(f"\nPlayers ({len(demo_info.players)}):")
+ for player in demo_info.players:
+ team_str = f"[{player.team}]" if player.team else "[--]"
+ timeline = timelines.get(player.steamid)
+
+ print(f" {player.name}")
+ print(f" SteamID: {player.steamid} Team: {team_str}")
+
+ if timeline:
+ death_count = len(timeline.deaths)
+ spawn_count = len(timeline.spawns)
+ dead_time = sum(p.duration_seconds for p in timeline.death_periods)
+ print(f" Deaths: {death_count}, Spawns: {spawn_count}, Dead time: {dead_time:.1f}s")
+
+ if verbose and timeline.death_periods:
+ print(f" Death periods:")
+ for i, period in enumerate(timeline.death_periods, 1):
+ weapon = period.death.weapon or "unknown"
+ hs = " (headshot)" if period.death.headshot else ""
+ print(f" {i:2}. {period.death.time_seconds:7.1f}s - {period.spawn.time_seconds:7.1f}s "
+ f"({period.duration_seconds:5.1f}s) [{weapon}{hs}]")
+
+
+def format_info_json(
+ demo_info: DemoInfo,
+ timelines: dict[int, DemoTimeline]
+) -> dict:
+ """Format demo info as JSON-serializable dict."""
+ # Get rounds and duration from timeline data (more reliable than header)
+ rounds_count = 0
+ max_tick = 0
+ tickrate = demo_info.tick_rate or 64
+
+ for timeline in timelines.values():
+ if timeline.rounds:
+ rounds_count = max(rounds_count, len(timeline.rounds))
+ for spawn in timeline.spawns:
+ max_tick = max(max_tick, spawn.tick)
+ for death in timeline.deaths:
+ max_tick = max(max_tick, death.tick)
+ if timeline.tickrate > 0:
+ tickrate = timeline.tickrate
+
+ # Calculate duration from max tick
+ if max_tick > 0 and tickrate > 0:
+ duration = max_tick / tickrate
+ elif demo_info.total_ticks > 0 and tickrate > 0:
+ duration = demo_info.total_ticks / tickrate
+ else:
+ duration = 0
+
+ players_data = []
+ for player in demo_info.players:
+ timeline = timelines.get(player.steamid)
+ player_data = {
+ "name": player.name,
+ "steamid": player.steamid,
+ "team": player.team,
+ }
+
+ if timeline:
+ player_data["deaths"] = len(timeline.deaths)
+ player_data["spawns"] = len(timeline.spawns)
+ player_data["dead_time_seconds"] = sum(p.duration_seconds for p in timeline.death_periods)
+ player_data["death_periods"] = [
+ {
+ "death_tick": p.death.tick,
+ "death_time": p.death.time_seconds,
+ "spawn_tick": p.spawn.tick,
+ "spawn_time": p.spawn.time_seconds,
+ "duration_seconds": p.duration_seconds,
+ "weapon": p.death.weapon,
+ "headshot": p.death.headshot,
+ }
+ for p in timeline.death_periods
+ ]
+
+ players_data.append(player_data)
+
+ return {
+ "demo": demo_info.path.name,
+ "map": demo_info.map_name,
+ "tick_rate": tickrate,
+ "duration_seconds": duration,
+ "rounds": rounds_count,
+ "players": players_data,
+ }
+
+
+# =============================================================================
+# Argument Parser
+# =============================================================================
+
+def create_parser() -> argparse.ArgumentParser:
+ """Create argument parser with subcommands."""
parser = argparse.ArgumentParser(
- description="Record a player's POV from a CS2 demo file",
+ prog="cs2pov",
+ description="Record player POV from CS2 demo files",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
- %(prog)s -d match.dem -p "PlayerName" -o recording.mp4
- %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --no-trim
- %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --skip-recording \\
- --console-log /path/to/console.log --player-slot 3 --recording-start-time 1706500000
+ cs2pov info demo.dem # Show demo information
+ cs2pov info demo.dem --json # Output as JSON
+ cs2pov pov -d demo.dem -p "Player" -o out.mp4
+ cs2pov record -d demo.dem -p "Player" -o raw.mp4
+ cs2pov trim raw.mp4 -d demo.dem -p "Player"
""",
)
-
- parser.add_argument("-d", "--demo", required=True, type=Path, help="Path to demo file (.dem)")
- parser.add_argument("-p", "--player", required=True, help="Player to record (name or SteamID)")
- parser.add_argument("-o", "--output", required=True, type=Path, help="Output video file path")
- parser.add_argument("-r", "--resolution", default="1920x1080", help="Recording resolution")
- parser.add_argument("-f", "--framerate", type=int, default=60, help="Recording framerate")
- parser.add_argument("--no-hud", action="store_true", help="Hide HUD elements")
- parser.add_argument("--display", type=int, default=0, help="X display number")
- parser.add_argument("--no-audio", action="store_true", help="Disable audio recording")
- parser.add_argument("--audio-device", help="PulseAudio device for audio capture (auto-detected by default)")
- parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
- parser.add_argument("--cs2-path", type=Path, help="Path to CS2 installation")
- parser.add_argument("--no-trim", action="store_true", help="Skip post-processing trim")
- parser.add_argument("--skip-recording", action="store_true", help="Skip recording, only post-process")
- parser.add_argument("--console-log", type=Path, help="Console log path (for --skip-recording)")
- parser.add_argument("--player-slot", type=int, help="Player slot (for --skip-recording)")
- parser.add_argument("--recording-start-time", type=float, help="Recording start time (for --skip-recording)")
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
- # Subcommand for listing players
- subparsers = parser.add_subparsers(dest="command")
- list_parser = subparsers.add_parser("list", help="List players in a demo")
- list_parser.add_argument("demo", type=Path, help="Path to demo file")
+ subparsers = parser.add_subparsers(dest="command", required=True, metavar="COMMAND")
+
+ # Shared argument groups
+ demo_args = argparse.ArgumentParser(add_help=False)
+ demo_args.add_argument("-d", "--demo", required=True, type=Path,
+ help="Demo file (.dem)")
+
+ player_args = argparse.ArgumentParser(add_help=False)
+ player_args.add_argument("-p", "--player", required=True,
+ help="Player name or SteamID")
+
+ output_args = argparse.ArgumentParser(add_help=False)
+ output_args.add_argument("-o", "--output", required=True, type=Path,
+ help="Output video file")
+
+ recording_args = argparse.ArgumentParser(add_help=False)
+ recording_args.add_argument("-r", "--resolution", default="1920x1080",
+ help="Recording resolution (default: 1920x1080)")
+ recording_args.add_argument("-f", "--framerate", type=int, default=60,
+ help="Recording framerate (default: 60)")
+ recording_args.add_argument("--display", type=int, default=0,
+ help="X display number (default: 0)")
+ recording_args.add_argument("--no-hud", action="store_true",
+ help="Hide HUD elements")
+ recording_args.add_argument("--no-audio", action="store_true",
+ help="Disable audio recording")
+ recording_args.add_argument("--audio-device",
+ help="PulseAudio device (auto-detected)")
+ recording_args.add_argument("--cs2-path", type=Path,
+ help="Custom CS2 installation path")
+
+ verbose_args = argparse.ArgumentParser(add_help=False)
+ verbose_args.add_argument("-v", "--verbose", action="store_true",
+ help="Verbose output")
+
+ # INFO command
+ info_parser = subparsers.add_parser(
+ "info",
+ parents=[verbose_args],
+ help="Show demo information and player timeline data",
+ description="Display demo metadata, player list, and death/spawn statistics.",
+ )
+ info_parser.add_argument("demo", type=Path, help="Demo file (.dem)")
+ info_parser.add_argument("--json", action="store_true",
+ help="Output as JSON")
+
+ # POV command (full pipeline)
+ pov_parser = subparsers.add_parser(
+ "pov",
+ parents=[demo_args, player_args, output_args, recording_args, verbose_args],
+ help="Record and trim player POV (full pipeline)",
+ description="Record a player's POV from a demo and trim death periods.",
+ )
+ pov_parser.add_argument("--no-trim", action="store_true",
+ help="Skip post-processing trim")
+
+ # RECORD command
+ subparsers.add_parser(
+ "record",
+ parents=[demo_args, player_args, output_args, recording_args, verbose_args],
+ help="Record player POV without trimming",
+ description="Record a player's POV from a demo without post-processing.",
+ )
- args = parser.parse_args()
+ # TRIM command
+ trim_parser = subparsers.add_parser(
+ "trim",
+ parents=[demo_args, player_args, verbose_args],
+ help="Trim death periods from recorded video",
+ description="Post-process an existing recording to remove death periods.",
+ )
+ trim_parser.add_argument("video", type=Path, help="Input video file")
+ trim_parser.add_argument("-o", "--output", type=Path,
+ help="Output file (default: adds _trimmed suffix)")
+ # Fallback options
+ trim_parser.add_argument("--console-log", type=Path,
+ help="Console.log file (fallback when demo unavailable)")
+ trim_parser.add_argument("--player-slot", type=int,
+ help="Player slot, 0-based (fallback)")
+ trim_parser.add_argument("--recording-start-time", type=float,
+ help="Recording start timestamp (fallback)")
+
+ return parser
+
+
+# =============================================================================
+# Command Handlers
+# =============================================================================
+
+def cmd_info(args) -> int:
+ """Handle 'info' command - show demo information."""
+ demo_path = args.demo.resolve()
+ if not demo_path.exists():
+ print(f"Error: Demo not found: {demo_path}", file=sys.stderr)
+ return 1
+
+ # Parse basic demo info
+ try:
+ demo_info = parse_demo(demo_path)
+ except CS2POVError as e:
+ print(f"Error: {e}", file=sys.stderr)
+ return 1
- # Handle list subcommand
- if args.command == "list":
+ # Preprocess for timeline data (per-player)
+ player_timelines: dict[int, DemoTimeline] = {}
+ for player in demo_info.players:
try:
- demo_info = parse_demo(args.demo)
- print(f"Demo: {args.demo.name}")
- print_demo_info(demo_info)
- return 0
- except CS2POVError as e:
- print(f"Error: {e}", file=sys.stderr)
- return 1
+ timeline = preprocess_demo(demo_path, player.steamid, player.name)
+ player_timelines[player.steamid] = timeline
+ except Exception:
+ pass
+
+ if args.json:
+ output = format_info_json(demo_info, player_timelines)
+ print(json.dumps(output, indent=2))
+ else:
+ print_demo_info_extended(demo_info, player_timelines, verbose=args.verbose)
+ return 0
+
+
+def cmd_pov(args) -> int:
+ """Handle 'pov' command - full recording pipeline."""
# Check dependencies
missing = check_dependencies()
if missing:
@@ -481,62 +725,33 @@ Examples:
# Validate inputs
demo_path = args.demo.resolve()
if not demo_path.exists():
- print(f"Error: Demo file not found: {demo_path}", file=sys.stderr)
+ print(f"Error: Demo not found: {demo_path}", file=sys.stderr)
return 1
- output_path = args.output.resolve()
-
try:
resolution = parse_resolution(args.resolution)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
- # Handle --skip-recording mode
- if args.skip_recording:
- if not args.console_log:
- print("Error: --skip-recording requires --console-log", file=sys.stderr)
- return 1
- if args.player_slot is None:
- print("Error: --skip-recording requires --player-slot", file=sys.stderr)
- return 1
- if args.recording_start_time is None:
- print("Error: --skip-recording requires --recording-start-time", file=sys.stderr)
- return 1
- if not output_path.exists():
- print(f"Error: Video file not found: {output_path}", file=sys.stderr)
- return 1
-
- print("Post-processing only (--skip-recording mode)")
- postprocess_video(
- video_path=output_path,
- console_log_path=args.console_log.resolve(),
- player_slot=args.player_slot,
- recording_start_time=args.recording_start_time,
- verbose=args.verbose,
- )
- return 0
+ output_path = args.output.resolve()
- # Phase 1-3: Record (with built-in cleanup)
- try:
- result = record_demo(
- demo_path=demo_path,
- player_identifier=args.player,
- output_path=output_path,
- resolution=resolution,
- framerate=args.framerate,
- hide_hud=args.no_hud,
- display_num=args.display,
- verbose=args.verbose,
- cs2_path_override=args.cs2_path,
- enable_audio=not args.no_audio,
- audio_device=args.audio_device,
- )
- except CS2POVError as e:
- print(f"Error: {e}", file=sys.stderr)
- return 1
+ # Record
+ result = record_demo(
+ demo_path=demo_path,
+ player_identifier=args.player,
+ output_path=output_path,
+ resolution=resolution,
+ framerate=args.framerate,
+ hide_hud=args.no_hud,
+ display_num=args.display,
+ verbose=args.verbose,
+ cs2_path_override=args.cs2_path,
+ enable_audio=not args.no_audio,
+ audio_device=args.audio_device,
+ )
- # Phase 4: Post-process (completely separate)
+ # Post-process
if result.success and not args.no_trim:
postprocess_video(
video_path=result.video_path,
@@ -544,6 +759,7 @@ Examples:
player_slot=result.player_slot,
recording_start_time=result.recording_start_time,
verbose=args.verbose,
+ timeline=result.timeline,
)
elif result.success:
size_mb = result.video_path.stat().st_size / (1024 * 1024)
@@ -557,5 +773,153 @@ Examples:
return 0 if result.success else 1
+def cmd_record(args) -> int:
+ """Handle 'record' command - raw recording without trimming."""
+ # Check dependencies
+ missing = check_dependencies()
+ if missing:
+ print("Missing dependencies:", file=sys.stderr)
+ for dep in missing:
+ print(f" - {dep}", file=sys.stderr)
+ return 1
+
+ # Validate inputs
+ demo_path = args.demo.resolve()
+ if not demo_path.exists():
+ print(f"Error: Demo not found: {demo_path}", file=sys.stderr)
+ return 1
+
+ try:
+ resolution = parse_resolution(args.resolution)
+ except ValueError as e:
+ print(f"Error: {e}", file=sys.stderr)
+ return 1
+
+ output_path = args.output.resolve()
+
+ # Record only
+ result = record_demo(
+ demo_path=demo_path,
+ player_identifier=args.player,
+ output_path=output_path,
+ resolution=resolution,
+ framerate=args.framerate,
+ hide_hud=args.no_hud,
+ display_num=args.display,
+ verbose=args.verbose,
+ cs2_path_override=args.cs2_path,
+ enable_audio=not args.no_audio,
+ audio_device=args.audio_device,
+ )
+
+ if result.success:
+ size_mb = result.video_path.stat().st_size / (1024 * 1024)
+ print(f"\nRaw recording saved: {result.video_path} ({size_mb:.1f} MB)")
+ print(f"\nTo trim later, run:")
+ print(f" cs2pov trim \"{result.video_path}\" -d \"{demo_path}\" -p \"{args.player}\"")
+ else:
+ print(f"\nRecording ended with: {result.exit_reason}")
+ if result.video_path.exists():
+ size_mb = result.video_path.stat().st_size / (1024 * 1024)
+ print(f"Partial recording available: {result.video_path} ({size_mb:.1f} MB)")
+
+ return 0 if result.success else 1
+
+
+def cmd_trim(args) -> int:
+ """Handle 'trim' command - post-process existing video."""
+ video_path = args.video.resolve()
+ if not video_path.exists():
+ print(f"Error: Video not found: {video_path}", file=sys.stderr)
+ return 1
+
+ demo_path = args.demo.resolve()
+ if not demo_path.exists():
+ print(f"Error: Demo not found: {demo_path}", file=sys.stderr)
+ return 1
+
+ # Determine output path
+ if args.output:
+ output_path = args.output.resolve()
+ else:
+ output_path = video_path.parent / f"{video_path.stem}_trimmed{video_path.suffix}"
+
+ # Parse demo and find player
+ try:
+ demo_info = parse_demo(demo_path)
+ player = find_player(demo_info, args.player)
+ player_slot = get_player_index(demo_info, player)
+ except CS2POVError as e:
+ print(f"Error: {e}", file=sys.stderr)
+ return 1
+
+ # Get timeline from demo
+ timeline = None
+ try:
+ timeline = preprocess_demo(demo_path, player.steamid, player.name)
+ print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns")
+ except Exception as e:
+ print(f"Warning: Could not preprocess demo: {e}")
+ print("Falling back to console.log method")
+
+ # Validate fallback parameters if needed
+ if timeline is None:
+ if args.console_log is None:
+ print("Error: --console-log required when demo preprocessing fails", file=sys.stderr)
+ return 1
+ if args.player_slot is None:
+ print("Error: --player-slot required when demo preprocessing fails", file=sys.stderr)
+ return 1
+ if args.recording_start_time is None:
+ print("Error: --recording-start-time required when demo preprocessing fails", file=sys.stderr)
+ return 1
+ player_slot = args.player_slot
+
+ # Copy video to output path first (postprocess_video expects to rename)
+ if video_path != output_path:
+ shutil.copy2(video_path, output_path)
+
+ # Run trimming
+ postprocess_video(
+ video_path=output_path,
+ console_log_path=args.console_log.resolve() if args.console_log else Path("/dev/null"),
+ player_slot=player_slot,
+ recording_start_time=args.recording_start_time or 0.0,
+ verbose=args.verbose,
+ timeline=timeline,
+ )
+
+ return 0
+
+
+# =============================================================================
+# Entry Point
+# =============================================================================
+
+def main() -> int:
+ """Main entry point."""
+ parser = create_parser()
+ args = parser.parse_args()
+
+ try:
+ if args.command == "info":
+ return cmd_info(args)
+ elif args.command == "pov":
+ return cmd_pov(args)
+ elif args.command == "record":
+ return cmd_record(args)
+ elif args.command == "trim":
+ return cmd_trim(args)
+ else:
+ parser.print_help()
+ return 1
+ except CS2POVError as e:
+ print(f"Error: {e}", file=sys.stderr)
+ return 1
+ except KeyboardInterrupt:
+ print("\nInterrupted", file=sys.stderr)
+ return 130
+
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/cs2pov/config.py b/cs2pov/config.py
index dc1c933..04eab45 100644
--- a/cs2pov/config.py
+++ b/cs2pov/config.py
@@ -12,6 +12,7 @@ class RecordingConfig:
player_index: int
player_name: str = ""
player_steamid: int = 0
+ player_slot: int = 0 # user_id + 1, used for spec_player command
resolution: tuple[int, int] = (1920, 1080)
hide_hud: bool = True
spec_mode: int = 4 # 4 = first-person, 5 = third-person, 6 = free roam
@@ -45,8 +46,8 @@ spec_mode {spec_mode}
// Auto-quit when demo ends
demo_quitafterplayback 1
-// Create alias to lock to player by name (more reliable than index)
-alias "lock_pov" "spec_player \\"{player_name}\\""
+// Create alias to lock to player by slot number (user_id + 1)
+alias "lock_pov" "spec_player {player_slot}"
// Bind F5 to lock to player (automation will press this)
bind "F5" "lock_pov"
@@ -58,7 +59,7 @@ bind "MOUSE4" "lock_pov"
playdemo "replays/{demo_name}"
// These run before demo loads but we try anyway
-spec_player "{player_name}"
+spec_player {player_slot}
spec_mode {spec_mode}
// Tell user what's happening
@@ -96,6 +97,7 @@ def generate_recording_cfg(config: RecordingConfig, output_path: Path) -> Path:
hud_commands=hud_commands,
spec_mode=config.spec_mode,
player_name=config.player_name,
+ player_slot=config.player_slot,
demo_name=config.demo_name,
)
diff --git a/cs2pov/parser.py b/cs2pov/parser.py
index 451ece2..d9a89ca 100644
--- a/cs2pov/parser.py
+++ b/cs2pov/parser.py
@@ -17,6 +17,7 @@ class PlayerInfo:
steamid: int
name: str
team: Optional[str] = None
+ user_id: Optional[int] = None # Player slot from demo (use user_id + 1 for spec_player)
@dataclass
@@ -67,7 +68,7 @@ def parse_demo(demo_path: Path) -> DemoInfo:
# Get unique players from tick data
# parse_ticks returns a DataFrame with steamid and name columns
- tick_df = parser.parse_ticks(["team_num"])
+ tick_df = parser.parse_ticks(["team_num", "user_id"])
players = _extract_players(tick_df)
# Get round events
@@ -109,7 +110,10 @@ def _extract_players(tick_df) -> list[PlayerInfo]:
team = "T"
elif team_num == 3:
team = "CT"
- players.append(PlayerInfo(steamid=steamid, name=name, team=team))
+ user_id = None
+ if "user_id" in row:
+ user_id = int(row["user_id"])
+ players.append(PlayerInfo(steamid=steamid, name=name, team=team, user_id=user_id))
return players
diff --git a/cs2pov/preprocessor.py b/cs2pov/preprocessor.py
new file mode 100644
index 0000000..0bd9325
--- /dev/null
+++ b/cs2pov/preprocessor.py
@@ -0,0 +1,411 @@
+"""Demo preprocessor - extract timeline data using demoparser2.
+
+Uses demoparser2 to extract death/spawn events and round boundaries
+directly from the demo file, providing tick-level precision without
+depending on console.log parsing.
+"""
+
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Optional
+
+from demoparser2 import DemoParser
+
+from .exceptions import DemoNotFoundError, DemoParseError
+
+
+@dataclass
+class DeathEvent:
+ """A player death event from the demo."""
+
+ tick: int
+ time_seconds: float
+ attacker_steamid: Optional[int] = None
+ weapon: Optional[str] = None
+ headshot: bool = False
+
+
+@dataclass
+class SpawnEvent:
+ """A player spawn event from the demo."""
+
+ tick: int
+ time_seconds: float
+
+
+@dataclass
+class DeathPeriod:
+ """A period when the player was dead (between death and respawn)."""
+
+ death: DeathEvent
+ spawn: SpawnEvent
+
+ @property
+ def duration_ticks(self) -> int:
+ return self.spawn.tick - self.death.tick
+
+ @property
+ def duration_seconds(self) -> float:
+ return self.spawn.time_seconds - self.death.time_seconds
+
+
+@dataclass
+class RoundBoundary:
+ """Boundaries for a round in the demo."""
+
+ round_num: int
+ prestart_tick: int
+ prestart_time: float
+ freeze_end_tick: Optional[int] = None
+ freeze_end_time: Optional[float] = None
+ end_tick: Optional[int] = None
+ end_time: Optional[float] = None
+
+
+@dataclass
+class DemoTimeline:
+ """Complete timeline data for a player in a demo."""
+
+ player_steamid: int
+ player_name: str
+ tickrate: float
+ total_ticks: int
+ total_duration: float
+
+ deaths: list[DeathEvent] = field(default_factory=list)
+ spawns: list[SpawnEvent] = field(default_factory=list)
+ death_periods: list[DeathPeriod] = field(default_factory=list)
+ rounds: list[RoundBoundary] = field(default_factory=list)
+
+
+def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "") -> DemoTimeline:
+ """Pre-process demo to extract timeline data for a player.
+
+ Uses demoparser2 to extract:
+ - player_death events for the target player
+ - player_spawn events for the target player
+ - Round boundaries (prestart, freeze_end, end)
+
+ Args:
+ demo_path: Path to the demo file
+ player_steamid: SteamID64 of the player to track
+ player_name: Player name (for metadata)
+
+ Returns:
+ DemoTimeline with all extracted data
+
+ Raises:
+ DemoNotFoundError: If demo file doesn't exist
+ DemoParseError: If parsing fails
+ """
+ if not demo_path.exists():
+ raise DemoNotFoundError(f"Demo file not found: {demo_path}")
+
+ try:
+ parser = DemoParser(str(demo_path))
+
+ # Get header for tickrate and duration
+ header = parser.parse_header()
+ tickrate = header.get("playback_ticks_per_second", 64)
+ total_ticks = header.get("playback_ticks", 0)
+ total_duration = total_ticks / tickrate if tickrate > 0 else 0
+
+ # Extract death events for the player
+ deaths = _extract_deaths(parser, player_steamid, tickrate)
+
+ # Extract spawn events for the player
+ spawns = _extract_spawns(parser, player_steamid, tickrate)
+
+ # Compute death periods (match deaths with subsequent spawns)
+ death_periods = compute_death_periods(deaths, spawns)
+
+ # Extract round boundaries
+ rounds = _extract_round_boundaries(parser, tickrate)
+
+ return DemoTimeline(
+ player_steamid=player_steamid,
+ player_name=player_name,
+ tickrate=tickrate,
+ total_ticks=total_ticks,
+ total_duration=total_duration,
+ deaths=deaths,
+ spawns=spawns,
+ death_periods=death_periods,
+ rounds=rounds,
+ )
+
+ except Exception as e:
+ if isinstance(e, (DemoNotFoundError, DemoParseError)):
+ raise
+ raise DemoParseError(f"Failed to preprocess demo: {e}") from e
+
+
+def _extract_deaths(parser: DemoParser, player_steamid: int, tickrate: float) -> list[DeathEvent]:
+ """Extract death events for a specific player."""
+ deaths = []
+
+ try:
+ events_df = parser.parse_event("player_death")
+ if events_df is None or len(events_df) == 0:
+ return deaths
+
+ # Filter for deaths of the target player
+ # user_steamid is the player who died
+ # Note: event steamids are strings, tick data steamids are uint64
+ steamid_str = str(player_steamid)
+ player_deaths = events_df[events_df["user_steamid"] == steamid_str]
+
+ for _, row in player_deaths.iterrows():
+ tick = int(row["tick"])
+ time_seconds = tick / tickrate if tickrate > 0 else 0
+
+ # Extract optional fields
+ attacker_steamid = row.get("attacker_steamid")
+ if attacker_steamid is not None and attacker_steamid != 0:
+ attacker_steamid = int(attacker_steamid)
+ else:
+ attacker_steamid = None
+
+ weapon = row.get("weapon")
+ if weapon is not None:
+ weapon = str(weapon)
+
+ headshot = bool(row.get("headshot", False))
+
+ deaths.append(DeathEvent(
+ tick=tick,
+ time_seconds=time_seconds,
+ attacker_steamid=attacker_steamid,
+ weapon=weapon,
+ headshot=headshot,
+ ))
+
+ except Exception:
+ # If event parsing fails, return empty list
+ pass
+
+ # Sort by tick to ensure chronological order
+ deaths.sort(key=lambda d: d.tick)
+ return deaths
+
+
+def _extract_spawns(parser: DemoParser, player_steamid: int, tickrate: float) -> list[SpawnEvent]:
+ """Extract spawn events for a specific player."""
+ spawns = []
+
+ try:
+ events_df = parser.parse_event("player_spawn")
+ if events_df is None or len(events_df) == 0:
+ return spawns
+
+ # Filter for spawns of the target player
+ # Note: event steamids are strings, tick data steamids are uint64
+ steamid_str = str(player_steamid)
+ player_spawns = events_df[events_df["user_steamid"] == steamid_str]
+
+ for _, row in player_spawns.iterrows():
+ tick = int(row["tick"])
+ time_seconds = tick / tickrate if tickrate > 0 else 0
+
+ spawns.append(SpawnEvent(
+ tick=tick,
+ time_seconds=time_seconds,
+ ))
+
+ except Exception:
+ # If event parsing fails, return empty list
+ pass
+
+ # Sort by tick to ensure chronological order
+ spawns.sort(key=lambda s: s.tick)
+ return spawns
+
+
+def _extract_round_boundaries(parser: DemoParser, tickrate: float) -> list[RoundBoundary]:
+ """Extract round boundary events."""
+ rounds = []
+
+ try:
+ # Parse each event type individually (parse_event only takes one event name)
+ prestart_df = parser.parse_event("round_prestart")
+ freeze_end_df = parser.parse_event("round_freeze_end")
+ round_end_df = parser.parse_event("round_officially_ended")
+
+ # Convert to lists of (tick, time) tuples
+ prestart_list = []
+ if prestart_df is not None and len(prestart_df) > 0:
+ prestart_df = prestart_df.sort_values("tick")
+ prestart_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in prestart_df.iterrows()]
+
+ freeze_end_list = []
+ if freeze_end_df is not None and len(freeze_end_df) > 0:
+ freeze_end_df = freeze_end_df.sort_values("tick")
+ freeze_end_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in freeze_end_df.iterrows()]
+
+ round_end_list = []
+ if round_end_df is not None and len(round_end_df) > 0:
+ round_end_df = round_end_df.sort_values("tick")
+ round_end_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in round_end_df.iterrows()]
+
+ if not prestart_list:
+ return rounds
+
+ # Match events to form round boundaries
+ for round_num, (prestart_tick, prestart_time) in enumerate(prestart_list, start=1):
+ round_boundary = RoundBoundary(
+ round_num=round_num,
+ prestart_tick=prestart_tick,
+ prestart_time=prestart_time,
+ )
+
+ # Find freeze_end after this prestart
+ for freeze_tick, freeze_time in freeze_end_list:
+ if freeze_tick > prestart_tick:
+ round_boundary.freeze_end_tick = freeze_tick
+ round_boundary.freeze_end_time = freeze_time
+ break
+
+ # Find round_end after this prestart (but before next prestart)
+ next_prestart_tick = float("inf")
+ if round_num < len(prestart_list):
+ next_prestart_tick = prestart_list[round_num][0]
+
+ for end_tick, end_time in round_end_list:
+ if prestart_tick < end_tick < next_prestart_tick:
+ round_boundary.end_tick = end_tick
+ round_boundary.end_time = end_time
+ break
+
+ rounds.append(round_boundary)
+
+ except Exception:
+ # If event parsing fails, return empty list
+ pass
+
+ return rounds
+
+
+def compute_death_periods(deaths: list[DeathEvent], spawns: list[SpawnEvent]) -> list[DeathPeriod]:
+ """Match each death with its subsequent spawn to form death periods.
+
+ For each death, finds the next spawn that occurs after it.
+ Deaths without a subsequent spawn (e.g., last death of match) are ignored.
+
+ Args:
+ deaths: List of death events, sorted by tick
+ spawns: List of spawn events, sorted by tick
+
+ Returns:
+ List of DeathPeriod objects
+ """
+ if not deaths or not spawns:
+ return []
+
+ death_periods = []
+ spawn_idx = 0
+
+ for death in deaths:
+ # Find next spawn after this death
+ while spawn_idx < len(spawns) and spawns[spawn_idx].tick <= death.tick:
+ spawn_idx += 1
+
+ if spawn_idx < len(spawns):
+ spawn = spawns[spawn_idx]
+ death_periods.append(DeathPeriod(death=death, spawn=spawn))
+ spawn_idx += 1
+
+ return death_periods
+
+
+def get_trim_periods(timeline: DemoTimeline) -> list[tuple[float, float]]:
+ """Get periods to trim from video based on timeline data.
+
+ Uses first player_spawn as t=0 reference point. This handles the
+ variable CS2 startup delay by anchoring to when the player first
+ appears in the game.
+
+ Returns (start, end) tuples for periods to remove:
+ - Initial period: 0 to first_spawn_time (startup + freeze time)
+ - Death periods: death_time to respawn_time for each death
+
+ The returned times are relative to video start (not demo start).
+ When recording, the first spawn in the demo corresponds to some
+ point in the video. By returning demo-relative times and letting
+ the caller handle the offset, we keep this function pure.
+
+ Args:
+ timeline: DemoTimeline with death/spawn data
+
+ Returns:
+ List of (start_seconds, end_seconds) tuples to trim
+ """
+ if not timeline.spawns:
+ return []
+
+ trim_periods = []
+
+ # First spawn is our reference point
+ first_spawn = timeline.spawns[0]
+ first_spawn_time = first_spawn.time_seconds
+
+ # Trim from demo start to first spawn (startup, freeze time, etc.)
+ # Only add if > 0.5s to avoid tiny trims
+ if first_spawn_time > 0.5:
+ trim_periods.append((0.0, first_spawn_time))
+
+ # Add death periods (already computed relative to demo start)
+ for period in timeline.death_periods:
+ # Only include if death is after first spawn
+ if period.death.tick > first_spawn.tick:
+ trim_periods.append((
+ period.death.time_seconds,
+ period.spawn.time_seconds,
+ ))
+
+ return trim_periods
+
+
+def get_trim_periods_for_video(
+ timeline: DemoTimeline,
+ video_start_offset: float = 0.0,
+) -> list[tuple[float, float]]:
+ """Get trim periods adjusted for video timing.
+
+ When recording a demo, there's a delay between video start and
+ demo playback start. This function adjusts the timeline-based
+ trim periods to account for that offset.
+
+ Args:
+ timeline: DemoTimeline with death/spawn data
+ video_start_offset: Seconds from video start to first spawn
+ in video. If not provided, assumes first spawn is at
+ video_start_offset=0 (i.e., video started at first spawn).
+
+ Returns:
+ List of (start_seconds, end_seconds) tuples for video trimming
+ """
+ demo_periods = get_trim_periods(timeline)
+ if not demo_periods:
+ return []
+
+ # Get first spawn time in demo (our reference point)
+ if not timeline.spawns:
+ return []
+
+ first_spawn_demo_time = timeline.spawns[0].time_seconds
+
+ # Adjust periods: demo time -> video time
+ # If first spawn is at video_start_offset in the video,
+ # then demo_time 0 is at (video_start_offset - first_spawn_demo_time)
+ video_periods = []
+ for start, end in demo_periods:
+ # Convert demo-relative to video-relative
+ video_start = start - first_spawn_demo_time + video_start_offset
+ video_end = end - first_spawn_demo_time + video_start_offset
+
+ # Only include positive time ranges
+ if video_end > 0:
+ video_start = max(0.0, video_start)
+ video_periods.append((video_start, video_end))
+
+ return video_periods
diff --git a/cs2pov/trim.py b/cs2pov/trim.py
index 6bfb97b..82e33d5 100644
--- a/cs2pov/trim.py
+++ b/cs2pov/trim.py
@@ -6,10 +6,13 @@ import tempfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
from .exceptions import CaptureError
+if TYPE_CHECKING:
+ from .preprocessor import DemoTimeline
+
@dataclass
class DeathPeriod:
@@ -22,6 +25,17 @@ class DeathPeriod:
return self.respawn_time - self.death_time
+@dataclass
+class TrimPeriod:
+ """A period to trim from the video."""
+ start_time: float # Video timestamp in seconds
+ end_time: float # Video timestamp in seconds
+
+ @property
+ def duration(self) -> float:
+ return self.end_time - self.start_time
+
+
def parse_log_timestamp(timestamp_str: str, reference_date: Optional[datetime] = None) -> datetime:
"""Parse a console.log timestamp in MM/DD HH:mm:ss format.
@@ -340,3 +354,74 @@ def trim_death_periods(
print(f" [Trim] Output video: {output_duration:.2f}s")
return True
+
+
+def get_trim_periods_from_timeline(
+ timeline: "DemoTimeline",
+ first_spawn_video_time: float,
+ verbose: bool = False,
+) -> list[TrimPeriod]:
+ """Get trim periods from preprocessed timeline data.
+
+ Uses the timeline's death/spawn events to compute trim periods.
+ The first_spawn_video_time parameter tells us when the first spawn
+ occurred in the video, allowing us to align demo times with video times.
+
+ Args:
+ timeline: Preprocessed DemoTimeline with death/spawn events
+ first_spawn_video_time: Video timestamp when first spawn occurred
+ verbose: Print debug output
+
+ Returns:
+ List of TrimPeriod objects with video-relative timestamps
+ """
+ from .preprocessor import get_trim_periods_for_video
+
+ trim_periods = []
+
+ # Get demo-relative trim periods and convert to video-relative
+ video_periods = get_trim_periods_for_video(timeline, first_spawn_video_time)
+
+ for start, end in video_periods:
+ trim_periods.append(TrimPeriod(start_time=start, end_time=end))
+ if verbose:
+ print(f" [Trim] Period: {start:.2f}s - {end:.2f}s ({end - start:.2f}s)")
+
+ if verbose:
+ print(f" [Trim] Found {len(trim_periods)} periods to trim from timeline")
+
+ return trim_periods
+
+
+def trim_video_with_periods(
+ input_path: Path,
+ output_path: Path,
+ trim_periods: list[TrimPeriod],
+ verbose: bool = False,
+) -> bool:
+ """Trim specified periods from video using FFmpeg concat demuxer.
+
+ This is a generalized version that takes TrimPeriod objects directly.
+ Creates segments for periods to keep and concatenates them.
+
+ Args:
+ input_path: Path to input video
+ output_path: Path for output video
+ trim_periods: List of periods to remove from video
+ verbose: Print debug output
+
+ Returns:
+ True if trimming was performed, False if no trimming needed
+ """
+ if not trim_periods:
+ if verbose:
+ print(" [Trim] No periods to trim")
+ return False
+
+ # Convert TrimPeriod to DeathPeriod for compatibility with existing logic
+ death_periods = [
+ DeathPeriod(death_time=p.start_time, respawn_time=p.end_time)
+ for p in trim_periods
+ ]
+
+ return trim_death_periods(input_path, output_path, death_periods, verbose)