Skip to content

Base

Warning

Better Docstrings and Formatting are being worked on for the Code References

Base

Base.inverse(x: float) -> float

Source code in Broken/Base.py
Python
62
63
def inverse(x: float) -> float:
    return (1/x)

Base.clamp(value: float, low: float = 0, high: float = 1) -> float

Source code in Broken/Base.py
Python
65
66
def clamp(value: float, low: float=0, high: float=1) -> float:
    return max(low, min(value, high))

Base.Maybe(call, when, **args)

Source code in Broken/Base.py
Python
68
69
70
def Maybe(call, when, **args):
    if when:
        return call(**args)

Base.LazyCounter()

Source code in Broken/Base.py
Python
72
73
74
75
76
@contextlib.contextmanager
def LazyCounter():
    start = time.perf_counter()
    yield None
    log.info(f"Took: {time.perf_counter() - start:.4f} seconds")

Base.flatten(*stuff: Union[Any, List[Any]], truthy: bool = True) -> List[Any]

Flatten nested list like iterables to a 1D list [[a, b], c, [d, e, (None, 3)], [g, h]] -> [a, b, c, d, e, None, 3, g, h]

Source code in Broken/Base.py
Python
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def flatten(*stuff: Union[Any, List[Any]], truthy: bool=True) -> List[Any]:
    """
    Flatten nested list like iterables to a 1D list
    [[a, b], c, [d, e, (None, 3)], [g, h]] -> [a, b, c, d, e, None, 3, g, h]
    """
    # Fixme: Add allow_none argument
    iterables = (list, tuple, Generator)
    def flatten(stuff):
        return [
            item for subitem in stuff for item in
            (flatten(subitem) if isinstance(subitem, iterables) else [subitem])
            if (not truthy) or (truthy and item)
        ]
    return flatten(stuff)

Base.easy_multiple(method: Callable) -> Callable

Returns first element if the method returns len 1 else list of outputs

Source code in Broken/Base.py
Python
 93
 94
 95
 96
 97
 98
 99
100
def easy_multiple(method: Callable) -> Callable:
    """Returns first element if the method returns len 1 else list of outputs"""
    def wrapper(*args, **kwargs):
        result = list(method(*args, **kwargs))
        if len(result) == 1:
            return result[0]
        return result
    return wrapper

Base.apply(callback: Callable, iterable: Iterable[Any]) -> List[Any]

map(f, x) is lazy, this consumes the generator returning a list

Source code in Broken/Base.py
Python
102
103
104
def apply(callback: Callable, iterable: Iterable[Any]) -> List[Any]:
    """map(f, x) is lazy, this consumes the generator returning a list"""
    return list(map(callback, iterable))

Base.denum(item: Any) -> Any

Source code in Broken/Base.py
Python
106
107
def denum(item: Any) -> Any:
    return (item.value if isinstance(item, Enum) else item)

Base.dunder(name: str) -> bool staticmethod

Source code in Broken/Base.py
Python
109
110
111
@staticmethod
def dunder(name: str) -> bool:
    return name.startswith("__") and name.endswith("__")

Base.Ignore

A class that does nothing. No-operation faster Mock

Source code in Broken/Base.py
Python
113
114
115
116
117
118
119
120
class Ignore:
    """A class that does nothing. No-operation faster Mock"""
    def __nop__(self, *args, **kwargs) -> Self:
        return self
    def __call__(self, *args, **kwargs) -> Self:
        return self
    def __getattr__(self, _):
        return self.__nop__

__nop__(*args, **kwargs) -> Self

Source code in Broken/Base.py
Python
115
116
def __nop__(self, *args, **kwargs) -> Self:
    return self

__call__(*args, **kwargs) -> Self

Source code in Broken/Base.py
Python
117
118
def __call__(self, *args, **kwargs) -> Self:
    return self

__getattr__(_)

Source code in Broken/Base.py
Python
119
120
def __getattr__(self, _):
    return self.__nop__

Base.SameTracker

Doumo same desu

Source code in Broken/Base.py
Python
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@define
class SameTracker:
    """Doumo same desu"""
    value: Any = None

    def __call__(self, value: Any) -> bool:
        """
        If a value is the same, returns True, else updates it and returns False
        • Useful on ignoring expensive calls where parameters doesn't changes
        """
        if self.value != value:
            self.value = value
            return False
        return True

value: Any = None class-attribute instance-attribute

__call__(value: Any) -> bool

If a value is the same, returns True, else updates it and returns False • Useful on ignoring expensive calls where parameters doesn't changes

Source code in Broken/Base.py
Python
127
128
129
130
131
132
133
134
135
def __call__(self, value: Any) -> bool:
    """
    If a value is the same, returns True, else updates it and returns False
    • Useful on ignoring expensive calls where parameters doesn't changes
    """
    if self.value != value:
        self.value = value
        return False
    return True

Base.shell(*args: list[Any], output: bool = False, Popen: bool = False, shell: bool = False, env: dict[str, str] = None, echo: bool = True, confirm: bool = False, do: bool = True, **kwargs) -> Union[None, str, subprocess.Popen, subprocess.CompletedProcess]

Better subprocess.* commands, all in one, yeet whatever you think it works • This is arguably the most important function in Broken 🙈

Example
Python
1
shell(["binary", "-m"], "arg1", None, "arg2", 3, output=True, echo=False, confirm=True)

Parameters:

  • `args`

    The command to run, can be a list of arguments or a list of lists of arguments, don't care

  • `output`

    Return the process's stdout as a decoded string

  • `Popen`

    Run the process with subprocess.Popen

  • `shell`

    (discouraged) Run the command with shell=True, occasionally useful

  • `echo`

    Log the command being run or not

  • `confirm`

    Ask for confirmation before running the command or not

  • `do`

    Inverse of skip, do not run the command, but log it as minor

Kwargs (subprocess. arguments): cwd: Current working directory for the command env: Environment variables for the command *: Any other keyword arguments are passed to subprocess.

Source code in Broken/Base.py
Python
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def shell(
    *args:   list[Any],
    output:  bool=False,
    Popen:   bool=False,
    shell:   bool=False,
    env:     dict[str, str]=None,
    echo:    bool=True,
    confirm: bool=False,
    do:      bool=True,
    **kwargs
) -> Union[None, str, subprocess.Popen, subprocess.CompletedProcess]:
    """
    Better subprocess.* commands, all in one, yeet whatever you think it works
    • This is arguably the most important function in Broken 🙈

    Example:
        ```python
        shell(["binary", "-m"], "arg1", None, "arg2", 3, output=True, echo=False, confirm=True)
        ```

    Args:
        `args`:    The command to run, can be a list of arguments or a list of lists of arguments, don't care
        `output`:  Return the process's stdout as a decoded string
        `Popen`:   Run the process with subprocess.Popen
        `shell`:   (discouraged) Run the command with shell=True, occasionally useful
        `echo`:    Log the command being run or not
        `confirm`: Ask for confirmation before running the command or not
        `do`:      Inverse of `skip`, do not run the command, but log it as minor

    Kwargs (subprocess.* arguments):
        `cwd`: Current working directory for the command
        `env`: Environment variables for the command
        `*`:   Any other keyword arguments are passed to subprocess.*
    """
    if output and Popen:
        raise ValueError(log.error("Cannot use (output=True) and (Popen=True) at the same time"))

    # Flatten a list, remove falsy values, convert to strings
    command = tuple(map(str, flatten(args)))

    if shell:
        log.warning("Running command with (shell=True), be careful.." + " Consider using (confirm=True)"*(not confirm))
        command = '"' + '" "'.join(command) + '"'

    # Assert command won't fail due unknown binary
    if (not shell) and (not shutil.which(command[0])):
        raise FileNotFoundError(log.error(f"Binary doesn't exist or was not found on PATH ({command[0]})"))

    # Get the current working directory
    cwd = f" @ ({kwargs.get('cwd', '') or Path.cwd()})"
    (log.info if do else log.skip)(("Running" if do else "Skipping") + f" Command {command}{cwd}", echo=echo)
    if (not do): return

    # Confirm running command or not
    if confirm and not click.confirm("• Confirm running the command above"):
        return

    # Update kwargs on the fly
    kwargs["env"] = os.environ | (env or {})
    kwargs["shell"] = shell

    # Run the command and return specified object
    if output: return subprocess.check_output(command, **kwargs).decode("utf-8")
    if Popen:  return subprocess.Popen(command, **kwargs)
    else:      return subprocess.run(command, **kwargs)

Base.transcends(method, base, generator: bool = False)

Are you tired of managing and calling super().(args, *kwargs) in your methods?

We have just the right solution for you!

Introducing transcends, the decorator that crosses your class's MRO and calls the method with the same name as the one you are decorating. It's an automatic super() !

Source code in Broken/Base.py
Python
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def transcends(method, base, generator: bool=False):
    """
    Are you tired of managing and calling super().<name>(*args, **kwargs) in your methods?
    > We have just the right solution for you!

    Introducing transcends, the decorator that crosses your class's MRO and calls the method
    with the same name as the one you are decorating. It's an automatic super() !
    """
    name = method.__name__

    def decorator(func: Callable) -> Callable:
        def get_targets(self):
            for cls in type(self).mro()[:-2]:
                if cls in (base, object):
                    continue
                if (target := cls.__dict__.get(name)):
                    yield target

        # Note: We can't have a `if generator` else the func becomes a Generator
        def yields(self, *args, **kwargs):
            for target in get_targets(self):
                yield from target(self, *args, **kwargs)
        def plain(self, *args, **kwargs):
            for target in get_targets(self):
                target(self, *args, **kwargs)

        return (yields if generator else plain)
    return decorator

Base.BrokenAttrs

Walk over an @attrs.defined class and call post on all classes in the MRO.

Warn: Must NOT define attrs_post_init in an inheriting class

Fixme: Can improve by starting on BrokenAttrs itself

Source code in Broken/Base.py
Python
235
236
237
238
239
240
241
242
243
244
245
246
247
class BrokenAttrs:
    """
    Walk over an @attrs.defined class and call __post__ on all classes in the MRO.
    # Warn: Must NOT define __attrs_post_init__ in an inheriting class
    # Fixme: Can improve by starting on BrokenAttrs itself
    """
    def __attrs_post_init__(self):
        for cls in reversed(type(self).mro()):
            if method := cls.__dict__.get("__post__"):
                method(self)

    def __post__(self) -> None:
        pass

__attrs_post_init__()

Source code in Broken/Base.py
Python
241
242
243
244
def __attrs_post_init__(self):
    for cls in reversed(type(self).mro()):
        if method := cls.__dict__.get("__post__"):
            method(self)

__post__() -> None

Source code in Broken/Base.py
Python
246
247
def __post__(self) -> None:
    pass

Base.BrokenPlatform

Host Platform information, Cross Compilation targets and some utilities

Source code in Broken/Base.py
Python
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class BrokenPlatform:
    """
    Host Platform information, Cross Compilation targets and some utilities
    """

    # Name of the current platform - (linux, windows, macos, bsd)
    Name:         str  = platform.system().lower().replace("darwin", "macos")

    # Booleans if the current platform is the following
    OnLinux:      bool = (Name == "linux")
    OnWindows:    bool = (Name == "windows")
    OnMacOS:      bool = (Name == "macos")
    OnBSD:        bool = (Name == "bsd")

    # Platform release binaries extension and CPU architecture
    Extension:    str  = (".exe" if OnWindows else ".bin")
    Architecture: str  = (platform.machine().lower().replace("x86_64", "amd64"))

    # Family of platforms
    OnUnix:       bool = (OnLinux or OnMacOS or OnBSD)

    # Distro IDs: https://distro.readthedocs.io/en/latest/
    LinuxDistro:  str  = distro.id()

    # # Booleans if the current platform is the following

    # Ubuntu-like
    OnUbuntu:     bool = (LinuxDistro == "ubuntu")
    OnDebian:     bool = (LinuxDistro == "debian")
    OnMint:       bool = (LinuxDistro == "linuxmint")
    OnRaspberry:  bool = (LinuxDistro == "raspbian")
    OnUbuntuLike: bool = (OnUbuntu or OnDebian or OnMint or OnRaspberry)

    # Arch-like
    OnArch:       bool = (LinuxDistro == "arch")
    OnManjaro:    bool = (LinuxDistro == "manjaro")
    OnArchLike:   bool = (OnArch or OnManjaro)

    # RedHat-like
    OnFedora:     bool = (LinuxDistro == "fedora")
    OnCentOS:     bool = (LinuxDistro == "centos")
    OnRedHat:     bool = (LinuxDistro == "rhel")
    OnRedHatLike: bool = (OnFedora or OnCentOS or OnRedHat)

    # Others
    OnGentoo:     bool = (LinuxDistro == "gentoo")

    # BSD - I have never used it
    OnOpenBSD:    bool = (LinuxDistro == "openbsd")
    OnNetBSD:     bool = (LinuxDistro == "netbsd")
    OnBSDLike:    bool = (OnOpenBSD or OnNetBSD)

    class Targets(BrokenEnum):
        """List of common platforms targets for releases"""
        LinuxAMD64:   str = "linux-amd64"
        LinuxARM:     str = "linux-arm64"
        WindowsAMD64: str = "windows-amd64"
        WindowsARM:   str = "windows-arm64"
        MacosAMD64:   str = "macos-amd64"
        MacosARM:     str = "macos-arm64"

        @property
        def rust(self) -> str:
            return {
                self.LinuxAMD64:   "x86_64-unknown-linux-gnu",
                self.LinuxARM:     "aarch64-unknown-linux-gnu",
                self.WindowsAMD64: "x86_64-pc-windows-gnu",
                self.WindowsARM:   "aarch64-pc-windows-gnu",
                self.MacosAMD64:   "x86_64-apple-darwin",
                self.MacosARM:     "aarch64-apple-darwin",
            }[self]

        @property
        def extension(self) -> str:
            """Same as BrokenPlatform.Extension"""
            return ".exe" if ("windows" in self.value) else ".bin"

        @property
        def name(self) -> str:
            """Same as BrokenPlatform.Name"""
            return self.value.split("-")[0]

        @property
        def architecture(self) -> str:
            """Same as BrokenPlatform.Architecture"""
            return self.value.split("-")[1]

    CurrentTarget: str = f"{Name}-{Architecture}"

    @staticmethod
    def clear_terminal(**kwargs):
        if not kwargs.get("do", True):
            return

        if BrokenPlatform.OnWindows:
            os.system("cls")
        else:
            shell("clear", **kwargs)

    @staticmethod
    def log_system_info():
        log.info(f"• System Info: {platform.system()} {platform.release()}, Python {platform.python_version()} {platform.machine()}")

    # Simply, why Windows/Python have different directory names for scripts? ...
    # https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1
    PyScripts         = ("Scripts" if OnWindows else "bin")
    PyScriptExtension = (".cmd" if OnWindows else "")

Name: str = platform.system().lower().replace('darwin', 'macos') class-attribute instance-attribute

