Skip to content

File: ShaderFlow/Modules/Audio.py

ShaderFlow.Modules.Audio

fuzzy_string_search(
    string: str,
    choices: list[str],
    many: int = 1,
    minimum_score: int = 0,
) -> list[tuple[str, int]]

Fuzzy search a string in a list of strings, returns a list of matches

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
66
67
68
69
70
71
72
73
74
def fuzzy_string_search(string: str, choices: list[str], many: int=1, minimum_score: int=0) -> list[tuple[str, int]]:
    """Fuzzy search a string in a list of strings, returns a list of matches"""
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        import thefuzz.process
        result = thefuzz.process.extract(string, choices, limit=many)
        if many == 1:
            return result[0]
        return result

root_mean_square

root_mean_square(data) -> float
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
76
77
def root_mean_square(data) -> float:
    return numpy.sqrt(numpy.mean(numpy.square(data)))

BrokenAudioMode

Bases: BrokenEnum

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
81
82
83
class BrokenAudioMode(BrokenEnum):
    Realtime = "realtime"
    File     = "file"

Realtime

Realtime = 'realtime'

File

File = 'file'

BrokenAudio

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
@define(slots=False)
class BrokenAudio:
    mode: BrokenAudioMode = BrokenAudioMode.Realtime.field()

    data: numpy.ndarray = None
    """Progressive audio data, shape: (channels, samples)"""

    dtype: numpy.dtype = numpy.float32
    """Data type of the audio samples"""

    tell: int = 0
    """The number of samples read from the audio so far"""

    def __post__(self):
        BrokenWorker.thread(self._play_thread)
        BrokenWorker.thread(self._record_thread)
        self.create_buffer()

    @property
    def buffer_size(self) -> Samples:
        return int(self.samplerate*self.buffer_seconds)

    @property
    def shape(self) -> tuple[Channels, Samples]:
        return (self.channels, self.buffer_size)

    def create_buffer(self) -> None:
        self.data = numpy.zeros(self.shape, dtype=self.dtype)

    def add_data(self, data: numpy.ndarray) -> Optional[numpy.ndarray]:
        """
        Roll the data to the left by the length of the new data; copy new data to the end
        Note: Channel count must match the buffer's one

        Args:
            data: The new data of shape: (channels, length)

        Returns:
            The data that was written, if any
        """
        data = numpy.array(data, dtype=self.dtype)
        length = data.shape[1]
        self.data = numpy.roll(self.data, -length, axis=1)
        self.data[:, -length:] = data
        self.tell += length
        return data

    def get_data_between_samples(self, start: Samples, end: Samples) -> numpy.ndarray:
        return self.data[:, int(start):int(end)]

    def get_data_between_seconds(self, start: Seconds, end: Seconds) -> numpy.ndarray:
        return self.get_data_between_samples(start*self.samplerate, end*self.samplerate)

    def get_last_n_samples(self, n: Samples, *, offset: Samples=0) -> numpy.ndarray:
        return self.data[:, -(int(n+offset) + 1) : -(int(offset) + 1)]

    def get_last_n_seconds(self, n: Seconds) -> numpy.ndarray:
        return self.get_last_n_samples(n*self.samplerate)

    # -------------------------------------------|
    # Sample Rate

    _samplerate: Hertz = 44100

    @property
    def samplerate(self) -> Hertz:
        """How many data points per second the audio is sampled at. Defaults to 44100"""
        return (self._samplerate or 44100)

    @samplerate.setter
    def samplerate(self, value: Hertz):
        self._samplerate = value
        self.create_buffer()

    # -------------------------------------------|
    # Channels

    _channels: int = 2

    @property
    def channels(self) -> int:
        """Number of audio streams (channels). Two is stereo, one is mono. Defaults to 2"""
        return self._channels or 2

    @channels.setter
    def channels(self, value: int):
        self._channels = value
        self.create_buffer()

    # -------------------------------------------|
    # History

    _buffer_seconds: Seconds = 30.0

    @property
    def buffer_seconds(self) -> Seconds:
        """Buffer length in seconds. Cheap on ram and fast, ideally have a decent side"""
        # Note: To convince yourself, (48000 Hz) * (2 Channels) * (30 sec) * (f32=4 bytes) = 11 MB
        return self._buffer_seconds

    @buffer_seconds.setter
    def buffer_seconds(self, value: Seconds):
        self._buffer_seconds = value
        self.create_buffer()

    # -------------------------------------------|
    # File

    _file: Path = None
    _file_reader: BrokenAudioReader = None
    _file_stream: Generator[tuple[Seconds, numpy.ndarray], None, Seconds] = None

    @property
    def file(self) -> Path:
        return self._file

    @file.setter
    def file(self, value: Path):
        self._file = BrokenPath.get(value)
        if self._file and not (self._file.exists()):
            return log.minor(f"Audio File doesn't exist ({value})")
        self.samplerate   = BrokenFFmpeg.get_audio_samplerate(self.file, echo=False)
        self.channels     = BrokenFFmpeg.get_audio_channels(self.file, echo=False)
        self._file_reader = BrokenAudioReader(path=self.file)
        self._file_stream = self._file_reader.stream
        self.mode         = BrokenAudioMode.File
        self.close_recorder()

    # -------------------------------------------|
    # Soundcard

    recorder_device: Any = None
    recorder: Any = None

    @staticmethod
    def recorders() -> Iterable['soundcard._Recorder']:
        yield from soundcard.all_microphones(include_loopback=True)

    @staticmethod
    def recorders_names() -> Iterable[str]:
        yield from map(lambda device: device.name, BrokenAudio.recorders())

    speaker_device: Any = None
    speaker: Any = None

    @staticmethod
    def speakers() -> Iterable['soundcard._Speaker']:
        yield from soundcard.all_speakers()

    @staticmethod
    def speakers_names() -> Iterable[str]:
        yield from map(lambda device: device.name, BrokenAudio.speakers())

    def print_recorders(self) -> None:
        """List and print all available Audio recording devices"""
        log.info("Recording Devices:")
        for i, device in enumerate(BrokenAudio.recorders()):
            log.info(f"• ({i:2d}) Recorder: '{device.name}'")

    def print_speakers(self) -> None:
        """List and print all available Audio playback devices"""
        log.info("Playback Devices:")
        for i, device in enumerate(BrokenAudio.speakers()):
            log.info(f"• ({i:2d}) Speaker: '{device.name}'")

    def __fuzzy__(self, name: str, devices: Iterable[str]) -> Optional[str]:
        device_name = fuzzy_string_search(name, devices)[0]
        return next(filter(lambda x: x.name == device_name, devices), None)

    def open_speaker(self,
        name: str=None,
        *,
        samplerate: Hertz=None,
    ) -> Self:
        """
        Open a SoundCard device for playing real-time audio.

        Args:
            name: The name of the device to open. If None, the default speaker is used. The search
                is fuzzy, so the match does not need to be exact

            samplerate: If None, gets self.samplerate

        Returns:
            Self, Fluent interface
        """
        (self.speaker or Nothing()).__exit__(None, None, None)

        # Search for the Speaker
        if name is None:
            self.speaker_device = soundcard.default_speaker()
        else:
            self.speaker_device = self.__fuzzy__(name, self.speakers_names)

        # Open the speaker
        log.info(f"Opening Speaker with Device ({self.speaker_device})")
        self.speaker = self.speaker_device.player(
            samplerate=samplerate or self.samplerate,
        ).__enter__()
        return self

    def close_speaker(self) -> Self:
        (self.speaker or Nothing()).__exit__(None, None, None)
        self.speaker = None
        return self

    def open_recorder(self,
        name: str=None,
        *,
        samplerate: Hertz=44100,
        channels: list[int]=None,
        blocksize: int=512,
    ) -> Self:
        """
        Open a SoundCard device for recording real-time audio.

        Args:
            name: The name of the device to open. If None, the first loopback device or default
                microphone is used. The search is fuzzy, so the match does not need to be exact

            samplerate: The desired sample rate of the audio

            channels: Channels to read from the device.
                • None: Record all available channels
                • list[int]: Record only the specified channels
                • -1: (Linux: Mono mix of all channels) (MacOS: Silence)

            blocksize: Desired minimum latency in samples, and also the number of recorded
                samples at a time. Lower values reduces latency and increases CPU usage, which
                funnily enough might cause latency issues

        Returns:
            Self, Fluent interface
        """
        self.close_recorder()

        # Search for default loopback device
        if name is None:
            for device in self.recorders():
                if device.isloopback:
                    self.recorder_device = device
                    break
            self.recorder_device = (self.recorder_device or soundcard.default_microphone())
        else:
            self.recorder_device = self.__fuzzy__(name, self.recorders_names())

        # Open the recorder
        log.info(f"Opening Recorder with Device ({self.recorder_device})")
        self.recorder = self.recorder_device.recorder(
            samplerate=samplerate,
            channels=channels,
            blocksize=blocksize,
        ).__enter__()

        # Update properties
        self.samplerate = getattr(self.recorder, "_samplerate", samplerate)
        self.channels   = self.recorder_device.channels
        self.mode       = BrokenAudioMode.Realtime
        return self

    def close_recorder(self) -> Self:
        (self.recorder or Nothing()).__exit__(None, None, None)
        self.recorder = None
        return self

    def record(self, numframes: int=None) -> Optional[numpy.ndarray]:
        """Record a number of samples from the recorder. 'None' records all"""
        if (self.recorder is not None):
            return self.add_data(self.recorder.record(numframes=numframes).T)

    def _record_thread(self) -> None:
        while True:
            try:
                if (self.record() is None):
                    time.sleep(0.01)
            except Exception:
                pass

    # # Playing

    _play_queue: deque[numpy.ndarray] = Factory(deque)

    def play(self, data: numpy.ndarray) -> None:
        """Add a numpy array to the play queue. for non-blocking playback"""
        if (self.speaker_device is not None):
            self._play_queue.append(data)

    def _play_thread(self) -> None:
        while True:
            if (self._play_queue and self.speaker):
                self.speaker.play(self._play_queue.popleft().T)
                continue
            time.sleep(0.01)

    # -------------------------------------------|
    # Properties utils

    @property
    def stereo(self) -> bool:
        return (self.channels == 2)

    @property
    def mono(self) -> bool:
        return (self.channels == 1)

    @property
    def duration(self) -> Seconds:
        if self.mode == BrokenAudioMode.Realtime:
            return math.inf
        if self.mode == BrokenAudioMode.File:
            return BrokenFFmpeg.get_audio_duration(self.file)

