Skip to content

File: Broken/Core/__init__.py

Broken.Core

flatten

flatten(
    *items: Any,
    cast: type = list,
    block: Optional[Collection] = (None, ""),
    unpack: Iterable[type] = (
        list,
        deque,
        tuple,
        map,
        Generator,
    )
) -> Collection[Any]

Flatten/unpack nested iterables (list, deque, tuple, map, Generator) to a plain 1D list - Removes common falsy values by default, modify with block={None, False, "", [], ...}

Example
1
2
3
4
5
# [1, 2, 3, 4, 5, 6, 7]
flatten([1, [2, 3], 4, [5, [6, 7]]])

# [range(0, 3), True, False, "Hello"] (range is not in the default `unpack` types)
flatten(range(3), (True, False), None, "Hello")

Returns:

  • Collection[Any]

    casted object with all unpacked items without any of the blocked values

Source code in Broken/Core/__init__.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def flatten(
    *items: Any,
    cast: type = list,
    block: Optional[Collection] = (None, ""),
    unpack: Iterable[type] = (list, deque, tuple, map, Generator),
) -> Collection[Any]:
    """
    Flatten/unpack nested iterables (list, deque, tuple, map, Generator) to a plain 1D list
    - Removes common falsy values by default, modify with `block=(None, False, "", [], ...)`

    Example:
        ```python
        # [1, 2, 3, 4, 5, 6, 7]
        flatten([1, [2, 3], 4, [5, [6, 7]]])

        # [range(0, 3), True, False, "Hello"], as range is not in the default `unpack`
        flatten(range(3), (True, False), None, "Hello")

        # [0, 1, 2, True, False, "Hello"]
        flatten(range(3), (True, False), None, "Hello", unpack=(range, tuple))
        ```

    Args:
        items: Values to flatten, nested to any depth
        cast: Callable receiving the flattening generator, building the returned collection
        block: Values dropped from the output, falsy (None, empty) disables the filtering
        unpack: Types that are recursed into rather than being yielded as a single item

    Returns:
        `cast`ed object with all `unpack`ed `items` without any of the `block`ed values
    """
    # isinstance() only takes a type or a tuple of types, accept any iterable of them
    unpack = tuple(unpack)

    def blocked(item: Any) -> bool:
        if not block:
            return False
        try:
            return (item in block)
        except TypeError:
            # Unhashable item (like a nested list) tested against a set or dict:
            # it can't possibly be a member of it, so it is not blocked
            return False

    def walk(data: Iterable) -> Generator:
        for item in data:
            # Blocked values are dropped before unpacking, so a blocked
            # container is removed as a whole instead of being recursed into
            if blocked(item):
                continue
            if isinstance(item, unpack):
                yield from walk(item)
                continue
            yield item

    return cast(walk(items))

every

every(
    *items: Any,
    cast: type = list,
    block: Collection[Any] = (None, "")
) -> Optional[Collection]

Returns the flattened items if none of the elements is in the block list, else None. Useful when a Model class has a list of optional arguments and a command must not receive any falsy value

Usage
1
2
3
every(1, 2, 3) # [1, 2, 3]
every(1, 2, 3, None) # None
every("-arg", "") # None
Source code in Broken/Core/__init__.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def every(
    *items: Any,
    cast: type = list,
    block: Collection[Any] = (None, ""),
) -> Optional[Collection]:
    """
    All-or-nothing variant of flatten: gives the flattened items back only when none of them
    is a blocked value, otherwise None. Handy for optional command line arguments, where a
    flag must be dropped entirely whenever its value is missing

    Usage:
        ```python
        every(1, 2, 3) # [1, 2, 3]
        every(1, 2, 3, None) # None
        every("-arg", "") # None
        ```
    """
    # Nothing is filtered while flattening, blocked values must remain to be detected
    flattened = flatten(*items, block=None, cast=cast)

    for element in flattened:
        if (element in block):
            return None

    return flattened

shell

shell(
    *args: Any,
    output: bool = False,
    Popen: bool = False,
    env: dict[str, str] = None,
    confirm: bool = False,
    skip: bool = False,
    echo: bool = True,
    **kwargs
) -> Optional[
    Union[
        subprocess.CompletedProcess, subprocess.Popen, str
    ]
]

Enhanced subprocess runners with many additional features. Flattens the args, converts to str

Example
1
shell(["binary", "-m"], "arg1", None, "arg2", 3, confirm=True)
Source code in Broken/Core/__init__.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def shell(
    *args: Any,
    output: bool = False,
    Popen: bool = False,
    env: dict[str, str] = None,
    confirm: bool = False,
    skip: bool = False,
    echo: bool = True,
    **kwargs
) -> Optional[Union[
    subprocess.CompletedProcess,
    subprocess.Popen,
    str,
]]:
    """
    Enhanced subprocess runners with many additional features. Flattens the args, converts to str

    Example:
        ```python
        shell(["binary", "-m"], "arg1", None, "arg2", 3, confirm=True)
        ```

    Args:
        args: Command and arguments, flattened and converted to strings
        output: Return the decoded stdout of the command (subprocess.check_output)
        Popen: Return the subprocess.Popen object without waiting for it
        env: Extra environment variables for the command, added on top of os.environ
        confirm: Ask for an interactive confirmation before running
        skip: Only log the command, don't run it
        echo: Forwarded to the logging call of the command
        kwargs: Forwarded to the subprocess function being used

    Returns:
        None if skipped or not confirmed, else a str (output=True), a subprocess.Popen
        (Popen=True) or a subprocess.CompletedProcess

    Raises:
        ValueError: Both output and Popen were requested
        FileNotFoundError: The command is a bare name that isn't found on PATH
    """
    if (output and Popen):
        raise ValueError(log.error("Cannot use (output=True) and (Popen=True) at the same time"))

    args = tuple(map(str, flatten(args)))

    # Note: Must read the 'shell' keyword argument here, as the bare name 'shell'
    # is this very function (always truthy), which made this check unreachable
    use_shell = bool(kwargs.get("shell", False))

    # Assert command won't fail due unknown binary. Only bare names are searched on PATH,
    # as explicit paths might be relative to 'cwd' and are left for subprocess to resolve
    if args and (not skip) and (not use_shell):
        if (not os.path.dirname(args[0])) and (not shutil.which(args[0])):
            raise FileNotFoundError(log.error(f"Binary doesn't exist or was not found on PATH ({args[0]})"))

    # Log the command being run, temp variables
    _log = (log.skip if skip else log.info)
    _cwd = f" @ ({kwargs.get('cwd', '') or Path.cwd()})"
    _log(f"{args}{_cwd}", echo=echo)
    if skip: return

    if use_shell:
        args = ' '.join(args)
        # Adjacent literals are merged before '*' is applied, so the hint
        # must be built separately to not blank the whole message
        log.warning(
            "Running command with (shell=True), be careful.. "
            + ("" if confirm else "Consider using (confirm=True)")
        )

    if confirm and not click.confirm("• Confirm running the command above"):
        return

    # Update current environ for the command only
    kwargs["env"] = os.environ | (env or {})

    # Windows: preexec_fn is not supported, remove from kwargs
    if (os.name == "nt") and (kwargs.pop("preexec_fn", None)):
        log.minor("shell(preexec_fn=...) is not supported on Windows, ignoring..")

    if (output):
        return subprocess.check_output(args, **kwargs).decode("utf-8")

    if (Popen):
        return subprocess.Popen(args, **kwargs)

    return subprocess.run(args, **kwargs)

apply

apply(
    callback: Callable,
    iterable: Iterable,
    *,
    cast: Callable = list
) -> Collection

Applies a callback to all items of an iterable, returning a $cast of the results

