Skip to content

Classes

CronTrigger

Internal cron-expression trigger backing Daily and Cron from hassette.scheduler.triggers.

Not part of the public API — use Daily or Cron from hassette.scheduler.triggers instead.

Source code in src/hassette/scheduler/classes.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class CronTrigger:
    """Internal cron-expression trigger backing ``Daily`` and ``Cron`` from ``hassette.scheduler.triggers``.

    Not part of the public API — use ``Daily`` or ``Cron`` from ``hassette.scheduler.triggers`` instead.
    """

    def __init__(self, cron_expression: str, start: ZonedDateTime | None = None):
        self.cron_expression = cron_expression
        self.start = start
        # Validate expression eagerly at construction time
        base = start or date_utils.now()
        croniter(cron_expression, base.py_datetime(), ret_type=datetime)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, CronTrigger):
            return NotImplemented
        return self.cron_expression == other.cron_expression

    def __hash__(self) -> int:
        return hash(self.cron_expression)

    def __str__(self) -> str:
        return f"cron:{self.cron_expression}"

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the first cron-grid-aligned run time at or after current_time."""
        # Use start as the croniter anchor, but always snap to the cron grid.
        # This finds the first cron-aligned time at or after start (or current_time if no start).
        anchor = self.start or current_time
        reference = self.start if (self.start is not None and self.start > current_time) else current_time
        return self._next_after(anchor, reference)

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        """Return the next cron-grid-aligned run time after previous_run that is later than current_time."""
        return self._next_after(previous_run, current_time)

    def _next_after(self, anchor: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        cron = croniter(self.cron_expression, anchor.py_datetime(), ret_type=datetime)
        # Normalise current_time to UTC so the ``next_time > current_dt`` comparison below is
        # unambiguous around DST transitions. During fall-back, ``current_time`` may carry
        # fold=0 (pre-transition, CDT) while croniter returns fold=1 wall-clock values (CST).
        # Without UTC normalisation, the CST occurrence appears UTC-earlier than the CDT
        # anchor and the loop skips the ambiguous slot entirely.
        current_dt_utc = current_time.py_datetime().astimezone(UTC)
        ambiguous_ticks_skipped = 0
        # Bounded iteration — avoids O(N) spin for sub-second crons after long downtime.
        # 10,000 iterations covers ~2.7 hours of per-second crons, which is generous.
        for _ in range(MAX_CRON_ITERATIONS):
            next_time = cron.get_next()
            if next_time.astimezone(UTC) > current_dt_utc:
                result = self._dst_safe_from_dt(next_time)
                # Log when DST disambiguation is actually relevant: either the returned
                # tick itself is ambiguous (fold=1 was meaningful), or we traversed
                # ambiguous ticks on the way here. Logs the dispatched result (post-
                # disambiguation) to avoid printing a wall-clock string that differs from
                # the instant actually scheduled.
                returned_is_ambiguous = self._is_fall_back_ambiguous(next_time)
                if ambiguous_ticks_skipped or returned_is_ambiguous:
                    LOGGER.info(
                        "CronTrigger(%s): DST fall-back disambiguation — %d ambiguous tick(s) "
                        "traversed; returning %s (fold=1, post-transition)",
                        self.cron_expression,
                        ambiguous_ticks_skipped + (1 if returned_is_ambiguous else 0),
                        result,
                    )
                return result
            # Only count ticks we actually skipped past — the tick that exits the loop is
            # handled above.
            if self._is_fall_back_ambiguous(next_time):
                ambiguous_ticks_skipped += 1
        # Too many iterations — skip ahead from current time.
        # Re-anchor croniter in the *original* timezone so cron expressions like
        # "0 9 * * *" still fire at 09:00 local time, not 09:00 UTC.
        LOGGER.warning(
            "CronTrigger(%s) exceeded %d iterations catching up, skipping ahead from current_time",
            self.cron_expression,
            MAX_CRON_ITERATIONS,
        )
        skip_ahead_dt = current_time.py_datetime()
        cron = croniter(self.cron_expression, skip_ahead_dt, ret_type=datetime)
        next_time = cron.get_next()
        return self._dst_safe_from_dt(next_time)

    @staticmethod
    def _is_fall_back_ambiguous(dt: datetime) -> bool:
        """Return True when ``dt``'s wall-clock time occurs twice (fall-back ambiguity)."""
        return dt.replace(fold=0).astimezone(UTC) != dt.replace(fold=1).astimezone(UTC)

    @staticmethod
    def _dst_safe_from_dt(dt: datetime) -> ZonedDateTime:
        """Convert a croniter-produced datetime to ZonedDateTime with DST disambiguation.

        Uses ``fold=1`` (post-transition / "later" occurrence) to handle:
        - **Fall-back (fold/repeated time):** prefers the second (post-transition) occurrence.
          The caller is responsible for logging when ambiguity is encountered (see
          ``_next_after``), which emits a single summary per call instead of per-tick spam.
        - **Spring-forward (gap/skipped time):** croniter already advances past the gap, so
          ``fold=1`` and ``fold=0`` produce the same result for non-gap times.

        Args:
            dt: A timezone-aware datetime produced by ``croniter.get_next()`` with a
                ``ZoneInfo``-backed tzinfo.

        Returns:
            A ``ZonedDateTime`` with DST disambiguation applied.
        """
        return ZonedDateTime.from_py_datetime(dt.replace(fold=1))

