Skip to content

File: ShaderFlow/Modules/Piano.py

ShaderFlow.Modules.Piano

MAX_CHANNELS

MAX_CHANNELS = 32

MAX_ROLLING

MAX_ROLLING = 256

MAX_NOTE

MAX_NOTE = 128

ShaderPiano

Bases: ShaderModule

Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
@define
class ShaderPiano(ShaderModule):
    """Shader module that stores piano notes and uploads piano-roll data to textures.

    Notes are indexed per MIDI key in one-second buckets (see ``tree``). Every call
    to ``update`` writes the pressed-key velocities, the playing channels and the
    visible roll into the textures created by ``build``. When a FluidSynth instance
    was loaded and the scene is realtime, it also sends note on/off events.
    """

    name: str = "iPiano"
    """Texture name prefixes for this Module"""

    tempo: deque[tuple[Seconds, BPM]] = Factory(deque)
    """List of tempo changes at (seconds, bpm)"""

    keys_texture: ShaderTexture = None
    """Velocities texture, X is MIDI index, Y is Velocity, size (MAX_NOTE, 1)"""

    channel_texture: ShaderTexture = None
    """Channel being played texture, X is MIDI index, Y is Channel, size (MAX_NOTE, 1)"""

    roll_texture: ShaderTexture = None
    """Main piano roll notes texture. The X coordinate is the MIDI index, each pixel contains
    the data (start, end, channel, velocity) of one playing key on the Y. Size (MAX_ROLLING, MAX_NOTE)"""

    time_offset: Seconds = 0
    """Offset the notes being played search from the current time"""

    roll_time: Seconds = 2
    """How long the notes are visible"""

    height: float = 0.275
    """Height of the piano in the shader (0-1)"""

    black_ratio: float = 0.6
    """How long are black keys compared to white keys"""

    global_minimum_note: int = MAX_NOTE
    """The lowest note in the loaded notes"""

    global_maximum_note: int = 0
    """The highest note in the loaded notes"""

    extra_keys: int = 6
    """Display the dynamic range plus this many keys on each side"""

    lookahead: Seconds = 2
    """Lookup notes in (roll_time + this) for setting the dynamic ranges"""

    release_before_end: Seconds = 0.03
    """Workaround for the transition between close/glued to be perceived"""

    key_press_dynamics: DynamicNumber = Factory(lambda: DynamicNumber(
        value=numpy.zeros(MAX_NOTE, dtype=numpy.float32),
        frequency=4, zeta=0.4, response=0, precision=0
    ))
    """Per-key dynamic value whose target is the velocity of the key being pressed"""

    note_range_dynamics: DynamicNumber = Factory(lambda: DynamicNumber(
        value=numpy.zeros(2, dtype=numpy.float32),
        frequency=0.05, zeta=1/(2**0.5), response=0,
    ))
    """Dynamic (lowest, highest) note pair, targets the range of the upcoming notes"""

    tree: dict[int, dict[int, deque[BrokenPianoNote]]] = Factory(dict)
    """Internal data structure for storing the notes"""

    @property
    def lookup_time(self) -> Seconds:
        """The full lookup time we should care for future notes (rolling+future range)"""
        return (self.roll_time + self.lookahead)

    # # Internal

    def build(self):
        """Create the keys, channel, roll and tempo textures, initialized with zeros"""
        # NOTE(review): tempo_texture is assigned here but is not declared as a class
        # field like the other textures - confirm this is intended
        self.keys_texture    = ShaderTexture(scene=self.scene, name=f"{self.name}Keys").from_numpy(self._empty_keys())
        self.channel_texture = ShaderTexture(scene=self.scene, name=f"{self.name}Chan").from_numpy(self._empty_keys())
        self.roll_texture    = ShaderTexture(scene=self.scene, name=f"{self.name}Roll").from_numpy(self._empty_roll())
        self.tempo_texture   = ShaderTexture(scene=self.scene, name=f"{self.name}Tempo").from_numpy(numpy.zeros((100, 1, 2), numpy.float32))

    def _empty_keys(self) -> numpy.ndarray:
        """Zeroed float32 array of shape (1, MAX_NOTE), one value per MIDI key"""
        return numpy.zeros((1, MAX_NOTE), dtype=numpy.float32)

    def _empty_roll(self) -> numpy.ndarray:
        """Zeroed float32 array of shape (MAX_NOTE, MAX_ROLLING, 4) for the piano roll"""
        return numpy.zeros((MAX_NOTE, MAX_ROLLING, 4), dtype=numpy.float32)

    # # Data structure

    @staticmethod
    def _ranges(start: Seconds, end: Seconds) -> Iterable[int]:
        """Indices of the one-second buckets touched by the interval [start, end]"""
        return range(int(start), int(end)+1)

    def clear(self):
        """Remove all stored notes from the tree"""
        self.tree.clear()

    def add_note(self, note: Optional[BrokenPianoNote]) -> None:
        """Store a note in every second-bucket it spans and update the global note range

        A None note is ignored.
        """
        if note is None:
            return
        for index in self._ranges(note.start, note.end):
            self.tree.setdefault(note.note, dict()).setdefault(index, deque()).append(note)
        self.update_global_ranges(note.note)

    @property
    def notes(self) -> Iterable[BrokenPianoNote]:
        """Iterate over the stored notes, bucket by bucket

        A note spanning several second-buckets is yielded once per bucket.
        """
        for block in self.tree.values():
            for notes in block.values():
                yield from notes

    @property
    def duration(self) -> float:
        """End time of the last stored note, or 0 when there are no notes"""
        return max((note.end for note in self.notes), default=0)

    def __iter__(self) -> Iterable[BrokenPianoNote]:
        """Iterating the module iterates over its ``notes``"""
        return self.notes

    def notes_between(self, index: int, start: Seconds, end: Seconds) -> Iterable[BrokenPianoNote]:
        """Yield each note of MIDI key ``index`` found in the buckets covering [start, end]

        Notes starting after ``end`` are skipped, and each note object is yielded at most once.
        """
        exists = set()
        for other in self._ranges(start, end):
            for note in self.tree.get(index, dict()).get(other, deque()):
                if (note.start > end):
                    continue
                if (id(note) in exists):
                    continue
                exists.add(id(note))
                yield note

    def update_global_ranges(self, note: int) -> None:
        """Widen the global (minimum, maximum) note range so that it includes ``note``"""
        self.global_minimum_note = min(self.global_minimum_note, note)
        self.global_maximum_note = max(self.global_maximum_note, note)

    @property
    def maximum_velocity(self) -> Optional[int]:
        """Highest velocity among the stored notes, None when there are no notes"""
        return max((note.velocity for note in self.notes), default=None)

    @property
    def minimum_velocity(self) -> Optional[int]:
        """Lowest velocity among the stored notes, None when there are no notes"""
        return min((note.velocity for note in self.notes), default=None)

    def normalize_velocities(self, minimum: int=100, maximum: int=100) -> None:
        """Linearly rescale all note velocities to the range [minimum, maximum]

        When all notes share the same velocity, every note gets the midpoint
        of (minimum, maximum) instead.
        """
        ma, mi = (self.maximum_velocity, self.minimum_velocity)

        # Safe against (minimum-maximum=0)
        def new(velocity: int) -> int:
            if (ma != mi):
                return int((velocity - mi)/(ma - mi)*(maximum - minimum) + minimum)
            return int((maximum + minimum) / 2)

        # The notes property yields a note once per bucket it spans,
        # rescale each note object a single time
        done = set()
        for note in self.notes:
            if (id(note) in done):
                continue
            done.add(id(note))
            note.velocity = new(note.velocity)

    def load_midi(self, path: Path):
        """Load the notes and tempo changes of a Midi file, then write the tempos to the tempo texture

        Logs a warning and returns without loading when the file does not exist.
        """
        import pretty_midi

        if not (path := BrokenPath.get(path)).exists():
            self.log_warning(f"Input Midi file not found ({path})")
            return

        with Halo(log.info(f"Loading Midi file at ({path})")):
            midi = pretty_midi.PrettyMIDI(str(path))
            for channel, instrument in enumerate(midi.instruments):
                # NOTE(review): this branch is a no-op, so drum instruments are
                # still loaded - confirm whether they were meant to be skipped
                if instrument.is_drum:
                    pass
                for note in instrument.notes:
                    self.add_note(BrokenPianoNote(
                        note=note.pitch,
                        start=note.start,
                        end=note.end,
                        channel=channel,
                        velocity=note.velocity,
                    ))
            # Add tempo changes
            for when, tempo in zip(*midi.get_tempo_changes()):
                self.tempo.append((when, tempo))

        self.tempo_texture.clear()

        # One (when, tempo) pair of floats per row of the tempo texture
        for offset, (when, tempo) in enumerate(self.tempo):
            self.tempo_texture.write(data=struct.pack("ff", when, tempo), viewport=(0, offset, 1, 1))

    # # Core Logic

    # A (MAX_MIDI Notes x MAX_CHANNELS Channels) matrix of the end-most note being played
    _playing_matrix: list[list[Optional[BrokenPianoNote]]] = Factory(lambda: [[None]*MAX_CHANNELS for _ in range(MAX_NOTE)])

    def update(self):
        """Recompute the pressed keys, channels and visible roll for the current time

        Writes the results to the keys, roll and channel textures, advances both
        dynamics and sends FluidSynth key down/up events for the notes that start or end.
        """

        # Utilities and trackers
        time = (self.scene.time + self.time_offset)
        upcoming = set()

        # # Get and update pressed keys
        self.key_press_dynamics.target.fill(0)
        roll = self._empty_roll()

        # Channel '-1' means the note is not being played !
        channels = (self._empty_keys() - 1)

        # Optimization: No need to check for the entire range 😉
        for midi in range(self.global_minimum_note, self.global_maximum_note+1):
            simultaneous = 0

            for note in self.notes_between(midi, time, time+self.lookup_time):
                upcoming.add(midi)

                # Ignore notes out of the viewport
                if (note.start >= time+self.roll_time):
                    continue

                # Build a 2D Grid of the piano keys being played
                # • Coordinate: (Note, #offset) @ (Start, End, Channel, Velocity)
                if (simultaneous < MAX_ROLLING):
                    roll[note.note, simultaneous] = (note.start, note.end, note.channel, note.velocity)
                    simultaneous += 1

                # Skip non-playing notes
                if not (note.start <= time <= note.end):
                    continue

                # Workaround: Don't play the full note, so close notes velocities are perceived twice
                _note_too_small = (note.end - note.start) < self.release_before_end
                _shorter_note = (time < (note.end - self.release_before_end))

                if (_shorter_note or _note_too_small):
                    self.key_press_dynamics.target[midi] = note.velocity

                # Either way, the channel must be colored
                channels[0][midi] = note.channel

                # Find empty slots or notes that will end soon, replace and play
                other = self._playing_matrix[midi][note.channel]
                if (other is None) or (other.end > note.end):
                    play_velocity = int(128*((note.velocity/128)**0.5))
                    self.fluid_key_down(midi, play_velocity, note.channel)
                    self._playing_matrix[midi][note.channel] = note

            # Find notes that are not being played
            # (the range is empty when scene.realtime is falsy)
            for channel in range(MAX_CHANNELS * self.scene.realtime):
                if (other := self._playing_matrix[midi][channel]) and (other.end < time):
                    self._playing_matrix[midi][channel] = None
                    self.fluid_key_up(midi, other.channel)

        # Dynamic zoom velocity based on future lookup
        self.note_range_dynamics.frequency = 0.5/self.lookup_time

        # Set dynamic note range to the globals on the start
        if sum(self.note_range_dynamics.value) == 0:
            self.note_range_dynamics.value[:] = (self.global_minimum_note, self.global_maximum_note)

        # Set new targets for dynamic keys
        self.note_range_dynamics.target[:] = (
            min(upcoming, default=self.global_minimum_note),
            max(upcoming, default=self.global_maximum_note)
        )

        # Write to keys textures
        self.note_range_dynamics.next(dt=abs(self.scene.dt))
        self.key_press_dynamics.next(dt=abs(self.scene.dt))
        self.keys_texture.write(data=self.key_press_dynamics.value)
        self.roll_texture.write(data=roll)
        self.channel_texture.write(data=channels)

    def pipeline(self) -> Iterable[ShaderVariable]:
        """Yield the uniforms of this module, all named with the ``name`` prefix"""
        yield Uniform("int",   f"{self.name}GlobalMin",  self.global_minimum_note)
        yield Uniform("int",   f"{self.name}GlobalMax",  self.global_maximum_note)
        yield Uniform("vec2",  f"{self.name}Dynamic",    self.note_range_dynamics.value)
        yield Uniform("float", f"{self.name}RollTime",   self.roll_time)
        yield Uniform("float", f"{self.name}Extra",      self.extra_keys)
        yield Uniform("float", f"{self.name}Height",     self.height)
        yield Uniform("int",   f"{self.name}Limit",      MAX_ROLLING)
        yield Uniform("float", f"{self.name}BlackRatio", self.black_ratio)

    # # Fluidsynth

    fluidsynth: Any = None
    soundfont:  Any = None

    def fluid_load(self, sf2: Path, driver: str=("pulseaudio" if BrokenPlatform.OnLinux else None)) -> None:
        """Start a FluidSynth synthesizer with the SoundFont at ``sf2``

        Logs a warning and returns without loading when the SoundFont file does not exist.
        After starting, bank 0 preset 0 is selected on every channel.
        """
        if not (sf2 := BrokenPath.get(sf2)).exists():
            self.log_warning(f"Couldn't load SoundFont from path ({sf2}), will not have Real Time MIDI Audio")
            return

        # Download FluidSynth for Windows
        if BrokenPlatform.OnWindows:
            FLUIDSYNTH = "https://github.com/FluidSynth/fluidsynth/releases/download/v2.3.4/fluidsynth-2.3.4-win10-x64.zip"
            BrokenPath.add_to_path(BrokenPath.extract(BrokenPath.download(FLUIDSYNTH), BROKEN.DIRECTORIES.EXTERNALS), recurse=True)
        elif BrokenPlatform.OnMacOS:
            if not shutil.which("fluidsynth"):
                shell("brew", "install", "fluidsynth")
        elif BrokenPlatform.OnLinux:
            self.log_warning("(Linux) Please install FluidSynth in your Package Manager if needed")

        # Imported only after the platform-specific setup above
        import fluidsynth
        self.fluidsynth = fluidsynth.Synth()
        with Halo(log.info(f"Loading FluidSynth SoundFont ({sf2.name})")):
            self.soundfont = self.fluidsynth.sfload(str(sf2))
        self.fluidsynth.set_reverb(1, 1, 80, 1)
        self.fluidsynth.start(driver=driver)
        for channel in range(MAX_CHANNELS):
            self.fluid_select(channel, 0, 0)

    def fluid_select(self, channel: int=0, bank: int=0, preset: int=0) -> None:
        """Select a bank and preset of the loaded SoundFont on a channel (realtime only)"""
        if self.fluidsynth and self.scene.realtime:
            self.fluidsynth.program_select(channel, self.soundfont, bank, preset)

    def fluid_key_down(self, note: int, velocity: int=127, channel: int=0) -> None:
        """Send a note on event to FluidSynth (realtime only)"""
        if self.fluidsynth and self.scene.realtime:
            self.fluidsynth.noteon(channel, note, velocity)

    def fluid_key_up(self, note: int, channel: int=0) -> None:
        """Send a note off event to FluidSynth (realtime only)"""
        if self.fluidsynth and self.scene.realtime:
            self.fluidsynth.noteoff(channel, note)

    def fluid_all_notes_off(self) -> None:
        """Send a note off event for every (channel, note) pair (realtime only)"""
        if self.fluidsynth and self.scene.realtime:
            for channel, note in itertools.product(range(MAX_CHANNELS), range(MAX_NOTE)):
                self.fluidsynth.noteoff(channel, note)

    def fluid_render(self,
        midi: Path,
        soundfont: Optional[Path]=None,
        output: Optional[Path]=None
    ) -> Optional[Path]:
        """Render a Midi file to audio with midi2audio, then normalize it with FFmpeg

        Returns the path of the normalized ``.aac`` file, or None when no FluidSynth
        was loaded. Without an ``output``, a file in the temporary directory named
        after the MD5 hash of the Midi file contents is used.
        """
        if not self.fluidsynth:
            return None

        # Get temporary cached file
        if output is None:
            midi_hash = hashlib.md5(BrokenPath.get(midi).read_bytes()).hexdigest()
            output = Path(tempfile.gettempdir())/f"ShaderFlow-Midi2Audio-{midi_hash}.wav"

        import midi2audio
        with Halo(log.info(f"Rendering FluidSynth Midi ({midi}) → ({output})")):
            midi2audio.FluidSynth(soundfont).midi_to_audio(midi, output)

        # Normalize audio with FFmpeg
        normalized = output.with_suffix(".aac")
        with Halo(log.info(f"Normalizing Audio ({output}) → ({normalized})")):
            (BrokenFFmpeg()
                .quiet()
                .input(output)
                .filter("loudnorm")
                .aac()
                .output(normalized)
            ).run()

        return BrokenPath.get(normalized)