data

data: numpy.ndarray = None

Progressive audio data, shape: (channels, samples)

dtype

dtype: numpy.dtype = numpy.float32

Data type of the audio samples

tell

tell: int = 0

The number of samples read from the audio so far

__post__

__post__()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
 98
 99
100
101
def __post__(self):
    BrokenWorker.thread(self._play_thread)
    BrokenWorker.thread(self._record_thread)
    self.create_buffer()

buffer_size

buffer_size: Samples

shape

shape: tuple[Channels, Samples]

create_buffer

create_buffer() -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
111
112
def create_buffer(self) -> None:
    self.data = numpy.zeros(self.shape, dtype=self.dtype)

add_data

add_data(data: numpy.ndarray) -> Optional[numpy.ndarray]

Roll the data to the left by the length of the new data; copy new data to the end Note: Channel count must match the buffer's one

Parameters:

  • data (numpy.ndarray) –

    The new data of shape: (channels, length)

Returns:

  • Optional[numpy.ndarray]

    The data that was written, if any

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def add_data(self, data: numpy.ndarray) -> Optional[numpy.ndarray]:
    """
    Roll the data to the left by the length of the new data; copy new data to the end
    Note: Channel count must match the buffer's one

    Args:
        data: The new data of shape: (channels, length)

    Returns:
        The data that was written, if any
    """
    data = numpy.array(data, dtype=self.dtype)
    length = data.shape[1]
    self.data = numpy.roll(self.data, -length, axis=1)
    self.data[:, -length:] = data
    self.tell += length
    return data

