Building Custom Behaviors with Reachy Mini

Unlock the full potential of your Reachy Mini robot by creating sophisticated custom behaviors using state machines, event handling, and advanced programming patterns.

Creating custom behaviors is where Reachy Mini truly shines as a development platform. Beyond basic movement and predefined actions, you can build complex, intelligent behaviors that respond to environmental changes, user interactions, and internal states. This guide explores advanced programming techniques for creating sophisticated robot behaviors.

Understanding Behavior-Driven Architecture

Behavior-driven architecture organizes robot functionality into discrete, manageable behaviors that can be combined, prioritized, and coordinated. This approach enables complex robot applications while maintaining code clarity and reliability.

Key Benefits: Modular design, easier debugging, reusable components, and the ability to create complex emergent behaviors from simple building blocks.

Core Behavior Structure

Every behavior should follow a consistent structure with initialization, execution, and cleanup phases. This pattern ensures predictable behavior and proper resource management.

from abc import ABC, abstractmethod from reachy_sdk import ReachySDK import time import threading from enum import Enum class BehaviorState(Enum): INACTIVE = "inactive" ACTIVE = "active" PAUSED = "paused" COMPLETED = "completed" ERROR = "error" class BaseBehavior(ABC): def __init__(self, name, priority=1): self.name = name self.priority = priority self.state = BehaviorState.INACTIVE self.reachy = None self.start_time = None self.duration = None self.parameters = {} def initialize(self, reachy, **kwargs): """Initialize behavior with robot instance and parameters""" self.reachy = reachy self.parameters.update(kwargs) self.state = BehaviorState.ACTIVE self.start_time = time.time() self.on_initialize() def execute(self): """Main execution loop for the behavior""" if self.state == BehaviorState.ACTIVE: try: result = self.on_execute() if result == "completed": self.state = BehaviorState.COMPLETED self.on_complete() return result except Exception as e: self.state = BehaviorState.ERROR self.on_error(e) return "error" return self.state.value def cleanup(self): """Cleanup resources and finalize behavior""" self.on_cleanup() self.state = BehaviorState.INACTIVE @abstractmethod def on_initialize(self): """Override to implement initialization logic""" pass @abstractmethod def on_execute(self): """Override to implement main behavior logic""" pass def on_complete(self): """Override to implement completion logic""" pass def on_cleanup(self): """Override to implement cleanup logic""" pass def on_error(self, error): """Override to implement error handling""" print(f"Behavior {self.name} error: {error}")

Implementing State Machines

State machines provide a powerful framework for managing complex robot behaviors with multiple phases, conditions, and transitions. They enable clear logic flow and predictable behavior responses.

Finite State Machine Implementation

A well-designed state machine clearly defines possible states, valid transitions, and trigger conditions. This approach is particularly effective for interactive behaviors and multi-step processes.