name

name: str = 'iPiano'

Texture name prefixes for this Module

tempo

tempo: deque[tuple[Seconds, BPM]] = Factory(deque)

List of tempo changes at (seconds, bpm)

keys_texture

keys_texture: ShaderTexture = None

Velocities texture, X is MIDI index, Y is Velocity, size (MAX_NOTE, 1)

channel_texture

channel_texture: ShaderTexture = None

Channel being played texture, X is MIDI index, Y is Channel, size (MAX_NOTE, 1)

roll_texture

roll_texture: ShaderTexture = None

Main piano roll notes texture. The X coordinate is the MIDI index; each pixel contains the data (start, end, channel, velocity) of one playing key along the Y. Size (MAX_ROLLING, MAX_NOTE)

time_offset

time_offset: Seconds = 0

Offset, relative to the current time, used when searching for the notes being played

roll_time

roll_time: Seconds = 2

How long the notes are visible

height

height: float = 0.275

Height of the piano in the shader (0-1)

black_ratio

black_ratio: float = 0.6

How long are black keys compared to white keys

global_minimum_note

global_minimum_note: int = MAX_NOTE

The lowest note in the loaded notes

global_maximum_note

global_maximum_note: int = 0

The highest note in the loaded notes

extra_keys

extra_keys: int = 6