get_data_between_samples

get_data_between_samples(
    start: Samples, end: Samples
) -> numpy.ndarray
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
132
133
def get_data_between_samples(self, start: Samples, end: Samples) -> numpy.ndarray:
    return self.data[:, int(start):int(end)]

get_data_between_seconds

get_data_between_seconds(
    start: Seconds, end: Seconds
) -> numpy.ndarray
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
135
136
def get_data_between_seconds(self, start: Seconds, end: Seconds) -> numpy.ndarray:
    return self.get_data_between_samples(start*self.samplerate, end*self.samplerate)

get_last_n_samples

get_last_n_samples(
    n: Samples, *, offset: Samples = 0
) -> numpy.ndarray
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
138
139
def get_last_n_samples(self, n: Samples, *, offset: Samples=0) -> numpy.ndarray:
    return self.data[:, -(int(n+offset) + 1) : -(int(offset) + 1)]

get_last_n_seconds

get_last_n_seconds(n: Seconds) -> numpy.ndarray
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
141
142
def get_last_n_seconds(self, n: Seconds) -> numpy.ndarray:
    return self.get_last_n_samples(n*self.samplerate)

samplerate

samplerate: Hertz

How many data points per second the audio is sampled at. Defaults to 44100

channels

channels: int

Number of audio streams (channels). Two is stereo, one is mono. Defaults to 2

