Skip to content

File: ShaderFlow/Texture.py

ShaderFlow.Texture

numpy2mgltype

numpy2mgltype(type: Union[numpy.dtype, str]) -> str
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
17
18
19
20
21
22
23
24
25
26
27
def numpy2mgltype(type: Union[numpy.dtype, str]) -> str:
    """Translate a numpy data type into the dtype string handed to moderngl textures.

    Args:
        type: A string (returned untouched), a ``numpy.dtype`` instance, or a
            numpy scalar class such as ``numpy.float32``.

    Returns:
        The matching dtype string, or ``None`` when the type has no entry in
        the lookup table.
    """
    # Strings are taken to already be dtype strings
    if isinstance(type, str):
        return type

    # The table is keyed by scalar classes, so unwrap dtype instances first
    scalar = type.type if isinstance(type, numpy.dtype) else type

    known = {
        numpy.uint8:   "f1",
        numpy.uint16:  "u2",
        numpy.float16: "f2",
        numpy.float32: "f4",
    }
    return known.get(scalar)

TextureFilter

Bases: BrokenEnum

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
30
31
32
33
class TextureFilter(BrokenEnum):
    """Interpolation filters selectable for a texture.

    The member values are the lookup keys used by
    ``ShaderTexture.moderngl_filter`` to pick the moderngl filter constant.
    """
    # Fixme: Disallow bad combinations of filter and types
    Nearest = "nearest"
    Linear  = "linear"

Nearest

Nearest = 'nearest'

Linear

Linear = 'linear'

Anisotropy

Bases: BrokenEnum

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
36
37
38
39
40
41
class Anisotropy(BrokenEnum):
    """Anisotropic filtering levels.

    The member value is what ``ShaderTexture.apply`` assigns to the
    ``anisotropy`` attribute of each moderngl texture.
    """
    x1  = 1
    x2  = 2
    x4  = 4
    x8  = 8
    x16 = 16

x1

x1 = 1

x2

x2 = 2

x4

x4 = 4

x8

x8 = 8

x16

x16 = 16

TextureBox

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
44
45
46
47
48
49
50
51
52
53
54
@define
class TextureBox:
    """One cell of a texture matrix: a texture, its framebuffer and cached bytes."""

    # GPU texture object, None until one is assigned
    texture: moderngl.Texture = None
    # Framebuffer object, None until one is assigned
    fbo: moderngl.Framebuffer = None
    # Raw bytes kept alongside the texture, left out of the repr
    data: bytes = field(default=None, repr=False)
    # NOTE(review): not read anywhere in the visible source - confirm its use
    clear: bool = False
    # True until something marks the box as written
    empty: bool = True

    def release(self) -> None:
        """Release the texture first and the framebuffer second.

        A falsy resource is swapped for a ``Nothing()`` instance before
        ``release`` is called on it.
        """
        for resource in (self.texture, self.fbo):
            (resource or Nothing()).release()

texture

texture: moderngl.Texture = None

fbo

fbo: moderngl.Framebuffer = None

data

data: bytes = field(default=None, repr=False)

clear

clear: bool = False

empty

empty: bool = True

release

release() -> None
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
52
53
54
def release(self) -> None:
    """Release the texture first and the framebuffer second.

    A falsy resource is swapped for a ``Nothing()`` instance before
    ``release`` is called on it.
    """
    for resource in (self.texture, self.fbo):
        (resource or Nothing()).release()

ShaderTexture