OnLinux: bool = Name == 'linux' class-attribute instance-attribute

OnWindows: bool = Name == 'windows' class-attribute instance-attribute

OnMacOS: bool = Name == 'macos' class-attribute instance-attribute

OnBSD: bool = Name == 'bsd' class-attribute instance-attribute

Extension: str = '.exe' if OnWindows else '.bin' class-attribute instance-attribute

Architecture: str = platform.machine().lower().replace('x86_64', 'amd64') class-attribute instance-attribute

OnUnix: bool = OnLinux or OnMacOS or OnBSD class-attribute instance-attribute

LinuxDistro: str = distro.id() class-attribute instance-attribute

OnUbuntu: bool = LinuxDistro == 'ubuntu' class-attribute instance-attribute

OnDebian: bool = LinuxDistro == 'debian' class-attribute instance-attribute

OnMint: bool = LinuxDistro == 'linuxmint' class-attribute instance-attribute

OnRaspberry: bool = LinuxDistro == 'raspbian' class-attribute instance-attribute

OnUbuntuLike: bool = OnUbuntu or OnDebian or OnMint or OnRaspberry class-attribute instance-attribute

OnArch: bool = LinuxDistro == 'arch' class-attribute instance-attribute

OnManjaro: bool = LinuxDistro == 'manjaro' class-attribute instance-attribute

OnArchLike: bool = OnArch or OnManjaro class-attribute instance-attribute

OnFedora: bool = LinuxDistro == 'fedora' class-attribute instance-attribute

OnCentOS: bool = LinuxDistro == 'centos' class-attribute instance-attribute

OnRedHat: bool = LinuxDistro == 'rhel' class-attribute instance-attribute

OnRedHatLike: bool = OnFedora or OnCentOS or OnRedHat class-attribute instance-attribute

OnGentoo: bool = LinuxDistro == 'gentoo' class-attribute instance-attribute

OnOpenBSD: bool = LinuxDistro == 'openbsd' class-attribute instance-attribute

OnNetBSD: bool = LinuxDistro == 'netbsd' class-attribute instance-attribute

OnBSDLike: bool = OnOpenBSD or OnNetBSD class-attribute instance-attribute

Targets

List of common platforms targets for releases

Source code in Broken/Base.py
Python
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class Targets(BrokenEnum):
    """List of common platforms targets for releases"""
    LinuxAMD64:   str = "linux-amd64"
    LinuxARM:     str = "linux-arm64"
    WindowsAMD64: str = "windows-amd64"
    WindowsARM:   str = "windows-arm64"
    MacosAMD64:   str = "macos-amd64"
    MacosARM:     str = "macos-arm64"

    @property
    def rust(self) -> str:
        return {
            self.LinuxAMD64:   "x86_64-unknown-linux-gnu",
            self.LinuxARM:     "aarch64-unknown-linux-gnu",
            self.WindowsAMD64: "x86_64-pc-windows-gnu",
            self.WindowsARM:   "aarch64-pc-windows-gnu",
            self.MacosAMD64:   "x86_64-apple-darwin",
            self.MacosARM:     "aarch64-apple-darwin",
        }[self]

    @property
    def extension(self) -> str:
        """Same as BrokenPlatform.Extension"""
        return ".exe" if ("windows" in self.value) else ".bin"

    @property
    def name(self) -> str:
        """Same as BrokenPlatform.Name"""
        return self.value.split("-")[0]

    @property
    def architecture(self) -> str:
        """Same as BrokenPlatform.Architecture"""
        return self.value.split("-")[1]
LinuxAMD64: str = 'linux-amd64' class-attribute instance-attribute
LinuxARM: str = 'linux-arm64' class-attribute instance-attribute
WindowsAMD64: str = 'windows-amd64' class-attribute instance-attribute
WindowsARM: str = 'windows-arm64' class-attribute instance-attribute
MacosAMD64: str = 'macos-amd64' class-attribute instance-attribute
MacosARM: str = 'macos-arm64' class-attribute instance-attribute
rust: str property
extension: str property

Same as BrokenPlatform.Extension

name: str property

Same as BrokenPlatform.Name

architecture: str property

Same as BrokenPlatform.Architecture

CurrentTarget: str = f'{Name}-{Architecture}' class-attribute instance-attribute

clear_terminal(**kwargs) staticmethod

Source code in Broken/Base.py
Python
340
341
342
343
344
345
346
347
348
@staticmethod
def clear_terminal(**kwargs):
    if not kwargs.get("do", True):
        return

    if BrokenPlatform.OnWindows:
        os.system("cls")
    else:
        shell("clear", **kwargs)

log_system_info() staticmethod

Source code in Broken/Base.py
Python
350
351
352
@staticmethod
def log_system_info():
    log.info(f"• System Info: {platform.system()} {platform.release()}, Python {platform.python_version()} {platform.machine()}")

PyScripts = 'Scripts' if OnWindows else 'bin' class-attribute instance-attribute

PyScriptExtension = '.cmd' if OnWindows else '' class-attribute instance-attribute

Base.ShutilFormat

Source code in Broken/Base.py
Python
361
362
363
364
365
366
class ShutilFormat(BrokenEnum):
    Zip   = "zip"
    Tar   = "tar"
    GzTar = "tar.gz"
    BzTar = "tar.bz2"
    XzTar = "tar.xz"

Zip = 'zip' class-attribute instance-attribute

Tar = 'tar' class-attribute instance-attribute

GzTar = 'tar.gz' class-attribute instance-attribute

BzTar = 'tar.bz2' class-attribute instance-attribute

XzTar = 'tar.xz' class-attribute instance-attribute

Base.BrokenPath

BrokenPath is not supposed to be instantiated as a class but used as a namespace (static class) • The new method returns the value of .get method, which is pathlib.Path • Many Path utilities as staticmethod and contextmanager

Source code in Broken/Base.py
Python
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
class BrokenPath(Path):
    """
    BrokenPath is not supposed to be instantiated as a class but used as a namespace (static class)
    • The __new__ method returns the value of .get method, which is pathlib.Path
    • Many Path utilities as staticmethod and contextmanager
    """
    def __new__(cls, *paths: Path, valid: bool=False) -> Optional[Path]:
        return cls.get(*paths, valid=valid)

    @staticmethod
    def get(*paths: Path, valid: bool=False) -> Optional[Path] | List[Optional[Path]]:
        """Resolve paths to absolute None-safe, optionally verify their existence, None if invalid"""
        paths = map(lambda path: Path(path).expanduser().resolve() if path else None, paths)
        paths = map(lambda path: (path if (path and path.exists()) else None) if valid else path, paths)
        paths = list(paths)
        return paths[0] if (len(paths) == 1) else paths

    @staticmethod
    def copy(src: Path, dst: Path, *, echo=True) -> Path:
        src, dst = BrokenPath(src, dst)
        BrokenPath.mkdir(dst.parent, echo=False)
        if src.is_dir():
            log.info(f"Copying Directory ({src})\n → ({dst})", echo=echo)
            shutil.copytree(src, dst)
        else:
            log.info(f"Copying File ({src})\n → ({dst})", echo=echo)
            shutil.copy2(src, dst)
        return dst

    @staticmethod
    def move(src: Path, dst: Path, *, echo=True) -> Path:
        src, dst = BrokenPath(src, dst)
        log.info(f"Moving ({src})\n → ({dst})", echo=echo)
        shutil.move(src, dst)
        return dst

    @staticmethod
    def remove(path: Path, *, confirm=False, echo=True) -> Path:

        # Already removed or doesn't exist
        if not (path := BrokenPath(path, valid=True)):
            return path

        log.info(f"Removing Path ({path})", echo=echo)

        # Safety: Must not be common
        if path in (Path.cwd(), Path.home()):
            log.error(f"Avoided catastrophic failure by not removing ({path})")
            exit(1)

        # Symlinks are safe to remove
        if path.is_symlink():
            path.unlink()
            return path

        # Confirm removal: directory contains data
        if confirm and (not click.confirm(f"• Confirm removing path ({path})")):
            return path

        # Remove the path
        if path.is_dir():
            shutil.rmtree(path, ignore_errors=True)
        else:
            path.unlink()

        return path

    @staticmethod
    def touch(path: Path, *, echo=True) -> Path:
        """Creates a file, fail safe™"""
        if (path := BrokenPath(path)).exists():
            log.success(f"File ({path}) already touched", echo=echo)
            return
        log.info(f"Touching file {path}", echo=echo)
        path.touch()
        return path

    @staticmethod
    def mkdir(path: Path, parent: bool=False, *, echo=True) -> Path:
        """Creates a directory and its parents, fail safe™"""
        path = BrokenPath(path)
        path = path.parent if parent else path
        if path.exists():
            log.success(f"Directory ({path}) already exists", echo=echo)
            return path
        log.info(f"Creating directory {path}", echo=echo)
        path.mkdir(parents=True, exist_ok=True)
        return path

    @staticmethod
    def resetdir(path: Path, *, echo=True) -> Path:
        """Creates a directory and its parents, fail safe™"""
        BrokenPath.remove(path, echo=echo)
        return BrokenPath.mkdir(path, echo=echo)

    @contextlib.contextmanager
    def pushd(path: Path, *, echo: bool=True) -> Generator[Path, None, None]:
        """Change directory, then change back when done"""
        path = BrokenPath(path)
        cwd = os.getcwd()
        log.info(f"Pushd ({path})", echo=echo)
        os.chdir(path)
        yield path
        log.info(f"Popd  ({path})", echo=echo)
        os.chdir(cwd)

    @staticmethod
    def symlink(virtual: Path, real: Path, *, echo: bool=True) -> Path:
        """
        Symlink [virtual] -> [real], `virtual` being the symlink file and `real` the target

        Args:
            virtual (Path): Symlink path (file)
            real (Path): Target path (real path)

        Returns:
            None if it fails, else `virtual` Path
        """
        log.info(f"Symlinking ({virtual})\n → ({real})", echo=echo)

        # Return if already symlinked
        if (BrokenPath(virtual) == BrokenPath(real)):
            return virtual

        # Make Virtual's parent directory
        BrokenPath.mkdir(virtual.parent, echo=False)

        # Remove old symlink if it points to a non existing directory
        if virtual.is_symlink() and (not virtual.resolve().exists()):
            virtual.unlink()

        # Virtual doesn't exist, ok to create
        elif not virtual.exists():
            pass

        # File exists and is a symlink - safe to remove
        elif virtual.is_symlink():
            virtual.unlink()

        # Virtual is a directory and not empty
        elif virtual.is_dir() and (not os.listdir(virtual)):
            BrokenPath.remove(virtual, echo=False)

        else:
            if click.confirm(f"• Path ({virtual}) exists, but Broken wants to create a symlink to ({real})\nConfirm removing the 'virtual' path and continuing? (It might contain data or be a important symlink)"):
                BrokenPath.remove(virtual, echo=False)
            else:
                return

        # Actually symlink
        virtual.symlink_to(real)
        return virtual

    @staticmethod
    def make_executable(path: Path, *, echo=False) -> Path:
        """Make a file executable"""
        path = BrokenPath(path)
        if BrokenPlatform.OnUnix:
            shell("chmod", "+x", path, echo=echo)
        elif BrokenPlatform.OnWindows:
            shell("attrib", "+x", path, echo=echo)
        return path

    @staticmethod
    def zip(path: Path, output: Path=None, *, format: ShutilFormat=ShutilFormat.Zip, echo: bool=True) -> Path:
        format = ShutilFormat(format)
        output = BrokenPath(output or path).with_suffix(f".{format}")
        path   = BrokenPath(path)
        BrokenPath.remove(output, echo=echo)
        log.info(f"Zipping ({path})\n → ({output})", echo=echo)
        shutil.make_archive(output.with_suffix(""), format, path)
        return output

    @staticmethod
    def stem(path: Path) -> str:
        """
        Get the "true stem" of a path, as pathlib's only gets the last dot one
        • "/path/with/many.ext.ens.ions" -> "many" instead of "many.ext.ens"
        """
        stem = Path(Path(path).stem)
        while (stem := Path(stem).with_suffix("")).suffix:
            continue
        return str(stem)

    @staticmethod
    def sha256sum(data: Union[Path, str, bytes]) -> Optional[str]:
        """Get the sha256sum of a file, directory or bytes"""

        # Nibble the bytes !
        if isinstance(data, bytes):
            return hashlib.sha256(data).hexdigest()

        # String or Path is a valid path
        elif (path := BrokenPath(data, valid=True)):
            with BrokenSpinner(log.info(f"Calculating sha256sum of ({path})")):
                if path.is_file():
                    return hashlib.sha256(path.read_bytes()).hexdigest()

                # Iterate on all files for low memory footprint
                feed = hashlib.sha256()
                for file in path.rglob("*"):
                    if not file.is_file():
                        continue
                    with open(file, "rb") as file:
                        while (chunk := file.read(8192)):
                            feed.update(chunk)
                return feed.hexdigest()

        elif isinstance(data, str):
            return hashlib.sha256(data.encode("utf-8")).hexdigest()

        return

    @staticmethod
    def extract(
        path: Path,
        output: Path=None,
        *,
        overwrite: bool=False,
        PATH: bool=False,
        echo: bool=True
    ) -> Path:
        path, output = BrokenPath(path, output)

        # Output is input without suffix if not given
        if (output is None):
            output = path.parent/BrokenPath.stem(path)

        # Add stem to the output as some archives might be flat
        output /= BrokenPath.stem(path)

        # Re-extract on order
        Maybe(BrokenPath.remove, overwrite, echo=echo)

        # A file to skip if it exists, created after successful extraction
        if (extract_flag := (output/"BrokenPath.extract.ok")).exists():
            log.minor(f"Already extracted ({output})", echo=echo)
            Maybe(BrokenPath.add_to_path, PATH, path=output, recursively=True, echo=echo)
            return output

        # Show progress as this might take a while on slower IOs
        log.info(f"Extracting ({path})\n → ({output})", echo=echo)
        with BrokenSpinner("Extracting archive.."):
            shutil.unpack_archive(path, output)

        extract_flag.touch()
        Maybe(BrokenPath.add_to_path, PATH, path=output, recursively=True, echo=echo)
        return output/BrokenPath.stem(path)

    @staticmethod
    def download(
        url: str,
        output: Path=None,
        *,
        size_check: bool=True,
        chunk: int=1024,
        echo: bool=True
    ) -> Optional[Path]:

        # Link must be valid
        if not validators.url(url):
            log.error(f"The string ({url}) doesn't look like a valid URL", echo=echo)
            return

        import requests

        # Default to Broken's Download directory
        if (output is None):
            output = Broken.BROKEN.DIRECTORIES.DOWNLOADS

        # Append url's file name to the output path
        if (output := BrokenPath(output)).is_dir():
            output /= Path(url.split("#")[0].split("?")[0].split("/")[-1])

        # Without size check, the existence of the file is enough
        if output.exists() and (not size_check):
            log.info(f"Already Downloaded ({output})", echo=echo)
            log.minor("• Size check was skipped, the file might be incomplete", echo=echo)
            return

        # Send the GET request, we might be offline!
        try:
            response = requests.get(url, stream=True)
        except requests.exceptions.RequestException as error:
            log.error(f"Failed to download file ({url}) → ({output}): {error}", echo=echo)
            return output

        size = int(response.headers.get('content-length', 0))

        # The file might already be (partially) downloaded
        if output.exists():
            A, B = (output.stat().st_size, size)
            if (A == B):
                log.info(f"Already Downloaded ({output})", echo=echo)
                return output
            else:
                log.warning(f"Wrong Download at ({output})", echo=echo)

        log.info(f"Downloading file at ({url}):", echo=echo)
        log.info(f"• Output: ({output})", echo=echo)

        # It is binary prefix, right? kibi, mebi, gibi, etc. as we're dealing with raw bytes
        with open(output, "wb") as file, tqdm.tqdm(
            desc=f"Downloading ({output.name})",
            total=size, unit="iB", unit_scale=True, unit_divisor=1024,
            mininterval=1/30, maxinterval=0.5, leave=False
        ) as progress:
            for data in response.iter_content(chunk_size=chunk):
                progress.update(file.write(data))

        # Url was invalid
        if (response.status_code != 200):
            log.error(f"Failed to Download File at ({url}):", echo=echo)
            log.error(f"• HTTP Error: {response.status_code}", echo=echo)
            return

        # Wrong downloaded and expected size
        elif (output.stat().st_size != size):
            log.error(f"File ({output}) was not downloaded correctly", echo=echo)
            return

        log.success(f"Downloaded file ({output}) from ({url})", echo=echo)
        return output

    @staticmethod
    def get_external(url: str, *, subdir: str="", echo: bool=True) -> Path:
        file = BrokenPath.download(denum(url), echo=echo)

        # File is a Archive, extract
        if any((str(file).endswith(ext) for ext in ShutilFormat.values)):
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_ARCHIVES
            return BrokenPath.extract(file, directory, PATH=True, echo=echo)

        # File is some known type, move to their own external directory
        if bool(subdir):
            directory = Broken.BROKEN.DIRECTORIES.EXTERNALS/subdir
        elif file.suffix in AUDIO_EXTENSIONS:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_AUDIO
        elif file.suffix in IMAGE_EXTENSIONS:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_IMAGES
        elif file.suffix in FONTS_EXTENSIONS:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_FONTS
        elif file.suffix in SOUNDFONTS_EXTENSIONS:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_SOUNDFONTS
        elif file.suffix in MIDI_EXTENSIONS:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_MIDIS
        else:
            directory = Broken.BROKEN.DIRECTORIES.EXTERNALS
        return BrokenPath.move(file, directory/subdir, echo=echo)

    @staticmethod
    @easy_multiple
    def which(*name: str, echo=True) -> Optional[Path]:
        BrokenPath.update_externals_path()
        return apply(shutil.which, name)

    @staticmethod
    def update_externals_path(path: Path=None, *, echo: bool=True) -> Optional[Path]:
        path = path or Broken.BROKEN.DIRECTORIES.EXTERNALS
        return BrokenPath.add_to_path(path, recursively=True, echo=echo)

    @staticmethod
    def on_path(path: Path) -> bool:
        """Check if a path is on PATH, works with symlinks"""
        return BrokenPath(path) in map(BrokenPath, os.environ.get("PATH", "").split(os.pathsep))

    @staticmethod
    def add_to_path(
        path: Path,
        *,
        recursively: bool=False,
        persistent: bool=False,
        preferential: bool=True,
        echo: bool=True
    ) -> Path:
        """
        Add a path, recursively or not, to System's Path or this Python process's Path

        Args:
            recursively: Also add all subdirectories of the given path
            persistent: Use 'userpath' package to add to the Shell's or Registry PATH
            preferential: Prepends the path for less priority on system binaries

        Returns:
            The Path argument itself
        """
        path = BrokenPath(path)

        # Can't recurse on file or non existing directories
        if (not path.exists()) and path.is_file() and recursively:
            log.warning(f"Can't add non existing path or file recursively to Path ({path})", echo=echo)
            return path

        log.debug(f"Adding to Path (Recursively: {recursively}, Persistent: {persistent}): ({path})", echo=echo)

        for other in (path.rglob("*") if recursively else [path]):
            if other.is_file():
                continue
            if BrokenPath.on_path(other):
                continue
            if persistent:
                import userpath
                userpath.append(str(other))
            else:
                if preferential:
                    os.environ["PATH"] = (str(other)+os.pathsep+os.environ["PATH"])
                    sys.path.insert(0, str(other))
                else:
                    os.environ["PATH"] = (os.environ["PATH"]+os.pathsep+str(other))
                    sys.path.append(str(other))
        return path

    # # Specific / "Utils"

    @staticmethod
    def open_in_file_explorer(path: Path):
        """Opens a path in the file explorer"""
        path = BrokenPath(path)
        if BrokenPlatform.OnWindows:
            os.startfile(str(path))
        elif BrokenPlatform.OnLinux:
            shell("xdg-open", path)
        elif BrokenPlatform.OnMacOS:
            shell("open", path)

    # Fixme: Untested functions, needs better name; are these useful?

    @staticmethod
    def non_empty_file(path: Path) -> bool:
        return path.exists() and path.is_file() and path.stat().st_size > 0

    @staticmethod
    def empty_file(path: Path, create: bool=True) -> bool:
        if create and not path.exists():
            path.parent.mkdir(parents=True, exist_ok=True)
            path.touch()
        return path.exists() and path.is_file() and len(path.read_text()) == 0

    @contextlib.contextmanager
    def PATH(*,
        directories: List[Path],
        recursive: bool=True,
        prepend: bool=True,
        clean: bool=False,
        restore: bool=True,
    ):
        """
        Temporarily limits the PATH to given directories
        - directories: List of directories to add to PATH
        - recursive: Whether to add subdirectories of given directories to PATH
        - prepend: Prioritize binaries found in input directories
        - restricted: Do not include current PATH in the new PATH
        """

        # Make Path objects
        directories = apply(Path, flatten(directories))

        # Get current PATH
        old = os.environ["PATH"]

        # List of all directories in PATH
        PATH = [] if clean else os.environ["PATH"].split(os.pathsep)

        # Add directories to PATH
        for directory in directories:
            PATH.append(directory)

            # Do not recurse if so
            if not recursive:
                continue

            # WARN: This could be slow on too many directories (wrong input?)
            # Find all subdirectories of a path
            for path in directory.rglob("*"):
                if path.is_dir():
                    if prepend:
                        PATH.insert(0, path)
                    else:
                        PATH.append(path)

        # Set new PATH
        os.environ["PATH"] = os.pathsep.join(map(str, PATH))

        yield os.environ["PATH"]

        # Restore PATH
        os.environ["PATH"] = old