first_run_time(current_time: ZonedDateTime) -> ZonedDateTime

Return the first cron-grid-aligned run time strictly after current_time, or strictly after start when start lies in the future.

Source code in src/hassette/scheduler/classes.py
50
51
52
53
54
55
56
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the first cron-grid-aligned run time for this trigger.

    The result is strictly later than ``current_time`` and, when ``start`` lies in the
    future, strictly later than ``start`` as well.
    """
    start = self.start
    # croniter is anchored on ``start`` when one was given, otherwise on ``current_time``.
    anchor = start or current_time
    # The reference is whichever of ``start`` / ``current_time`` is later.
    if start is not None and start > current_time:
        reference = start
    else:
        reference = current_time
    return self._next_after(anchor, reference)

next_run_time(previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime

Return the next cron-grid-aligned run time after previous_run that is later than current_time.

Source code in src/hassette/scheduler/classes.py
58
59
60
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
    """Return the next cron-grid-aligned run time after previous_run that is later than current_time.

    The previous run is the anchor the cron grid is walked from; ``current_time`` is the
    reference the returned tick must exceed.
    """
    upcoming = self._next_after(previous_run, current_time)
    return upcoming

ScheduledJob dataclass

A job scheduled to run based on a trigger or at a specific time.

Source code in src/hassette/scheduler/classes.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
@dataclass(order=True)
class ScheduledJob:
    """A job scheduled to run based on a trigger or at a specific time.

    Ordering and equality are driven solely by ``sort_index``; every other field is
    declared with ``compare=False`` so it never takes part in heap comparisons.
    """

    sort_index: tuple[int, int] = field(init=False, repr=False)
    """Tuple of (next_run timestamp with nanoseconds, object id) for ordering in a priority queue."""

    owner_id: str = field(compare=False)
    """Unique string identifier for the owner of the job, e.g., a component or integration name."""

    next_run: ZonedDateTime = field(compare=False)
    """Unjittered logical fire time — used as `previous_run` in subsequent trigger calls."""

    fire_at: ZonedDateTime = field(init=False, compare=False)
    """Actual dispatch time, including any jitter offset.

    Equals ``next_run`` when no jitter is configured. Set by
    ``SchedulerService._apply_jitter_to_heap()`` at enqueue time when jitter > 0.
    The pop loop in ``_ScheduledJobQueue.pop_due_and_peek_next`` compares against
    ``fire_at`` (not ``next_run``) to decide when to dispatch.
    """

    job: "JobCallable" = field(compare=False)
    """The callable to execute when the job runs."""

    app_key: str = field(default="", compare=False)
    """Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners."""

    instance_index: int = field(default=0, compare=False)
    """App instance index for DB registration. 0 for non-App owners."""

    trigger: "TriggerProtocol | None" = field(compare=False, default=None)
    """The trigger that determines the job's schedule."""

    group: str | None = field(default=None, compare=False)
    """Optional group name for grouping related jobs. Included in deduplication comparison."""

    jitter: float | None = field(default=None, compare=False)
    """Seconds of random offset applied at enqueue time by ``SchedulerService._apply_jitter_to_heap()``.

    Does not affect ``next_run`` (unjittered logical fire time). See the ``fire_at`` field on
    ``ScheduledJob`` for the actual dispatch time after jitter is applied.
    """

    timeout: float | None = field(default=None, compare=False)
    """Per-job timeout in seconds. ``None`` means use the global default
    (``config.scheduler_job_timeout_seconds``). A positive ``float`` overrides the default.
    Validated at construction: must be positive when set."""

    timeout_disabled: bool = field(default=False, compare=False)
    """When ``True``, timeout enforcement is disabled for this job regardless of the global default."""

    name: str = field(default="", compare=False)
    """Optional name for the job for easier identification."""

    name_auto: bool = field(default=False, compare=False)
    """Whether the name was auto-generated from the callable and trigger ID."""

    args: tuple[Any, ...] = field(default_factory=tuple, compare=False)
    """Positional arguments to pass to the job callable."""

    kwargs: dict[str, Any] = field(default_factory=dict, compare=False)
    """Keyword arguments to pass to the job callable."""

    error_handler: "SchedulerErrorHandlerType | None" = field(default=None, compare=False)
    """Optional error handler for this job.

    When set, this handler is invoked if the job raises an exception (including
    ``TimeoutError``, but excluding ``CancelledError``). Stored as-is for identity comparison
    in ``matches()``. ``compare=False`` prevents ``Callable | None`` from corrupting
    the ``@dataclass(order=True)`` heap ordering.
    """

    db_id: int | None = field(default=None, compare=False)
    """Database row ID for this job. Set by the executor after persistence; None until then."""

    source_location: str = field(default="", compare=False)
    """Captured source location (file:line) of the user code that scheduled this job."""

    registration_source: str = field(default="", compare=False)
    """Captured source code snippet of the scheduling call."""

    source_tier: SourceTier = field(default="app", compare=False)
    """Whether this job originates from a user app or the framework itself."""

    _scheduler: "Scheduler | None" = field(default=None, repr=False, compare=False)
    """Back-reference to the Scheduler that owns this job. Set by Scheduler.add_job()."""

    # compare=False keeps this callable out of the generated __eq__/ordering methods,
    # for the same reason as ``error_handler``: only ``sort_index`` may drive comparisons.
    app_error_handler_resolver: "Callable[[], SchedulerErrorHandlerType | None] | None" = field(
        default=None, init=False, repr=False, compare=False
    )
    """Closure that resolves the app-level error handler at dispatch time."""

    _dequeued: bool = field(default=False, repr=False, compare=False)
    """True after the job has been synchronously removed from the heap via dequeue_job()."""

    def __hash__(self) -> int:
        # Hashing on object identity is safe: each ScheduledJob is a unique object,
        # and sort_index includes id(self) as the tiebreaker, so the hash contract
        # (a == b implies hash(a) == hash(b)) holds. @dataclass(order=True) generates
        # __eq__ based on all compare=True fields (sort_index only), so two distinct
        # objects with the same sort_index are unequal and can safely share a hash bucket.
        return id(self)

    def __repr__(self) -> str:
        return f"ScheduledJob(name={self.name!r}, owner_id={self.owner_id})"

    def __post_init__(self) -> None:
        """Validate timeout settings, initialise ordering state, and derive a default name.

        Raises:
            ValueError: If ``timeout`` is set but is a bool or not positive, or if both
                ``timeout`` and ``timeout_disabled=True`` are given.
        """
        if self.timeout is not None and (isinstance(self.timeout, bool) or self.timeout <= 0):
            raise ValueError("timeout must be a positive number")
        if self.timeout_disabled and self.timeout is not None:
            raise ValueError("Cannot specify both 'timeout' and 'timeout_disabled=True'")

        # Populates next_run (rounded), fire_at and sort_index.
        self.set_next_run(self.next_run)

        if not self.name:
            callable_name = self.job.__name__ if hasattr(self.job, "__name__") else str(self.job)
            # Test for presence with ``is not None`` rather than truthiness, so a trigger
            # object that happens to be falsy still contributes its id to the name.
            if self.trigger is not None:
                self.name = f"{callable_name}:{self.trigger.trigger_id()}"
            else:
                self.name = callable_name
            self.name_auto = True

        # Normalise to concrete tuple/dict copies of whatever was passed in.
        self.args = tuple(self.args)
        self.kwargs = dict(self.kwargs)

    def mark_registered(self, db_id: int) -> None:
        """Set the database ID. Called by SchedulerService.add_job() after persistence.

        First call wins — a second call is a no-op, so a retry or double-registration
        cannot overwrite the original id. Mirrors ``Listener.mark_registered``.
        """
        if self.db_id is None:
            self.db_id = db_id

    def matches(self, other: "ScheduledJob") -> bool:
        """Check whether two jobs represent the same logical configuration.

        Compares callable, trigger (by trigger_id()), group, jitter, timeout,
        timeout_disabled, args, kwargs, and error_handler (by identity).
        Does not compare runtime state (db_id, next_run, sort_index, _scheduler,
        _dequeued, owner, or any other mutable runtime field).

        Two jobs with identical callable/trigger/args but different groups are distinct
        logical jobs and will not match.
        """
        if self.trigger is not None and other.trigger is not None:
            triggers_match = self.trigger.trigger_id() == other.trigger.trigger_id()
        else:
            # At least one side has no trigger: they match only when both are None.
            triggers_match = self.trigger is other.trigger
        return (
            self.job == other.job
            and triggers_match
            and self.group == other.group
            and self.jitter == other.jitter
            and self.timeout == other.timeout
            and self.timeout_disabled == other.timeout_disabled
            and self.args == other.args
            and self.kwargs == other.kwargs
            and self.error_handler is other.error_handler
        )

    def diff_fields(self, other: "ScheduledJob") -> list[str]:
        """Return a list of configuration field names that differ between two jobs.

        Compares the same fields as ``matches()`` — callable, trigger, group,
        jitter, timeout, timeout_disabled, args, kwargs, and error_handler
        (the last by identity).
        """
        changed: list[str] = []
        if self.job != other.job:
            changed.append("job")
        self_tid = self.trigger.trigger_id() if self.trigger is not None else None
        other_tid = other.trigger.trigger_id() if other.trigger is not None else None
        if self_tid != other_tid:
            changed.append("trigger")
        if self.group != other.group:
            changed.append("group")
        if self.jitter != other.jitter:
            changed.append("jitter")
        if self.timeout != other.timeout:
            changed.append("timeout")
        if self.timeout_disabled != other.timeout_disabled:
            changed.append("timeout_disabled")
        if self.args != other.args:
            changed.append("args")
        if self.kwargs != other.kwargs:
            changed.append("kwargs")
        if self.error_handler is not other.error_handler:
            changed.append("error_handler")
        return changed

    def set_app_error_handler_resolver(self, resolver: "Callable[[], SchedulerErrorHandlerType | None]") -> None:
        """Set the closure that resolves the app-level error handler at dispatch time."""
        self.app_error_handler_resolver = resolver

    def cancel(self) -> None:
        """Cancel the job by delegating to the owning Scheduler.

        Raises:
            RuntimeError: When called on a job that has not been registered with a Scheduler.
                Use ``Scheduler.cancel_job(job)`` directly, or register the job first.
        """
        if self._scheduler is None:
            raise RuntimeError(
                "cancel() called on a job not registered with a Scheduler. "
                "Use Scheduler.cancel_job(job) or register the job first."
            )
        self._scheduler.cancel_job(self)

    def set_next_run(self, next_run: ZonedDateTime) -> None:
        """Update the next run timestamp, fire_at, and ordering metadata.

        Both ``next_run`` and ``fire_at`` are set to the rounded value. Call
        ``SchedulerService._apply_jitter_to_heap()`` after this to set a jittered
        ``fire_at`` when the job has ``jitter`` configured.
        """
        rounded = next_run.round(unit="second")
        self.next_run = rounded
        self.fire_at = rounded
        # id(self) breaks ties between jobs that share the same rounded timestamp.
        self.sort_index = (rounded.timestamp_nanos(), id(self))