class InteractionStateMachine: def __init__(self, reachy): self.reachy = reachy self.current_state = "idle" self.previous_state = None self.state_data = {} # Define state transitions self.transitions = { "idle": ["detecting", "greeting", "shutdown"], "detecting": ["idle", "person_detected", "tracking"], "person_detected": ["greeting", "idle", "tracking"], "greeting": ["conversing", "idle", "waiting"], "conversing": ["listening", "speaking", "idle"], "listening": ["processing", "conversing", "idle"], "processing": ["speaking", "action_required", "idle"], "speaking": ["listening", "conversing", "idle"], "action_required": ["performing_action", "idle"], "performing_action": ["completed", "error", "idle"], "waiting": ["idle", "greeting", "timeout"], "tracking": ["person_detected", "idle", "lost_target"], "completed": ["idle", "greeting"], "error": ["idle", "recovery"], "recovery": ["idle", "error"] } # State handlers self.state_handlers = { "idle": self.handle_idle, "detecting": self.handle_detecting, "person_detected": self.handle_person_detected, "greeting": self.handle_greeting, "conversing": self.handle_conversing, "listening": self.handle_listening, "processing": self.handle_processing, "speaking": self.handle_speaking, "action_required": self.handle_action_required, "performing_action": self.handle_performing_action, "waiting": self.handle_waiting, "tracking": self.handle_tracking, "completed": self.handle_completed, "error": self.handle_error, "recovery": self.handle_recovery } def transition_to(self, new_state, **data): """Safely transition to a new state""" if new_state in self.transitions[self.current_state]: self.previous_state = self.current_state self.current_state = new_state self.state_data.update(data) print(f"State transition: {self.previous_state} -> {self.current_state}") return True else: print(f"Invalid transition: {self.current_state} -> {new_state}") return False def update(self): """Execute current state handler""" if self.current_state in self.state_handlers: return self.state_handlers[self.current_state]() return "unknown_state" def handle_idle(self): """Robot is idle, waiting for stimuli""" # Check for person detection if self.detect_person_nearby(): self.transition_to("detecting") return "continue" def handle_detecting(self): """Actively scanning for people""" person_data = self.scan_for_people() if person_data: self.transition_to("person_detected", person=person_data) elif self.detection_timeout(): self.transition_to("idle") return "continue" def handle_greeting(self): """Performing greeting sequence""" if self.execute_greeting_sequence(): self.transition_to("conversing") else: self.transition_to("idle") return "continue"

Event-Driven Behavior System

Event-driven systems allow behaviors to respond dynamically to various stimuli including sensor data, user commands, and internal state changes. This approach creates more responsive and interactive robot applications.

Performance Consideration: Event systems can generate high processing loads. Implement proper event filtering and prioritization to maintain real-time performance.

Event Manager Implementation

A centralized event manager coordinates between different robot subsystems and ensures proper event handling and response prioritization.

import queue import threading from dataclasses import dataclass from typing import Any, Callable, Dict, List @dataclass class Event: type: str data: Any = None priority: int = 1 timestamp: float = None source: str = None def __post_init__(self): if self.timestamp is None: self.timestamp = time.time() class EventManager: def __init__(self): self.event_queue = queue.PriorityQueue() self.subscribers = {} self.running = False self.processing_thread = None def subscribe(self, event_type: str, callback: Callable): """Subscribe to specific event types""" if event_type not in self.subscribers: self.subscribers[event_type] = [] self.subscribers[event_type].append(callback) def unsubscribe(self, event_type: str, callback: Callable): """Unsubscribe from event types""" if event_type in self.subscribers: self.subscribers[event_type].remove(callback) def publish(self, event: Event): """Publish event to the system""" self.event_queue.put((-event.priority, event.timestamp, event)) def start_processing(self): """Start event processing loop""" self.running = True self.processing_thread = threading.Thread(target=self._process_events) self.processing_thread.daemon = True self.processing_thread.start() def stop_processing(self): """Stop event processing""" self.running = False if self.processing_thread: self.processing_thread.join() def _process_events(self): """Internal event processing loop""" while self.running: try: # Get event with timeout priority, timestamp, event = self.event_queue.get(timeout=0.1) # Notify subscribers if event.type in self.subscribers: for callback in self.subscribers[event.type]: try: callback(event) except Exception as e: print(f"Event callback error: {e}") self.event_queue.task_done() except queue.Empty: continue except Exception as e: print(f"Event processing error: {e}")

Complex Behavior Coordination

Real-world robot applications often require multiple behaviors to work together harmoniously. Behavior coordination systems manage priorities, resource conflicts, and smooth transitions between different operational modes.

Behavior Arbiter System

A behavior arbiter manages multiple concurrent behaviors, resolving conflicts and ensuring system stability while maintaining responsive operation.