__new__(*paths: Path, valid: bool = False) -> Optional[Path]

Source code in Broken/Base.py
Python
374
375
def __new__(cls, *paths: Path, valid: bool=False) -> Optional[Path]:
    return cls.get(*paths, valid=valid)

get(*paths: Path, valid: bool = False) -> Optional[Path] | List[Optional[Path]] staticmethod

Resolve paths to absolute None-safe, optionally verify their existence, None if invalid

Source code in Broken/Base.py
Python
377
378
379
380
381
382
383
@staticmethod
def get(*paths: Path, valid: bool=False) -> Optional[Path] | List[Optional[Path]]:
    """Resolve paths to absolute None-safe, optionally verify their existence, None if invalid"""
    paths = map(lambda path: Path(path).expanduser().resolve() if path else None, paths)
    paths = map(lambda path: (path if (path and path.exists()) else None) if valid else path, paths)
    paths = list(paths)
    return paths[0] if (len(paths) == 1) else paths

copy(src: Path, dst: Path, *, echo=True) -> Path staticmethod

Source code in Broken/Base.py
Python
385
386
387
388
389
390
391
392
393
394
395
@staticmethod
def copy(src: Path, dst: Path, *, echo=True) -> Path:
    src, dst = BrokenPath(src, dst)
    BrokenPath.mkdir(dst.parent, echo=False)
    if src.is_dir():
        log.info(f"Copying Directory ({src})\n → ({dst})", echo=echo)
        shutil.copytree(src, dst)
    else:
        log.info(f"Copying File ({src})\n → ({dst})", echo=echo)
        shutil.copy2(src, dst)
    return dst

move(src: Path, dst: Path, *, echo=True) -> Path staticmethod

Source code in Broken/Base.py
Python
397
398
399
400
401
402
@staticmethod
def move(src: Path, dst: Path, *, echo=True) -> Path:
    src, dst = BrokenPath(src, dst)
    log.info(f"Moving ({src})\n → ({dst})", echo=echo)
    shutil.move(src, dst)
    return dst

remove(path: Path, *, confirm=False, echo=True) -> Path staticmethod

Source code in Broken/Base.py
Python
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
@staticmethod
def remove(path: Path, *, confirm=False, echo=True) -> Path:

    # Already removed or doesn't exist
    if not (path := BrokenPath(path, valid=True)):
        return path

    log.info(f"Removing Path ({path})", echo=echo)

    # Safety: Must not be common
    if path in (Path.cwd(), Path.home()):
        log.error(f"Avoided catastrophic failure by not removing ({path})")
        exit(1)

    # Symlinks are safe to remove
    if path.is_symlink():
        path.unlink()
        return path

    # Confirm removal: directory contains data
    if confirm and (not click.confirm(f"• Confirm removing path ({path})")):
        return path

    # Remove the path
    if path.is_dir():
        shutil.rmtree(path, ignore_errors=True)
    else:
        path.unlink()

    return path

touch(path: Path, *, echo=True) -> Path staticmethod

Creates a file, fail safe™

Source code in Broken/Base.py
Python
435
436
437
438
439
440
441
442
443
@staticmethod
def touch(path: Path, *, echo=True) -> Path:
    """Creates a file, fail safe™"""
    if (path := BrokenPath(path)).exists():
        log.success(f"File ({path}) already touched", echo=echo)
        return
    log.info(f"Touching file {path}", echo=echo)
    path.touch()
    return path

mkdir(path: Path, parent: bool = False, *, echo=True) -> Path staticmethod

Creates a directory and its parents, fail safe™

Source code in Broken/Base.py
Python
445
446
447
448
449
450
451
452
453
454
455
@staticmethod
def mkdir(path: Path, parent: bool=False, *, echo=True) -> Path:
    """Creates a directory and its parents, fail safe™"""
    path = BrokenPath(path)
    path = path.parent if parent else path
    if path.exists():
        log.success(f"Directory ({path}) already exists", echo=echo)
        return path
    log.info(f"Creating directory {path}", echo=echo)
    path.mkdir(parents=True, exist_ok=True)
    return path

resetdir(path: Path, *, echo=True) -> Path staticmethod

Creates a directory and its parents, fail safe™

Source code in Broken/Base.py
Python
457
458
459
460
461
@staticmethod
def resetdir(path: Path, *, echo=True) -> Path:
    """Creates a directory and its parents, fail safe™"""
    BrokenPath.remove(path, echo=echo)
    return BrokenPath.mkdir(path, echo=echo)

pushd(path: Path, *, echo: bool = True) -> Generator[Path, None, None]

Change directory, then change back when done

Source code in Broken/Base.py
Python
463
464
465
466
467
468
469
470
471
472
@contextlib.contextmanager
def pushd(path: Path, *, echo: bool=True) -> Generator[Path, None, None]:
    """Change directory, then change back when done"""
    path = BrokenPath(path)
    cwd = os.getcwd()
    log.info(f"Pushd ({path})", echo=echo)
    os.chdir(path)
    yield path
    log.info(f"Popd  ({path})", echo=echo)
    os.chdir(cwd)

Symlink [virtual] -> [real], virtual being the symlink file and real the target

Parameters:

  • virtual (Path) –

    Symlink path (file)

  • real (Path) –

    Target path (real path)

Returns:

  • Path

    None if it fails, else virtual Path

Source code in Broken/Base.py
Python
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
@staticmethod
def symlink(virtual: Path, real: Path, *, echo: bool=True) -> Path:
    """
    Symlink [virtual] -> [real], `virtual` being the symlink file and `real` the target

    Args:
        virtual (Path): Symlink path (file)
        real (Path): Target path (real path)

    Returns:
        None if it fails, else `virtual` Path
    """
    log.info(f"Symlinking ({virtual})\n → ({real})", echo=echo)

    # Return if already symlinked
    if (BrokenPath(virtual) == BrokenPath(real)):
        return virtual

    # Make Virtual's parent directory
    BrokenPath.mkdir(virtual.parent, echo=False)

    # Remove old symlink if it points to a non existing directory
    if virtual.is_symlink() and (not virtual.resolve().exists()):
        virtual.unlink()

    # Virtual doesn't exist, ok to create
    elif not virtual.exists():
        pass

    # File exists and is a symlink - safe to remove
    elif virtual.is_symlink():
        virtual.unlink()

    # Virtual is a directory and not empty
    elif virtual.is_dir() and (not os.listdir(virtual)):
        BrokenPath.remove(virtual, echo=False)

    else:
        if click.confirm(f"• Path ({virtual}) exists, but Broken wants to create a symlink to ({real})\nConfirm removing the 'virtual' path and continuing? (It might contain data or be a important symlink)"):
            BrokenPath.remove(virtual, echo=False)
        else:
            return

    # Actually symlink
    virtual.symlink_to(real)
    return virtual

make_executable(path: Path, *, echo=False) -> Path staticmethod

Make a file executable

Source code in Broken/Base.py
Python
521
522
523
524
525
526
527
528
529
@staticmethod
def make_executable(path: Path, *, echo=False) -> Path:
    """Make a file executable"""
    path = BrokenPath(path)
    if BrokenPlatform.OnUnix:
        shell("chmod", "+x", path, echo=echo)
    elif BrokenPlatform.OnWindows:
        shell("attrib", "+x", path, echo=echo)
    return path

zip(path: Path, output: Path = None, *, format: ShutilFormat = ShutilFormat.Zip, echo: bool = True) -> Path staticmethod

Source code in Broken/Base.py
Python
531
532
533
534
535
536
537
538
539
@staticmethod
def zip(path: Path, output: Path=None, *, format: ShutilFormat=ShutilFormat.Zip, echo: bool=True) -> Path:
    format = ShutilFormat(format)
    output = BrokenPath(output or path).with_suffix(f".{format}")
    path   = BrokenPath(path)
    BrokenPath.remove(output, echo=echo)
    log.info(f"Zipping ({path})\n → ({output})", echo=echo)
    shutil.make_archive(output.with_suffix(""), format, path)
    return output

stem(path: Path) -> str staticmethod

Get the "true stem" of a path, as pathlib's only gets the last dot one • "/path/with/many.ext.ens.ions" -> "many" instead of "many.ext.ens"