sort_index: tuple[int, int] = field(init=False, repr=False) class-attribute instance-attribute

Tuple of (next_run timestamp with nanoseconds, object id) for ordering in a priority queue.

owner_id: str = field(compare=False) class-attribute instance-attribute

Unique string identifier for the owner of the job, e.g., a component or integration name.

next_run: ZonedDateTime = field(compare=False) class-attribute instance-attribute

Unjittered logical fire time — used as previous_run in subsequent trigger calls.

fire_at: ZonedDateTime = field(init=False, compare=False) class-attribute instance-attribute

Actual dispatch time, including any jitter offset.

Equals next_run when no jitter is configured. Set by SchedulerService._apply_jitter_to_heap() at enqueue time when jitter > 0. The pop loop in _ScheduledJobQueue.pop_due_and_peek_next compares against fire_at (not next_run) to decide when to dispatch.

job: JobCallable = field(compare=False) class-attribute instance-attribute

The callable to execute when the job runs.

app_key: str = field(default='', compare=False) class-attribute instance-attribute

Configuration-level app key for DB registration (e.g., 'my_app'). Empty for non-App owners.

instance_index: int = field(default=0, compare=False) class-attribute instance-attribute

App instance index for DB registration. 0 for non-App owners.

trigger: TriggerProtocol | None = field(compare=False, default=None) class-attribute instance-attribute