Source code in Broken/Core/__init__.py
152
153
154
155
156
157
158
def apply(
    callback: Callable,
    iterable: Iterable, *,
    cast: Callable = list
) -> Collection:
    """
    Maps 'callback' over every item of 'iterable' and hands the lazy map object to 'cast',
    which builds the returned collection (a list by default)
    """
    mapped = map(callback, iterable)
    return cast(mapped)

denum

denum(item: Union[enum.Enum, Any]) -> Any

De-enumerates an item: if it's an Enum, returns the value, else the item itself

Source code in Broken/Core/__init__.py
161
162
163
def denum(item: Union[enum.Enum, Any]) -> Any:
    """Unwraps enum members into their '.value', anything else is returned untouched"""
    if isinstance(item, enum.Enum):
        return item.value
    return item

pop_fill

pop_fill(
    data: Collection, fill: type, length: int
) -> Collection

Pop or fill until a data's length is met

Source code in Broken/Core/__init__.py
166
167
168
169
170
171
172
def pop_fill(data: Collection, fill: type, length: int) -> Collection:
    """
    Resizes 'data' in place until it holds exactly 'length' items: surplus items are popped
    from the end, missing ones are appended as fresh 'fill()' values. Returns the same object
    """
    while (len(data) != length):
        if (len(data) > length):
            data.pop()
        else:
            data.append(fill())
    return data

multi_context

multi_context(
    *contexts: contextlib.AbstractContextManager,
) -> Generator

Enter multiple contexts at once

Source code in Broken/Core/__init__.py
175
176
177
178
179
180
181
@contextlib.contextmanager
def multi_context(*contexts: contextlib.AbstractContextManager) -> Generator:
    """
    Enters all given context managers (nested iterables of them are flattened, blocked values
    like None are dropped) and exits them together when the 'with' block ends
    """
    stack = contextlib.ExitStack()

    # The ExitStack unwinds all contexts entered so far, even when a later one fails
    with stack:
        for manager in flatten(contexts):
            stack.enter_context(manager)
        yield None

tempvars

tempvars(**variables: str) -> Generator

Temporarily sets environment variables inside a context

Source code in Broken/Core/__init__.py
184
185
186
187
188
189
190
191
192
193
194
195
@contextlib.contextmanager
def tempvars(**variables: str) -> Generator:
    """
    Temporarily sets environment variables inside a context

    The whole os.environ is snapshotted on enter and put back on exit, so any other
    change made to the environment inside the context is reverted as well
    """
    original = os.environ.copy()
    try:
        # Updating inside the try: os.environ.update raises on non-string values, possibly
        # after some variables were already set, which must be rolled back too
        os.environ.update(variables)
        log.info(f"Setting environment variables: {tuple(variables.items())}")
        yield None
    finally:
        log.info(f"Restoring environment variables: {tuple(variables.keys())}")
        os.environ.clear()
        os.environ.update(original)

block_modules

block_modules(*modules: str) -> Generator

Pretend a module isn't installed

Source code in Broken/Core/__init__.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@contextlib.contextmanager
def block_modules(*modules: str) -> Generator:
    """
    Pretend a module isn't installed

    Each name is mapped to None in sys.modules, which makes importing it raise ImportError
    inside the context. The previous entries are restored (or removed) on exit
    """
    # Flatten once, so setup and teardown walk over the exact same names
    names = flatten(modules)
    state = {name: sys.modules[name] for name in names if (name in sys.modules)}
    try:
        for module in names:
            sys.modules[module] = None
        yield None
    finally:
        for module in names:
            if (module in state):
                sys.modules[module] = state[module]
                continue
            # The entry may be missing if blocking failed midway or it was removed meanwhile
            sys.modules.pop(module, None)

smartproxy

smartproxy(object: Any) -> Any

Returns a weakref proxy if the object is not already proxied

Source code in Broken/Core/__init__.py
214
215
216
217
218
219
220
221
def smartproxy(object: Any) -> Any:
    """Wraps 'object' in a weakref proxy, objects that already are proxies are returned as-is"""
    import weakref

    # Proxying a proxy is avoided, both proxy flavors are recognized
    if isinstance(object, (weakref.CallableProxyType, weakref.ProxyType)):
        return object

    return weakref.proxy(object)

clamp

clamp(
    value: float, low: float = 0, high: float = 1
) -> float
Source code in Broken/Core/__init__.py
224
225
def clamp(value: float, low: float=0, high: float=1) -> float:
    """Limits 'value' to 'high' first, then raises the result to at least 'low'"""
    capped = min(value, high)
    return max(low, capped)

nearest

nearest(
    number: Number,
    multiple: Number,
    *,
    cast=int,
    operator: Callable = round
) -> Number

Finds the nearest multiple of a base number, by default ints but works for floats too

Source code in Broken/Core/__init__.py
228
229
230
def nearest(number: Number, multiple: Number, *, cast=int, operator: Callable=round) -> Number:
    """
    Snaps 'number' to a multiple of 'multiple': 'operator' (round by default) picks how many
    whole steps fit, and 'cast' (int by default, use float for fractional multiples)
    converts the resulting value
    """
    steps = operator(number/multiple)
    return cast(multiple * steps)

list_get

list_get(
    data: list, index: int, default: Any = None
) -> Optional[Any]

Returns the item at 'index' or 'default' if out of range

Source code in Broken/Core/__init__.py
233
234
235
236
237
def list_get(data: list, index: int, default: Any=None) -> Optional[Any]:
    """
    Returns the item at 'index' or 'default' if out of range

    Negative indexes count from the end as usual, and also give 'default' when they
    point before the first item
    """
    # EAFP: covers both ends of the list, a length check alone misses negative indexes
    try:
        return data[index]
    except IndexError:
        return default

hyphen_range

hyphen_range(
    string: Optional[str], *, inclusive: bool = True
) -> Iterable[int]

Yields the numbers in a hyphenated CSV range, just like when selecting what pages to print - Accepts any of ("-", "..", "...", "_", "->") as a hyphenated range - Special values: - "all", returns infinite range from 0 - "even", returns even numbers - "odd", returns odd numbers

Example
1
2
3
hyphen_range("2,3") # 2, 3
hyphen_range("2-5") # 2, 3, 4, 5
hyphen_range("1-3, 5") # 1, 2, 3, 5
Source code in Broken/Core/__init__.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def hyphen_range(string: Optional[str], *, inclusive: bool=True) -> Iterable[int]:
    """
    Yields the numbers in a hyphenated CSV range, just like when selecting what pages to print
    - Accepts any of ("-", "..", "...", "_", "->") as a hyphenated range
    - Special values:
        - "all", returns infinite range from 0
        - "even", returns even numbers
        - "odd", returns odd numbers

    Example:
        ```python
        hyphen_range("2,3") # 2, 3
        hyphen_range("2-5") # 2, 3, 4, 5
        hyphen_range("1-3, 5") # 1, 2, 3, 5
        hyphen_range("2..5", inclusive=False) # 2, 3, 4
        ```

    Args:
        string: The range specification, nothing is yielded if None or empty
        inclusive: Whether the end of each range is part of it

    Raises:
        ValueError: A part isn't a number, or a range doesn't have exactly two numbers
    """
    if not bool(string):
        return None

    # The special values are infinite generators, explicitly stop after them
    if (string == "all"):
        yield from itertools.count()
        return None
    elif (string == "even"):
        yield from itertools.count(0, 2)
        return None
    elif (string == "odd"):
        yield from itertools.count(1, 2)
        return None

    for part in string.split(","):
        # Longer separators come first, else "..." splits on ".." and "->" on "-"
        bounds = re.split(r"\.\.\.|\.\.|->|-|_", part)

        if (len(bounds) == 1):
            yield int(part)
            continue

        start, end = map(int, bounds)
        yield from range(start, end + int(inclusive))

limited_ratio