Source code in Broken/Base.py
Python
541
542
543
544
545
546
547
548
549
550
@staticmethod
def stem(path: Path) -> str:
    """
    Get the "true stem" of a path, as pathlib's only gets the last dot one
    • "/path/with/many.ext.ens.ions" -> "many" instead of "many.ext.ens"
    """
    stem = Path(Path(path).stem)
    while (stem := Path(stem).with_suffix("")).suffix:
        continue
    return str(stem)

sha256sum(data: Union[Path, str, bytes]) -> Optional[str] staticmethod

Get the sha256sum of a file, directory or bytes

Source code in Broken/Base.py
Python
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
@staticmethod
def sha256sum(data: Union[Path, str, bytes]) -> Optional[str]:
    """Get the sha256sum of a file, directory or bytes"""

    # Nibble the bytes !
    if isinstance(data, bytes):
        return hashlib.sha256(data).hexdigest()

    # String or Path is a valid path
    elif (path := BrokenPath(data, valid=True)):
        with BrokenSpinner(log.info(f"Calculating sha256sum of ({path})")):
            if path.is_file():
                return hashlib.sha256(path.read_bytes()).hexdigest()

            # Iterate on all files for low memory footprint
            feed = hashlib.sha256()
            for file in path.rglob("*"):
                if not file.is_file():
                    continue
                with open(file, "rb") as file:
                    while (chunk := file.read(8192)):
                        feed.update(chunk)
            return feed.hexdigest()

    elif isinstance(data, str):
        return hashlib.sha256(data.encode("utf-8")).hexdigest()

    return

extract(path: Path, output: Path = None, *, overwrite: bool = False, PATH: bool = False, echo: bool = True) -> Path staticmethod

Source code in Broken/Base.py
Python
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
@staticmethod
def extract(
    path: Path,
    output: Path=None,
    *,
    overwrite: bool=False,
    PATH: bool=False,
    echo: bool=True
) -> Path:
    path, output = BrokenPath(path, output)

    # Output is input without suffix if not given
    if (output is None):
        output = path.parent/BrokenPath.stem(path)

    # Add stem to the output as some archives might be flat
    output /= BrokenPath.stem(path)

    # Re-extract on order
    Maybe(BrokenPath.remove, overwrite, echo=echo)

    # A file to skip if it exists, created after successful extraction
    if (extract_flag := (output/"BrokenPath.extract.ok")).exists():
        log.minor(f"Already extracted ({output})", echo=echo)
        Maybe(BrokenPath.add_to_path, PATH, path=output, recursively=True, echo=echo)
        return output

    # Show progress as this might take a while on slower IOs
    log.info(f"Extracting ({path})\n → ({output})", echo=echo)
    with BrokenSpinner("Extracting archive.."):
        shutil.unpack_archive(path, output)

    extract_flag.touch()
    Maybe(BrokenPath.add_to_path, PATH, path=output, recursively=True, echo=echo)
    return output/BrokenPath.stem(path)

download(url: str, output: Path = None, *, size_check: bool = True, chunk: int = 1024, echo: bool = True) -> Optional[Path] staticmethod

Source code in Broken/Base.py
Python
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
@staticmethod
def download(
    url: str,
    output: Path=None,
    *,
    size_check: bool=True,
    chunk: int=1024,
    echo: bool=True
) -> Optional[Path]:

    # Link must be valid
    if not validators.url(url):
        log.error(f"The string ({url}) doesn't look like a valid URL", echo=echo)
        return

    import requests

    # Default to Broken's Download directory
    if (output is None):
        output = Broken.BROKEN.DIRECTORIES.DOWNLOADS

    # Append url's file name to the output path
    if (output := BrokenPath(output)).is_dir():
        output /= Path(url.split("#")[0].split("?")[0].split("/")[-1])

    # Without size check, the existence of the file is enough
    if output.exists() and (not size_check):
        log.info(f"Already Downloaded ({output})", echo=echo)
        log.minor("• Size check was skipped, the file might be incomplete", echo=echo)
        return

    # Send the GET request, we might be offline!
    try:
        response = requests.get(url, stream=True)
    except requests.exceptions.RequestException as error:
        log.error(f"Failed to download file ({url}) → ({output}): {error}", echo=echo)
        return output

    size = int(response.headers.get('content-length', 0))

    # The file might already be (partially) downloaded
    if output.exists():
        A, B = (output.stat().st_size, size)
        if (A == B):
            log.info(f"Already Downloaded ({output})", echo=echo)
            return output
        else:
            log.warning(f"Wrong Download at ({output})", echo=echo)

    log.info(f"Downloading file at ({url}):", echo=echo)
    log.info(f"• Output: ({output})", echo=echo)

    # It is binary prefix, right? kibi, mebi, gibi, etc. as we're dealing with raw bytes
    with open(output, "wb") as file, tqdm.tqdm(
        desc=f"Downloading ({output.name})",
        total=size, unit="iB", unit_scale=True, unit_divisor=1024,
        mininterval=1/30, maxinterval=0.5, leave=False
    ) as progress:
        for data in response.iter_content(chunk_size=chunk):
            progress.update(file.write(data))

    # Url was invalid
    if (response.status_code != 200):
        log.error(f"Failed to Download File at ({url}):", echo=echo)
        log.error(f"• HTTP Error: {response.status_code}", echo=echo)
        return

    # Wrong downloaded and expected size
    elif (output.stat().st_size != size):
        log.error(f"File ({output}) was not downloaded correctly", echo=echo)
        return

    log.success(f"Downloaded file ({output}) from ({url})", echo=echo)
    return output

get_external(url: str, *, subdir: str = '', echo: bool = True) -> Path staticmethod

Source code in Broken/Base.py
Python
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
@staticmethod
def get_external(url: str, *, subdir: str="", echo: bool=True) -> Path:
    file = BrokenPath.download(denum(url), echo=echo)

    # File is a Archive, extract
    if any((str(file).endswith(ext) for ext in ShutilFormat.values)):
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_ARCHIVES
        return BrokenPath.extract(file, directory, PATH=True, echo=echo)

    # File is some known type, move to their own external directory
    if bool(subdir):
        directory = Broken.BROKEN.DIRECTORIES.EXTERNALS/subdir
    elif file.suffix in AUDIO_EXTENSIONS:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_AUDIO
    elif file.suffix in IMAGE_EXTENSIONS:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_IMAGES
    elif file.suffix in FONTS_EXTENSIONS:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_FONTS
    elif file.suffix in SOUNDFONTS_EXTENSIONS:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_SOUNDFONTS
    elif file.suffix in MIDI_EXTENSIONS:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNAL_MIDIS
    else:
        directory = Broken.BROKEN.DIRECTORIES.EXTERNALS
    return BrokenPath.move(file, directory/subdir, echo=echo)

which(*name: str, echo=True) -> Optional[Path] staticmethod

Source code in Broken/Base.py
Python
718
719
720
721
722
@staticmethod
@easy_multiple
def which(*name: str, echo=True) -> Optional[Path]:
    BrokenPath.update_externals_path()
    return apply(shutil.which, name)

update_externals_path(path: Path = None, *, echo: bool = True) -> Optional[Path] staticmethod

Source code in Broken/Base.py
Python
724
725
726
727
@staticmethod
def update_externals_path(path: Path=None, *, echo: bool=True) -> Optional[Path]:
    path = path or Broken.BROKEN.DIRECTORIES.EXTERNALS
    return BrokenPath.add_to_path(path, recursively=True, echo=echo)

on_path(path: Path) -> bool staticmethod

Check if a path is on PATH, works with symlinks

Source code in Broken/Base.py
Python
729
730
731
732
@staticmethod
def on_path(path: Path) -> bool:
    """Check if a path is on PATH, works with symlinks"""
    return BrokenPath(path) in map(BrokenPath, os.environ.get("PATH", "").split(os.pathsep))

add_to_path(path: Path, *, recursively: bool = False, persistent: bool = False, preferential: bool = True, echo: bool = True) -> Path staticmethod

Add a path, recursively or not, to System's Path or this Python process's Path

Parameters:

  • recursively (bool, default: False ) –

    Also add all subdirectories of the given path

  • persistent (bool, default: False ) –

    Use 'userpath' package to add to the Shell's or Registry PATH

  • preferential (bool, default: True ) –

    Prepends the path for less priority on system binaries

Returns:

  • Path

    The Path argument itself

Source code in Broken/Base.py
Python
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
@staticmethod
def add_to_path(
    path: Path,
    *,
    recursively: bool=False,
    persistent: bool=False,
    preferential: bool=True,
    echo: bool=True
) -> Path:
    """
    Add a path, recursively or not, to System's Path or this Python process's Path

    Args:
        recursively: Also add all subdirectories of the given path
        persistent: Use 'userpath' package to add to the Shell's or Registry PATH
        preferential: Prepends the path for less priority on system binaries

    Returns:
        The Path argument itself
    """
    path = BrokenPath(path)

    # Can't recurse on file or non existing directories
    if (not path.exists()) and path.is_file() and recursively:
        log.warning(f"Can't add non existing path or file recursively to Path ({path})", echo=echo)
        return path

    log.debug(f"Adding to Path (Recursively: {recursively}, Persistent: {persistent}): ({path})", echo=echo)

    for other in (path.rglob("*") if recursively else [path]):
        if other.is_file():
            continue
        if BrokenPath.on_path(other):
            continue
        if persistent:
            import userpath
            userpath.append(str(other))
        else:
            if preferential:
                os.environ["PATH"] = (str(other)+os.pathsep+os.environ["PATH"])
                sys.path.insert(0, str(other))
            else:
                os.environ["PATH"] = (os.environ["PATH"]+os.pathsep+str(other))
                sys.path.append(str(other))
    return path

open_in_file_explorer(path: Path) staticmethod

Opens a path in the file explorer

Source code in Broken/Base.py
Python
782
783
784
785
786
787
788
789
790
791
@staticmethod
def open_in_file_explorer(path: Path):
    """Opens a path in the file explorer"""
    path = BrokenPath(path)
    if BrokenPlatform.OnWindows:
        os.startfile(str(path))
    elif BrokenPlatform.OnLinux:
        shell("xdg-open", path)
    elif BrokenPlatform.OnMacOS:
        shell("open", path)

non_empty_file(path: Path) -> bool staticmethod

Source code in Broken/Base.py
Python
795
796
797
@staticmethod
def non_empty_file(path: Path) -> bool:
    return path.exists() and path.is_file() and path.stat().st_size > 0

empty_file(path: Path, create: bool = True) -> bool staticmethod

Source code in Broken/Base.py
Python
799
800
801
802
803
804
@staticmethod
def empty_file(path: Path, create: bool=True) -> bool:
    if create and not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()
    return path.exists() and path.is_file() and len(path.read_text()) == 0

PATH(*, directories: List[Path], recursive: bool = True, prepend: bool = True, clean: bool = False, restore: bool = True)

Temporarily limits the PATH to given directories - directories: List of directories to add to PATH - recursive: Whether to add subdirectories of given directories to PATH - prepend: Prioritize binaries found in input directories - restricted: Do not include current PATH in the new PATH

Source code in Broken/Base.py
Python
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
@contextlib.contextmanager
def PATH(*,
    directories: List[Path],
    recursive: bool=True,
    prepend: bool=True,
    clean: bool=False,
    restore: bool=True,
):
    """
    Temporarily limits the PATH to given directories
    - directories: List of directories to add to PATH
    - recursive: Whether to add subdirectories of given directories to PATH
    - prepend: Prioritize binaries found in input directories
    - restricted: Do not include current PATH in the new PATH
    """

    # Make Path objects
    directories = apply(Path, flatten(directories))

    # Get current PATH
    old = os.environ["PATH"]

    # List of all directories in PATH
    PATH = [] if clean else os.environ["PATH"].split(os.pathsep)

    # Add directories to PATH
    for directory in directories:
        PATH.append(directory)

        # Do not recurse if so
        if not recursive:
            continue

        # WARN: This could be slow on too many directories (wrong input?)
        # Find all subdirectories of a path
        for path in directory.rglob("*"):
            if path.is_dir():
                if prepend:
                    PATH.insert(0, path)
                else:
                    PATH.append(path)

    # Set new PATH
    os.environ["PATH"] = os.pathsep.join(map(str, PATH))

    yield os.environ["PATH"]

    # Restore PATH
    os.environ["PATH"] = old

Base.BrokenUtils

