Skip to content

File: Broken/Core/BrokenScheduler.py

Broken.Core.BrokenScheduler

precise

precise(sleep: float, *, error: float = 0.001) -> None

A more precise alternative to time.sleep(): sleeps for most of the duration, then busy-waits (spins) the thread for the final `error` seconds. The spin increases CPU usage.

Source code in Broken/Core/BrokenScheduler.py
13
14
15
16
17
18
19
20
21
22
23
24
25
def precise(sleep: float, *, error: float=0.001) -> None:
    """A precise alternative of time.sleep(), low cpu near-end thread spin. Increases CPU usage.

    Most of the wait is delegated to ``time.sleep()``; the last ``error`` seconds are
    busy-waited on ``time.perf_counter()`` so the function returns right at the due time.

    Args:
        sleep: Total time to wait, in seconds. Non-positive values return immediately.
        error: Length of the trailing busy-wait window, in seconds.
    """
    start = time.perf_counter()

    # Nothing to wait for
    if sleep <= 0:
        return

    # Sleep close to the due time. Waits shorter than 'error' skip this step and are
    # fully covered by the spin below, instead of returning without waiting at all
    if (ahead := (sleep - error)) > 0:
        time.sleep(ahead)

    # Spin the thread until the time is up (precise Sleep)
    while (time.perf_counter() - start) < sleep:
        pass

NULL_CONTEXT

# Shared do-nothing context manager, used as the fallback in `with` statements
# when a task has no lock or context configured
NULL_CONTEXT = contextlib.nullcontext()

SchedulerTask