limited_ratio(
    number: Optional[float], *, limit: float = None
) -> Optional[tuple[int, int]]

Same as Number.as_integer_ratio but with an optional upper limit and optional return

Source code in Broken/Core/__init__.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def limited_ratio(
    number: Optional[float], *,
    limit: float = None
) -> Optional[tuple[int, int]]:
    """
    Integer (numerator, denominator) pair of 'number' as given by as_integer_ratio, rescaled
    when a truthy 'limit' is exceeded by any of the two. None is passed through as None

    NOTE(review): the rescale brings the *smaller* term to 'limit', so the larger one can
    still exceed it (0.25 with limit=2 gives (2, 8)) - confirm this is the intended meaning
    """
    if (number is None):
        return None

    numerator, denominator = number.as_integer_ratio()
    exceeded = limit and (denominator > limit or numerator > limit)

    if exceeded:
        scale = limit/min(numerator, denominator)
        numerator = numerator * scale
        denominator = denominator * scale

    # Scaled terms are floats, both are truncated back to integers
    return (int(numerator), int(denominator))

overrides

overrides(
    old: Any,
    new: Optional[Any],
    default: Optional[Any] = None,
    resets: Any = -1,
) -> Optional[Any]

Returns 'default' if 'new' equals 'resets', 'old' if 'new' is None, else 'new'

Source code in Broken/Core/__init__.py
292
293
294
295
296
297
298
299
300
301
302
303
def overrides(
    old: Any,
    new: Optional[Any],
    default: Optional[Any]=None,
    resets: Any=-1
) -> Optional[Any]:
    """
    Picks the value a setting should take after an optional override:
    - 'default' when 'new' equals the 'resets' marker (checked first)
    - 'old' when 'new' is None, as in "no override was given"
    - 'new' otherwise
    """
    if (new == resets):
        return default
    return (old if (new is None) else new)

install

install(
    *,
    packages: Union[str, Iterable[str]],
    pypi: Optional[Union[str, Iterable[str]]] = None,
    args: Optional[Union[str, Iterable[str]]] = None
) -> None
Source code in Broken/Core/__init__.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def install(*,
    packages: Union[str, Iterable[str]],
    pypi: Optional[Union[str, Iterable[str]]]=None,
    args: Optional[Union[str, Iterable[str]]]=None
) -> None:
    """
    Imports 'packages' into the globals of the calling module, installing them first with
    'uv pip' or 'pip' (in that order) when the import fails

    Args:
        packages: Module name(s) to import, each one is stored on the caller's globals
        pypi: Name(s) given to the installer, defaults to the same names of 'packages'
        args: Extra arguments appended to the install command

    Raises:
        RuntimeError: No install method exited with return code zero
        ImportError: The import still fails after a successful install
    """
    # Ensure arguments are tuples
    packages = flatten(packages, cast=tuple)
    pypi = flatten(pypi or packages, cast=tuple)
    args = flatten(args, cast=tuple)

    # Globals of whoever called install(): must be read here, one frame below the caller
    caller = inspect.currentframe().f_back.f_globals

    # Import the package and insert on the caller's globals
    def inject_packages():
        for package in packages:
            caller[package] = __import__(package)

    # Fast path: everything is already importable, no installer is run
    try:
        return inject_packages()
    except ImportError:
        log.info(f"Installing packages: {packages}..")

    # Try each installer until one succeeds, the import is retried right after it
    for method in (
        (sys.executable, "-m", "uv", "pip", "install"),
        (sys.executable, "-m", "pip", "install")
    ):
        if shell(*method, *pypi, *args).returncode == 0:
            return inject_packages()

    raise RuntimeError(log.error(f"Failed to install packages: {packages}"))

combinations

combinations(**options: Any) -> Iterable[DotMap]

Returns a dictionary of key='this' of itertools.product

Source code in Broken/Core/__init__.py
338
339
340
341
def combinations(**options: Any) -> Iterable[DotMap]:
    """
    Yields one DotMap per element of the cartesian product (itertools.product) of the option
    values, each mapping the keyword names to the values chosen for that combination
    """
    names = options.keys()

    for chosen in itertools.product(*options.values()):
        yield DotMap(zip(names, chosen))

arguments

arguments() -> bool

Returns True if any arguments are present on sys.argv

Source code in Broken/Core/__init__.py
344
345
346
def arguments() -> bool:
    """Tells whether sys.argv holds anything after its first entry (the program name)"""
    return (len(sys.argv) > 1)

easyloop

easyloop(method: Callable = None, *, period: float = 0.0)

Wraps a method in an infinite loop called every 'period' seconds

Source code in Broken/Core/__init__.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def easyloop(method: Callable=None, *, period: float=0.0):
    """
    Decorator turning a function into an endless loop: the function is called, then the loop
    sleeps for 'period' seconds, forever. Only an exception raised by the function ends it.
    Works both bare (@easyloop) and configured (@easyloop(period=1.0))
    """

    def decorator(target):

        @functools.wraps(target)
        def forever(*args, **kwargs):
            while True:
                target(*args, **kwargs)
                time.sleep(period)

        return forever

    # Configured usage has no method yet: give the decorator itself to be applied later
    return (decorator if (method is None) else decorator(method))

Nothing

No-operation faster than Mock - A class that does nothing

Source code in Broken/Core/__init__.py
371
372
373
374
375
376
377
378
class Nothing:
    """No-operation faster than Mock - A class that does nothing"""
    def __nop__(self, *args, **kwargs) -> Self:
        return self
    def __call__(self, *args, **kwargs) -> Self:
        return self
    def __getattr__(self, _) -> Callable:
        return self.__nop__

__nop__

__nop__(*args, **kwargs) -> Self
Source code in Broken/Core/__init__.py
373
374
def __nop__(self, *args, **kwargs) -> Self:
    """Accepts and ignores any arguments, returns the instance itself"""
    return self

__call__

__call__(*args, **kwargs) -> Self
Source code in Broken/Core/__init__.py
375
376
def __call__(self, *args, **kwargs) -> Self:
    """Calling the instance ignores any arguments and returns the instance itself"""
    return self

__getattr__

__getattr__(_) -> Callable
Source code in Broken/Core/__init__.py
377
378
def __getattr__(self, _) -> Callable:
    """Resolves any attribute the normal lookup doesn't find to the no-operation method"""
    return self.__nop__

StaticClass

A class that can't be instantiated directly, only used for static methods (namespace)

Source code in Broken/Core/__init__.py
381
382
383
384
385
class StaticClass:
    """A class that can't be instantiated directly, only used for static methods (namespace)"""

    # Instantiating the class or any subclass fails here, before any __init__ runs
    def __new__(cls, *args, **kwargs):
        raise TypeError(f"Can't instantiate static class '{cls.__name__}'")

__new__

__new__(*args, **kwargs)
Source code in Broken/Core/__init__.py
384
385
def __new__(cls, *args, **kwargs):
    """Always raises TypeError naming the class, so no instance is ever created"""
    raise TypeError(f"Can't instantiate static class '{cls.__name__}'")

BrokenSingleton

Bases: ABC

Source code in Broken/Core/__init__.py
388
389
390
391
392
393
class BrokenSingleton(ABC):
    """
    Base for classes with a single shared instance: the first instantiation creates it,
    all later ones return that same object. Each subclass holds its own instance

    Note: Python still runs __init__ on the shared instance at every instantiation
    """
    def __new__(cls, *args, **kwargs):
        # Only look at the class' own namespace: hasattr() also finds the instance cached
        # on a parent class, which made subclasses return their parent's singleton
        if "__instance__" not in cls.__dict__:
            cls.__instance__ = super().__new__(cls)
        return cls.__instance__

__new__