buffer_seconds

buffer_seconds: Seconds

Buffer length in seconds. Cheap on ram and fast, ideally have a decent side

file

file: Path

recorder_device

recorder_device: Any = None

recorder

recorder: Any = None

recorders

recorders() -> Iterable[soundcard._Recorder]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
219
220
221
@staticmethod
def recorders() -> Iterable['soundcard._Recorder']:
    yield from soundcard.all_microphones(include_loopback=True)

recorders_names

recorders_names() -> Iterable[str]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
223
224
225
@staticmethod
def recorders_names() -> Iterable[str]:
    yield from map(lambda device: device.name, BrokenAudio.recorders())

speaker_device

speaker_device: Any = None

speaker

speaker: Any = None

speakers

speakers() -> Iterable[soundcard._Speaker]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
230
231
232
@staticmethod
def speakers() -> Iterable['soundcard._Speaker']:
    yield from soundcard.all_speakers()

speakers_names

speakers_names() -> Iterable[str]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
234
235
236
@staticmethod
def speakers_names() -> Iterable[str]:
    yield from map(lambda device: device.name, BrokenAudio.speakers())

print_recorders

print_recorders() -> None

List and print all available Audio recording devices

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
238
239
240
241
242
def print_recorders(self) -> None:
    """List and print all available Audio recording devices"""
    log.info("Recording Devices:")
    for i, device in enumerate(BrokenAudio.recorders()):
        log.info(f"• ({i:2d}) Recorder: '{device.name}'")

print_speakers

print_speakers() -> None

List and print all available Audio playback devices

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
244
245
246
247
248
def print_speakers(self) -> None:
    """List and print all available Audio playback devices"""
    log.info("Playback Devices:")
    for i, device in enumerate(BrokenAudio.speakers()):
        log.info(f"• ({i:2d}) Speaker: '{device.name}'")

__fuzzy__

__fuzzy__(
    name: str, devices: Iterable[str]
) -> Optional[str]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
250
251
252
def __fuzzy__(self, name: str, devices: Iterable[str]) -> Optional[str]:
    device_name = fuzzy_string_search(name, devices)[0]
    return next(filter(lambda x: x.name == device_name, devices), None)

open_speaker

open_speaker(
    name: str = None, *, samplerate: Hertz = None
) -> Self

Open a SoundCard device for playing real-time audio.

Parameters:

  • name (str, default: None ) –

    The name of the device to open. If None, the default speaker is used. The search is fuzzy, so the match does not need to be exact

  • samplerate (Hertz, default: None ) –

    If None, gets self.samplerate

Returns:

  • Self

    Self, Fluent interface

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def open_speaker(self,
    name: str=None,
    *,
    samplerate: Hertz=None,
) -> Self:
    """
    Open a SoundCard device for playing real-time audio.

    Args:
        name: The name of the device to open. If None, the default speaker is used. The search
            is fuzzy, so the match does not need to be exact

        samplerate: If None, gets self.samplerate

    Returns:
        Self, Fluent interface
    """
    (self.speaker or Nothing()).__exit__(None, None, None)

    # Search for the Speaker
    if name is None:
        self.speaker_device = soundcard.default_speaker()
    else:
        self.speaker_device = self.__fuzzy__(name, self.speakers_names)

    # Open the speaker
    log.info(f"Opening Speaker with Device ({self.speaker_device})")
    self.speaker = self.speaker_device.player(
        samplerate=samplerate or self.samplerate,
    ).__enter__()
    return self

