App Cache: Patterns & Examples
This page covers practical patterns for self.cache. The Overview covers setup and basic usage.
Pattern: API Response Caching
Avoid hitting external API rate limits by storing responses with a timestamp and checking freshness before making a new request:
from hassette import App, AppConfig
class WeatherApp(App[AppConfig]):
async def on_initialize(self):
await self.scheduler.run_every(self.update_weather, 60)
async def get_weather(self, location: str) -> dict:
cache_key = f"weather:{location}"
# Check cache first
if cache_key in self.cache:
cached_time, data = self.cache[cache_key]
# Return cached data if less than 30 minutes old
if cached_time > self.now().subtract(minutes=30):
self.logger.info("Using cached weather for %s", location)
return data # pyright: ignore[reportReturnType]
# Fetch fresh data from API
self.logger.info("Fetching fresh weather for %s", location)
data = await self.fetch_weather_api(location)
self.cache[cache_key] = (self.now(), data)
return data
async def fetch_weather_api(self, location: str) -> dict:
# Your external API call here
return {"temperature": 72}
async def update_weather(self):
weather = await self.get_weather("New York")
await self.api.set_state(
"sensor.weather_forecast",
str(weather["temperature"]),
)
The pattern: check if the cached entry exists and is within the TTL window, return it if so, otherwise fetch fresh data and update the cache.
Pattern: Rate-Limiting Notifications
Prevent notification spam by recording when the last notification was sent and skipping the call if the cooldown has not elapsed:
from hassette import App, AppConfig, P
class WaterLeakAlertApp(App[AppConfig]):
async def on_initialize(self):
await self.bus.on_state_change(
"binary_sensor.water_leak",
handler=self.on_leak_detected,
where=P.StateTo("on"),
name="water_leak",
)
async def on_leak_detected(self, event):
"""Send notification, but not more than once every 4 hours."""
cache_key = "last_leak_notification"
# Check when we last sent a notification
last_sent = self.cache.get(cache_key)
if last_sent is not None:
if last_sent > self.now().subtract(hours=4):
time_since_last = self.now() - last_sent
self.logger.info("Skipping notification - last sent %s ago", time_since_last)
return
# Send the notification
await self.api.call_service(
"notify",
"mobile_app",
message="Water leak detected!",
title="Alert",
)
# Update cache with current time
self.cache[cache_key] = self.now()
self.logger.info("Leak notification sent")
For per-entity rate-limiting (e.g., one cooldown per sensor rather than a single global cooldown), include the entity ID in the cache key: f"last_notification:{event.data.entity_id}".
Pattern: Persistent Counters
Track events across restarts by loading the counter from the cache at initialization and writing it back on every increment:
from hassette import App, AppConfig, P
class MotionCounterApp(App[AppConfig]):
async def on_initialize(self):
# Restore counter from cache, or start at 0
self.motion_count = self.cache.get("motion_count", 0)
self.logger.info("Motion count restored: %s", self.motion_count)
await self.bus.on_state_change(
"binary_sensor.motion",
handler=self.on_motion,
where=P.StateTo("on"),
name="motion_counter",
)
async def on_motion(self, event):
self.motion_count += 1
self.cache["motion_count"] = self.motion_count
self.logger.info("Total motion events: %s", self.motion_count)
The counter is restored from disk the next time the app starts, so motion_count accumulates across Hassette restarts.
Pattern: Storing Complex Data
The cache stores any picklable Python object — including dataclasses with typed fields:
import dataclasses
from dataclasses import dataclass
from hassette import App, AppConfig
from whenever import ZonedDateTime
@dataclass
class EnergyStats:
total_kwh: float
peak_usage: float
last_updated: ZonedDateTime
class EnergyTrackerApp(App[AppConfig]):
async def on_initialize(self):
# Load previous stats or create new ones
self.stats: EnergyStats = self.cache.get( # pyright: ignore[reportAttributeAccessIssue]
"energy_stats",
EnergyStats(0.0, 0.0, self.now()),
)
await self.scheduler.run_hourly(self.update_stats)
async def update_stats(self):
current_usage = await self.get_current_usage()
# Create a new stats object — do not mutate the existing one
self.stats = dataclasses.replace(
self.stats,
total_kwh=self.stats.total_kwh + current_usage,
peak_usage=max(self.stats.peak_usage, current_usage),
last_updated=self.now(),
)
# Persist to cache
self.cache["energy_stats"] = self.stats
self.logger.info("Updated stats: %s", self.stats)
async def get_current_usage(self) -> float:
state = await self.api.get_state("sensor.power_usage")
return float(state.value)
Create new objects instead of mutating
Use dataclasses.replace() to produce a new object rather than modifying the existing one. This keeps your app logic predictable and avoids partially-written state if an error occurs before the cache write.
Pattern: Expiring Cache Entries
For simple expiration, use self.cache.set(key, value, expire=seconds) — diskcache removes the entry automatically once the timeout elapses:
self.cache.set("weather_data", payload, expire=3600) # expires in 1 hour
When you need access to the timestamp itself — for example, to display "last fetched" information or to implement custom staleness logic — store a timestamp alongside the value instead:
from hassette import App, AppConfig
class DataCacheApp(App[AppConfig]):
async def get_cached_data(self, key: str, ttl_minutes: int = 60):
"""Get data from cache if not expired, or None if expired or absent."""
cache_key = f"data:{key}"
if cache_key in self.cache:
timestamp, value = self.cache[cache_key]
# Return cached data if still within TTL
if timestamp > self.now().subtract(minutes=ttl_minutes):
return value
# Data expired or not found
return None
async def set_cached_data(self, key: str, value) -> None:
"""Store data alongside a timestamp for TTL tracking."""
cache_key = f"data:{key}"
self.cache[cache_key] = (self.now(), value)
Pattern: Load Once, Write on Shutdown
For data that is read frequently during a run but only needs to be persisted at shutdown, load from the cache at initialization into an instance variable and write back at shutdown:
from hassette import App, AppConfig
class OptimizedApp(App[AppConfig]):
async def on_initialize(self):
# Load from disk cache once into an instance variable
self.config_data: dict = self.cache.get("config", {}) # pyright: ignore[reportAttributeAccessIssue]
# Use the in-memory copy throughout the app's lifetime
setting = self.config_data.get("some_setting")
self.logger.info("Setting: %s", setting)
async def on_shutdown(self):
# Persist changes back to disk cache at shutdown
self.cache["config"] = self.config_data
This avoids disk I/O on every access while still persisting the data across restarts.
Best Practices
What to Cache
Good uses:
- Notification timestamps for rate-limiting
- External API responses that have a meaningful TTL
- Computed values that are expensive to recalculate
- Rolling counters and statistics
- User preferences or app settings
Avoid caching:
- Real-time Home Assistant entity state — use
self.statesinstead - Large binary files — consider external storage
- Session-only temporary flags — use instance variables
Cache vs. StateManager
| Use Case | Tool | Reason |
|---|---|---|
| Current sensor values | self.states |
Real-time HA state |
| Historical data | self.cache |
Persists across restarts |
| Computed aggregates | self.cache |
Not part of HA state |
| External API responses | self.cache |
Reduce external calls |
| Temporary flags (this run only) | Instance variables | No persistence needed |
Performance
Cache access involves disk I/O and is not instantaneous. For data that is read many times per second within a single run, load into an instance variable at initialization (see the Load Once, Write on Shutdown pattern above). The cache is thread-safe and can be accessed from multiple async tasks concurrently.
Troubleshooting
Cache Not Persisting
If data is not surviving restarts:
- Confirm you are writing to
self.cache, not a local variable namedcache - Confirm the app completes initialization without raising an exception (a startup error can prevent the shutdown flush)
- Confirm the cache directory has write permissions
- Confirm the value is picklable — unpicklable objects raise a
PicklingErrorat write time
Cache Size Exceeded
When the cache reaches default_cache_size, the least recently used items are evicted automatically. If you are losing important data:
- Increase
default_cache_sizein Global Settings - Implement expiration logic to remove stale entries (see Expiring Cache Entries)
- Consider storing large objects externally and caching only references or identifiers
Debugging Cache Operations
Enable debug logging to see cache operations in the logs:
[hassette]
log_level = "DEBUG"
Verify the cache directory exists and contains data:
ls -lah ~/.local/share/hassette/v0/MyApp/cache/
See Also
- App Cache Overview — how it works, configuration, lifecycle
- Global Settings —
data_diranddefault_cache_size - Apps Overview — app lifecycle
- diskcache documentation — full cache library reference