__new__(*args, **kwargs)
Source code in Broken/Core/__init__.py
389
390
391
392
393
def __new__(cls, *args, **kwargs):
    """Creates the instance of 'cls' on the first call, returns that same one afterwards"""
    # Only look at the class' own namespace: hasattr() also finds the instance cached
    # on a parent class, which made subclasses return their parent's singleton
    if "__instance__" not in cls.__dict__:
        cls.__instance__ = super().__new__(cls)
    return cls.__instance__

BrokenFluent

Fluent-like `.copy(**update)` and `.(**update)` setter for classes

Source code in Broken/Core/__init__.py
396
397
398
399
400
401
402
403
404
405
406
407
class BrokenFluent:
    """Fluent-like .copy(**update) and .(**update) setter for classes"""

    def __call__(self, **update) -> Self:
        """Updates the instance with the provided kwargs"""
        for key, value in update.items():
            setattr(self, key, value)
        return self

    def copy(self, **update) -> Self:
        """Returns an updated copy of this instance"""
        return copy.deepcopy(self)(**update)

__call__

__call__(**update) -> Self

Updates the instance with the provided kwargs

Source code in Broken/Core/__init__.py
399
400
401
402
403
def __call__(self, **update) -> Self:
    """Updates the instance with the provided kwargs"""
    for key, value in update.items():
        setattr(self, key, value)
    # Returning the instance allows chaining further calls on it
    return self

copy

copy(**update) -> Self

Returns an updated copy of this instance

Source code in Broken/Core/__init__.py
405
406
407
def copy(self, **update) -> Self:
    """Returns an updated copy of this instance"""
    # The deep copy is made first, so the update only ever touches the copy
    return copy.deepcopy(self)(**update)

BrokenAttrs

Walk over an @attrs.defined class and call `__post__` on all classes in the MRO. Warn: Must NOT define `__attrs_post_init__` in an inheriting class. Fixme: Can improve by starting on BrokenAttrs itself

Source code in Broken/Core/__init__.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
class BrokenAttrs:
    """
    Runs the __post__ method of every class in the hierarchy of an @attrs.defined class after
    its initialization, base classes first and the most derived class last
    Warn: Must NOT define __attrs_post_init__ in an inheriting class
    Fixme: Can improve by starting on BrokenAttrs itself
    """
    def __attrs_post_init__(self):
        for klass in reversed(type(self).mro()):
            # Only a __post__ written on the class itself counts, so inherited
            # implementations are not called once more for each subclass
            post = vars(klass).get("__post__")
            if post:
                post(self)

    @abstractmethod
    def __post__(self) -> None:
        ...

__attrs_post_init__

__attrs_post_init__()
Source code in Broken/Core/__init__.py
416
417
418
419
def __attrs_post_init__(self):
    """Calls the __post__ of each class in the MRO that defines one, base classes first"""
    for cls in reversed(type(self).mro()):
        # Reads the class' own __dict__, so inherited methods aren't called again
        if method := cls.__dict__.get("__post__"):
            method(self)

__post__

__post__() -> None
Source code in Broken/Core/__init__.py
421
422
423
@abstractmethod
def __post__(self) -> None:
    """Post-initialization hook of a class, does nothing by itself"""
    ...

BrokenModel

Bases: BaseModel

Pydantic model utilities

Source code in Broken/Core/__init__.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
class BrokenModel(BaseModel):
    """Pydantic model utilities"""
    model_config = ConfigDict(
        use_attribute_docstrings=True,
    )

    def model_post_init(self, __context):
        """Calls the __post__ of each class in the MRO that defines one, base classes first"""
        for cls in reversed(type(self).mro()):
            # Reads the class' own __dict__, so inherited methods aren't called again
            if method := cls.__dict__.get("__post__"):
                method(self)

    def __post__(self) -> None:
        """Post-initialization hook for subclasses, does nothing by default"""
        ...

    def __hash__(self) -> int:
        """Deterministic hash heuristic, as hash() is random seeded"""
        return int(hashlib.sha256(self.json(full=True).encode()).hexdigest(), 16)

    # Serialization

    def json(self, full: bool=True) -> str:
        """JSON string of the model, fields at their default are left out if not 'full'"""
        return self.model_dump_json(
            exclude_defaults=(not full),
            exclude_none=False
        )

    def dict(self, full: bool=True) -> dict:
        """Dictionary of the model, fields at their default are left out if not 'full'"""
        return self.model_dump(
            exclude_defaults=(not full),
            exclude_none=False
        )

    def schema(self) -> dict:
        """JSON schema of the model"""
        return self.model_json_schema()

    # Deserialization

    @classmethod
    def load(cls, data: Union[dict, str]) -> Self:
        """
        Builds a model out of a dictionary or a JSON string, instances of the class are
        returned as-is. Raises ValueError for any other type
        """
        if isinstance(data, dict):
            return cls.model_validate(data)
        elif isinstance(data, str):
            return cls.model_validate_json(data)
        elif isinstance(data, cls):
            return data
        raise ValueError(f"Can't load from value of type {type(data)}")

    def update(self, **data: Union[dict, str]) -> Self:
        """Sets each keyword as an attribute of the model, returns it for chaining"""
        for (key, value) in data.items():
            setattr(self, key, value)
        return self

    # Dict-like utilities

    def keys(self) -> Iterable[str]:
        """Yields the field names, as of the full dictionary of the model"""
        yield from self.dict().keys()

    def values(self) -> Iterable[Any]:
        """Yields the field values, as of the full dictionary of the model"""
        yield from self.dict().values()

    def items(self) -> Iterable[tuple[str, Any]]:
        """Yields (name, value) pairs, as of the full dictionary of the model"""
        yield from self.dict().items()

    # Special

    def reset(self) -> None:
        """
        Sets every field with a default back to it, calling default factories for a fresh
        value. Required fields have no default to go back to and are left untouched
        """
        # Fields are read from the class, accessing them on instances is deprecated
        for key, field in type(self).model_fields.items():
            if field.is_required():
                continue
            # Not 'field.default': it's an undefined marker when a default_factory is
            # used, and the very same object for all instances on mutable defaults
            setattr(self, key, field.get_default(call_default_factory=True))

model_config

model_config = ConfigDict(use_attribute_docstrings=True)

model_post_init

model_post_init(__context)
Source code in Broken/Core/__init__.py
432
433
434
435
def model_post_init(self, __context):
    """Calls the __post__ of each class in the MRO that defines one, base classes first"""
    for cls in reversed(type(self).mro()):
        # Reads the class' own __dict__, so inherited methods aren't called again
        if method := cls.__dict__.get("__post__"):
            method(self)

__post__

__post__() -> None
Source code in Broken/Core/__init__.py
437
438
def __post__(self) -> None:
    """Post-initialization hook for subclasses, does nothing by default"""
    ...

__hash__

__hash__() -> int

Deterministic hash heuristic, as hash() is random seeded

Source code in Broken/Core/__init__.py
440
441
442
def __hash__(self) -> int:
    """Deterministic hash heuristic, as hash() is random seeded"""
    # The SHA-256 hex digest of the full JSON dump is read as a base 16 integer
    return int(hashlib.sha256(self.json(full=True).encode()).hexdigest(), 16)

json

json(full: bool = True) -> str
Source code in Broken/Core/__init__.py
446
447
448
449
450
def json(self, full: bool=True) -> str:
    """JSON string of the model, fields at their default are left out if not 'full'"""
    return self.model_dump_json(
        exclude_defaults=(not full),
        exclude_none=False
    )

dict

dict(full: bool = True) -> dict
Source code in Broken/Core/__init__.py
452
453
454
455
456
def dict(self, full: bool=True) -> dict:
    """Dictionary of the model, fields at their default are left out if not 'full'"""
    return self.model_dump(
        exclude_defaults=(not full),
        exclude_none=False
    )