Display the dynamic range plus this many keys on each side

lookahead

lookahead: Seconds = 2

Lookup notes in (roll_time + this) for setting the dynamic ranges

release_before_end

release_before_end: Seconds = 0.03

Workaround so that the transition between close/glued notes can be perceived

key_press_dynamics

key_press_dynamics: DynamicNumber = Factory(
    lambda: DynamicNumber(
        value=numpy.zeros(MAX_NOTE, dtype=numpy.float32),
        frequency=4,
        zeta=0.4,
        response=0,
        precision=0,
    )
)

note_range_dynamics

note_range_dynamics: DynamicNumber = Factory(
    lambda: DynamicNumber(
        value=numpy.zeros(2, dtype=numpy.float32),
        frequency=0.05,
        zeta=1 / 2**0.5,
        response=0,
    )
)

tree

tree: dict[int, dict[int, deque[BrokenPianoNote]]] = (
    Factory(dict)
)

Internal data structure for storing the notes

lookup_time

lookup_time: Seconds

The full lookup time we should care for future notes (rolling+future range)

build

build()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
93
94
95
96
97
def build(self):
    """Create the keys, channel, roll and tempo textures, each initialized with zeros."""
    keys = ShaderTexture(scene=self.scene, name=f"{self.name}Keys")
    self.keys_texture = keys.from_numpy(self._empty_keys())

    chan = ShaderTexture(scene=self.scene, name=f"{self.name}Chan")
    self.channel_texture = chan.from_numpy(self._empty_keys())

    roll = ShaderTexture(scene=self.scene, name=f"{self.name}Roll")
    self.roll_texture = roll.from_numpy(self._empty_roll())

    # The tempo texture is created from a zeroed (100, 1, 2) float32 array
    tempo = ShaderTexture(scene=self.scene, name=f"{self.name}Tempo")
    self.tempo_texture = tempo.from_numpy(numpy.zeros((100, 1, 2), numpy.float32))

