diff options
| author | Schark <jordan@schark.online> | 2026-01-28 23:52:45 -0500 |
|---|---|---|
| committer | Schark <jordan@schark.online> | 2026-01-28 23:52:45 -0500 |
| commit | e65fa0696c42f3c59b802f865aacbce739a014f2 (patch) | |
| tree | cb3d3719ec03caadddb996636d4972ab84b42690 | |
| download | cs2pov-e65fa0696c42f3c59b802f865aacbce739a014f2.tar.gz cs2pov-e65fa0696c42f3c59b802f865aacbce739a014f2.zip | |
Init
| -rw-r--r-- | .gitignore | 6 | ||||
| -rw-r--r-- | README.md | 145 | ||||
| -rw-r--r-- | cs2pov/__init__.py | 3 | ||||
| -rw-r--r-- | cs2pov/automation.py | 166 | ||||
| -rw-r--r-- | cs2pov/capture.py | 200 | ||||
| -rw-r--r-- | cs2pov/cli.py | 561 | ||||
| -rw-r--r-- | cs2pov/config.py | 135 | ||||
| -rw-r--r-- | cs2pov/exceptions.py | 49 | ||||
| -rw-r--r-- | cs2pov/game.py | 330 | ||||
| -rw-r--r-- | cs2pov/parser.py | 244 | ||||
| -rw-r--r-- | cs2pov/trim.py | 342 | ||||
| -rw-r--r-- | requirements.txt | 1 | ||||
| -rwxr-xr-x | scripts/kill-cs2.sh | 99 | ||||
| -rw-r--r-- | setup.py | 40 |
14 files changed, 2321 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1bb3ca1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.mp4 +*.log +.claude/ +*egg-info* +*__pycache__* +.venv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..c7f0b45 --- /dev/null +++ b/README.md @@ -0,0 +1,145 @@ +# cs2pov + +Record a specific player's POV from Counter-Strike 2 demo files. + +## Installation + +```bash +# Install Python dependencies +pip install -e . + +# System dependencies (Gentoo) +emerge -av media-video/ffmpeg + +# System dependencies (Debian/Ubuntu) +apt install ffmpeg +``` + +## Quick Start + +```bash +# List players in a demo +cs2pov list /path/to/demo.dem + +# Record a player's POV +cs2pov -d /path/to/demo.dem -p "PlayerName" -o recording.mp4 +``` + +## Usage + +``` +cs2pov [-h] --demo DEMO --player PLAYER --output OUTPUT [options] +``` + +### Required Arguments + +| Argument | Short | Description | +|----------|-------|-------------| +| `--demo` | `-d` | Path to demo file (.dem) | +| `--player` | `-p` | Player to record (name or SteamID) | +| `--output` | `-o` | Output video file path | + +### Optional Arguments + +| Argument | Short | Default | Description | +|----------|-------|---------|-------------| +| `--resolution` | `-r` | 1920x1080 | Recording resolution | +| `--framerate` | `-f` | 60 | Recording framerate | +| `--no-hud` | | off | Hide HUD elements (except killfeed) | +| `--display` | | 0 | X display number (0 = real display) | +| `--cs2-path` | | auto | Path to CS2 installation | +| `--verbose` | `-v` | off | Verbose output | +| `--version` | | | Show version | + +### Player Identification + +The `--player` argument accepts multiple formats: + +- **Player name**: `"PlayerName"` (case-insensitive, partial match supported) +- **SteamID64**: `76561198012345678` +- **SteamID**: `STEAM_0:1:12345678` +- **SteamID3**: `[U:1:12345678]` + +### Subcommands + +#### `list` - List players in a demo + +```bash +cs2pov list /path/to/demo.dem +``` + +Shows all players with their names, teams, and SteamIDs. + +## Examples + +```bash +# Basic recording +cs2pov -d match.dem -p "s1mple" -o s1mple_pov.mp4 + +# Record by SteamID +cs2pov -d match.dem -p 76561198012345678 -o recording.mp4 + +# Higher resolution, lower framerate +cs2pov -d match.dem -p "Player" -o out.mp4 -r 2560x1440 -f 30 + +# Hide HUD, verbose output +cs2pov -d match.dem -p "Player" -o out.mp4 --no-hud -v + +# Custom CS2 installation path +cs2pov -d match.dem -p "Player" -o out.mp4 --cs2-path /mnt/games/SteamLibrary/steamapps/common/Counter-Strike\ Global\ Offensive + +# Use virtual display (experimental, may not work with Vulkan) +cs2pov -d match.dem -p "Player" -o out.mp4 --display 99 +``` + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `CS2_PATH` | Path to CS2 installation (alternative to `--cs2-path`) | + +```bash +export CS2_PATH="/mnt/games/SteamLibrary/steamapps/common/Counter-Strike Global Offensive" +cs2pov -d match.dem -p "Player" -o out.mp4 +``` + +## Known Issues + +### Recording Uses Your Real Display + +CS2 requires Vulkan, and virtual framebuffers (Xvfb) don't support Vulkan. The tool defaults to `--display 0` (your real display), which means: + +- CS2 will take over your screen during recording +- You can't use your computer while recording +- No need to restart your window manager or display server + +If you try `--display 99` (virtual display), you'll see: "The selected graphics queue does not support presenting a swapchain image" + +### Close CS2 Before Recording + +Steam prevents running multiple CS2 instances. Close any running CS2 before using cs2pov. + +### Camera May Drift During Rounds + +CS2 doesn't support tick-accurate command injection (VDM files). The camera is locked to the target player at demo start but may drift during round transitions. + +## How It Works + +1. **Parse demo** - Extract player list and metadata using demoparser2 +2. **Generate config** - Create CS2 CFG file with spectator settings +3. **Copy demo** - Place demo in CS2's replays directory +4. **Start capture** - Launch FFmpeg to record the display +5. **Launch CS2** - Start CS2 with the generated config +6. **Wait** - CS2 plays the demo and exits automatically +7. **Finalize** - Stop capture and save video + +## Requirements + +- **Python 3.10+** +- **CS2** installed via Steam +- **FFmpeg** for video capture +- **GPU** with Vulkan support (NVIDIA or AMD) + +## License + +MIT diff --git a/cs2pov/__init__.py b/cs2pov/__init__.py new file mode 100644 index 0000000..a91c3d4 --- /dev/null +++ b/cs2pov/__init__.py @@ -0,0 +1,3 @@ +"""CS2 POV Recorder - Record player POV from CS2 demo files.""" + +__version__ = "0.1.0" diff --git a/cs2pov/automation.py b/cs2pov/automation.py new file mode 100644 index 0000000..843b2ee --- /dev/null +++ b/cs2pov/automation.py @@ -0,0 +1,166 @@ +"""CS2 input automation using xdotool - simple, non-threaded functions.""" + +import os +import re +import shutil +import subprocess +from pathlib import Path +from typing import Optional + + +def check_xdotool(): + """Check if xdotool is available.""" + if not shutil.which("xdotool"): + raise RuntimeError("xdotool not found. Install with: emerge x11-misc/xdotool") + + +def find_cs2_window(display: str = ":0") -> Optional[str]: + """Find CS2 window ID using xdotool. + + Args: + display: X display where CS2 is running + + Returns: + Window ID string, or None if not found + """ + env = os.environ.copy() + env["DISPLAY"] = display + + try: + result = subprocess.run( + ["xdotool", "search", "--class", "cs2"], + capture_output=True, + text=True, + timeout=5, + env=env + ) + if result.returncode == 0 and result.stdout.strip(): + windows = result.stdout.strip().split('\n') + # Return first window found + if windows and windows[0]: + return windows[0] + except Exception: + pass + return None + + +def send_key(key: str, display: str = ":0", window_id: Optional[str] = None) -> bool: + """Send a key press to CS2 window. + + Args: + key: Key to send (e.g., "F5", "space", "Return") + display: X display + window_id: Optional window ID (will find if not provided) + + Returns: + True if successful + """ + if window_id is None: + window_id = find_cs2_window(display) + if not window_id: + return False + + env = os.environ.copy() + env["DISPLAY"] = display + + try: + result = subprocess.run( + ["xdotool", "key", "--window", window_id, key], + capture_output=True, + text=True, + timeout=5, + env=env + ) + return result.returncode == 0 + except Exception: + return False + + +def check_demo_ended(log_path: Path, last_position: int = 0) -> tuple[bool, int]: + """Check if demo has ended by looking for marker in console.log. + + Args: + log_path: Path to CS2 console.log + last_position: File position to start reading from + + Returns: + Tuple of (demo_ended: bool, new_position: int) + """ + if not log_path.exists(): + return False, last_position + + demo_end_pattern = re.compile(r"CGameRules - paused on tick") + + try: + with open(log_path, 'r', errors='ignore') as f: + f.seek(last_position) + content = f.read() + new_position = f.tell() + + if demo_end_pattern.search(content): + return True, new_position + + return False, new_position + except Exception: + return False, last_position + + +def wait_for_map_load(log_path: Path, timeout: float = 120, poll_interval: float = 0.5) -> bool: + """Wait for map to finish loading by watching console.log. + + Looks for: [Client] Created physics for <map_name> + + Args: + log_path: Path to CS2 console.log + timeout: Maximum time to wait in seconds + poll_interval: Time between checks + + Returns: + True if map load detected, False if timeout + """ + import time + + map_load_pattern = re.compile(r"\[Client\] Created physics for") + start = time.time() + last_position = 0 + + while time.time() - start < timeout: + if not log_path.exists(): + time.sleep(poll_interval) + continue + + try: + with open(log_path, 'r', errors='ignore') as f: + f.seek(last_position) + content = f.read() + last_position = f.tell() + + if map_load_pattern.search(content): + return True + except Exception: + pass + + time.sleep(poll_interval) + + return False + + +def wait_for_cs2_window(display: str = ":0", timeout: float = 120, poll_interval: float = 2.0) -> Optional[str]: + """Wait for CS2 window to appear. + + Args: + display: X display + timeout: Maximum time to wait in seconds + poll_interval: Time between checks + + Returns: + Window ID when found, or None if timeout + """ + import time + start = time.time() + while time.time() - start < timeout: + window_id = find_cs2_window(display) + if window_id: + return window_id + time.sleep(poll_interval) + return None diff --git a/cs2pov/capture.py b/cs2pov/capture.py new file mode 100644 index 0000000..1cb2692 --- /dev/null +++ b/cs2pov/capture.py @@ -0,0 +1,200 @@ +"""FFmpeg video and audio capture - simplified, full-display only.""" + +import signal +import subprocess +import time +from pathlib import Path +from typing import Optional + +from .exceptions import CaptureError + + +def get_default_audio_monitor() -> Optional[str]: + """Detect the PulseAudio monitor device for the default output sink. + + The monitor device captures all audio being played (game audio, etc.) + rather than microphone input. + + Returns: + Monitor device name (e.g., 'alsa_output.pci-xxx.monitor') or None if detection fails + """ + try: + # Get the default output sink name + result = subprocess.run( + ["pactl", "get-default-sink"], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + sink_name = result.stdout.strip() + # The monitor source is the sink name + ".monitor" + return f"{sink_name}.monitor" + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return None + + +class FFmpegCapture: + """Manages FFmpeg x11grab video and PulseAudio capture - full display only.""" + + def __init__( + self, + display: str, + output_path: Path, + resolution: tuple[int, int] = (1920, 1080), + framerate: int = 60, + audio_device: Optional[str] = None, + enable_audio: bool = True, + ): + self.display = display + self.output_path = output_path + self.resolution = resolution + self.framerate = framerate + self.audio_device = audio_device + self.enable_audio = enable_audio + self.process: Optional[subprocess.Popen] = None + self.stderr_path: Optional[Path] = None # For debugging + + def start(self): + """Start FFmpeg capture. + + Raises: + CaptureError: If FFmpeg fails to start + """ + video_size = f"{self.resolution[0]}x{self.resolution[1]}" + + # Always capture full display at origin + # CS2 runs fullscreen, and window-specific capture breaks when geometry changes + input_source = f"{self.display}+0,0" + + # Save stderr to file for debugging + self.stderr_path = self.output_path.parent / f"{self.output_path.stem}_ffmpeg.log" + + # Determine audio device if audio is enabled + audio_source = None + if self.enable_audio: + if self.audio_device: + audio_source = self.audio_device + else: + # Auto-detect the monitor device for the default output sink + audio_source = get_default_audio_monitor() + + # Fragmented MP4 for resilience - playable even if interrupted + # Large thread_queue_size to prevent frame drops + # Keyframe every 2 seconds for better seeking/trimming + cmd = [ + "ffmpeg", + "-y", # Overwrite output + # Video input (x11grab) + "-thread_queue_size", "4096", # Large buffer + "-f", "x11grab", + "-draw_mouse", "0", # Hide cursor + "-video_size", video_size, + "-framerate", str(self.framerate), + "-i", input_source, + ] + + # Add audio input if available + if audio_source: + cmd.extend([ + # Audio input (PulseAudio) + "-thread_queue_size", "4096", + "-f", "pulse", + "-i", audio_source, + ]) + + # Video encoding settings + cmd.extend([ + "-c:v", "libx264", + "-preset", "medium", + "-crf", "18", + "-pix_fmt", "yuv420p", + "-g", str(self.framerate * 2), # Keyframe every 2 seconds + ]) + + # Audio encoding settings (if audio source available) + if audio_source: + cmd.extend([ + "-c:a", "aac", + "-b:a", "192k", + ]) + + # Output settings + cmd.extend([ + "-movflags", "+frag_keyframe+empty_moov+default_base_moof", + str(self.output_path), + ]) + + try: + # Write stderr to file for debugging + self._stderr_file = open(self.stderr_path, 'w') + self.process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=self._stderr_file, + ) + except FileNotFoundError: + raise CaptureError("FFmpeg not found. Install with: apt install ffmpeg") + + # Brief check that it started + time.sleep(0.3) + if self.process.poll() is not None: + self._stderr_file.close() + stderr = self.stderr_path.read_text() if self.stderr_path.exists() else "" + raise CaptureError(f"FFmpeg failed to start: {stderr}") + + def is_running(self) -> bool: + """Check if FFmpeg is still running.""" + return self.process is not None and self.process.poll() is None + + def stop(self, timeout: float = 30) -> bool: + """Stop FFmpeg capture gracefully. + + Args: + timeout: Maximum time to wait for FFmpeg to finish + + Returns: + True if stopped gracefully, False if force-killed + """ + if not self.process: + return True + + graceful = True + + # Send 'q' to quit gracefully (allows proper file finalization) + if self.process.stdin: + try: + self.process.stdin.write(b"q\n") + self.process.stdin.flush() + self.process.stdin.close() + except (BrokenPipeError, OSError): + pass + + try: + self.process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + graceful = False + # Try SIGINT first (like Ctrl+C) - FFmpeg handles this gracefully + self.process.send_signal(signal.SIGINT) + try: + self.process.wait(timeout=5) + graceful = True + except subprocess.TimeoutExpired: + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + + # Close stderr file + if hasattr(self, '_stderr_file') and self._stderr_file: + try: + self._stderr_file.close() + except Exception: + pass + + self.process = None + return graceful diff --git a/cs2pov/cli.py b/cs2pov/cli.py new file mode 100644 index 0000000..1793be8 --- /dev/null +++ b/cs2pov/cli.py @@ -0,0 +1,561 @@ +#!/usr/bin/env python3 +"""CS2 POV Recorder - Record player POV from CS2 demo files. + +Simplified architecture with linear flow: +1. Setup (parse demo, generate config, launch CS2) +2. Recording loop (single-threaded, blocking) +3. Cleanup (stop FFmpeg, kill CS2) +4. Post-processing (trim death periods) +""" + +import argparse +import shutil +import subprocess +import sys +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +from . import __version__ +from .automation import check_xdotool, send_key, check_demo_ended, wait_for_cs2_window, wait_for_map_load +from .capture import FFmpegCapture, get_default_audio_monitor +from .config import RecordingConfig, generate_recording_cfg +from .exceptions import CS2POVError +from .game import CS2Process, find_cs2_path, get_cfg_dir, get_demo_dir +from .parser import DemoInfo, find_player, get_player_index, parse_demo +from .trim import extract_death_periods, trim_death_periods + + +@dataclass +class RecordingResult: + """Result of recording phase with metadata for post-processing.""" + success: bool + video_path: Path + console_log_path: Path + recording_start_time: float + player_slot: int + exit_reason: str # "demo_ended", "timeout", "ffmpeg_stopped", "interrupted" + + +def check_dependencies() -> list[str]: + """Check for required system dependencies.""" + missing = [] + if not shutil.which("ffmpeg"): + missing.append("FFmpeg - Install with: apt install ffmpeg") + if not shutil.which("xdotool"): + missing.append("xdotool - Install with: apt install xdotool") + return missing + + +def parse_resolution(resolution_str: str) -> tuple[int, int]: + """Parse resolution string like '1920x1080' to tuple.""" + try: + width, height = resolution_str.lower().split("x") + return (int(width), int(height)) + except ValueError: + raise ValueError(f"Invalid resolution format: {resolution_str}. Use WxH (e.g., 1920x1080)") + + +def print_demo_info(demo_info: DemoInfo): + """Print parsed demo information.""" + print(f" Map: {demo_info.map_name}") + print(f" Ticks: {demo_info.total_ticks}") + print(f" Tick rate: {demo_info.tick_rate}") + print(f" Players: {len(demo_info.players)}") + for p in demo_info.players: + team_str = f" [{p.team}]" if p.team else "" + print(f" - {p.name}{team_str} ({p.steamid})") + print(f" Rounds: {len(demo_info.rounds)}") + + +def recording_loop( + display: str, + console_log_path: Path, + cs2_process: CS2Process, + ffmpeg: FFmpegCapture, + timeout: float, + verbose: bool = False, +) -> str: + """Main recording loop - single-threaded, blocking. + + Args: + display: X display string + console_log_path: Path to CS2 console.log + cs2_process: CS2 process manager + ffmpeg: FFmpeg capture instance + timeout: Maximum recording time in seconds + verbose: Print verbose output + + Returns: + Exit reason: "demo_ended", "cs2_exited", "timeout", "ffmpeg_stopped" + """ + start_time = time.time() + last_spec_lock = 0 + log_position = 0 + window_id = None + window_found_logged = False + last_status_time = start_time + + print(" Recording loop started") + + while True: + elapsed = time.time() - start_time + + # Check timeout + if elapsed > timeout: + print(f" Timeout reached ({timeout/60:.1f} min)") + return "timeout" + + # Check if CS2 still running + if not cs2_process.is_running(): + print(" CS2 exited") + return "cs2_exited" + + # Check if FFmpeg still running + if not ffmpeg.is_running(): + print(" FFmpeg stopped unexpectedly") + return "ffmpeg_stopped" + + # Check for demo end in console.log + demo_ended, log_position = check_demo_ended(console_log_path, log_position) + if demo_ended: + print(" Demo end detected in console.log") + return "demo_ended" + + # Send spec_lock (F5) every 3 seconds + if elapsed - last_spec_lock >= 3.0: + if window_id is None: + from .automation import find_cs2_window + window_id = find_cs2_window(display) + if window_id and not window_found_logged: + if verbose: + print(f" CS2 window found: {window_id}") + window_found_logged = True + + if window_id: + send_key("F5", display, window_id) + last_spec_lock = elapsed + + # Print status every 60 seconds + if elapsed - last_status_time >= 60: + print(f" Still recording... ({elapsed/60:.1f} min)") + last_status_time = elapsed + + time.sleep(0.5) + + +def record_demo( + demo_path: Path, + player_identifier: str, + output_path: Path, + resolution: tuple[int, int] = (1920, 1080), + framerate: int = 60, + hide_hud: bool = True, + display_num: int = 0, + verbose: bool = False, + cs2_path_override: Path | None = None, + enable_audio: bool = True, + audio_device: str | None = None, +) -> RecordingResult: + """Record a player's POV from a demo file. + + This is a blocking function that handles the entire recording process. + Cleanup is handled in finally block - always runs. + """ + # Find CS2 installation + print("Finding CS2 installation...") + cs2_path = find_cs2_path(cs2_path_override) + print(f" Found: {cs2_path}") + + # Parse demo + print(f"Parsing demo: {demo_path.name}") + demo_info = parse_demo(demo_path) + if verbose: + print_demo_info(demo_info) + else: + print(f" Map: {demo_info.map_name}, {len(demo_info.players)} players") + print(f" Ticks: {demo_info.total_ticks}, Rate: {demo_info.tick_rate}/s") + + # Find target player + player = find_player(demo_info, player_identifier) + player_index = get_player_index(demo_info, player) + print(f"Recording POV: {player.name} (SteamID: {player.steamid}, index: {player_index})") + + # Prepare directories + demo_dir = get_demo_dir(cs2_path) + demo_dir.mkdir(parents=True, exist_ok=True) + cfg_dir = get_cfg_dir(cs2_path) + cfg_dir.mkdir(parents=True, exist_ok=True) + + # Copy demo to CS2 replays directory if needed + target_demo = demo_dir / demo_path.name + if not target_demo.exists(): + print(f"Copying demo to: {target_demo}") + shutil.copy2(demo_path, target_demo) + elif not target_demo.samefile(demo_path): + print(f"Updating demo at: {target_demo}") + shutil.copy2(demo_path, target_demo) + + # Generate recording config + cfg_path = cfg_dir / "cs2pov_recording.cfg" + config = RecordingConfig( + demo_name=demo_path.stem, + player_index=player_index, + player_name=player.name, + player_steamid=player.steamid, + resolution=resolution, + hide_hud=hide_hud, + ) + generate_recording_cfg(config, cfg_path) + print(f"Generated config: {cfg_path.name}") + + # Ensure output directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Setup paths + display_str = f":{display_num}" + console_log_path = cs2_path / "game/csgo/console.log" + cs2_log_path = output_path.parent / f"{output_path.stem}_cs2.log" + + # Calculate timeout + if demo_info.tick_rate > 0 and demo_info.total_ticks > 0: + estimated_duration = demo_info.total_ticks / demo_info.tick_rate + else: + estimated_duration = 3600 + timeout = estimated_duration + 600 # Add 10 min buffer + + print(f"Starting recording (timeout: {timeout/60:.1f} min)...") + print(f" Display: {display_str}") + print(f" Output: {output_path}") + + # Initialize objects + cs2_process: Optional[CS2Process] = None + ffmpeg: Optional[FFmpegCapture] = None + recording_start_time = 0.0 + exit_reason = "unknown" + + try: + # Delete old console.log to ensure fresh log for this recording + if console_log_path.exists(): + console_log_path.unlink() + if verbose: + print(f" Deleted old console.log") + + # Launch CS2 + cs2_process = CS2Process(cs2_path, display_str, log_path=cs2_log_path) + cs2_process.launch("cs2pov_recording.cfg") + print(" CS2 launching via Steam...") + + # Wait for CS2 window to appear + print(" Waiting for CS2 window...") + window_id = wait_for_cs2_window(display_str, timeout=120) + if window_id: + print(f" CS2 window ready") + else: + print(" Warning: CS2 window not detected, continuing anyway") + + # Wait for map to load, then hide demo UI with Shift+F2 + print(" Waiting for map to load...") + if wait_for_map_load(console_log_path, timeout=120): + if verbose: + print(" Map loaded, waiting 10s before hiding UI...") + time.sleep(10) + if window_id and send_key("shift+F2", display_str, window_id): + if verbose: + print(" Sent Shift+F2 to hide demo UI") + elif window_id: + print(" Warning: Failed to send Shift+F2 to hide demo UI") + else: + print(" Warning: Map load not detected, continuing anyway") + + # Determine audio source + audio_source = None + if enable_audio: + audio_source = audio_device or get_default_audio_monitor() + if audio_source: + if verbose: + print(f" Audio device: {audio_source}") + else: + print(" Warning: Could not detect audio device, recording video only") + + # Start FFmpeg capture (full display + audio) + ffmpeg = FFmpegCapture( + display=display_str, + output_path=output_path, + resolution=resolution, + framerate=framerate, + audio_device=audio_source if enable_audio else None, + enable_audio=enable_audio and audio_source is not None, + ) + ffmpeg.start() + if ffmpeg.enable_audio: + print(f" FFmpeg capture started (video + audio)") + else: + print(f" FFmpeg capture started (video only)") + + # Record start time for death period extraction + recording_start_time = time.time() + + # Run main recording loop (blocking) + exit_reason = recording_loop( + display=display_str, + console_log_path=console_log_path, + cs2_process=cs2_process, + ffmpeg=ffmpeg, + timeout=timeout, + verbose=verbose, + ) + + except KeyboardInterrupt: + print("\n Recording interrupted by user") + exit_reason = "interrupted" + + finally: + # Cleanup - always runs + print("\nCleaning up...") + + # Stop FFmpeg first (so video file is finalized) + if ffmpeg: + print(" Stopping FFmpeg...") + graceful = ffmpeg.stop(timeout=15) + if graceful: + print(" FFmpeg stopped gracefully") + else: + print(" FFmpeg force-stopped") + if ffmpeg.stderr_path and ffmpeg.stderr_path.exists(): + print(f" FFmpeg log: {ffmpeg.stderr_path}") + + # Then stop CS2 + if cs2_process and cs2_process.is_running(): + print(" Terminating CS2...") + cs2_process.terminate() + + print(" Cleanup complete") + + # Save console.log with demo-matching name for retention + if console_log_path.exists(): + saved_log_path = output_path.parent / f"console_{demo_path.stem}.log" + shutil.copy2(console_log_path, saved_log_path) + print(f" Console log saved: {saved_log_path.name}") + # Update path for post-processing to use the saved copy + console_log_path = saved_log_path + + # Return result with metadata for post-processing + success = exit_reason in ("demo_ended", "cs2_exited") + return RecordingResult( + success=success, + video_path=output_path, + console_log_path=console_log_path, + recording_start_time=recording_start_time, + player_slot=player_index, + exit_reason=exit_reason, + ) + + +def postprocess_video( + video_path: Path, + console_log_path: Path, + player_slot: int, + recording_start_time: float, + verbose: bool = False, +) -> Path: + """Post-process a recorded video to trim death periods. + + This is a separate phase that can be run independently. + """ + if not video_path.exists(): + print(f"Error: Video file not found: {video_path}") + return video_path + + raw_size_mb = video_path.stat().st_size / (1024 * 1024) + print(f"\nRaw recording: {video_path} ({raw_size_mb:.1f} MB)") + + print("\nPost-processing: extracting death periods...") + if verbose: + print(f" Console log: {console_log_path}") + print(f" Player slot: {player_slot}") + print(f" Recording start: {recording_start_time}") + + if not console_log_path.exists(): + print(f" Console log not found, skipping trim") + return video_path + + death_periods = extract_death_periods( + log_path=console_log_path, + player_slot=player_slot, + recording_start_time=recording_start_time, + verbose=verbose + ) + + if death_periods: + print(f" Found {len(death_periods)} death periods") + total_dead_time = sum(p.duration for p in death_periods) + print(f" Total dead time: {total_dead_time:.1f}s") + + # Rename original to _raw + raw_path = video_path.parent / f"{video_path.stem}_raw{video_path.suffix}" + video_path.rename(raw_path) + print(f" Raw recording moved to: {raw_path.name}") + + print(" Trimming death periods...") + success = trim_death_periods( + input_path=raw_path, + output_path=video_path, + death_periods=death_periods, + verbose=verbose + ) + + if success and video_path.exists(): + final_size_mb = video_path.stat().st_size / (1024 * 1024) + print(f"\nFinal recording: {video_path} ({final_size_mb:.1f} MB)") + else: + print(f"\nTrimming failed, restoring raw recording") + if not video_path.exists() and raw_path.exists(): + raw_path.rename(video_path) + else: + print(" No death periods found, keeping original") + print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") + + return video_path + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Record a player's POV from a CS2 demo file", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +Examples: + %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 + %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --no-trim + %(prog)s -d match.dem -p "PlayerName" -o recording.mp4 --skip-recording \\ + --console-log /path/to/console.log --player-slot 3 --recording-start-time 1706500000 +""", + ) + + parser.add_argument("-d", "--demo", required=True, type=Path, help="Path to demo file (.dem)") + parser.add_argument("-p", "--player", required=True, help="Player to record (name or SteamID)") + parser.add_argument("-o", "--output", required=True, type=Path, help="Output video file path") + parser.add_argument("-r", "--resolution", default="1920x1080", help="Recording resolution") + parser.add_argument("-f", "--framerate", type=int, default=60, help="Recording framerate") + parser.add_argument("--no-hud", action="store_true", help="Hide HUD elements") + parser.add_argument("--display", type=int, default=0, help="X display number") + parser.add_argument("--no-audio", action="store_true", help="Disable audio recording") + parser.add_argument("--audio-device", help="PulseAudio device for audio capture (auto-detected by default)") + parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") + parser.add_argument("--cs2-path", type=Path, help="Path to CS2 installation") + parser.add_argument("--no-trim", action="store_true", help="Skip post-processing trim") + parser.add_argument("--skip-recording", action="store_true", help="Skip recording, only post-process") + parser.add_argument("--console-log", type=Path, help="Console log path (for --skip-recording)") + parser.add_argument("--player-slot", type=int, help="Player slot (for --skip-recording)") + parser.add_argument("--recording-start-time", type=float, help="Recording start time (for --skip-recording)") + parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") + + # Subcommand for listing players + subparsers = parser.add_subparsers(dest="command") + list_parser = subparsers.add_parser("list", help="List players in a demo") + list_parser.add_argument("demo", type=Path, help="Path to demo file") + + args = parser.parse_args() + + # Handle list subcommand + if args.command == "list": + try: + demo_info = parse_demo(args.demo) + print(f"Demo: {args.demo.name}") + print_demo_info(demo_info) + return 0 + except CS2POVError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + # Check dependencies + missing = check_dependencies() + if missing: + print("Missing dependencies:", file=sys.stderr) + for dep in missing: + print(f" - {dep}", file=sys.stderr) + return 1 + + # Validate inputs + demo_path = args.demo.resolve() + if not demo_path.exists(): + print(f"Error: Demo file not found: {demo_path}", file=sys.stderr) + return 1 + + output_path = args.output.resolve() + + try: + resolution = parse_resolution(args.resolution) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + # Handle --skip-recording mode + if args.skip_recording: + if not args.console_log: + print("Error: --skip-recording requires --console-log", file=sys.stderr) + return 1 + if args.player_slot is None: + print("Error: --skip-recording requires --player-slot", file=sys.stderr) + return 1 + if args.recording_start_time is None: + print("Error: --skip-recording requires --recording-start-time", file=sys.stderr) + return 1 + if not output_path.exists(): + print(f"Error: Video file not found: {output_path}", file=sys.stderr) + return 1 + + print("Post-processing only (--skip-recording mode)") + postprocess_video( + video_path=output_path, + console_log_path=args.console_log.resolve(), + player_slot=args.player_slot, + recording_start_time=args.recording_start_time, + verbose=args.verbose, + ) + return 0 + + # Phase 1-3: Record (with built-in cleanup) + try: + result = record_demo( + demo_path=demo_path, + player_identifier=args.player, + output_path=output_path, + resolution=resolution, + framerate=args.framerate, + hide_hud=args.no_hud, + display_num=args.display, + verbose=args.verbose, + cs2_path_override=args.cs2_path, + enable_audio=not args.no_audio, + audio_device=args.audio_device, + ) + except CS2POVError as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + # Phase 4: Post-process (completely separate) + if result.success and not args.no_trim: + postprocess_video( + video_path=result.video_path, + console_log_path=result.console_log_path, + player_slot=result.player_slot, + recording_start_time=result.recording_start_time, + verbose=args.verbose, + ) + elif result.success: + size_mb = result.video_path.stat().st_size / (1024 * 1024) + print(f"\nRecording saved: {result.video_path} ({size_mb:.1f} MB)") + else: + print(f"\nRecording ended with: {result.exit_reason}") + if result.video_path.exists(): + size_mb = result.video_path.stat().st_size / (1024 * 1024) + print(f"Partial recording available: {result.video_path} ({size_mb:.1f} MB)") + + return 0 if result.success else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cs2pov/config.py b/cs2pov/config.py new file mode 100644 index 0000000..dc1c933 --- /dev/null +++ b/cs2pov/config.py @@ -0,0 +1,135 @@ +"""CS2 recording configuration generation.""" + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class RecordingConfig: + """Configuration for recording a demo POV.""" + + demo_name: str + player_index: int + player_name: str = "" + player_steamid: int = 0 + resolution: tuple[int, int] = (1920, 1080) + hide_hud: bool = True + spec_mode: int = 4 # 4 = first-person, 5 = third-person, 6 = free roam + + @property + def player_account_id(self) -> int: + """Convert SteamID64 to account ID (for spec_player_by_accountid).""" + if self.player_steamid > 76561197960265728: + return self.player_steamid - 76561197960265728 + return 0 + + +# CFG template for recording +# Note: playdemo is asynchronous - commands after it execute before demo loads +# We use binds and aliases so user can lock to player once demo is ready +BASE_CFG_TEMPLATE = """\ +// CS2 POV Recording Configuration +// Generated by cs2pov +// Target player: {player_name} + +// Disable unnecessary visual elements +spec_show_xray 0 +cl_show_observer_crosshair 0 + +// HUD settings +{hud_commands} + +// Set spectator mode (4 = first-person POV) +spec_mode {spec_mode} + +// Auto-quit when demo ends +demo_quitafterplayback 1 + +// Create alias to lock to player by name (more reliable than index) +alias "lock_pov" "spec_player \\"{player_name}\\"" + +// Bind F5 to lock to player (automation will press this) +bind "F5" "lock_pov" + +// Also try binding to a mouse button for manual use +bind "MOUSE4" "lock_pov" + +// Load and play the demo from replays directory +playdemo "replays/{demo_name}" + +// These run before demo loads but we try anyway +spec_player "{player_name}" +spec_mode {spec_mode} + +// Tell user what's happening +echo "==========================================" +echo "CS2POV: Recording {player_name}" +echo "CS2POV: Automation will keep view locked" +echo "==========================================" +""" + +# Additional commands to hide HUD elements (CS2 compatible) +HUD_HIDE_COMMANDS = """\ +cl_draw_only_deathnotices 1 +cl_drawhud 0\ +""" + +HUD_SHOW_COMMANDS = """\ +cl_draw_only_deathnotices 0 +cl_drawhud 1\ +""" + + +def generate_recording_cfg(config: RecordingConfig, output_path: Path) -> Path: + """Generate a CS2 recording configuration file. + + Args: + config: Recording configuration + output_path: Path to write the CFG file + + Returns: + Path to the generated CFG file + """ + hud_commands = HUD_HIDE_COMMANDS if config.hide_hud else HUD_SHOW_COMMANDS + + cfg_content = BASE_CFG_TEMPLATE.format( + hud_commands=hud_commands, + spec_mode=config.spec_mode, + player_name=config.player_name, + demo_name=config.demo_name, + ) + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(cfg_content) + + return output_path + + +def generate_spec_player_cfg(player_index: int, output_path: Path) -> Path: + """Generate a separate CFG to lock spectator to a specific player. + + This is called after the demo starts loading to ensure the player + entities exist before trying to spectate them. + + Args: + player_index: Player index to spectate (0-based from our player list) + output_path: Path to write the CFG file + + Returns: + Path to the generated CFG file + """ + # In CS2 demos, spec_player takes the entity index + # Players typically have entity indices starting around 1-10 + # We'll try the player index + 1 as a starting point + entity_index = player_index + 1 + + cfg_content = f"""\ +// Lock to player +spec_player {entity_index} +spec_mode 4 +""" + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(cfg_content) + + return output_path diff --git a/cs2pov/exceptions.py b/cs2pov/exceptions.py new file mode 100644 index 0000000..959b9f6 --- /dev/null +++ b/cs2pov/exceptions.py @@ -0,0 +1,49 @@ +"""Custom exceptions for CS2 POV recorder.""" + + +class CS2POVError(Exception): + """Base exception for CS2 POV recorder.""" + + pass + + +class DemoNotFoundError(CS2POVError): + """Demo file does not exist.""" + + pass + + +class PlayerNotFoundError(CS2POVError): + """Target player not found in demo.""" + + pass + + +class GPUNotAvailableError(CS2POVError): + """No Vulkan-capable GPU detected.""" + + pass + + +class CS2NotFoundError(CS2POVError): + """CS2 installation not found.""" + + pass + + +class CS2LaunchError(CS2POVError): + """Failed to launch CS2.""" + + pass + + +class CaptureError(CS2POVError): + """FFmpeg or display capture failed.""" + + pass + + +class DemoParseError(CS2POVError): + """Failed to parse demo file.""" + + pass diff --git a/cs2pov/game.py b/cs2pov/game.py new file mode 100644 index 0000000..0a235b1 --- /dev/null +++ b/cs2pov/game.py @@ -0,0 +1,330 @@ +"""CS2 game process management.""" + +import os +import shutil +import subprocess +from pathlib import Path +from typing import Optional + +from .exceptions import CS2NotFoundError, CS2LaunchError + + +# Common Steam library paths on Linux +STEAM_PATHS = [ + Path.home() / ".steam/steam/steamapps/common/Counter-Strike Global Offensive", + Path.home() / ".local/share/Steam/steamapps/common/Counter-Strike Global Offensive", + Path("/opt/steam/steamapps/common/Counter-Strike Global Offensive"), + Path("/mnt/games/SteamLibrary/steamapps/common/Counter-Strike Global Offensive"), +] + +# CS2 Steam App ID +CS2_APP_ID = 730 + + +def validate_cs2_path(path: Path) -> Path: + """Validate that a path contains a CS2 installation. + + Args: + path: Path to validate + + Returns: + The validated path + + Raises: + CS2NotFoundError: If path doesn't contain CS2 + """ + cs2_binary = path / "game/bin/linuxsteamrt64/cs2" + if cs2_binary.exists(): + return path + raise CS2NotFoundError( + f"CS2 not found at: {path}\n" + f"Expected binary at: {cs2_binary}" + ) + + +def find_cs2_path(custom_path: Optional[Path] = None) -> Path: + """Locate CS2 installation directory. + + Args: + custom_path: Optional custom path to CS2 installation + + Returns: + Path to CS2 installation + + Raises: + CS2NotFoundError: If CS2 installation not found + """ + # Check custom path first + if custom_path: + return validate_cs2_path(custom_path) + + # Check environment variable + env_path = os.environ.get("CS2_PATH") + if env_path: + return validate_cs2_path(Path(env_path)) + + # Search standard paths + for path in STEAM_PATHS: + cs2_binary = path / "game/bin/linuxsteamrt64/cs2" + if cs2_binary.exists(): + return path + + raise CS2NotFoundError( + "CS2 installation not found. Checked paths:\n" + + "\n".join(f" - {p}" for p in STEAM_PATHS) + + "\n\nSpecify with --cs2-path or CS2_PATH environment variable" + ) + + +def get_demo_dir(cs2_path: Path) -> Path: + """Get the demo directory path. + + Args: + cs2_path: CS2 installation path + + Returns: + Path to the replays directory + """ + return cs2_path / "game/csgo/replays" + + +def get_cfg_dir(cs2_path: Path) -> Path: + """Get the cfg directory path. + + Args: + cs2_path: CS2 installation path + + Returns: + Path to the cfg directory + """ + return cs2_path / "game/csgo/cfg" + + +class CS2Process: + """Manages CS2 game process for demo playback.""" + + def __init__(self, cs2_path: Path, display: str, log_path: Optional[Path] = None): + """Initialize CS2 process manager. + + Args: + cs2_path: Path to CS2 installation + display: X display to use (e.g., ":99") + log_path: Optional path for CS2 log output + """ + self.cs2_path = cs2_path + self.display = display + self.log_path = log_path + self.process: Optional[subprocess.Popen] = None + self._log_file: Optional[object] = None + + def launch(self, cfg_name: str, extra_args: Optional[list[str]] = None) -> subprocess.Popen: + """Launch CS2 via Steam with recording configuration. + + Args: + cfg_name: Name of CFG file to execute (without path) + extra_args: Additional command line arguments + + Returns: + The subprocess.Popen object + """ + steam_path = shutil.which("steam") + if not steam_path: + raise CS2LaunchError("Steam executable not found in PATH") + + env = os.environ.copy() + env["DISPLAY"] = self.display + + # Use steam -applaunch which reliably passes arguments to the game + cmd = [ + steam_path, + "-applaunch", str(CS2_APP_ID), + "-novid", + "-console", + "-condebug", # Enable console logging to console.log + "-fullscreen", + "-w", "1920", + "-h", "1080", + "+exec", cfg_name, + ] + if extra_args: + cmd.extend(extra_args) + + # Set up logging if path provided + if self.log_path: + self._log_file = open(self.log_path, "w") + stdout_dest = self._log_file + stderr_dest = self._log_file + else: + stdout_dest = subprocess.DEVNULL + stderr_dest = subprocess.DEVNULL + + # Launch CS2 via Steam + # Steam will launch CS2 with proper authentication + self.process = subprocess.Popen( + cmd, + env=env, + stdout=stdout_dest, + stderr=stderr_dest, + ) + + return self.process + + def find_cs2_process(self) -> Optional[int]: + """Find the actual CS2 process PID (launched by Steam). + + Returns: + PID of CS2 process, or None if not found + """ + # Try multiple patterns to find CS2 + patterns = [ + ["pgrep", "-x", "cs2"], # Exact match on process name + ["pgrep", "-f", "cs2_linux64"], # Match on command line + ["pgrep", "-f", "/cs2$"], # Match ending with /cs2 + ] + + for pattern in patterns: + try: + result = subprocess.run( + pattern, + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + # Return first matching PID + pids = result.stdout.strip().split() + if pids: + return int(pids[0]) + except Exception: + continue + + return None + + def wait_for_exit( + self, + timeout: Optional[float] = None, + poll_interval: float = 2.0, + status_callback: Optional[callable] = None + ) -> int: + """Wait for CS2 to exit (demo playback complete). + + Since CS2 is launched via Steam, we monitor the CS2 process directly + rather than the Steam launcher process. + + Args: + timeout: Maximum time to wait in seconds (for demo playback, not startup) + poll_interval: How often to check if CS2 is running + status_callback: Optional function called each iteration, receives elapsed time. + Should return False to abort waiting. + + Returns: + 0 when CS2 exits + + Raises: + subprocess.TimeoutExpired: If timeout is reached + """ + import time + + # Wait for CS2 to actually start (Steam takes a moment) + cs2_pid = None + startup_timeout = 120 # Give CS2 up to 2 minutes to start + startup_start = time.time() + + print(" Waiting for CS2 to start...") + while cs2_pid is None: + cs2_pid = self.find_cs2_process() + if cs2_pid is None: + if time.time() - startup_start > startup_timeout: + raise subprocess.TimeoutExpired( + cmd="cs2", timeout=startup_timeout + ) + time.sleep(poll_interval) + + print(f" CS2 detected (PID: {cs2_pid})") + print(f" Monitoring CS2 process... (timeout: {timeout}s)" if timeout else " Monitoring CS2 process...") + + # Now wait for CS2 to exit - timeout starts from HERE, not from startup + playback_start = time.time() + consecutive_not_found = 0 # Track consecutive "not found" to avoid false positives + last_status_time = playback_start + + while True: + current_pid = self.find_cs2_process() + + if current_pid is None: + consecutive_not_found += 1 + # Require 3 consecutive checks to confirm CS2 is really gone + if consecutive_not_found >= 3: + print(" CS2 process ended") + return 0 + else: + consecutive_not_found = 0 + + if timeout is not None: + elapsed = time.time() - playback_start + if elapsed >= timeout: + raise subprocess.TimeoutExpired(cmd="cs2", timeout=timeout) + + # Print status every 60 seconds + if time.time() - last_status_time >= 60: + remaining = timeout - elapsed + print(f" Still recording... ({elapsed/60:.1f} min elapsed, {remaining/60:.1f} min remaining)") + last_status_time = time.time() + + # Call status callback if provided + if status_callback: + elapsed = time.time() - playback_start + if status_callback(elapsed) is False: + print(" Recording aborted by callback") + return 1 + + time.sleep(poll_interval) + + def is_running(self) -> bool: + """Check if CS2 is still running.""" + return self.find_cs2_process() is not None + + def terminate(self): + """Force terminate CS2 by PID (safe, won't affect our process).""" + import time + + # Close log file if open + if self._log_file: + try: + self._log_file.close() + except Exception: + pass + self._log_file = None + + # Find CS2's actual PID + cs2_pid = self.find_cs2_process() + if cs2_pid is None: + self.process = None + return + + # Try graceful termination with SIGTERM first + try: + os.kill(cs2_pid, 15) # SIGTERM + except (ProcessLookupError, PermissionError): + pass + + # Wait a moment for graceful shutdown + time.sleep(3) + + # If still running, use SIGKILL + if self.is_running(): + cs2_pid = self.find_cs2_process() + if cs2_pid: + try: + os.kill(cs2_pid, 9) # SIGKILL + except (ProcessLookupError, PermissionError): + pass + + self.process = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + return False diff --git a/cs2pov/parser.py b/cs2pov/parser.py new file mode 100644 index 0000000..451ece2 --- /dev/null +++ b/cs2pov/parser.py @@ -0,0 +1,244 @@ +"""Demo parsing with demoparser2.""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional +import re + +from demoparser2 import DemoParser + +from .exceptions import DemoNotFoundError, DemoParseError, PlayerNotFoundError + + +@dataclass +class PlayerInfo: + """Information about a player in the demo.""" + + steamid: int + name: str + team: Optional[str] = None + + +@dataclass +class RoundInfo: + """Information about a round in the demo.""" + + round_num: int + start_tick: int + end_tick: Optional[int] = None + + +@dataclass +class DemoInfo: + """Parsed demo file information.""" + + path: Path + map_name: str + total_ticks: int + tick_rate: float + players: list[PlayerInfo] = field(default_factory=list) + rounds: list[RoundInfo] = field(default_factory=list) + + +def parse_demo(demo_path: Path) -> DemoInfo: + """Parse demo file and extract metadata, players, and round info. + + Args: + demo_path: Path to the demo file + + Returns: + DemoInfo with parsed demo data + + Raises: + DemoNotFoundError: If demo file doesn't exist + DemoParseError: If parsing fails + """ + if not demo_path.exists(): + raise DemoNotFoundError(f"Demo file not found: {demo_path}") + + try: + parser = DemoParser(str(demo_path)) + + # Parse header for basic info + header = parser.parse_header() + map_name = header.get("map_name", "unknown") + tick_rate = header.get("playback_ticks_per_second", 64) + total_ticks = header.get("playback_ticks", 0) + + # Get unique players from tick data + # parse_ticks returns a DataFrame with steamid and name columns + tick_df = parser.parse_ticks(["team_num"]) + players = _extract_players(tick_df) + + # Get round events + rounds = _extract_rounds(parser) + + return DemoInfo( + path=demo_path, + map_name=map_name, + total_ticks=total_ticks, + tick_rate=tick_rate, + players=players, + rounds=rounds, + ) + except Exception as e: + if isinstance(e, (DemoNotFoundError, DemoParseError)): + raise + raise DemoParseError(f"Failed to parse demo: {e}") from e + + +def _extract_players(tick_df) -> list[PlayerInfo]: + """Extract unique players from tick data DataFrame.""" + players = [] + seen_steamids = set() + + # tick_df has columns: tick, steamid, name, and any requested fields + if tick_df is None or len(tick_df) == 0: + return players + + # Get unique steamid/name combinations + for _, row in tick_df.drop_duplicates(subset=["steamid"]).iterrows(): + steamid = row.get("steamid") + if steamid and steamid not in seen_steamids: + seen_steamids.add(steamid) + name = row.get("name", f"Player_{steamid}") + team = None + if "team_num" in row: + team_num = row["team_num"] + if team_num == 2: + team = "T" + elif team_num == 3: + team = "CT" + players.append(PlayerInfo(steamid=steamid, name=name, team=team)) + + return players + + +def _extract_rounds(parser: DemoParser) -> list[RoundInfo]: + """Extract round boundaries from demo events.""" + rounds = [] + + try: + # Parse round_start and round_end events + events_df = parser.parse_event("round_start", "round_end") + + if events_df is None or len(events_df) == 0: + return rounds + + # Group events by type + round_starts = events_df[events_df["event_name"] == "round_start"].sort_values( + "tick" + ) + round_ends = events_df[events_df["event_name"] == "round_end"].sort_values( + "tick" + ) + + # Match starts with ends + round_num = 1 + for _, start_row in round_starts.iterrows(): + start_tick = start_row["tick"] + + # Find matching end (first end after this start) + end_tick = None + for _, end_row in round_ends.iterrows(): + if end_row["tick"] > start_tick: + end_tick = end_row["tick"] + break + + rounds.append( + RoundInfo(round_num=round_num, start_tick=start_tick, end_tick=end_tick) + ) + round_num += 1 + + except Exception: + # If event parsing fails, return empty rounds list + pass + + return rounds + + +def find_player(demo_info: DemoInfo, identifier: str) -> PlayerInfo: + """Find a player by name or SteamID. + + Args: + demo_info: Parsed demo information + identifier: Player name or SteamID string + + Returns: + PlayerInfo for the matched player + + Raises: + PlayerNotFoundError: If player not found + """ + if not demo_info.players: + raise PlayerNotFoundError("No players found in demo") + + # Try parsing as SteamID first + steamid = _parse_steamid(identifier) + if steamid is not None: + for player in demo_info.players: + if player.steamid == steamid: + return player + raise PlayerNotFoundError(f"Player with SteamID {steamid} not found in demo") + + # Try name match (case-insensitive) + identifier_lower = identifier.lower() + for player in demo_info.players: + if player.name.lower() == identifier_lower: + return player + + # Try partial name match + for player in demo_info.players: + if identifier_lower in player.name.lower(): + return player + + # List available players in error message + player_list = ", ".join(f"{p.name} ({p.steamid})" for p in demo_info.players) + raise PlayerNotFoundError( + f"Player '{identifier}' not found. Available players: {player_list}" + ) + + +def _parse_steamid(identifier: str) -> Optional[int]: + """Parse various SteamID formats to SteamID64. + + Supports: + - Raw SteamID64: 76561198012345678 + - STEAM_X:Y:Z format: STEAM_0:1:12345678 + - [U:1:X] format: [U:1:12345678] + """ + # Try raw int64 + try: + steamid = int(identifier) + if steamid > 76561197960265728: # Base SteamID64 value + return steamid + except ValueError: + pass + + # Try STEAM_X:Y:Z format + match = re.match(r"STEAM_(\d):(\d):(\d+)", identifier, re.IGNORECASE) + if match: + y = int(match.group(2)) + z = int(match.group(3)) + return 76561197960265728 + z * 2 + y + + # Try [U:1:X] format + match = re.match(r"\[U:1:(\d+)\]", identifier) + if match: + account_id = int(match.group(1)) + return 76561197960265728 + account_id + + return None + + +def get_player_index(demo_info: DemoInfo, player: PlayerInfo) -> int: + """Get the player's index in the player list (0-based). + + This is used for spec_player command which takes a slot number. + Note: The actual spec_player slot may differ from this index + depending on how CS2 assigns slots during demo playback. + """ + for i, p in enumerate(demo_info.players): + if p.steamid == player.steamid: + return i + return 0 diff --git a/cs2pov/trim.py b/cs2pov/trim.py new file mode 100644 index 0000000..6bfb97b --- /dev/null +++ b/cs2pov/trim.py @@ -0,0 +1,342 @@ +"""Post-processing: trim death periods from recorded video.""" + +import re +import subprocess +import tempfile +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional + +from .exceptions import CaptureError + + +@dataclass +class DeathPeriod: + """A period when the player was dead.""" + death_time: float # Video timestamp in seconds + respawn_time: float # Video timestamp in seconds + + @property + def duration(self) -> float: + return self.respawn_time - self.death_time + + +def parse_log_timestamp(timestamp_str: str, reference_date: Optional[datetime] = None) -> datetime: + """Parse a console.log timestamp in MM/DD HH:mm:ss format. + + Args: + timestamp_str: Timestamp string like "01/28 15:30:45" + reference_date: Reference date for year (defaults to current year) + + Returns: + datetime object + """ + if reference_date is None: + reference_date = datetime.now() + + # Parse MM/DD HH:mm:ss + parsed = datetime.strptime(timestamp_str, "%m/%d %H:%M:%S") + # Add the year from reference date + return parsed.replace(year=reference_date.year) + + +def extract_death_periods( + log_path: Path, + player_slot: int, + recording_start_time: float, + verbose: bool = False +) -> list[DeathPeriod]: + """Extract death periods from console.log. + + Finds periods between "Shutdown prediction for player slot X" and + "Added TrueView prediction for player slot X" lines. + + Also trims from the start of the recording until the player's POV is + first selected (first TrueView prediction). + + Args: + log_path: Path to console.log file + player_slot: Player slot index (0-based) + recording_start_time: Unix timestamp when recording started + verbose: Print debug output + + Returns: + List of DeathPeriod objects with video-relative timestamps + """ + if not log_path.exists(): + if verbose: + print(f" [Trim] Console log not found: {log_path}") + return [] + + # Patterns for death and respawn + # Format: MM/DD HH:mm:ss [Prediction] Shutdown prediction for player slot X. ... + death_pattern = re.compile( + rf"^(\d{{2}}/\d{{2}} \d{{2}}:\d{{2}}:\d{{2}}).*" + rf"\[Prediction\] Shutdown prediction for player slot {player_slot}\b" + ) + respawn_pattern = re.compile( + rf"^(\d{{2}}/\d{{2}} \d{{2}}:\d{{2}}:\d{{2}}).*" + rf"\[Prediction\] Added TrueView prediction for player slot {player_slot}\b" + ) + + # Reference date for parsing (use recording start time) + reference_date = datetime.fromtimestamp(recording_start_time) + + death_periods: list[DeathPeriod] = [] + pending_death_time: Optional[float] = None + first_pov_time: Optional[float] = None # Track first time POV is selected + + if verbose: + print(f" [Trim] Parsing log: {log_path}") + print(f" [Trim] Looking for player slot {player_slot}") + + with open(log_path, 'r', errors='ignore') as f: + for line in f: + # Check for death + death_match = death_pattern.match(line) + if death_match: + timestamp_str = death_match.group(1) + try: + log_time = parse_log_timestamp(timestamp_str, reference_date) + video_time = log_time.timestamp() - recording_start_time + if video_time >= 0: # Only consider events after recording started + pending_death_time = video_time + if verbose: + print(f" [Trim] Death at video time {video_time:.2f}s") + except ValueError as e: + if verbose: + print(f" [Trim] Failed to parse timestamp: {timestamp_str}: {e}") + continue + + # Check for respawn/POV selection + respawn_match = respawn_pattern.match(line) + if respawn_match: + timestamp_str = respawn_match.group(1) + try: + log_time = parse_log_timestamp(timestamp_str, reference_date) + video_time = log_time.timestamp() - recording_start_time + + # Track first POV selection time + if first_pov_time is None and video_time >= 0: + first_pov_time = video_time + if verbose: + print(f" [Trim] First POV selection at video time {video_time:.2f}s") + + # Handle respawn after death + if pending_death_time is not None and video_time > pending_death_time: + death_periods.append(DeathPeriod( + death_time=pending_death_time, + respawn_time=video_time + )) + if verbose: + print(f" [Trim] Respawn at video time {video_time:.2f}s " + f"(dead for {video_time - pending_death_time:.2f}s)") + pending_death_time = None + except ValueError as e: + if verbose: + print(f" [Trim] Failed to parse timestamp: {timestamp_str}: {e}") + + # Add initial period from start until first POV selection + if first_pov_time is not None and first_pov_time > 0.5: # Only if > 0.5s to avoid tiny trims + death_periods.insert(0, DeathPeriod( + death_time=0.0, + respawn_time=first_pov_time + )) + if verbose: + print(f" [Trim] Adding start trim: 0.0s - {first_pov_time:.2f}s") + + if verbose: + print(f" [Trim] Found {len(death_periods)} periods to trim") + + return death_periods + + +def get_video_duration(video_path: Path) -> float: + """Get video duration in seconds using ffprobe. + + Args: + video_path: Path to video file + + Returns: + Duration in seconds + + Raises: + CaptureError: If ffprobe fails + """ + try: + result = subprocess.run( + [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + str(video_path) + ], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0: + return float(result.stdout.strip()) + raise CaptureError(f"ffprobe failed: {result.stderr}") + except FileNotFoundError: + raise CaptureError("ffprobe not found (part of ffmpeg)") + except (ValueError, subprocess.TimeoutExpired) as e: + raise CaptureError(f"Failed to get video duration: {e}") + + +def trim_death_periods( + input_path: Path, + output_path: Path, + death_periods: list[DeathPeriod], + verbose: bool = False +) -> bool: + """Trim death periods from video using FFmpeg concat demuxer. + + Creates segments for "alive" periods and concatenates them. + + Args: + input_path: Path to input video + output_path: Path for output video + death_periods: List of death periods to remove + verbose: Print debug output + + Returns: + True if trimming was performed, False if no trimming needed + """ + if not death_periods: + if verbose: + print(" [Trim] No death periods to trim") + return False + + # Get video duration + try: + video_duration = get_video_duration(input_path) + except CaptureError as e: + if verbose: + print(f" [Trim] Failed to get video duration: {e}") + return False + + if verbose: + print(f" [Trim] Video duration: {video_duration:.2f}s") + + # Sort death periods by start time + death_periods = sorted(death_periods, key=lambda p: p.death_time) + + # Calculate "alive" segments (inverse of death periods) + alive_segments: list[tuple[float, float]] = [] + current_pos = 0.0 + + for period in death_periods: + # Skip if death starts before current position (overlapping) + if period.death_time <= current_pos: + current_pos = max(current_pos, period.respawn_time) + continue + + # Add alive segment before this death + if period.death_time > current_pos: + alive_segments.append((current_pos, period.death_time)) + + current_pos = period.respawn_time + + # Add final segment after last death + if current_pos < video_duration: + alive_segments.append((current_pos, video_duration)) + + if not alive_segments: + if verbose: + print(" [Trim] No alive segments found") + return False + + if verbose: + print(f" [Trim] Found {len(alive_segments)} alive segments") + total_alive = sum(end - start for start, end in alive_segments) + total_dead = sum(p.duration for p in death_periods) + print(f" [Trim] Total alive time: {total_alive:.2f}s, dead time: {total_dead:.2f}s") + + # Create concat file and segment files in temp directory + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir_path = Path(tmpdir) + concat_file = tmpdir_path / "concat.txt" + segment_paths: list[Path] = [] + + # Extract each alive segment + for i, (start, end) in enumerate(alive_segments): + segment_path = tmpdir_path / f"segment_{i:03d}.mp4" + segment_paths.append(segment_path) + + duration = end - start + if verbose: + print(f" [Trim] Extracting segment {i}: {start:.2f}s - {end:.2f}s ({duration:.2f}s)") + + cmd = [ + "ffmpeg", + "-y", + "-ss", str(start), + "-i", str(input_path), + "-t", str(duration), + "-c", "copy", # No re-encoding + "-avoid_negative_ts", "make_zero", + str(segment_path) + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=300 # 5 min per segment + ) + if result.returncode != 0: + if verbose: + print(f" [Trim] Segment extraction failed: {result.stderr}") + return False + except subprocess.TimeoutExpired: + if verbose: + print(f" [Trim] Segment extraction timed out") + return False + + # Create concat file + with open(concat_file, 'w') as f: + for segment_path in segment_paths: + # Escape single quotes in path + escaped_path = str(segment_path).replace("'", "'\\''") + f.write(f"file '{escaped_path}'\n") + + if verbose: + print(f" [Trim] Concatenating {len(segment_paths)} segments") + + # Concatenate segments + cmd = [ + "ffmpeg", + "-y", + "-f", "concat", + "-safe", "0", + "-i", str(concat_file), + "-c", "copy", + str(output_path) + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=600 # 10 min for concat + ) + if result.returncode != 0: + if verbose: + print(f" [Trim] Concatenation failed: {result.stderr}") + return False + except subprocess.TimeoutExpired: + if verbose: + print(f" [Trim] Concatenation timed out") + return False + + if verbose: + if output_path.exists(): + output_duration = get_video_duration(output_path) + print(f" [Trim] Output video: {output_duration:.2f}s") + + return True diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..deb9220 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +demoparser2>=0.0.7 diff --git a/scripts/kill-cs2.sh b/scripts/kill-cs2.sh new file mode 100755 index 0000000..81860cb --- /dev/null +++ b/scripts/kill-cs2.sh @@ -0,0 +1,99 @@ +#!/bin/bash +# Kill all running CS2 processes AND clean up Steam lock files +# Usage: kill-cs2.sh [-f|--force] + +FORCE=0 +if [[ "$1" == "-f" || "$1" == "--force" ]]; then + FORCE=1 +fi + +echo "=== Finding CS2 processes ===" + +# Get CS2 PIDs +PIDS=$(pgrep -f "cs2" 2>/dev/null) + +if [ -z "$PIDS" ]; then + echo "No CS2 processes found" +else + for pid in $PIDS; do + echo "" + echo "Process $pid:" + ps -p "$pid" -o pid,ppid,state,cmd --no-headers 2>/dev/null + + # Check process state + STATE=$(ps -p "$pid" -o state= 2>/dev/null) + PPID=$(ps -p "$pid" -o ppid= 2>/dev/null | tr -d ' ') + + if [ "$STATE" = "Z" ]; then + echo " -> ZOMBIE process, killing parent (PID $PPID)" + sudo kill -9 "$PPID" 2>/dev/null + else + echo " -> Sending SIGKILL" + sudo kill -9 "$pid" 2>/dev/null + + # Check if it survived + sleep 0.5 + if kill -0 "$pid" 2>/dev/null; then + echo " -> Still alive! Trying parent (PID $PPID)" + sudo kill -9 "$PPID" 2>/dev/null + else + echo " -> Killed" + fi + fi + done +fi + +echo "" +echo "=== Killing Steam reaper/overlay ===" +sudo pkill -9 -f "reaper.*730" 2>/dev/null +sudo pkill -9 -f "gameoverlayui.*730" 2>/dev/null + +echo "" +echo "=== Cleaning lock files ===" + +# Find CS2 installation +CS2_PATHS=( + "${HOME}/.steam/steam/steamapps/common/Counter-Strike Global Offensive" + "${HOME}/.local/share/Steam/steamapps/common/Counter-Strike Global Offensive" + "/mnt/games/SteamLibrary/steamapps/common/Counter-Strike Global Offensive" +) + +for cs2path in "${CS2_PATHS[@]}"; do + if [ -d "$cs2path" ]; then + echo "Checking: $cs2path" + find "$cs2path" -name "*.lock" -o -name ".lock" 2>/dev/null | while read lock; do + echo " Removing: $lock" + rm -f "$lock" + done + fi +done + +# Temp lock files +rm -f /tmp/steam_730.lock /tmp/.steam_730_lock 2>/dev/null + +echo "" +echo "=== Cleaning shared memory ===" +rm -f /dev/shm/*steam* /dev/shm/*valve* /dev/shm/*source* 2>/dev/null +echo "Cleared /dev/shm" + +# System V shared memory +if [ "$FORCE" -eq 1 ]; then + SHMIDS=$(ipcs -m 2>/dev/null | grep "$(whoami)" | awk '{print $2}') + for shmid in $SHMIDS; do + ipcrm -m "$shmid" 2>/dev/null && echo "Removed shm segment $shmid" + done +fi + +echo "" +echo "=== Final check ===" +REMAINING=$(pgrep -f "cs2" 2>/dev/null) +if [ -n "$REMAINING" ]; then + echo "WARNING: CS2 processes still running:" + ps -p $(echo $REMAINING | tr ' ' ',') -o pid,ppid,state,cmd --no-headers + echo "" + echo "These may be unkillable zombies. Try logging out and back in," + echo "or reboot if necessary." + exit 1 +else + echo "All CS2 processes killed" +fi diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9ff49b6 --- /dev/null +++ b/setup.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +"""Setup script for cs2pov.""" + +from pathlib import Path +from setuptools import setup, find_packages + +# Read version from package +version = {} +exec(Path("cs2pov/__init__.py").read_text(), version) + +# Read requirements +requirements = Path("requirements.txt").read_text().strip().split("\n") + +setup( + name="cs2pov", + version=version["__version__"], + description="Record player POV from CS2 demo files", + author="", + python_requires=">=3.10", + packages=find_packages(), + install_requires=requirements, + entry_points={ + "console_scripts": [ + "cs2pov=cs2pov.cli:main", + ], + }, + classifiers=[ + "Development Status :: 3 - Alpha", + "Environment :: Console", + "Intended Audience :: End Users/Desktop", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Games/Entertainment :: First Person Shooters", + "Topic :: Multimedia :: Video :: Capture", + ], +) |
