summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--cs2pov/cli.py260
-rw-r--r--cs2pov/preprocessor.py174
-rw-r--r--cs2pov/trim.py119
3 files changed, 417 insertions, 136 deletions
diff --git a/cs2pov/cli.py b/cs2pov/cli.py
index efb9258..f21e368 100644
--- a/cs2pov/cli.py
+++ b/cs2pov/cli.py
@@ -25,7 +25,7 @@ from .exceptions import CS2POVError
from .game import CS2Process, find_cs2_path, get_cfg_dir, get_demo_dir
from .parser import DemoInfo, PlayerInfo, find_player, get_player_index, parse_demo
from .preprocessor import DemoTimeline, preprocess_demo, get_trim_periods
-from .trim import extract_death_periods, TrimPeriod, trim_video_with_periods
+from .trim import extract_death_periods
# =============================================================================
@@ -172,11 +172,11 @@ def record_demo(
print("Preprocessing demo for timeline data...")
try:
timeline = preprocess_demo(demo_path, player.steamid, player.name)
- print(f" Found {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, "
- f"{len(timeline.rounds)} rounds")
- if verbose and timeline.death_periods:
- total_dead = sum(p.duration_seconds for p in timeline.death_periods)
- print(f" Total dead time: {total_dead:.1f}s across {len(timeline.death_periods)} death periods")
+ print(f" Found {len(timeline.deaths)} deaths, {len(timeline.rounds)} rounds")
+ print(f" {len(timeline.alive_segments)} alive segments to keep")
+ if verbose and timeline.alive_segments:
+ total_alive = sum(seg.duration_seconds for seg in timeline.alive_segments)
+ print(f" Total alive time: {total_alive:.1f}s")
except Exception as e:
print(f" Warning: Preprocessing failed: {e}")
print(f" Will fall back to console.log parsing for trim")
@@ -354,16 +354,20 @@ def postprocess_video(
recording_start_time: float,
verbose: bool = False,
timeline: Optional[DemoTimeline] = None,
+ startup_time_override: Optional[float] = None,
) -> Path:
- """Post-process a recorded video to trim death periods.
+ """Post-process a recorded video to keep only alive segments.
- Process:
+ New simplified approach:
1. Calculate startup_time = video_duration - demo_duration
- 2. Trim startup from beginning (aligns video with demo time 0)
- 3. Convert death periods from ticks to seconds (demo time)
- 4. Apply death period trims (times are now aligned with truncated video)
+ 2. Convert alive_segments from demo time to video time
+ 3. Extract and concatenate only the alive segments
+
+ Args:
+ startup_time_override: If provided, use this value instead of calculating
+ startup_time. Useful when the automatic calculation is wrong.
"""
- from .trim import get_video_duration
+ from .trim import get_video_duration, extract_and_concat_segments
if not video_path.exists():
print(f"Error: Video file not found: {video_path}")
@@ -372,23 +376,22 @@ def postprocess_video(
raw_size_mb = video_path.stat().st_size / (1024 * 1024)
print(f"\nRaw recording: {video_path} ({raw_size_mb:.1f} MB)")
- print("\nPost-processing: calculating trim periods...")
+ print("\nPost-processing: calculating segments to keep...")
- trim_periods: list[TrimPeriod] = []
+ video_segments: list[tuple[float, float]] = []
- # Prefer timeline data (need either spawns or rounds to calculate trims)
- if timeline is not None and (timeline.spawns or timeline.rounds):
- print(" Using preprocessed timeline data (demoparser2)")
+ # Prefer timeline data with alive_segments
+ if timeline is not None and timeline.alive_segments:
+ print(" Using alive segments from timeline (demoparser2)")
if verbose:
- print(f" Deaths: {len(timeline.deaths)}, Spawns: {len(timeline.spawns)}, Rounds: {len(timeline.rounds)}")
- print(f" Death periods: {len(timeline.death_periods)}, Round end periods: {len(timeline.round_end_periods)}")
+ print(f" Alive segments: {len(timeline.alive_segments)}")
+ print(f" Deaths: {len(timeline.deaths)}, Rounds: {len(timeline.rounds)}")
print(f" Tickrate: {timeline.tickrate}, Total ticks: {timeline.total_ticks}")
- # Debug: show how many rounds have end_tick and freeze_end_tick
- rounds_with_end = sum(1 for r in timeline.rounds if r.end_tick is not None)
- rounds_with_freeze = sum(1 for r in timeline.rounds if r.freeze_end_tick is not None)
- print(f" Rounds with end_tick: {rounds_with_end}, with freeze_end: {rounds_with_freeze}")
+ for i, seg in enumerate(timeline.alive_segments):
+ print(f" Segment {i+1}: R{seg.round_num} {seg.start_time:.2f}s - {seg.end_time:.2f}s "
+ f"({seg.duration_seconds:.2f}s, ended: {seg.reason_ended})")
- # Step 1: Get video duration and demo duration
+ # Step 1: Get video duration
try:
video_duration = get_video_duration(video_path)
except Exception as e:
@@ -396,7 +399,7 @@ def postprocess_video(
print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
return video_path
- # Get demo duration - try multiple sources
+ # Step 2: Get demo duration for startup_time calculation
demo_duration = 0.0
# Method 1: From console.log demo end marker (most accurate)
@@ -412,18 +415,22 @@ def postprocess_video(
if verbose:
print(f" Demo duration source: demo header")
- # Method 3: Calculate from last event in timeline
- if demo_duration == 0.0 and timeline.tickrate > 0:
- max_tick = 0
- for spawn in timeline.spawns:
- max_tick = max(max_tick, spawn.tick)
- for death in timeline.deaths:
- max_tick = max(max_tick, death.tick)
- if max_tick > 0:
- # Add a buffer since last event isn't necessarily demo end
- demo_duration = (max_tick / timeline.tickrate) + 60.0
- if verbose:
- print(f" Demo duration source: last event tick + 60s buffer")
+ # Method 3: From last round end (more accurate than alive segments)
+ if demo_duration == 0.0 and timeline.rounds:
+ # Find the last round with an end_tick
+ for r in reversed(timeline.rounds):
+ if r.end_tick is not None:
+ demo_duration = r.end_time + 5.0 # Small buffer after round ends
+ if verbose:
+ print(f" Demo duration source: last round end + 5s")
+ break
+
+ # Method 4: From last alive segment end (fallback)
+ if demo_duration == 0.0 and timeline.alive_segments:
+ last_segment = timeline.alive_segments[-1]
+ demo_duration = last_segment.end_time + 5.0 # Small buffer
+ if verbose:
+ print(f" Demo duration source: last alive segment + 5s")
if verbose:
print(f" Video duration: {video_duration:.2f}s")
@@ -434,65 +441,38 @@ def postprocess_video(
print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
return video_path
- # Step 2: Calculate startup time (recording before demo started)
- startup_time = video_duration - demo_duration
- if startup_time < 0:
- print(f" Warning: Negative startup time ({startup_time:.2f}s), using 0")
- startup_time = 0.0
-
- if verbose:
- print(f" Startup time (to trim): {startup_time:.2f}s")
-
- # Step 3: Build trim periods
- # First: trim from 0 to (startup_time + first_action_time)
- # This removes the recording startup AND the demo freeze/warmup time
- # Use first round's freeze_end - 5s as the cut-in point (consistent with death periods)
- first_action_time = 0.0
- if timeline.rounds and timeline.rounds[0].freeze_end_time is not None:
- first_action_time = timeline.rounds[0].freeze_end_time - 5.0
- first_action_time = max(0.0, first_action_time) # Don't go negative
- elif timeline.spawns:
- first_action_time = timeline.spawns[0].time_seconds
-
- initial_trim_end = startup_time + first_action_time
-
- if initial_trim_end > 0.5: # Only trim if > 0.5s
- trim_periods.append(TrimPeriod(start_time=0.0, end_time=initial_trim_end))
+ # Step 3: Calculate startup time (recording before demo started)
+ if startup_time_override is not None:
+ startup_time = startup_time_override
if verbose:
- print(f" Initial trim: 0.00s - {initial_trim_end:.2f}s (startup + pre-action)")
-
- # Step 4: Add death periods (convert demo ticks to video time)
- # video_time = startup_time + demo_time
- for death_period in timeline.death_periods:
- death_video_time = startup_time + death_period.death.time_seconds
- respawn_video_time = startup_time + death_period.spawn.time_seconds
-
- # Only include if after the initial trim
- if respawn_video_time > initial_trim_end:
- # Adjust start if it overlaps with initial trim
- death_video_time = max(death_video_time, initial_trim_end)
- trim_periods.append(TrimPeriod(start_time=death_video_time, end_time=respawn_video_time))
- if verbose:
- duration = respawn_video_time - death_video_time
- print(f" Death period: {death_video_time:.2f}s - {respawn_video_time:.2f}s ({duration:.2f}s)")
-
- # Step 5: Add round end periods (dead time between rounds when player survives)
- for round_end_period in timeline.round_end_periods:
- round_end_video_time = startup_time + round_end_period.round_end_time
- next_round_video_time = startup_time + round_end_period.next_round_time
-
- # Only include if after the initial trim
- if next_round_video_time > initial_trim_end:
- round_end_video_time = max(round_end_video_time, initial_trim_end)
- trim_periods.append(TrimPeriod(start_time=round_end_video_time, end_time=next_round_video_time))
+ print(f" Startup time: {startup_time:.2f}s (manual override)")
+ else:
+ startup_time = video_duration - demo_duration
+ if startup_time < 0:
+ print(f" Warning: Negative startup time ({startup_time:.2f}s), using 0")
+ startup_time = 0.0
+ if verbose:
+ print(f" Startup time: {startup_time:.2f}s (calculated)")
+
+ # Step 4: Convert alive segments to video time
+ for seg in timeline.alive_segments:
+ video_start = startup_time + seg.start_time
+ video_end = startup_time + seg.end_time
+
+ # Clamp to video bounds
+ video_start = max(0.0, video_start)
+ video_end = min(video_duration, video_end)
+
+ # Only include if segment has meaningful duration
+ if video_end > video_start + 0.5:
+ video_segments.append((video_start, video_end))
if verbose:
- duration = next_round_video_time - round_end_video_time
- print(f" Round end period: {round_end_video_time:.2f}s - {next_round_video_time:.2f}s ({duration:.2f}s)")
+ print(f" Keep: {video_start:.2f}s - {video_end:.2f}s ({video_end - video_start:.2f}s)")
- # Fall back to console.log parsing
- if not trim_periods:
+ # Fall back to console.log parsing (legacy method)
+ if not video_segments:
if timeline is not None:
- print(" Timeline has no spawn data, falling back to console.log")
+ print(" No alive segments found, falling back to console.log")
else:
print(" Using console.log parsing (legacy method)")
@@ -502,10 +482,11 @@ def postprocess_video(
print(f" Recording start: {recording_start_time}")
if not console_log_path.exists():
- print(f" Console log not found, skipping trim")
+ print(" Console log not found, skipping trim")
print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
return video_path
+ # Use legacy death period extraction
death_periods = extract_death_periods(
log_path=console_log_path,
player_slot=player_slot,
@@ -513,23 +494,48 @@ def postprocess_video(
verbose=verbose
)
- for dp in death_periods:
- trim_periods.append(TrimPeriod(start_time=dp.death_time, end_time=dp.respawn_time))
+ if death_periods:
+ # Convert death periods to video segments (inverse)
+ try:
+ video_duration = get_video_duration(video_path)
+ except Exception as e:
+ print(f" Error getting video duration: {e}")
+ return video_path
+
+ # Sort death periods and compute alive segments
+ death_periods_sorted = sorted(death_periods, key=lambda p: p.death_time)
+ current_pos = 0.0
+
+ for dp in death_periods_sorted:
+ if dp.death_time > current_pos:
+ video_segments.append((current_pos, dp.death_time))
+ current_pos = max(current_pos, dp.respawn_time)
+
+ # Add final segment
+ if current_pos < video_duration:
+ video_segments.append((current_pos, video_duration))
+
+ # Execute trimming
+ if video_segments:
+ total_keep_time = sum(end - start for start, end in video_segments)
+ try:
+ video_duration = get_video_duration(video_path)
+ total_trim_time = video_duration - total_keep_time
+ except:
+ total_trim_time = 0.0
- if trim_periods:
- print(f" Found {len(trim_periods)} periods to trim")
- total_trim_time = sum(p.duration for p in trim_periods)
- print(f" Total trim time: {total_trim_time:.1f}s")
+ print(f" Found {len(video_segments)} segments to keep")
+ print(f" Total keep time: {total_keep_time:.1f}s, trim time: {total_trim_time:.1f}s")
raw_path = video_path.parent / f"{video_path.stem}_raw{video_path.suffix}"
video_path.rename(raw_path)
print(f" Raw recording moved to: {raw_path.name}")
- print(" Trimming periods from video...")
- success = trim_video_with_periods(
+ print(" Extracting and concatenating segments...")
+ success = extract_and_concat_segments(
input_path=raw_path,
output_path=video_path,
- trim_periods=trim_periods,
+ segments=video_segments,
verbose=verbose
)
@@ -537,11 +543,11 @@ def postprocess_video(
final_size_mb = video_path.stat().st_size / (1024 * 1024)
print(f"\nFinal recording: {video_path} ({final_size_mb:.1f} MB)")
else:
- print(f"\nTrimming failed, restoring raw recording")
+ print("\nTrimming failed, restoring raw recording")
if not video_path.exists() and raw_path.exists():
raw_path.rename(video_path)
else:
- print(" No periods to trim, keeping original")
+ print(" No segments to extract, keeping original")
print(f"\nRecording saved: {video_path} ({raw_size_mb:.1f} MB)")
return video_path
@@ -599,17 +605,15 @@ def print_demo_info_extended(
if timeline:
death_count = len(timeline.deaths)
- spawn_count = len(timeline.spawns)
- dead_time = sum(p.duration_seconds for p in timeline.death_periods)
- print(f" Deaths: {death_count}, Spawns: {spawn_count}, Dead time: {dead_time:.1f}s")
+ alive_time = sum(seg.duration_seconds for seg in timeline.alive_segments)
+ print(f" Deaths: {death_count}, Alive segments: {len(timeline.alive_segments)}, "
+ f"Alive time: {alive_time:.1f}s")
- if verbose and timeline.death_periods:
- print(f" Death periods:")
- for i, period in enumerate(timeline.death_periods, 1):
- weapon = period.death.weapon or "unknown"
- hs = " (headshot)" if period.death.headshot else ""
- print(f" {i:2}. {period.death.time_seconds:7.1f}s - {period.spawn.time_seconds:7.1f}s "
- f"({period.duration_seconds:5.1f}s) [{weapon}{hs}]")
+ if verbose and timeline.alive_segments:
+ print(f" Alive segments:")
+ for i, seg in enumerate(timeline.alive_segments, 1):
+ print(f" {i:2}. R{seg.round_num:2} {seg.start_time:7.1f}s - {seg.end_time:7.1f}s "
+ f"({seg.duration_seconds:5.1f}s) [{seg.reason_ended}]")
def format_info_json(
@@ -651,19 +655,18 @@ def format_info_json(
if timeline:
player_data["deaths"] = len(timeline.deaths)
- player_data["spawns"] = len(timeline.spawns)
- player_data["dead_time_seconds"] = sum(p.duration_seconds for p in timeline.death_periods)
- player_data["death_periods"] = [
+ player_data["alive_time_seconds"] = sum(seg.duration_seconds for seg in timeline.alive_segments)
+ player_data["alive_segments"] = [
{
- "death_tick": p.death.tick,
- "death_time": p.death.time_seconds,
- "spawn_tick": p.spawn.tick,
- "spawn_time": p.spawn.time_seconds,
- "duration_seconds": p.duration_seconds,
- "weapon": p.death.weapon,
- "headshot": p.death.headshot,
+ "round_num": seg.round_num,
+ "start_tick": seg.start_tick,
+ "start_time": seg.start_time,
+ "end_tick": seg.end_tick,
+ "end_time": seg.end_time,
+ "duration_seconds": seg.duration_seconds,
+ "reason_ended": seg.reason_ended,
}
- for p in timeline.death_periods
+ for seg in timeline.alive_segments
]
players_data.append(player_data)
@@ -780,6 +783,8 @@ Examples:
help="Player slot, 0-based (fallback)")
trim_parser.add_argument("--recording-start-time", type=float,
help="Recording start timestamp (fallback)")
+ trim_parser.add_argument("--startup-time", type=float,
+ help="Override startup time (seconds from video start to demo start)")
return parser
@@ -965,8 +970,8 @@ def cmd_trim(args) -> int:
timeline = None
try:
timeline = preprocess_demo(demo_path, player.steamid, player.name)
- print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.spawns)} spawns, {len(timeline.rounds)} rounds")
- print(f" {len(timeline.death_periods)} death periods, {len(timeline.round_end_periods)} round end periods")
+ print(f"Using demo timeline: {len(timeline.deaths)} deaths, {len(timeline.rounds)} rounds")
+ print(f" {len(timeline.alive_segments)} alive segments to keep")
except Exception as e:
print(f"Warning: Could not preprocess demo: {e}")
print("Falling back to console.log method")
@@ -996,6 +1001,7 @@ def cmd_trim(args) -> int:
recording_start_time=args.recording_start_time or 0.0,
verbose=args.verbose,
timeline=timeline,
+ startup_time_override=args.startup_time,
)
return 0
diff --git a/cs2pov/preprocessor.py b/cs2pov/preprocessor.py
index 445f2a4..e7077b4 100644
--- a/cs2pov/preprocessor.py
+++ b/cs2pov/preprocessor.py
@@ -77,6 +77,24 @@ class RoundEndPeriod:
@dataclass
+class AliveSegment:
+ """A period when the player is alive and gameplay should be shown.
+
+ Times are demo-relative (tick 0 = demo start).
+ """
+ start_tick: int
+ end_tick: int
+ start_time: float # seconds from demo start
+ end_time: float # seconds from demo start
+ round_num: int # 0 for pre-round segment
+ reason_ended: str # "death", "round_end", "demo_end"
+
+ @property
+ def duration_seconds(self) -> float:
+ return self.end_time - self.start_time
+
+
+@dataclass
class DemoTimeline:
"""Complete timeline data for a player in a demo."""
@@ -88,8 +106,11 @@ class DemoTimeline:
deaths: list[DeathEvent] = field(default_factory=list)
spawns: list[SpawnEvent] = field(default_factory=list)
- death_periods: list[DeathPeriod] = field(default_factory=list)
rounds: list[RoundBoundary] = field(default_factory=list)
+ alive_segments: list[AliveSegment] = field(default_factory=list)
+
+ # Deprecated - kept for backward compatibility during transition
+ death_periods: list[DeathPeriod] = field(default_factory=list)
round_end_periods: list[RoundEndPeriod] = field(default_factory=list)
@@ -134,16 +155,21 @@ def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "")
# Extract round boundaries
rounds = _extract_round_boundaries(parser, tickrate)
- # Compute death periods using rounds (death -> 5s before next round freeze_end)
- # This is more reliable than spawn events
- death_periods = compute_death_periods_from_rounds(deaths, rounds, tickrate, buffer_seconds=5.0)
+ # Compute alive segments (new unified approach)
+ # This replaces the separate death_periods and round_end_periods logic
+ alive_segments = compute_alive_segments(
+ deaths=deaths,
+ rounds=rounds,
+ tickrate=tickrate,
+ total_ticks=total_ticks,
+ buffer_before_round=5.0,
+ buffer_after_round_end=2.0,
+ )
- # Fallback to spawn-based if rounds didn't work
+ # Deprecated: still compute these for backward compatibility
+ death_periods = compute_death_periods_from_rounds(deaths, rounds, tickrate, buffer_seconds=5.0)
if not death_periods and deaths and spawns:
death_periods = compute_death_periods(deaths, spawns)
-
- # Compute round end periods (round_end -> 5s before next round freeze_end)
- # This trims dead time between rounds when the player survives
round_end_periods = compute_round_end_periods(rounds, tickrate, buffer_before_next=5.0, buffer_after_end=2.0)
return DemoTimeline(
@@ -154,8 +180,10 @@ def preprocess_demo(demo_path: Path, player_steamid: int, player_name: str = "")
total_duration=total_duration,
deaths=deaths,
spawns=spawns,
- death_periods=death_periods,
rounds=rounds,
+ alive_segments=alive_segments,
+ # Deprecated fields (kept for backward compatibility)
+ death_periods=death_periods,
round_end_periods=round_end_periods,
)
@@ -462,6 +490,134 @@ def compute_round_end_periods(
return round_end_periods
+def compute_alive_segments(
+ deaths: list[DeathEvent],
+ rounds: list[RoundBoundary],
+ tickrate: float,
+ total_ticks: int,
+ buffer_before_round: float = 5.0,
+ buffer_after_round_end: float = 2.0,
+) -> list[AliveSegment]:
+ """Compute alive segments from deaths and round boundaries.
+
+ An "alive segment" is a period when the player is alive and actively
+ in gameplay. This replaces the separate death_periods and round_end_periods
+ with a unified approach.
+
+ For each round:
+ - Segment starts at (freeze_end - buffer_before_round)
+ - Segment ends at (player death) OR (round_end + buffer_after_round_end),
+ whichever comes first
+
+ This naturally handles both death and survival cases.
+
+ Args:
+ deaths: List of death events, sorted by tick
+ rounds: List of round boundaries, sorted by round_num
+ tickrate: Demo tickrate for time calculations
+ total_ticks: Total demo ticks (for last segment)
+ buffer_before_round: Seconds before freeze_end to start segment (default 5s)
+ buffer_after_round_end: Seconds after round_end to show (default 2s)
+
+ Returns:
+ List of AliveSegment objects, sorted by start_tick
+ """
+ if not rounds:
+ return []
+
+ alive_segments = []
+ buffer_before_ticks = int(buffer_before_round * tickrate)
+ buffer_after_ticks = int(buffer_after_round_end * tickrate)
+
+ # Create a set of death ticks for quick lookup
+ death_ticks = {d.tick: d for d in deaths}
+
+ for i, r in enumerate(rounds):
+ # Determine segment start: freeze_end - buffer (or prestart if no freeze_end)
+ if r.freeze_end_tick is not None:
+ segment_start_tick = r.freeze_end_tick - buffer_before_ticks
+ else:
+ # Fallback: use prestart + estimated freeze time (~15s)
+ segment_start_tick = r.prestart_tick + int(15.0 * tickrate) - buffer_before_ticks
+
+ segment_start_tick = max(0, segment_start_tick) # Clamp to demo start
+
+ # Determine round boundaries for finding deaths
+ round_start_tick = r.freeze_end_tick if r.freeze_end_tick else r.prestart_tick
+ round_end_tick = r.end_tick
+
+ # If no round end, use next round's prestart or total_ticks
+ if round_end_tick is None:
+ if i + 1 < len(rounds):
+ round_end_tick = rounds[i + 1].prestart_tick
+ else:
+ round_end_tick = total_ticks
+
+ # Find first death in this round (between freeze_end and round_end)
+ death_in_round = None
+ for death in deaths:
+ if round_start_tick <= death.tick < round_end_tick:
+ death_in_round = death
+ break
+
+ # Determine segment end
+ if death_in_round:
+ segment_end_tick = death_in_round.tick
+ reason = "death"
+ elif r.end_tick is not None:
+ segment_end_tick = r.end_tick + buffer_after_ticks
+ reason = "round_end"
+ else:
+ segment_end_tick = round_end_tick
+ reason = "demo_end"
+
+ # Create segment if valid (end > start)
+ if segment_end_tick > segment_start_tick:
+ alive_segments.append(AliveSegment(
+ start_tick=segment_start_tick,
+ end_tick=segment_end_tick,
+ start_time=segment_start_tick / tickrate,
+ end_time=segment_end_tick / tickrate,
+ round_num=r.round_num,
+ reason_ended=reason,
+ ))
+
+ # Merge overlapping segments
+ return _merge_alive_segments(alive_segments)
+
+
+def _merge_alive_segments(segments: list[AliveSegment]) -> list[AliveSegment]:
+ """Merge overlapping or adjacent alive segments."""
+ if not segments:
+ return []
+
+ # Sort by start tick
+ sorted_segments = sorted(segments, key=lambda s: s.start_tick)
+ merged = [sorted_segments[0]]
+
+ for current in sorted_segments[1:]:
+ prev = merged[-1]
+
+ # Check if current overlaps or is adjacent to previous
+ if current.start_tick <= prev.end_tick:
+ # Extend previous segment if current ends later
+ if current.end_tick > prev.end_tick:
+ # Create new merged segment
+ merged[-1] = AliveSegment(
+ start_tick=prev.start_tick,
+ end_tick=current.end_tick,
+ start_time=prev.start_time,
+ end_time=current.end_time,
+ round_num=prev.round_num, # Keep first round's number
+ reason_ended=current.reason_ended, # Use later reason
+ )
+ else:
+ # No overlap, add as new segment
+ merged.append(current)
+
+ return merged
+
+
def get_trim_periods(timeline: DemoTimeline) -> list[tuple[float, float]]:
"""Get periods to trim from video based on timeline data.
diff --git a/cs2pov/trim.py b/cs2pov/trim.py
index 82e33d5..534dae6 100644
--- a/cs2pov/trim.py
+++ b/cs2pov/trim.py
@@ -425,3 +425,122 @@ def trim_video_with_periods(
]
return trim_death_periods(input_path, output_path, death_periods, verbose)
+
+
+def extract_and_concat_segments(
+ input_path: Path,
+ output_path: Path,
+ segments: list[tuple[float, float]],
+ verbose: bool = False,
+) -> bool:
+ """Extract and concatenate video segments using FFmpeg.
+
+ This is a simplified interface that takes the segments to KEEP directly,
+ rather than computing them from periods to remove.
+
+ Args:
+ input_path: Path to input video
+ output_path: Path for output video
+ segments: List of (start, end) tuples in video time (seconds)
+ verbose: Print debug output
+
+ Returns:
+ True if trimming was performed, False on failure
+ """
+ if not segments:
+ if verbose:
+ print(" [Trim] No segments to extract")
+ return False
+
+ # Sort segments by start time
+ segments = sorted(segments, key=lambda s: s[0])
+
+ if verbose:
+ total_duration = sum(end - start for start, end in segments)
+ print(f" [Trim] Extracting {len(segments)} segments, total duration: {total_duration:.2f}s")
+
+ # Create concat file and segment files in temp directory
+ with tempfile.TemporaryDirectory() as tmpdir:
+ tmpdir_path = Path(tmpdir)
+ concat_file = tmpdir_path / "concat.txt"
+ segment_paths: list[Path] = []
+
+ # Extract each segment
+ for i, (start, end) in enumerate(segments):
+ segment_path = tmpdir_path / f"segment_{i:03d}.mp4"
+ segment_paths.append(segment_path)
+
+ duration = end - start
+ if verbose:
+ print(f" [Trim] Extracting segment {i}: {start:.2f}s - {end:.2f}s ({duration:.2f}s)")
+
+ cmd = [
+ "ffmpeg",
+ "-y",
+ "-ss", str(start),
+ "-i", str(input_path),
+ "-t", str(duration),
+ "-c", "copy", # No re-encoding
+ "-avoid_negative_ts", "make_zero",
+ str(segment_path)
+ ]
+
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=300 # 5 min per segment
+ )
+ if result.returncode != 0:
+ if verbose:
+ print(f" [Trim] Segment extraction failed: {result.stderr}")
+ return False
+ except subprocess.TimeoutExpired:
+ if verbose:
+ print(" [Trim] Segment extraction timed out")
+ return False
+
+ # Create concat file
+ with open(concat_file, 'w') as f:
+ for segment_path in segment_paths:
+ # Escape single quotes in path
+ escaped_path = str(segment_path).replace("'", "'\\''")
+ f.write(f"file '{escaped_path}'\n")
+
+ if verbose:
+ print(f" [Trim] Concatenating {len(segment_paths)} segments")
+
+ # Concatenate segments
+ cmd = [
+ "ffmpeg",
+ "-y",
+ "-f", "concat",
+ "-safe", "0",
+ "-i", str(concat_file),
+ "-c", "copy",
+ str(output_path)
+ ]
+
+ try:
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=600 # 10 min for concat
+ )
+ if result.returncode != 0:
+ if verbose:
+ print(f" [Trim] Concatenation failed: {result.stderr}")
+ return False
+ except subprocess.TimeoutExpired:
+ if verbose:
+ print(" [Trim] Concatenation timed out")
+ return False
+
+ if verbose:
+ if output_path.exists():
+ output_duration = get_video_duration(output_path)
+ print(f" [Trim] Output video: {output_duration:.2f}s")
+
+ return True