clear

clear()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
111
112
def clear(self):
    """Remove every stored note from the tree, in place."""
    note_tree = self.tree
    note_tree.clear()

add_note

add_note(note: Optional[BrokenPianoNote]) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
114
115
116
117
118
119
def add_note(self, note: Optional[BrokenPianoNote]) -> None:
    """Store a note in every bucket it spans and widen the global note range.

    A None note is ignored.
    """
    if note is not None:
        for bucket in self._ranges(note.start, note.end):
            per_key = self.tree.setdefault(note.note, dict())
            per_key.setdefault(bucket, deque()).append(note)
        self.update_global_ranges(note.note)

notes

notes: Iterable[BrokenPianoNote]

duration

duration: float

__iter__

__iter__() -> Iterable[BrokenPianoNote]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
131
132
def __iter__(self) -> Iterable[BrokenPianoNote]:
    """Iterating the module iterates over its ``notes``."""
    stored_notes = self.notes
    return stored_notes

notes_between

notes_between(
    index: int, start: Seconds, end: Seconds
) -> Iterable[BrokenPianoNote]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
134
135
136
137
138
139
140
141
142
143
def notes_between(self, index: int, start: Seconds, end: Seconds) -> Iterable[BrokenPianoNote]:
    """Yield each note of key ``index`` found in the buckets covering [start, end].

    Notes that start after ``end`` are skipped, and each note object is
    yielded at most once even when it is stored in several buckets.
    """
    seen_ids = set()
    for bucket in self._ranges(start, end):
        candidates = self.tree.get(index, dict()).get(bucket, deque())
        for candidate in candidates:
            if (candidate.start > end) or (id(candidate) in seen_ids):
                continue
            seen_ids.add(id(candidate))
            yield candidate