Source code in Broken/Base.py
Python
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
class BrokenUtils:

    @staticmethod
    def truthy(items: List[Any]) -> List[Any]:
        """Transforms a list into a truthy-only values list, removing all falsy values"""
        return [item for item in flatten(items) if item]

    @staticmethod
    def get_free_tcp_port():
        import socket
        temp_socket = socket.socket()
        temp_socket.bind(('', 0))
        port = temp_socket.getsockname()[1]
        temp_socket.close()
        return port

    @staticmethod
    def fuzzy_string_search(string: str, choices: List[str], many: int=1, minimum_score: int=0) -> list[tuple[str, int]]:
        """Fuzzy search a string in a list of strings, returns a list of matches"""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            import thefuzz.process
            result = thefuzz.process.extract(string, choices, limit=many)
            if many == 1:
                return result[0]
            return result

    @staticmethod
    def recurse(function: callable, **variables) -> Any:
        """
        Calls some function with the previous scope locals() updated by variables
        # Note: Not a fast method, consider using for convenience only

        Use case are functions that are called recursively and need to be called with the same arguments

        ```python
        def function(with, many, arguments, and, only=3, one=True, modification="Nice"):
            ...
            if recurse_condition:
                BrokenUtils.recurse(function, many=True)
        ```
        """

        # Get the previous scope locals() and update it with the variables
        previous_locals = inspect.currentframe().f_back.f_locals
        previous_locals.update(variables)

        # Filter out variables that are not in the function arguments
        previous_locals = {
            k: v for k, v in previous_locals.items()
            if k in inspect.getfullargspec(function).args
            if k != "self"
        }

        # Call and return the same function
        return function(**previous_locals)

    @staticmethod
    def sublist_in_list(sublist: List[Any], list: List[Any]) -> bool:
        """Check if a sublist is in a list"""
        return all(item in list for item in sublist)

    @staticmethod
    def extend(base: type, name: str=None, as_property: bool=False) -> type:
        """
        Extend a class with another class's methods or a method directly.

        # Usage:
        Decorator of the class or method, class to extend as argument

        @BrokenUtils.extend(BaseClass)
        class ExtendedClass:
            def method(self):
                ...

        @BrokenUtils.extend(BaseClass)
        def method(self):
            ...

        @BrokenUtils.extend(BaseClass, as_property=True)
        def method(self):
            ...
        """
        def extender(add: type):

            # Extend as property
            if as_property:
                return BrokenUtils.extend(base, name=name, as_property=False)(property(add))

            # If add is a method
            if isinstance(add, types.FunctionType):
                setattr(base, name or add.__name__, add)
                return base

            # If it's a property
            if isinstance(add, property):
                setattr(base, name or add.fget.__name__, add)
                return base

            # If add is a class, add its methods to base
            for key, value in add.__dict__.items():
                if key.startswith("__"):
                    continue
                setattr(base, key, value)
                return base

        return extender

    @staticmethod
    def have_import(module: str, *, load: bool=False) -> bool:
        """Check if a module has been imported"""
        if load:
            try:
                __import__(module)
                return True
            except ImportError:
                return False
        return sys.modules.get(module, False)

    @staticmethod
    def relaunch(*, safe: int=3, echo: bool=True) -> subprocess.CompletedProcess:
        """
        Relaunch the current process with the same arguments up until a `safe` recursion
        """
        key = "BROKEN_RELAUNCH_INDEX"

        # Get and assert safe recursion level
        if (level := int(os.environ.get(key, 0))) >= safe:
            log.error(f"Reached maximum relaunch recursion depth ({safe}), aborting")
            exit(1)

        # Increment relaunch index
        os.environ[key] = str(level + 1)

        log.info(f"Relaunching self process with arguments ({sys.argv})", echo=echo)
        return shell(sys.executable, sys.argv, echo=echo)

    # Todo: Move this to a proper class
    PRECISE_SLEEP_AHEAD_SECONDS     = (5e-3 if BrokenPlatform.OnWindows else 5e-4)
    PRECISE_SLEEP_LATE_MULTIPLIER   = 4
    PRECISE_SLEEP_INTERPOLATE_RATIO = 0.05
    PRECISE_SLEEP_MAX_AHEAD_SECONDS = 0.1

    @staticmethod
    def precise_sleep(seconds: float) -> None:
        """
        Sleep for a precise amount of time. This function is very interesting for some reasons:
        - time.sleep() obviously uses syscalls, and it's up to the OS to implement it
        - This is usually done by scheduling the thread to wake up after the time has passed
        - On Windows, this precision was 15ms (terrible), and on Python 3.11+ was optimized to 100ns
        - On Unix, the precision is great with nanosleep() or usleep() (1ms or less)

        So, in practice, the best ever precision time sleep function would be:
        ```python
        while (now - start) < wait:
            now = time.perf_counter()
        ```

        As evident, this spins the thread full time due the .perf_counter() and conditional, which
        is not wanted on a sleep function (to use 100% of a thread)

        Taking advantage of the fact that time.sleep() is usually precise enough, and always will
        overshoot the time, we can sleep close to the time and apply the previous spinning method
        to achieve a very precise sleep, with a low enough overhead, relatively speaking.

        Args:
            seconds: Precise time to sleep

        Returns:
            None
        """
        if seconds < 0:
            return

        # Sleep close to the due time
        ahead = max(0, seconds - BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS)
        start = time.perf_counter()
        time.sleep(max(0, ahead))

        # How much error the last sleep() just did
        late = ((time.perf_counter() - start) - ahead) * BrokenUtils.PRECISE_SLEEP_LATE_MULTIPLIER

        # Adjust future sleeps based on the error
        BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS += BrokenUtils.PRECISE_SLEEP_INTERPOLATE_RATIO * \
            (late - BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS)

        # Clamp the ahead time: 0 <= ahead <= max
        BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS = min(
            max(0, BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS),
            BrokenUtils.PRECISE_SLEEP_MAX_AHEAD_SECONDS
        )

        # Spin the thread until the time is up (precise Sleep)
        while (time.perf_counter() - start) < seconds:
            pass

    @staticmethod
    def locals(level: int=1, self: bool=True) -> dict:
        locals = inspect.currentframe()

        # Keep getting the previous's frame
        for _ in range(level):
            locals = locals.f_back

        # Get the locals
        locals = locals.f_locals

        # Remove self from locals
        if self:
            locals.pop("self", None)

        return locals

    @staticmethod
    def round(number: Number, multiple: Number, *, type=int, operation: Callable=round) -> Number:
        return type(multiple * operation(number/multiple))

    @staticmethod
    def round_resolution(width: Number, height: Number) -> Tuple[int, int]:
        """FFmpeg likes multiples of 2, so let it be"""
        return (
            BrokenUtils.round(width,  multiple=2, type=int),
            BrokenUtils.round(height, multiple=2, type=int)
        )

    @staticmethod
    def rms_stdlib(data) -> float:
        return math.sqrt(sum(x**2 for x in data) / len(data))

    @staticmethod
    def rms(data) -> float:
        import numpy
        return numpy.sqrt(numpy.mean(numpy.square(data)))

truthy(items: List[Any]) -> List[Any] staticmethod

Transforms a list into a truthy-only values list, removing all falsy values

Source code in Broken/Base.py
Python
860
861
862
863
@staticmethod
def truthy(items: List[Any]) -> List[Any]:
    """Transforms a list into a truthy-only values list, removing all falsy values"""
    return [item for item in flatten(items) if item]

get_free_tcp_port() staticmethod

Source code in Broken/Base.py
Python
865
866
867
868
869
870
871
872
@staticmethod
def get_free_tcp_port():
    import socket
    temp_socket = socket.socket()
    temp_socket.bind(('', 0))
    port = temp_socket.getsockname()[1]
    temp_socket.close()
    return port

Fuzzy search a string in a list of strings, returns a list of matches

Source code in Broken/Base.py
Python
874
875
876
877
878
879
880
881
882
883
@staticmethod
def fuzzy_string_search(string: str, choices: List[str], many: int=1, minimum_score: int=0) -> list[tuple[str, int]]:
    """Fuzzy search a string in a list of strings, returns a list of matches"""
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        import thefuzz.process
        result = thefuzz.process.extract(string, choices, limit=many)
        if many == 1:
            return result[0]
        return result

recurse(function: callable, **variables) -> Any staticmethod

Calls some function with the previous scope locals() updated by variables

Note: Not a fast method, consider using for convenience only

Use case are functions that are called recursively and need to be called with the same arguments

Python
1
2
3
4
def function(with, many, arguments, and, only=3, one=True, modification="Nice"):
    ...
    if recurse_condition:
        BrokenUtils.recurse(function, many=True)
Source code in Broken/Base.py
Python
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
@staticmethod
def recurse(function: callable, **variables) -> Any:
    """
    Calls some function with the previous scope locals() updated by variables
    # Note: Not a fast method, consider using for convenience only

    Use case are functions that are called recursively and need to be called with the same arguments

    ```python
    def function(with, many, arguments, and, only=3, one=True, modification="Nice"):
        ...
        if recurse_condition:
            BrokenUtils.recurse(function, many=True)
    ```
    """

    # Get the previous scope locals() and update it with the variables
    previous_locals = inspect.currentframe().f_back.f_locals
    previous_locals.update(variables)

    # Filter out variables that are not in the function arguments
    previous_locals = {
        k: v for k, v in previous_locals.items()
        if k in inspect.getfullargspec(function).args
        if k != "self"
    }

    # Call and return the same function
    return function(**previous_locals)

sublist_in_list(sublist: List[Any], list: List[Any]) -> bool staticmethod

Check if a sublist is in a list

Source code in Broken/Base.py
Python
915
916
917
918
@staticmethod
def sublist_in_list(sublist: List[Any], list: List[Any]) -> bool:
    """Check if a sublist is in a list"""
    return all(item in list for item in sublist)

extend(base: type, name: str = None, as_property: bool = False) -> type staticmethod

Extend a class with another class's methods or a method directly.

Usage:

Decorator of the class or method, class to extend as argument

@BrokenUtils.extend(BaseClass) class ExtendedClass: def method(self): ...

@BrokenUtils.extend(BaseClass) def method(self): ...

@BrokenUtils.extend(BaseClass, as_property=True) def method(self): ...

Source code in Broken/Base.py
Python
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
@staticmethod
def extend(base: type, name: str=None, as_property: bool=False) -> type:
    """
    Extend a class with another class's methods or a method directly.

    # Usage:
    Decorator of the class or method, class to extend as argument

    @BrokenUtils.extend(BaseClass)
    class ExtendedClass:
        def method(self):
            ...

    @BrokenUtils.extend(BaseClass)
    def method(self):
        ...

    @BrokenUtils.extend(BaseClass, as_property=True)
    def method(self):
        ...
    """
    def extender(add: type):

        # Extend as property
        if as_property:
            return BrokenUtils.extend(base, name=name, as_property=False)(property(add))

        # If add is a method
        if isinstance(add, types.FunctionType):
            setattr(base, name or add.__name__, add)
            return base

        # If it's a property
        if isinstance(add, property):
            setattr(base, name or add.fget.__name__, add)
            return base

        # If add is a class, add its methods to base
        for key, value in add.__dict__.items():
            if key.startswith("__"):
                continue
            setattr(base, key, value)
            return base

    return extender

have_import(module: str, *, load: bool = False) -> bool staticmethod

Check if a module has been imported

Source code in Broken/Base.py
Python
966
967
968
969
970
971
972
973
974
975
@staticmethod
def have_import(module: str, *, load: bool=False) -> bool:
    """Check if a module has been imported"""
    if load:
        try:
            __import__(module)
            return True
        except ImportError:
            return False
    return sys.modules.get(module, False)

relaunch(*, safe: int = 3, echo: bool = True) -> subprocess.CompletedProcess staticmethod

Relaunch the current process with the same arguments up until a safe recursion

Source code in Broken/Base.py
Python
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
@staticmethod
def relaunch(*, safe: int=3, echo: bool=True) -> subprocess.CompletedProcess:
    """
    Relaunch the current process with the same arguments up until a `safe` recursion
    """
    key = "BROKEN_RELAUNCH_INDEX"

    # Get and assert safe recursion level
    if (level := int(os.environ.get(key, 0))) >= safe:
        log.error(f"Reached maximum relaunch recursion depth ({safe}), aborting")
        exit(1)

    # Increment relaunch index
    os.environ[key] = str(level + 1)

    log.info(f"Relaunching self process with arguments ({sys.argv})", echo=echo)
    return shell(sys.executable, sys.argv, echo=echo)

PRECISE_SLEEP_AHEAD_SECONDS = 0.005 if BrokenPlatform.OnWindows else 0.0005 class-attribute instance-attribute

PRECISE_SLEEP_LATE_MULTIPLIER = 4 class-attribute instance-attribute

PRECISE_SLEEP_INTERPOLATE_RATIO = 0.05 class-attribute instance-attribute

PRECISE_SLEEP_MAX_AHEAD_SECONDS = 0.1 class-attribute instance-attribute

precise_sleep(seconds: float) -> None staticmethod

Sleep for a precise amount of time. This function is very interesting for some reasons: - time.sleep() obviously uses syscalls, and it's up to the OS to implement it - This is usually done by scheduling the thread to wake up after the time has passed - On Windows, this precision was 15ms (terrible), and on Python 3.11+ was optimized to 100ns - On Unix, the precision is great with nanosleep() or usleep() (1ms or less)

So, in practice, the best ever precision time sleep function would be:

Python
1
2
while (now - start) < wait:
    now = time.perf_counter()

As evident, this spins the thread full time due the .perf_counter() and conditional, which is not wanted on a sleep function (to use 100% of a thread)

Taking advantage of the fact that time.sleep() is usually precise enough, and always will overshoot the time, we can sleep close to the time and apply the previous spinning method to achieve a very precise sleep, with a low enough overhead, relatively speaking.

Parameters:

  • seconds (float) –

    Precise time to sleep

Returns:

  • None

    None

Source code in Broken/Base.py
Python
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
@staticmethod
def precise_sleep(seconds: float) -> None:
    """
    Sleep for a precise amount of time. This function is very interesting for some reasons:
    - time.sleep() obviously uses syscalls, and it's up to the OS to implement it
    - This is usually done by scheduling the thread to wake up after the time has passed
    - On Windows, this precision was 15ms (terrible), and on Python 3.11+ was optimized to 100ns
    - On Unix, the precision is great with nanosleep() or usleep() (1ms or less)

    So, in practice, the best ever precision time sleep function would be:
    ```python
    while (now - start) < wait:
        now = time.perf_counter()
    ```

    As evident, this spins the thread full time due the .perf_counter() and conditional, which
    is not wanted on a sleep function (to use 100% of a thread)

    Taking advantage of the fact that time.sleep() is usually precise enough, and always will
    overshoot the time, we can sleep close to the time and apply the previous spinning method
    to achieve a very precise sleep, with a low enough overhead, relatively speaking.

    Args:
        seconds: Precise time to sleep

    Returns:
        None
    """
    if seconds < 0:
        return

    # Sleep close to the due time
    ahead = max(0, seconds - BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS)
    start = time.perf_counter()
    time.sleep(max(0, ahead))

    # How much error the last sleep() just did
    late = ((time.perf_counter() - start) - ahead) * BrokenUtils.PRECISE_SLEEP_LATE_MULTIPLIER

    # Adjust future sleeps based on the error
    BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS += BrokenUtils.PRECISE_SLEEP_INTERPOLATE_RATIO * \
        (late - BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS)

    # Clamp the ahead time: 0 <= ahead <= max
    BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS = min(
        max(0, BrokenUtils.PRECISE_SLEEP_AHEAD_SECONDS),
        BrokenUtils.PRECISE_SLEEP_MAX_AHEAD_SECONDS
    )

    # Spin the thread until the time is up (precise Sleep)
    while (time.perf_counter() - start) < seconds:
        pass

locals(level: int = 1, self: bool = True) -> dict staticmethod

Source code in Broken/Base.py
Python
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
@staticmethod
def locals(level: int=1, self: bool=True) -> dict:
    locals = inspect.currentframe()

    # Keep getting the previous's frame
    for _ in range(level):
        locals = locals.f_back

    # Get the locals
    locals = locals.f_locals

    # Remove self from locals
    if self:
        locals.pop("self", None)

    return locals

round(number: Number, multiple: Number, *, type=int, operation: Callable = round) -> Number staticmethod

Source code in Broken/Base.py
Python
1071
1072
1073
@staticmethod
def round(number: Number, multiple: Number, *, type=int, operation: Callable=round) -> Number:
    return type(multiple * operation(number/multiple))

round_resolution(width: Number, height: Number) -> Tuple[int, int] staticmethod

FFmpeg likes multiples of 2, so let it be

Source code in Broken/Base.py
Python
1075
1076
1077
1078
1079
1080
1081
@staticmethod
def round_resolution(width: Number, height: Number) -> Tuple[int, int]:
    """FFmpeg likes multiples of 2, so let it be"""
    return (
        BrokenUtils.round(width,  multiple=2, type=int),
        BrokenUtils.round(height, multiple=2, type=int)
    )

rms_stdlib(data) -> float staticmethod

Source code in Broken/Base.py
Python
1083
1084
1085
@staticmethod
def rms_stdlib(data) -> float:
    return math.sqrt(sum(x**2 for x in data) / len(data))

rms(data) -> float staticmethod