Bases: ShaderModule

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
@define
class ShaderTexture(ShaderModule):
    """Shader module owning a matrix of OpenGL textures and their framebuffers.

    The textures live in ``matrix`` as ``temporal`` rows of ``layers``
    ``TextureBox`` items. Assigning a changed value to a field declared with
    ``on_setattr=__make__`` recreates every texture, while fields declared
    with ``on_setattr=__apply__`` only re-apply the sampling flags.
    """

    # Base name of the GLSL identifiers emitted by `defines` and `pipeline`;
    # when falsy, neither of them emits anything
    name: str = None

    def __post__(self):
        """Create the texture boxes for the initial field values."""
        # NOTE(review): presumably invoked by ShaderModule once the instance
        # is initialized - confirm against the base class
        self.make()

    # -------------------------------------------|

    def __smart__(self, attr, value, method) -> Any:
        """Store a converted value and call `method` only when it changed.

        Args:
            attr: Attribute descriptor providing `name` and an optional `converter`.
            value: Raw value being assigned.
            method: Zero-argument callable run after a changed value is stored.

        Returns:
            The converted value.
        """
        if (converter := attr.converter):
            value = converter(value)
        if getattr(self, attr.name) != value:
            # Store the value before calling `method`, which reads the instance state
            self.__setstate__({attr.name: value})
            method()
        return value

    def __apply__(self, attr, value) -> Any:
        """`on_setattr` hook: re-apply the texture flags when the value changed."""
        return self.__smart__(attr, value, self.apply)

    def __make__(self, attr, value) -> Any:
        """`on_setattr` hook: recreate the textures when the value changed."""
        return self.__smart__(attr, value, self.make)

    # -------------------------------------------|

    final: bool = field(default=False, converter=bool)
    """Is this bound to the final FSSAA ShaderObject?"""

    track: float = field(default=0.0, converter=float, on_setattr=__make__)
    """Match the scene's resolution times this factor on this texture"""

    filter: TextureFilter = TextureFilter.Linear.field(on_setattr=__apply__)
    """The interpolation filter applied to the texture when sampling on the GPU"""

    anisotropy: Anisotropy = Anisotropy.x16.field(on_setattr=__apply__)
    """Anisotropic filter level, improves texture quality at oblique angles"""

    mipmaps: bool = field(default=False, converter=bool, on_setattr=__apply__)
    """Compute mipmaps for this texture, improves quality at large distances"""

    repeat_x: bool = field(default=True, converter=bool, on_setattr=__apply__)
    """Should the texture repeat on the X axis when out of bounds or clamp"""

    repeat_y: bool = field(default=True, converter=bool, on_setattr=__apply__)
    """Should the texture repeat on the Y axis when out of bounds or clamp"""

    def repeat(self, value: bool) -> Self:
        """Syntactic sugar for setting both repeat_x and repeat_y"""
        self.repeat_x = self.repeat_y = bool(value)
        return self.apply()

    @property
    def moderngl_filter(self) -> int:
        """moderngl constant for `filter`, in its mipmap variant when `mipmaps` is set"""
        return dict(
            linear=moderngl.LINEAR,
            nearest=moderngl.NEAREST,
            linear_mipmap=moderngl.LINEAR_MIPMAP_LINEAR,
            nearest_mipmap=moderngl.NEAREST_MIPMAP_NEAREST,
        ).get(self.filter.value + ("_mipmap"*self.mipmaps))

    # -------------------------------------------|

    # Width

    # Fixed width, only used while `track` is falsy
    _width: int = field(default=1, converter=int)

    @property
    def width(self) -> int:
        """Fixed width when not tracking, else the first item of `resolution`"""
        if not self.track:
            return self._width
        return self.resolution[0]

    @width.setter
    def width(self, value: int):
        # Only recreate the textures on an actual change
        if (self._width == value):
            return
        self._width = value
        self.make()

    # Height

    # Fixed height, only used while `track` is falsy
    _height: int = field(default=1, converter=int)

    @property
    def height(self) -> int:
        """Fixed height when not tracking, else the second item of `resolution`"""
        if not self.track:
            return self._height
        return self.resolution[1]

    @height.setter
    def height(self, value: int):
        # Only recreate the textures on an actual change
        if (self._height == value):
            return
        self._height = value
        self.make()

    components: int = field(default=4, converter=int, on_setattr=__make__)
    """Number of color channels per pixel (1 Grayscale, 2 RG, 3 RGB, 4 RGBA)"""

    dtype: numpy.dtype = field(
        default=numpy.uint8,
        converter=numpy.dtype,
        on_setattr=__make__)
    """Data type of the texture for each pixel channel"""

    @property
    def resolution(self) -> tuple[int, int]:
        """(width, height) of the texture.

        The fixed size when `track` is falsy; otherwise the scene's
        `resolution` (final) or `render_resolution` multiplied by `track`,
        truncated to int and never below 1 on either axis.
        """
        if not self.track:
            return (self._width, self._height)
        def scale(data):
            return tuple(max(1, int(x*self.track)) for x in data)
        if self.final:
            return scale(self.scene.resolution)
        return scale(self.scene.render_resolution)

    @resolution.setter
    def resolution(self, value: tuple[int, int]):
        # Ignored while tracking, as the size then comes from the scene
        if not self.track:
            self.width, self.height = value

    @property
    def size(self) -> tuple[int, int]:
        """Alias of `resolution`"""
        return self.resolution

    @size.setter
    def size(self, value: tuple[int, int]):
        self.resolution = value

    @property
    def aspect_ratio(self) -> float:
        """Width over height, dividing by 1 when the height is zero"""
        return self.width/(self.height or 1)

    # Bytes size and Zero filling

    @property
    def zeros(self) -> numpy.ndarray:
        """Zero-filled array of shape (*size, components) and this texture's dtype"""
        # NOTE(review): axes are (width, height, components) here, whereas
        # from_numpy reads shapes as (height, width, components) - confirm
        return numpy.zeros((*self.size, self.components), dtype=self.dtype)

    @property
    def bytes_per_pixel(self) -> int:
        """Item size of the dtype times the number of components"""
        return (self.dtype.itemsize * self.components)

    @property
    def size_t(self) -> int:
        """Size of the texture data in bytes"""
        return (self.width * self.height * self.bytes_per_pixel)

    def new_buffer(self) -> moderngl.Buffer:
        """Make a new buffer with the current size of the texture"""
        return self.scene.opengl.buffer(reserve=self.size_t)

    # -------------------------------------------|

    matrix: deque[deque[TextureBox]] = Factory(deque)
    """Matrix of previous frames (temporal) and their layers (layers)"""

    temporal: int = field(default=1, converter=int, on_setattr=__make__)
    """Number of previous frames to be stored"""

    layers: int = field(default=1, converter=int, on_setattr=__make__)
    """Number of layers to be stored, useful in single-shader multipass"""

    @property
    def boxes(self) -> Iterable[tuple[int, int, TextureBox]]:
        """Yield (temporal index, layer index, box) for every box in the matrix"""
        for t, temporal in enumerate(self.matrix):
            for b, box in enumerate(temporal):
                yield (t, b, box)

    def row(self, n: int=0) -> Iterable[TextureBox]:
        """Yield the boxes of the n-th temporal row"""
        yield from self.matrix[n]

    def make(self) -> Self:
        """Recreate every texture and framebuffer with the current settings.

        Returns:
            This instance, after `apply` has run.

        Raises:
            Exception: When the largest side of `size` exceeds the first item
                of the context's GL_MAX_VIEWPORT_DIMS.
        """
        if (max(self.size) > (limit := self.scene.opengl.info['GL_MAX_VIEWPORT_DIMS'][0])):
            raise Exception(f"Texture size too large for this OpenGL context: {self.size} > {limit}")

        # Populate the matrix with current size
        for row in pop_fill(self.matrix, deque, self.temporal):
            pop_fill(row, TextureBox, self.layers)

        # Recreate texture boxes
        for (_, _, box) in self.boxes:
            box.release()
            box.texture = self.scene.opengl.texture(
                components=self.components,
                dtype=numpy2mgltype(self.dtype),
                size=self.size)
            box.fbo = self.scene.opengl.framebuffer(
                color_attachments=[box.texture])

            # Rewrite previous data if same size
            if box.data and (self.size_t == len(box.data)):
                box.texture.write(box.data)

        return self.apply()

    def apply(self) -> Self:
        """Apply filters and flags to all textures.

        The minification filter may be a mipmap filter, but OpenGL only takes
        plain NEAREST or LINEAR for magnification, so the magnification filter
        is always the non-mipmap variant of `filter`.
        """
        minify = self.moderngl_filter
        magnify = dict(
            linear=moderngl.LINEAR,
            nearest=moderngl.NEAREST,
        ).get(self.filter.value)

        for (_, _, box) in self.boxes:
            # Mipmaps are built first, the filter is assigned afterwards
            if self.mipmaps:
                box.texture.build_mipmaps()
            box.texture.filter     = (minify, magnify)
            box.texture.anisotropy = self.anisotropy.value
            box.texture.repeat_x   = self.repeat_x
            box.texture.repeat_y   = self.repeat_y
        return self

    def destroy(self) -> None:
        """Release the texture and framebuffer of every box"""
        for (_, _, box) in self.boxes:
            box.release()

    def get_box(self, temporal: int=0, layer: int=-1) -> Optional[TextureBox]:
        """Note: Points to the current final box"""
        return list_get(list_get(self.matrix, temporal), layer)

    @property
    def fbo(self) -> moderngl.Framebuffer:
        """Final and most Recent FBO of this Texture"""
        # A final texture of a realtime scene targets the window's framebuffer
        if (self.final and self.scene.realtime):
            return self.scene.window.fbo
        return self.get_box().fbo

    @property
    def texture(self) -> moderngl.Texture:
        """Final and most Recent Texture of this Texture"""
        return self.get_box().texture

    def roll(self, n: int=1) -> Self:
        """Rotate the temporal rows of the matrix by n steps"""
        self.matrix.rotate(n)
        return self

    # -------------------------------------------|
    # Input and Output

    def write(self,
        data: bytes=None,
        *,
        temporal: int=0,
        layer: int=-1,
        viewport: tuple[int, int, int, int]=None,
    ) -> Self:
        """Upload `data` to the texture of the selected box.

        Args:
            data: Buffer with the pixel contents.
            temporal: Temporal row of the target box.
            layer: Layer of the target box.
            viewport: Optional region forwarded to the texture write.

        Returns:
            This instance.
        """
        box = self.get_box(temporal, layer)
        box.texture.write(data, viewport=viewport)
        # Only writes without a viewport are cached for `make` to restore
        if (not viewport):
            box.data = bytes(data)
        box.empty = False
        return self

    def from_numpy(self, data: numpy.ndarray) -> Self:
        """Resize and retype the texture after `data`, then upload it.

        The array shape is read as (height, width[, components]); a 2D array
        counts as one component. Rows are flipped along axis 0 before upload.
        """
        unpack = list(data.shape)
        if len(unpack) == 2:
            unpack.append(1)
        self._height, self._width, self.components = unpack
        self.dtype = data.dtype
        self.make()
        self.write(numpy.flip(data, axis=0).tobytes())
        return self

    def from_image(self, image: LoadableImage) -> Self:
        """Load `image` with `LoadImage` and upload it through `from_numpy`"""
        return self.from_numpy(numpy.array(LoadImage(image)))

    def clear(self, temporal: int=0, layer: int=-1) -> Self:
        """Write zeros to the selected box"""
        return self.write(self.zeros, temporal=temporal, layer=layer)

    def is_empty(self, temporal: int=0, layer: int=-1) -> bool:
        """Whether the selected box still has its `empty` flag set"""
        return self.get_box(temporal, layer).empty

    # Todo: Sampling functions with numpy index ranges

    # -------------------------------------------|
    # Module

    def _coord2name(self, temporal: int, layer: int) -> str:
        """GLSL sampler name of the box at (temporal, layer)"""
        return f"{self.name}{temporal}x{layer}"

    def defines(self) -> Iterable[str]:
        """Yield the GLSL lines declaring aliases and a dynamic sampling function.

        Nothing is yielded when `name` is falsy.
        """
        if not self.name:
            return

        # Define last frames as plain name (iTex0x(-1) -> iTex, iTex1x(-1) -> iTex1)
        for temporal in range(self.temporal):
            yield f"#define {self.name}{temporal or ''} {self.name}{temporal}x{self.layers-1}"

        # Function to sample a dynamic temporal, layer
        yield f"\nvec4 {self.name}Texture(int temporal, int layer, vec2 astuv) {{"
        yield "    if (false) return vec4(0);"
        for temporal in range(self.temporal):
            for layer in range(self.layers):
                yield f"    else if (temporal == {temporal} && layer == {layer}) return texture({self._coord2name(temporal, layer)}, astuv);"
        yield "    else {return vec4(0);}"
        yield "}"

    def handle(self, message: ShaderMessage):
        """Recreate the textures on a RecreateTextures message while tracking"""
        if self.track and isinstance(message, ShaderMessage.Shader.RecreateTextures):
            self.make()

    def pipeline(self) -> Iterable[ShaderVariable]:
        """Yield the uniforms of this texture: size, counts and one sampler per box.

        Nothing is yielded when `name` is falsy.
        """
        if not self.name:
            return
        yield Uniform("int", "iLayer", None)
        yield Uniform("vec2",  f"{self.name}Size",     self.size)
        # yield Uniform("float", f"{self.name}AspectRatio", self.aspect_ratio)
        yield Uniform("int",   f"{self.name}Layers",   self.layers)
        yield Uniform("int",   f"{self.name}Temporal", self.temporal)
        for (t, b, box) in self.boxes:
            yield Uniform("sampler2D", self._coord2name(t, b), box.texture)