update_global_ranges

update_global_ranges(note: int) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
145
146
147
def update_global_ranges(self, note: int) -> None:
    """Widen the global (minimum, maximum) note range so that it includes ``note``."""
    lowest = self.global_minimum_note
    self.global_minimum_note = note if (note < lowest) else lowest
    highest = self.global_maximum_note
    self.global_maximum_note = note if (note > highest) else highest

maximum_velocity

maximum_velocity: Optional[int]

minimum_velocity

minimum_velocity: Optional[int]

normalize_velocities

normalize_velocities(
    minimum: int = 100, maximum: int = 100
) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
157
158
159
160
161
162
163
164
165
166
167
def normalize_velocities(self, minimum: int=100, maximum: int=100) -> None:
    """Linearly rescale all note velocities to the range [minimum, maximum].

    When all notes share the same velocity, every note gets the midpoint of
    (minimum, maximum) instead. Does nothing when there are no notes.
    """
    ma, mi = (self.maximum_velocity, self.minimum_velocity)

    # Safe against (minimum-maximum=0)
    def new(velocity: int) -> int:
        if (ma != mi):
            return int((velocity - mi)/(ma - mi)*(maximum - minimum) + minimum)
        return int((maximum + minimum) / 2)

    # The same note object may be yielded more than once by self.notes,
    # rescale each one a single time so it is not normalized twice
    done = set()
    for note in self.notes:
        if (id(note) in done):
            continue
        done.add(id(note))
        note.velocity = new(note.velocity)