Source code in Broken/Base.py
Python
1087
1088
1089
1090
@staticmethod
def rms(data) -> float:
    import numpy
    return numpy.sqrt(numpy.mean(numpy.square(data)))

Base.BrokenThreadPool

Source code in Broken/Base.py
Python
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
@define
class BrokenThreadPool:
    threads: List[Thread] = []
    max:     int          = 1

    @property
    def alive(self) -> List[Thread]:
        return [thread for thread in self.threads if thread.is_alive()]

    @property
    def n_alive(self) -> int:
        return len(self.alive)

    def sanitize(self) -> None:
        self.threads = self.alive

    def append(self, thread: Thread, wait: float=0.01) -> Thread:
        while self.n_alive >= self.max:
            time.sleep(wait)
        self.sanitize()
        self.threads.append(thread)
        return thread

    def join(self) -> None:
        for thread in self.threads:
            thread.join()

threads: List[Thread] = [] class-attribute instance-attribute

max: int = 1 class-attribute instance-attribute

alive: List[Thread] property

n_alive: int property

sanitize() -> None

Source code in Broken/Base.py
Python
1107
1108
def sanitize(self) -> None:
    self.threads = self.alive

append(thread: Thread, wait: float = 0.01) -> Thread

Source code in Broken/Base.py
Python
1110
1111
1112
1113
1114
1115
def append(self, thread: Thread, wait: float=0.01) -> Thread:
    while self.n_alive >= self.max:
        time.sleep(wait)
    self.sanitize()
    self.threads.append(thread)
    return thread

join() -> None

Source code in Broken/Base.py
Python
1117
1118
1119
def join(self) -> None:
    for thread in self.threads:
        thread.join()

Base.BrokenThread

Source code in Broken/Base.py
Python
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
@define
class BrokenThread:
    pools = {}

    def __new__(cls, *args, **kwargs) -> Thread:
        return cls.new(*args, **kwargs)

    @staticmethod
    def pool(name: str) -> BrokenThreadPool:
        return BrokenThread.pools.setdefault(name, BrokenThreadPool())

    @staticmethod
    def join_all_pools() -> None:
        for pool in BrokenThread.pools.values():
            pool.join()

    @staticmethod
    def new(
        target: Callable,
        *args,
        start: bool=True,
        join: bool=False,
        loop: bool=False,
        pool: str=None,
        max: int=1,
        daemon: bool=False,
        locals: bool=False,
        self: bool=False,
        callback: Callable=None,
        **kwargs
    ) -> Thread:
        """
        Create a thread on a callable, yeet whatever you think it works
        • Support for a basic Thread Pool, why no native way?

        Args:
            target: The function to call, consider using functools.partial or this kwargs
            args:   Arguments to pass to the function (positional, unnamed)
            kwargs: Keyword arguments to pass to the function
            start:  Start the thread immediately after creation
            join:   Wait for the thread to finish after creation
            loop:   Wrap the target callable in a loop
            pool:   Name of the pool to append the thread to, see BrokenThreadPool
            max:    Maximum threads in the pool
            daemon: When the main thread exits, daemon threads are also terminated

        Advanced:
            locals:   Whether to pass the current scope locals to the callable or not
            self:     Include "self" in the locals if locals=True
            callback: Function to call after the thread finishes

        Returns:
            The created Thread object
        """

        # Update kwargs with locals
        if locals: kwargs.update(BrokenUtils.locals(level=2, self=self))
        the_target = target

        # Wrap the callback in a loop
        @functools.wraps(target)
        def looped(*args, **kwargs):
            while True:
                target(*args, **kwargs)
        the_target = (looped if loop else the_target)

        # Wrap the target in a callback
        @functools.wraps(target)
        def callbacked(*args, **kwargs):
            target(*args, **kwargs)
            if callback is not None:
                callback()
        the_target = (callbacked if callback else the_target)

        # Create Thread object
        parallel = Thread(
            target=the_target,
            daemon=daemon,
            args=args,
            kwargs=kwargs
        )

        # Maybe wait for the pool to be free
        if pool and (pool := BrokenThread.pools.setdefault(pool, BrokenThreadPool())):
            pool.max = max
            pool.append(parallel)
        if start:
            parallel.start()
        if join and start:
            parallel.join()
        return parallel

pools = {} class-attribute instance-attribute

__new__(*args, **kwargs) -> Thread

Source code in Broken/Base.py
Python
1125
1126
def __new__(cls, *args, **kwargs) -> Thread:
    return cls.new(*args, **kwargs)

pool(name: str) -> BrokenThreadPool staticmethod

Source code in Broken/Base.py
Python
1128
1129
1130
@staticmethod
def pool(name: str) -> BrokenThreadPool:
    return BrokenThread.pools.setdefault(name, BrokenThreadPool())

join_all_pools() -> None staticmethod

Source code in Broken/Base.py
Python
1132
1133
1134
1135
@staticmethod
def join_all_pools() -> None:
    for pool in BrokenThread.pools.values():
        pool.join()

new(target: Callable, *args, start: bool = True, join: bool = False, loop: bool = False, pool: str = None, max: int = 1, daemon: bool = False, locals: bool = False, self: bool = False, callback: Callable = None, **kwargs) -> Thread staticmethod

Create a thread on a callable, yeet whatever you think it works • Support for a basic Thread Pool, why no native way?

Parameters:

  • target (Callable) –

    The function to call, consider using functools.partial or this kwargs

  • args

    Arguments to pass to the function (positional, unnamed)

  • kwargs

    Keyword arguments to pass to the function

  • start (bool, default: True ) –

    Start the thread immediately after creation

  • join (bool, default: False ) –

    Wait for the thread to finish after creation

  • loop (bool, default: False ) –

    Wrap the target callable in a loop

  • pool (str, default: None ) –

    Name of the pool to append the thread to, see BrokenThreadPool

  • max (int, default: 1 ) –

    Maximum threads in the pool

  • daemon (bool, default: False ) –

    When the main thread exits, daemon threads are also terminated

Advanced

locals: Whether to pass the current scope locals to the callable or not self: Include "self" in the locals if locals=True callback: Function to call after the thread finishes

Returns:

  • Thread

    The created Thread object

Source code in Broken/Base.py
Python
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
@staticmethod
def new(
    target: Callable,
    *args,
    start: bool=True,
    join: bool=False,
    loop: bool=False,
    pool: str=None,
    max: int=1,
    daemon: bool=False,
    locals: bool=False,
    self: bool=False,
    callback: Callable=None,
    **kwargs
) -> Thread:
    """
    Create a thread on a callable, yeet whatever you think it works
    • Support for a basic Thread Pool, why no native way?

    Args:
        target: The function to call, consider using functools.partial or this kwargs
        args:   Arguments to pass to the function (positional, unnamed)
        kwargs: Keyword arguments to pass to the function
        start:  Start the thread immediately after creation
        join:   Wait for the thread to finish after creation
        loop:   Wrap the target callable in a loop
        pool:   Name of the pool to append the thread to, see BrokenThreadPool
        max:    Maximum threads in the pool
        daemon: When the main thread exits, daemon threads are also terminated

    Advanced:
        locals:   Whether to pass the current scope locals to the callable or not
        self:     Include "self" in the locals if locals=True
        callback: Function to call after the thread finishes

    Returns:
        The created Thread object
    """

    # Update kwargs with locals
    if locals: kwargs.update(BrokenUtils.locals(level=2, self=self))
    the_target = target

    # Wrap the callback in a loop
    @functools.wraps(target)
    def looped(*args, **kwargs):
        while True:
            target(*args, **kwargs)
    the_target = (looped if loop else the_target)

    # Wrap the target in a callback
    @functools.wraps(target)
    def callbacked(*args, **kwargs):
        target(*args, **kwargs)
        if callback is not None:
            callback()
    the_target = (callbacked if callback else the_target)

    # Create Thread object
    parallel = Thread(
        target=the_target,
        daemon=daemon,
        args=args,
        kwargs=kwargs
    )

    # Maybe wait for the pool to be free
    if pool and (pool := BrokenThread.pools.setdefault(pool, BrokenThreadPool())):
        pool.max = max
        pool.append(parallel)
    if start:
        parallel.start()
    if join and start:
        parallel.join()
    return parallel

Base.BrokenEventClient

Client configuration for BrokenEventLoop

Function:

  • callback: Function callable to call every synchronization
  • args: Arguments to pass to callback
  • kwargs: Keyword arguments to pass to callback
  • output: Output of callback (returned value)
  • context: Context to use when calling callback (with statement)
  • lock: Lock to use when calling callback (with statement)
  • enabled: Whether to enable this client or not
  • once: Whether to call this client only once or not

Synchronization

  • frequency: Frequency of callback calls
  • frameskip: Constant deltatime mode (False) or real deltatime mode (True)
  • decoupled: "Rendering" mode, do not sleep on real time, implies frameskip False

Timing:

  • next_call: Next time to call callback (initializes $now+next_call, value in now() seconds)
  • last_call: Last time callback was called (initializes $now+last_call, value in now() seconds)
  • started: Time when client was started (initializes $now+started, value in now() seconds)
  • time: Whether to pass time (time since first call) to callback
  • dt: Whether to pass dt (time since last call) to callback
Source code in Broken/Base.py
Python
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
@define
class BrokenEventClient:
    """
    Client configuration for BrokenEventLoop

    # Function:
    - callback:   Function callable to call every synchronization
    - args:       Arguments to pass to callback
    - kwargs:     Keyword arguments to pass to callback
    - output:     Output of callback (returned value)
    - context:    Context to use when calling callback (with statement)
    - lock:       Lock to use when calling callback (with statement)
    - enabled:    Whether to enable this client or not
    - once:       Whether to call this client only once or not

    # Synchronization
    - frequency:  Frequency of callback calls
    - frameskip:  Constant deltatime mode (False) or real deltatime mode (True)
    - decoupled:  "Rendering" mode, do not sleep on real time, implies frameskip False

    # Timing:
    - next_call:  Next time to call callback (initializes $now+next_call, value in now() seconds)
    - last_call:  Last time callback was called (initializes $now+last_call, value in now() seconds)
    - started:    Time when client was started (initializes $now+started, value in now() seconds)
    - time:       Whether to pass time (time since first call) to callback
    - dt:         Whether to pass dt (time since last call) to callback
    """

    # Callback
    callback: callable       = None
    args:     List[Any]      = field(factory=list, repr=False)
    kwargs:   Dict[str, Any] = field(factory=dict, repr=False)
    output:   Any            = field(default=None, repr=False)
    context:  Any            = None
    lock:     Lock           = None
    enabled:  bool           = True
    once:     bool           = False

    # Synchronization
    frequency:  Hertz = 60.0
    frameskip:  bool  = True
    decoupled:  bool  = False
    precise:    bool  = False

    # Timing
    started:   Seconds = Factory(lambda: time.bang_counter())
    next_call: Seconds = None
    last_call: Seconds = None
    _time:     bool    = False
    _dt:       bool    = False

    def __attrs_post_init__(self):
        signature = inspect.signature(self.callback)
        self._dt   = ("dt"   in signature.parameters)
        self._time = ("time" in signature.parameters)

        # Assign idealistic values for decoupled
        if self.decoupled: self.started = BIG_BANG
        self.last_call = (self.last_call or self.started)
        self.next_call = (self.next_call or self.started)

        # Note: We could use numpy.float128 for the most frametime precision on the above..
        #       .. But the Client code is smart enough to auto adjust itself to sync

    # # Useful properties

    @property
    def fps(self) -> Hertz:
        return self.frequency

    @fps.setter
    def fps(self, value: Hertz):
        self.frequency = value

    @property
    def period(self) -> Seconds:
        return 1/self.frequency

    @period.setter
    def period(self, value: Seconds):
        self.frequency = 1/value

    @property
    def should_delete(self) -> bool:
        return self.once and (not self.enabled)

    # # Sorting

    def __lt__(self, other: Self) -> bool:
        return self.next_call < other.next_call

    def __gt__(self, other: Self) -> bool:
        return self.next_call > other.next_call

    # # Implementation

    def next(self, block: bool=True) -> None | Any:

        # Time to wait for next call if block
        # - Next call at 110 seconds, now=100, wait=10
        # - Positive means to wait, negative we are behind
        wait = (self.next_call - time.bang_counter())

        if self.decoupled:
            pass
        elif block:
            if self.precise:
                BrokenUtils.precise_sleep(wait)
            else:
                time.sleep(max(0, wait))
        elif wait > 0:
            return None

        # The assumed instant the code below will run instantly
        now = self.next_call if self.decoupled else time.bang_counter()
        if self._dt:   self.kwargs["dt"]   = (now - self.last_call)
        if self._time: self.kwargs["time"] = (now - self.started)

        # Enter or not the given context, call callback with args and kwargs
        with (self.lock or contextlib.nullcontext()):
            with (self.context or contextlib.nullcontext()):
                self.output = self.callback(*self.args, **self.kwargs)

        # Fixme: This is a better way to do it, but on decoupled it's not "dt perfect"
        # self.next_call = self.period * (math.floor(now/self.period) + 1)

        # Update future and past states
        self.last_call = now
        while self.next_call <= now:
            self.next_call += self.period

        # (Disabled && Once) clients gets deleted
        self.enabled = not self.once

        return self

callback: callable = None class-attribute instance-attribute

args: List[Any] = field(factory=list, repr=False) class-attribute instance-attribute

kwargs: Dict[str, Any] = field(factory=dict, repr=False) class-attribute instance-attribute

output: Any = field(default=None, repr=False) class-attribute instance-attribute

context: Any = None class-attribute instance-attribute

lock: Lock = None class-attribute instance-attribute

enabled: bool = True class-attribute instance-attribute

once: bool = False class-attribute instance-attribute

frequency: Hertz = 60.0 class-attribute instance-attribute

frameskip: bool = True class-attribute instance-attribute

decoupled: bool = False class-attribute instance-attribute

precise: bool = False class-attribute instance-attribute

started: Seconds = Factory(lambda: time.bang_counter()) class-attribute instance-attribute

next_call: Seconds = None class-attribute instance-attribute

last_call: Seconds = None class-attribute instance-attribute

__attrs_post_init__()

Source code in Broken/Base.py
Python
1266
1267
1268
1269
1270
1271
1272
1273
1274
def __attrs_post_init__(self):
    signature = inspect.signature(self.callback)
    self._dt   = ("dt"   in signature.parameters)
    self._time = ("time" in signature.parameters)

    # Assign idealistic values for decoupled
    if self.decoupled: self.started = BIG_BANG
    self.last_call = (self.last_call or self.started)
    self.next_call = (self.next_call or self.started)

fps: Hertz property writable

period: Seconds property writable

should_delete: bool property

__lt__(other: Self) -> bool

Source code in Broken/Base.py
Python
1303
1304
def __lt__(self, other: Self) -> bool:
    return self.next_call < other.next_call

__gt__(other: Self) -> bool

Source code in Broken/Base.py
Python
1306
1307
def __gt__(self, other: Self) -> bool:
    return self.next_call > other.next_call

