Scheduling Methods
The scheduler provides several methods to run tasks at different times. All methods return a ScheduledJob.
Primary Entry Point
schedule
The primary entry point for scheduling. All convenience methods delegate here. Use it directly when working with trigger objects.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
trigger |
TriggerProtocol |
(required) | A trigger object that determines the schedule. See Trigger Types. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name for bulk management. See Job Groups. |
jitter |
float \| None |
None |
Optional seconds of random offset applied at enqueue time. See Jitter. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global scheduler_job_timeout_seconds default. A positive float overrides it. See Timeouts. |
timeout_disabled |
bool |
False |
When True, timeout enforcement is disabled for this job regardless of the global default. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
from hassette.scheduler import Cron, Daily, Every
class ScheduleExampleApp(App[AppConfig]):
async def on_initialize(self) -> None:
# Fixed interval
job = await self.scheduler.schedule(self.check_sensors, Every(minutes=5)) # pyright: ignore[reportUnusedVariable]
# Daily at a specific time
job = await self.scheduler.schedule(self.morning_routine, Daily(at="07:00"), group="morning") # pyright: ignore[reportUnusedVariable]
# Cron expression
job = await self.scheduler.schedule(self.workday_task, Cron("0 9 * * 1-5")) # pyright: ignore[reportUnusedVariable]
async def check_sensors(self) -> None: ...
async def morning_routine(self) -> None: ...
async def workday_task(self) -> None: ...
Convenience Methods
run_in
Run once after a delay. Useful for timeouts or delayed actions.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
delay |
float |
(required) | Delay in seconds before running. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
class DelayApp(App[AppConfig]):
async def on_initialize(self):
# Run in 5 seconds
await self.scheduler.run_in(self.turn_off_light, delay=5.0, name="turn_off_light")
# Run in 10 minutes (using TimeDelta or seconds)
await self.scheduler.run_in(self.check_status, delay=600, name="check_status")
async def turn_off_light(self):
pass
async def check_status(self):
pass
run_once
Run once at a specific wall-clock time. Accepts a "HH:MM" string or a ZonedDateTime.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
at |
str \| ZonedDateTime |
(required) | Target time. "HH:MM" is interpreted as today in the system timezone; if already past, defers to tomorrow. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
Past times defer to tomorrow
If the "HH:MM" time has already passed today, the job is deferred to tomorrow and a WARNING is logged. To fire immediately instead, use run_in with a short delay.
from hassette import App, AppConfig
class AlarmApp(App[AppConfig]):
async def on_initialize(self):
# Run once at 7:30 AM today (or tomorrow if already past)
await self.scheduler.run_once(self.morning_alarm, at="07:30", name="morning_alarm")
async def morning_alarm(self):
self.logger.info("Good morning!")
run_every
Run repeatedly at a fixed interval. Specify the interval using hours, minutes, and/or seconds keyword arguments (they are additive).
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
hours |
float |
0 |
Hours component of the interval. |
minutes |
float |
0 |
Minutes component of the interval. |
seconds |
float |
0 |
Seconds component of the interval. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
class IntervalApp(App[AppConfig]):
async def on_initialize(self):
# Every 10 seconds
await self.scheduler.run_every(self.poll_api, seconds=10, name="poll_api")
# Every hour (using hours parameter)
await self.scheduler.run_every(self.hourly_check, hours=1, name="hourly_check")
async def poll_api(self):
pass
async def hourly_check(self):
pass
Convenience Interval Helpers
run_minutely
Run every N minutes. Shorthand for run_every(minutes=N).
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
minutes |
int |
1 |
Minute interval. Must be at least 1. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
class MinutelyApp(App[AppConfig]):
async def on_initialize(self):
# Every minute
await self.scheduler.run_minutely(self.task, name="task_minutely")
# Every 5 minutes
await self.scheduler.run_minutely(self.task, minutes=5, name="task_every_5m")
async def task(self):
pass
run_hourly
Run every N hours. Shorthand for run_every(hours=N).
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
hours |
int |
1 |
Hour interval. Must be at least 1. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
class HourlyApp(App[AppConfig]):
async def on_initialize(self):
# Every hour
await self.scheduler.run_hourly(self.task, name="task_hourly")
# Every 4 hours
await self.scheduler.run_hourly(self.task, hours=4, name="task_every_4h")
async def task(self):
pass
run_daily
Run once per day at a fixed wall-clock time. Uses a cron-based trigger internally for DST-correct, wall-clock-aligned scheduling.
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
at |
str |
"00:00" |
Target wall-clock time in "HH:MM" format. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
from hassette import App, AppConfig
class DailyApp(App[AppConfig]):
async def on_initialize(self):
# Every day at midnight (default)
await self.scheduler.run_daily(self.task, name="task_daily")
# Every day at 7:00 AM (wall-clock, DST-safe)
await self.scheduler.run_daily(self.morning_routine, at="07:00", name="morning_routine")
async def task(self):
pass
async def morning_routine(self):
pass
Cron Scheduling
run_cron
Run on a schedule defined by a cron expression. Accepts both 5-field (standard Unix cron: minute hour dom month dow) and 6-field expressions (with seconds appended as a 6th field).
| Parameter | Type | Default | Description |
|---|---|---|---|
func |
callable | (required) | The function to run. |
expression |
str |
(required) | A valid 5- or 6-field cron expression. |
name |
str |
"" |
Optional name for the job. |
group |
str \| None |
None |
Optional group name. See Job Groups. |
timeout |
float \| None |
None |
Per-job timeout in seconds. None uses the global default. See Timeouts. |
timeout_disabled |
bool |
False |
Disable timeout enforcement for this job. |
if_exists |
"error" | "skip" | "replace" |
"error" |
Behavior when a job with this name already exists. See Idempotent Registration. |
args |
tuple | None |
None |
Positional arguments passed to func. |
kwargs |
Mapping | None |
None |
Keyword arguments passed to func. |
Cron expression fields (5-field standard):
| Position | Field | Values | Example |
|---|---|---|---|
| 1 | minute | 0-59 | */15 — every 15 minutes |
| 2 | hour | 0-23 | 9 — 9 AM |
| 3 | day of month | 1-31 | 1,15 — 1st and 15th |
| 4 | month | 1-12 | 6 — June |
| 5 | day of week | 0-6 (Sunday=0) | 1-5 — weekdays |
from hassette import App, AppConfig
class CronApp(App[AppConfig]):
async def on_initialize(self):
# Weekdays at 9 AM (5-field standard cron: minute hour dom month dow)
await self.scheduler.run_cron(self.work_start, "0 9 * * 1-5")
# Every 15 minutes
await self.scheduler.run_cron(self.check, "*/15 * * * *")
# First of the month at midnight
await self.scheduler.run_cron(self.monthly_job, "0 0 1 * *")
async def work_start(self):
pass
async def check(self):
pass
async def monthly_job(self):
pass
Job Groups
Schedule related jobs into a named group for bulk management. Pass group= to any scheduling method or to schedule() directly.
from hassette import App, AppConfig
class MorningApp(App[AppConfig]):
async def on_initialize(self) -> None:
await self.scheduler.run_daily(self.open_blinds, at="08:00", group="morning")
await self.scheduler.run_daily(self.play_music, at="08:05", group="morning")
await self.scheduler.run_daily(self.start_coffee, at="08:10", group="morning")
async def on_vacation_start(self) -> None:
self.scheduler.cancel_group("morning")
async def open_blinds(self) -> None: ...
async def play_music(self) -> None: ...
async def start_coffee(self) -> None: ...
| Method | Description |
|---|---|
cancel_group(group) |
Cancel all jobs in the group. No-op if the group does not exist. |
list_jobs(group=group) |
Return all jobs in the group. Without group=, returns all jobs. |
Jitter
Add random offset to a job's enqueue time with the jitter= parameter. This spreads out jobs that would otherwise fire at the exact same instant — useful for avoiding thundering-herd scenarios when many apps schedule work at the same wall-clock time.
# Spread the actual fire time by up to 30 seconds
await self.scheduler.schedule(self.check_sensors, Daily(at="06:00"), jitter=30)
Jitter is applied to the heap sort index only — the logical next_run timestamp remains unjittered. This means the trigger's interval grid is not affected by jitter, only the order in which co-scheduled jobs are dispatched.
Idempotent Registration
Job names must be unique within each app instance. If you register a job with a name that already exists, the scheduler raises ValueError by default.
All scheduling methods accept an if_exists parameter to control this behavior:
| Value | Behavior |
|---|---|
"error" (default) |
Raise ValueError if a job with the same name already exists. |
"skip" |
Return the existing job if its configuration matches. Raises ValueError if a job with the same name exists but has a different configuration. Two jobs match when they have the same callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, and kwargs. Useful for safe re-registration in on_initialize when the job configuration is stable across reloads. |
"replace" |
Cancel the existing job (recording it as cancelled in telemetry) and register the new job in its place. Unlike "skip", the new job does not need to match the existing one's configuration. Useful when the callable, trigger, or parameters may change between reloads. |
This is especially useful in on_initialize, which runs again on app reload:
# Safe to call on every reload — won't create duplicates
await self.scheduler.run_every(
self.check_sensors,
seconds=60,
name="sensor_check",
if_exists="skip",
)
Without if_exists="skip", a reload would raise ValueError because sensor_check is already registered from the previous initialization. Use if_exists="replace" instead when the job's configuration may change — the old job is cancelled and the new configuration takes effect immediately:
# Use replace when the trigger or handler may change between reloads
await self.scheduler.run_every(
self.check_sensors,
seconds=120,
name="sensor_check",
if_exists="replace",
)
Passing Arguments to Handlers
All scheduling methods accept args and kwargs to pass data to the scheduled handler at call time, so you avoid capturing mutable state in closures.
from hassette import App, AppConfig
class NotifyApp(App[AppConfig]):
async def on_initialize(self):
# Pass positional arguments to the handler
await self.scheduler.run_in(
self.send_alert,
delay=30.0,
name="startup_alert",
args=("Kitchen motion sensor", "triggered"),
)
# Pass keyword arguments to the handler
await self.scheduler.run_every(
self.log_status,
seconds=300,
name="status_log",
kwargs={"level": "info", "include_history": True},
)
# Combine args and kwargs
await self.scheduler.run_daily(
self.generate_report,
at="06:00",
name="daily_report",
args=("daily",),
kwargs={"recipients": ["admin"]},
)
async def send_alert(self, sensor: str, state: str):
self.logger.info("Alert: %s is %s", sensor, state)
async def log_status(self, level: str = "debug", include_history: bool = False):
self.logger.info("Status logged (level=%s, history=%s)", level, include_history)
async def generate_report(self, period: str, recipients: list[str]):
self.logger.info("Generating %s report for %s", period, recipients)
Custom Triggers
Implement TriggerProtocol to handle scheduling patterns the built-in triggers don't cover — for example, polling based on solar elevation.
class SolarPollTrigger:
"""Polls on a fixed interval for use with elevation-based logic in the callback."""
def __init__(self, check_every: int = 60):
self.check_every = check_every
def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
return current_time.add(seconds=self.check_every)
def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
return current_time.add(seconds=self.check_every)
def trigger_label(self) -> str:
return f"solar_poll (every {self.check_every}s)"
def trigger_detail(self) -> str | None:
return f"every {self.check_every}s"
def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]:
return "custom"
def trigger_id(self) -> str:
return f"solar_poll:{self.check_every}"
Use it with schedule():
await self.scheduler.schedule(self.check_sun_elevation, SolarPollTrigger(check_every=30))
The TriggerProtocol requires six methods:
| Method | Returns | Description |
|---|---|---|
first_run_time(current_time) |
ZonedDateTime |
When the job should first fire. |
next_run_time(previous_run, current_time) |
ZonedDateTime \| None |
When to fire next. Return None for one-shot triggers. |
trigger_label() |
str |
Short label for logs and the web UI. |
trigger_detail() |
str \| None |
Optional human-readable detail string. |
trigger_db_type() |
str |
Canonical type for database storage. |
trigger_id() |
str |
Stable identifier for deduplication (used by if_exists="skip" and auto-generated job names). |
See Also
- Job Management - Name, track, and cancel scheduled jobs
- Bus - Combine scheduled tasks with event-driven automation
- App Cache - Store data between scheduled runs