load_midi

load_midi(path: Path)
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def load_midi(self, path: Path):
    """Load the notes and tempo changes of a Midi file, then write the tempos to the tempo texture.

    Logs a warning and returns without loading when the file does not exist.
    """
    import pretty_midi

    path = BrokenPath.get(path)
    if not path.exists():
        self.log_warning(f"Input Midi file not found ({path})")
        return

    with Halo(log.info(f"Loading Midi file at ({path})")):
        midi = pretty_midi.PrettyMIDI(str(path))
        for channel, instrument in enumerate(midi.instruments):
            # This branch does nothing: drum instruments are loaded like any other
            if instrument.is_drum:
                pass
            for played in instrument.notes:
                piano_note = BrokenPianoNote(
                    note=played.pitch,
                    start=played.start,
                    end=played.end,
                    channel=channel,
                    velocity=played.velocity,
                )
                self.add_note(piano_note)
        # Add tempo changes as (when, tempo) pairs
        self.tempo.extend(zip(*midi.get_tempo_changes()))

    self.tempo_texture.clear()

    # One (when, tempo) pair of floats per row of the tempo texture
    for row, (when, tempo) in enumerate(self.tempo):
        packed = struct.pack("ff", when, tempo)
        self.tempo_texture.write(data=packed, viewport=(0, row, 1, 1))

update