The trigger that determines the job's schedule.

group: str | None = field(default=None, compare=False) class-attribute instance-attribute

Optional group name for grouping related jobs. Included in deduplication comparison.

jitter: float | None = field(default=None, compare=False) class-attribute instance-attribute

Seconds of random offset applied at enqueue time by SchedulerService._apply_jitter_to_heap().

Does not affect next_run (unjittered logical fire time). See the fire_at field on ScheduledJob for the actual dispatch time after jitter is applied.

timeout: float | None = field(default=None, compare=False) class-attribute instance-attribute

Per-job timeout in seconds. None means use the global default (config.scheduler_job_timeout_seconds). A positive float overrides the default. Validated at construction: must be positive when set.

timeout_disabled: bool = field(default=False, compare=False) class-attribute instance-attribute

When True, timeout enforcement is disabled for this job regardless of the global default.

name: str = field(default='', compare=False) class-attribute instance-attribute

Optional name for the job for easier identification.

name_auto: bool = field(default=False, compare=False) class-attribute instance-attribute

Whether the name was auto-generated from the callable and trigger ID.

args: tuple[Any, ...] = field(default_factory=tuple, compare=False) class-attribute instance-attribute

Positional arguments to pass to the job callable.

kwargs: dict[str, Any] = field(default_factory=dict, compare=False) class-attribute instance-attribute

