Scheduling Methods

The scheduler provides several methods to run tasks at different times. All methods return a ScheduledJob.

Primary Entry Point

schedule

The primary entry point for scheduling. All convenience methods delegate here. Use it directly when working with trigger objects.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| trigger | TriggerProtocol | (required) | A trigger object that determines the schedule. See Trigger Types. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name for bulk management. See Job Groups. |
| jitter | float \| None | None | Optional seconds of random offset applied at enqueue time. See Jitter. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global scheduler_job_timeout_seconds default. A positive float overrides it. See Timeouts. |
| timeout_disabled | bool | False | When True, timeout enforcement is disabled for this job regardless of the global default. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig
from hassette.scheduler import Cron, Daily, Every


class ScheduleExampleApp(App[AppConfig]):
    async def on_initialize(self) -> None:
        # Fixed interval
        await self.scheduler.schedule(self.check_sensors, Every(minutes=5))

        # Daily at a specific time
        await self.scheduler.schedule(self.morning_routine, Daily(at="07:00"), group="morning")

        # Cron expression
        await self.scheduler.schedule(self.workday_task, Cron("0 9 * * 1-5"))

    async def check_sensors(self) -> None: ...
    async def morning_routine(self) -> None: ...
    async def workday_task(self) -> None: ...
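
The timeout and timeout_disabled rows above combine into one effective limit per job. The following is a minimal sketch of that resolution, not hassette's implementation: the helper name effective_timeout, the 30-second default, and the rejection of non-positive values are all assumptions made for illustration.

```python
from __future__ import annotations


def effective_timeout(
    timeout: float | None,
    timeout_disabled: bool,
    global_default: float | None,
) -> float | None:
    """Return the timeout in seconds to enforce, or None for no enforcement."""
    if timeout_disabled:
        # Disabling wins regardless of the per-job or global value.
        return None
    if timeout is None:
        # No per-job value: fall back to the global default.
        return global_default
    if timeout <= 0:
        # Assumption: only positive overrides are meaningful.
        raise ValueError("timeout must be a positive number of seconds")
    return timeout


GLOBAL_DEFAULT = 30.0  # illustrative stand-in for scheduler_job_timeout_seconds

print(effective_timeout(None, False, GLOBAL_DEFAULT))  # 30.0
print(effective_timeout(5.0, False, GLOBAL_DEFAULT))   # 5.0
print(effective_timeout(5.0, True, GLOBAL_DEFAULT))    # None
```

Checking timeout_disabled first mirrors the table's wording that it applies "regardless of the global default".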

Convenience Methods

run_in

Run once after a delay. Useful for timeouts or delayed actions.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| delay | float | (required) | Delay in seconds before running. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig


class DelayApp(App[AppConfig]):
    async def on_initialize(self):
        # Run in 5 seconds
        await self.scheduler.run_in(self.turn_off_light, delay=5.0, name="turn_off_light")

        # Run in 10 minutes (delay is given in seconds)
        await self.scheduler.run_in(self.check_status, delay=600, name="check_status")

    async def turn_off_light(self):
        pass

    async def check_status(self):
        pass

run_once

Run once at a specific wall-clock time. Accepts a "HH:MM" string or a ZonedDateTime.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| at | str \| ZonedDateTime | (required) | Target time. "HH:MM" is interpreted as today in the system timezone; if already past, defers to tomorrow. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

Past times defer to tomorrow

If the "HH:MM" time has already passed today, the job is deferred to tomorrow and a WARNING is logged. To fire immediately instead, use run_in with a short delay.

from hassette import App, AppConfig


class AlarmApp(App[AppConfig]):
    async def on_initialize(self):
        # Run once at 7:30 AM today (or tomorrow if already past)
        await self.scheduler.run_once(self.morning_alarm, at="07:30", name="morning_alarm")

    async def morning_alarm(self):
        self.logger.info("Good morning!")
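
The "defer to tomorrow" rule for "HH:MM" strings can be sketched with the standard library. This is an illustration only: resolve_hhmm is a hypothetical helper, plain datetime stands in for ZonedDateTime, and treating a target exactly equal to the current time as "not yet past" is an assumption.

```python
from datetime import datetime, timedelta


def resolve_hhmm(at: str, now: datetime) -> datetime:
    """Resolve "HH:MM" to today, or to tomorrow if that time has already passed."""
    hour_text, minute_text = at.split(":")
    target = now.replace(
        hour=int(hour_text), minute=int(minute_text), second=0, microsecond=0
    )
    if target < now:
        # Already past today: defer to the same wall-clock time tomorrow.
        target += timedelta(days=1)
    return target


early = datetime(2024, 5, 1, 6, 0)
late = datetime(2024, 5, 1, 8, 0)
print(resolve_hhmm("07:30", early))  # 2024-05-01 07:30:00
print(resolve_hhmm("07:30", late))   # 2024-05-02 07:30:00
```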

run_every

Run repeatedly at a fixed interval. Specify the interval using hours, minutes, and/or seconds keyword arguments (they are additive).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| hours | float | 0 | Hours component of the interval. |
| minutes | float | 0 | Minutes component of the interval. |
| seconds | float | 0 | Seconds component of the interval. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig


class IntervalApp(App[AppConfig]):
    async def on_initialize(self):
        # Every 10 seconds
        await self.scheduler.run_every(self.poll_api, seconds=10, name="poll_api")

        # Every hour (using hours parameter)
        await self.scheduler.run_every(self.hourly_check, hours=1, name="hourly_check")

    async def poll_api(self):
        pass

    async def hourly_check(self):
        pass
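
Because the hours, minutes, and seconds arguments are additive, the effective interval is their sum. A small sketch of that arithmetic follows; interval_seconds is a hypothetical helper, and rejecting a non-positive total is an assumption rather than documented behavior.

```python
from datetime import timedelta


def interval_seconds(hours: float = 0, minutes: float = 0, seconds: float = 0) -> float:
    """Sum the three components into a single interval, in seconds."""
    total = timedelta(hours=hours, minutes=minutes, seconds=seconds).total_seconds()
    if total <= 0:
        # Assumption: an empty or negative interval is treated as an error.
        raise ValueError("interval must be positive")
    return total


print(interval_seconds(hours=1, minutes=30))       # 5400.0
print(interval_seconds(minutes=1.5, seconds=10))   # 100.0
```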

Convenience Interval Helpers

run_minutely

Run every N minutes. Shorthand for run_every(minutes=N).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| minutes | int | 1 | Minute interval. Must be at least 1. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig


class MinutelyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every minute
        await self.scheduler.run_minutely(self.task, name="task_minutely")

        # Every 5 minutes
        await self.scheduler.run_minutely(self.task, minutes=5, name="task_every_5m")

    async def task(self):
        pass

run_hourly

Run every N hours. Shorthand for run_every(hours=N).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| hours | int | 1 | Hour interval. Must be at least 1. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig


class HourlyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every hour
        await self.scheduler.run_hourly(self.task, name="task_hourly")

        # Every 4 hours
        await self.scheduler.run_hourly(self.task, hours=4, name="task_every_4h")

    async def task(self):
        pass

run_daily

Run once per day at a fixed wall-clock time. Uses a cron-based trigger internally for DST-correct, wall-clock-aligned scheduling.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| at | str | "00:00" | Target wall-clock time in "HH:MM" format. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

from hassette import App, AppConfig


class DailyApp(App[AppConfig]):
    async def on_initialize(self):
        # Every day at midnight (default)
        await self.scheduler.run_daily(self.task, name="task_daily")

        # Every day at 7:00 AM (wall-clock, DST-safe)
        await self.scheduler.run_daily(self.morning_routine, at="07:00", name="morning_routine")

    async def task(self):
        pass

    async def morning_routine(self):
        pass

Cron Scheduling

run_cron

Run on a schedule defined by a cron expression. Accepts both 5-field (standard Unix cron: minute hour dom month dow) and 6-field expressions (with seconds appended as a 6th field).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | callable | (required) | The function to run. |
| expression | str | (required) | A valid 5- or 6-field cron expression. |
| name | str | "" | Optional name for the job. |
| group | str \| None | None | Optional group name. See Job Groups. |
| timeout | float \| None | None | Per-job timeout in seconds. None uses the global default. See Timeouts. |
| timeout_disabled | bool | False | Disable timeout enforcement for this job. |
| if_exists | "error" \| "skip" \| "replace" | "error" | Behavior when a job with this name already exists. See Idempotent Registration. |
| args | tuple \| None | None | Positional arguments passed to func. |
| kwargs | Mapping \| None | None | Keyword arguments passed to func. |

Cron expression fields (5-field standard):

| Position | Field | Values | Example |
| --- | --- | --- | --- |
| 1 | minute | 0-59 | */15 (every 15 minutes) |
| 2 | hour | 0-23 | 9 (9 AM) |
| 3 | day of month | 1-31 | 1,15 (1st and 15th) |
| 4 | month | 1-12 | 6 (June) |
| 5 | day of week | 0-6 (Sunday=0) | 1-5 (weekdays) |

from hassette import App, AppConfig


class CronApp(App[AppConfig]):
    async def on_initialize(self):
        # Weekdays at 9 AM (5-field standard cron: minute hour dom month dow)
        await self.scheduler.run_cron(self.work_start, "0 9 * * 1-5")

        # Every 15 minutes
        await self.scheduler.run_cron(self.check, "*/15 * * * *")

        # First of the month at midnight
        await self.scheduler.run_cron(self.monthly_job, "0 0 1 * *")

    async def work_start(self):
        pass

    async def check(self):
        pass

    async def monthly_job(self):
        pass
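
To make the field table concrete, here is a minimal sketch of how a 5-field expression can be tested against a moment in time. It is not the parser hassette uses: parse_field and matches are hypothetical names, only `*`, lists, ranges, and steps are handled, 6-field expressions are rejected, and all five fields are combined with a plain AND (some cron implementations OR day-of-month with day-of-week when both are restricted).

```python
from __future__ import annotations

from datetime import datetime


def parse_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field (e.g. "*/15", "1-5", "1,15") into its allowed values."""
    allowed: set[int] = set()
    for part in field.split(","):
        range_text, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if range_text == "*":
            start, end = low, high
        elif "-" in range_text:
            start_text, end_text = range_text.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(range_text)
        if not (low <= start <= end <= high):
            raise ValueError(f"field {field!r} is outside {low}-{high}")
        allowed.update(range(start, end + 1, step))
    return allowed


# (low, high) bounds for minute, hour, day of month, month, day of week
BOUNDS = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` satisfies every field of a 5-field expression."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("this sketch handles 5-field expressions only")
    minute, hour, dom, month, dow = (
        parse_field(text, low, high) for text, (low, high) in zip(fields, BOUNDS)
    )
    # Python counts Monday as 0; the table above counts Sunday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        moment.minute in minute
        and moment.hour in hour
        and moment.day in dom
        and moment.month in month
        and cron_weekday in dow
    )


print(matches("0 9 * * 1-5", datetime(2024, 5, 6, 9, 0)))     # Monday 09:00 -> True
print(matches("0 9 * * 1-5", datetime(2024, 5, 5, 9, 0)))     # Sunday 09:00 -> False
print(matches("*/15 * * * *", datetime(2024, 5, 6, 10, 45)))  # True
```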

Job Groups

Schedule related jobs into a named group for bulk management. Pass group= to any scheduling method or to schedule() directly.

from hassette import App, AppConfig


class MorningApp(App[AppConfig]):
    async def on_initialize(self) -> None:
        await self.scheduler.run_daily(self.open_blinds, at="08:00", group="morning")
        await self.scheduler.run_daily(self.play_music, at="08:05", group="morning")
        await self.scheduler.run_daily(self.start_coffee, at="08:10", group="morning")

    async def on_vacation_start(self) -> None:
        self.scheduler.cancel_group("morning")

    async def open_blinds(self) -> None: ...
    async def play_music(self) -> None: ...
    async def start_coffee(self) -> None: ...

| Method | Description |
| --- | --- |
| cancel_group(group) | Cancel all jobs in the group. No-op if the group does not exist. |
| list_jobs(group=group) | Return all jobs in the group. Without group=, returns all jobs. |
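
The two group operations can be pictured as filters over a job-to-group mapping. The sketch below is a toy model under that assumption: GroupedJobs is a hypothetical class, jobs are represented by their names only, and the count returned by cancel_group is an addition for illustration.

```python
from __future__ import annotations


class GroupedJobs:
    """Hypothetical in-memory index of job names and their optional group."""

    def __init__(self) -> None:
        self._group_of: dict[str, str | None] = {}

    def add(self, name: str, group: str | None = None) -> None:
        self._group_of[name] = group

    def list_jobs(self, group: str | None = None) -> list[str]:
        # Without a group filter, every job is returned.
        return [n for n, g in self._group_of.items() if group is None or g == group]

    def cancel_group(self, group: str) -> int:
        # An unknown group matches nothing, so this is a no-op that reports 0.
        doomed = self.list_jobs(group=group)
        for name in doomed:
            del self._group_of[name]
        return len(doomed)


jobs = GroupedJobs()
jobs.add("open_blinds", group="morning")
jobs.add("play_music", group="morning")
jobs.add("poll_api")

print(jobs.list_jobs(group="morning"))  # ['open_blinds', 'play_music']
print(jobs.cancel_group("evening"))     # 0
print(jobs.cancel_group("morning"))     # 2
print(jobs.list_jobs())                 # ['poll_api']
```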

Jitter

Add a random offset to a job's enqueue time with the jitter= parameter. This spreads out jobs that would otherwise fire at the same instant, which helps avoid thundering-herd load when many apps schedule work at the same wall-clock time.

# Spread the actual fire time by up to 30 seconds
await self.scheduler.schedule(self.check_sensors, Daily(at="06:00"), jitter=30)

Jitter is applied to the heap sort index only — the logical next_run timestamp remains unjittered. This means the trigger's interval grid is not affected by jitter, only the order in which co-scheduled jobs are dispatched.
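
A minimal sketch of the idea, using the standard heapq module: each queued entry carries a jittered sort index for ordering and a separate, unjittered next_run. QueuedJob and enqueue are hypothetical names, and drawing the offset uniformly from 0 to jitter seconds is an assumption about the distribution.

```python
from __future__ import annotations

import heapq
import random
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedJob:
    sort_index: float                       # jittered; the only field used for ordering
    name: str = field(compare=False)
    next_run: float = field(compare=False)  # logical timestamp, never jittered


def enqueue(
    heap: list[QueuedJob],
    name: str,
    next_run: float,
    jitter: float | None,
    rng: random.Random,
) -> QueuedJob:
    """Push a job whose heap position is offset by up to `jitter` seconds."""
    offset = rng.uniform(0.0, jitter) if jitter else 0.0
    job = QueuedJob(sort_index=next_run + offset, name=name, next_run=next_run)
    heapq.heappush(heap, job)
    return job


rng = random.Random(0)  # seeded so the sketch is repeatable
heap: list[QueuedJob] = []
for app_name in ("app_a", "app_b", "app_c"):
    enqueue(heap, app_name, next_run=1000.0, jitter=30.0, rng=rng)

dispatch_order = [heapq.heappop(heap) for _ in range(3)]
for job in dispatch_order:
    # next_run is identical for all three; only the sort index differs.
    print(job.name, job.next_run, round(job.sort_index - job.next_run, 2))
```

Keeping the two values separate is what lets a recurring trigger compute its following run from the unjittered timestamp, so offsets do not accumulate.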


Idempotent Registration

Job names must be unique within each app instance. If you register a job with a name that already exists, the scheduler raises ValueError by default.

All scheduling methods accept an if_exists parameter to control this behavior:

| Value | Behavior |
| --- | --- |
| "error" (default) | Raise ValueError if a job with the same name already exists. |
| "skip" | Return the existing job if its configuration matches. Raises ValueError if a job with the same name exists but has a different configuration. Two jobs match when they have the same callable, trigger (by trigger_id()), group, jitter, timeout, timeout_disabled, args, and kwargs. Useful for safe re-registration in on_initialize when the job configuration is stable across reloads. |
| "replace" | Cancel the existing job (recording it as cancelled in telemetry) and register the new job in its place. Unlike "skip", the new job does not need to match the existing one's configuration. Useful when the callable, trigger, or parameters may change between reloads. |

if_exists="skip" is especially useful in on_initialize, which runs again on app reload:

# Safe to call on every reload — won't create duplicates
await self.scheduler.run_every(
    self.check_sensors,
    seconds=60,
    name="sensor_check",
    if_exists="skip",
)

Without if_exists="skip", a reload would raise ValueError because sensor_check is already registered from the previous initialization. Use if_exists="replace" instead when the job's configuration may change — the old job is cancelled and the new configuration takes effect immediately:

# Use replace when the trigger or handler may change between reloads
await self.scheduler.run_every(
    self.check_sensors,
    seconds=120,
    name="sensor_check",
    if_exists="replace",
)
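
The three if_exists modes can be modeled as a small decision procedure over a name-keyed registry. The sketch below is illustrative only: JobSpec and Registry are hypothetical, and the configuration compared for "skip" is reduced to a few fields instead of the full list given in the table.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class JobSpec:
    """Hypothetical, reduced stand-in for the configuration compared by "skip"."""

    name: str
    func_name: str
    trigger_id: str
    group: str | None = None


class Registry:
    def __init__(self) -> None:
        self.jobs: dict[str, JobSpec] = {}
        self.cancelled: list[JobSpec] = []

    def register(self, spec: JobSpec, if_exists: str = "error") -> JobSpec:
        if if_exists not in ("error", "skip", "replace"):
            raise ValueError(f"unknown if_exists value: {if_exists!r}")
        existing = self.jobs.get(spec.name)
        if existing is None:
            self.jobs[spec.name] = spec
            return spec
        if if_exists == "skip":
            if existing == spec:
                return existing  # same configuration: keep the original job
            raise ValueError(f"job {spec.name!r} exists with a different configuration")
        if if_exists == "replace":
            self.cancelled.append(existing)  # record the cancellation
            self.jobs[spec.name] = spec
            return spec
        raise ValueError(f"job {spec.name!r} already exists")


registry = Registry()
first = registry.register(JobSpec("sensor_check", "check_sensors", "every:60"))
again = registry.register(JobSpec("sensor_check", "check_sensors", "every:60"), if_exists="skip")
print(again is first)  # True
changed = registry.register(JobSpec("sensor_check", "check_sensors", "every:120"), if_exists="replace")
print(registry.jobs["sensor_check"].trigger_id)  # every:120
print(len(registry.cancelled))  # 1
```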

Passing Arguments to Handlers

All scheduling methods accept args and kwargs to pass data to the scheduled handler at call time, so you avoid capturing mutable state in closures.

from hassette import App, AppConfig


class NotifyApp(App[AppConfig]):
    async def on_initialize(self):
        # Pass positional arguments to the handler
        await self.scheduler.run_in(
            self.send_alert,
            delay=30.0,
            name="startup_alert",
            args=("Kitchen motion sensor", "triggered"),
        )

        # Pass keyword arguments to the handler
        await self.scheduler.run_every(
            self.log_status,
            seconds=300,
            name="status_log",
            kwargs={"level": "info", "include_history": True},
        )

        # Combine args and kwargs
        await self.scheduler.run_daily(
            self.generate_report,
            at="06:00",
            name="daily_report",
            args=("daily",),
            kwargs={"recipients": ["admin"]},
        )

    async def send_alert(self, sensor: str, state: str):
        self.logger.info("Alert: %s is %s", sensor, state)

    async def log_status(self, level: str = "debug", include_history: bool = False):
        self.logger.info("Status logged (level=%s, history=%s)", level, include_history)

    async def generate_report(self, period: str, recipients: list[str]):
        self.logger.info("Generating %s report for %s", period, recipients)
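
The closure pitfall that args and kwargs sidestep can be shown without a scheduler. In this sketch, StoredCall is a hypothetical record that snapshots its arguments at registration; whether hassette copies them in the same way is an assumption.

```python
from __future__ import annotations

from typing import Any, Callable, Mapping


class StoredCall:
    """Hypothetical record of a handler plus the arguments given at registration."""

    def __init__(
        self,
        func: Callable[..., Any],
        args: tuple = (),
        kwargs: Mapping[str, Any] | None = None,
    ) -> None:
        self.func = func
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})

    def run(self) -> Any:
        return self.func(*self.args, **self.kwargs)


def describe(room: str, state: str = "unknown") -> str:
    return f"{room} is {state}"


rooms = ("kitchen", "hall")

# Closures look up `room` when called, so both see the loop's final value.
closures = [lambda: describe(room) for room in rooms]
print([call() for call in closures])  # ['hall is unknown', 'hall is unknown']

# Explicit args and kwargs are bound per registration.
stored = [StoredCall(describe, args=(room,), kwargs={"state": "occupied"}) for room in rooms]
print([call.run() for call in stored])  # ['kitchen is occupied', 'hall is occupied']
```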

Custom Triggers

Implement TriggerProtocol to handle scheduling patterns the built-in triggers don't cover — for example, polling based on solar elevation.

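from typing import Literal

# Assumption: ZonedDateTime is the type provided by the `whenever` library.
from whenever import ZonedDateTime


class SolarPollTrigger: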
    """Polls on a fixed interval for use with elevation-based logic in the callback."""

    def __init__(self, check_every: int = 60):
        self.check_every = check_every

    def first_run_time(self, current_time: ZonedDateTime) -> ZonedDateTime:
        return current_time.add(seconds=self.check_every)

    def next_run_time(self, previous_run: ZonedDateTime, current_time: ZonedDateTime) -> ZonedDateTime:
        return current_time.add(seconds=self.check_every)

    def trigger_label(self) -> str:
        return f"solar_poll (every {self.check_every}s)"

    def trigger_detail(self) -> str | None:
        return f"every {self.check_every}s"

    def trigger_db_type(self) -> Literal["interval", "cron", "once", "after", "custom"]:
        return "custom"

    def trigger_id(self) -> str:
        return f"solar_poll:{self.check_every}"

Use it with schedule():

await self.scheduler.schedule(self.check_sun_elevation, SolarPollTrigger(check_every=30))

The TriggerProtocol requires six methods:

| Method | Returns | Description |
| --- | --- | --- |
| first_run_time(current_time) | ZonedDateTime | When the job should first fire. |
| next_run_time(previous_run, current_time) | ZonedDateTime \| None | When to fire next. Return None for one-shot triggers. |
| trigger_label() | str | Short label for logs and the web UI. |
| trigger_detail() | str \| None | Optional human-readable detail string. |
| trigger_db_type() | str | Canonical type for database storage: one of "interval", "cron", "once", "after", or "custom". |
| trigger_id() | str | Stable identifier for deduplication (used by if_exists="skip" and auto-generated job names). |
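
The six-method contract can be expressed as a structural type and exercised without a running scheduler. The sketch below is a stand-in, not hassette's definition: TriggerLike, AfterDelay, and planned_runs are hypothetical names, and datetime replaces ZonedDateTime so the example needs only the standard library.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Protocol, runtime_checkable


@runtime_checkable
class TriggerLike(Protocol):
    """Stand-in for TriggerProtocol, with datetime in place of ZonedDateTime."""

    def first_run_time(self, current_time: datetime) -> datetime: ...
    def next_run_time(self, previous_run: datetime, current_time: datetime) -> datetime | None: ...
    def trigger_label(self) -> str: ...
    def trigger_detail(self) -> str | None: ...
    def trigger_db_type(self) -> str: ...
    def trigger_id(self) -> str: ...


class AfterDelay:
    """One-shot trigger: fires once, `delay` seconds after it is scheduled."""

    def __init__(self, delay: float) -> None:
        self.delay = delay

    def first_run_time(self, current_time: datetime) -> datetime:
        return current_time + timedelta(seconds=self.delay)

    def next_run_time(self, previous_run: datetime, current_time: datetime) -> datetime | None:
        return None  # one-shot: nothing to schedule after the first run

    def trigger_label(self) -> str:
        return f"after {self.delay}s"

    def trigger_detail(self) -> str | None:
        return None

    def trigger_db_type(self) -> str:
        return "after"

    def trigger_id(self) -> str:
        return f"after:{self.delay}"


def planned_runs(trigger: TriggerLike, start: datetime, limit: int = 5) -> list[datetime]:
    """Collect up to `limit` fire times, stopping when the trigger returns None."""
    runs = [trigger.first_run_time(start)]
    while len(runs) < limit:
        upcoming = trigger.next_run_time(runs[-1], runs[-1])
        if upcoming is None:
            break
        runs.append(upcoming)
    return runs


print(isinstance(AfterDelay(5), TriggerLike))               # True
print(planned_runs(AfterDelay(5), datetime(2024, 1, 1)))    # a single fire time
```

A dry run like planned_runs is a cheap way to check a custom trigger's schedule before registering it.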

See Also

  • Job Management - Name, track, and cancel scheduled jobs
  • Bus - Combine scheduled tasks with event-driven automation
  • App Cache - Store data between scheduled runs