85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@define(slots=False)
class BrokenAudio:
    """
    Rolling audio buffer with optional realtime recording and non-blocking playback.

    The newest samples live at the end of `data`, an array of shape `(channels, samples)` sized
    to hold `buffer_seconds` of audio at `samplerate`. Changing the sample rate, channel count
    or buffer length re-allocates (and therefore clears) the buffer.
    """

    mode: BrokenAudioMode = BrokenAudioMode.Realtime.field()

    data: numpy.ndarray = None
    """Progressive audio data, shape: (channels, samples)"""

    dtype: numpy.dtype = numpy.float32
    """Data type of the audio samples"""

    tell: int = 0
    """The number of samples read from the audio so far"""

    def __post__(self):
        """Allocate the buffer, then hand the playback and recording loops to BrokenWorker"""
        # NOTE(review): attrs itself only runs `__attrs_post_init__`; confirm that something
        # (a base class or the caller) actually invokes `__post__`

        # Allocate first so the worker loops never observe `data` as None
        self.create_buffer()
        BrokenWorker.thread(self._play_thread)
        BrokenWorker.thread(self._record_thread)

    @property
    def buffer_size(self) -> Samples:
        """Number of samples per channel the buffer holds"""
        return int(self.samplerate*self.buffer_seconds)

    @property
    def shape(self) -> tuple[Channels, Samples]:
        """Shape of the buffer: (channels, buffer_size)"""
        return (self.channels, self.buffer_size)

    def create_buffer(self) -> None:
        """Replace the buffer with zeros (silence) of the current shape and dtype"""
        self.data = numpy.zeros(self.shape, dtype=self.dtype)

    def add_data(self, data: numpy.ndarray) -> Optional[numpy.ndarray]:
        """
        Roll the data to the left by the length of the new data; copy new data to the end

        Note: Channel count must match the buffer's one

        Args:
            data: The new data of shape: (channels, length)

        Returns:
            The data that was written, or None when the chunk was empty
        """
        data = numpy.array(data, dtype=self.dtype)
        length = data.shape[1]

        # Only the newest samples fit when the chunk is longer than the whole buffer
        width = min(length, self.data.shape[1])

        # Guarding on the width avoids the `[:, -0:]` slice, which would target the whole buffer
        if (width > 0):
            self.data = numpy.roll(self.data, -width, axis=1)
            self.data[:, -width:] = data[:, -width:]

        # Every received sample counts as read, even those that did not fit
        self.tell += length
        return (data if length else None)

    def get_data_between_samples(self, start: Samples, end: Samples) -> numpy.ndarray:
        """Slice the buffer between two sample indices (start inclusive, end exclusive)"""
        return self.data[:, int(start):int(end)]

    def get_data_between_seconds(self, start: Seconds, end: Seconds) -> numpy.ndarray:
        """Slice the buffer between two buffer-relative times in seconds"""
        return self.get_data_between_samples(start*self.samplerate, end*self.samplerate)

    def get_last_n_samples(self, n: Samples, *, offset: Samples=0) -> numpy.ndarray:
        """
        Get the newest samples of the buffer

        Args:
            n: How many samples to return
            offset: How many of the newest samples to skip before the returned window

        Returns:
            Array of shape (channels, n), shorter if the buffer doesn't hold enough samples
        """
        # Absolute indices side-step the `-0` slice end, so the newest sample is included
        total = self.data.shape[1]
        start = max(total - int(n + offset), 0)
        stop  = max(total - int(offset), 0)
        return self.data[:, start:stop]

    def get_last_n_seconds(self, n: Seconds) -> numpy.ndarray:
        """Get the newest `n` seconds of the buffer"""
        return self.get_last_n_samples(n*self.samplerate)

    # -------------------------------------------|
    # Sample Rate

    _samplerate: Hertz = 44100

    @property
    def samplerate(self) -> Hertz:
        """How many data points per second the audio is sampled at. Defaults to 44100"""
        return (self._samplerate or 44100)

    @samplerate.setter
    def samplerate(self, value: Hertz):
        self._samplerate = value
        self.create_buffer()

    # -------------------------------------------|
    # Channels

    _channels: int = 2

    @property
    def channels(self) -> int:
        """Number of audio streams (channels). Two is stereo, one is mono. Defaults to 2"""
        return self._channels or 2

    @channels.setter
    def channels(self, value: int):
        self._channels = value
        self.create_buffer()

    # -------------------------------------------|
    # History

    _buffer_seconds: Seconds = 30.0

    @property
    def buffer_seconds(self) -> Seconds:
        """Buffer length in seconds. Cheap on ram and fast, ideally have a decent size"""
        # Note: To convince yourself, (48000 Hz) * (2 Channels) * (30 sec) * (f32=4 bytes) = 11 MB
        return self._buffer_seconds

    @buffer_seconds.setter
    def buffer_seconds(self, value: Seconds):
        self._buffer_seconds = value
        self.create_buffer()

    # -------------------------------------------|
    # File

    _file: Path = None
    _file_reader: BrokenAudioReader = None
    _file_stream: Generator[tuple[Seconds, numpy.ndarray], None, Seconds] = None

    @property
    def file(self) -> Path:
        """Path of the audio file in use, if any"""
        return self._file

    @file.setter
    def file(self, value: Path):
        """Switch to File mode: adopt the file's samplerate and channels, close the recorder"""
        self._file = BrokenPath.get(value)

        # Keep the current mode and recorder untouched when the file is missing
        if self._file and not (self._file.exists()):
            log.minor(f"Audio File doesn't exist ({value})")
            return

        self.samplerate   = BrokenFFmpeg.get_audio_samplerate(self.file, echo=False)
        self.channels     = BrokenFFmpeg.get_audio_channels(self.file, echo=False)
        self._file_reader = BrokenAudioReader(path=self.file)
        self._file_stream = self._file_reader.stream
        self.mode = BrokenAudioMode.File
        self.close_recorder()

    # -------------------------------------------|
    # Soundcard

    recorder_device: Any = None
    recorder: Any = None

    @staticmethod
    def recorders() -> Iterable['soundcard._Recorder']:
        """Iterate over all recording devices, loopback ones included"""
        yield from soundcard.all_microphones(include_loopback=True)

    @staticmethod
    def recorders_names() -> Iterable[str]:
        """Iterate over the names of all recording devices"""
        yield from (device.name for device in BrokenAudio.recorders())

    speaker_device: Any = None
    speaker: Any = None

    @staticmethod
    def speakers() -> Iterable['soundcard._Speaker']:
        """Iterate over all playback devices"""
        yield from soundcard.all_speakers()

    @staticmethod
    def speakers_names() -> Iterable[str]:
        """Iterate over the names of all playback devices"""
        yield from (device.name for device in BrokenAudio.speakers())

    def print_recorders(self) -> None:
        """List and print all available Audio recording devices"""
        log.info("Recording Devices:")
        for i, device in enumerate(BrokenAudio.recorders()):
            log.info(f"• ({i:2d}) Recorder: '{device.name}'")

    def print_speakers(self) -> None:
        """List and print all available Audio playback devices"""
        log.info("Playback Devices:")
        for i, device in enumerate(BrokenAudio.speakers()):
            log.info(f"• ({i:2d}) Speaker: '{device.name}'")

    def __fuzzy__(self, name: str, devices: Iterable[Any]) -> Optional[Any]:
        """
        Find the item whose name best matches `name`

        Args:
            name: The (approximate) name to look for
            devices: Candidates; either objects with a `.name` attribute or plain name strings

        Returns:
            The best matching item of `devices`, or None if there are no candidates
        """
        # Materialize once: a generator would be exhausted by the search before the lookup below
        candidates = list(devices)
        if not candidates:
            return None

        labels = [getattr(item, "name", item) for item in candidates]

        # Assumes index 0 of the search result is the best matching name — TODO confirm
        best = fuzzy_string_search(name, labels)[0]
        return next((item for item, label in zip(candidates, labels) if label == best), None)

    def open_speaker(self,
        name: str=None,
        *,
        samplerate: Hertz=None,
    ) -> Self:
        """
        Open a SoundCard device for playing real-time audio.

        Args:
            name: The name of the device to open. If None, the default speaker is used. The search
                is fuzzy, so the match does not need to be exact
            samplerate: If None, gets self.samplerate

        Returns:
            Self, Fluent interface
        """
        self.close_speaker()

        # Search for the Speaker
        if name is None:
            self.speaker_device = soundcard.default_speaker()
        else:
            self.speaker_device = self.__fuzzy__(name, self.speakers())

        # Open the speaker
        log.info(f"Opening Speaker with Device ({self.speaker_device})")
        self.speaker = self.speaker_device.player(
            samplerate=samplerate or self.samplerate,
        ).__enter__()
        return self

    def close_speaker(self) -> Self:
        """Exit the open speaker context, if any. Returns Self, Fluent interface"""
        (self.speaker or Nothing()).__exit__(None, None, None)
        self.speaker = None
        return self

    def open_recorder(self,
        name: str=None,
        *,
        samplerate: Hertz=44100,
        channels: list[int]=None,
        blocksize: int=512,
    ) -> Self:
        """
        Open a SoundCard device for recording real-time audio.

        Args:
            name: The name of the device to open. If None, the first loopback device or default
                microphone is used. The search is fuzzy, so the match does not need to be exact
            samplerate: The desired sample rate of the audio
            channels: Channels to read from the device.
                • None: Record all available channels
                • list[int]: Record only the specified channels
                • -1: (Linux: Mono mix of all channels) (MacOS: Silence)
            blocksize: Desired minimum latency in samples, and also the number of recorded
                samples at a time. Lower values reduces latency and increases CPU usage, which
                funnily enough might cause latency issues

        Returns:
            Self, Fluent interface
        """
        self.close_recorder()

        if name is None:
            # Search for default loopback device; never reuse a device from a previous call
            loopback = next((device for device in self.recorders() if device.isloopback), None)
            self.recorder_device = (loopback or soundcard.default_microphone())
        else:
            self.recorder_device = self.__fuzzy__(name, self.recorders())

        # Open the recorder
        log.info(f"Opening Recorder with Device ({self.recorder_device})")
        self.recorder = self.recorder_device.recorder(
            samplerate=samplerate,
            channels=channels,
            blocksize=blocksize,
        ).__enter__()

        # Update properties
        self.samplerate = getattr(self.recorder, "_samplerate", samplerate)

        # A channel list records only those channels, so the buffer must be as wide as the list
        if isinstance(channels, (list, tuple)):
            self.channels = len(channels)
        else:
            self.channels = self.recorder_device.channels

        self.mode = BrokenAudioMode.Realtime
        return self

    def close_recorder(self) -> Self:
        """Exit the open recorder context, if any. Returns Self, Fluent interface"""
        (self.recorder or Nothing()).__exit__(None, None, None)
        self.recorder = None
        return self

    def record(self, numframes: int=None) -> Optional[numpy.ndarray]:
        """Record a number of samples from the recorder. 'None' records all"""
        if (self.recorder is None):
            return None

        # The recorder's output is transposed to match the buffer's (channels, samples) layout
        return self.add_data(self.recorder.record(numframes=numframes).T)

    def _record_thread(self) -> None:
        """Loop forever feeding recorded chunks into the buffer, idling when there is no data"""
        last_error = None

        while True:
            try:
                if (self.record() is None):
                    time.sleep(0.01)
            except Exception as error:
                # Best-effort loop: report each distinct failure once, and back off instead of
                # spinning at full speed on a persistent error
                if (repr(error) != last_error):
                    last_error = repr(error)
                    log.minor(f"Audio recording failed: {last_error}")
                time.sleep(0.01)

    # -------------------------------------------|
    # Playing

    _play_queue: deque[numpy.ndarray] = Factory(deque)

    def play(self, data: numpy.ndarray) -> None:
        """Add a numpy array to the play queue, for non-blocking playback"""
        # Only queue while a speaker is open: nothing would ever drain the queue otherwise
        if (self.speaker is not None):
            self._play_queue.append(data)

    def _play_thread(self) -> None:
        """Loop forever playing queued arrays on the open speaker, idling when there are none"""
        while True:
            if (self._play_queue and self.speaker):
                self.speaker.play(self._play_queue.popleft().T)
                continue
            time.sleep(0.01)

    # -------------------------------------------|
    # Properties utils

    @property
    def stereo(self) -> bool:
        """Whether the audio has exactly two channels"""
        return (self.channels == 2)

    @property
    def mono(self) -> bool:
        """Whether the audio has exactly one channel"""
        return (self.channels == 1)

    @property
    def duration(self) -> Seconds:
        """Infinity in Realtime mode, the file's duration in File mode, None otherwise"""
        if self.mode == BrokenAudioMode.Realtime:
            return math.inf
        if self.mode == BrokenAudioMode.File:
            return BrokenFFmpeg.get_audio_duration(self.file)
        return None
|