schema

schema() -> dict
Source code in Broken/Core/__init__.py
458
459
def schema(self) -> dict:
    """JSON schema of the model, as given by model_json_schema"""
    return self.model_json_schema()

load

load(data: Union[dict, str]) -> Self
Source code in Broken/Core/__init__.py
463
464
465
466
467
468
469
470
471
@classmethod
def load(cls, data: Union[dict, str]) -> Self:
    """
    Builds a model out of a dictionary or a JSON string, instances of the class are
    returned as-is. Raises ValueError for any other type
    """
    if isinstance(data, dict):
        return cls.model_validate(data)
    elif isinstance(data, str):
        return cls.model_validate_json(data)
    elif isinstance(data, cls):
        # Already a model of this class, the same object is given back (not a copy)
        return data
    raise ValueError(f"Can't load from value of type {type(data)}")

update

update(**data: Union[dict, str]) -> Self
Source code in Broken/Core/__init__.py
473
474
475
476
def update(self, **data: Union[dict, str]) -> Self:
    """Sets each keyword as an attribute of the model, returns it for chaining"""
    for (key, value) in data.items():
        setattr(self, key, value)
    return self

keys

keys() -> Iterable[str]
Source code in Broken/Core/__init__.py
480
481
def keys(self) -> Iterable[str]:
    """Yields the keys of the dictionary given by self.dict()"""
    yield from self.dict().keys()

values

values() -> Iterable[Any]
Source code in Broken/Core/__init__.py
483
484
def values(self) -> Iterable[Any]:
    """Yields the values of the dictionary given by self.dict()"""
    yield from self.dict().values()

items

items() -> Iterable[tuple[str, Any]]
Source code in Broken/Core/__init__.py
486
487
def items(self) -> Iterable[tuple[str, Any]]:
    """Yields the (key, value) pairs of the dictionary given by self.dict()"""
    yield from self.dict().items()

reset

reset() -> None
Source code in Broken/Core/__init__.py
491
492
493
def reset(self) -> None:
    """
    Sets every field with a default back to it, calling default factories for a fresh
    value. Required fields have no default to go back to and are left untouched
    """
    # Fields are read from the class, accessing them on instances is deprecated
    for key, field in type(self).model_fields.items():
        if field.is_required():
            continue
        # Not 'field.default': it's an undefined marker when a default_factory is
        # used, and the very same object for all instances on mutable defaults
        setattr(self, key, field.get_default(call_default_factory=True))

FrozenHash

Bases: BaseModel

Source code in Broken/Core/__init__.py
496
497
498
499
500
501
class FrozenHash(BaseModel):
    """Caches the hash of the model on the first call to __hash__, reusing it afterwards"""

    # Cached hash, zero (falsy) means it was not computed yet
    # NOTE(review): exclude=True presumably keeps it out of serialization - confirm
    hash: int = Field(0, exclude=True)

    def __hash__(self):
        # The parent's hash is only computed while the cached value is falsy, which
        # includes the case of a computed hash that happens to be zero
        self.hash = (self.hash or super().__hash__())
        return self.hash

hash

hash: int = Field(0, exclude=True)

__hash__

__hash__()
Source code in Broken/Core/__init__.py
499
500
501
def __hash__(self):
    """Returns the cached hash, computing it from the parent class while it is falsy"""
    self.hash = (self.hash or super().__hash__())
    return self.hash

BrokenAttribute

Bases: StaticClass

Recursive implementation for getattr and setattr from strings

Source code in Broken/Core/__init__.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
class BrokenAttribute(StaticClass):
    """Nested getattr and setattr driven by dotted string paths such as 'a.b.c'"""

    @define
    class Parts:
        # Every name in the path, in order
        all: list[str]
        # All names but the last one: the objects to walk through
        body: list[str]
        # Final name: the attribute that is read or written
        last: str

    @classmethod
    def decompose(cls, key: str) -> Parts:
        """Splits a dotted path into its names, hyphens are replaced by underscores first"""
        *body, last = str(key).replace("-", "_").split(".")
        return cls.Parts(all=[*body, last], body=body, last=last)

    @classmethod
    def get(cls, root: object, key: str) -> Optional[Any]:
        """Follows the dotted 'key' from 'root', None if any attribute in the path is missing"""
        missing = object()
        current = root

        for name in cls.decompose(key).all:
            current = getattr(current, name, missing)
            if (current is missing):
                return None

        return current

    @classmethod
    def set(cls, object: object, attribute: str, value: Any) -> None:
        """Sets the last name of the dotted 'attribute', silently gives up on a missing path"""
        path = cls.decompose(attribute)
        missing = type("Missing", (), {})()
        target = object

        for name in path.body:
            target = getattr(target, name, missing)
            if (target is missing):
                return None

        setattr(target, path.last, value)

Parts

Source code in Broken/Core/__init__.py
507
508
509
510
511
@define
class Parts:
    # Every name in the path, in order
    all: list[str]
    # All names but the last one
    body: list[str]
    # Final name of the path
    last: str
all
all: list[str]
body
body: list[str]
last
last: str

decompose

decompose(key: str) -> Parts
Source code in Broken/Core/__init__.py
513
514
515
516
517
518
519
520
521
@classmethod
def decompose(cls, key: str) -> Parts:
    """Splits a dotted path into its names, hyphens are replaced by underscores first"""
    parts = str(key).replace("-", "_").split(".")

    return cls.Parts(
        all=parts,
        body=parts[:-1],
        last=parts[-1]
    )

get

get(root: object, key: str) -> Optional[Any]
Source code in Broken/Core/__init__.py
523
524
525
526
527
528
529
530
531
532
533
@classmethod
def get(cls, root: object, key: str) -> Optional[Any]:
    """Follows the dotted 'key' from 'root', None if any attribute in the path is missing"""
    parts = cls.decompose(key)

    # Walk one attribute deeper for each name of the path
    for part in parts.all:
        try:
            root = getattr(root, part)
        except AttributeError:
            return None

    return root

set

set(object: object, attribute: str, value: Any) -> None
Source code in Broken/Core/__init__.py
535
536
537
538
539
540
541
542
543
544
545
@classmethod
def set(cls, object: object, attribute: str, value: Any) -> None:
    """Sets the last name of the dotted 'attribute', silently gives up on a missing path"""
    parts = cls.decompose(attribute)

    # Walk to the object owning the final attribute
    for part in parts.body:
        try:
            object = getattr(object, part)
        except AttributeError:
            return None

    setattr(object, parts.last, value)

StringUtils

Bases: StaticClass

Source code in Broken/Core/__init__.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
class StringUtils(StaticClass):
    """Helpers to classify names by their leading and trailing underscores"""

    @classmethod
    def border(cls, string: str, border: str) -> bool:
        """Returns True if 'string' starts with 'border' and ends with the reversed 'border'"""
        # Slicing gives a reversed str: reversed() is an iterator, that str.endswith rejects
        return (string.startswith(border) and string.endswith(border[::-1]))

    @classmethod
    def dunder(cls, name: str) -> bool:
        """Checks if a string is a double underscore '__name__'"""
        return cls.border(name, "__")

    @classmethod
    def sunder(cls, name: str) -> bool:
        """Checks if a string is a single underscore '_name_'"""
        return (cls.border(name, "_") and not cls.dunder(name))

    @classmethod
    def private(cls, name: str) -> bool:
        """Checks if a string is a private name"""
        return name.startswith("_")

border

border(string: str, border: str) -> bool

Returns True if 'border' is both a prefix and suffix of 'string'

Source code in Broken/Core/__init__.py
550
551
552
553
@classmethod
def border(cls, string: str, border: str) -> bool:
    """Returns True if 'string' starts with 'border' and ends with the reversed 'border'"""
    # Slicing gives a reversed str: reversed() is an iterator, that str.endswith rejects
    return (string.startswith(border) and string.endswith(border[::-1]))