Keyword arguments to pass to the job callable.

error_handler: SchedulerErrorHandlerType | None = field(default=None, compare=False) class-attribute instance-attribute

Optional error handler for this job.

When set, this handler is invoked if the job raises an exception (including TimeoutError, but excluding CancelledError). Stored as-is for identity comparison in matches(). compare=False prevents Callable | None from corrupting the @dataclass(order=True) heap ordering.

db_id: int | None = field(default=None, compare=False) class-attribute instance-attribute

Database row ID for this job. Set by the executor after persistence; None until then.

source_location: str = field(default='', compare=False) class-attribute instance-attribute

Captured source location (file:line) of the user code that scheduled this job.

registration_source: str = field(default='', compare=False) class-attribute instance-attribute

Captured source code snippet of the scheduling call.

source_tier: SourceTier = field(default='app', compare=False) class-attribute instance-attribute

Whether this job originates from a user app or the framework itself.

app_error_handler_resolver: Callable[[], SchedulerErrorHandlerType | None] | None = field(default=None, init=False, repr=False) class-attribute instance-attribute

Closure that resolves the app-level error handler at dispatch time.

mark_registered(db_id: int) -> None

Set the database ID. Called by SchedulerService.add_job() after persistence.

First call wins — a second call is a no-op, so a retry or double-registration cannot overwrite the original id. Mirrors Listener.mark_registered.

Source code in src/hassette/scheduler/classes.py
259
260
261
262
263
264
265
266
def mark_registered(self, db_id: int) -> None:
    """Record the database ID assigned to this job, once.

    Called by SchedulerService.add_job() after persistence. Only the first call has
    any effect: when an id is already stored, later calls leave it untouched, so a
    retry or double-registration cannot overwrite it. Mirrors ``Listener.mark_registered``.
    """
    if self.db_id is not None:
        # Already registered — keep the original id.
        return
    self.db_id = db_id

matches(other: ScheduledJob) -> bool

Check whether two jobs represent the same logical configuration.

Compares callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, kwargs, and error_handler (by identity). Does not compare runtime state (db_id, next_run, sort_index, _scheduler, _dequeued, owner, or any other mutable runtime field).

Two jobs with identical callable/trigger/args but different groups are distinct logical jobs and will not match.

Source code in src/hassette/scheduler/classes.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def matches(self, other: "ScheduledJob") -> bool:
    """Report whether ``other`` describes the same logical job configuration as this one.

    The comparison covers the callable, the trigger (via ``trigger_id()``), group,
    jitter, timeout, timeout_disabled, args, kwargs, and the error handler (by
    identity). Runtime state such as db_id, next_run, sort_index, _scheduler and
    _dequeued is not consulted.

    Jobs that differ only in their group are therefore distinct and do not match.
    """
    mine, theirs = self.trigger, other.trigger
    if mine is None or theirs is None:
        # At least one side has no trigger: they match only when both are None.
        same_trigger = mine is theirs
    else:
        same_trigger = mine.trigger_id() == theirs.trigger_id()
    same_config = (
        self.job == other.job
        and same_trigger
        and self.group == other.group
        and self.jitter == other.jitter
        and self.timeout == other.timeout
        and self.timeout_disabled == other.timeout_disabled
        and self.args == other.args
        and self.kwargs == other.kwargs
        and self.error_handler is other.error_handler
    )
    return same_config