next(block: bool = True) -> None | Any

Source code in Broken/Base.py
Python
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
def next(self, block: bool=True) -> None | Any:

    # Time to wait for next call if block
    # - Next call at 110 seconds, now=100, wait=10
    # - Positive means to wait, negative we are behind
    wait = (self.next_call - time.bang_counter())

    if self.decoupled:
        pass
    elif block:
        if self.precise:
            BrokenUtils.precise_sleep(wait)
        else:
            time.sleep(max(0, wait))
    elif wait > 0:
        return None

    # The assumed instant the code below will run instantly
    now = self.next_call if self.decoupled else time.bang_counter()
    if self._dt:   self.kwargs["dt"]   = (now - self.last_call)
    if self._time: self.kwargs["time"] = (now - self.started)

    # Enter or not the given context, call callback with args and kwargs
    with (self.lock or contextlib.nullcontext()):
        with (self.context or contextlib.nullcontext()):
            self.output = self.callback(*self.args, **self.kwargs)

    # Fixme: This is a better way to do it, but on decoupled it's not "dt perfect"
    # self.next_call = self.period * (math.floor(now/self.period) + 1)

    # Update future and past states
    self.last_call = now
    while self.next_call <= now:
        self.next_call += self.period

    # (Disabled && Once) clients gets deleted
    self.enabled = not self.once

    return self

Base.BrokenEventLoop

Source code in Broken/Base.py
Python
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
@define
class BrokenEventLoop:
    clients: List[BrokenEventClient] = Factory(list)
    thread:  Optional[Thread]        = None

    # # Management

    def add_client(self, client: BrokenEventClient) -> BrokenEventClient:
        """Adds a client to the manager with immediate next call"""
        self.clients.append(client)
        return client

    def get_client(self, name: str) -> Optional[BrokenEventClient]:
        """Gets a client by name"""
        return next((client for client in self.clients if client.name == name), None)

    # # Creation

    def new(self, *a, **k) -> BrokenEventClient:
        """Wraps around BrokenVsync for convenience"""
        return self.add_client(BrokenEventClient(*a, **k))

    def once(self, *a, **k) -> BrokenEventClient:
        """Wraps around BrokenVsync for convenience"""
        return self.add_client(BrokenEventClient(*a, **k, once=True))

    def partial(self, callable: Callable, *a, **k) -> BrokenEventClient:
        """Wraps around BrokenVsync for convenience"""
        return self.once(callable=functools.partial(callable, *a, **k))

    # # Filtering

    @property
    def enabled_clients(self) -> Iterable[BrokenEventClient]:
        """Returns a list of enabled clients"""
        for client in self.clients:
            if client.enabled:
                yield client

    @property
    def next_client(self) -> BrokenEventClient | None:
        """Returns the next client to be called"""
        return min(self.enabled_clients)

    def __sanitize__(self) -> None:
        """Removes disabled 'once' clients"""
        length = len(self.clients)
        for i, client in enumerate(reversed(self.clients)):
            if client.should_delete:
                del self.clients[length - i - 1]

    # # Actions

    def next(self, block=True) -> None | Any:
        try:
            if (client := self.next_client):
                return client.next(block=block)
        finally:
            self.__sanitize__()

    def all_once(self) -> None:
        """Calls all 'once' clients. Useful for @partial calls on the main thread"""
        for client in self.clients:
            if client.once:
                client.next()
        self.__sanitize__()

    # # Block-free next

    __work__: bool = False

    def smart_next(self) -> None | Any:
        # Note: Proof of concept. The frametime Ticking might be enough for ShaderFlow

        # Too close to the next known call, call blocking
        if abs(time.bang_counter() - self.next_client.next_call) < 0.005:
            return self.next(block=True)

        # By chance any "recently added" client was added
        if (call := self.next(block=False)):
            self.__work__ = True
            return call

        # Next iteration, wait for work but don't spin lock
        if not self.__work__:
            time.sleep(0.001)

        # Flag that there was not work done
        self.__work__ = False

    # # Thread-wise wording

    __stop__: bool = False

    def __loop__(self):
        while not self.__stop__:
            self.next()

    def start_thread(self) -> None:
        self.thread = BrokenThread.new(self.next)

    def stop_thread(self):
        self.__stop__ = True
        self.thread.join()
        self.__stop__ = False

clients: List[BrokenEventClient] = Factory(list) class-attribute instance-attribute

thread: Optional[Thread] = None class-attribute instance-attribute

add_client(client: BrokenEventClient) -> BrokenEventClient

Adds a client to the manager with immediate next call

Source code in Broken/Base.py
Python
1358
1359
1360
1361
def add_client(self, client: BrokenEventClient) -> BrokenEventClient:
    """Adds a client to the manager with immediate next call"""
    self.clients.append(client)
    return client

get_client(name: str) -> Optional[BrokenEventClient]

Gets a client by name

Source code in Broken/Base.py
Python
1363
1364
1365
def get_client(self, name: str) -> Optional[BrokenEventClient]:
    """Gets a client by name"""
    return next((client for client in self.clients if client.name == name), None)

new(*a, **k) -> BrokenEventClient

Wraps around BrokenVsync for convenience

Source code in Broken/Base.py
Python
1369
1370
1371
def new(self, *a, **k) -> BrokenEventClient:
    """Wraps around BrokenVsync for convenience"""
    return self.add_client(BrokenEventClient(*a, **k))

once(*a, **k) -> BrokenEventClient

Wraps around BrokenVsync for convenience

Source code in Broken/Base.py
Python
1373
1374
1375
def once(self, *a, **k) -> BrokenEventClient:
    """Wraps around BrokenVsync for convenience"""
    return self.add_client(BrokenEventClient(*a, **k, once=True))

partial(callable: Callable, *a, **k) -> BrokenEventClient

Wraps around BrokenVsync for convenience

Source code in Broken/Base.py
Python
1377
1378
1379
def partial(self, callable: Callable, *a, **k) -> BrokenEventClient:
    """Wraps around BrokenVsync for convenience"""
    return self.once(callable=functools.partial(callable, *a, **k))

enabled_clients: Iterable[BrokenEventClient] property

Returns a list of enabled clients

next_client: BrokenEventClient | None property

Returns the next client to be called

__sanitize__() -> None

Removes disabled 'once' clients

Source code in Broken/Base.py
Python
1395
1396
1397
1398
1399
1400
def __sanitize__(self) -> None:
    """Removes disabled 'once' clients"""
    length = len(self.clients)
    for i, client in enumerate(reversed(self.clients)):
        if client.should_delete:
            del self.clients[length - i - 1]

next(block=True) -> None | Any

Source code in Broken/Base.py
Python
1404
1405
1406
1407
1408
1409
def next(self, block=True) -> None | Any:
    try:
        if (client := self.next_client):
            return client.next(block=block)
    finally:
        self.__sanitize__()

all_once() -> None

Calls all 'once' clients. Useful for @partial calls on the main thread

Source code in Broken/Base.py
Python
1411
1412
1413
1414
1415
1416
def all_once(self) -> None:
    """Calls all 'once' clients. Useful for @partial calls on the main thread"""
    for client in self.clients:
        if client.once:
            client.next()
    self.__sanitize__()

__work__: bool = False class-attribute instance-attribute

smart_next() -> None | Any

Source code in Broken/Base.py
Python
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
def smart_next(self) -> None | Any:
    # Note: Proof of concept. The frametime Ticking might be enough for ShaderFlow

    # Too close to the next known call, call blocking
    if abs(time.bang_counter() - self.next_client.next_call) < 0.005:
        return self.next(block=True)

    # By chance any "recently added" client was added
    if (call := self.next(block=False)):
        self.__work__ = True
        return call

    # Next iteration, wait for work but don't spin lock
    if not self.__work__:
        time.sleep(0.001)

    # Flag that there was not work done
    self.__work__ = False

__stop__: bool = False class-attribute instance-attribute

__loop__()

Source code in Broken/Base.py
Python
1445
1446
1447
def __loop__(self):
    while not self.__stop__:
        self.next()

start_thread() -> None

Source code in Broken/Base.py
Python
1449
1450
def start_thread(self) -> None:
    self.thread = BrokenThread.new(self.next)

stop_thread()

Source code in Broken/Base.py
Python
1452
1453
1454
1455
def stop_thread(self):
    self.__stop__ = True
    self.thread.join()
    self.__stop__ = False

Base.BrokenTyper

A wrap around Typer with goodies

• Why? Automation. • Stupid? Absolutely. • Useful? Maybe. • Fun? Yes. • Worth it? Probably not. • Will I use it? Yes.

Source code in Broken/Base.py
Python
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
@define
class BrokenTyper:
    """
    A wrap around Typer with goodies

    • Why? Automation.
    • Stupid? Absolutely.
    • Useful? Maybe.
    • Fun? Yes.
    • Worth it? Probably not.
    • Will I use it? Yes.
    """
    description: str       = ""
    app:         Typer     = None
    chain:       bool      = False
    commands:    List[str] = Factory(list)
    default:     str       = None
    help_option: bool      = False
    exit_hook:   Callable  = Factory(Ignore)
    __first__:   bool      = True
    epilog:      str       = (
        f"• Made with [red]:heart:[/red] by [green]Broken Source Software[/green] [yellow]v{Broken.VERSION}[/yellow]\n\n"
        "→ [italic grey53]Consider [blue][link=https://www.patreon.com/Tremeschin]Sponsoring[/link][/blue] my Open Source Work[/italic grey53]"
    )

    def __attrs_post_init__(self):
        self.app = Typer(
            help=self.description or "No help provided",
            add_help_option=self.help_option,
            pretty_exceptions_enable=False,
            no_args_is_help=True,
            add_completion=False,
            rich_markup_mode="rich",
            chain=self.chain,
            epilog=self.epilog,
        )

    __panel__: str = None

    @contextlib.contextmanager
    def panel(self, name: str) -> Generator[None, None, None]:
        try:
            self.__panel__ = name
            yield
        finally:
            self.__panel__ = None

    def command(self,
        callable: Callable,
        help: str=None,
        add_help_option: bool=True,
        name: str=None,
        context: bool=True,
        default: bool=False,
        panel: str=None,
        **kwargs,
    ):
        # Command must be implemented
        if getattr(callable, "__isabstractmethod__", False):
            return

        # Maybe get callable name
        name = name or callable.__name__

        # Create Typer command
        self.app.command(
            help=help or callable.__doc__ or None,
            add_help_option=add_help_option,
            name=name,
            rich_help_panel=panel or self.__panel__ ,
            context_settings=dict(
                allow_extra_args=True,
                ignore_unknown_options=True,
            ) if context else None,
            **kwargs,
        )(callable)

        # Add to known commands
        self.commands.append(name)

        # Set as default command
        self.default = name if default else self.default

    def __call__(self, *args, shell: bool=False):
        while True:
            args = flatten(args)

            # Insert default implied command
            first = (args[0] if (len(args) > 0) else None)
            if self.default and ((not args) or (first not in self.commands)):
                args.insert(0, self.default)

            # Update args to BrokenTyper
            if not self.__first__:
                args = shlex.split(input("\n:: BrokenShell (enter for help) $ "))
            self.__first__ = False

            try:
                self.app(args)
            except SystemExit:
                self.exit_hook()
            except KeyboardInterrupt:
                log.success("BrokenTyper stopped by user")
                self.exit_hook()
            except Exception as e:
                self.exit_hook()
                raise e

            # Don't continue on non BrokenShell mode
            if not shell:
                break

description: str = '' class-attribute instance-attribute

app: Typer = None class-attribute instance-attribute

chain: bool = False class-attribute instance-attribute

commands: List[str] = Factory(list) class-attribute instance-attribute

default: str = None class-attribute instance-attribute

help_option: bool = False class-attribute instance-attribute

exit_hook: Callable = Factory(Ignore) class-attribute instance-attribute

__first__: bool = True class-attribute instance-attribute

epilog: str = f'• Made with [red]:heart:[/red] by [green]Broken Source Software[/green] [yellow]v{Broken.VERSION}[/yellow] [italic grey53]Consider [blue][link=https://www.patreon.com/Tremeschin]Sponsoring[/link][/blue] my Open Source Work[/italic grey53]' class-attribute instance-attribute

__attrs_post_init__()

Source code in Broken/Base.py
Python
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
def __attrs_post_init__(self):
    self.app = Typer(
        help=self.description or "No help provided",
        add_help_option=self.help_option,
        pretty_exceptions_enable=False,
        no_args_is_help=True,
        add_completion=False,
        rich_markup_mode="rich",
        chain=self.chain,
        epilog=self.epilog,
    )

__panel__: str = None class-attribute instance-attribute

panel(name: str) -> Generator[None, None, None]

Source code in Broken/Base.py
Python
1498
1499
1500
1501
1502
1503
1504
@contextlib.contextmanager
def panel(self, name: str) -> Generator[None, None, None]:
    try:
        self.__panel__ = name
        yield
    finally:
        self.__panel__ = None

command(callable: Callable, help: str = None, add_help_option: bool = True, name: str = None, context: bool = True, default: bool = False, panel: str = None, **kwargs)

Source code in Broken/Base.py
Python
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
def command(self,
    callable: Callable,
    help: str=None,
    add_help_option: bool=True,
    name: str=None,
    context: bool=True,
    default: bool=False,
    panel: str=None,
    **kwargs,
):
    # Command must be implemented
    if getattr(callable, "__isabstractmethod__", False):
        return

    # Maybe get callable name
    name = name or callable.__name__

    # Create Typer command
    self.app.command(
        help=help or callable.__doc__ or None,
        add_help_option=add_help_option,
        name=name,
        rich_help_panel=panel or self.__panel__ ,
        context_settings=dict(
            allow_extra_args=True,
            ignore_unknown_options=True,
        ) if context else None,
        **kwargs,
    )(callable)

    # Add to known commands
    self.commands.append(name)

    # Set as default command
    self.default = name if default else self.default

__call__(*args, shell: bool = False)

Source code in Broken/Base.py
Python
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
def __call__(self, *args, shell: bool=False):
    while True:
        args = flatten(args)

        # Insert default implied command
        first = (args[0] if (len(args) > 0) else None)
        if self.default and ((not args) or (first not in self.commands)):
            args.insert(0, self.default)

        # Update args to BrokenTyper
        if not self.__first__:
            args = shlex.split(input("\n:: BrokenShell (enter for help) $ "))
        self.__first__ = False

        try:
            self.app(args)
        except SystemExit:
            self.exit_hook()
        except KeyboardInterrupt:
            log.success("BrokenTyper stopped by user")
            self.exit_hook()
        except Exception as e:
            self.exit_hook()
            raise e

        # Don't continue on non BrokenShell mode
        if not shell:
            break

Base.BrokenWatchdog

