@rpc@emit@periodic@rpcread on demand@emitalert on thresholds@periodicpoll every N seconds@rpc@emit@on@rpcremote commands@emitstatus updates@onreact to sensor alerts@rpc@emit@rpccapture, pan/tilt@emitmotion detected@rpc@emit@rpcopen, close, set@emitstate changed# sensor.py — runs on the Pi class SensorDriver(DeviceDriver): device_type = "sensor" @rpc() async def get_reading(self) -> dict: """Return current temperature.""" return {"temp": 22.5, "humidity": 45} @emit() async def alert(self, level, message): """Broadcast an alert event.""" pass @periodic(interval=10) async def poll(self): reading = await self.get_reading() if reading["temp"] > 30: await self.alert("warning", "Hot!")
# agent.py — runs anywhere from device_connect_agent_tools import ( connect, discover_devices, invoke_device ) connect() # Find what's on the network devices = discover_devices(device_type="sensor") # Call the @rpc function remotely result = invoke_device("sensor-001", "get_reading") # → {"temp": 22.5, "humidity": 45} # Listen for @emit events subscribe("*.event.alert", on_alert)