name

name: str = None

__post__

__post__()
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
61
62
def __post__(self):
    """Create the texture boxes for the initial field values by calling `make`."""
    self.make()

__smart__

__smart__(attr, value, method) -> Any
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
66
67
68
69
70
71
72
def __smart__(self, attr, value, method) -> Any:
    """Store a converted value and run `method` only if the value changed.

    Args:
        attr: Attribute descriptor providing `name` and an optional `converter`.
        value: Raw value being assigned.
        method: Zero-argument callable run after a changed value is stored.

    Returns:
        The converted value.
    """
    convert = attr.converter
    if convert:
        value = convert(value)

    current = getattr(self, attr.name)
    if current != value:
        # The new value is stored before `method` runs
        self.__setstate__({attr.name: value})
        method()

    return value

__apply__

__apply__(attr, value) -> Any
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
74
75
def __apply__(self, attr, value) -> Any:
    """Attribute hook: delegate to `__smart__` with `apply` as the follow-up."""
    follow_up = self.apply
    return self.__smart__(attr, value, follow_up)

__make__

__make__(attr, value) -> Any
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
77
78
def __make__(self, attr, value) -> Any:
    """Attribute hook: delegate to `__smart__` with `make` as the follow-up."""
    follow_up = self.make
    return self.__smart__(attr, value, follow_up)

