-
Notifications
You must be signed in to change notification settings - Fork 0
/
av1frameextractor.py
176 lines (143 loc) · 6.43 KB
/
av1frameextractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
"""
Multi-format AV1 Frame Extractor
This script processes video files that may contain AV1 video in various container
formats, extracting high-quality frame snapshots both evenly spaced and randomly
across the video timeline.
Usage:
python av1frameextractor.py input_dir output_dir num_snapshots [--limit LIMIT]
Dependencies:
- FFmpeg (https://ffmpeg.org/)
- Python 3.6+
- ffmpeg-python (pip install ffmpeg-python)
- Pillow (pip install Pillow)
Author: Claude
Date: September 4, 2024
"""
import argparse
import os
import random
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import ffmpeg
from PIL import Image
# File extensions of container formats that commonly carry an AV1 video stream.
# Candidate input files are matched against this tuple by their lower-cased suffix.
SUPPORTED_EXTENSIONS = (
    '.mp4',
    '.webm',
    '.mkv',
    '.avi',
    '.mov',
    '.ogg',
    '.ts',
    '.m4v',
)
def check_dependencies():
    """Verify that the external tools and Python packages this script needs exist.

    Runs ``ffmpeg -version`` and ``ffprobe -version`` and then tries to import
    ``ffmpeg`` (the ffmpeg-python package) and ``PIL.Image`` (Pillow).

    Returns:
        None when every dependency is available.

    Raises:
        SystemExit: with status 1, after printing an explanatory message, as
            soon as one dependency is found to be missing.
    """
    # ffprobe is checked as well because ffmpeg.probe() shells out to it.
    for tool in ("ffmpeg", "ffprobe"):
        try:
            subprocess.run(
                [tool, "-version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            # A missing executable surfaces as FileNotFoundError (an OSError),
            # not CalledProcessError, so both must be caught to reach the
            # friendly message instead of a traceback.
            print("Error: FFmpeg is not installed or not in the system PATH.")
            print(f"Could not run '{tool} -version'.")
            print("Please install FFmpeg: https://ffmpeg.org/download.html")
            sys.exit(1)

    # NOTE: these imports only prove availability; the names are not used here.
    try:
        import ffmpeg  # noqa: F401
    except ImportError:
        print("Error: ffmpeg-python is not installed.")
        print("Please install it using: pip install ffmpeg-python")
        sys.exit(1)

    try:
        from PIL import Image  # noqa: F401
    except ImportError:
        print("Error: Pillow is not installed.")
        print("Please install it using: pip install Pillow")
        sys.exit(1)
def get_video_info(video_path):
    """Probe a media file and summarise its first video stream.

    Args:
        video_path: Path of the media file handed to ``ffmpeg.probe``.

    Returns:
        A dict with the keys ``'duration'`` (float, as reported by ffprobe),
        ``'codec'`` (codec name string), ``'width'`` and ``'height'`` (ints),
        or ``None`` when probing fails, when the file has no video stream, or
        when a required metadata field is missing or not numeric. A message is
        printed in every ``None`` case.
    """
    try:
        probe = ffmpeg.probe(video_path)
    except ffmpeg.Error as e:
        print(f"Error probing video {video_path}: {e.stderr.decode()}")
        return None

    video_stream = next(
        (
            stream
            for stream in probe.get('streams', [])
            if stream.get('codec_type') == 'video'
        ),
        None,
    )
    if video_stream is None:
        print(f"No video stream found in {video_path}")
        return None

    try:
        # Prefer the container-level duration; fall back to the stream's own
        # value when the container does not report one.
        duration = probe.get('format', {}).get('duration')
        if duration is None:
            duration = video_stream['duration']
        return {
            'duration': float(duration),
            'codec': video_stream['codec_name'],
            'width': int(video_stream['width']),
            'height': int(video_stream['height']),
        }
    except (KeyError, TypeError, ValueError) as e:
        # Missing keys or values such as "N/A" must not crash the worker;
        # treat the file as un-probeable so the caller skips it.
        print(f"Incomplete or invalid metadata for {video_path}: {e!r}")
        return None
def extract_frame(video_path, output_path, timestamp):
    """Write one PNG frame of ``video_path`` to ``output_path``.

    Args:
        video_path: Source media file.
        output_path: Destination image file; an existing file is overwritten.
        timestamp: Seek position passed to ffmpeg as the input ``ss`` option.

    Returns:
        ``True`` when the ffmpeg run completes without raising
        ``ffmpeg.Error``; otherwise ffmpeg's stderr is printed and ``False``
        is returned.
    """
    try:
        source = ffmpeg.input(video_path, ss=timestamp)
        # 'gte(n,0)' holds for every frame index, so this stage passes all
        # frames through; the single-frame limit comes from vframes=1 below.
        selected = source.filter('select', 'gte(n,0)')
        job = selected.output(output_path, vframes=1, format='image2', vcodec='png')
        job = job.overwrite_output()
        job.run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        print(f"Error extracting frame at {timestamp} from {video_path}: {e.stderr.decode()}")
        return False
    else:
        return True
def is_high_quality(image_path, min_width=640, min_height=480):
    """Tell whether an image file is at least ``min_width`` x ``min_height`` pixels.

    Args:
        image_path: Image file to open with Pillow.
        min_width: Smallest acceptable width.
        min_height: Smallest acceptable height.

    Returns:
        ``True`` when both dimensions reach their minimum. ``False`` when either
        is too small, or when anything goes wrong while opening or measuring the
        image (the error is printed).
    """
    try:
        with Image.open(image_path) as picture:
            actual_width, actual_height = picture.size
            if not actual_width >= min_width:
                return False
            return actual_height >= min_height
    except Exception as e:
        print(f"Error checking image quality for {image_path}: {e}")
        return False
def process_video(video_path, output_dir, num_snapshots):
    """Extract evenly spaced and random PNG snapshots from one AV1 video.

    The video is skipped, with a printed reason, when it cannot be probed, when
    its codec is not AV1, or when its duration is 120 seconds or less.
    Otherwise ``num_snapshots`` evenly spaced and ``num_snapshots`` random
    timestamps are chosen between second 60 and ``duration - 60``. Each frame
    is written to ``<output_dir>/<video stem>_<even|random>_<k>.png`` and
    removed again when ``is_high_quality`` rejects it.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory that receives the PNG files.
        num_snapshots: Number of frames per category (even and random).

    Returns:
        None. Progress and skip reasons are reported via ``print``.
    """
    video_name = Path(video_path).stem
    video_info = get_video_info(video_path)
    if video_info is None:
        print(f"Skipping {video_path}: Unable to get video information")
        return

    if video_info['codec'].lower() != 'av1':
        print(f"Skipping {video_path}: Not an AV1 encoded video (codec: {video_info['codec']})")
        return

    duration = video_info['duration']
    if duration <= 120:  # Skip videos of two minutes or less
        print(f"Skipping {video_path}: Too short (duration: {duration:.2f} seconds)")
        return

    print(f"Processing {video_path} (duration: {duration:.2f} seconds, resolution: {video_info['width']}x{video_info['height']})")

    # Exclude the first and last minute
    start_time = 60
    end_time = duration - 60
    interval = (end_time - start_time) / (num_snapshots + 1)

    # Interior points only: neither start_time nor end_time itself is sampled.
    even_timestamps = [start_time + i * interval for i in range(1, num_snapshots + 1)]
    random_timestamps = [random.uniform(start_time, end_time) for _ in range(num_snapshots)]

    for snapshot_type, timestamps in (("even", even_timestamps), ("random", random_timestamps)):
        for number, timestamp in enumerate(timestamps, start=1):
            output_path = os.path.join(output_dir, f"{video_name}_{snapshot_type}_{number}.png")
            if not extract_frame(video_path, output_path, timestamp):
                continue

            if is_high_quality(output_path):
                print(f"Extracted {snapshot_type} frame {number} from {video_name}")
                continue

            # is_high_quality also returns False when the file could not be
            # opened, e.g. because ffmpeg finished without writing a frame.
            # In that case there is nothing to delete, so a missing file is
            # deliberately tolerated here.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                pass
            print(f"Discarded low-quality {snapshot_type} frame {number} from {video_name}")
def main():
    """Parse the command line and extract snapshots from every matching video.

    Positional arguments are ``input_dir``, ``output_dir`` and
    ``num_snapshots``; ``--limit`` optionally caps how many videos are
    processed (``0`` or omitted means no cap). Files directly inside
    ``input_dir`` whose lower-cased suffix is in ``SUPPORTED_EXTENSIONS`` are
    handed to ``process_video`` on a thread pool.

    Raises:
        SystemExit: status 2 for invalid arguments (via ``parser.error``),
            status 1 when a dependency is missing or at least one video raised
            an unexpected error.
    """
    parser = argparse.ArgumentParser(description="Extract high-quality frame snapshots from AV1 videos in various container formats.")
    parser.add_argument("input_dir", help="Input directory containing video files")
    parser.add_argument("output_dir", help="Output directory for extracted frame snapshots")
    parser.add_argument("num_snapshots", type=int, help="Number of snapshots to extract per video")
    parser.add_argument("--limit", type=int, help="Limit the number of videos to process")
    args = parser.parse_args()

    # Validate arguments before doing any work so mistakes fail fast and clearly.
    if args.num_snapshots < 1:
        parser.error("num_snapshots must be a positive integer")
    if args.limit is not None and args.limit < 0:
        # A negative value would silently slice from the end of the file list.
        parser.error("--limit must not be negative")
    input_dir = Path(args.input_dir)
    if not input_dir.is_dir():
        parser.error(f"input_dir is not a directory: {input_dir}")

    check_dependencies()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Sorted so that --limit selects the same files on every run; iterdir()
    # yields entries in arbitrary order. Directories are excluded.
    video_files = sorted(
        f for f in input_dir.iterdir()
        if f.is_file() and f.suffix.lower() in SUPPORTED_EXTENSIONS
    )
    if args.limit:
        video_files = video_files[:args.limit]

    print(f"Found {len(video_files)} potentially compatible video files to process")

    failed = 0
    with ThreadPoolExecutor() as executor:
        jobs = [
            (video, executor.submit(process_video, str(video), str(output_dir), args.num_snapshots))
            for video in video_files
        ]
        for video, future in jobs:
            try:
                future.result()
            except Exception as e:
                # Top-level boundary: report the failure and keep collecting
                # the remaining results instead of aborting with a traceback.
                print(f"Error processing {video}: {e!r}")
                failed += 1

    print("Frame extraction completed")
    if failed:
        print(f"{failed} video(s) could not be processed")
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()