diff_fields(other: ScheduledJob) -> list[str]

Return a list of configuration field names that differ between two jobs.

Compares the same fields as matches() — callable, trigger, group, jitter, timeout, timeout_disabled, args, kwargs, and error_handler.

Source code in src/hassette/scheduler/classes.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def diff_fields(self, other: "ScheduledJob") -> list[str]:
    """List the configuration fields whose values differ between this job and ``other``.

    Checks the same fields as ``matches()``, reported in this order: job, trigger,
    group, jitter, timeout, timeout_disabled, args, kwargs, error_handler. Triggers
    are compared by ``trigger_id()`` (``None`` when a job has no trigger) and the
    error handler by identity.
    """
    # Each entry pairs a field name with whether that field differs; the tuple is
    # built top to bottom, so comparisons run in the reported order.
    checks = (
        ("job", self.job != other.job),
        (
            "trigger",
            (self.trigger.trigger_id() if self.trigger is not None else None)
            != (other.trigger.trigger_id() if other.trigger is not None else None),
        ),
        ("group", self.group != other.group),
        ("jitter", self.jitter != other.jitter),
        ("timeout", self.timeout != other.timeout),
        ("timeout_disabled", self.timeout_disabled != other.timeout_disabled),
        ("args", self.args != other.args),
        ("kwargs", self.kwargs != other.kwargs),
        ("error_handler", self.error_handler is not other.error_handler),
    )
    return [field_name for field_name, differs in checks if differs]

set_app_error_handler_resolver(resolver: Callable[[], SchedulerErrorHandlerType | None]) -> None

Set the closure that resolves the app-level error handler at dispatch time.

Source code in src/hassette/scheduler/classes.py
323
324
325
def set_app_error_handler_resolver(self, resolver: "Callable[[], SchedulerErrorHandlerType | None]") -> None:
    """Store the closure used to resolve the app-level error handler at dispatch time.

    Args:
        resolver: Zero-argument callable; it is stored on the job as-is and replaces
            any previously stored resolver.
    """
    setattr(self, "app_error_handler_resolver", resolver)

cancel() -> None

Cancel the job by delegating to the owning Scheduler.

Raises:

RuntimeError: When called on a job that has not been registered with a Scheduler. Use Scheduler.cancel_job(job) directly, or register the job first.

Source code in src/hassette/scheduler/classes.py
327
328
329
330
331
332
333
334
335
336
337
338
339
def cancel(self) -> None:
    """Cancel this job through the Scheduler that owns it.

    Raises:
        RuntimeError: When the job has no owning Scheduler (``_scheduler`` is ``None``).
            Use ``Scheduler.cancel_job(job)`` directly, or register the job first.
    """
    owner = self._scheduler
    if owner is not None:
        owner.cancel_job(self)
        return
    raise RuntimeError(
        "cancel() called on a job not registered with a Scheduler. "
        "Use Scheduler.cancel_job(job) or register the job first."
    )

set_next_run(next_run: ZonedDateTime) -> None

Update the next run timestamp, fire_at, and ordering metadata.

Both next_run and fire_at are set to the rounded value. Call SchedulerService._apply_jitter_to_heap() after this to set a jittered fire_at when the job has jitter configured.

Source code in src/hassette/scheduler/classes.py
341
342
343
344
345
346
347
348
349
350
351
def set_next_run(self, next_run: ZonedDateTime) -> None:
    """Reschedule the job: refresh ``next_run``, ``fire_at`` and the heap ordering key.

    The given time is rounded to the second and stored in both ``next_run`` and
    ``fire_at``. When the job has ``jitter`` configured, call
    ``SchedulerService._apply_jitter_to_heap()`` afterwards to set a jittered ``fire_at``.
    """
    snapped = next_run.round(unit="second")
    self.next_run = self.fire_at = snapped
    # id(self) is the tiebreaker for jobs that share the same rounded timestamp.
    self.sort_index = (snapped.timestamp_nanos(), id(self))