diff options
| author | Schark <jordan@schark.online> | 2026-01-30 13:44:50 -0500 |
|---|---|---|
| committer | Schark <jordan@schark.online> | 2026-01-30 13:44:50 -0500 |
| commit | 2e54feadfe8b333794ea8286262c0c038a2818be (patch) | |
| tree | 1867ab8d178b535ac17dd1ef8316ddf75b64ad14 | |
| parent | 6dace24508479a1915cb02b2eb6f6594ced84219 (diff) | |
| download | cs2pov-2e54feadfe8b333794ea8286262c0c038a2818be.tar.gz cs2pov-2e54feadfe8b333794ea8286262c0c038a2818be.zip | |
New CLI system
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | README.md | 92 | ||||
| -rw-r--r-- | cs2pov/automation.py | 50 | ||||
| -rw-r--r-- | cs2pov/cli.py | 712 | ||||
| -rw-r--r-- | cs2pov/config.py | 8 | ||||
| -rw-r--r-- | cs2pov/parser.py | 8 | ||||
| -rw-r--r-- | cs2pov/preprocessor.py | 411 | ||||
| -rw-r--r-- | cs2pov/trim.py | 87 |
8 files changed, 1160 insertions, 209 deletions
@@ -4,3 +4,4 @@ *egg-info* *__pycache__* .venv/ +test_demos/ @@ -13,7 +13,7 @@ Counter-Strike 2 POV recording automation. Largely AI-assisted and personalized - **GPU** with Vulkan support (NVIDIA or AMD) ```bash -##Install Python dependencies: +## Install Python dependencies: pip install -e . ## Gentoo (based) @@ -26,41 +26,74 @@ apt install ffmpeg xdotool pulseaudio-utils ## Quick Start ```bash -# List players in a demo -cs2pov list /path/to/demo.dem +# Show demo information (players, deaths, spawns) +cs2pov info /path/to/demo.dem -# Record a player's POV -cs2pov -d /path/to/demo.dem -p "PlayerName" -o recording.mp4 +# Record a player's POV (full pipeline: record + trim) +cs2pov pov -d /path/to/demo.dem -p "PlayerName" -o recording.mp4 + +# Raw recording only (no trimming) +cs2pov record -d /path/to/demo.dem -p "PlayerName" -o raw.mp4 + +# Trim an existing recording +cs2pov trim raw.mp4 -d /path/to/demo.dem -p "PlayerName" ``` -## Usage +## Commands + +### `cs2pov info` - Show Demo Information + +Display demo metadata, player list, and death/spawn statistics for each player. + +```bash +cs2pov info demo.dem # Human-readable output +cs2pov info demo.dem --json # JSON output +cs2pov info demo.dem -v # Verbose (includes death period details) +``` + +### `cs2pov pov` - Full Recording Pipeline + +Record a player's POV and automatically trim death periods. +```bash +cs2pov pov -d demo.dem -p "PlayerName" -o recording.mp4 +cs2pov pov -d demo.dem -p "PlayerName" -o recording.mp4 --no-trim # Skip trimming ``` -cs2pov [-h] --demo DEMO --player PLAYER --output OUTPUT [options] + +### `cs2pov record` - Raw Recording Only + +Record without post-processing. Useful for batch recording then trimming later. + +```bash +cs2pov record -d demo.dem -p "PlayerName" -o raw.mp4 ``` -### Required Arguments +### `cs2pov trim` - Trim Existing Recording + +Remove death periods from a previously recorded video. + +```bash +cs2pov trim raw.mp4 -d demo.dem -p "PlayerName" +cs2pov trim raw.mp4 -d demo.dem -p "PlayerName" -o trimmed.mp4 +``` -| Argument | Short | Description | -|----------|-------|-------------| -| `--demo` | `-d` | Path to demo file (.dem) | -| `--player` | `-p` | Player to record (name or SteamID) | -| `--output` | `-o` | Output video file path | +## Common Options -### Optional Arguments +### Recording Options (pov, record) | Argument | Short | Default | Description | |----------|-------|---------|-------------| +| `--demo` | `-d` | required | Path to demo file (.dem) | +| `--player` | `-p` | required | Player to record (name or SteamID) | +| `--output` | `-o` | required | Output video file path | | `--resolution` | `-r` | 1920x1080 | Recording resolution | | `--framerate` | `-f` | 60 | Recording framerate | -| `--no-hud` | | off | Hide HUD elements (except killfeed) | +| `--no-hud` | | off | Hide HUD elements | | `--no-audio` | | off | Disable audio recording | | `--audio-device` | | auto | PulseAudio device for audio capture | -| `--no-trim` | | off | Skip post-processing, keep full recording | -| `--display` | | 0 | X display number (0 = real display) | -| `--cs2-path` | | auto | Path to CS2 installation (can also be CS2_PATH envvar) | +| `--display` | | 0 | X display number | +| `--cs2-path` | | auto | Path to CS2 installation | | `--verbose` | `-v` | off | Verbose output | -| `--version` | | | Show version | ### Player Identification @@ -74,16 +107,17 @@ The `--player` argument accepts multiple formats: ## How It Works 1. **Parse demo** - Extract player list and metadata using demoparser2 -2. **Generate config** - Create CS2 CFG file with spectator settings -3. **Copy demo** - Place demo in CS2's replays directory -4. **Launch CS2** - Start CS2 via Steam with the generated config -5. **Wait for map load** - Monitor console.log for map load completion -6. **Hide demo UI** - Send Shift+F2 to hide playback controls -7. **Start capture** - Launch FFmpeg to record display + audio (PulseAudio) -8. **Recording loop** - Send F5 periodically to keep spectator locked on target player -9. **Wait for demo end** - Monitor console.log for demo completion -10. **Finalize** - Stop capture and terminate CS2 -11. **Post-process** - Trim start (until POV selected) and death periods from video +2. **Preprocess timeline** - Extract death/spawn events for accurate trimming +3. **Generate config** - Create CS2 CFG file with spectator settings +4. **Copy demo** - Place demo in CS2's replays directory +5. **Launch CS2** - Start CS2 via Steam with the generated config +6. **Wait for first spawn** - Monitor console.log for player spawn +7. **Hide demo UI** - Send Shift+F2 to hide playback controls +8. **Start capture** - Launch FFmpeg to record display + audio (PulseAudio) +9. **Recording loop** - Send F5 periodically to keep spectator locked on target player +10. **Wait for demo end** - Monitor console.log for demo completion +11. **Finalize** - Stop capture and terminate CS2 +12. **Post-process** - Trim start and death periods from video using timeline data ## Noteworthy Issues/Workarounds diff --git a/cs2pov/automation.py b/cs2pov/automation.py index 843b2ee..317ecad 100644 --- a/cs2pov/automation.py +++ b/cs2pov/automation.py @@ -145,6 +145,56 @@ def wait_for_map_load(log_path: Path, timeout: float = 120, poll_interval: float return False +def wait_for_first_spawn( + log_path: Path, + player_name: str, + timeout: float = 180, + poll_interval: float = 0.5 +) -> bool: + """Wait for first spawn of the target player by watching console.log. + + Looks for: <player> spawned + + This indicates the player has spawned and the POV camera is ready. + + Args: + log_path: Path to CS2 console.log + player_name: Name of the player to watch for + timeout: Maximum time to wait in seconds + poll_interval: Time between checks + + Returns: + True if first spawn detected, False if timeout + """ + import time + + spawn_pattern = re.compile( + rf"{re.escape(player_name)} spawned\b" + ) + start = time.time() + last_position = 0 + + while time.time() - start < timeout: + if not log_path.exists(): + time.sleep(poll_interval) + continue + + try: + with open(log_path, 'r', errors='ignore') as f: + f.seek(last_position) + content = f.read() + last_position = f.tell() + + if spawn_pattern.search(content): + return True + except Exception: + pass + + time.sleep(poll_interval) + + return False + + def wait_for_cs2_window(display: str = ":0", timeout: float = 120, poll_interval: float = 2.0) -> Optional[str]: """Wait for CS2 window to appear. diff --git a/cs2pov/cli.py b/cs2pov/cli.py index 1793be8..09eff51 100644 --- a/cs2pov/cli.py +++ b/cs2pov/cli.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 """CS2 POV Recorder - Record player POV from CS2 demo files. -Simplified architecture with linear flow: -1. Setup (parse demo, generate config, launch CS2) -2. Recording loop (single-threaded, blocking) -3. Cleanup (stop FFmpeg, kill CS2) -4. Post-processing (trim death periods) +Subcommand-based CLI: + pov - Full pipeline (record + trim) + info - Show demo information + record - Raw recording only + trim - Post-process existing video """ import argparse +import json import shutil -import subprocess import sys import time from dataclasses import dataclass @@ -18,15 +18,20 @@ from pathlib import Path from typing import Optional from . import __version__ -from .automation import check_xdotool, send_key, check_demo_ended, wait_for_cs2_window, wait_for_map_load +from .automation import send_key, check_demo_ended, wait_for_cs2_window, wait_for_first_spawn from .capture import FFmpegCapture, get_default_audio_monitor from .config import RecordingConfig, generate_recording_cfg from .exceptions import CS2POVError from .game import CS2Process, find_cs2_path, get_cfg_dir, get_demo_dir -from .parser import DemoInfo, find_player, get_player_index, parse_demo -from .trim import extract_death_periods, trim_death_periods +from .parser import DemoInfo, PlayerInfo, find_player, get_player_index, parse_demo +from .preprocessor import DemoTimeline, preprocess_demo, get_trim_periods +from .trim import extract_death_periods, TrimPeriod, trim_video_with_periods +# ============================================================================= +# Data Classes +# ============================================================================= + @dataclass class RecordingResult: """Result of recording phase with metadata for post-processing.""" @@ -36,7 +41,12 @@ class RecordingResult: recording_start_time: float player_slot: int exit_reason: str # "demo_ended", "timeout", "ffmpeg_stopped", "interrupted" + timeline: Optional[DemoTimeline] = None + +# ============================================================================= +# Shared Utilities +# ============================================================================= def check_dependencies() -> list[str]: """Check for required system dependencies.""" @@ -57,17 +67,9 @@ def parse_resolution(resolution_str: str) -> tuple[int, int]: raise ValueError(f"Invalid resolution format: {resolution_str}. Use WxH (e.g., 1920x1080)") -def print_demo_info(demo_info: DemoInfo): - """Print parsed demo information.""" - print(f" Map: {demo_info.map_name}") - print(f" Ticks: {demo_info.total_ticks}") - print(f" Tick rate: {demo_info.tick_rate}") - print(f" Players: {len(demo_info.players)}") - for p in demo_info.players: - team_str = f" [{p.team}]" if p.team else "" - print(f" - {p.name}{team_str} ({p.steamid})") - print(f" Rounds: {len(demo_info.rounds)}") - +# ============================================================================= +# Recording Functions +# ============================================================================= def recording_loop( display: str, @@ -79,14 +81,6 @@ def recording_loop( ) -> str: """Main recording loop - single-threaded, blocking. - Args: - display: X display string - console_log_path: Path to CS2 console.log - cs2_process: CS2 process manager - ffmpeg: FFmpeg capture instance - timeout: Maximum recording time in seconds - verbose: Print verbose output - Returns: Exit reason: "demo_ended", "cs2_exited", "timeout", "ffmpeg_stopped" """ @@ -102,22 +96,18 @@ def recording_loop( while True: elapsed = time.time() - start_time - # Check timeout if elapsed > timeout: print(f" Timeout reached ({timeout/60:.1f} min)") return "timeout" - # Check if CS2 still running if not cs2_process.is_running(): print(" CS2 exited") return "cs2_exited" - # Check if FFmpeg still running if not ffmpeg.is_running(): print(" FFmpeg stopped unexpectedly") return "ffmpeg_stopped" - # Check for demo end in console.log demo_ended, log_position = check_demo_ended(console_log_path, log_position) if demo_ended: print(" Demo end detected in console.log") @@ -135,9 +125,10 @@ def recording_loop( if window_id: send_key("F5", display, window_id) + if verbose: + print(f" Sending F5 (spec_lock)") last_spec_lock = elapsed - # Print status every 60 seconds if elapsed - last_status_time >= 60: print(f" Still recording... ({elapsed/60:.1f} min)") last_status_time = elapsed @@ -158,11 +149,7 @@ def record_demo( enable_audio: bool = True, audio_device: str | None = None, ) -> RecordingResult: - """Record a player's POV from a demo file. - - This is a blocking function that handles the entire recording process. - Cleanup is handled in finally block - always runs. - """ + """Record a player's POV from a demo file.""" # Find CS2 installation print("Finding CS2 installation...") cs2_path = find_cs2_path(cs2_path_override) @@ -171,16 +158,29 @@ def record_demo( # Parse demo print(f"Parsing demo: {demo_path.name}") demo_info = parse_demo(demo_path) - if verbose: - print_demo_info(demo_info) - else: - print(f" Map: {demo_info.map_name}, {len(demo_info.players)} players") - print(f" Ticks: {demo_info.total_ticks}, Rate: {demo_info.tick_rate}/s") + print(f" Map: {demo_info.map_name}, {len(demo_info.players)} players") + print(f" Ticks: {demo_info.total_ticks}, Rate: {demo_info.tick_rate}/s") # Find target player player = find_player(demo_info, player_identifier) player_index = get_player_index(demo_info, player) - print(f"Recording POV: {player.name} (SteamID: {player.steamid}, index: {player_index})") + # player_slot is user_id + 1 for spec_player command + player_slot = (player.user_id + 1) if player.user_id is not None else (player_index + 1) + print(f"Recording POV: {player.name} (SteamID: {player.steamid}, slot: {player_slot})") + + # Preprocess demo for timeline data + print("Preprocessing demo for timeline data...") + try: + timeline = preprocess_demo(demo_path, player.steamid, player.name) + print(f" Found {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, " + f"{len(timeline.rounds)} rounds") + if verbose and timeline.death_periods: + total_dead = sum(p.duration_seconds for p in timeline.death_periods) + print(f" Total dead time: {total_dead:.1f}s across {len(timeline.death_periods)} death periods") + except Exception as e: + print(f" Warning: Preprocessing failed: {e}") + print(f" Will fall back to console.log parsing for trim") + timeline = None # Prepare directories demo_dir = get_demo_dir(cs2_path) @@ -204,6 +204,7 @@ def record_demo( player_index=player_index, player_name=player.name, player_steamid=player.steamid, + player_slot=player_slot, resolution=resolution, hide_hud=hide_hud, ) @@ -223,20 +224,19 @@ def record_demo( estimated_duration = demo_info.total_ticks / demo_info.tick_rate else: estimated_duration = 3600 - timeout = estimated_duration + 600 # Add 10 min buffer + timeout = estimated_duration + 600 print(f"Starting recording (timeout: {timeout/60:.1f} min)...") print(f" Display: {display_str}") print(f" Output: {output_path}") - # Initialize objects cs2_process: Optional[CS2Process] = None ffmpeg: Optional[FFmpegCapture] = None recording_start_time = 0.0 exit_reason = "unknown" try: - # Delete old console.log to ensure fresh log for this recording + # Delete old console.log if console_log_path.exists(): console_log_path.unlink() if verbose: @@ -247,7 +247,7 @@ def record_demo( cs2_process.launch("cs2pov_recording.cfg") print(" CS2 launching via Steam...") - # Wait for CS2 window to appear + # Wait for CS2 window print(" Waiting for CS2 window...") window_id = wait_for_cs2_window(display_str, timeout=120) if window_id: @@ -255,19 +255,20 @@ def record_demo( else: print(" Warning: CS2 window not detected, continuing anyway") - # Wait for map to load, then hide demo UI with Shift+F2 - print(" Waiting for map to load...") - if wait_for_map_load(console_log_path, timeout=120): + # Wait for first player spawn + print(f" Waiting for first spawn ({player.name})...") + if wait_for_first_spawn(console_log_path, player.name, timeout=180): + print(" First spawn detected") if verbose: - print(" Map loaded, waiting 10s before hiding UI...") - time.sleep(10) + print(" Waiting 15s for game to stabilize...") + time.sleep(15) if window_id and send_key("shift+F2", display_str, window_id): if verbose: print(" Sent Shift+F2 to hide demo UI") elif window_id: print(" Warning: Failed to send Shift+F2 to hide demo UI") else: - print(" Warning: Map load not detected, continuing anyway") + print(" Warning: First spawn not detected, continuing anyway") # Determine audio source audio_source = None @@ -279,7 +280,7 @@ def record_demo( else: print(" Warning: Could not detect audio device, recording video only") - # Start FFmpeg capture (full display + audio) + # Start FFmpeg capture ffmpeg = FFmpegCapture( display=display_str, output_path=output_path, @@ -294,10 +295,8 @@ def record_demo( else: print(f" FFmpeg capture started (video only)") - # Record start time for death period extraction recording_start_time = time.time() - # Run main recording loop (blocking) exit_reason = recording_loop( display=display_str, console_log_path=console_log_path, @@ -312,10 +311,8 @@ def record_demo( exit_reason = "interrupted" finally: - # Cleanup - always runs print("\nCleaning up...") - # Stop FFmpeg first (so video file is finalized) if ffmpeg: print(" Stopping FFmpeg...") graceful = ffmpeg.stop(timeout=15) @@ -326,22 +323,18 @@ def record_demo( if ffmpeg.stderr_path and ffmpeg.stderr_path.exists(): print(f" FFmpeg log: {ffmpeg.stderr_path}") - # Then stop CS2 if cs2_process and cs2_process.is_running(): print(" Terminating CS2...") cs2_process.terminate() print(" Cleanup complete") - # Save console.log with demo-matching name for retention if console_log_path.exists(): saved_log_path = output_path.parent / f"console_{demo_path.stem}.log" shutil.copy2(console_log_path, saved_log_path) print(f" Console log saved: {saved_log_path.name}") - # Update path for post-processing to use the saved copy console_log_path = saved_log_path - # Return result with metadata for post-processing success = exit_reason in ("demo_ended", "cs2_exited") return RecordingResult( success=success, @@ -350,6 +343,7 @@ def record_demo( recording_start_time=recording_start_time, player_slot=player_index, exit_reason=exit_reason, + timeline=timeline, ) @@ -359,11 +353,9 @@ def postprocess_video( player_slot: int, recording_start_time: float, verbose: bool = False, + timeline: Optional[DemoTimeline] = None, ) -> Path: - """Post-process a recorded video to trim death periods. - - This is a separate phase that can be run independently. - """ + """Post-process a recorded video to trim death periods.""" if not video_path.exists(): print(f"Error: Video file not found: {video_path}") return video_path @@ -371,38 +363,65 @@ def postprocess_video( raw_size_mb = video_path.stat().st_size / (1024 * 1024) print(f"\nRaw recording: {video_path} ({raw_size_mb:.1f} MB)") - print("\nPost-processing: extracting death periods...") - if verbose: - print(f" Console log: {console_log_path}") - print(f" Player slot: {player_slot}") - print(f" Recording start: {recording_start_time}") + print("\nPost-processing: extracting trim periods...") - if not console_log_path.exists(): - print(f" Console log not found, skipping trim") - return video_path + trim_periods: list[TrimPeriod] = [] - death_periods = extract_death_periods( - log_path=console_log_path, - player_slot=player_slot, - recording_start_time=recording_start_time, - verbose=verbose - ) + # Prefer timeline data + if timeline is not None and timeline.spawns: + print(" Using preprocessed timeline data (demoparser2)") + if verbose: + print(f" Deaths: {len(timeline.deaths)}, Spawns: {len(timeline.spawns)}") - if death_periods: - print(f" Found {len(death_periods)} death periods") - total_dead_time = sum(p.duration for p in death_periods) - print(f" Total dead time: {total_dead_time:.1f}s") + demo_trim_periods = get_trim_periods(timeline) + + if demo_trim_periods: + for start, end in demo_trim_periods: + trim_periods.append(TrimPeriod(start_time=start, end_time=end)) + if verbose: + print(f" Period: {start:.2f}s - {end:.2f}s ({end - start:.2f}s)") + + # Fall back to console.log parsing + if not trim_periods: + if timeline is not None: + print(" Timeline has no spawn data, falling back to console.log") + else: + print(" Using console.log parsing (legacy method)") + + if verbose: + print(f" Console log: {console_log_path}") + print(f" Player slot: {player_slot}") + print(f" Recording start: {recording_start_time}") + + if not console_log_path.exists(): + print(f" Console log not found, skipping trim") + print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") + return video_path + + death_periods = extract_death_periods( + log_path=console_log_path, + player_slot=player_slot, + recording_start_time=recording_start_time, + verbose=verbose + ) + + for dp in death_periods: + trim_periods.append(TrimPeriod(start_time=dp.death_time, end_time=dp.respawn_time)) + + if trim_periods: + print(f" Found {len(trim_periods)} periods to trim") + total_trim_time = sum(p.duration for p in trim_periods) + print(f" Total trim time: {total_trim_time:.1f}s") - # Rename original to _raw raw_path = video_path.parent / f"{video_path.stem}_raw{video_path.suffix}" video_path.rename(raw_path) print(f" Raw recording moved to: {raw_path.name}") - print(" Trimming death periods...") - success = trim_death_periods( + print(" Trimming periods from video...") + success = trim_video_with_periods( input_path=raw_path, output_path=video_path, - death_periods=death_periods, + trim_periods=trim_periods, verbose=verbose ) @@ -414,62 +433,287 @@ def postprocess_video( if not video_path.exists() and raw_path.exists(): raw_path.rename(video_path) else: - print(" No death periods found, keeping original") + print(" No periods to trim, keeping original") print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") return video_path -def main(): - """Main entry point.""" +# ============================================================================= +# Info Output Functions +# ============================================================================= + +def print_demo_info_extended( + demo_info: DemoInfo, + timelines: dict[int, DemoTimeline], + verbose: bool = False +): + """Print comprehensive demo information.""" + # Get rounds and duration from timeline data (more reliable than header) + rounds_count = 0 + max_tick = 0 + tickrate = demo_info.tick_rate or 64 + + for timeline in timelines.values(): + if timeline.rounds: + rounds_count = max(rounds_count, len(timeline.rounds)) + # Find max tick from spawns/deaths to calculate duration + for spawn in timeline.spawns: + max_tick = max(max_tick, spawn.tick) + for death in timeline.deaths: + max_tick = max(max_tick, death.tick) + if timeline.tickrate > 0: + tickrate = timeline.tickrate + + # Calculate duration from max tick + if max_tick > 0 and tickrate > 0: + duration = max_tick / tickrate + duration_str = f"{duration:.1f}s ({duration/60:.1f} min)" + elif demo_info.total_ticks > 0 and tickrate > 0: + duration = demo_info.total_ticks / tickrate + duration_str = f"{duration:.1f}s ({duration/60:.1f} min)" + else: + duration_str = "unknown" + + print(f"Demo: {demo_info.path.name}") + print(f" Map: {demo_info.map_name}") + print(f" Tick rate: {tickrate}/s") + print(f" Duration: {duration_str}") + print(f" Rounds: {rounds_count}") + + print(f"\nPlayers ({len(demo_info.players)}):") + for player in demo_info.players: + team_str = f"[{player.team}]" if player.team else "[--]" + timeline = timelines.get(player.steamid) + + print(f" {player.name}") + print(f" SteamID: {player.steamid} Team: {team_str}") + + if timeline: + death_count = len(timeline.deaths) + spawn_count = len(timeline.spawns) + dead_time = sum(p.duration_seconds for p in timeline.death_periods) + print(f" Deaths: {death_count}, Spawns: {spawn_count}, Dead time: {dead_time:.1f}s") + + if verbose and timeline.death_periods: + print(f" Death periods:") + for i, period in enumerate(timeline.death_periods, 1): + weapon = period.death.weapon or "unknown" + hs = " (headshot)" if period.death.headshot else "" + print(f" {i:2}. {period.death.time_seconds:7.1f}s - {period.spawn.time_seconds:7.1f}s " + f"({period.duration_seconds:5.1f}s) [{weapon}{hs}]") + + +def format_info_json( + demo_info: DemoInfo, + timelines: dict[int, DemoTimeline] +) -> dict: + """Format demo info as JSON-serializable dict.""" + # Get rounds and duration from timeline data (more reliable than header) + rounds_count = 0 + max_tick = 0 + tickrate = demo_info.tick_rate or 64 + + for timeline in timelines.values(): + if timeline.rounds: + rounds_count = max(rounds_count, len(timeline.rounds)) + for spawn in timeline.spawns: + max_tick = max(max_tick, spawn.tick) + for death in timeline.deaths: + max_tick = max(max_tick, death.tick) + if timeline.tickrate > 0: + tickrate = timeline.tickrate + + # Calculate duration from max tick + if max_tick > 0 and tickrate > 0: + duration = max_tick / tickrate + elif demo_info.total_ticks > 0 and tickrate > 0: + duration = demo_info.total_ticks / tickrate + else: + duration = 0 + + players_data = [] + for player in demo_info.players: + timeline = timelines.get(player.steamid) + player_data = { + "name": player.name, + "steamid": player.steamid, + "team": player.team, + } + + if timeline: + player_data["deaths"] = len(timeline.deaths) + player_data["spawns"] = len(timeline.spawns) + player_data["dead_time_seconds"] = sum(p.duration_seconds for p in timeline.death_periods) + player_data["death_periods"] = [ + { + "death_tick": p.death.tick, + "death_time": p.death.time_seconds, + "spawn_tick": p.spawn.tick, + "spawn_time": p.spawn.time_seconds, + "duration_seconds": p.duration_seconds, + "weapon": p.death.weapon, + "headshot": p.death.headshot, + } + for p in timeline.death_periods + ] + + players_data.append(player_data) + + return { + "demo": demo_info.path.name, + "map": demo_info.map_name, + "tick_rate": tickrate, + "duration_seconds": duration, + "rounds": rounds_count, + "players": players_data, + } + + +# ============================================================================= +# Argument Parser +# ============================================================================= + +def create_parser() -> argparse.ArgumentParser: + """Create argument parser with subcommands.""" parser = argparse.ArgumentParser( - description="Record a player's POV from a CS2 demo file", + prog="cs2pov", + description="Record player POV from CS2 demo files", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ Examples: - %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 - %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --no-trim - %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --skip-recording \\ - --console-log /path/to/console.log --player-slot 3 --recording-start-time 1706500000 + cs2pov info demo.dem # Show demo information + cs2pov info demo.dem --json # Output as JSON + cs2pov pov -d demo.dem -p "Player" -o out.mp4 + cs2pov record -d demo.dem -p "Player" -o raw.mp4 + cs2pov trim raw.mp4 -d demo.dem -p "Player" """, ) - - parser.add_argument("-d", "--demo", required=True, type=Path, help="Path to demo file (.dem)") - parser.add_argument("-p", "--player", required=True, help="Player to record (name or SteamID)") - parser.add_argument("-o", "--output", required=True, type=Path, help="Output video file path") - parser.add_argument("-r", "--resolution", default="1920x1080", help="Recording resolution") - parser.add_argument("-f", "--framerate", type=int, default=60, help="Recording framerate") - parser.add_argument("--no-hud", action="store_true", help="Hide HUD elements") - parser.add_argument("--display", type=int, default=0, help="X display number") - parser.add_argument("--no-audio", action="store_true", help="Disable audio recording") - parser.add_argument("--audio-device", help="PulseAudio device for audio capture (auto-detected by default)") - parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") - parser.add_argument("--cs2-path", type=Path, help="Path to CS2 installation") - parser.add_argument("--no-trim", action="store_true", help="Skip post-processing trim") - parser.add_argument("--skip-recording", action="store_true", help="Skip recording, only post-process") - parser.add_argument("--console-log", type=Path, help="Console log path (for --skip-recording)") - parser.add_argument("--player-slot", type=int, help="Player slot (for --skip-recording)") - parser.add_argument("--recording-start-time", type=float, help="Recording start time (for --skip-recording)") parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") - # Subcommand for listing players - subparsers = parser.add_subparsers(dest="command") - list_parser = subparsers.add_parser("list", help="List players in a demo") - list_parser.add_argument("demo", type=Path, help="Path to demo file") + subparsers = parser.add_subparsers(dest="command", required=True, metavar="COMMAND") + + # Shared argument groups + demo_args = argparse.ArgumentParser(add_help=False) + demo_args.add_argument("-d", "--demo", required=True, type=Path, + help="Demo file (.dem)") + + player_args = argparse.ArgumentParser(add_help=False) + player_args.add_argument("-p", "--player", required=True, + help="Player name or SteamID") + + output_args = argparse.ArgumentParser(add_help=False) + output_args.add_argument("-o", "--output", required=True, type=Path, + help="Output video file") + + recording_args = argparse.ArgumentParser(add_help=False) + recording_args.add_argument("-r", "--resolution", default="1920x1080", + help="Recording resolution (default: 1920x1080)") + recording_args.add_argument("-f", "--framerate", type=int, default=60, + help="Recording framerate (default: 60)") + recording_args.add_argument("--display", type=int, default=0, + help="X display number (default: 0)") + recording_args.add_argument("--no-hud", action="store_true", + help="Hide HUD elements") + recording_args.add_argument("--no-audio", action="store_true", + help="Disable audio recording") + recording_args.add_argument("--audio-device", + help="PulseAudio device (auto-detected)") + recording_args.add_argument("--cs2-path", type=Path, + help="Custom CS2 installation path") + + verbose_args = argparse.ArgumentParser(add_help=False) + verbose_args.add_argument("-v", "--verbose", action="store_true", + help="Verbose output") + + # INFO command + info_parser = subparsers.add_parser( + "info", + parents=[verbose_args], + help="Show demo information and player timeline data", + description="Display demo metadata, player list, and death/spawn statistics.", + ) + info_parser.add_argument("demo", type=Path, help="Demo file (.dem)") + info_parser.add_argument("--json", action="store_true", + help="Output as JSON") + + # POV command (full pipeline) + pov_parser = subparsers.add_parser( + "pov", + parents=[demo_args, player_args, output_args, recording_args, verbose_args], + help="Record and trim player POV (full pipeline)", + description="Record a player's POV from a demo and trim death periods.", + ) + pov_parser.add_argument("--no-trim", action="store_true", + help="Skip post-processing trim") + + # RECORD command + subparsers.add_parser( + "record", + parents=[demo_args, player_args, output_args, recording_args, verbose_args], + help="Record player POV without trimming", + description="Record a player's POV from a demo without post-processing.", + ) - args = parser.parse_args() + # TRIM command + trim_parser = subparsers.add_parser( + "trim", + parents=[demo_args, player_args, verbose_args], + help="Trim death periods from recorded video", + description="Post-process an existing recording to remove death periods.", + ) + trim_parser.add_argument("video", type=Path, help="Input video file") + trim_parser.add_argument("-o", "--output", type=Path, + help="Output file (default: adds _trimmed suffix)") + # Fallback options + trim_parser.add_argument("--console-log", type=Path, + help="Console.log file (fallback when demo unavailable)") + trim_parser.add_argument("--player-slot", type=int, + help="Player slot, 0-based (fallback)") + trim_parser.add_argument("--recording-start-time", type=float, + help="Recording start timestamp (fallback)") + + return parser + + +# ============================================================================= +# Command Handlers +# ============================================================================= + +def cmd_info(args) -> int: + """Handle 'info' command - show demo information.""" + demo_path = args.demo.resolve() + if not demo_path.exists(): + print(f"Error: Demo not found: {demo_path}", file=sys.stderr) + return 1 + + # Parse basic demo info + try: + demo_info = parse_demo(demo_path) + except CS2POVError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 - # Handle list subcommand - if args.command == "list": + # Preprocess for timeline data (per-player) + player_timelines: dict[int, DemoTimeline] = {} + for player in demo_info.players: try: - demo_info = parse_demo(args.demo) - print(f"Demo: {args.demo.name}") - print_demo_info(demo_info) - return 0 - except CS2POVError as e: - print(f"Error: {e}", file=sys.stderr) - return 1 + timeline = preprocess_demo(demo_path, player.steamid, player.name) + player_timelines[player.steamid] = timeline + except Exception: + pass + + if args.json: + output = format_info_json(demo_info, player_timelines) + print(json.dumps(output, indent=2)) + else: + print_demo_info_extended(demo_info, player_timelines, verbose=args.verbose) + return 0 + + +def cmd_pov(args) -> int: + """Handle 'pov' command - full recording pipeline.""" # Check dependencies missing = check_dependencies() if missing: @@ -481,62 +725,33 @@ Examples: # Validate inputs demo_path = args.demo.resolve() if not demo_path.exists(): - print(f"Error: Demo file not found: {demo_path}", file=sys.stderr) + print(f"Error: Demo not found: {demo_path}", file=sys.stderr) return 1 - output_path = args.output.resolve() - try: resolution = parse_resolution(args.resolution) except ValueError as e: print(f"Error: {e}", file=sys.stderr) return 1 - # Handle --skip-recording mode - if args.skip_recording: - if not args.console_log: - print("Error: --skip-recording requires --console-log", file=sys.stderr) - return 1 - if args.player_slot is None: - print("Error: --skip-recording requires --player-slot", file=sys.stderr) - return 1 - if args.recording_start_time is None: - print("Error: --skip-recording requires --recording-start-time", file=sys.stderr) - return 1 - if not output_path.exists(): - print(f"Error: Video file not found: {output_path}", file=sys.stderr) - return 1 - - print("Post-processing only (--skip-recording mode)") - postprocess_video( - video_path=output_path, - console_log_path=args.console_log.resolve(), - player_slot=args.player_slot, - recording_start_time=args.recording_start_time, - verbose=args.verbose, - ) - return 0 + output_path = args.output.resolve() - # Phase 1-3: Record (with built-in cleanup) - try: - result = record_demo( - demo_path=demo_path, - player_identifier=args.player, - output_path=output_path, - resolution=resolution, - framerate=args.framerate, - hide_hud=args.no_hud, - display_num=args.display, - verbose=args.verbose, - cs2_path_override=args.cs2_path, - enable_audio=not args.no_audio, - audio_device=args.audio_device, - ) - except CS2POVError as e: - print(f"Error: {e}", file=sys.stderr) - return 1 + # Record + result = record_demo( + demo_path=demo_path, + player_identifier=args.player, + output_path=output_path, + resolution=resolution, + framerate=args.framerate, + hide_hud=args.no_hud, + display_num=args.display, + verbose=args.verbose, + cs2_path_override=args.cs2_path, + enable_audio=not args.no_audio, + audio_device=args.audio_device, + ) - # Phase 4: Post-process (completely separate) + # Post-process if result.success and not args.no_trim: postprocess_video( video_path=result.video_path, @@ -544,6 +759,7 @@ Examples: player_slot=result.player_slot, recording_start_time=result.recording_start_time, verbose=args.verbose, + timeline=result.timeline, ) elif result.success: size_mb = result.video_path.stat().st_size / (1024 * 1024) @@ -557,5 +773,153 @@ Examples: return 0 if result.success else 1 +def cmd_record(args) -> int: + """Handle 'record' command - raw recording without trimming.""" + # Check dependencies + missing = check_dependencies() + if missing: + print("Missing dependencies:", file=sys.stderr) + for dep in missing: + print(f" - {dep}", file=sys.stderr) + return 1 + + # Validate inputs + demo_path = args.demo.resolve() + if not demo_path.exists(): + print(f"Error: Demo not found: {demo_path}", file=sys.stderr) + return 1 + + try: + resolution = parse_resolution(args.resolution) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + output_path = args.output.resolve() + + # Record only + result = record_demo( + demo_path=demo_path, + player_identifier=args.player, + output_path=output_path, + resolution=resolution, + framerate=args.framerate, + hide_hud=args.no_hud, + display_num=args.display, + verbose=args.verbose, + cs2_path_override=args.cs2_path, + enable_audio=not args.no_audio, + audio_device=args.audio_device, + ) + + if result.success: + size_mb = result.video_path.stat().st_size / (1024 * 1024) + print(f"\nRaw recording saved: {result.video_path} ({size_mb:.1f} MB)") + print(f"\nTo trim later, run:") + print(f" cs2pov trim \"{result.video_path}\" -d \"{demo_path}\" -p \"{args.player}\"") + else: + print(f"\nRecording ended with: {result.exit_reason}") + if result.video_path.exists(): + size_mb = result.video_path.stat().st_size / (1024 * 1024) + print(f"Partial recording available: {result.video_path} ({size_mb:.1f} MB)") + + return 0 if result.success else 1 + + +def cmd_trim(args) -> int: + """Handle 'trim' command - post-process existing video.""" + video_path = args.video.resolve() + if not video_path.exists(): + print(f"Error: Video not found: {video_path}", file=sys.stderr) + return 1 + + demo_path = args.demo.resolve() + if not demo_path.exists(): + print(f"Error: Demo not found: {demo_path}", file=sys.stderr) + return 1 + + # Determine output path + if args.output: + output_path = args.output.resolve() + else: + output_path = video_path.parent / f"{video_path.stem}_trimmed{video_path.suffix}" + + # Parse demo and find player + try: + demo_info = parse_demo(demo_path) + player = find_player(demo_info, args.player) + player_slot = get_player_index(demo_info, player) + except CS2POVError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + # Get timeline from demo + timeline = None + try: + timeline = preprocess_demo(demo_path, player.steamid, player.name) + print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns") + except Exception as e: + print(f"Warning: Could not preprocess demo: {e}") + print("Falling back to console.log method") + + # Validate fallback parameters if needed + if timeline is None: + if args.console_log is None: + print("Error: --console-log required when demo preprocessing fails", file=sys.stderr) + return 1 + if args.player_slot is None: + print("Error: --player-slot required when demo preprocessing fails", file=sys.stderr) + return 1 + if args.recording_start_time is None: + print("Error: --recording-start-time required when demo preprocessing fails", file=sys.stderr) + return 1 + player_slot = args.player_slot + + # Copy video to output path first (postprocess_video expects to rename) + if video_path != output_path: + shutil.copy2(video_path, output_path) + + # Run trimming + postprocess_video( + video_path=output_path, + console_log_path=args.console_log.resolve() if args.console_log else Path("/dev/null"), + player_slot=player_slot, + recording_start_time=args.recording_start_time or 0.0, + verbose=args.verbose, + timeline=timeline, + ) + + return 0 + + +# ============================================================================= +# Entry Point +# ============================================================================= + +def main() -> int: + """Main entry point.""" + parser = create_parser() + args = parser.parse_args() + + try: + if args.command == "info": + return cmd_info(args) + elif args.command == "pov": + return cmd_pov(args) + elif args.command == "record": + return cmd_record(args) + elif args.command == "trim": + return cmd_trim(args) + else: + parser.print_help() + return 1 + except CS2POVError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + except KeyboardInterrupt: + print("\nInterrupted", file=sys.stderr) + return 130 + + if __name__ == "__main__": sys.exit(main()) diff --git a/cs2pov/config.py b/cs2pov/config.py index dc1c933..04eab45 100644 --- a/cs2pov/config.py +++ b/cs2pov/config.py @@ -12,6 +12,7 @@ class RecordingConfig: player_index: int player_name: str = "" player_steamid: int = 0 + player_slot: int = 0 # user_id + 1, used for spec_player command resolution: tuple[int, int] = (1920, 1080) hide_hud: bool = True spec_mode: int = 4 # 4 = first-person, 5 = third-person, 6 = free roam @@ -45,8 +46,8 @@ spec_mode {spec_mode} // Auto-quit when demo ends demo_quitafterplayback 1 -// Create alias to lock to player by name (more reliable than index) -alias "lock_pov" "spec_player \\"{player_name}\\"" +// Create alias to lock to player by slot number (user_id + 1) +alias "lock_pov" "spec_player {player_slot}" // Bind F5 to lock to player (automation will press this) bind "F5" "lock_pov" @@ -58,7 +59,7 @@ bind "MOUSE4" "lock_pov" playdemo "replays/{demo_name}" // These run before demo loads but we try anyway -spec_player "{player_name}" +spec_player {player_slot} spec_mode {spec_mode} // Tell user what's happening @@ -96,6 +97,7 @@ def generate_recording_cfg(config: RecordingConfig, output_path: Path) -> Path: hud_commands=hud_commands, spec_mode=config.spec_mode, player_name=config.player_name, + player_slot=config.player_slot, demo_name=config.demo_name, ) diff --git a/cs2pov/parser.py b/cs2pov/parser.py index 451ece2..d9a89ca 100644 --- a/cs2pov/parser.py +++ b/cs2pov/parser.py @@ -17,6 +17,7 @@ class PlayerInfo: steamid: int name: str team: Optional[str] = None + user_id: Optional[int] = None # Player slot from demo (use user_id + 1 for spec_player) @dataclass @@ -67,7 +68,7 @@ def parse_demo(demo_path: Path) -> DemoInfo: # Get unique players from tick data # parse_ticks returns a DataFrame with steamid and name columns - tick_df = parser.parse_ticks(["team_num"]) + tick_df = parser.parse_ticks(["team_num", "user_id"]) players = _extract_players(tick_df) # Get round events @@ -109,7 +110,10 @@ def _extract_players(tick_df) -> list[PlayerInfo]: team = "T" elif team_num == 3: team = "CT" - players.append(PlayerInfo(steamid=steamid, name=name, team=team)) + user_id = None + if "user_id" in row: + user_id = int(row["user_id"]) + players.append(PlayerInfo(steamid=steamid, name=name, team=team, user_id=user_id)) return players diff --git a/cs2pov/preprocessor.py b/cs2pov/preprocessor.py new file mode 100644 index 0000000..0bd9325 --- /dev/null +++ b/cs2pov/preprocessor.py @@ -0,0 +1,411 @@ +"""Demo preprocessor - extract timeline data using demoparser2. + +Uses demoparser2 to extract death/spawn events and round boundaries +directly from the demo file, providing tick-level precision without +depending on console.log parsing. +""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from demoparser2 import DemoParser + +from .exceptions import DemoNotFoundError, DemoParseError + + +@dataclass +class DeathEvent: + """A player death event from the demo.""" + + tick: int + time_seconds: float + attacker_steamid: Optional[int] = None + weapon: Optional[str] = None + headshot: bool = False + + +@dataclass +class SpawnEvent: + """A player spawn event from the demo.""" + + tick: int + time_seconds: float + + +@dataclass +class DeathPeriod: + """A period when the player was dead (between death and respawn).""" + + death: DeathEvent + spawn: SpawnEvent + + @property + def duration_ticks(self) -> int: + return self.spawn.tick - self.death.tick + + @property + def duration_seconds(self) -> float: + return self.spawn.time_seconds - self.death.time_seconds + + +@dataclass +class RoundBoundary: + """Boundaries for a round in the demo.""" + + round_num: int + prestart_tick: int + prestart_time: float + freeze_end_tick: Optional[int] = None + freeze_end_time: Optional[float] = None + end_tick: Optional[int] = None + end_time: Optional[float] = None + + +@dataclass +class DemoTimeline: + """Complete timeline data for a player in a demo.""" + + player_steamid: int + player_name: str + tickrate: float + total_ticks: int + total_duration: float + + deaths: list[DeathEvent] = field(default_factory=list) + spawns: list[SpawnEvent] = field(default_factory=list) + death_periods: list[DeathPeriod] = field(default_factory=list) + rounds: list[RoundBoundary] = field(default_factory=list) + + +def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "") -> DemoTimeline: + """Pre-process demo to extract timeline data for a player. + + Uses demoparser2 to extract: + - player_death events for the target player + - player_spawn events for the target player + - Round boundaries (prestart, freeze_end, end) + + Args: + demo_path: Path to the demo file + player_steamid: SteamID64 of the player to track + player_name: Player name (for metadata) + + Returns: + DemoTimeline with all extracted data + + Raises: + DemoNotFoundError: If demo file doesn't exist + DemoParseError: If parsing fails + """ + if not demo_path.exists(): + raise DemoNotFoundError(f"Demo file not found: {demo_path}") + + try: + parser = DemoParser(str(demo_path)) + + # Get header for tickrate and duration + header = parser.parse_header() + tickrate = header.get("playback_ticks_per_second", 64) + total_ticks = header.get("playback_ticks", 0) + total_duration = total_ticks / tickrate if tickrate > 0 else 0 + + # Extract death events for the player + deaths = _extract_deaths(parser, player_steamid, tickrate) + + # Extract spawn events for the player + spawns = _extract_spawns(parser, player_steamid, tickrate) + + # Compute death periods (match deaths with subsequent spawns) + death_periods = compute_death_periods(deaths, spawns) + + # Extract round boundaries + rounds = _extract_round_boundaries(parser, tickrate) + + return DemoTimeline( + player_steamid=player_steamid, + player_name=player_name, + tickrate=tickrate, + total_ticks=total_ticks, + total_duration=total_duration, + deaths=deaths, + spawns=spawns, + death_periods=death_periods, + rounds=rounds, + ) + + except Exception as e: + if isinstance(e, (DemoNotFoundError, DemoParseError)): + raise + raise DemoParseError(f"Failed to preprocess demo: {e}") from e + + +def _extract_deaths(parser: DemoParser, player_steamid: int, tickrate: float) -> list[DeathEvent]: + """Extract death events for a specific player.""" + deaths = [] + + try: + events_df = parser.parse_event("player_death") + if events_df is None or len(events_df) == 0: + return deaths + + # Filter for deaths of the target player + # user_steamid is the player who died + # Note: event steamids are strings, tick data steamids are uint64 + steamid_str = str(player_steamid) + player_deaths = events_df[events_df["user_steamid"] == steamid_str] + + for _, row in player_deaths.iterrows(): + tick = int(row["tick"]) + time_seconds = tick / tickrate if tickrate > 0 else 0 + + # Extract optional fields + attacker_steamid = row.get("attacker_steamid") + if attacker_steamid is not None and attacker_steamid != 0: + attacker_steamid = int(attacker_steamid) + else: + attacker_steamid = None + + weapon = row.get("weapon") + if weapon is not None: + weapon = str(weapon) + + headshot = bool(row.get("headshot", False)) + + deaths.append(DeathEvent( + tick=tick, + time_seconds=time_seconds, + attacker_steamid=attacker_steamid, + weapon=weapon, + headshot=headshot, + )) + + except Exception: + # If event parsing fails, return empty list + pass + + # Sort by tick to ensure chronological order + deaths.sort(key=lambda d: d.tick) + return deaths + + +def _extract_spawns(parser: DemoParser, player_steamid: int, tickrate: float) -> list[SpawnEvent]: + """Extract spawn events for a specific player.""" + spawns = [] + + try: + events_df = parser.parse_event("player_spawn") + if events_df is None or len(events_df) == 0: + return spawns + + # Filter for spawns of the target player + # Note: event steamids are strings, tick data steamids are uint64 + steamid_str = str(player_steamid) + player_spawns = events_df[events_df["user_steamid"] == steamid_str] + + for _, row in player_spawns.iterrows(): + tick = int(row["tick"]) + time_seconds = tick / tickrate if tickrate > 0 else 0 + + spawns.append(SpawnEvent( + tick=tick, + time_seconds=time_seconds, + )) + + except Exception: + # If event parsing fails, return empty list + pass + + # Sort by tick to ensure chronological order + spawns.sort(key=lambda s: s.tick) + return spawns + + +def _extract_round_boundaries(parser: DemoParser, tickrate: float) -> list[RoundBoundary]: + """Extract round boundary events.""" + rounds = [] + + try: + # Parse each event type individually (parse_event only takes one event name) + prestart_df = parser.parse_event("round_prestart") + freeze_end_df = parser.parse_event("round_freeze_end") + round_end_df = parser.parse_event("round_officially_ended") + + # Convert to lists of (tick, time) tuples + prestart_list = [] + if prestart_df is not None and len(prestart_df) > 0: + prestart_df = prestart_df.sort_values("tick") + prestart_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in prestart_df.iterrows()] + + freeze_end_list = [] + if freeze_end_df is not None and len(freeze_end_df) > 0: + freeze_end_df = freeze_end_df.sort_values("tick") + freeze_end_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in freeze_end_df.iterrows()] + + round_end_list = [] + if round_end_df is not None and len(round_end_df) > 0: + round_end_df = round_end_df.sort_values("tick") + round_end_list = [(int(row["tick"]), int(row["tick"]) / tickrate) for _, row in round_end_df.iterrows()] + + if not prestart_list: + return rounds + + # Match events to form round boundaries + for round_num, (prestart_tick, prestart_time) in enumerate(prestart_list, start=1): + round_boundary = RoundBoundary( + round_num=round_num, + prestart_tick=prestart_tick, + prestart_time=prestart_time, + ) + + # Find freeze_end after this prestart + for freeze_tick, freeze_time in freeze_end_list: + if freeze_tick > prestart_tick: + round_boundary.freeze_end_tick = freeze_tick + round_boundary.freeze_end_time = freeze_time + break + + # Find round_end after this prestart (but before next prestart) + next_prestart_tick = float("inf") + if round_num < len(prestart_list): + next_prestart_tick = prestart_list[round_num][0] + + for end_tick, end_time in round_end_list: + if prestart_tick < end_tick < next_prestart_tick: + round_boundary.end_tick = end_tick + round_boundary.end_time = end_time + break + + rounds.append(round_boundary) + + except Exception: + # If event parsing fails, return empty list + pass + + return rounds + + +def compute_death_periods(deaths: list[DeathEvent], spawns: list[SpawnEvent]) -> list[DeathPeriod]: + """Match each death with its subsequent spawn to form death periods. + + For each death, finds the next spawn that occurs after it. + Deaths without a subsequent spawn (e.g., last death of match) are ignored. + + Args: + deaths: List of death events, sorted by tick + spawns: List of spawn events, sorted by tick + + Returns: + List of DeathPeriod objects + """ + if not deaths or not spawns: + return [] + + death_periods = [] + spawn_idx = 0 + + for death in deaths: + # Find next spawn after this death + while spawn_idx < len(spawns) and spawns[spawn_idx].tick <= death.tick: + spawn_idx += 1 + + if spawn_idx < len(spawns): + spawn = spawns[spawn_idx] + death_periods.append(DeathPeriod(death=death, spawn=spawn)) + spawn_idx += 1 + + return death_periods + + +def get_trim_periods(timeline: DemoTimeline) -> list[tuple[float, float]]: + """Get periods to trim from video based on timeline data. + + Uses first player_spawn as t=0 reference point. This handles the + variable CS2 startup delay by anchoring to when the player first + appears in the game. + + Returns (start, end) tuples for periods to remove: + - Initial period: 0 to first_spawn_time (startup + freeze time) + - Death periods: death_time to respawn_time for each death + + The returned times are relative to video start (not demo start). + When recording, the first spawn in the demo corresponds to some + point in the video. By returning demo-relative times and letting + the caller handle the offset, we keep this function pure. + + Args: + timeline: DemoTimeline with death/spawn data + + Returns: + List of (start_seconds, end_seconds) tuples to trim + """ + if not timeline.spawns: + return [] + + trim_periods = [] + + # First spawn is our reference point + first_spawn = timeline.spawns[0] + first_spawn_time = first_spawn.time_seconds + + # Trim from demo start to first spawn (startup, freeze time, etc.) + # Only add if > 0.5s to avoid tiny trims + if first_spawn_time > 0.5: + trim_periods.append((0.0, first_spawn_time)) + + # Add death periods (already computed relative to demo start) + for period in timeline.death_periods: + # Only include if death is after first spawn + if period.death.tick > first_spawn.tick: + trim_periods.append(( + period.death.time_seconds, + period.spawn.time_seconds, + )) + + return trim_periods + + +def get_trim_periods_for_video( + timeline: DemoTimeline, + video_start_offset: float = 0.0, +) -> list[tuple[float, float]]: + """Get trim periods adjusted for video timing. + + When recording a demo, there's a delay between video start and + demo playback start. This function adjusts the timeline-based + trim periods to account for that offset. + + Args: + timeline: DemoTimeline with death/spawn data + video_start_offset: Seconds from video start to first spawn + in video. If not provided, assumes first spawn is at + video_start_offset=0 (i.e., video started at first spawn). + + Returns: + List of (start_seconds, end_seconds) tuples for video trimming + """ + demo_periods = get_trim_periods(timeline) + if not demo_periods: + return [] + + # Get first spawn time in demo (our reference point) + if not timeline.spawns: + return [] + + first_spawn_demo_time = timeline.spawns[0].time_seconds + + # Adjust periods: demo time -> video time + # If first spawn is at video_start_offset in the video, + # then demo_time 0 is at (video_start_offset - first_spawn_demo_time) + video_periods = [] + for start, end in demo_periods: + # Convert demo-relative to video-relative + video_start = start - first_spawn_demo_time + video_start_offset + video_end = end - first_spawn_demo_time + video_start_offset + + # Only include positive time ranges + if video_end > 0: + video_start = max(0.0, video_start) + video_periods.append((video_start, video_end)) + + return video_periods diff --git a/cs2pov/trim.py b/cs2pov/trim.py index 6bfb97b..82e33d5 100644 --- a/cs2pov/trim.py +++ b/cs2pov/trim.py @@ -6,10 +6,13 @@ import tempfile from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Optional, TYPE_CHECKING from .exceptions import CaptureError +if TYPE_CHECKING: + from .preprocessor import DemoTimeline + @dataclass class DeathPeriod: @@ -22,6 +25,17 @@ class DeathPeriod: return self.respawn_time - self.death_time +@dataclass +class TrimPeriod: + """A period to trim from the video.""" + start_time: float # Video timestamp in seconds + end_time: float # Video timestamp in seconds + + @property + def duration(self) -> float: + return self.end_time - self.start_time + + def parse_log_timestamp(timestamp_str: str, reference_date: Optional[datetime] = None) -> datetime: """Parse a console.log timestamp in MM/DD HH:mm:ss format. @@ -340,3 +354,74 @@ def trim_death_periods( print(f" [Trim] Output video: {output_duration:.2f}s") return True + + +def get_trim_periods_from_timeline( + timeline: "DemoTimeline", + first_spawn_video_time: float, + verbose: bool = False, +) -> list[TrimPeriod]: + """Get trim periods from preprocessed timeline data. + + Uses the timeline's death/spawn events to compute trim periods. + The first_spawn_video_time parameter tells us when the first spawn + occurred in the video, allowing us to align demo times with video times. + + Args: + timeline: Preprocessed DemoTimeline with death/spawn events + first_spawn_video_time: Video timestamp when first spawn occurred + verbose: Print debug output + + Returns: + List of TrimPeriod objects with video-relative timestamps + """ + from .preprocessor import get_trim_periods_for_video + + trim_periods = [] + + # Get demo-relative trim periods and convert to video-relative + video_periods = get_trim_periods_for_video(timeline, first_spawn_video_time) + + for start, end in video_periods: + trim_periods.append(TrimPeriod(start_time=start, end_time=end)) + if verbose: + print(f" [Trim] Period: {start:.2f}s - {end:.2f}s ({end - start:.2f}s)") + + if verbose: + print(f" [Trim] Found {len(trim_periods)} periods to trim from timeline") + + return trim_periods + + +def trim_video_with_periods( + input_path: Path, + output_path: Path, + trim_periods: list[TrimPeriod], + verbose: bool = False, +) -> bool: + """Trim specified periods from video using FFmpeg concat demuxer. + + This is a generalized version that takes TrimPeriod objects directly. + Creates segments for periods to keep and concatenates them. + + Args: + input_path: Path to input video + output_path: Path for output video + trim_periods: List of periods to remove from video + verbose: Print debug output + + Returns: + True if trimming was performed, False if no trimming needed + """ + if not trim_periods: + if verbose: + print(" [Trim] No periods to trim") + return False + + # Convert TrimPeriod to DeathPeriod for compatibility with existing logic + death_periods = [ + DeathPeriod(death_time=p.start_time, respawn_time=p.end_time) + for p in trim_periods + ] + + return trim_death_periods(input_path, output_path, death_periods, verbose) |
