commit 3b3f3df53cd5156c094992d1269c340cb27bfa06 Author: Tanishq Dubey Date: Tue Dec 16 11:49:43 2025 -0500 feat: initial implementation of Datacat - Add project structure with UV/hatch build system - Implement config module (env vars + YAML file support) - Implement collector module for pylitterbot integration - Implement metrics module for Datadog submission - Support LR3, LR4, and Feeder Robot metrics - Add event emission for state changes - Add CLI with --once mode for single collection diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..54c6156 --- /dev/null +++ b/.env.example @@ -0,0 +1,20 @@ +# Datacat Environment Variables +# Copy this file to .env and update with your credentials + +# Whisker/Litter Robot credentials +WHISKER_USERNAME=your-email@example.com +WHISKER_PASSWORD=your-password + +# Datadog credentials +DATADOG_API_KEY=your-datadog-api-key +DATADOG_APP_KEY=your-datadog-app-key +DATADOG_SITE=datadoghq.com + +# Collector settings +DATACAT_POLL_INTERVAL=120 +DATACAT_INCLUDE_PETS=true +DATACAT_EMIT_EVENTS=true +DATACAT_METRIC_PREFIX=litterrobot + +# Optional: Path to config file (if using YAML config instead of env vars) +# DATACAT_CONFIG_FILE=/path/to/config.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..978ba3e --- /dev/null +++ b/.gitignore @@ -0,0 +1,81 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Config files with secrets +config.yaml +config.yml + +# UV +.uv/ +uv.lock + +# macOS +.DS_Store + +# Datacat specific +*.log diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..e112916 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,20 @@ +# Datacat Configuration Example +# Copy this file to config.yaml and update with your credentials + +# Whisker/Litter Robot credentials +whisker: + username: "your-email@example.com" + password: "your-password" + +# Datadog configuration +datadog: + api_key: "your-datadog-api-key" + app_key: "your-datadog-app-key" # Optional, required for some features + site: "datadoghq.com" # Use "datadoghq.eu" for EU region + metric_prefix: "litterrobot" # Prefix for all metrics + +# Collector settings +collector: + poll_interval_seconds: 120 # How often to collect metrics (default: 2 minutes) + include_pets: true # Include pet profile metrics + emit_events: true # Emit Datadog events for state changes diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a78eee9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "datacat" +version = "0.1.0" +description = "Collect metrics from Whisker Litter Robot devices and submit to Datadog" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "pylitterbot>=2025.0.0", + "datadog>=0.50.0", + "pyyaml>=6.0", + "python-dotenv>=1.0.0", +] + +[project.scripts] +datacat = "datacat:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/datacat"] + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "UP"] diff --git a/src/datacat/__init__.py b/src/datacat/__init__.py new file mode 100644 index 0000000..346750a --- /dev/null +++ b/src/datacat/__init__.py @@ -0,0 +1,6 @@ +"""Datacat - Litter Robot metrics collector for Datadog.""" + +from datacat.main import main + +__version__ = "0.1.0" +__all__ = ["main"] diff --git a/src/datacat/__main__.py b/src/datacat/__main__.py new file mode 100644 index 0000000..290acd2 --- /dev/null +++ b/src/datacat/__main__.py @@ -0,0 +1,6 @@ +"""Allow running as python -m datacat.""" + +from datacat.main import main + +if __name__ == "__main__": + main() diff --git a/src/datacat/collector.py b/src/datacat/collector.py new file mode 100644 index 0000000..f21c950 --- /dev/null +++ b/src/datacat/collector.py @@ -0,0 +1,604 @@ +"""Collector module for fetching data from Whisker/Litter Robot API.""" + +import logging +from datetime import datetime, timezone +from typing import Any + +from pylitterbot import Account +from pylitterbot.robot import Robot +from pylitterbot.robot.feederrobot import FeederRobot +from pylitterbot.robot.litterrobot import LitterRobot +from pylitterbot.robot.litterrobot3 import LitterRobot3 +from pylitterbot.robot.litterrobot4 import LitterRobot4 + +from datacat.config import Config +from datacat.models import ( + CollectionResult, + Event, + Metric, + MetricType, + PetMetrics, + RobotMetrics, +) + +logger = logging.getLogger(__name__) + + +class Collector: + """Collects metrics from Whisker Litter Robot devices.""" + + def __init__(self, config: Config): + """Initialize the collector with configuration.""" + self.config = config + self._account: Account | None = None + self._previous_states: dict[str, dict[str, Any]] = {} + + async def connect(self) -> None: + """Connect to the Whisker API.""" + logger.info("Connecting to Whisker API...") + self._account = Account() + await self._account.connect( + username=self.config.whisker.username, + password=self.config.whisker.password, + load_robots=True, + load_pets=self.config.collector.include_pets, + ) + logger.info( + "Connected. Found %d robots and %d pets.", + len(self._account.robots), + len(self._account.pets) if self.config.collector.include_pets else 0, + ) + + async def disconnect(self) -> None: + """Disconnect from the Whisker API.""" + if self._account: + logger.info("Disconnecting from Whisker API...") + await self._account.disconnect() + self._account = None + + async def collect(self) -> CollectionResult: + """Collect metrics from all robots and pets.""" + if not self._account: + raise RuntimeError("Not connected. Call connect() first.") + + timestamp = datetime.now(timezone.utc) + result = CollectionResult(timestamp=timestamp) + + # Refresh robot data + try: + await self._account.refresh_robots() + except Exception as e: + logger.error("Failed to refresh robots: %s", e) + result.errors.append(f"Failed to refresh robots: {e}") + + # Collect robot metrics + for robot in self._account.robots: + try: + robot_metrics = self._collect_robot_metrics(robot, timestamp) + result.robots.append(robot_metrics) + except Exception as e: + logger.error("Failed to collect metrics for robot %s: %s", robot.name, e) + result.errors.append(f"Failed to collect metrics for robot {robot.name}: {e}") + + # Collect pet metrics + if self.config.collector.include_pets: + for pet in self._account.pets: + try: + pet_metrics = self._collect_pet_metrics(pet, timestamp) + result.pets.append(pet_metrics) + except Exception as e: + logger.error("Failed to collect metrics for pet %s: %s", pet.name, e) + result.errors.append(f"Failed to collect metrics for pet {pet.name}: {e}") + + return result + + def _get_base_tags(self, robot: Robot) -> list[str]: + """Get base tags for a robot.""" + return [ + f"robot_id:{robot.id}", + f"robot_serial:{robot.serial}", + f"robot_name:{robot.name}", + f"robot_model:{robot.model}", + ] + + def _collect_robot_metrics(self, robot: Robot, timestamp: datetime) -> RobotMetrics: + """Collect metrics from a single robot.""" + prefix = self.config.datadog.metric_prefix + base_tags = self._get_base_tags(robot) + metrics: list[Metric] = [] + events: list[Event] = [] + + # Common metrics for all robots + metrics.append( + Metric( + name=f"{prefix}.online", + value=1.0 if robot.is_online else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.night_light_enabled", + value=1.0 if robot.night_light_mode_enabled else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.panel_lock_enabled", + value=1.0 if robot.panel_lock_enabled else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + + # Power status as metric + power_status_map = {"AC": 2.0, "DC": 1.0, "NC": 0.0} + metrics.append( + Metric( + name=f"{prefix}.power_status", + value=power_status_map.get(robot.power_status, 0.0), + tags=base_tags + [f"power_type:{robot.power_status}"], + timestamp=timestamp, + ) + ) + + # Litter Robot specific metrics + if isinstance(robot, LitterRobot): + metrics.extend(self._collect_litter_robot_metrics(robot, prefix, base_tags, timestamp)) + events.extend(self._check_litter_robot_events(robot, base_tags, timestamp)) + + # Litter Robot 4 specific metrics + if isinstance(robot, LitterRobot4): + metrics.extend(self._collect_lr4_metrics(robot, prefix, base_tags, timestamp)) + + # Feeder Robot specific metrics + if isinstance(robot, FeederRobot): + metrics.extend(self._collect_feeder_metrics(robot, prefix, base_tags, timestamp)) + events.extend(self._check_feeder_events(robot, base_tags, timestamp)) + + # Check for online/offline state changes + if self.config.collector.emit_events: + events.extend(self._check_online_status_change(robot, base_tags, timestamp)) + + # Update previous state + self._update_previous_state(robot) + + return RobotMetrics( + robot_id=robot.id, + robot_serial=robot.serial, + robot_name=robot.name, + robot_model=robot.model, + metrics=metrics, + events=events, + ) + + def _collect_litter_robot_metrics( + self, robot: LitterRobot, prefix: str, base_tags: list[str], timestamp: datetime + ) -> list[Metric]: + """Collect Litter Robot specific metrics.""" + metrics = [] + status_tags = base_tags + [f"status:{robot.status.value}"] + + metrics.append( + Metric( + name=f"{prefix}.waste_drawer_level", + value=float(robot.waste_drawer_level), + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.cycle_count", + value=float(robot.cycle_count), + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.cycle_capacity", + value=float(robot.cycle_capacity), + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.cycles_after_drawer_full", + value=float(robot.cycles_after_drawer_full), + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.is_sleeping", + value=1.0 if robot.is_sleeping else 0.0, + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.sleep_mode_enabled", + value=1.0 if robot.sleep_mode_enabled else 0.0, + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.waste_drawer_full", + value=1.0 if robot.is_waste_drawer_full else 0.0, + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.drawer_full_indicator", + value=1.0 if robot.is_drawer_full_indicator_triggered else 0.0, + tags=status_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.wait_time_minutes", + value=float(robot.clean_cycle_wait_time_minutes), + tags=status_tags, + timestamp=timestamp, + ) + ) + + return metrics + + def _collect_lr4_metrics( + self, robot: LitterRobot4, prefix: str, base_tags: list[str], timestamp: datetime + ) -> list[Metric]: + """Collect Litter Robot 4 specific metrics.""" + metrics = [] + + metrics.append( + Metric( + name=f"{prefix}.litter_level", + value=float(robot.litter_level), + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.litter_level_calculated", + value=float(robot.litter_level_calculated), + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.pet_weight", + value=float(robot.pet_weight), + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.scoops_saved", + value=float(robot.scoops_saved_count), + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{prefix}.night_light_brightness", + value=float(robot.night_light_brightness), + tags=base_tags, + timestamp=timestamp, + ) + ) + + # Litter level state as tags + if robot.litter_level_state: + metrics.append( + Metric( + name=f"{prefix}.litter_level_state", + value=1.0, + tags=base_tags + [f"litter_state:{robot.litter_level_state.value}"], + timestamp=timestamp, + ) + ) + + # Hopper status + if robot.hopper_status: + hopper_ok = robot.hopper_status.value == "ENABLED" + metrics.append( + Metric( + name=f"{prefix}.hopper_enabled", + value=1.0 if hopper_ok else 0.0, + tags=base_tags + [f"hopper_status:{robot.hopper_status.value}"], + timestamp=timestamp, + ) + ) + + # Raw data metrics if available + raw_data = robot.to_dict() + if "wifiRssi" in raw_data: + metrics.append( + Metric( + name=f"{prefix}.wifi_rssi", + value=float(raw_data.get("wifiRssi", 0)), + tags=base_tags, + timestamp=timestamp, + ) + ) + if "odometerPowerCycles" in raw_data: + metrics.append( + Metric( + name=f"{prefix}.odometer_power_cycles", + value=float(raw_data.get("odometerPowerCycles", 0)), + tags=base_tags, + timestamp=timestamp, + metric_type=MetricType.COUNT, + ) + ) + if "odometerEmptyCycles" in raw_data: + metrics.append( + Metric( + name=f"{prefix}.odometer_empty_cycles", + value=float(raw_data.get("odometerEmptyCycles", 0)), + tags=base_tags, + timestamp=timestamp, + metric_type=MetricType.COUNT, + ) + ) + + return metrics + + def _collect_feeder_metrics( + self, robot: FeederRobot, prefix: str, base_tags: list[str], timestamp: datetime + ) -> list[Metric]: + """Collect Feeder Robot specific metrics.""" + # Use feederrobot prefix for feeder-specific metrics + feeder_prefix = prefix.replace("litterrobot", "feederrobot") + metrics = [] + + metrics.append( + Metric( + name=f"{feeder_prefix}.food_level", + value=float(robot.food_level), + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{feeder_prefix}.gravity_mode_enabled", + value=1.0 if robot.gravity_mode_enabled else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + metrics.append( + Metric( + name=f"{feeder_prefix}.meal_insert_size", + value=float(robot.meal_insert_size), + tags=base_tags, + timestamp=timestamp, + ) + ) + + return metrics + + def _check_online_status_change( + self, robot: Robot, base_tags: list[str], timestamp: datetime + ) -> list[Event]: + """Check for online/offline status changes and generate events.""" + events = [] + prev_state = self._previous_states.get(robot.id, {}) + prev_online = prev_state.get("is_online") + + if prev_online is not None and prev_online != robot.is_online: + if robot.is_online: + events.append( + Event( + title=f"{robot.name} is back online", + text=f"Robot {robot.name} ({robot.serial}) has come back online.", + tags=base_tags, + alert_type="success", + timestamp=timestamp, + ) + ) + else: + events.append( + Event( + title=f"{robot.name} went offline", + text=f"Robot {robot.name} ({robot.serial}) has gone offline.", + tags=base_tags, + alert_type="warning", + timestamp=timestamp, + ) + ) + + return events + + def _check_litter_robot_events( + self, robot: LitterRobot, base_tags: list[str], timestamp: datetime + ) -> list[Event]: + """Check for Litter Robot state changes and generate events.""" + if not self.config.collector.emit_events: + return [] + + events = [] + prev_state = self._previous_states.get(robot.id, {}) + + # Drawer full status change + prev_drawer_full = prev_state.get("is_waste_drawer_full") + if prev_drawer_full is not None and not prev_drawer_full and robot.is_waste_drawer_full: + events.append( + Event( + title=f"{robot.name} waste drawer is full", + text=f"The waste drawer on {robot.name} ({robot.serial}) needs to be emptied. " + f"Cycle count: {robot.cycle_count}", + tags=base_tags + [f"status:{robot.status.value}"], + alert_type="warning", + timestamp=timestamp, + ) + ) + + # Status change events for certain error states + prev_status = prev_state.get("status") + current_status = robot.status.value if hasattr(robot.status, "value") else str(robot.status) + + error_statuses = { + "CSF": "Cat Sensor Fault", + "DHF": "Dump + Home Position Fault", + "DPF": "Dump Position Fault", + "HPF": "Home Position Fault", + "OTF": "Over Torque Fault", + "PD": "Pinch Detect", + "SCF": "Cat Sensor Fault At Startup", + "SPF": "Pinch Detect At Startup", + } + + if prev_status != current_status and current_status in error_statuses: + events.append( + Event( + title=f"{robot.name} error: {error_statuses[current_status]}", + text=f"Robot {robot.name} ({robot.serial}) has encountered an error: " + f"{error_statuses[current_status]} (status code: {current_status})", + tags=base_tags + [f"status:{current_status}"], + alert_type="error", + timestamp=timestamp, + ) + ) + + return events + + def _check_feeder_events( + self, robot: FeederRobot, base_tags: list[str], timestamp: datetime + ) -> list[Event]: + """Check for Feeder Robot state changes and generate events.""" + if not self.config.collector.emit_events: + return [] + + events = [] + prev_state = self._previous_states.get(robot.id, {}) + + # Low food level warning + prev_food_level = prev_state.get("food_level") + if prev_food_level is not None: + # Alert when food drops below 20% + if prev_food_level >= 20 and robot.food_level < 20: + events.append( + Event( + title=f"{robot.name} food level low", + text=f"Food level on {robot.name} ({robot.serial}) is low: {robot.food_level}%", + tags=base_tags, + alert_type="warning", + timestamp=timestamp, + ) + ) + + return events + + def _update_previous_state(self, robot: Robot) -> None: + """Update the previous state for a robot.""" + state: dict[str, Any] = { + "is_online": robot.is_online, + } + + if isinstance(robot, LitterRobot): + state["is_waste_drawer_full"] = robot.is_waste_drawer_full + state["status"] = robot.status.value if hasattr(robot.status, "value") else str(robot.status) + + if isinstance(robot, FeederRobot): + state["food_level"] = robot.food_level + + self._previous_states[robot.id] = state + + def _collect_pet_metrics(self, pet, timestamp: datetime) -> PetMetrics: + """Collect metrics from a pet profile.""" + prefix = self.config.datadog.metric_prefix + pet_type_str = str(pet.pet_type) if pet.pet_type else "unknown" + base_tags = [ + f"pet_id:{pet.id}", + f"pet_name:{pet.name}", + f"pet_type:{pet_type_str}", + ] + metrics: list[Metric] = [] + + # Weight metrics + if pet.weight: + metrics.append( + Metric( + name=f"{prefix}.pet.weight", + value=float(pet.weight), + tags=base_tags, + timestamp=timestamp, + ) + ) + + if pet.estimated_weight: + metrics.append( + Metric( + name=f"{prefix}.pet.estimated_weight", + value=float(pet.estimated_weight), + tags=base_tags, + timestamp=timestamp, + ) + ) + + if pet.last_weight_reading: + metrics.append( + Metric( + name=f"{prefix}.pet.last_weight_reading", + value=float(pet.last_weight_reading), + tags=base_tags, + timestamp=timestamp, + ) + ) + + # Health status + metrics.append( + Metric( + name=f"{prefix}.pet.is_healthy", + value=1.0 if pet.is_healthy else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + + metrics.append( + Metric( + name=f"{prefix}.pet.is_active", + value=1.0 if pet.is_active else 0.0, + tags=base_tags, + timestamp=timestamp, + ) + ) + + # Age if available + if pet.age: + metrics.append( + Metric( + name=f"{prefix}.pet.age", + value=float(pet.age), + tags=base_tags, + timestamp=timestamp, + ) + ) + + return PetMetrics( + pet_id=pet.id, + pet_name=pet.name, + pet_type=pet_type_str, + metrics=metrics, + events=[], + ) diff --git a/src/datacat/config.py b/src/datacat/config.py new file mode 100644 index 0000000..6e22b6b --- /dev/null +++ b/src/datacat/config.py @@ -0,0 +1,157 @@ +"""Configuration loading from environment variables and config file.""" + +import logging +import os +from dataclasses import dataclass, field +from pathlib import Path + +import yaml +from dotenv import load_dotenv + +logger = logging.getLogger(__name__) + + +@dataclass +class WhiskerConfig: + """Whisker/Litter Robot API configuration.""" + + username: str + password: str + + +@dataclass +class DatadogConfig: + """Datadog configuration.""" + + api_key: str + app_key: str | None = None + site: str = "datadoghq.com" + metric_prefix: str = "litterrobot" + + +@dataclass +class CollectorConfig: + """Collector behavior configuration.""" + + poll_interval_seconds: int = 120 + include_pets: bool = True + emit_events: bool = True + + +@dataclass +class Config: + """Main application configuration.""" + + whisker: WhiskerConfig + datadog: DatadogConfig + collector: CollectorConfig = field(default_factory=CollectorConfig) + + +def load_config(config_path: str | Path | None = None) -> Config: + """Load configuration from environment variables and optional config file. + + Environment variables take precedence over config file values. + + Environment variables: + WHISKER_USERNAME: Whisker account email + WHISKER_PASSWORD: Whisker account password + DATADOG_API_KEY: Datadog API key + DATADOG_APP_KEY: Datadog application key (optional) + DATADOG_SITE: Datadog site (default: datadoghq.com) + DATACAT_POLL_INTERVAL: Polling interval in seconds (default: 120) + DATACAT_INCLUDE_PETS: Include pet metrics (default: true) + DATACAT_EMIT_EVENTS: Emit Datadog events (default: true) + DATACAT_CONFIG_FILE: Path to config file (optional) + """ + # Load .env file if present + load_dotenv() + + # Determine config file path + if config_path is None: + config_path = os.getenv("DATACAT_CONFIG_FILE") + + # Load config file if specified + file_config: dict = {} + if config_path: + config_file = Path(config_path) + if config_file.exists(): + logger.info("Loading configuration from %s", config_file) + with open(config_file) as f: + file_config = yaml.safe_load(f) or {} + else: + logger.warning("Config file not found: %s", config_file) + + # Helper to get value with env var precedence + def get_value(env_var: str, file_path: list[str], default=None): + """Get config value with environment variable taking precedence.""" + env_value = os.getenv(env_var) + if env_value is not None: + return env_value + + # Navigate nested dict + value = file_config + for key in file_path: + if isinstance(value, dict): + value = value.get(key) + else: + value = None + break + + return value if value is not None else default + + def get_bool(env_var: str, file_path: list[str], default: bool) -> bool: + """Get boolean config value.""" + value = get_value(env_var, file_path, default) + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.lower() in ("true", "1", "yes") + return default + + def get_int(env_var: str, file_path: list[str], default: int) -> int: + """Get integer config value.""" + value = get_value(env_var, file_path, default) + if isinstance(value, int): + return value + if isinstance(value, str): + return int(value) + return default + + # Build configuration + whisker_username = get_value("WHISKER_USERNAME", ["whisker", "username"]) + whisker_password = get_value("WHISKER_PASSWORD", ["whisker", "password"]) + + if not whisker_username or not whisker_password: + raise ValueError( + "Whisker credentials required. Set WHISKER_USERNAME and WHISKER_PASSWORD " + "environment variables or provide them in config file." + ) + + datadog_api_key = get_value("DATADOG_API_KEY", ["datadog", "api_key"]) + if not datadog_api_key: + raise ValueError( + "Datadog API key required. Set DATADOG_API_KEY environment variable " + "or provide it in config file." + ) + + return Config( + whisker=WhiskerConfig( + username=whisker_username, + password=whisker_password, + ), + datadog=DatadogConfig( + api_key=datadog_api_key, + app_key=get_value("DATADOG_APP_KEY", ["datadog", "app_key"]), + site=get_value("DATADOG_SITE", ["datadog", "site"], "datadoghq.com"), + metric_prefix=get_value( + "DATACAT_METRIC_PREFIX", ["datadog", "metric_prefix"], "litterrobot" + ), + ), + collector=CollectorConfig( + poll_interval_seconds=get_int( + "DATACAT_POLL_INTERVAL", ["collector", "poll_interval_seconds"], 120 + ), + include_pets=get_bool("DATACAT_INCLUDE_PETS", ["collector", "include_pets"], True), + emit_events=get_bool("DATACAT_EMIT_EVENTS", ["collector", "emit_events"], True), + ), + ) diff --git a/src/datacat/main.py b/src/datacat/main.py new file mode 100644 index 0000000..8ea7dac --- /dev/null +++ b/src/datacat/main.py @@ -0,0 +1,167 @@ +"""Main application entry point.""" + +import argparse +import asyncio +import logging +import signal +import sys +from pathlib import Path + +from datacat.collector import Collector +from datacat.config import load_config +from datacat.metrics import MetricsClient + +logger = logging.getLogger(__name__) + + +class Datacat: + """Main application class for Datacat.""" + + def __init__(self, config_path: str | Path | None = None): + """Initialize Datacat application.""" + self.config = load_config(config_path) + self.collector = Collector(self.config) + self.metrics_client = MetricsClient(self.config.datadog) + self._shutdown_event = asyncio.Event() + + async def run(self) -> None: + """Run the main collection loop.""" + logger.info("Starting Datacat...") + logger.info( + "Poll interval: %d seconds, Include pets: %s, Emit events: %s", + self.config.collector.poll_interval_seconds, + self.config.collector.include_pets, + self.config.collector.emit_events, + ) + + try: + await self.collector.connect() + + while not self._shutdown_event.is_set(): + try: + await self._collect_and_submit() + except Exception as e: + logger.error("Collection cycle failed: %s", e) + + # Wait for next collection cycle or shutdown + try: + await asyncio.wait_for( + self._shutdown_event.wait(), + timeout=self.config.collector.poll_interval_seconds, + ) + except asyncio.TimeoutError: + pass # Normal timeout, continue to next cycle + + finally: + await self.collector.disconnect() + self.metrics_client.close() + logger.info("Datacat stopped.") + + async def _collect_and_submit(self) -> None: + """Collect metrics and submit to Datadog.""" + logger.debug("Starting collection cycle...") + + result = await self.collector.collect() + + if result.errors: + for error in result.errors: + logger.warning("Collection error: %s", error) + + metrics_count, events_count, errors_count = self.metrics_client.submit(result) + + logger.info( + "Collection complete: %d metrics, %d events submitted (%d errors)", + metrics_count, + events_count, + errors_count, + ) + + def shutdown(self) -> None: + """Signal the application to shut down.""" + logger.info("Shutdown requested...") + self._shutdown_event.set() + + +def setup_logging(verbose: bool = False) -> None: + """Configure logging.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + + # Reduce noise from third-party libraries + logging.getLogger("aiohttp").setLevel(logging.WARNING) + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("botocore").setLevel(logging.WARNING) + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + prog="datacat", + description="Collect metrics from Whisker Litter Robot devices and submit to Datadog", + ) + parser.add_argument( + "-c", + "--config", + type=str, + help="Path to configuration file (YAML)", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose logging", + ) + parser.add_argument( + "--once", + action="store_true", + help="Run a single collection cycle and exit", + ) + return parser.parse_args() + + +async def async_main(args: argparse.Namespace) -> None: + """Async main function.""" + app = Datacat(config_path=args.config) + + # Set up signal handlers + loop = asyncio.get_running_loop() + + def signal_handler(): + app.shutdown() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + if args.once: + # Single collection mode + try: + await app.collector.connect() + await app._collect_and_submit() + finally: + await app.collector.disconnect() + app.metrics_client.close() + else: + # Continuous collection mode + await app.run() + + +def main() -> None: + """Main entry point.""" + args = parse_args() + setup_logging(verbose=args.verbose) + + try: + asyncio.run(async_main(args)) + except KeyboardInterrupt: + logger.info("Interrupted by user") + except Exception as e: + logger.exception("Fatal error: %s", e) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/datacat/metrics.py b/src/datacat/metrics.py new file mode 100644 index 0000000..6c3247b --- /dev/null +++ b/src/datacat/metrics.py @@ -0,0 +1,137 @@ +"""Datadog metrics submission module.""" + +import logging +import time + +from datadog import DogStatsd, initialize +from datadog.api import Event as DDEvent + +from datacat.config import DatadogConfig +from datacat.models import CollectionResult, Event, Metric, MetricType + +logger = logging.getLogger(__name__) + + +class MetricsClient: + """Client for submitting metrics and events to Datadog.""" + + def __init__(self, config: DatadogConfig): + """Initialize the Datadog metrics client.""" + self.config = config + self._statsd: DogStatsd | None = None + + # Initialize the Datadog API client for events + initialize( + api_key=config.api_key, + app_key=config.app_key, + api_host=f"https://api.{config.site}", + ) + + def _get_statsd(self) -> DogStatsd: + """Get or create the DogStatsd client.""" + if self._statsd is None: + self._statsd = DogStatsd( + constant_tags=[], + telemetry_min_flush_interval=0, + ) + return self._statsd + + def submit(self, result: CollectionResult) -> tuple[int, int, int]: + """Submit collection results to Datadog. + + Returns: + Tuple of (metrics_submitted, events_submitted, errors) + """ + metrics_count = 0 + events_count = 0 + errors_count = 0 + + # Submit robot metrics + for robot in result.robots: + for metric in robot.metrics: + try: + self._submit_metric(metric) + metrics_count += 1 + except Exception as e: + logger.error("Failed to submit metric %s: %s", metric.name, e) + errors_count += 1 + + for event in robot.events: + try: + self._submit_event(event) + events_count += 1 + except Exception as e: + logger.error("Failed to submit event %s: %s", event.title, e) + errors_count += 1 + + # Submit pet metrics + for pet in result.pets: + for metric in pet.metrics: + try: + self._submit_metric(metric) + metrics_count += 1 + except Exception as e: + logger.error("Failed to submit metric %s: %s", metric.name, e) + errors_count += 1 + + for event in pet.events: + try: + self._submit_event(event) + events_count += 1 + except Exception as e: + logger.error("Failed to submit event %s: %s", event.title, e) + errors_count += 1 + + # Flush any buffered metrics + try: + statsd = self._get_statsd() + statsd.flush() + except Exception as e: + logger.warning("Failed to flush metrics: %s", e) + + return metrics_count, events_count, errors_count + + def _submit_metric(self, metric: Metric) -> None: + """Submit a single metric to Datadog.""" + statsd = self._get_statsd() + timestamp = metric.timestamp.timestamp() if metric.timestamp else time.time() + + logger.debug( + "Submitting metric: %s = %s (tags: %s)", + metric.name, + metric.value, + metric.tags, + ) + + match metric.metric_type: + case MetricType.GAUGE: + statsd.gauge(metric.name, metric.value, tags=metric.tags) + case MetricType.COUNT: + statsd.increment(metric.name, metric.value, tags=metric.tags) + case MetricType.RATE: + statsd.increment(metric.name, metric.value, tags=metric.tags) + + def _submit_event(self, event: Event) -> None: + """Submit a single event to Datadog.""" + logger.debug("Submitting event: %s", event.title) + + timestamp = int(event.timestamp.timestamp()) if event.timestamp else int(time.time()) + + DDEvent.create( + title=event.title, + text=event.text, + tags=event.tags, + alert_type=event.alert_type, + date_happened=timestamp, + source_type_name="datacat", + ) + + def close(self) -> None: + """Close the metrics client.""" + if self._statsd: + try: + self._statsd.flush() + self._statsd.close() + except Exception as e: + logger.warning("Error closing DogStatsd: %s", e) + self._statsd = None diff --git a/src/datacat/models.py b/src/datacat/models.py new file mode 100644 index 0000000..1de33a0 --- /dev/null +++ b/src/datacat/models.py @@ -0,0 +1,68 @@ +"""Data models for metrics collection.""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum + + +class MetricType(Enum): + """Datadog metric types.""" + + GAUGE = "gauge" + COUNT = "count" + RATE = "rate" + + +@dataclass +class Metric: + """A single metric to be submitted to Datadog.""" + + name: str + value: float + metric_type: MetricType = MetricType.GAUGE + tags: list[str] = field(default_factory=list) + timestamp: datetime | None = None + + +@dataclass +class Event: + """A Datadog event.""" + + title: str + text: str + tags: list[str] = field(default_factory=list) + alert_type: str = "info" # info, warning, error, success + timestamp: datetime | None = None + + +@dataclass +class RobotMetrics: + """Collection of metrics from a single robot.""" + + robot_id: str + robot_serial: str + robot_name: str + robot_model: str + metrics: list[Metric] = field(default_factory=list) + events: list[Event] = field(default_factory=list) + + +@dataclass +class PetMetrics: + """Collection of metrics from a pet profile.""" + + pet_id: str + pet_name: str + pet_type: str | None + metrics: list[Metric] = field(default_factory=list) + events: list[Event] = field(default_factory=list) + + +@dataclass +class CollectionResult: + """Result of a metrics collection cycle.""" + + timestamp: datetime + robots: list[RobotMetrics] = field(default_factory=list) + pets: list[PetMetrics] = field(default_factory=list) + errors: list[str] = field(default_factory=list)