final

final: bool = field(default=False, converter=bool)

Is this bound to the final FSSAA ShaderObject?

track

track: float = field(
    default=0.0, converter=float, on_setattr=__make__
)

Match the scene's resolution times this factor on this texture

filter

filter: TextureFilter = TextureFilter.Linear.field(
    on_setattr=__apply__
)

The interpolation filter applied to the texture when sampling on the GPU

anisotropy

anisotropy: Anisotropy = Anisotropy.x16.field(
    on_setattr=__apply__
)

Anisotropic filter level, improves texture quality at oblique angles

mipmaps

mipmaps: bool = field(
    default=False, converter=bool, on_setattr=__apply__
)

Compute mipmaps for this texture, improves quality at large distances

repeat_x

repeat_x: bool = field(
    default=True, converter=bool, on_setattr=__apply__
)

Should the texture repeat on the X axis when out of bounds or clamp

repeat_y

repeat_y: bool = field(
    default=True, converter=bool, on_setattr=__apply__
)

Should the texture repeat on the Y axis when out of bounds or clamp

repeat

repeat(value: bool) -> Self

Syntactic sugar for setting both repeat_x and repeat_y

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
103
104
105
106
def repeat(self, value: bool) -> Self:
    """Syntatic sugar for setting both repeat_x and repeat_y"""
    self.repeat_x = self.repeat_y = bool(value)
    return self.apply()