dunder

dunder(name: str) -> bool

Checks if a string is a double underscore '__name__'

Source code in Broken/Core/__init__.py
555
556
557
558
@classmethod
def dunder(cls, name: str) -> bool:
    """Checks if a string is a double underscore '__name__'"""
    # Delegates to cls.border with "__" as the border
    return cls.border(name, "__")

sunder

sunder(name: str) -> bool

Checks if a string is a single underscore '_name_'

Source code in Broken/Core/__init__.py
560
561
562
563
@classmethod
def sunder(cls, name: str) -> bool:
    """Checks if a string is a single underscore '_name_'"""
    # Dunder names are bordered by "_" as well, so they are excluded explicitly
    return (cls.border(name, "_") and not cls.dunder(name))

private

private(name: str) -> bool

Checks if a string is a private name

Source code in Broken/Core/__init__.py
565
566
567
568
@classmethod
def private(cls, name: str) -> bool:
    """Checks if a string is a private name"""
    # Any leading underscore counts, which includes sunder and dunder names
    return name.startswith("_")

DictUtils

Bases: StaticClass

Source code in Broken/Core/__init__.py
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
class DictUtils(StaticClass):
    """Helpers for filtering dictionaries and walking nested ones"""

    @staticmethod
    def filter_dict(
        data: dict[str, Any], *,
        block: Optional[Collection] = None,
        allow: Optional[Collection] = None,
    ) -> dict[str, Any]:
        """
        Drops the keys found in 'block', then keeps only the keys found in 'allow'

        A filter that is None or empty is not applied; with no active filter the same 'data'
        object is returned, otherwise a new dictionary is built
        """
        if block:
            data = {key: data[key] for key in data if (key not in block)}
        if allow:
            data = {key: data[key] for key in data if (key in allow)}
        return data

    @classmethod
    def ritems(cls, data: dict[str, Any]) -> Iterable[tuple[str, Any]]:
        """Yields the (key, value) pairs of all non-dict values, descending into nested dicts"""
        for (key, value) in data.items():
            if not isinstance(value, dict):
                yield (key, value)
            else:
                # The key of a nested dictionary itself is not reported
                yield from cls.ritems(value)

    @classmethod
    def rvalues(cls, data: dict[str, Any]) -> Iterable[Any]:
        """Yields all non-dict values, descending into nested dicts"""
        for (_, value) in cls.ritems(data):
            yield value

    @classmethod
    def selfless(cls, data: dict) -> dict:
        """Returns 'data' without its 'self' key (useful for locals() or __dict__)"""
        # Note: It's also possible to call Class.method(**locals()) instead!
        blocked = ["self"]
        return cls.filter_dict(data, block=blocked)

filter_dict

filter_dict(
    data: dict[str, Any],
    *,
    block: Optional[Collection] = None,
    allow: Optional[Collection] = None
) -> dict[str, Any]

Filters a dictionary by removing 'block' or only allowing 'allow' keys

Source code in Broken/Core/__init__.py
573
574
575
576
577
578
579
580
581
582
583
584
@staticmethod
def filter_dict(
    data: dict[str, Any], *,
    block: Optional[Collection] = None,
    allow: Optional[Collection] = None,
) -> dict[str, Any]:
    """
    Drops the keys found in 'block', then keeps only the keys found in 'allow'

    A filter that is None or empty is not applied; with no active filter the same 'data'
    object is returned, otherwise a new dictionary is built
    """
    if block:
        data = {key: data[key] for key in data if (key not in block)}
    if allow:
        data = {key: data[key] for key in data if (key in allow)}
    return data

ritems

ritems(data: dict[str, Any]) -> Iterable[tuple[str, Any]]

Recursively yields all items from a dictionary

Source code in Broken/Core/__init__.py
586
587
588
589
590
591
592
593
@classmethod
def ritems(cls, data: dict[str, Any]) -> Iterable[tuple[str, Any]]:
    """Yields the (key, value) pairs of all non-dict values, descending into nested dicts"""
    for (key, value) in data.items():
        if not isinstance(value, dict):
            yield (key, value)
        else:
            # The key of a nested dictionary itself is not reported
            yield from cls.ritems(value)

rvalues

rvalues(data: dict[str, Any]) -> Iterable[Any]

Recursively yields all values from a dictionary

Source code in Broken/Core/__init__.py
595
596
597
598
599
@classmethod
def rvalues(cls, data: dict[str, Any]) -> Iterable[Any]:
    """Yields the value of every pair produced by 'ritems' for this dictionary"""
    for pair in cls.ritems(data):
        yield pair[1]

selfless

selfless(data: dict) -> dict

Removes the 'self' key from a dictionary (useful for locals() or __dict__)

Source code in Broken/Core/__init__.py
601
602
603
604
605
@classmethod
def selfless(cls, data: dict) -> dict:
    """Returns 'data' without its 'self' key (useful for locals() or __dict__)"""
    # Note: It's also possible to call Class.method(**locals()) instead!
    blocked = ["self"]
    return cls.filter_dict(data, block=blocked)

BrokenCache

Bases: StaticClass

Source code in Broken/Core/__init__.py
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
class BrokenCache(StaticClass):
    """Caching helpers: a memoized `functools.lru_cache` factory and a cached requests session"""

    # Decorators apply bottom-up: `wraps` first copies the metadata of `functools.lru_cache`
    # onto this function, then the outer `functools.lru_cache` memoizes calls to it, so
    # repeating a call with equal (hashable) arguments returns the same object as before
    @staticmethod
    @functools.lru_cache
    @functools.wraps(functools.lru_cache)
    def lru(*args, **kwargs) -> Callable:
        """Smarter lru_cache consistent with multi-calls"""
        return functools.lru_cache(*args, **kwargs)

    @staticmethod
    def requests(*args, patch: bool=False, **kwargs):
        """
        Creates a `requests_cache.CachedSession`, forwarding positional and keyword arguments

        Args:
            patch: When True, also rebinds the `requests.Session` attribute to the new session

        Returns:
            The created session
        """
        # Imported on call, so both packages are only needed when this helper is used
        import requests
        import requests_cache
        session = requests_cache.CachedSession(*args, **kwargs)
        if patch:
            # NOTE(review): this replaces the `Session` class by an instance - confirm intended
            requests.Session = session
        return session

lru

lru(*args, **kwargs) -> Callable

Smarter lru_cache consistent with multi-calls

Source code in Broken/Core/__init__.py
610
611
612
613
614
615
# Decorators apply bottom-up: `wraps` first copies the metadata of `functools.lru_cache`
# onto this function, then the outer `functools.lru_cache` memoizes calls to it, so
# repeating a call with equal (hashable) arguments returns the same object as before
@staticmethod
@functools.lru_cache
@functools.wraps(functools.lru_cache)
def lru(*args, **kwargs) -> Callable:
    """Smarter lru_cache consistent with multi-calls"""
    return functools.lru_cache(*args, **kwargs)

requests

requests(*args, patch: bool = False, **kwargs)
Source code in Broken/Core/__init__.py
617
618
619
620
621
622
623
624
@staticmethod
def requests(*args, patch: bool=False, **kwargs):
    """
    Creates a `requests_cache.CachedSession`, forwarding positional and keyword arguments

    Args:
        patch: When True, also rebinds the `requests.Session` attribute to the new session

    Returns:
        The created session
    """
    import requests
    import requests_cache
    cached = requests_cache.CachedSession(*args, **kwargs)

    # Nothing else to do unless asked to replace the session on the requests package
    if not patch:
        return cached

    requests.Session = cached
    return cached

BrokenRelay