class BehaviorArbiter: def __init__(self, reachy): self.reachy = reachy self.active_behaviors = {} self.behavior_queue = queue.PriorityQueue() self.resource_locks = {} self.running = False def register_behavior(self, behavior: BaseBehavior): """Register a behavior with the arbiter""" behavior_id = f"{behavior.name}_{id(behavior)}" self.active_behaviors[behavior_id] = behavior return behavior_id def request_execution(self, behavior_id: str, **kwargs): """Request behavior execution with priority""" if behavior_id in self.active_behaviors: behavior = self.active_behaviors[behavior_id] priority_score = -behavior.priority # Higher priority = lower number self.behavior_queue.put((priority_score, time.time(), behavior_id, kwargs)) return True return False def execute_behaviors(self): """Main execution loop for behavior coordination""" while self.running: try: if not self.behavior_queue.empty(): priority, timestamp, behavior_id, kwargs = self.behavior_queue.get_nowait() if behavior_id in self.active_behaviors: behavior = self.active_behaviors[behavior_id] # Check resource availability if self.can_execute_behavior(behavior): self.execute_single_behavior(behavior, **kwargs) else: # Re-queue with lower priority self.behavior_queue.put((priority + 1, time.time(), behavior_id, kwargs)) time.sleep(0.01) # Prevent busy waiting except queue.Empty: time.sleep(0.1) except Exception as e: print(f"Behavior execution error: {e}") def can_execute_behavior(self, behavior): """Check if behavior can execute based on resource availability""" required_resources = getattr(behavior, 'required_resources', []) for resource in required_resources: if resource in self.resource_locks and self.resource_locks[resource]: return False return True def acquire_resources(self, behavior): """Acquire resources for behavior execution""" required_resources = getattr(behavior, 'required_resources', []) for resource in required_resources: self.resource_locks[resource] = behavior.name def release_resources(self, behavior): """Release resources after behavior completion""" required_resources = getattr(behavior, 'required_resources', []) for resource in required_resources: if resource in self.resource_locks: self.resource_locks[resource] = None

Sensor Integration and Reactive Behaviors

Reactive behaviors respond immediately to sensor input, creating dynamic and responsive robot interactions. This approach is essential for safety systems and natural user interactions.

Design Principle: Reactive behaviors should have the highest priority and be able to interrupt other behaviors for safety and responsiveness.

Sensor-Driven Behavior Example

This example demonstrates how to create behaviors that respond to real-time sensor data while maintaining system stability and predictable operation.

