feat: initial implementation of Datacat

- Add project structure with UV/hatch build system
- Implement config module (env vars + YAML file support)
- Implement collector module for pylitterbot integration
- Implement metrics module for Datadog submission
- Support LR3, LR4, and Feeder Robot metrics
- Add event emission for state changes
- Add CLI with --once mode for single collection
This commit is contained in:
Tanishq Dubey
2025-12-16 11:49:43 -05:00
commit 3b3f3df53c
13 changed files with 1296 additions and 0 deletions

20
.env.example Normal file
View File

@@ -0,0 +1,20 @@
# Datacat Environment Variables
# Copy this file to .env and update with your credentials
# Whisker/Litter Robot credentials
WHISKER_USERNAME=your-email@example.com
WHISKER_PASSWORD=your-password
# Datadog credentials
DATADOG_API_KEY=your-datadog-api-key
DATADOG_APP_KEY=your-datadog-app-key
DATADOG_SITE=datadoghq.com
# Collector settings
DATACAT_POLL_INTERVAL=120
DATACAT_INCLUDE_PETS=true
DATACAT_EMIT_EVENTS=true
DATACAT_METRIC_PREFIX=litterrobot
# Optional: Path to config file (if using YAML config instead of env vars)
# DATACAT_CONFIG_FILE=/path/to/config.yaml

81
.gitignore vendored Normal file
View File

@@ -0,0 +1,81 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Config files with secrets
config.yaml
config.yml
# UV
.uv/
uv.lock
# macOS
.DS_Store
# Datacat specific
*.log

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

0
README.md Normal file
View File

20
config.example.yaml Normal file
View File

@@ -0,0 +1,20 @@
# Datacat Configuration Example
# Copy this file to config.yaml and update with your credentials
# Whisker/Litter Robot credentials
whisker:
username: "your-email@example.com"
password: "your-password"
# Datadog configuration
datadog:
api_key: "your-datadog-api-key"
app_key: "your-datadog-app-key" # Optional, required for some features
site: "datadoghq.com" # Use "datadoghq.eu" for EU region
metric_prefix: "litterrobot" # Prefix for all metrics
# Collector settings
collector:
poll_interval_seconds: 120 # How often to collect metrics (default: 2 minutes)
include_pets: true # Include pet profile metrics
emit_events: true # Emit Datadog events for state changes

29
pyproject.toml Normal file
View File

@@ -0,0 +1,29 @@
[project]
name = "datacat"
version = "0.1.0"
description = "Collect metrics from Whisker Litter Robot devices and submit to Datadog"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pylitterbot>=2025.0.0",
"datadog>=0.50.0",
"pyyaml>=6.0",
"python-dotenv>=1.0.0",
]
[project.scripts]
datacat = "datacat:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/datacat"]
[tool.ruff]
line-length = 100
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]

6
src/datacat/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Datacat - Litter Robot metrics collector for Datadog."""
from datacat.main import main
__version__ = "0.1.0"
__all__ = ["main"]

6
src/datacat/__main__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Allow running as python -m datacat."""
from datacat.main import main
if __name__ == "__main__":
main()

604
src/datacat/collector.py Normal file
View File

