diff options
| author | Schark <jordan@schark.online> | 2026-01-30 23:20:18 -0500 |
|---|---|---|
| committer | Schark <jordan@schark.online> | 2026-01-30 23:20:18 -0500 |
| commit | 84b5adccbd60fb7cdf73d014a2d631f23b7884e9 (patch) | |
| tree | ce8a1599be785b1a1aff686d29034598d1065ade | |
| parent | f98dd968715aa3113649588c42a2586d50df00fc (diff) | |
| download | cs2pov-84b5adccbd60fb7cdf73d014a2d631f23b7884e9.tar.gz cs2pov-84b5adccbd60fb7cdf73d014a2d631f23b7884e9.zip | |
Slight workarounds to shifted time issues
| -rw-r--r-- | cs2pov/cli.py | 260 | ||||
| -rw-r--r-- | cs2pov/preprocessor.py | 174 | ||||
| -rw-r--r-- | cs2pov/trim.py | 119 |
3 files changed, 417 insertions, 136 deletions
diff --git a/cs2pov/cli.py b/cs2pov/cli.py index efb9258..f21e368 100644 --- a/cs2pov/cli.py +++ b/cs2pov/cli.py @@ -25,7 +25,7 @@ from .exceptions import CS2POVError from .game import CS2Process, find_cs2_path, get_cfg_dir, get_demo_dir from .parser import DemoInfo, PlayerInfo, find_player, get_player_index, parse_demo from .preprocessor import DemoTimeline, preprocess_demo, get_trim_periods -from .trim import extract_death_periods, TrimPeriod, trim_video_with_periods +from .trim import extract_death_periods # ============================================================================= @@ -172,11 +172,11 @@ def record_demo( print("Preprocessing demo for timeline data...") try: timeline = preprocess_demo(demo_path, player.steamid, player.name) - print(f" Found {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, " - f"{len(timeline.rounds)} rounds") - if verbose and timeline.death_periods: - total_dead = sum(p.duration_seconds for p in timeline.death_periods) - print(f" Total dead time: {total_dead:.1f}s across {len(timeline.death_periods)} death periods") + print(f" Found {len(timeline.deaths)} deaths, {len(timeline.rounds)} rounds") + print(f" {len(timeline.alive_segments)} alive segments to keep") + if verbose and timeline.alive_segments: + total_alive = sum(seg.duration_seconds for seg in timeline.alive_segments) + print(f" Total alive time: {total_alive:.1f}s") except Exception as e: print(f" Warning: Preprocessing failed: {e}") print(f" Will fall back to console.log parsing for trim") @@ -354,16 +354,20 @@ def postprocess_video( recording_start_time: float, verbose: bool = False, timeline: Optional[DemoTimeline] = None, + startup_time_override: Optional[float] = None, ) -> Path: - """Post-process a recorded video to trim death periods. + """Post-process a recorded video to keep only alive segments. - Process: + New simplified approach: 1. Calculate startup_time = video_duration - demo_duration - 2. Trim startup from beginning (aligns video with demo time 0) - 3. Convert death periods from ticks to seconds (demo time) - 4. Apply death period trims (times are now aligned with truncated video) + 2. Convert alive_segments from demo time to video time + 3. Extract and concatenate only the alive segments + + Args: + startup_time_override: If provided, use this value instead of calculating + startup_time. Useful when the automatic calculation is wrong. """ - from .trim import get_video_duration + from .trim import get_video_duration, extract_and_concat_segments if not video_path.exists(): print(f"Error: Video file not found: {video_path}") @@ -372,23 +376,22 @@ def postprocess_video( raw_size_mb = video_path.stat().st_size / (1024 * 1024) print(f"\nRaw recording: {video_path} ({raw_size_mb:.1f} MB)") - print("\nPost-processing: calculating trim periods...") + print("\nPost-processing: calculating segments to keep...") - trim_periods: list[TrimPeriod] = [] + video_segments: list[tuple[float, float]] = [] - # Prefer timeline data (need either spawns or rounds to calculate trims) - if timeline is not None and (timeline.spawns or timeline.rounds): - print(" Using preprocessed timeline data (demoparser2)") + # Prefer timeline data with alive_segments + if timeline is not None and timeline.alive_segments: + print(" Using alive segments from timeline (demoparser2)") if verbose: - print(f" Deaths: {len(timeline.deaths)}, Spawns: {len(timeline.spawns)}, Rounds: {len(timeline.rounds)}") - print(f" Death periods: {len(timeline.death_periods)}, Round end periods: {len(timeline.round_end_periods)}") + print(f" Alive segments: {len(timeline.alive_segments)}") + print(f" Deaths: {len(timeline.deaths)}, Rounds: {len(timeline.rounds)}") print(f" Tickrate: {timeline.tickrate}, Total ticks: {timeline.total_ticks}") - # Debug: show how many rounds have end_tick and freeze_end_tick - rounds_with_end = sum(1 for r in timeline.rounds if r.end_tick is not None) - rounds_with_freeze = sum(1 for r in timeline.rounds if r.freeze_end_tick is not None) - print(f" Rounds with end_tick: {rounds_with_end}, with freeze_end: {rounds_with_freeze}") + for i, seg in enumerate(timeline.alive_segments): + print(f" Segment {i+1}: R{seg.round_num} {seg.start_time:.2f}s - {seg.end_time:.2f}s " + f"({seg.duration_seconds:.2f}s, ended: {seg.reason_ended})") - # Step 1: Get video duration and demo duration + # Step 1: Get video duration try: video_duration = get_video_duration(video_path) except Exception as e: @@ -396,7 +399,7 @@ def postprocess_video( print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") return video_path - # Get demo duration - try multiple sources + # Step 2: Get demo duration for startup_time calculation demo_duration = 0.0 # Method 1: From console.log demo end marker (most accurate) @@ -412,18 +415,22 @@ def postprocess_video( if verbose: print(f" Demo duration source: demo header") - # Method 3: Calculate from last event in timeline - if demo_duration == 0.0 and timeline.tickrate > 0: - max_tick = 0 - for spawn in timeline.spawns: - max_tick = max(max_tick, spawn.tick) - for death in timeline.deaths: - max_tick = max(max_tick, death.tick) - if max_tick > 0: - # Add a buffer since last event isn't necessarily demo end - demo_duration = (max_tick / timeline.tickrate) + 60.0 - if verbose: - print(f" Demo duration source: last event tick + 60s buffer") + # Method 3: From last round end (more accurate than alive segments) + if demo_duration == 0.0 and timeline.rounds: + # Find the last round with an end_tick + for r in reversed(timeline.rounds): + if r.end_tick is not None: + demo_duration = r.end_time + 5.0 # Small buffer after round ends + if verbose: + print(f" Demo duration source: last round end + 5s") + break + + # Method 4: From last alive segment end (fallback) + if demo_duration == 0.0 and timeline.alive_segments: + last_segment = timeline.alive_segments[-1] + demo_duration = last_segment.end_time + 5.0 # Small buffer + if verbose: + print(f" Demo duration source: last alive segment + 5s") if verbose: print(f" Video duration: {video_duration:.2f}s") @@ -434,65 +441,38 @@ def postprocess_video( print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") return video_path - # Step 2: Calculate startup time (recording before demo started) - startup_time = video_duration - demo_duration - if startup_time < 0: - print(f" Warning: Negative startup time ({startup_time:.2f}s), using 0") - startup_time = 0.0 - - if verbose: - print(f" Startup time (to trim): {startup_time:.2f}s") - - # Step 3: Build trim periods - # First: trim from 0 to (startup_time + first_action_time) - # This removes the recording startup AND the demo freeze/warmup time - # Use first round's freeze_end - 5s as the cut-in point (consistent with death periods) - first_action_time = 0.0 - if timeline.rounds and timeline.rounds[0].freeze_end_time is not None: - first_action_time = timeline.rounds[0].freeze_end_time - 5.0 - first_action_time = max(0.0, first_action_time) # Don't go negative - elif timeline.spawns: - first_action_time = timeline.spawns[0].time_seconds - - initial_trim_end = startup_time + first_action_time - - if initial_trim_end > 0.5: # Only trim if > 0.5s - trim_periods.append(TrimPeriod(start_time=0.0, end_time=initial_trim_end)) + # Step 3: Calculate startup time (recording before demo started) + if startup_time_override is not None: + startup_time = startup_time_override if verbose: - print(f" Initial trim: 0.00s - {initial_trim_end:.2f}s (startup + pre-action)") - - # Step 4: Add death periods (convert demo ticks to video time) - # video_time = startup_time + demo_time - for death_period in timeline.death_periods: - death_video_time = startup_time + death_period.death.time_seconds - respawn_video_time = startup_time + death_period.spawn.time_seconds - - # Only include if after the initial trim - if respawn_video_time > initial_trim_end: - # Adjust start if it overlaps with initial trim - death_video_time = max(death_video_time, initial_trim_end) - trim_periods.append(TrimPeriod(start_time=death_video_time, end_time=respawn_video_time)) - if verbose: - duration = respawn_video_time - death_video_time - print(f" Death period: {death_video_time:.2f}s - {respawn_video_time:.2f}s ({duration:.2f}s)") - - # Step 5: Add round end periods (dead time between rounds when player survives) - for round_end_period in timeline.round_end_periods: - round_end_video_time = startup_time + round_end_period.round_end_time - next_round_video_time = startup_time + round_end_period.next_round_time - - # Only include if after the initial trim - if next_round_video_time > initial_trim_end: - round_end_video_time = max(round_end_video_time, initial_trim_end) - trim_periods.append(TrimPeriod(start_time=round_end_video_time, end_time=next_round_video_time)) + print(f" Startup time: {startup_time:.2f}s (manual override)") + else: + startup_time = video_duration - demo_duration + if startup_time < 0: + print(f" Warning: Negative startup time ({startup_time:.2f}s), using 0") + startup_time = 0.0 + if verbose: + print(f" Startup time: {startup_time:.2f}s (calculated)") + + # Step 4: Convert alive segments to video time + for seg in timeline.alive_segments: + video_start = startup_time + seg.start_time + video_end = startup_time + seg.end_time + + # Clamp to video bounds + video_start = max(0.0, video_start) + video_end = min(video_duration, video_end) + + # Only include if segment has meaningful duration + if video_end > video_start + 0.5: + video_segments.append((video_start, video_end)) if verbose: - duration = next_round_video_time - round_end_video_time - print(f" Round end period: {round_end_video_time:.2f}s - {next_round_video_time:.2f}s ({duration:.2f}s)") + print(f" Keep: {video_start:.2f}s - {video_end:.2f}s ({video_end - video_start:.2f}s)") - # Fall back to console.log parsing - if not trim_periods: + # Fall back to console.log parsing (legacy method) + if not video_segments: if timeline is not None: - print(" Timeline has no spawn data, falling back to console.log") + print(" No alive segments found, falling back to console.log") else: print(" Using console.log parsing (legacy method)") @@ -502,10 +482,11 @@ def postprocess_video( print(f" Recording start: {recording_start_time}") if not console_log_path.exists(): - print(f" Console log not found, skipping trim") + print(" Console log not found, skipping trim") print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") return video_path + # Use legacy death period extraction death_periods = extract_death_periods( log_path=console_log_path, player_slot=player_slot, @@ -513,23 +494,48 @@ def postprocess_video( verbose=verbose ) - for dp in death_periods: - trim_periods.append(TrimPeriod(start_time=dp.death_time, end_time=dp.respawn_time)) + if death_periods: + # Convert death periods to video segments (inverse) + try: + video_duration = get_video_duration(video_path) + except Exception as e: + print(f" Error getting video duration: {e}") + return video_path + + # Sort death periods and compute alive segments + death_periods_sorted = sorted(death_periods, key=lambda p: p.death_time) + current_pos = 0.0 + + for dp in death_periods_sorted: + if dp.death_time > current_pos: + video_segments.append((current_pos, dp.death_time)) + current_pos = max(current_pos, dp.respawn_time) + + # Add final segment + if current_pos < video_duration: + video_segments.append((current_pos, video_duration)) + + # Execute trimming + if video_segments: + total_keep_time = sum(end - start for start, end in video_segments) + try: + video_duration = get_video_duration(video_path) + total_trim_time = video_duration - total_keep_time + except: + total_trim_time = 0.0 - if trim_periods: - print(f" Found {len(trim_periods)} periods to trim") - total_trim_time = sum(p.duration for p in trim_periods) - print(f" Total trim time: {total_trim_time:.1f}s") + print(f" Found {len(video_segments)} segments to keep") + print(f" Total keep time: {total_keep_time:.1f}s, trim time: {total_trim_time:.1f}s") raw_path = video_path.parent / f"{video_path.stem}_raw{video_path.suffix}" video_path.rename(raw_path) print(f" Raw recording moved to: {raw_path.name}") - print(" Trimming periods from video...") - success = trim_video_with_periods( + print(" Extracting and concatenating segments...") + success = extract_and_concat_segments( input_path=raw_path, output_path=video_path, - trim_periods=trim_periods, + segments=video_segments, verbose=verbose ) @@ -537,11 +543,11 @@ def postprocess_video( final_size_mb = video_path.stat().st_size / (1024 * 1024) print(f"\nFinal recording: {video_path} ({final_size_mb:.1f} MB)") else: - print(f"\nTrimming failed, restoring raw recording") + print("\nTrimming failed, restoring raw recording") if not video_path.exists() and raw_path.exists(): raw_path.rename(video_path) else: - print(" No periods to trim, keeping original") + print(" No segments to extract, keeping original") print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)") return video_path @@ -599,17 +605,15 @@ def print_demo_info_extended( if timeline: death_count = len(timeline.deaths) - spawn_count = len(timeline.spawns) - dead_time = sum(p.duration_seconds for p in timeline.death_periods) - print(f" Deaths: {death_count}, Spawns: {spawn_count}, Dead time: {dead_time:.1f}s") + alive_time = sum(seg.duration_seconds for seg in timeline.alive_segments) + print(f" Deaths: {death_count}, Alive segments: {len(timeline.alive_segments)}, " + f"Alive time: {alive_time:.1f}s") - if verbose and timeline.death_periods: - print(f" Death periods:") - for i, period in enumerate(timeline.death_periods, 1): - weapon = period.death.weapon or "unknown" - hs = " (headshot)" if period.death.headshot else "" - print(f" {i:2}. {period.death.time_seconds:7.1f}s - {period.spawn.time_seconds:7.1f}s " - f"({period.duration_seconds:5.1f}s) [{weapon}{hs}]") + if verbose and timeline.alive_segments: + print(f" Alive segments:") + for i, seg in enumerate(timeline.alive_segments, 1): + print(f" {i:2}. R{seg.round_num:2} {seg.start_time:7.1f}s - {seg.end_time:7.1f}s " + f"({seg.duration_seconds:5.1f}s) [{seg.reason_ended}]") def format_info_json( @@ -651,19 +655,18 @@ def format_info_json( if timeline: player_data["deaths"] = len(timeline.deaths) - player_data["spawns"] = len(timeline.spawns) - player_data["dead_time_seconds"] = sum(p.duration_seconds for p in timeline.death_periods) - player_data["death_periods"] = [ + player_data["alive_time_seconds"] = sum(seg.duration_seconds for seg in timeline.alive_segments) + player_data["alive_segments"] = [ { - "death_tick": p.death.tick, - "death_time": p.death.time_seconds, - "spawn_tick": p.spawn.tick, - "spawn_time": p.spawn.time_seconds, - "duration_seconds": p.duration_seconds, - "weapon": p.death.weapon, - "headshot": p.death.headshot, + "round_num": seg.round_num, + "start_tick": seg.start_tick, + "start_time": seg.start_time, + "end_tick": seg.end_tick, + "end_time": seg.end_time, + "duration_seconds": seg.duration_seconds, + "reason_ended": seg.reason_ended, } - for p in timeline.death_periods + for seg in timeline.alive_segments ] players_data.append(player_data) @@ -780,6 +783,8 @@ Examples: help="Player slot, 0-based (fallback)") trim_parser.add_argument("--recording-start-time", type=float, help="Recording start timestamp (fallback)") + trim_parser.add_argument("--startup-time", type=float, + help="Override startup time (seconds from video start to demo start)") return parser @@ -965,8 +970,8 @@ def cmd_trim(args) -> int: timeline = None try: timeline = preprocess_demo(demo_path, player.steamid, player.name) - print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, {len(timeline.rounds)} rounds") - print(f" {len(timeline.death_periods)} death periods, {len(timeline.round_end_periods)} round end periods") + print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.rounds)} rounds") + print(f" {len(timeline.alive_segments)} alive segments to keep") except Exception as e: print(f"Warning: Could not preprocess demo: {e}") print("Falling back to console.log method") @@ -996,6 +1001,7 @@ def cmd_trim(args) -> int: recording_start_time=args.recording_start_time or 0.0, verbose=args.verbose, timeline=timeline, + startup_time_override=args.startup_time, ) return 0 diff --git a/cs2pov/preprocessor.py b/cs2pov/preprocessor.py index 445f2a4..e7077b4 100644 --- a/cs2pov/preprocessor.py +++ b/cs2pov/preprocessor.py @@ -77,6 +77,24 @@ class RoundEndPeriod: @dataclass +class AliveSegment: + """A period when the player is alive and gameplay should be shown. + + Times are demo-relative (tick 0 = demo start). + """ + start_tick: int + end_tick: int + start_time: float # seconds from demo start + end_time: float # seconds from demo start + round_num: int # 0 for pre-round segment + reason_ended: str # "death", "round_end", "demo_end" + + @property + def duration_seconds(self) -> float: + return self.end_time - self.start_time + + +@dataclass class DemoTimeline: """Complete timeline data for a player in a demo.""" @@ -88,8 +106,11 @@ class DemoTimeline: deaths: list[DeathEvent] = field(default_factory=list) spawns: list[SpawnEvent] = field(default_factory=list) - death_periods: list[DeathPeriod] = field(default_factory=list) rounds: list[RoundBoundary] = field(default_factory=list) + alive_segments: list[AliveSegment] = field(default_factory=list) + + # Deprecated - kept for backward compatibility during transition + death_periods: list[DeathPeriod] = field(default_factory=list) round_end_periods: list[RoundEndPeriod] = field(default_factory=list) @@ -134,16 +155,21 @@ def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "") # Extract round boundaries rounds = _extract_round_boundaries(parser, tickrate) - # Compute death periods using rounds (death -> 5s before next round freeze_end) - # This is more reliable than spawn events - death_periods = compute_death_periods_from_rounds(deaths, rounds, tickrate, buffer_seconds=5.0) + # Compute alive segments (new unified approach) + # This replaces the separate death_periods and round_end_periods logic + alive_segments = compute_alive_segments( + deaths=deaths, + rounds=rounds, + tickrate=tickrate, + total_ticks=total_ticks, + buffer_before_round=5.0, + buffer_after_round_end=2.0, + ) - # Fallback to spawn-based if rounds didn't work + # Deprecated: still compute these for backward compatibility + death_periods = compute_death_periods_from_rounds(deaths, rounds, tickrate, buffer_seconds=5.0) if not death_periods and deaths and spawns: death_periods = compute_death_periods(deaths, spawns) - - # Compute round end periods (round_end -> 5s before next round freeze_end) - # This trims dead time between rounds when the player survives round_end_periods = compute_round_end_periods(rounds, tickrate, buffer_before_next=5.0, buffer_after_end=2.0) return DemoTimeline( @@ -154,8 +180,10 @@ def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "") total_duration=total_duration, deaths=deaths, spawns=spawns, - death_periods=death_periods, rounds=rounds, + alive_segments=alive_segments, + # Deprecated fields (kept for backward compatibility) + death_periods=death_periods, round_end_periods=round_end_periods, ) @@ -462,6 +490,134 @@ def compute_round_end_periods( return round_end_periods +def compute_alive_segments( + deaths: list[DeathEvent], + rounds: list[RoundBoundary], + tickrate: float, + total_ticks: int, + buffer_before_round: float = 5.0, + buffer_after_round_end: float = 2.0, +) -> list[AliveSegment]: + """Compute alive segments from deaths and round boundaries. + + An "alive segment" is a period when the player is alive and actively + in gameplay. This replaces the separate death_periods and round_end_periods + with a unified approach. + + For each round: + - Segment starts at (freeze_end - buffer_before_round) + - Segment ends at (player death) OR (round_end + buffer_after_round_end), + whichever comes first + + This naturally handles both death and survival cases. + + Args: + deaths: List of death events, sorted by tick + rounds: List of round boundaries, sorted by round_num + tickrate: Demo tickrate for time calculations + total_ticks: Total demo ticks (for last segment) + buffer_before_round: Seconds before freeze_end to start segment (default 5s) + buffer_after_round_end: Seconds after round_end to show (default 2s) + + Returns: + List of AliveSegment objects, sorted by start_tick + """ + if not rounds: + return [] + + alive_segments = [] + buffer_before_ticks = int(buffer_before_round * tickrate) + buffer_after_ticks = int(buffer_after_round_end * tickrate) + + # Create a set of death ticks for quick lookup + death_ticks = {d.tick: d for d in deaths} + + for i, r in enumerate(rounds): + # Determine segment start: freeze_end - buffer (or prestart if no freeze_end) + if r.freeze_end_tick is not None: + segment_start_tick = r.freeze_end_tick - buffer_before_ticks + else: + # Fallback: use prestart + estimated freeze time (~15s) + segment_start_tick = r.prestart_tick + int(15.0 * tickrate) - buffer_before_ticks + + segment_start_tick = max(0, segment_start_tick) # Clamp to demo start + + # Determine round boundaries for finding deaths + round_start_tick = r.freeze_end_tick if r.freeze_end_tick else r.prestart_tick + round_end_tick = r.end_tick + + # If no round end, use next round's prestart or total_ticks + if round_end_tick is None: + if i + 1 < len(rounds): + round_end_tick = rounds[i + 1].prestart_tick + else: + round_end_tick = total_ticks + + # Find first death in this round (between freeze_end and round_end) + death_in_round = None + for death in deaths: + if round_start_tick <= death.tick < round_end_tick: + death_in_round = death + break + + # Determine segment end + if death_in_round: + segment_end_tick = death_in_round.tick + reason = "death" + elif r.end_tick is not None: + segment_end_tick = r.end_tick + buffer_after_ticks + reason = "round_end" + else: + segment_end_tick = round_end_tick + reason = "demo_end" + + # Create segment if valid (end > start) + if segment_end_tick > segment_start_tick: + alive_segments.append(AliveSegment( + start_tick=segment_start_tick, + end_tick=segment_end_tick, + start_time=segment_start_tick / tickrate, + end_time=segment_end_tick / tickrate, + round_num=r.round_num, + reason_ended=reason, + )) + + # Merge overlapping segments + return _merge_alive_segments(alive_segments) + + +def _merge_alive_segments(segments: list[AliveSegment]) -> list[AliveSegment]: + """Merge overlapping or adjacent alive segments.""" + if not segments: + return [] + + # Sort by start tick + sorted_segments = sorted(segments, key=lambda s: s.start_tick) + merged = [sorted_segments[0]] + + for current in sorted_segments[1:]: + prev = merged[-1] + + # Check if current overlaps or is adjacent to previous + if current.start_tick <= prev.end_tick: + # Extend previous segment if current ends later + if current.end_tick > prev.end_tick: + # Create new merged segment + merged[-1] = AliveSegment( + start_tick=prev.start_tick, + end_tick=current.end_tick, + start_time=prev.start_time, + end_time=current.end_time, + round_num=prev.round_num, # Keep first round's number + reason_ended=current.reason_ended, # Use later reason + ) + else: + # No overlap, add as new segment + merged.append(current) + + return merged + + def get_trim_periods(timeline: DemoTimeline) -> list[tuple[float, float]]: """Get periods to trim from video based on timeline data. diff --git a/cs2pov/trim.py b/cs2pov/trim.py index 82e33d5..534dae6 100644 --- a/cs2pov/trim.py +++ b/cs2pov/trim.py @@ -425,3 +425,122 @@ def trim_video_with_periods( ] return trim_death_periods(input_path, output_path, death_periods, verbose) + + +def extract_and_concat_segments( + input_path: Path, + output_path: Path, + segments: list[tuple[float, float]], + verbose: bool = False, +) -> bool: + """Extract and concatenate video segments using FFmpeg. + + This is a simplified interface that takes the segments to KEEP directly, + rather than computing them from periods to remove. + + Args: + input_path: Path to input video + output_path: Path for output video + segments: List of (start, end) tuples in video time (seconds) + verbose: Print debug output + + Returns: + True if trimming was performed, False on failure + """ + if not segments: + if verbose: + print(" [Trim] No segments to extract") + return False + + # Sort segments by start time + segments = sorted(segments, key=lambda s: s[0]) + + if verbose: + total_duration = sum(end - start for start, end in segments) + print(f" [Trim] Extracting {len(segments)} segments, total duration: {total_duration:.2f}s") + + # Create concat file and segment files in temp directory + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir_path = Path(tmpdir) + concat_file = tmpdir_path / "concat.txt" + segment_paths: list[Path] = [] + + # Extract each segment + for i, (start, end) in enumerate(segments): + segment_path = tmpdir_path / f"segment_{i:03d}.mp4" + segment_paths.append(segment_path) + + duration = end - start + if verbose: + print(f" [Trim] Extracting segment {i}: {start:.2f}s - {end:.2f}s ({duration:.2f}s)") + + cmd = [ + "ffmpeg", + "-y", + "-ss", str(start), + "-i", str(input_path), + "-t", str(duration), + "-c", "copy", # No re-encoding + "-avoid_negative_ts", "make_zero", + str(segment_path) + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=300 # 5 min per segment + ) + if result.returncode != 0: + if verbose: + print(f" [Trim] Segment extraction failed: {result.stderr}") + return False + except subprocess.TimeoutExpired: + if verbose: + print(" [Trim] Segment extraction timed out") + return False + + # Create concat file + with open(concat_file, 'w') as f: + for segment_path in segment_paths: + # Escape single quotes in path + escaped_path = str(segment_path).replace("'", "'\\''") + f.write(f"file '{escaped_path}'\n") + + if verbose: + print(f" [Trim] Concatenating {len(segment_paths)} segments") + + # Concatenate segments + cmd = [ + "ffmpeg", + "-y", + "-f", "concat", + "-safe", "0", + "-i", str(concat_file), + "-c", "copy", + str(output_path) + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=600 # 10 min for concat + ) + if result.returncode != 0: + if verbose: + print(f" [Trim] Concatenation failed: {result.stderr}") + return False + except subprocess.TimeoutExpired: + if verbose: + print(" [Trim] Concatenation timed out") + return False + + if verbose: + if output_path.exists(): + output_duration = get_video_duration(output_path) + print(f" [Trim] Output video: {output_duration:.2f}s") + + return True |