class ProximityReactiveBehavior(BaseBehavior): def __init__(self): super().__init__("proximity_reactive", priority=10) # High priority self.proximity_threshold = 30 # cm self.last_detection_time = 0 self.reaction_cooldown = 2.0 # seconds def on_initialize(self): """Setup proximity monitoring""" self.required_resources = ['head_movement'] print("Proximity reactive behavior initialized") def on_execute(self): """Monitor proximity and react accordingly""" current_time = time.time() # Get proximity data (simulated - replace with actual sensor) proximity_distance = self.get_proximity_distance() if proximity_distance < self.proximity_threshold: if current_time - self.last_detection_time > self.reaction_cooldown: self.execute_proximity_reaction(proximity_distance) self.last_detection_time = current_time return "continue" # Reactive behaviors typically run continuously def get_proximity_distance(self): """Get distance from proximity sensor""" # Simulate sensor reading - replace with actual implementation import random return random.uniform(10, 100) def execute_proximity_reaction(self, distance): """Execute reaction based on proximity distance""" if distance < 15: # Very close - back away self.reachy.head.look_at(0.5, 0.0, 0.8, duration=0.5) print(f"Object very close ({distance:.1f}cm) - backing away") elif distance < 25: # Moderately close - acknowledge self.reachy.head.look_at(0.3, 0.0, 0.5, duration=0.8) print(f"Object detected ({distance:.1f}cm) - acknowledging") else: # At threshold - mild attention self.reachy.head.look_at(0.0, 0.0, 0.0, duration=1.0) print(f"Object at threshold ({distance:.1f}cm) - mild attention") class EmotionalResponseBehavior(BaseBehavior): def __init__(self): super().__init__("emotional_response", priority=5) self.emotional_state = "neutral" self.state_history = [] def on_initialize(self): """Initialize emotional response system""" self.emotional_states = { "happy": {"head_tilt": 0.2, "duration": 2.0}, "curious": {"head_tilt": -0.1, "duration": 1.5}, "concerned": {"head_tilt": 0.0, "duration": 3.0}, "neutral": {"head_tilt": 0.0, "duration": 1.0} } def on_execute(self): """Process emotional responses""" # Analyze recent interactions to determine emotional state new_emotion = self.analyze_interaction_context() if new_emotion != self.emotional_state: self.transition_emotion(new_emotion) return "continue" def analyze_interaction_context(self): """Analyze context to determine appropriate emotional response""" # Simplified emotion analysis - replace with more sophisticated logic recent_events = self.get_recent_events() if "proximity_alert" in recent_events: return "concerned" elif "positive_interaction" in recent_events: return "happy" elif "new_person" in recent_events: return "curious" else: return "neutral" def transition_emotion(self, new_emotion): """Smoothly transition to new emotional state""" if new_emotion in self.emotional_states: state_config = self.emotional_states[new_emotion] # Execute physical expression of emotion self.reachy.head.look_at( 0.0, state_config["head_tilt"], 0.0, duration=state_config["duration"] ) self.emotional_state = new_emotion self.state_history.append({ "emotion": new_emotion, "timestamp": time.time() }) print(f"Emotional transition to: {new_emotion}")

Performance Optimization and Debugging

Complex behavior systems require careful optimization and debugging tools to maintain reliable operation. Implementing proper logging, performance monitoring, and diagnostic tools ensures robust robot applications.

Behavior Monitoring System

A comprehensive monitoring system tracks behavior performance, resource usage, and system health, enabling proactive maintenance and optimization.

class BehaviorMonitor: def __init__(self): self.behavior_metrics = {} self.system_metrics = {} self.performance_log = [] def record_behavior_execution(self, behavior_name, execution_time, success): """Record behavior performance metrics""" if behavior_name not in self.behavior_metrics: self.behavior_metrics[behavior_name] = { "total_executions": 0, "successful_executions": 0, "total_time": 0.0, "average_time": 0.0, "success_rate": 0.0 } metrics = self.behavior_metrics[behavior_name] metrics["total_executions"] += 1 metrics["total_time"] += execution_time if success: metrics["successful_executions"] += 1 metrics["average_time"] = metrics["total_time"] / metrics["total_executions"] metrics["success_rate"] = metrics["successful_executions"] / metrics["total_executions"] # Log performance data self.performance_log.append({ "timestamp": time.time(), "behavior": behavior_name, "execution_time": execution_time, "success": success }) def get_system_status(self): """Generate comprehensive system status report""" status = { "total_behaviors": len(self.behavior_metrics), "system_uptime": time.time() - self.start_time, "behavior_summary": {} } for behavior_name, metrics in self.behavior_metrics.items(): status["behavior_summary"][behavior_name] = { "executions": metrics["total_executions"], "avg_time": round(metrics["average_time"], 3), "success_rate": round(metrics["success_rate"] * 100, 1) } return status

Conclusion

Building custom behaviors transforms your Reachy Mini from a programmable robot into an intelligent, responsive system capable of complex interactions and autonomous operation. By implementing proper architecture patterns, state management, and event-driven systems, you create robust and scalable robot applications.

The techniques covered in this guide provide a solid foundation for advanced robot development. As you continue developing, focus on modular design, proper resource management, and comprehensive testing to ensure reliable operation in diverse environments.

Best Practices: Always implement proper error handling, resource cleanup, and performance monitoring in your custom behaviors. Test thoroughly in controlled environments before deploying complex behavior systems.