close_speaker

close_speaker() -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
286
287
288
289
def close_speaker(self) -> Self:
    (self.speaker or Nothing()).__exit__(None, None, None)
    self.speaker = None
    return self

open_recorder

open_recorder(
    name: str = None,
    *,
    samplerate: Hertz = 44100,
    channels: list[int] = None,
    blocksize: int = 512
) -> Self

Open a SoundCard device for recording real-time audio.

Parameters:

  • name (str, default: None ) –

    The name of the device to open. If None, the first loopback device or default microphone is used. The search is fuzzy, so the match does not need to be exact

  • samplerate (Hertz, default: 44100 ) –

    The desired sample rate of the audio

  • channels (list[int], default: None ) –

    Channels to read from the device. • None: Record all available channels • list[int]: Record only the specified channels • -1: (Linux: Mono mix of all channels) (MacOS: Silence)

  • blocksize (int, default: 512 ) –

    Desired minimum latency in samples, and also the number of recorded samples at a time. Lower values reduces latency and increases CPU usage, which funnily enough might cause latency issues

Returns:

  • Self

    Self, Fluent interface

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def open_recorder(self,
    name: str=None,
    *,
    samplerate: Hertz=44100,
    channels: list[int]=None,
    blocksize: int=512,
) -> Self:
    """
    Open a SoundCard device for recording real-time audio.

    Args:
        name: The name of the device to open. If None, the first loopback device or default
            microphone is used. The search is fuzzy, so the match does not need to be exact

        samplerate: The desired sample rate of the audio

        channels: Channels to read from the device.
            • None: Record all available channels
            • list[int]: Record only the specified channels
            • -1: (Linux: Mono mix of all channels) (MacOS: Silence)

        blocksize: Desired minimum latency in samples, and also the number of recorded
            samples at a time. Lower values reduces latency and increases CPU usage, which
            funnily enough might cause latency issues

    Returns:
        Self, Fluent interface
    """
    self.close_recorder()

    # Search for default loopback device
    if name is None:
        for device in self.recorders():
            if device.isloopback:
                self.recorder_device = device
                break
        self.recorder_device = (self.recorder_device or soundcard.default_microphone())
    else:
        self.recorder_device = self.__fuzzy__(name, self.recorders_names())

    # Open the recorder
    log.info(f"Opening Recorder with Device ({self.recorder_device})")
    self.recorder = self.recorder_device.recorder(
        samplerate=samplerate,
        channels=channels,
        blocksize=blocksize,
    ).__enter__()

    # Update properties
    self.samplerate = getattr(self.recorder, "_samplerate", samplerate)
    self.channels   = self.recorder_device.channels
    self.mode       = BrokenAudioMode.Realtime
    return self

close_recorder

close_recorder() -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
345
346
347
348
def close_recorder(self) -> Self:
    (self.recorder or Nothing()).__exit__(None, None, None)
    self.recorder = None
    return self

record

record(numframes: int = None) -> Optional[numpy.ndarray]

Record a number of samples from the recorder. 'None' records all

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
350
351
352
353
def record(self, numframes: int=None) -> Optional[numpy.ndarray]:
    """Record a number of samples from the recorder. 'None' records all"""
    if (self.recorder is not None):
        return self.add_data(self.recorder.record(numframes=numframes).T)

play

play(data: numpy.ndarray) -> None

Add a numpy array to the play queue. for non-blocking playback

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
367
368
369
370
def play(self, data: numpy.ndarray) -> None:
    """Add a numpy array to the play queue. for non-blocking playback"""
    if (self.speaker_device is not None):
        self._play_queue.append(data)

stereo

stereo: bool

mono

mono: bool

duration

duration: Seconds

ShaderAudio