Source code in Broken/Core/BrokenScheduler.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@define
class SchedulerTask:
    """A callable plus the timing state needed to call it periodically (or only once)"""

    # # Basic

    task: Callable = None
    """Function callable to call every synchronization. Automatically sends a 'dt'
    argument if the function's signature contains it"""

    args: list[Any] = field(factory=list, repr=False)
    """Method's positional arguments"""

    kwargs: dict[str, Any] = field(factory=dict, repr=False)
    """Method's keyword arguments"""

    output: Any = field(default=None, repr=False)
    """Method's return value of the last call"""

    context: Any = None
    """Context to use when calling task (with statement)"""

    lock: Lock = None
    """Threading Lock to use when calling task (with statement)"""

    enabled: bool = True
    """Whether to enable this client or not"""

    once: bool = False
    """Client will be removed after next call"""

    # # Synchronization

    frequency: float = 60.0
    """Ideal frequency of task calls"""

    frameskip: bool = True
    """Constant deltatime mode (False) or real deltatime mode (True)"""

    freewheel: bool = False
    """"Rendering" mode, do not sleep on real time, exact virtual frametimes"""

    precise: bool = False
    """Use precise time sleeping for near-perfect frametimes"""

    # # Timing

    started: float = Factory(lambda: time.absolute())
    """Time when client was started (initializes $now+started, value in now() seconds)"""

    next_call: float = None
    """Next time to call task (initializes $now+next_call, value in now() seconds)"""

    last_call: float = None
    """Last time task was called (initializes $now+last_call, value in now() seconds)"""

    # # Flags

    _dt: bool = False

    def __attrs_post_init__(self):
        """Detects whether the task accepts 'dt' and fills in the initial call times"""
        signature = inspect.signature(self.task)
        self._dt = ("dt" in signature.parameters)

        # Assign idealistic values for decoupled
        if self.freewheel:
            self.started = time.zero

        # Only fall back to 'started' when the value was not given at all: an explicit
        # 0.0 is a valid time and must not be discarded by a truthiness test
        if self.last_call is None:
            self.last_call = self.started
        if self.next_call is None:
            self.next_call = self.started

        # Pretend the previous call happened one period earlier than given
        self.last_call -= self.period

    # # Useful properties

    @property
    def fps(self) -> float:
        """Alias of 'frequency'"""
        return self.frequency

    @fps.setter
    def fps(self, value: float):
        self.frequency = value

    @property
    def period(self) -> float:
        """Ideal time between two calls, the inverse of 'frequency'"""
        return (1.0 / self.frequency)

    @period.setter
    def period(self, value: float):
        self.frequency = (1 / value)

    @property
    def should_delete(self) -> bool:
        """True for a 'once' client that is no longer enabled"""
        return (self.once and (not self.enabled))

    @property
    def should_live(self) -> bool:
        """Opposite of 'should_delete'"""
        return (not self.should_delete)

    # # Sorting (prioritizes 'once' clients)

    def __lt__(self, other: Self) -> bool:
        """A 'once' client sorts before any recurring one; ties are broken by 'next_call'"""
        # Decide on the 'once' flag whenever it differs, in both directions, so that
        # (a < b) and (b < a) can never both be True
        if (bool(self.once) != bool(other.once)):
            return bool(self.once)
        return (self.next_call < other.next_call)

    def __gt__(self, other: Self) -> bool:
        """A recurring client sorts after any 'once' one; ties are broken by 'next_call'"""
        if (bool(self.once) != bool(other.once)):
            return bool(other.once)
        return (self.next_call > other.next_call)

    # # Implementation

    def next(self, block: bool=True) -> Optional[Self]:
        """Waits until the task is due (if blocking), calls it and schedules the next call

        Args:
            block: Sleep until the due time when True. When False, return None right away
                if the task is not due yet. Ignored in freewheel mode, which never waits.

        Returns:
            This task after it has been called, or None if it was not due (non-blocking).
        """

        # Time to wait for next call if block
        wait = max(0, (self.next_call - time.absolute()))

        # Rendering is instant
        if self.freewheel:
            pass

        # Block until due
        elif block:
            if self.precise:
                time.precise(wait)
            else:
                time.sleep(wait)

        # Non-blocking not due yet
        elif wait > 0:
            return None

        # The assumed instant the code below will run instantly
        now = (self.next_call if self.freewheel else time.absolute())

        if (self._dt):
            self.kwargs["dt"] = (now - self.last_call)

            # Frameskip limits maximum dt to period
            if (not self.frameskip):
                self.kwargs["dt"] = min(self.kwargs["dt"], self.period)

        self.last_call = now

        # Enter contexts, call task with args and kwargs
        with (self.lock or NULL_CONTEXT):
            with (self.context or NULL_CONTEXT):
                self.output = self.task(*self.args, **self.kwargs)

        # Find a future multiple of period
        while (self.next_call <= now):
            self.next_call += self.period

        # (Disabled && Once) clients gets deleted. Recurring clients are left untouched,
        # so a task that disabled itself during the call above stays disabled
        if self.once:
            self.enabled = False
        return self

task

task: Callable = None

Function callable to call every synchronization. Automatically sends a 'dt' argument if the function's signature contains it

args

args: list[Any] = field(factory=list, repr=False)

Method's positional arguments

kwargs

kwargs: dict[str, Any] = field(factory=dict, repr=False)

Method's keyword arguments

output

output: Any = field(default=None, repr=False)

Method's return value of the last call

context

context: Any = None

Context to use when calling task (with statement)

lock

lock: Lock = None

Threading Lock to use when calling task (with statement)

enabled

enabled: bool = True

Whether to enable this client or not

once

once: bool = False

Client will be removed after next call

frequency

frequency: float = 60.0

Ideal frequency of task calls

frameskip

frameskip: bool = True

Constant deltatime mode (False) or real deltatime mode (True)

freewheel

freewheel: bool = False

"Rendering" mode, do not sleep on real time, exact virtual frametimes

precise

precise: bool = False

Use precise time sleeping for near-perfect frametimes

started

started: float = Factory(lambda: time.absolute())

Time when client was started (initializes $now+started, value in now() seconds)

next_call

next_call: float = None

Next time to call task (initializes $now+next_call, value in now() seconds)

last_call

last_call: float = None

Last time task was called (initializes $now+last_call, value in now() seconds)

__attrs_post_init__