update()
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def update(self):
    """Recompute the pressed keys, channels and visible roll for the current time.

    Writes the results to the keys, roll and channel textures, advances both
    dynamics, and calls fluid_key_down / fluid_key_up for notes that start or end.
    """

    # Utilities and trackers
    time = (self.scene.time + self.time_offset)
    # MIDI keys that have at least one note inside the lookup window
    upcoming = set()

    # # Get and update pressed keys
    self.key_press_dynamics.target.fill(0)
    roll = self._empty_roll()

    # Channel '-1' means the note is not being played !
    channels = (self._empty_keys() - 1)

    # Optimization: No need to check for the entire range 😉
    for midi in range(self.global_minimum_note, self.global_maximum_note+1):
        # Next free slot in this key's row of the roll
        simultaneous = 0

        for note in self.notes_between(midi, time, time+self.lookup_time):
            upcoming.add(midi)

            # Ignore notes out of the viewport
            if (note.start >= time+self.roll_time):
                continue

            # Build a 2D Grid of the piano keys being played
            # • Coordinate: (Note, #offset) @ (Start, End, Channel, Velocity)
            # Notes beyond MAX_ROLLING for this key are not written to the roll
            if (simultaneous < MAX_ROLLING):
                roll[note.note, simultaneous] = (note.start, note.end, note.channel, note.velocity)
                simultaneous += 1

            # Skip non-playing notes
            if not (note.start <= time <= note.end):
                continue

            # Workaround: Don't play the full note, so close notes velocities are perceived twice
            _note_too_small = (note.end - note.start) < self.release_before_end
            _shorter_note = (time < (note.end - self.release_before_end))

            if (_shorter_note or _note_too_small):
                self.key_press_dynamics.target[midi] = note.velocity

            # Either way, the channel must be colored
            channels[0][midi] = note.channel

            # Find empty slots or notes that will end soon, replace and play
            other = self._playing_matrix[midi][note.channel]
            if (other is None) or (other.end > note.end):
                # Square-root curve on a 0-128 scale: raises velocities below 128
                play_velocity = int(128*((note.velocity/128)**0.5))
                self.fluid_key_down(midi, play_velocity, note.channel)
                self._playing_matrix[midi][note.channel] = note

        # Find notes that are not being played
        # (the range is empty when scene.realtime is falsy)
        for channel in range(MAX_CHANNELS * self.scene.realtime):
            if (other := self._playing_matrix[midi][channel]) and (other.end < time):
                self._playing_matrix[midi][channel] = None
                self.fluid_key_up(midi, other.channel)

    # Dynamic zoom velocity based on future lookup
    self.note_range_dynamics.frequency = 0.5/self.lookup_time

    # Set dynamic note range to the globals on the start
    # (detected by the current value summing to zero)
    if sum(self.note_range_dynamics.value) == 0:
        self.note_range_dynamics.value[:] = (self.global_minimum_note, self.global_maximum_note)

    # Set new targets for dynamic keys
    # Falls back to the global range when no key has upcoming notes
    self.note_range_dynamics.target[:] = (
        min(upcoming, default=self.global_minimum_note),
        max(upcoming, default=self.global_maximum_note)
    )

    # Write to keys textures
    self.note_range_dynamics.next(dt=abs(self.scene.dt))
    self.key_press_dynamics.next(dt=abs(self.scene.dt))
    self.keys_texture.write(data=self.key_press_dynamics.value)
    self.roll_texture.write(data=roll)
    self.channel_texture.write(data=channels)

pipeline

pipeline() -> Iterable[ShaderVariable]
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
280
281
282
283
284
285
286
287
288
def pipeline(self) -> Iterable[ShaderVariable]:
    """Yield the uniforms of this module, all named with the ``name`` prefix."""

    def uniform(kind, suffix, value):
        # Prefix every uniform name with this module's name
        return Uniform(kind, f"{self.name}{suffix}", value)

    yield uniform("int", "GlobalMin", self.global_minimum_note)
    yield uniform("int", "GlobalMax", self.global_maximum_note)
    yield uniform("vec2", "Dynamic", self.note_range_dynamics.value)
    yield uniform("float", "RollTime", self.roll_time)
    yield uniform("float", "Extra", self.extra_keys)
    yield uniform("float", "Height", self.height)
    yield uniform("int", "Limit", MAX_ROLLING)
    yield uniform("float", "BlackRatio", self.black_ratio)

fluidsynth

fluidsynth: Any = None

soundfont

soundfont: Any = None

fluid_load

fluid_load(
    sf2: Path,
    driver: str = (
        "pulseaudio" if BrokenPlatform.OnLinux else None
    ),
) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def fluid_load(self, sf2: Path, driver: str=("pulseaudio" if BrokenPlatform.OnLinux else None)) -> None:
    """Start a FluidSynth synthesizer with the SoundFont at ``sf2``.

    Logs a warning and returns without loading when the SoundFont file does
    not exist. After starting, bank 0 preset 0 is selected on every channel.
    """
    sf2 = BrokenPath.get(sf2)
    if not sf2.exists():
        self.log_warning(f"Couldn't load SoundFont from path ({sf2}), will not have Real Time MIDI Audio")
        return

    # Platform-specific FluidSynth setup
    if BrokenPlatform.OnWindows:
        # Download FluidSynth for Windows, extract it and add it to the path
        FLUIDSYNTH = "https://github.com/FluidSynth/fluidsynth/releases/download/v2.3.4/fluidsynth-2.3.4-win10-x64.zip"
        archive = BrokenPath.download(FLUIDSYNTH)
        extracted = BrokenPath.extract(archive, BROKEN.DIRECTORIES.EXTERNALS)
        BrokenPath.add_to_path(extracted, recurse=True)
    elif BrokenPlatform.OnMacOS:
        if not shutil.which("fluidsynth"):
            shell("brew", "install", "fluidsynth")
    elif BrokenPlatform.OnLinux:
        self.log_warning("(Linux) Please install FluidSynth in your Package Manager if needed")

    # Imported only after the platform-specific setup above
    import fluidsynth
    synth = fluidsynth.Synth()
    self.fluidsynth = synth
    with Halo(log.info(f"Loading FluidSynth SoundFont ({sf2.name})")):
        self.soundfont = self.fluidsynth.sfload(str(sf2))
    self.fluidsynth.set_reverb(1, 1, 80, 1)
    self.fluidsynth.start(driver=driver)

    channel = 0
    while channel < MAX_CHANNELS:
        self.fluid_select(channel, 0, 0)
        channel += 1

