
File: ShaderFlow/Modules/Video.py

ShaderFlow.Modules.Video

BrokenSmartVideoFrames

Bases: BrokenAttrs

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
@define(slots=False)
class BrokenSmartVideoFrames(BrokenAttrs):
    path:     Path    = None
    buffer:   Seconds = 60
    threads:  int     = 6
    quality:  int     = 95
    time:     Seconds = 0
    lossless: bool    = True
    _width:   int     = None
    _height:  int     = None
    _fps:     Hertz   = None
    _turbo:   Any     = None
    _raw:     deque   = Factory(deque)
    _frames:  dict    = Factory(dict)

    # Dynamically set
    encode:  Callable = None
    decode:  Callable = None

    @property
    def max_raw(self) -> int:
        return self.threads*4

    @property
    def width(self) -> int:
        return self._width

    @property
    def height(self) -> int:
        return self._height

    # # Initialization

    LOSSLESS_MAX_BUFFER_LENGTH = 4

    def __post__(self):
        self._fps = BrokenFFmpeg.get_video_framerate(self.path)
        self._width, self._height = BrokenFFmpeg.get_video_resolution(self.path)

        if not all((self._fps, self._width, self._height)):
            raise ValueError("Could not get video metadata")

        # TurboJPEG will raise if shared lib is not found
        with contextlib.suppress(RuntimeError, ModuleNotFoundError):
            import turbojpeg
            self._turbo = turbojpeg.TurboJPEG()

        if self.lossless:
            self.log_warning("Using lossless frames. Limiting buffer length for Out of Memory safety")
            self.buffer = min(self.buffer, BrokenSmartVideoFrames.LOSSLESS_MAX_BUFFER_LENGTH)
            self.encode = lambda frame: frame
            self.decode = lambda frame: frame

        elif (self._turbo is not None):
            self.log_success("Using TurboJPEG for compression. Best speeds available")
            self.encode = lambda frame: self._turbo.encode(frame, quality=self.quality)
            self.decode = lambda frame: self._turbo.decode(frame)

        elif ("cv2" in sys.modules):
            self.log_success("Using OpenCV for compression. Slower than TurboJPEG but enough")
            self.encode = lambda frame: cv2.imencode(".jpeg", frame)[1]
            self.decode = lambda frame: cv2.imdecode(frame, cv2.IMREAD_COLOR)

        else:
            self.log_warning("Using PIL for compression. Slow GIL-bound fallback")

            # PIL.Image.save returns None, so write to a buffer and return its bytes
            def _pil_encode(frame):
                buffer = io.BytesIO()
                PIL.Image.fromarray(frame).save(buffer, format="jpeg", quality=self.quality)
                return buffer.getvalue()

            self.encode = _pil_encode
            self.decode = lambda frame: numpy.array(PIL.Image.open(io.BytesIO(frame)))

        # Create worker threads. The good, the bad and the ugly
        BrokenWorker.thread(self.extractor)
        BrokenWorker.thread(self.deleter)
        for _ in range(self.threads):
            BrokenWorker.thread(self.worker)

    # # Utilities

    def time2index(self, time: Seconds) -> int:
        return int(time*self._fps)

    def index2time(self, index: int) -> Seconds:
        return (index/self._fps)

    # # Frame retrieval

    def get_frame(self, time: Seconds) -> tuple[int, Callable]:
        want = self.time2index(time)
        self.time = time

        # The 'time' parameter shadows the time module, so re-import it locally
        import time

        # Wait until the frame exists
        while (jpeg := self._frames.get(want)) is None:
            time.sleep(0.01)

        # Return the index and a lazy decoder, so decoding only happens when the frame is used
        return (want, lambda: self.decode(jpeg))

    @property
    def buffer_frames(self) -> int:
        return int(self.buffer*self._fps)

    @property
    def time_index(self) -> int:
        return self.time2index(self.time)

    @property
    def _future_index(self) -> int:
        return self.time_index + self.buffer_frames

    def _future_window(self, index: int) -> bool:
        return index < self._future_index

    @property
    def _past_index(self) -> int:
        return self.time_index - self.buffer_frames

    def _past_window(self, index: int) -> bool:
        return self._past_index < index

    def _time_window(self, index: int) -> bool:
        return self._past_window(index) and self._future_window(index)

    def _should_rewind(self, index: int) -> bool:
        """The playback time has fallen more than one buffer behind this index, so restart decoding"""
        return (self.time_index + self.buffer_frames) < index

    # # Workers

    _oldest: int = 0
    _newest: int = 0

    def extractor(self):
        def forward():
            for index, frame in enumerate(BrokenFFmpeg.iter_video_frames(self.path)):

                # Skip already processed frames
                if self._frames.get(index) is not None:
                    continue

                # Skip frames older than the past cutoff
                if not self._past_window(index):
                    continue

                while not self._future_window(index):
                    if self._should_rewind(index):
                        return
                    time.sleep(0.01)

                # Limit how many raw frames can be queued at once
                while len(self._raw) > self.max_raw:
                    time.sleep(0.01)

                self._raw.append((index, frame))
                self._newest = max(self._newest, index)

        while True:
            forward()

    def worker(self):
        """Blindly get new frames from the deque, compress and store them"""
        while True:
            try:
                index, frame = self._raw.popleft()
                frame = numpy.array(numpy.flip(frame, axis=0))
                self._frames[index] = self.encode(frame)
            except IndexError:
                time.sleep(0.01)

    def deleter(self):
        """Delete old frames that are not in the time window"""
        while True:
            for index in range(self._oldest, self._past_index):
                self._frames[index] = None
                self._oldest = index
            for index in range(self._newest, self._future_index, -1):
                self._frames[index] = None
                self._newest = index
            time.sleep(0.5)
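
A minimal usage sketch of the frame buffer on its own, assuming the ShaderFlow package and its FFmpeg dependency are available and that "video.mp4" stands in for a real file. get_frame blocks until the background threads have buffered the requested frame, then returns the index plus a lazy decode callable:

from pathlib import Path

from ShaderFlow.Modules.Video import BrokenSmartVideoFrames

# Hypothetical input file; lossless=False keeps memory bounded through JPEG compression
frames = BrokenSmartVideoFrames(path=Path("video.mp4"), buffer=10, lossless=False)

# Blocks until the extractor/worker threads have buffered the frame at t=2.0 seconds
index, decode = frames.get_frame(2.0)

# Decoding is deferred until the frame is actually needed
image = decode()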

path

path: Path = None

buffer

buffer: Seconds = 60

threads

threads: int = 6

quality

quality: int = 95

time

time: Seconds = 0

lossless

lossless: bool = True

encode

encode: Callable = None

decode

decode: Callable = None

max_raw

max_raw: int

width

width: int

height

height: int

LOSSLESS_MAX_BUFFER_LENGTH

LOSSLESS_MAX_BUFFER_LENGTH = 4

__post__

__post__()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def __post__(self):
    self._fps = BrokenFFmpeg.get_video_framerate(self.path)
    self._width, self._height = BrokenFFmpeg.get_video_resolution(self.path)

    if not all((self._fps, self._width, self._height)):
        raise ValueError("Could not get video metadata")

    # TurboJPEG will raise if shared lib is not found
    with contextlib.suppress(RuntimeError, ModuleNotFoundError):
        import turbojpeg
        self._turbo = turbojpeg.TurboJPEG()

    if self.lossless:
        self.log_warning("Using lossless frames. Limiting buffer length for Out of Memory safety")
        self.buffer = min(self.buffer, BrokenSmartVideoFrames.LOSSLESS_MAX_BUFFER_LENGTH)
        self.encode = lambda frame: frame
        self.decode = lambda frame: frame

    elif (self._turbo is not None):
        self.log_success("Using TurboJPEG for compression. Best speeds available")
        self.encode = lambda frame: self._turbo.encode(frame, quality=self.quality)
        self.decode = lambda frame: self._turbo.decode(frame)

    elif ("cv2" in sys.modules):
        self.log_success("Using OpenCV for compression. Slower than TurboJPEG but enough")
        self.encode = lambda frame: cv2.imencode(".jpeg", frame)[1]
        self.decode = lambda frame: cv2.imdecode(frame, cv2.IMREAD_COLOR)

    else:
        self.log_warning("Using PIL for compression. Slow GIL-bound fallback")

        # PIL.Image.save returns None, so write to a buffer and return its bytes
        def _pil_encode(frame):
            buffer = io.BytesIO()
            PIL.Image.fromarray(frame).save(buffer, format="jpeg", quality=self.quality)
            return buffer.getvalue()

        self.encode = _pil_encode
        self.decode = lambda frame: numpy.array(PIL.Image.open(io.BytesIO(frame)))

    # Create worker threads. The good, the bad and the ugly
    BrokenWorker.thread(self.extractor)
    BrokenWorker.thread(self.deleter)
    for _ in range(self.threads):
        BrokenWorker.thread(self.worker)

time2index

time2index(time: Seconds) -> int
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def time2index(self, time: Seconds) -> int:
    return int(time*self._fps)

index2time

index2time(index: int) -> Seconds
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def index2time(self, index: int) -> Seconds:
    return (index/self._fps)
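
A tiny illustration of the two conversions, assuming a 30 fps source for the arithmetic:

fps = 30                  # assumed framerate, for illustration only
index = int(2.5 * fps)    # time2index(2.5) -> 75
seconds = index / fps     # index2time(75)  -> 2.5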

get_frame

get_frame(time: Seconds) -> tuple[int, Callable]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def get_frame(self, time: Seconds) -> tuple[int, Callable]:
    want = self.time2index(time)
    self.time = time

    # The 'time' parameter shadows the time module, so re-import it locally
    import time

    # Wait until the frame exists
    while (jpeg := self._frames.get(want)) is None:
        time.sleep(0.01)

    # Return the index and a lazy decoder, so decoding only happens when the frame is used
    return (want, lambda: self.decode(jpeg))

buffer_frames

buffer_frames: int

time_index

time_index: int

extractor

extractor()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def extractor(self):
    def forward():
        for index, frame in enumerate(BrokenFFmpeg.iter_video_frames(self.path)):

            # Skip already processed frames
            if self._frames.get(index) is not None:
                continue

            # Skip frames older than the past cutoff
            if not self._past_window(index):
                continue

            while not self._future_window(index):
                if self._should_rewind(index):
                    return
                time.sleep(0.01)

            # Limit how many raw frames can be queued at once
            while len(self._raw) > self.max_raw:
                time.sleep(0.01)

            self._raw.append((index, frame))
            self._newest = max(self._newest, index)

    while True:
        forward()

worker

worker()

Blindly get new frames from the deque, compress and store them

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def worker(self):
    """Blindly get new frames from the deque, compress and store them"""
    while True:
        try:
            index, frame = self._raw.popleft()
            frame = numpy.array(numpy.flip(frame, axis=0))
            self._frames[index] = self.encode(frame)
        except IndexError:
            time.sleep(0.01)

deleter

deleter()

Delete old frames that are not in the time window

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def deleter(self):
    """Delete old frames that are not in the time window"""
    while True:
        for index in range(self._oldest, self._past_index):
            self._frames[index] = None
            self._oldest = index
        for index in range(self._newest, self._future_index, -1):
            self._frames[index] = None
            self._newest = index
        time.sleep(0.5)

ShaderVideo

Bases: BrokenSmartVideoFrames, ShaderModule

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
@define
class ShaderVideo(BrokenSmartVideoFrames, ShaderModule):
    name: str = "iVideo"
    texture: ShaderTexture = None

    temporal: int = 10
    """How many temporal layers (past frames) the texture keeps available"""

    on_frame: BrokenRelay = Factory(BrokenRelay)
    """Whenever a new video frame is decoded, this attribute is called. Preferably subscribe to
    it with `video.on_frame.subscribe(callable)` or `video.on_frame @ (A, B, C)`, see BokenRelay"""

    def __post__(self):
        self.texture = ShaderTexture(
            scene=self.scene,
            name=self.name,
            width=self.width,
            height=self.height,
            temporal=self.temporal,
            components=3,
            dtype="f1"
        )

    __same__: SameTracker = Factory(SameTracker)

    def update(self):
        index, decode = self.get_frame(self.scene.time)

        if not self.__same__(index):
            image = decode()
            self.texture.roll()
            self.texture.write(image)
            self.on_frame(image)

name

name: str = 'iVideo'

texture

texture: ShaderTexture = None

temporal

temporal: int = 10

How many temporal layers (past frames) the texture keeps available

on_frame

on_frame: BrokenRelay = Factory(BrokenRelay)

Called with the decoded image whenever a new video frame is decoded. Preferably subscribe to it with video.on_frame.subscribe(callable) or video.on_frame @ (A, B, C), see BrokenRelay
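
A small sketch of hooking into decoded frames, assuming video is an already-constructed ShaderVideo instance attached to a scene; the callback receives the decoded image right after it is written to the texture:

def on_new_frame(image):
    # Runs once per newly decoded frame index (update() skips repeated indices)
    print("decoded frame:", getattr(image, "shape", None))

# Either form registers the callback (see BrokenRelay)
video.on_frame.subscribe(on_new_frame)
# video.on_frame @ (on_new_frame,)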

__post__

__post__()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def __post__(self):
    self.texture = ShaderTexture(
        scene=self.scene,
        name=self.name,
        width=self.width,
        height=self.height,
        temporal=self.temporal,
        components=3,
        dtype="f1"
    )

__same__

__same__: SameTracker = Factory(SameTracker)

update

update()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Video.py
def update(self):
    index, decode = self.get_frame(self.scene.time)

    if not self.__same__(index):
        image = decode()
        self.texture.roll()
        self.texture.write(image)
        self.on_frame(image)