moderngl_filter

moderngl_filter: int

width

width: int

height

height: int

components

components: int = field(
    default=4, converter=int, on_setattr=__make__
)

Number of color channels per pixel (1 Grayscale, 2 RG, 3 RGB, 4 RGBA)

dtype

dtype: numpy.dtype = field(
    default=numpy.uint8,
    converter=numpy.dtype,
    on_setattr=__make__,
)

Data type of the texture for each pixel channel

resolution

resolution: tuple[int, int]

size

size: tuple[int, int]

aspect_ratio

aspect_ratio: float

zeros

zeros: numpy.ndarray

bytes_per_pixel

bytes_per_pixel: int

size_t

size_t: int

Size of the texture data in bytes

new_buffer

new_buffer() -> moderngl.Buffer

Make a new buffer with the current size of the texture

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
204
205
206
def new_buffer(self) -> moderngl.Buffer:
    """Reserve a buffer of `size_t` bytes on the scene's OpenGL context."""
    context = self.scene.opengl
    return context.buffer(reserve=self.size_t)

matrix

matrix: deque[deque[TextureBox]] = Factory(deque)

Matrix of previous frames (temporal) and their layers (layers)

temporal

temporal: int = field(
    default=1, converter=int, on_setattr=__make__
)

Number of previous frames to be stored