Bases: BrokenAudio, ShaderModule

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
@define
class ShaderAudio(BrokenAudio, ShaderModule):

    # Todo: Move to a ShaderAudioProcessing class
    volume: ShaderDynamics = None
    std:    ShaderDynamics = None
    final:  bool = True

    def __post__(self):
        self.volume = ShaderDynamics(
            scene=self.scene, name=f"{self.name}Volume",
            frequency=2, zeta=1, response=0, value=0,
            integrate=True,
        )
        self.std = ShaderDynamics(
            scene=self.scene, name=f"{self.name}STD",
            frequency=10, zeta=1, response=0, value=0,
        )

    def commands(self):
        return
        self.scene.cli.command(self.print_recorders, panel=self.panel_module_type)
        self.scene.cli.command(self.print_speakers, panel=self.panel_module_type)
        self.scene.cli.command(self.open_recorder, name=f"{self.name}-recorder", panel=f"{self.panel_module_type}: {self.name}")
        self.scene.cli.command(self.open_speaker, name=f"{self.name}-speaker", panel=f"{self.panel_module_type}: {self.name}")

    @property
    def duration(self) -> Seconds:
        return BrokenFFmpeg.get_audio_duration(self.file)

    def setup(self):
        self.file = self.file
        if (self.final and self.scene.realtime):
            if (self.mode == BrokenAudioMode.File):
                self.open_speaker()
            else:
                self.open_recorder()

    def ffhook(self, ffmpeg: BrokenFFmpeg) -> None:
        if BrokenPath.get(self.file, exists=True):
            ffmpeg.input(path=self.file)
            ffmpeg.shortest = True

    def update(self):
        try:
            if self._file_stream:
                self._file_reader.chunk = self.scene.rdt
                data = next(self._file_stream).T
                self.add_data(data)
                self.play(data)
        except StopIteration:
            pass

        self.volume.target = 2 * root_mean_square(self.get_last_n_seconds(0.1)) * (2**0.5)
        self.std.target    = numpy.std(self.get_last_n_seconds(0.1))

volume

volume: ShaderDynamics = None

std

std: ShaderDynamics = None

final

final: bool = True

__post__

__post__()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
407
408
409
410
411
412
413
414
415
416
def __post__(self):
    self.volume = ShaderDynamics(
        scene=self.scene, name=f"{self.name}Volume",
        frequency=2, zeta=1, response=0, value=0,
        integrate=True,
    )
    self.std = ShaderDynamics(
        scene=self.scene, name=f"{self.name}STD",
        frequency=10, zeta=1, response=0, value=0,
    )

commands

commands()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
418
419
420
421
422
423
def commands(self):
    return
    self.scene.cli.command(self.print_recorders, panel=self.panel_module_type)
    self.scene.cli.command(self.print_speakers, panel=self.panel_module_type)
    self.scene.cli.command(self.open_recorder, name=f"{self.name}-recorder", panel=f"{self.panel_module_type}: {self.name}")
    self.scene.cli.command(self.open_speaker, name=f"{self.name}-speaker", panel=f"{self.panel_module_type}: {self.name}")

duration

duration: Seconds

setup

setup()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
429
430
431
432
433
434
435
def setup(self):
    self.file = self.file
    if (self.final and self.scene.realtime):
        if (self.mode == BrokenAudioMode.File):
            self.open_speaker()
        else:
            self.open_recorder()

ffhook

ffhook(ffmpeg: BrokenFFmpeg) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
437
438
439
440
def ffhook(self, ffmpeg: BrokenFFmpeg) -> None:
    if BrokenPath.get(self.file, exists=True):
        ffmpeg.input(path=self.file)
        ffmpeg.shortest = True

update

update()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Audio.py
442
443
444
445
446
447
448
449
450
451
452
453
def update(self):
    try:
        if self._file_stream:
            self._file_reader.chunk = self.scene.rdt
            data = next(self._file_stream).T
            self.add_data(data)
            self.play(data)
    except StopIteration:
        pass

    self.volume.target = 2 * root_mean_square(self.get_last_n_seconds(0.1)) * (2**0.5)
    self.std.target    = numpy.std(self.get_last_n_seconds(0.1))