A utility class for sharing one-to-many callbacks in an 'observer' pattern style. Multiple callbacks can be subscribed to receive the same args and kwargs when an instance of this class is called. This is useful to avoid inheritance when sharing callbacks.

Example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
relay = BrokenRelay()

# Basic usage
relay.subscribe(callback1, callback2)
relay(*args, **kwargs) # Calls callback1 and callback2

# Can also 'inject' us to bound callables
window = moderngl_window(...)
window.key_event_func = relay
window.key_event_func = relay @ (camera.walk, camera.rotate)
Source code in Broken/Core/__init__.py
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
@define
class BrokenRelay:
    """
    A utility class for sharing one-to-many callbacks in an 'observer' pattern style. Multiple
    callbacks can be subscribed to receive the same args and kwargs when an instance of this
    class is called. This is useful to avoid inheritance when sharing callbacks.

    Example:
        ```python
        relay = BrokenRelay()

        # Basic usage
        relay.subscribe(callback1, callback2)
        relay(*args, **kwargs) # Calls callback1 and callback2

        # Can also 'inject' us to bound callables
        window = moderngl_window(...)
        window.key_event_func = relay
        window.key_event_func = relay @ (camera.walk, camera.rotate)
        ```
    """
    registry: deque[Callable] = Factory(deque)

    def subscribe(self, *methods: Iterable[Callable]) -> Self:
        """Adds callbacks to be called with same arguments as self.__call__"""
        self.registry.extend(flatten(methods))
        return self

    def __matmul__(self, methods: Iterable[Callable]) -> Self:
        """Operator form of subscribe, so `relay @ (callback1, callback2)` evaluates to the relay"""
        return self.subscribe(methods)

    def __call__(self, *args, **kwargs):
        """Calls every subscribed callback, in subscription order, with the given arguments"""
        for callback in self.registry:
            callback(*args, **kwargs)

registry

registry: deque[Callable] = Factory(deque)

subscribe

subscribe(*methods: Iterable[Callable]) -> Self

Adds callbacks to be called with same arguments as self.__call__

Source code in Broken/Core/__init__.py
650
651
652
653
def subscribe(self, *methods: Iterable[Callable]) -> Self:
    """Registers the flattened 'methods' as callbacks of this relay and returns the relay"""
    callbacks = flatten(methods)
    self.registry.extend(callbacks)
    return self

__call__

__call__(*args, **kwargs)
Source code in Broken/Core/__init__.py
655
656
657
def __call__(self, *args, **kwargs):
    """Calls every callback in the registry, in order, with the given arguments"""
    for listener in self.registry:
        listener(*args, **kwargs)

Patch

Source code in Broken/Core/__init__.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
@define
class Patch:
    """
    Temporarily rewrites a text file with substring replacements, able to restore it later

    The file contents are read once on creation; 'apply' always starts from that snapshot and
    'revert' writes the snapshot back. As a context manager, the patch is applied on enter and
    reverted on exit.
    """

    # Path of the file to patch, any value accepted by Path()
    file: Path = field(converter=Path)

    # Mapping of 'text to find' -> 'text to write instead', applied in insertion order
    replaces: dict[str, str] = field(factory=dict)

    # Contents of the file at creation time, always overwritten in __attrs_post_init__
    _original: str = None

    def __attrs_post_init__(self):
        self._original = self.file.read_text("utf-8")

    def apply(self):
        """Writes the original contents with all replacements applied to the file"""
        content = self._original
        for key, value in self.replaces.items():
            content = content.replace(key, value)
        self.file.write_text(content, "utf-8")

    def revert(self):
        """Writes the original contents back to the file"""
        self.file.write_text(self._original, "utf-8")

    def __enter__(self):
        self.apply()
        # Return the instance so that `with Patch(...) as patch` binds something useful
        return self

    def __exit__(self, *args):
        # Always restore the file; returns None so exceptions from the body propagate
        self.revert()

file

file: Path = field(converter=Path)

replaces

replaces: dict[str, str] = field(factory=dict)

__attrs_post_init__

__attrs_post_init__()
Source code in Broken/Core/__init__.py
666
667
def __attrs_post_init__(self):
    """Snapshots the current contents of the file, decoded as utf-8"""
    snapshot = self.file.read_text("utf-8")
    self._original = snapshot

apply

apply()
Source code in Broken/Core/__init__.py
669
670
671
672
673
def apply(self):
    """Writes the original contents with all replacements applied, in order, to the file"""
    patched = self._original
    for (old, new) in self.replaces.items():
        patched = patched.replace(old, new)
    self.file.write_text(patched, "utf-8")

revert

revert()
Source code in Broken/Core/__init__.py
675
676
def revert(self):
    """Writes the contents snapshotted at creation back to the file"""
    snapshot = self._original
    self.file.write_text(snapshot, "utf-8")

__enter__

__enter__()
Source code in Broken/Core/__init__.py
678
679
def __enter__(self):
    """Applies the patch and returns this instance, so `with ... as patch` binds it"""
    self.apply()
    return self

__exit__

__exit__(*args)
Source code in Broken/Core/__init__.py
680
681
def __exit__(self, *args):
    """Reverts the patch; exception details are ignored and never suppressed"""
    self.revert()
    return None

BrokenWatchdog

Bases: ABC

Source code in Broken/Core/__init__.py
684
685
686
687
688
689
690
691
692
693
694
class BrokenWatchdog(ABC):
    """Base class that reports every attribute assignment to the '__changed__' hook"""

    @abstractmethod
    def __changed__(self, key, value) -> None:
        """Hook called after the attribute 'key' was assigned 'value'"""

    def __setattr__(self, key, value):
        """Stores the attribute as usual, then notifies '__changed__' about it"""
        super().__setattr__(key, value)
        self.__changed__(key, value)

__changed__

__changed__(key, value) -> None

Called when a property changes

Source code in Broken/Core/__init__.py
686
687
688
689
@abstractmethod
def __changed__(self, key, value) -> None:
    """Hook called after the attribute 'key' was assigned 'value'"""

__setattr__

__setattr__(key, value)

Calls __changed__ when a property changes

Source code in Broken/Core/__init__.py
691
692
693
694
def __setattr__(self, key, value):
    """Stores the attribute as usual, then notifies '__changed__' about it"""
    # The hook runs only after the assignment itself succeeded
    super().__setattr__(key, value)
    self.__changed__(key, value)

ThreadedStdin

Source code in Broken/Core/__init__.py
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
@define
class ThreadedStdin:
    """
    Replaces the stdin of a process by a queue-backed writer drained on a background thread

    Calls to 'write' only enqueue the data, a daemon thread forwards it to the real pipe.
    The original pipe is kept in '_stdin', as 'process.stdin' points to this object afterwards.
    """
    _process: subprocess.Popen
    _queue: Queue = Factory(factory=lambda: Queue(maxsize=10))
    _loop: bool = True

    # The real stdin pipe of the process, captured before it is replaced by this object
    _stdin: Any = field(default=None, init=False)

    def __attrs_post_init__(self):
        # Capture the pipe first: once replaced, 'process.stdin' resolves to this object, and
        # writing or closing through it would loop back into our own methods
        self._stdin = self._process.stdin
        self._process.stdin = self
        Thread(target=self.worker, daemon=True).start()

    def write(self, data):
        """Enqueues 'data' to be written to the process by the worker thread"""
        self._queue.put(data)

    def worker(self):
        """Forwards queued data to the real stdin pipe while the loop flag is set"""
        while self._loop:
            self._stdin.write(self._queue.get())
            self._queue.task_done()

    def close(self):
        """Waits for all queued data to be written, closes the pipe and waits for the process"""
        self._queue.join()
        self._loop = False
        self._stdin.close()
        while self._process.poll() is None:
            time.sleep(0.01)

__attrs_post_init__