Source code in Broken/Base.py
Python
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
class BrokenWatchdog(ABC):

    @abstractmethod
    def __changed__(self, key, value) -> None:
        """Called when a property changes"""
        ...

    def __setattr__(self, key, value):
        """Calls __changed__ when a property changes"""
        super().__setattr__(key, value)
        self.__changed__(key, value)

__changed__(key, value) -> None abstractmethod

Called when a property changes

Source code in Broken/Base.py
Python
1575
1576
1577
1578
@abstractmethod
def __changed__(self, key, value) -> None:
    """Called when a property changes"""
    ...

__setattr__(key, value)

Calls changed when a property changes

Source code in Broken/Base.py
Python
1580
1581
1582
1583
def __setattr__(self, key, value):
    """Calls __changed__ when a property changes"""
    super().__setattr__(key, value)
    self.__changed__(key, value)

Base.BrokenSingleton

Source code in Broken/Base.py
Python
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
class BrokenSingleton(ABC):
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "__instance__"):
            cls.__instance__ = super().__new__(cls)
            cls.__singleton__(*args, **kwargs)
        return cls.__instance__

    @abstractmethod
    def __singleton__(self, *args, **kwargs):
        """__init__ but for the singleton"""
        ...

__new__(*args, **kwargs)

Source code in Broken/Base.py
Python
1588
1589
1590
1591
1592
def __new__(cls, *args, **kwargs):
    if not hasattr(cls, "__instance__"):
        cls.__instance__ = super().__new__(cls)
        cls.__singleton__(*args, **kwargs)
    return cls.__instance__

__singleton__(*args, **kwargs) abstractmethod

init but for the singleton

Source code in Broken/Base.py
Python
1594
1595
1596
1597
@abstractmethod
def __singleton__(self, *args, **kwargs):
    """__init__ but for the singleton"""
    ...

Base.BrokenFluentBuilder

Do you ever feel like using a builder-like fluent syntax for changing attributes of an object?

Source code in Broken/Base.py
Python
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
class BrokenFluentBuilder:
    """
    Do you ever feel like using a builder-like fluent syntax for changing attributes of an object?
    """
    def __call__(self, **kwargs) -> Self:
        """Updates the instance with the provided kwargs"""
        for key, value in kwargs.items():
            setattr(self, key, value)
        return self

    def copy(self, **kwargs) -> Self:
        """Returns a copy of this instance"""
        new = copy.deepcopy(self)
        for key, value in kwargs.items():
            setattr(new, key, value)
        return new

__call__(**kwargs) -> Self

Updates the instance with the provided kwargs

Source code in Broken/Base.py
Python
1605
1606
1607
1608
1609
def __call__(self, **kwargs) -> Self:
    """Updates the instance with the provided kwargs"""
    for key, value in kwargs.items():
        setattr(self, key, value)
    return self

copy(**kwargs) -> Self

Returns a copy of this instance

Source code in Broken/Base.py
Python
1611
1612
1613
1614
1615
1616
def copy(self, **kwargs) -> Self:
    """Returns a copy of this instance"""
    new = copy.deepcopy(self)
    for key, value in kwargs.items():
        setattr(new, key, value)
    return new

Base.BrokenRelay

A class to bind some callback to many callables.

Useful for ModernGL window function, eg pseudocode:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
window = moderngl_window(...)

# Create BrokenRelay instance
scroll_callbacks = BrokenRelay()

# Map window scroll func callback to this class
window.mouse_scroll_event_func = scroll_callbacks

# Define many callbacks that should be called on window resize
def log_scroll(x, y):
    ...

camera2d = Camera2D(...)

# Add callbacks
scroll_callbacks.bind(log_scroll, camera2d.resize)

# Or with @ syntax
scroll_callbacks @ (log_scroll, camera2d.resize)

# It also returns self when binding
self.window_mouse_scroll_event_func = scroll_callbacks @ (log_scroll, camera2d.resize)
Source code in Broken/Base.py
Python
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
@define
class BrokenRelay:
    """
    A class to bind some callback to many callables.

    Useful for ModernGL window function, eg pseudocode:

    ```python
    window = moderngl_window(...)

    # Create BrokenRelay instance
    scroll_callbacks = BrokenRelay()

    # Map window scroll func callback to this class
    window.mouse_scroll_event_func = scroll_callbacks

    # Define many callbacks that should be called on window resize
    def log_scroll(x, y):
        ...

    camera2d = Camera2D(...)

    # Add callbacks
    scroll_callbacks.bind(log_scroll, camera2d.resize)

    # Or with @ syntax
    scroll_callbacks @ (log_scroll, camera2d.resize)

    # It also returns self when binding
    self.window_mouse_scroll_event_func = scroll_callbacks @ (log_scroll, camera2d.resize)
    ```
    """
    callbacks: list[callable] = Factory(list)

    def __bind__(self, *callbacks: callable) -> Self:
        """Adds callbacks to the list of callables, runs on self.__call__"""
        self.callbacks += flatten(callbacks)
        return self

    def bind(self, *callbacks: callable) -> Self:
        """Adds callbacks to the list of callables, runs on self.__call__"""
        return self.__bind__(callbacks)

    def subscribe(self, *callbacks: callable) -> Self:
        """Adds callbacks to the list of callables, runs on self.__call__"""
        return self.__bind__(callbacks)

    def __matmul__(self, *callbacks: callable) -> Self:
        """Convenience syntax for binding"""
        return self.__bind__(callbacks)

    def __call__(self, *args, **kwargs):
        """Pass through all callbacks to who called "us" (self)"""
        for callback in self.callbacks:
            callback(*args, **kwargs)

callbacks: list[callable] = Factory(list) class-attribute instance-attribute

__bind__(*callbacks: callable) -> Self

Adds callbacks to the list of callables, runs on self.call

Source code in Broken/Base.py
Python
1654
1655
1656
1657
def __bind__(self, *callbacks: callable) -> Self:
    """Adds callbacks to the list of callables, runs on self.__call__"""
    self.callbacks += flatten(callbacks)
    return self

bind(*callbacks: callable) -> Self

Adds callbacks to the list of callables, runs on self.call

Source code in Broken/Base.py
Python
1659
1660
1661
def bind(self, *callbacks: callable) -> Self:
    """Adds callbacks to the list of callables, runs on self.__call__"""
    return self.__bind__(callbacks)

subscribe(*callbacks: callable) -> Self

Adds callbacks to the list of callables, runs on self.call

Source code in Broken/Base.py
Python
1663
1664
1665
def subscribe(self, *callbacks: callable) -> Self:
    """Adds callbacks to the list of callables, runs on self.__call__"""
    return self.__bind__(callbacks)

__matmul__(*callbacks: callable) -> Self

Convenience syntax for binding

Source code in Broken/Base.py
Python
1667
1668
1669
def __matmul__(self, *callbacks: callable) -> Self:
    """Convenience syntax for binding"""
    return self.__bind__(callbacks)

__call__(*args, **kwargs)

Pass through all callbacks to who called "us" (self)

Source code in Broken/Base.py
Python
1671
1672
1673
1674
def __call__(self, *args, **kwargs):
    """Pass through all callbacks to who called "us" (self)"""
    for callback in self.callbacks:
        callback(*args, **kwargs)

Base.BrokenProfilerEnum

List of profilers

Source code in Broken/Base.py
Python
1678
1679
1680
class BrokenProfilerEnum(BrokenEnum):
    """List of profilers"""
    cprofile      = "cprofile"

cprofile = 'cprofile' class-attribute instance-attribute

Base.BrokenProfiler

Source code in Broken/Base.py
Python
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
@define
class BrokenProfiler:
    name: str = "NONE"
    profiler: BrokenProfilerEnum = BrokenProfilerEnum.cprofile

    def __attrs_post_init__(self):
        profiler = os.environ.get(f"{self.name}_PROFILER", self.profiler)
        self.profiler = BrokenProfilerEnum.get(profiler)

    # Base properties

    @property
    def enabled(self) -> bool:
        return os.environ.get(f"{self.name}_PROFILE", "0") == "1"

    @property
    def output(self) -> Path:
        return Path(tempfile.gettempdir())/f"{self.name}.prof"

    # The actual profiler object
    __profiler__: Any = None

    def __enter__(self) -> Self:
        if not self.enabled:
            return self

        match self.profiler:
            case BrokenProfilerEnum.cprofile:
                log.trace("Profiling with cProfile")
                import cProfile
                self.__profiler__ = cProfile.Profile()
                self.__profiler__.enable()
                return self

    def __exit__(self, *args) -> None:
        if not self.enabled:
            return

        match self.profiler:
            case BrokenProfilerEnum.cprofile:
                log.trace("Finishing cProfile")
                output = self.output.with_suffix(".prof")
                self.__profiler__.disable()
                self.__profiler__.dump_stats(output)
                shell("snakeviz", output)
                return

name: str = 'NONE' class-attribute instance-attribute

profiler: BrokenProfilerEnum = BrokenProfilerEnum.cprofile class-attribute instance-attribute

__attrs_post_init__()

Source code in Broken/Base.py
Python
1689
1690
1691
def __attrs_post_init__(self):
    profiler = os.environ.get(f"{self.name}_PROFILER", self.profiler)
    self.profiler = BrokenProfilerEnum.get(profiler)

enabled: bool property

output: Path property

__profiler__: Any = None class-attribute instance-attribute

__enter__() -> Self

Source code in Broken/Base.py
Python
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
def __enter__(self) -> Self:
    if not self.enabled:
        return self

    match self.profiler:
        case BrokenProfilerEnum.cprofile:
            log.trace("Profiling with cProfile")
            import cProfile
            self.__profiler__ = cProfile.Profile()
            self.__profiler__.enable()
            return self

__exit__(*args) -> None

Source code in Broken/Base.py
Python
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
def __exit__(self, *args) -> None:
    if not self.enabled:
        return

    match self.profiler:
        case BrokenProfilerEnum.cprofile:
            log.trace("Finishing cProfile")
            output = self.output.with_suffix(".prof")
            self.__profiler__.disable()
            self.__profiler__.dump_stats(output)
            shell("snakeviz", output)
            return

Base.TorchFlavor

Source code in Broken/Base.py
Python
1733
1734
1735
1736
1737
class TorchFlavor(BrokenEnum):
    # BASE = ""
    CPU  = "cpu"
    CUDA = "cu121"
    ROCM = "rocm5.7"

CPU = 'cpu' class-attribute instance-attribute

CUDA = 'cu121' class-attribute instance-attribute

ROCM = 'rocm5.7' class-attribute instance-attribute

Base.BrokenTorch

The Bane of my Existence and the SSD Killer - Packaging PyTorch

Source code in Broken/Base.py
Python
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
class BrokenTorch:
    """
    The Bane of my Existence and the SSD Killer - Packaging PyTorch
    """

    flavor_file: str = "PyTorch.txt"
    """A relative path to a Project's Resources defining the PyTorch Flavor"""

    version: str = "2.2.1"
    """Version of Torch to install"""

    @staticmethod
    def manage(resources: Path):

        # Maybe install a PyTorch flavor
        if (pytorch := BrokenPath(resources/BrokenTorch.flavor_file, valid=True)):
            full = pytorch.read_text().strip()
            version, flavor = full.split("+")

            if not TorchFlavor.get(flavor):
                raise ValueError(f"Invalid PyTorch Flavor ({flavor})")

            # Try getting current installed flavor, if any, without loading torch
            site_packages = Path(__import__("site").getsitepackages()[0])
            if (torch_version := (site_packages/"torch"/"version.py")).exists():
                exec(torch_version.read_text(), namespace := {})
                current_flavor = namespace["__version__"].split("+")[1]
            else:
                current_flavor = None

            # If flavors mismatch, install the correct one
            if (current_flavor != flavor):
                log.info(f"Installing PyTorch Flavor ({full}), current is ({current_flavor})")
                PIP = (sys.executable, "-m", "pip")
                source_url = f"https://download.pytorch.org/whl/{flavor}"
                shell(PIP, "uninstall", "torch", "torchvision", "torchaudio", "-y")
                shell(PIP, "install", f"torch=={version}", "torchvision", "torchaudio", "--index-url", source_url)
            else:
                log.info(f"PyTorch Flavor ({full}) already installed")

    @staticmethod
    def write_flavor(resources: Path, flavor: Optional[TorchFlavor]) -> Optional[Path]:
        if not bool(flavor):
            return None
        flavor = (f"+{flavor.value}" * bool(flavor.value))
        file = (resources/BrokenTorch.flavor_file)
        file.write_text(BrokenTorch.version + flavor)
        return file

    @staticmethod
    def remove_flavor(resources: Path) -> None:
        for file in resources.rglob(BrokenTorch.flavor_file):
            BrokenPath.remove(file)

flavor_file: str = 'PyTorch.txt' class-attribute instance-attribute

A relative path to a Project's Resources defining the PyTorch Flavor

version: str = '2.2.1' class-attribute instance-attribute

Version of Torch to install

manage(resources: Path) staticmethod

Source code in Broken/Base.py
Python
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
@staticmethod
def manage(resources: Path):

    # Maybe install a PyTorch flavor
    if (pytorch := BrokenPath(resources/BrokenTorch.flavor_file, valid=True)):
        full = pytorch.read_text().strip()
        version, flavor = full.split("+")

        if not TorchFlavor.get(flavor):
            raise ValueError(f"Invalid PyTorch Flavor ({flavor})")

        # Try getting current installed flavor, if any, without loading torch
        site_packages = Path(__import__("site").getsitepackages()[0])
        if (torch_version := (site_packages/"torch"/"version.py")).exists():
            exec(torch_version.read_text(), namespace := {})
            current_flavor = namespace["__version__"].split("+")[1]
        else:
            current_flavor = None

        # If flavors mismatch, install the correct one
        if (current_flavor != flavor):
            log.info(f"Installing PyTorch Flavor ({full}), current is ({current_flavor})")
            PIP = (sys.executable, "-m", "pip")
            source_url = f"https://download.pytorch.org/whl/{flavor}"
            shell(PIP, "uninstall", "torch", "torchvision", "torchaudio", "-y")
            shell(PIP, "install", f"torch=={version}", "torchvision", "torchaudio", "--index-url", source_url)
        else:
            log.info(f"PyTorch Flavor ({full}) already installed")

write_flavor(resources: Path, flavor: Optional[TorchFlavor]) -> Optional[Path] staticmethod

Source code in Broken/Base.py
Python
1779
1780
1781
1782
1783
1784
1785
1786
@staticmethod
def write_flavor(resources: Path, flavor: Optional[TorchFlavor]) -> Optional[Path]:
    if not bool(flavor):
        return None
    flavor = (f"+{flavor.value}" * bool(flavor.value))
    file = (resources/BrokenTorch.flavor_file)
    file.write_text(BrokenTorch.version + flavor)
    return file

remove_flavor(resources: Path) -> None staticmethod

Source code in Broken/Base.py
Python
1788
1789
1790
1791
@staticmethod
def remove_flavor(resources: Path) -> None:
    for file in resources.rglob(BrokenTorch.flavor_file):
        BrokenPath.remove(file)