@@ -0,0 +1,604 @@
"""Collector module for fetching data from Whisker/Litter Robot API."""
import logging
from datetime import datetime, timezone
from typing import Any
from pylitterbot import Account
from pylitterbot.robot import Robot
from pylitterbot.robot.feederrobot import FeederRobot
from pylitterbot.robot.litterrobot import LitterRobot
from pylitterbot.robot.litterrobot3 import LitterRobot3
from pylitterbot.robot.litterrobot4 import LitterRobot4
from datacat.config import Config
from datacat.models import (
CollectionResult,
Event,
Metric,
MetricType,
PetMetrics,
RobotMetrics,
)
logger = logging.getLogger(__name__)
class Collector:
"""Collects metrics from Whisker Litter Robot devices."""
def __init__(self, config: Config):
"""Initialize the collector with configuration."""
self.config = config
self._account: Account | None = None
self._previous_states: dict[str, dict[str, Any]] = {}
async def connect(self) -> None:
"""Connect to the Whisker API."""
logger.info("Connecting to Whisker API...")
self._account = Account()
await self._account.connect(
username=self.config.whisker.username,
password=self.config.whisker.password,
load_robots=True,
load_pets=self.config.collector.include_pets,
)
logger.info(
"Connected. Found %d robots and %d pets.",
len(self._account.robots),
len(self._account.pets) if self.config.collector.include_pets else 0,
)
async def disconnect(self) -> None:
"""Disconnect from the Whisker API."""
if self._account:
logger.info("Disconnecting from Whisker API...")
await self._account.disconnect()
self._account = None
async def collect(self) -> CollectionResult:
"""Collect metrics from all robots and pets."""
if not self._account:
raise RuntimeError("Not connected. Call connect() first.")
timestamp = datetime.now(timezone.utc)
result = CollectionResult(timestamp=timestamp)
# Refresh robot data
try:
await self._account.refresh_robots()
except Exception as e:
logger.error("Failed to refresh robots: %s", e)
result.errors.append(f"Failed to refresh robots: {e}")
# Collect robot metrics
for robot in self._account.robots:
try:
robot_metrics = self._collect_robot_metrics(robot, timestamp)
result.robots.append(robot_metrics)
except Exception as e:
logger.error("Failed to collect metrics for robot %s: %s", robot.name, e)
result.errors.append(f"Failed to collect metrics for robot {robot.name}: {e}")
# Collect pet metrics
if self.config.collector.include_pets:
for pet in self._account.pets:
try:
pet_metrics = self._collect_pet_metrics(pet, timestamp)
result.pets.append(pet_metrics)
except Exception as e:
logger.error("Failed to collect metrics for pet %s: %s", pet.name, e)
result.errors.append(f"Failed to collect metrics for pet {pet.name}: {e}")
return result
def _get_base_tags(self, robot: Robot) -> list[str]:
"""Get base tags for a robot."""
return [
f"robot_id:{robot.id}",
f"robot_serial:{robot.serial}",
f"robot_name:{robot.name}",
f"robot_model:{robot.model}",
]
def _collect_robot_metrics(self, robot: Robot, timestamp: datetime) -> RobotMetrics:
"""Collect metrics from a single robot."""
prefix = self.config.datadog.metric_prefix
base_tags = self._get_base_tags(robot)
metrics: list[Metric] = []
events: list[Event] = []
# Common metrics for all robots
metrics.append(
Metric(
name=f"{prefix}.online",
value=1.0 if robot.is_online else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.night_light_enabled",
value=1.0 if robot.night_light_mode_enabled else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.panel_lock_enabled",
value=1.0 if robot.panel_lock_enabled else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
# Power status as metric
power_status_map = {"AC": 2.0, "DC": 1.0, "NC": 0.0}
metrics.append(
Metric(
name=f"{prefix}.power_status",
value=power_status_map.get(robot.power_status, 0.0),
tags=base_tags + [f"power_type:{robot.power_status}"],
timestamp=timestamp,
)
)
# Litter Robot specific metrics
if isinstance(robot, LitterRobot):
metrics.extend(self._collect_litter_robot_metrics(robot, prefix, base_tags, timestamp))
events.extend(self._check_litter_robot_events(robot, base_tags, timestamp))
# Litter Robot 4 specific metrics
if isinstance(robot, LitterRobot4):
metrics.extend(self._collect_lr4_metrics(robot, prefix, base_tags, timestamp))
# Feeder Robot specific metrics
if isinstance(robot, FeederRobot):
metrics.extend(self._collect_feeder_metrics(robot, prefix, base_tags, timestamp))
events.extend(self._check_feeder_events(robot, base_tags, timestamp))
# Check for online/offline state changes
if self.config.collector.emit_events:
events.extend(self._check_online_status_change(robot, base_tags, timestamp))
# Update previous state
self._update_previous_state(robot)
return RobotMetrics(
robot_id=robot.id,
robot_serial=robot.serial,
robot_name=robot.name,
robot_model=robot.model,
metrics=metrics,
events=events,
)
def _collect_litter_robot_metrics(
self, robot: LitterRobot, prefix: str, base_tags: list[str], timestamp: datetime
) -> list[Metric]:
"""Collect Litter Robot specific metrics."""
metrics = []
status_tags = base_tags + [f"status:{robot.status.value}"]
metrics.append(
Metric(
name=f"{prefix}.waste_drawer_level",
value=float(robot.waste_drawer_level),
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.cycle_count",
value=float(robot.cycle_count),
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.cycle_capacity",
value=float(robot.cycle_capacity),
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.cycles_after_drawer_full",
value=float(robot.cycles_after_drawer_full),
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.is_sleeping",
value=1.0 if robot.is_sleeping else 0.0,
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.sleep_mode_enabled",
value=1.0 if robot.sleep_mode_enabled else 0.0,
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.waste_drawer_full",
value=1.0 if robot.is_waste_drawer_full else 0.0,
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.drawer_full_indicator",
value=1.0 if robot.is_drawer_full_indicator_triggered else 0.0,
tags=status_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.wait_time_minutes",
value=float(robot.clean_cycle_wait_time_minutes),
tags=status_tags,
timestamp=timestamp,
)
)
return metrics
def _collect_lr4_metrics(
self, robot: LitterRobot4, prefix: str, base_tags: list[str], timestamp: datetime
) -> list[Metric]:
"""Collect Litter Robot 4 specific metrics."""
metrics = []
metrics.append(
Metric(
name=f"{prefix}.litter_level",
value=float(robot.litter_level),
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.litter_level_calculated",
value=float(robot.litter_level_calculated),
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.pet_weight",
value=float(robot.pet_weight),
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.scoops_saved",
value=float(robot.scoops_saved_count),
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.night_light_brightness",
value=float(robot.night_light_brightness),
tags=base_tags,
timestamp=timestamp,
)
)
# Litter level state as tags
if robot.litter_level_state:
metrics.append(
Metric(
name=f"{prefix}.litter_level_state",
value=1.0,
tags=base_tags + [f"litter_state:{robot.litter_level_state.value}"],
timestamp=timestamp,
)
)
# Hopper status
if robot.hopper_status:
hopper_ok = robot.hopper_status.value == "ENABLED"
metrics.append(
Metric(
name=f"{prefix}.hopper_enabled",
value=1.0 if hopper_ok else 0.0,
tags=base_tags + [f"hopper_status:{robot.hopper_status.value}"],
timestamp=timestamp,
)
)
# Raw data metrics if available
raw_data = robot.to_dict()
if "wifiRssi" in raw_data:
metrics.append(
Metric(
name=f"{prefix}.wifi_rssi",
value=float(raw_data.get("wifiRssi", 0)),
tags=base_tags,
timestamp=timestamp,
)
)
if "odometerPowerCycles" in raw_data:
metrics.append(
Metric(
name=f"{prefix}.odometer_power_cycles",
value=float(raw_data.get("odometerPowerCycles", 0)),
tags=base_tags,
timestamp=timestamp,
metric_type=MetricType.COUNT,
)
)
if "odometerEmptyCycles" in raw_data:
metrics.append(
Metric(
name=f"{prefix}.odometer_empty_cycles",
value=float(raw_data.get("odometerEmptyCycles", 0)),
tags=base_tags,
timestamp=timestamp,
metric_type=MetricType.COUNT,
)
)
return metrics
def _collect_feeder_metrics(
self, robot: FeederRobot, prefix: str, base_tags: list[str], timestamp: datetime
) -> list[Metric]:
"""Collect Feeder Robot specific metrics."""
# Use feederrobot prefix for feeder-specific metrics
feeder_prefix = prefix.replace("litterrobot", "feederrobot")
metrics = []
metrics.append(
Metric(
name=f"{feeder_prefix}.food_level",
value=float(robot.food_level),
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{feeder_prefix}.gravity_mode_enabled",
value=1.0 if robot.gravity_mode_enabled else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{feeder_prefix}.meal_insert_size",
value=float(robot.meal_insert_size),
tags=base_tags,
timestamp=timestamp,
)
)
return metrics
def _check_online_status_change(
self, robot: Robot, base_tags: list[str], timestamp: datetime
) -> list[Event]:
"""Check for online/offline status changes and generate events."""
events = []
prev_state = self._previous_states.get(robot.id, {})
prev_online = prev_state.get("is_online")
if prev_online is not None and prev_online != robot.is_online:
if robot.is_online:
events.append(
Event(
title=f"{robot.name} is back online",
text=f"Robot {robot.name} ({robot.serial}) has come back online.",
tags=base_tags,
alert_type="success",
timestamp=timestamp,
)
)
else:
events.append(
Event(
title=f"{robot.name} went offline",
text=f"Robot {robot.name} ({robot.serial}) has gone offline.",
tags=base_tags,
alert_type="warning",
timestamp=timestamp,
)
)
return events
def _check_litter_robot_events(
self, robot: LitterRobot, base_tags: list[str], timestamp: datetime
) -> list[Event]:
"""Check for Litter Robot state changes and generate events."""
if not self.config.collector.emit_events:
return []
events = []
prev_state = self._previous_states.get(robot.id, {})
# Drawer full status change
prev_drawer_full = prev_state.get("is_waste_drawer_full")
if prev_drawer_full is not None and not prev_drawer_full and robot.is_waste_drawer_full:
events.append(
Event(
title=f"{robot.name} waste drawer is full",
text=f"The waste drawer on {robot.name} ({robot.serial}) needs to be emptied. "
f"Cycle count: {robot.cycle_count}",
tags=base_tags + [f"status:{robot.status.value}"],
alert_type="warning",
timestamp=timestamp,
)
)
# Status change events for certain error states
prev_status = prev_state.get("status")
current_status = robot.status.value if hasattr(robot.status, "value") else str(robot.status)
error_statuses = {
"CSF": "Cat Sensor Fault",
"DHF": "Dump + Home Position Fault",
"DPF": "Dump Position Fault",
"HPF": "Home Position Fault",
"OTF": "Over Torque Fault",
"PD": "Pinch Detect",
"SCF": "Cat Sensor Fault At Startup",
"SPF": "Pinch Detect At Startup",
}
if prev_status != current_status and current_status in error_statuses:
events.append(
Event(
title=f"{robot.name} error: {error_statuses[current_status]}",
text=f"Robot {robot.name} ({robot.serial}) has encountered an error: "
f"{error_statuses[current_status]} (status code: {current_status})",
tags=base_tags + [f"status:{current_status}"],
alert_type="error",
timestamp=timestamp,
)
)
return events
def _check_feeder_events(
self, robot: FeederRobot, base_tags: list[str], timestamp: datetime
) -> list[Event]:
"""Check for Feeder Robot state changes and generate events."""
if not self.config.collector.emit_events:
return []
events = []
prev_state = self._previous_states.get(robot.id, {})
# Low food level warning
prev_food_level = prev_state.get("food_level")
if prev_food_level is not None:
# Alert when food drops below 20%
if prev_food_level >= 20 and robot.food_level < 20:
events.append(
Event(
title=f"{robot.name} food level low",
text=f"Food level on {robot.name} ({robot.serial}) is low: {robot.food_level}%",
tags=base_tags,
alert_type="warning",
timestamp=timestamp,
)
)
return events
def _update_previous_state(self, robot: Robot) -> None:
"""Update the previous state for a robot."""
state: dict[str, Any] = {
"is_online": robot.is_online,
}
if isinstance(robot, LitterRobot):
state["is_waste_drawer_full"] = robot.is_waste_drawer_full
state["status"] = robot.status.value if hasattr(robot.status, "value") else str(robot.status)
if isinstance(robot, FeederRobot):
state["food_level"] = robot.food_level
self._previous_states[robot.id] = state
def _collect_pet_metrics(self, pet, timestamp: datetime) -> PetMetrics:
"""Collect metrics from a pet profile."""
prefix = self.config.datadog.metric_prefix
pet_type_str = str(pet.pet_type) if pet.pet_type else "unknown"
base_tags = [
f"pet_id:{pet.id}",
f"pet_name:{pet.name}",
f"pet_type:{pet_type_str}",
]
metrics: list[Metric] = []
# Weight metrics
if pet.weight:
metrics.append(
Metric(
name=f"{prefix}.pet.weight",
value=float(pet.weight),
tags=base_tags,
timestamp=timestamp,
)
)
if pet.estimated_weight:
metrics.append(
Metric(
name=f"{prefix}.pet.estimated_weight",
value=float(pet.estimated_weight),
tags=base_tags,
timestamp=timestamp,
)
)
if pet.last_weight_reading:
metrics.append(
Metric(
name=f"{prefix}.pet.last_weight_reading",
value=float(pet.last_weight_reading),
tags=base_tags,
timestamp=timestamp,
)
)
# Health status
metrics.append(
Metric(
name=f"{prefix}.pet.is_healthy",
value=1.0 if pet.is_healthy else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
metrics.append(
Metric(
name=f"{prefix}.pet.is_active",
value=1.0 if pet.is_active else 0.0,
tags=base_tags,
timestamp=timestamp,
)
)
# Age if available
if pet.age:
metrics.append(
Metric(
name=f"{prefix}.pet.age",
value=float(pet.age),
tags=base_tags,
timestamp=timestamp,
)
)
return PetMetrics(
pet_id=pet.id,
pet_name=pet.name,
pet_type=pet_type_str,
metrics=metrics,
events=[],
)

157
src/datacat/config.py Normal file
View File

@@ -0,0 +1,157 @@
"""Configuration loading from environment variables and config file."""
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
import yaml
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
@dataclass
class WhiskerConfig:
"""Whisker/Litter Robot API configuration."""
username: str
password: str
@dataclass
class DatadogConfig:
"""Datadog configuration."""
api_key: str
app_key: str | None = None
site: str = "datadoghq.com"
metric_prefix: str = "litterrobot"
@dataclass
class CollectorConfig:
"""Collector behavior configuration."""
poll_interval_seconds: int = 120
include_pets: bool = True
emit_events: bool = True
@dataclass
class Config:
"""Main application configuration."""
whisker: WhiskerConfig
datadog: DatadogConfig
collector: CollectorConfig = field(default_factory=CollectorConfig)
def load_config(config_path: str | Path | None = None) -> Config:
"""Load configuration from environment variables and optional config file.
Environment variables take precedence over config file values.
Environment variables:
WHISKER_USERNAME: Whisker account email
WHISKER_PASSWORD: Whisker account password
DATADOG_API_KEY: Datadog API key
DATADOG_APP_KEY: Datadog application key (optional)
DATADOG_SITE: Datadog site (default: datadoghq.com)
DATACAT_POLL_INTERVAL: Polling interval in seconds (default: 120)
DATACAT_INCLUDE_PETS: Include pet metrics (default: true)
DATACAT_EMIT_EVENTS: Emit Datadog events (default: true)
DATACAT_CONFIG_FILE: Path to config file (optional)
"""
# Load .env file if present
load_dotenv()
# Determine config file path
if config_path is None:
config_path = os.getenv("DATACAT_CONFIG_FILE")
# Load config file if specified
file_config: dict = {}
if config_path:
config_file = Path(config_path)
if config_file.exists():
logger.info("Loading configuration from %s", config_file)
with open(config_file) as f:
file_config = yaml.safe_load(f) or {}
else:
logger.warning("Config file not found: %s", config_file)
# Helper to get value with env var precedence
def get_value(env_var: str, file_path: list[str], default=None):
"""Get config value with environment variable taking precedence."""
env_value = os.getenv(env_var)
if env_value is not None:
return env_value
# Navigate nested dict
value = file_config
for key in file_path:
if isinstance(value, dict):
value = value.get(key)
else:
value = None
break
return value if value is not None else default
def get_bool(env_var: str, file_path: list[str], default: bool) -> bool:
"""Get boolean config value."""
value = get_value(env_var, file_path, default)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ("true", "1", "yes")
return default
def get_int(env_var: str, file_path: list[str], default: int) -> int:
"""Get integer config value."""
value = get_value(env_var, file_path, default)
if isinstance(value, int):
return value
if isinstance(value, str):
return int(value)
return default
# Build configuration
whisker_username = get_value("WHISKER_USERNAME", ["whisker", "username"])
whisker_password = get_value("WHISKER_PASSWORD", ["whisker", "password"])
if not whisker_username or not whisker_password:
raise ValueError(
"Whisker credentials required. Set WHISKER_USERNAME and WHISKER_PASSWORD "
"environment variables or provide them in config file."
)
datadog_api_key = get_value("DATADOG_API_KEY", ["datadog", "api_key"])
if not datadog_api_key:
raise ValueError(
"Datadog API key required. Set DATADOG_API_KEY environment variable "
"or provide it in config file."
)
return Config(
whisker=WhiskerConfig(
username=whisker_username,
password=whisker_password,
),
datadog=DatadogConfig(
api_key=datadog_api_key,
app_key=get_value("DATADOG_APP_KEY", ["datadog", "app_key"]),
site=get_value("DATADOG_SITE", ["datadog", "site"], "datadoghq.com"),
metric_prefix=get_value(
"DATACAT_METRIC_PREFIX", ["datadog", "metric_prefix"], "litterrobot"
),
),
collector=CollectorConfig(
poll_interval_seconds=get_int(
"DATACAT_POLL_INTERVAL", ["collector", "poll_interval_seconds"], 120
),
include_pets=get_bool("DATACAT_INCLUDE_PETS", ["collector", "include_pets"], True),
emit_events=get_bool("DATACAT_EMIT_EVENTS", ["collector", "emit_events"], True),
),
)

167
src/datacat/main.py Normal file
View File

@@ -0,0 +1,167 @@
"""Main application entry point."""
import argparse
import asyncio
import logging
import signal
import sys
from pathlib import Path
from datacat.collector import Collector
from datacat.config import load_config
from datacat.metrics import MetricsClient
logger = logging.getLogger(__name__)
class Datacat:
"""Main application class for Datacat."""
def __init__(self, config_path: str | Path | None = None):
"""Initialize Datacat application."""
self.config = load_config(config_path)
self.collector = Collector(self.config)
self.metrics_client = MetricsClient(self.config.datadog)
self._shutdown_event = asyncio.Event()
async def run(self) -> None:
"""Run the main collection loop."""
logger.info("Starting Datacat...")
logger.info(
"Poll interval: %d seconds, Include pets: %s, Emit events: %s",
self.config.collector.poll_interval_seconds,
self.config.collector.include_pets,
self.config.collector.emit_events,
)
try:
await self.collector.connect()
while not self._shutdown_event.is_set():
try:
await self._collect_and_submit()
except Exception as e:
logger.error("Collection cycle failed: %s", e)
# Wait for next collection cycle or shutdown
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=self.config.collector.poll_interval_seconds,
)
except asyncio.TimeoutError:
pass # Normal timeout, continue to next cycle
finally:
await self.collector.disconnect()
self.metrics_client.close()
logger.info("Datacat stopped.")
async def _collect_and_submit(self) -> None:
"""Collect metrics and submit to Datadog."""
logger.debug("Starting collection cycle...")
result = await self.collector.collect()
if result.errors:
for error in result.errors:
logger.warning("Collection error: %s", error)
metrics_count, events_count, errors_count = self.metrics_client.submit(result)
logger.info(
"Collection complete: %d metrics, %d events submitted (%d errors)",
metrics_count,
events_count,
errors_count,
)
def shutdown(self) -> None:
"""Signal the application to shut down."""
logger.info("Shutdown requested...")
self._shutdown_event.set()
def setup_logging(verbose: bool = False) -> None:
"""Configure logging."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# Reduce noise from third-party libraries
logging.getLogger("aiohttp").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
prog="datacat",
description="Collect metrics from Whisker Litter Robot devices and submit to Datadog",
)
parser.add_argument(
"-c",
"--config",
type=str,
help="Path to configuration file (YAML)",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Enable verbose logging",
)
parser.add_argument(
"--once",
action="store_true",
help="Run a single collection cycle and exit",
)
return parser.parse_args()
async def async_main(args: argparse.Namespace) -> None:
"""Async main function."""
app = Datacat(config_path=args.config)
# Set up signal handlers
loop = asyncio.get_running_loop()
def signal_handler():
app.shutdown()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
if args.once:
# Single collection mode
try:
await app.collector.connect()
await app._collect_and_submit()
finally:
await app.collector.disconnect()
app.metrics_client.close()
else:
# Continuous collection mode
await app.run()
def main() -> None:
"""Main entry point."""
args = parse_args()
setup_logging(verbose=args.verbose)
try:
asyncio.run(async_main(args))
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.exception("Fatal error: %s", e)
sys.exit(1)
if __name__ == "__main__":
main()

137
src/datacat/metrics.py Normal file
View File

@@ -0,0 +1,137 @@
"""Datadog metrics submission module."""
import logging
import time
from datadog import DogStatsd, initialize
from datadog.api import Event as DDEvent
from datacat.config import DatadogConfig
from datacat.models import CollectionResult, Event, Metric, MetricType
logger = logging.getLogger(__name__)
class MetricsClient:
"""Client for submitting metrics and events to Datadog."""
def __init__(self, config: DatadogConfig):
"""Initialize the Datadog metrics client."""
self.config = config
self._statsd: DogStatsd | None = None
# Initialize the Datadog API client for events
initialize(
api_key=config.api_key,
app_key=config.app_key,
api_host=f"https://api.{config.site}",
)
def _get_statsd(self) -> DogStatsd:
"""Get or create the DogStatsd client."""
if self._statsd is None:
self._statsd = DogStatsd(
constant_tags=[],
telemetry_min_flush_interval=0,
)
return self._statsd
def submit(self, result: CollectionResult) -> tuple[int, int, int]:
"""Submit collection results to Datadog.
Returns:
Tuple of (metrics_submitted, events_submitted, errors)
"""
metrics_count = 0
events_count = 0
errors_count = 0
# Submit robot metrics
for robot in result.robots:
for metric in robot.metrics:
try:
self._submit_metric(metric)
metrics_count += 1
except Exception as e:
logger.error("Failed to submit metric %s: %s", metric.name, e)
errors_count += 1
for event in robot.events:
try:
self._submit_event(event)
events_count += 1
except Exception as e:
logger.error("Failed to submit event %s: %s", event.title, e)
errors_count += 1
# Submit pet metrics
for pet in result.pets:
for metric in pet.metrics:
try:
self._submit_metric(metric)
metrics_count += 1
except Exception as e:
logger.error("Failed to submit metric %s: %s", metric.name, e)
errors_count += 1
for event in pet.events:
try:
self._submit_event(event)
events_count += 1
except Exception as e:
logger.error("Failed to submit event %s: %s", event.title, e)
errors_count += 1
# Flush any buffered metrics
try:
statsd = self._get_statsd()
statsd.flush()
except Exception as e:
logger.warning("Failed to flush metrics: %s", e)
return metrics_count, events_count, errors_count
def _submit_metric(self, metric: Metric) -> None:
"""Submit a single metric to Datadog."""
statsd = self._get_statsd()
timestamp = metric.timestamp.timestamp() if metric.timestamp else time.time()
logger.debug(
"Submitting metric: %s = %s (tags: %s)",
metric.name,
metric.value,
metric.tags,
)
match metric.metric_type:
case MetricType.GAUGE:
statsd.gauge(metric.name, metric.value, tags=metric.tags)
case MetricType.COUNT:
statsd.increment(metric.name, metric.value, tags=metric.tags)
case MetricType.RATE:
statsd.increment(metric.name, metric.value, tags=metric.tags)
def _submit_event(self, event: Event) -> None:
"""Submit a single event to Datadog."""
logger.debug("Submitting event: %s", event.title)
timestamp = int(event.timestamp.timestamp()) if event.timestamp else int(time.time())
DDEvent.create(
title=event.title,
text=event.text,
tags=event.tags,
alert_type=event.alert_type,
date_happened=timestamp,
source_type_name="datacat",
)
def close(self) -> None:
"""Close the metrics client."""
if self._statsd:
try:
self._statsd.flush()
self._statsd.close()
except Exception as e:
logger.warning("Error closing DogStatsd: %s", e)
self._statsd = None

68
src/datacat/models.py Normal file
View File

@@ -0,0 +1,68 @@
"""Data models for metrics collection."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class MetricType(Enum):
"""Datadog metric types."""
GAUGE = "gauge"
COUNT = "count"
RATE = "rate"
@dataclass
class Metric:
"""A single metric to be submitted to Datadog."""
name: str
value: float
metric_type: MetricType = MetricType.GAUGE
tags: list[str] = field(default_factory=list)
timestamp: datetime | None = None
@dataclass
class Event:
"""A Datadog event."""
title: str
text: str
tags: list[str] = field(default_factory=list)
alert_type: str = "info" # info, warning, error, success
timestamp: datetime | None = None
@dataclass
class RobotMetrics:
"""Collection of metrics from a single robot."""
robot_id: str
robot_serial: str
robot_name: str
robot_model: str
metrics: list[Metric] = field(default_factory=list)
events: list[Event] = field(default_factory=list)
@dataclass
class PetMetrics:
"""Collection of metrics from a pet profile."""
pet_id: str
pet_name: str
pet_type: str | None
metrics: list[Metric] = field(default_factory=list)
events: list[Event] = field(default_factory=list)
@dataclass
class CollectionResult:
"""Result of a metrics collection cycle."""
timestamp: datetime
robots: list[RobotMetrics] = field(default_factory=list)
pets: list[PetMetrics] = field(default_factory=list)
errors: list[str] = field(default_factory=list)