__attrs_post_init__()
Source code in Broken/Core/BrokenScheduler.py
92
93
94
95
96
97
98
99
def __attrs_post_init__(self):
    """Detects whether the task accepts 'dt' and fills in the initial call times"""
    signature = inspect.signature(self.task)
    self._dt = ("dt" in signature.parameters)

    # Assign idealistic values for decoupled
    if self.freewheel:
        self.started = time.zero

    # Only fall back to 'started' when the value was not given at all: an explicit
    # 0.0 is a valid time and must not be discarded by a truthiness test
    if self.last_call is None:
        self.last_call = self.started
    if self.next_call is None:
        self.next_call = self.started

    # Pretend the previous call happened one period earlier than given
    self.last_call -= self.period

fps

fps: float

period

period: float

should_delete

should_delete: bool

should_live

should_live: bool

__lt__

__lt__(other: Self) -> bool
Source code in Broken/Core/BrokenScheduler.py
129
130
131
132
def __lt__(self, other: Self) -> bool:
    if (self.once and not other.once):
        return True
    return (self.next_call < other.next_call)

__gt__

__gt__(other: Self) -> bool
Source code in Broken/Core/BrokenScheduler.py
134
135
136
137
def __gt__(self, other: Self) -> bool:
    if (not self.once and other.once):
        return True
    return (self.next_call > other.next_call)

next

next(block: bool = True) -> Self
Source code in Broken/Core/BrokenScheduler.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def next(self, block: bool=True) -> Self:

    # Time to wait for next call if block
    wait = max(0, (self.next_call - time.absolute()))

    # Rendering is instant
    if self.freewheel:
        pass

    # Block until due
    elif block:
        if self.precise:
            time.precise(wait)
        else:
            time.sleep(wait)

    # Non-blocking not due yet
    elif wait > 0:
        return None

    # The assumed instant the code below will run instantly
    now = (self.next_call if self.freewheel else time.absolute())

    if (self._dt):
        self.kwargs["dt"] = (now - self.last_call)

        # Frameskip limits maximum dt to period
        if (not self.frameskip):
            self.kwargs["dt"] = min(self.kwargs["dt"], self.period)

    self.last_call = now

    # Enter contexts, call task with args and kwargs
    with (self.lock or NULL_CONTEXT):
        with (self.context or NULL_CONTEXT):
            self.output = self.task(*self.args, **self.kwargs)

    # Find a future multiple of period
    while (self.next_call <= now):
        self.next_call += self.period

    # (Disabled && Once) clients gets deleted
    self.enabled = (not self.once)
    return self

BrokenScheduler

Source code in Broken/Core/BrokenScheduler.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
@define
class BrokenScheduler:
    """Holds a collection of SchedulerTask and calls whichever one sorts first"""

    tasks: deque[SchedulerTask] = Factory(deque)

    def add(self, task: SchedulerTask) -> SchedulerTask:
        """Adds a task to the scheduler with immediate next call"""
        self.tasks.append(task)
        return task

    def new(self, task: Callable, **options) -> SchedulerTask:
        """Add a new task to the scheduler"""
        return self.add(SchedulerTask(task=task, **options))

    def once(self, task: Callable, **options) -> SchedulerTask:
        """Add a new task that shall only run once and immediately"""
        return self.add(SchedulerTask(task=task, **options, once=True))

    def delete(self, task: SchedulerTask) -> None:
        """Removes a task from the scheduler"""
        self.tasks.remove(task)

    def clear(self) -> None:
        """Removes all tasks"""
        self.tasks.clear()

    @property
    def enabled_tasks(self) -> Iterable[SchedulerTask]:
        """Yields every task whose 'enabled' flag is set"""
        for task in self.tasks:
            if task.enabled:
                yield task

    @property
    def next_task(self) -> Optional[SchedulerTask]:
        """Returns the next client to be called"""
        return min(self.enabled_tasks, default=None)

    def _sanitize(self) -> None:
        """Removes disabled 'once' clients"""
        # Single pass, then refill the same deque object so outside references stay valid.
        # Indexing into the middle of a deque is O(n), so per-slot reassignment is avoided
        alive = [task for task in self.tasks if task.should_live]
        if len(alive) != len(self.tasks):
            self.tasks.clear()
            self.tasks.extend(alive)

    def next(self, block=True) -> Optional[SchedulerTask]:
        """Calls the next enabled task, returning it (None if there is none or not due)"""
        if (task := self.next_task) is None:
            return None
        try:
            return task.next(block=block)
        finally:
            # Runs even if the task raised, so a finished 'once' client never lingers
            if task.should_delete:
                self._sanitize()

    def all_once(self) -> None:
        """Calls all 'once' clients. Useful for @partial calls on the main thread"""
        try:
            # Iterate over a snapshot: a task may add or remove scheduler tasks while it
            # runs, and mutating a deque during iteration raises RuntimeError
            for task in tuple(self.tasks):
                # Skip 'once' clients that are already disabled and only await deletion
                if task.once and task.enabled:
                    task.next()
        finally:
            # Clean up even if a task raised, matching what next() does
            self._sanitize()