__attrs_post_init__()
Source code in Broken/Core/__init__.py
703
704
705
def __attrs_post_init__(self):
    """Starts the daemon worker thread and installs this object as the stdin of the process"""
    Thread(target=self.worker, daemon=True).start()
    # NOTE(review): from here on `self._process.stdin` is this object and no reference to the
    # original pipe is kept; `worker` and `close` still go through `self._process.stdin`, which
    # appears to route them back into this object's own `write`/`close` - confirm and fix
    self._process.stdin = self

write

write(data)
Source code in Broken/Core/__init__.py
706
707
def write(self, data):
    """Enqueues 'data' to be consumed by the worker thread"""
    pending = self._queue
    pending.put(data)

worker

worker()
Source code in Broken/Core/__init__.py
708
709
710
711
def worker(self):
    """Thread target: takes items from the queue and writes them while the loop flag is set"""
    while self._loop:
        # The write target is looked up before the blocking `get()` call is evaluated
        # NOTE(review): `__attrs_post_init__` sets `self._process.stdin = self`, so this
        # lookup appears to reach this object's own `write` and re-enqueue the data - confirm
        self._process.stdin.write(self._queue.get())
        self._queue.task_done()

close

close()
Source code in Broken/Core/__init__.py
712
713
714
715
716
717
def close(self):
    """Waits for the queue to be fully processed, stops the loop and waits for the process"""
    # Blocks until every queued item was marked done by the worker
    self._queue.join()
    self._loop = False
    # NOTE(review): `__attrs_post_init__` sets `self._process.stdin = self`, so this call
    # appears to re-enter this same method instead of closing the real pipe - confirm
    self._process.stdin.close()
    # Poll until the process has exited
    while self._process.poll() is None:
        time.sleep(0.01)

transcends

transcends(method, base, generator: bool = False)

Are you tired of managing and calling super().<name>(*args, **kwargs) in your methods?

We have just the right solution for you!

Introducing transcends, the decorator that crosses your class's MRO and calls the method with the same name as the one you are decorating. It's an automatic super() everywhere!

Source code in Broken/Core/__init__.py
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
def transcends(method, base, generator: bool=False):
    """
    Decorator factory that replaces a method by a call to every same-named method found
    across the MRO of the instance's class, like an automatic super() everywhere

    The replacement looks for an attribute named after 'method' in the own namespace of each
    class of the MRO, except the last two entries and the classes 'base' and object, and calls
    each one found with the instance and the received arguments. The decorated function itself
    is never called by the replacement.

    Args:
        method: Only its name is used, as the method name to search for
        base: Class to be skipped in the search
        generator: When True, the replacement is a generator yielding from every method found
    """
    name = method.__name__
    skipped = (base, object)

    def decorator(func: Callable) -> Callable:
        def get_targets(self):
            lineage = type(self).mro()[:-2]
            for cls in lineage:
                if cls not in skipped:
                    target = cls.__dict__.get(name)
                    if target:
                        yield target

        # Note: Two separate functions are needed, as a single one with a `yield` in any
        # branch would always be a generator function
        def yields(self, *args, **kwargs):
            for found in get_targets(self):
                yield from found(self, *args, **kwargs)

        def plain(self, *args, **kwargs):
            for found in get_targets(self):
                found(self, *args, **kwargs)

        if generator:
            return yields
        return plain

    return decorator

LazyImport

Source code in Broken/Core/__init__.py
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
class LazyImport:
    """
    Placeholder for a module that is only imported on first attribute access

    As a context manager, the builtin import function is replaced so that imports of modules
    not yet in `sys.modules` produce placeholders of this class instead of loading them; the
    original import function is restored on exit.
    """

    # Reference to the original import function, used for the real loading and for restoring
    __import__ = copy.deepcopy(__import__)

    def __init__(self, _name: str=None):
        self._lzname_ = _name

    def __load__(self) -> Any:
        """Imports the real module, replacing the placeholder in sys.modules by it"""
        del sys.modules[self._lzname_]
        module = LazyImport.__import__(self._lzname_)
        sys.modules[self._lzname_] = module

        # Update the globals of the frame two levels up with the loaded module, which is
        # the code that accessed the attribute when coming from __getattr__
        sys._getframe(2).f_globals[self._lzname_] = module

        return module

    def __getattr__(self, name) -> Any:
        # Only called for attributes not found normally: load the module and delegate
        return getattr(self.__load__(), name)

    def __str__(self) -> str:
        return f"{type(self).__name__}(name='{self._lzname_}')"

    def __enter__(self):

        @functools.wraps(LazyImport.__import__)
        def laziest(*args):
            module = type(self)(_name=args[0])
            return sys.modules.setdefault(module._lzname_, module)

        # Patch the import function with ours. Note: `__builtins__` is either the builtins
        # module or its dictionary depending on the context, so both forms are supported
        namespace = (__builtins__ if isinstance(__builtins__, dict) else vars(__builtins__))
        namespace["__import__"] = laziest

    def __exit__(self, *args):
        # Restore the original import function, see the note on __enter__
        namespace = (__builtins__ if isinstance(__builtins__, dict) else vars(__builtins__))
        namespace["__import__"] = LazyImport.__import__

__import__

__import__ = copy.deepcopy(__import__)

__init__

__init__(_name: str = None)
Source code in Broken/Core/__init__.py
756
757
def __init__(self, _name: str=None):
    """Stores the name of the module this placeholder stands for"""
    module_name = _name
    self._lzname_ = module_name

__load__

__load__() -> Any
Source code in Broken/Core/__init__.py
759
760
761
762
763
764
765
766
767
def __load__(self) -> Any:
    """
    Imports the real module for this placeholder, replacing the sys.modules entry by it

    Returns:
        The object returned by the original import function for the stored name
    """
    # Drop the current entry under this name, raises KeyError if there is none
    del sys.modules[self._lzname_]
    module = LazyImport.__import__(self._lzname_)
    sys.modules[self._lzname_] = module

    # Update the globals of the frame two levels up with the loaded module, which is the
    # code that accessed the attribute when this is reached through __getattr__
    sys._getframe(2).f_globals[self._lzname_] = module

    return module

__getattr__

__getattr__(name) -> Any
Source code in Broken/Core/__init__.py
769
770
def __getattr__(self, name) -> Any:
    """Loads the real module and returns its attribute 'name'"""
    # Keep this a direct call, as __load__ relies on its position in the call stack
    module = self.__load__()
    return getattr(module, name)

__str__

__str__() -> str
Source code in Broken/Core/__init__.py
772
773
def __str__(self) -> str:
    """Shows the class name and the name of the module this placeholder stands for"""
    text = f"{type(self).__name__}(name='{self._lzname_}')"
    return text

__enter__

__enter__()
Source code in Broken/Core/__init__.py
775
776
777
778
779
780
781
782
783
def __enter__(self):
    """Replaces the builtin import function by one that returns lazy placeholders"""

    @functools.wraps(LazyImport.__import__)
    def laziest(*args):
        # Only the module name is used; already known modules are returned as they are
        module = type(self)(_name=args[0])
        return sys.modules.setdefault(module._lzname_, module)

    # Patch the import function with ours. Note: `__builtins__` is either the builtins
    # module or its dictionary depending on the context, so both forms are supported
    namespace = (__builtins__ if isinstance(__builtins__, dict) else vars(__builtins__))
    namespace["__import__"] = laziest

__exit__

__exit__(*args)
Source code in Broken/Core/__init__.py
785
786
def __exit__(self, *args):
    """Restores the original import function saved on the class"""
    # Note: `__builtins__` is either the builtins module or its dictionary depending on
    # the context, so both forms are supported
    namespace = (__builtins__ if isinstance(__builtins__, dict) else vars(__builtins__))
    namespace["__import__"] = LazyImport.__import__