layers

layers: int = field(
    default=1, converter=int, on_setattr=__make__
)

Number of layers to be stored, useful in single-shader multipass

boxes

boxes: Iterable[tuple[int, int, TextureBox]]

row

row(n: int = 0) -> Iterable[TextureBox]
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
225
226
def row(self, n: int=0) -> Iterable[TextureBox]:
    """Yield each box stored in the n-th row of the matrix."""
    for box in self.matrix[n]:
        yield box

make

make() -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def make(self) -> Self:
    """Recreate every texture and framebuffer with the current settings.

    Returns:
        This instance, after `apply` has run.

    Raises:
        Exception: When the largest side of `size` exceeds the first item of
            the context's GL_MAX_VIEWPORT_DIMS.
    """
    largest = max(self.size)
    limit = self.scene.opengl.info['GL_MAX_VIEWPORT_DIMS'][0]
    if largest > limit:
        raise Exception(f"Texture size too large for this OpenGL context: {self.size} > {limit}")

    # Bring the matrix to `temporal` rows of `layers` boxes each
    for frame in pop_fill(self.matrix, deque, self.temporal):
        pop_fill(frame, TextureBox, self.layers)

    # Swap the GPU objects of every box for freshly created ones
    for (_, _, box) in self.boxes:
        box.release()
        texture = self.scene.opengl.texture(
            components=self.components,
            dtype=numpy2mgltype(self.dtype),
            size=self.size,
        )
        box.texture = texture
        box.fbo = self.scene.opengl.framebuffer(color_attachments=[texture])

        # Restore the cached bytes only if they still fit the new size
        cached = box.data
        if cached and (len(cached) == self.size_t):
            texture.write(cached)

    return self.apply()

apply

apply() -> Self

Apply filters and flags to all textures

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
252
253
254
255
256
257
258
259
260
261
def apply(self) -> Self:
    """Apply filters and flags to all textures.

    The minification filter may be a mipmap filter, but OpenGL only takes
    plain NEAREST or LINEAR for magnification, so the magnification filter
    is always the non-mipmap variant of `filter`.

    Returns:
        This instance.
    """
    minify = self.moderngl_filter
    magnify = dict(
        linear=moderngl.LINEAR,
        nearest=moderngl.NEAREST,
    ).get(self.filter.value)

    for (_, _, box) in self.boxes:
        # Mipmaps are built first, the filter is assigned afterwards
        if self.mipmaps:
            box.texture.build_mipmaps()
        box.texture.filter     = (minify, magnify)
        box.texture.anisotropy = self.anisotropy.value
        box.texture.repeat_x   = self.repeat_x
        box.texture.repeat_y   = self.repeat_y
    return self

destroy

destroy() -> None
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
263
264
265
def destroy(self) -> None:
    """Call `release` on every box yielded by `boxes`."""
    for _frame, _layer, slot in self.boxes:
        slot.release()

get_box

get_box(
    temporal: int = 0, layer: int = -1
) -> Optional[TextureBox]

Note: Points to the current final box

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
267
268
269
def get_box(self, temporal: int=0, layer: int=-1) -> Optional[TextureBox]:
    """Look up the box at (temporal, layer) through `list_get`.

    Note: the defaults point to the current final box.
    """
    frame = list_get(self.matrix, temporal)
    return list_get(frame, layer)

fbo

fbo: moderngl.Framebuffer

Final and most Recent FBO of this Texture

texture

texture: moderngl.Texture

Final and most Recent Texture of this Texture

roll

roll(n: int = 1) -> Self

Rotate the temporal rows of the matrix by n steps

Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
283
284
285
286
def roll(self, n: int=1) -> Self:
    """Rotate the temporal layers by $n times"""
    self.matrix.rotate(n)
    return self