fluid_select

fluid_select(
    channel: int = 0, bank: int = 0, preset: int = 0
) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
319
320
321
def fluid_select(self, channel: int=0, bank: int=0, preset: int=0) -> None:
    """Select a bank and preset of the loaded SoundFont on a channel.

    Does nothing without a FluidSynth instance or when the scene is not realtime.
    """
    if not (self.fluidsynth and self.scene.realtime):
        return
    synth = self.fluidsynth
    synth.program_select(channel, self.soundfont, bank, preset)

fluid_key_down

fluid_key_down(
    note: int, velocity: int = 127, channel: int = 0
) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
323
324
325
def fluid_key_down(self, note: int, velocity: int=127, channel: int=0) -> None:
    """Send a note on event to FluidSynth.

    Does nothing without a FluidSynth instance or when the scene is not realtime.
    """
    if not (self.fluidsynth and self.scene.realtime):
        return
    synth = self.fluidsynth
    synth.noteon(channel, note, velocity)

fluid_key_up

fluid_key_up(note: int, channel: int = 0) -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
327
328
329
def fluid_key_up(self, note: int, channel: int=0) -> None:
    """Send a note off event to FluidSynth.

    Does nothing without a FluidSynth instance or when the scene is not realtime.
    """
    if not (self.fluidsynth and self.scene.realtime):
        return
    synth = self.fluidsynth
    synth.noteoff(channel, note)

fluid_all_notes_off

fluid_all_notes_off() -> None
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
331
332
333
334
def fluid_all_notes_off(self) -> None:
    """Send a note off event for every (channel, note) pair.

    Does nothing without a FluidSynth instance or when the scene is not realtime.
    """
    if not (self.fluidsynth and self.scene.realtime):
        return
    # Channels are the outer loop, notes the inner one
    for channel in range(MAX_CHANNELS):
        for note in range(MAX_NOTE):
            self.fluidsynth.noteoff(channel, note)

fluid_render

fluid_render(
    midi: Path, soundfont: Path = None, output: Path = None
) -> Path
Source code in Projects/ShaderFlow/ShaderFlow/Modules/Piano.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def fluid_render(self,
    midi: Path,
    soundfont: Optional[Path]=None,
    output: Optional[Path]=None
) -> Optional[Path]:
    """Render a Midi file to audio with midi2audio, then normalize it with FFmpeg.

    Args:
        midi: Path of the Midi file to render.
        soundfont: SoundFont passed to ``midi2audio.FluidSynth``.
        output: Where to write the rendered audio. When None, a file in the
            temporary directory named after the MD5 hash of the Midi file
            contents is used.

    Returns:
        The path of the normalized ``.aac`` file next to ``output``, or None
        when no FluidSynth instance was loaded.
    """
    if not self.fluidsynth:
        return None

    # Get temporary cached file
    if output is None:
        midi_hash = hashlib.md5(BrokenPath.get(midi).read_bytes()).hexdigest()
        output = Path(tempfile.gettempdir())/f"ShaderFlow-Midi2Audio-{midi_hash}.wav"

    import midi2audio
    with Halo(log.info(f"Rendering FluidSynth Midi ({midi}) → ({output})")):
        midi2audio.FluidSynth(soundfont).midi_to_audio(midi, output)

    # Normalize audio with FFmpeg
    normalized = output.with_suffix(".aac")
    with Halo(log.info(f"Normalizing Audio ({output}) → ({normalized})")):
        (BrokenFFmpeg()
            .quiet()
            .input(output)
            .filter("loudnorm")
            .aac()
            .output(normalized)
        ).run()

    return BrokenPath.get(normalized)