21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@define(slots=False)
class BrokenSmartVideoFrames(BrokenAttrs):
    """Sliding window of video frames kept around a moving playhead (`time`).

    Three kinds of background loops cooperate (started from `__post__` through
    `BrokenWorker.thread`, presumably one thread each -- confirm in BrokenWorker):

    - `extractor`: iterates the video's frames and queues the ones that fall
      inside the current time window into `_raw`.
    - `worker` (x `threads`): pops raw frames, flips them along axis 0, encodes
      them with `self.encode` and stores the result in `_frames[index]`.
    - `deleter`: drops stored frames that fell outside the time window.

    `get_frame` moves the playhead and blocks until the wanted frame is stored.
    """

    # Video file the frames are read from
    path: Path = None
    # Window half-size, in seconds, kept before and after `time`
    buffer: Seconds = 60
    # Number of `worker` loops to start
    threads: int = 6
    # JPEG quality passed to the lossy encoders
    quality: int = 95
    # Current playhead, updated by `get_frame`
    time: Seconds = 0
    # Store frames as-is (no JPEG compression) when True
    lossless: bool = True
    _width: int = None
    _height: int = None
    _fps: Hertz = None
    # TurboJPEG instance, or None when the library could not be loaded
    _turbo: Any = None
    # Queue of (index, raw frame) waiting to be encoded
    _raw: deque = Factory(deque)
    # Map of frame index -> encoded frame
    _frames: dict = Factory(dict)

    # Dynamically set
    encode: Callable = None
    decode: Callable = None

    @property
    def max_raw(self) -> int:
        """Threshold on `len(_raw)` above which the extractor stops queueing."""
        return self.threads*4

    @property
    def width(self) -> int:
        """Video width as reported by `BrokenFFmpeg.get_video_resolution`."""
        return self._width

    @property
    def height(self) -> int:
        """Video height as reported by `BrokenFFmpeg.get_video_resolution`."""
        return self._height

    # # Initialization

    # Upper bound applied to `buffer` (seconds) when frames are stored lossless
    LOSSLESS_MAX_BUFFER_LENGTH = 4

    def __post__(self):
        """Read the video metadata, pick an encode/decode pair and start the workers.

        Raises:
            ValueError: If the framerate, width or height came back falsy.
        """
        self._fps = BrokenFFmpeg.get_video_framerate(self.path)
        self._width, self._height = BrokenFFmpeg.get_video_resolution(self.path)

        if not all((self._fps, self._width, self._height)):
            raise ValueError("Could not get video metadata")

        # TurboJPEG will raise if shared lib is not found
        with contextlib.suppress(RuntimeError, ModuleNotFoundError):
            import turbojpeg
            self._turbo = turbojpeg.TurboJPEG()

        if self.lossless:
            self.log_warning("Using lossless frames. Limiting buffer length for Out of Memory safety")
            self.buffer = min(self.buffer, BrokenSmartVideoFrames.LOSSLESS_MAX_BUFFER_LENGTH)
            self.encode = lambda frame: frame
            self.decode = lambda frame: frame

        elif (self._turbo is not None):
            self.log_success("Using TurboJPEG for compression. Best speeds available")
            self.encode = lambda frame: self._turbo.encode(frame, quality=self.quality)
            self.decode = lambda frame: self._turbo.decode(frame)

        elif ("cv2" in sys.modules):
            # Already loaded by someone else, so this import only binds the name here
            import cv2
            self.log_success("Using OpenCV for compression. Slower than TurboJPEG but enough")
            # Honor `quality` like the other lossy encoders do
            self.encode = lambda frame: cv2.imencode(
                ".jpeg", frame, [cv2.IMWRITE_JPEG_QUALITY, int(self.quality)]
            )[1]
            self.decode = lambda frame: cv2.imdecode(frame, cv2.IMREAD_COLOR)

        else:
            self.log_warning("Using PIL for compression. Performance killer GIL fallback")
            # NOTE(review): this returns a PIL image while the other decoders
            # return arrays -- confirm what callers of `get_frame` expect
            self.decode = lambda frame: PIL.Image.open(io.BytesIO(frame))
            self.encode = self._pil_encode

        # Create worker threads. The good, the bad and the ugly
        BrokenWorker.thread(self.extractor)
        BrokenWorker.thread(self.deleter)
        for _ in range(self.threads):
            BrokenWorker.thread(self.worker)

    def _pil_encode(self, frame) -> bytes:
        """Encode `frame` to JPEG bytes with PIL.

        `Image.save` returns None, so the bytes must be read back from the buffer.
        """
        stream = io.BytesIO()
        PIL.Image.fromarray(frame).save(stream, format="jpeg", quality=self.quality)
        return stream.getvalue()

    # # Utilities

    def time2index(self, time: Seconds) -> int:
        """Convert a time in seconds to a frame index (truncated)."""
        return int(time*self._fps)

    def index2time(self, index: int) -> Seconds:
        """Convert a frame index to its time in seconds."""
        return (index/self._fps)

    # # Frame access

    def get_frame(self, time: Seconds) -> tuple[int, Callable[[], numpy.ndarray]]:
        """Move the playhead to `time` and wait for the matching frame.

        Returns:
            A pair `(index, getter)` where `getter()` decodes the stored frame
            on demand through `self.decode`.

        Note:
            Blocks, polling every 10 ms, for as long as the frame is missing.
            There is no timeout.
        """
        # The `time` parameter shadows the module, so import the function directly
        from time import sleep

        want = self.time2index(time)
        self.time = time

        # Wait until the frame exists
        while (jpeg := self._frames.get(want)) is None:
            sleep(0.01)

        return (want, lambda: self.decode(jpeg))

    @property
    def buffer_frames(self) -> int:
        """Length of `buffer` expressed in frames."""
        return int(self.buffer*self._fps)

    @property
    def time_index(self) -> int:
        """Frame index of the current playhead."""
        return self.time2index(self.time)

    @property
    def _future_index(self) -> int:
        """Exclusive upper bound of the time window, in frames."""
        return self.time_index + self.buffer_frames

    def _future_window(self, index: int) -> bool:
        """True when `index` is before the future cutoff."""
        return index < self._future_index

    @property
    def _past_index(self) -> int:
        """Exclusive lower bound of the time window, in frames."""
        return self.time_index - self.buffer_frames

    def _past_window(self, index: int) -> bool:
        """True when `index` is after the past cutoff."""
        return self._past_index < index

    def _time_window(self, index: int) -> bool:
        """True when `index` is strictly between the past and future cutoffs."""
        return self._past_window(index) and self._future_window(index)

    def _should_rewind(self, index: int) -> bool:
        """True when `index` is strictly beyond the future cutoff.

        The extractor waits while `index` equals the cutoff, so going past it
        means the cutoff itself moved backwards (the playhead was set to an
        earlier time) and reading must restart from the first frame.
        """
        return self._future_index < index

    # # Workers

    # Bounds of the index range the deleter still has to consider
    _oldest: int = 0
    _newest: int = 0

    def extractor(self):
        """Endlessly read the video and queue the frames inside the time window."""

        def forward():
            """Single pass over the video; returns early when a rewind is needed."""
            for index, frame in enumerate(BrokenFFmpeg.iter_video_frames(self.path)):

                # Skip already processed frames
                if self._frames.get(index) is not None:
                    continue

                # Skip frames outside of the past time window
                if not self._past_window(index):
                    continue

                # Ahead of the window: wait for the playhead, or restart if it went back
                while not self._future_window(index):
                    if self._should_rewind(index):
                        return
                    time.sleep(0.01)

                # Limit how much raw frames there can be
                while len(self._raw) > self.max_raw:
                    time.sleep(0.01)

                self._raw.append((index, frame))
                self._newest = max(self._newest, index)

        while True:
            forward()

    def worker(self):
        """Blindly get new frames from the deque, compress and store them"""
        while True:
            try:
                index, frame = self._raw.popleft()
                # Flip along axis 0 and copy into a fresh array before encoding
                frame = numpy.array(numpy.flip(frame, axis=0))
                self._frames[index] = self.encode(frame)
            except IndexError:
                # Nothing queued (popleft on an empty deque), try again shortly
                time.sleep(0.01)

    def deleter(self):
        """Delete old frames that are not in the time window"""
        while True:
            # Remove the key instead of storing None so the dict does not grow
            # by one entry per frame ever seen; readers use `.get(...) is None`
            for index in range(self._oldest, self._past_index):
                self._frames.pop(index, None)
                self._oldest = index
            for index in range(self._newest, self._future_index, -1):
                self._frames.pop(index, None)
                self._newest = index
            time.sleep(0.5)
|