write

write(
    data: bytes = None,
    *,
    temporal: int = 0,
    layer: int = -1,
    viewport: tuple[int, int, int, int] = None
) -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
291
292
293
294
295
296
297
298
299
300
301
302
303
def write(self,
    data: bytes=None,
    *,
    temporal: int=0,
    layer: int=-1,
    viewport: tuple[int, int, int, int]=None,
) -> Self:
    box = self.get_box(temporal, layer)
    box.texture.write(data, viewport=viewport)
    if (not viewport):
        box.data = bytes(data)
    box.empty = False
    return self

from_numpy

from_numpy(data: numpy.ndarray) -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
305
306
307
308
309
310
311
312
313
def from_numpy(self, data: numpy.ndarray) -> Self:
    unpack = list(data.shape)
    if len(unpack) == 2:
        unpack.append(1)
    self._height, self._width, self.components = unpack
    self.dtype = data.dtype
    self.make()
    self.write(numpy.flip(data, axis=0).tobytes())
    return self

from_image

from_image(image: LoadableImage) -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
315
316
def from_image(self, image: LoadableImage) -> Self:
    """Load `image` with `LoadImage` and upload it through `from_numpy`."""
    loaded = LoadImage(image)
    pixels = numpy.array(loaded)
    return self.from_numpy(pixels)

clear

clear(temporal: int = 0, layer: int = -1) -> Self
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
318
319
def clear(self, temporal: int=0, layer: int=-1) -> Self:
    return self.write(self.zeros, temporal=temporal, layer=layer)

is_empty

is_empty(temporal: int = 0, layer: int = -1) -> bool
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
321
322
def is_empty(self, temporal: int=0, layer: int=-1) -> bool:
    """Return the `empty` flag of the box at (temporal, layer)."""
    box = self.get_box(temporal, layer)
    return box.empty

defines

defines() -> Iterable[str]
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def defines(self) -> Iterable[str]:
    """Yield the GLSL lines declaring aliases and a dynamic sampling function.

    Nothing is yielded when `name` is falsy.
    """
    name = self.name
    if not name:
        return

    # Alias the last layer of each frame to a short name: frame 0 gets the
    # bare name, later frames get the frame index appended
    last = self.layers - 1
    for frame in range(self.temporal):
        yield f"#define {name}{frame or ''} {name}{frame}x{last}"

    # One branch per (temporal, layer) pair, falling back to vec4(0)
    yield f"\nvec4 {name}Texture(int temporal, int layer, vec2 astuv) {{"
    yield "    if (false) return vec4(0);"
    for frame in range(self.temporal):
        for layer in range(self.layers):
            sampler = self._coord2name(frame, layer)
            yield f"    else if (temporal == {frame} && layer == {layer}) return texture({sampler}, astuv);"
    yield "    else {return vec4(0);}"
    yield "}"

handle

handle(message: ShaderMessage)
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
349
350
351
def handle(self, message: ShaderMessage):
    """Recreate the textures on a RecreateTextures message while `track` is set."""
    if not self.track:
        return
    if isinstance(message, ShaderMessage.Shader.RecreateTextures):
        self.make()

pipeline

pipeline() -> Iterable[ShaderVariable]
Source code in Projects/ShaderFlow/ShaderFlow/Texture.py
353
354
355
356
357
358
359
360
361
362
def pipeline(self) -> Iterable[ShaderVariable]:
    """Yield the uniforms of this texture: size, counts and one sampler per box.

    Nothing is yielded when `name` is falsy.
    """
    prefix = self.name
    if not prefix:
        return

    yield Uniform("int", "iLayer", None)
    yield Uniform("vec2", f"{prefix}Size", self.size)
    # Disabled: yield Uniform("float", f"{self.name}AspectRatio", self.aspect_ratio)
    yield Uniform("int", f"{prefix}Layers", self.layers)
    yield Uniform("int", f"{prefix}Temporal", self.temporal)

    # One sampler per box, named after its coordinates in the matrix
    for frame, layer, box in self.boxes:
        yield Uniform("sampler2D", self._coord2name(frame, layer), box.texture)