tasks

tasks: deque[SchedulerTask] = Factory(deque)

add

Adds a task to the scheduler with immediate next call

Source code in Broken/Core/BrokenScheduler.py
192
193
194
195
def add(self, task: SchedulerTask) -> SchedulerTask:
    """Appends an already-built task to the scheduler and hands the same task back"""
    registered = task
    self.tasks.append(registered)
    return registered

new

new(task: Callable, **options) -> SchedulerTask

Add a new task to the scheduler

Source code in Broken/Core/BrokenScheduler.py
197
198
199
def new(self, task: Callable, **options) -> SchedulerTask:
    """Builds a SchedulerTask around 'task', forwarding 'options' to it, and registers it"""
    client = SchedulerTask(task=task, **options)
    return self.add(client)

once

once(task: Callable, **options) -> SchedulerTask

Add a new task that shall only run once and immediately

Source code in Broken/Core/BrokenScheduler.py
201
202
203
def once(self, task: Callable, **options) -> SchedulerTask:
    """Builds a SchedulerTask with once=True around 'task' and registers it"""
    client = SchedulerTask(task=task, **options, once=True)
    return self.add(client)

delete

delete(task: SchedulerTask) -> None

Removes a task from the scheduler

Source code in Broken/Core/BrokenScheduler.py
205
206
207
def delete(self, task: SchedulerTask) -> None:
    """Drops the given task from the scheduler's task collection"""
    queue = self.tasks
    queue.remove(task)

clear

clear() -> None

Removes all tasks

Source code in Broken/Core/BrokenScheduler.py
209
210
211
def clear(self) -> None:
    """Empties the scheduler's task collection in place"""
    queue = self.tasks
    queue.clear()

enabled_tasks

enabled_tasks: Iterable[SchedulerTask]

next_task

next_task: Optional[SchedulerTask]

Returns the next client to be called

next

next(block=True) -> Optional[SchedulerTask]
Source code in Broken/Core/BrokenScheduler.py
235
236
237
238
239
240
241
242
def next(self, block=True) -> Optional[SchedulerTask]:
    """Calls the task returned by 'next_task' and passes back whatever its next() returns

    Returns None when there is no task to call. A task that reports 'should_delete'
    after the call triggers a cleanup, even if the call raised.
    """
    upcoming = self.next_task

    # Nothing is scheduled
    if upcoming is None:
        return None

    try:
        result = upcoming.next(block=block)
    finally:
        if upcoming.should_delete:
            self._sanitize()
    return result

all_once

all_once() -> None

Calls all 'once' clients. Useful for @partial calls on the main thread

Source code in Broken/Core/BrokenScheduler.py
244
245
246
247
248
249
def all_once(self) -> None:
    """Calls all 'once' clients. Useful for @partial calls on the main thread"""
    try:
        # Iterate over a snapshot: a task may add or remove scheduler tasks while it
        # runs, and mutating a deque during iteration raises RuntimeError
        for task in tuple(self.tasks):
            # Skip 'once' clients that are already disabled and only await deletion
            if task.once and task.enabled:
                task.next()
    finally:
        # Clean up even if a task raised
        self._sanitize()