Creating custom behaviors is where Reachy Mini truly shines as a development platform. Beyond basic movement and predefined actions, you can build complex, intelligent behaviors that respond to environmental changes, user interactions, and internal states. This guide explores advanced programming techniques for creating sophisticated robot behaviors.
Understanding Behavior-Driven Architecture
Behavior-driven architecture organizes robot functionality into discrete, manageable behaviors that can be combined, prioritized, and coordinated. This approach enables complex robot applications while maintaining code clarity and reliability.
Key Benefits: Modular design, easier debugging, reusable components, and the ability to create complex emergent behaviors from simple building blocks.
Core Behavior Structure
Every behavior should follow a consistent structure with initialization, execution, and cleanup phases. This pattern ensures predictable behavior and proper resource management.
from abc import ABC, abstractmethod
from reachy_sdk import ReachySDK
import time
import threading
from enum import Enum
class BehaviorState(Enum):
INACTIVE = "inactive"
ACTIVE = "active"
PAUSED = "paused"
COMPLETED = "completed"
ERROR = "error"
class BaseBehavior(ABC):
def __init__(self, name, priority=1):
self.name = name
self.priority = priority
self.state = BehaviorState.INACTIVE
self.reachy = None
self.start_time = None
self.duration = None
self.parameters = {}
def initialize(self, reachy, **kwargs):
"""Initialize behavior with robot instance and parameters"""
self.reachy = reachy
self.parameters.update(kwargs)
self.state = BehaviorState.ACTIVE
self.start_time = time.time()
self.on_initialize()
def execute(self):
"""Main execution loop for the behavior"""
if self.state == BehaviorState.ACTIVE:
try:
result = self.on_execute()
if result == "completed":
self.state = BehaviorState.COMPLETED
self.on_complete()
return result
except Exception as e:
self.state = BehaviorState.ERROR
self.on_error(e)
return "error"
return self.state.value
def cleanup(self):
"""Cleanup resources and finalize behavior"""
self.on_cleanup()
self.state = BehaviorState.INACTIVE
@abstractmethod
def on_initialize(self):
"""Override to implement initialization logic"""
pass
@abstractmethod
def on_execute(self):
"""Override to implement main behavior logic"""
pass
def on_complete(self):
"""Override to implement completion logic"""
pass
def on_cleanup(self):
"""Override to implement cleanup logic"""
pass
def on_error(self, error):
"""Override to implement error handling"""
print(f"Behavior {self.name} error: {error}")
Implementing State Machines
State machines provide a powerful framework for managing complex robot behaviors with multiple phases, conditions, and transitions. They enable clear logic flow and predictable behavior responses.
Finite State Machine Implementation
A well-designed state machine clearly defines possible states, valid transitions, and trigger conditions. This approach is particularly effective for interactive behaviors and multi-step processes.
class InteractionStateMachine:
def __init__(self, reachy):
self.reachy = reachy
self.current_state = "idle"
self.previous_state = None
self.state_data = {}
# Define state transitions
self.transitions = {
"idle": ["detecting", "greeting", "shutdown"],
"detecting": ["idle", "person_detected", "tracking"],
"person_detected": ["greeting", "idle", "tracking"],
"greeting": ["conversing", "idle", "waiting"],
"conversing": ["listening", "speaking", "idle"],
"listening": ["processing", "conversing", "idle"],
"processing": ["speaking", "action_required", "idle"],
"speaking": ["listening", "conversing", "idle"],
"action_required": ["performing_action", "idle"],
"performing_action": ["completed", "error", "idle"],
"waiting": ["idle", "greeting", "timeout"],
"tracking": ["person_detected", "idle", "lost_target"],
"completed": ["idle", "greeting"],
"error": ["idle", "recovery"],
"recovery": ["idle", "error"]
}
# State handlers
self.state_handlers = {
"idle": self.handle_idle,
"detecting": self.handle_detecting,
"person_detected": self.handle_person_detected,
"greeting": self.handle_greeting,
"conversing": self.handle_conversing,
"listening": self.handle_listening,
"processing": self.handle_processing,
"speaking": self.handle_speaking,
"action_required": self.handle_action_required,
"performing_action": self.handle_performing_action,
"waiting": self.handle_waiting,
"tracking": self.handle_tracking,
"completed": self.handle_completed,
"error": self.handle_error,
"recovery": self.handle_recovery
}
def transition_to(self, new_state, **data):
"""Safely transition to a new state"""
if new_state in self.transitions[self.current_state]:
self.previous_state = self.current_state
self.current_state = new_state
self.state_data.update(data)
print(f"State transition: {self.previous_state} -> {self.current_state}")
return True
else:
print(f"Invalid transition: {self.current_state} -> {new_state}")
return False
def update(self):
"""Execute current state handler"""
if self.current_state in self.state_handlers:
return self.state_handlers[self.current_state]()
return "unknown_state"
def handle_idle(self):
"""Robot is idle, waiting for stimuli"""
# Check for person detection
if self.detect_person_nearby():
self.transition_to("detecting")
return "continue"
def handle_detecting(self):
"""Actively scanning for people"""
person_data = self.scan_for_people()
if person_data:
self.transition_to("person_detected", person=person_data)
elif self.detection_timeout():
self.transition_to("idle")
return "continue"
def handle_greeting(self):
"""Performing greeting sequence"""
if self.execute_greeting_sequence():
self.transition_to("conversing")
else:
self.transition_to("idle")
return "continue"
Event-Driven Behavior System
Event-driven systems allow behaviors to respond dynamically to various stimuli including sensor data, user commands, and internal state changes. This approach creates more responsive and interactive robot applications.
Performance Consideration: Event systems can generate high processing loads. Implement proper event filtering and prioritization to maintain real-time performance.
Event Manager Implementation
A centralized event manager coordinates between different robot subsystems and ensures proper event handling and response prioritization.
import queue
import threading
from dataclasses import dataclass
from typing import Any, Callable, Dict, List
@dataclass
class Event:
type: str
data: Any = None
priority: int = 1
timestamp: float = None
source: str = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
class EventManager:
def __init__(self):
self.event_queue = queue.PriorityQueue()
self.subscribers = {}
self.running = False
self.processing_thread = None
def subscribe(self, event_type: str, callback: Callable):
"""Subscribe to specific event types"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(callback)
def unsubscribe(self, event_type: str, callback: Callable):
"""Unsubscribe from event types"""
if event_type in self.subscribers:
self.subscribers[event_type].remove(callback)
def publish(self, event: Event):
"""Publish event to the system"""
self.event_queue.put((-event.priority, event.timestamp, event))
def start_processing(self):
"""Start event processing loop"""
self.running = True
self.processing_thread = threading.Thread(target=self._process_events)
self.processing_thread.daemon = True
self.processing_thread.start()
def stop_processing(self):
"""Stop event processing"""
self.running = False
if self.processing_thread:
self.processing_thread.join()
def _process_events(self):
"""Internal event processing loop"""
while self.running:
try:
# Get event with timeout
priority, timestamp, event = self.event_queue.get(timeout=0.1)
# Notify subscribers
if event.type in self.subscribers:
for callback in self.subscribers[event.type]:
try:
callback(event)
except Exception as e:
print(f"Event callback error: {e}")
self.event_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Event processing error: {e}")
Complex Behavior Coordination
Real-world robot applications often require multiple behaviors to work together harmoniously. Behavior coordination systems manage priorities, resource conflicts, and smooth transitions between different operational modes.
Behavior Arbiter System
A behavior arbiter manages multiple concurrent behaviors, resolving conflicts and ensuring system stability while maintaining responsive operation.
class BehaviorArbiter:
def __init__(self, reachy):
self.reachy = reachy
self.active_behaviors = {}
self.behavior_queue = queue.PriorityQueue()
self.resource_locks = {}
self.running = False
def register_behavior(self, behavior: BaseBehavior):
"""Register a behavior with the arbiter"""
behavior_id = f"{behavior.name}_{id(behavior)}"
self.active_behaviors[behavior_id] = behavior
return behavior_id
def request_execution(self, behavior_id: str, **kwargs):
"""Request behavior execution with priority"""
if behavior_id in self.active_behaviors:
behavior = self.active_behaviors[behavior_id]
priority_score = -behavior.priority # Higher priority = lower number
self.behavior_queue.put((priority_score, time.time(), behavior_id, kwargs))
return True
return False
def execute_behaviors(self):
"""Main execution loop for behavior coordination"""
while self.running:
try:
if not self.behavior_queue.empty():
priority, timestamp, behavior_id, kwargs = self.behavior_queue.get_nowait()
if behavior_id in self.active_behaviors:
behavior = self.active_behaviors[behavior_id]
# Check resource availability
if self.can_execute_behavior(behavior):
self.execute_single_behavior(behavior, **kwargs)
else:
# Re-queue with lower priority
self.behavior_queue.put((priority + 1, time.time(), behavior_id, kwargs))
time.sleep(0.01) # Prevent busy waiting
except queue.Empty:
time.sleep(0.1)
except Exception as e:
print(f"Behavior execution error: {e}")
def can_execute_behavior(self, behavior):
"""Check if behavior can execute based on resource availability"""
required_resources = getattr(behavior, 'required_resources', [])
for resource in required_resources:
if resource in self.resource_locks and self.resource_locks[resource]:
return False
return True
def acquire_resources(self, behavior):
"""Acquire resources for behavior execution"""
required_resources = getattr(behavior, 'required_resources', [])
for resource in required_resources:
self.resource_locks[resource] = behavior.name
def release_resources(self, behavior):
"""Release resources after behavior completion"""
required_resources = getattr(behavior, 'required_resources', [])
for resource in required_resources:
if resource in self.resource_locks:
self.resource_locks[resource] = None
Sensor Integration and Reactive Behaviors
Reactive behaviors respond immediately to sensor input, creating dynamic and responsive robot interactions. This approach is essential for safety systems and natural user interactions.
Design Principle: Reactive behaviors should have the highest priority and be able to interrupt other behaviors for safety and responsiveness.
Sensor-Driven Behavior Example
This example demonstrates how to create behaviors that respond to real-time sensor data while maintaining system stability and predictable operation.
class ProximityReactiveBehavior(BaseBehavior):
def __init__(self):
super().__init__("proximity_reactive", priority=10) # High priority
self.proximity_threshold = 30 # cm
self.last_detection_time = 0
self.reaction_cooldown = 2.0 # seconds
def on_initialize(self):
"""Setup proximity monitoring"""
self.required_resources = ['head_movement']
print("Proximity reactive behavior initialized")
def on_execute(self):
"""Monitor proximity and react accordingly"""
current_time = time.time()
# Get proximity data (simulated - replace with actual sensor)
proximity_distance = self.get_proximity_distance()
if proximity_distance < self.proximity_threshold:
if current_time - self.last_detection_time > self.reaction_cooldown:
self.execute_proximity_reaction(proximity_distance)
self.last_detection_time = current_time
return "continue" # Reactive behaviors typically run continuously
def get_proximity_distance(self):
"""Get distance from proximity sensor"""
# Simulate sensor reading - replace with actual implementation
import random
return random.uniform(10, 100)
def execute_proximity_reaction(self, distance):
"""Execute reaction based on proximity distance"""
if distance < 15:
# Very close - back away
self.reachy.head.look_at(0.5, 0.0, 0.8, duration=0.5)
print(f"Object very close ({distance:.1f}cm) - backing away")
elif distance < 25:
# Moderately close - acknowledge
self.reachy.head.look_at(0.3, 0.0, 0.5, duration=0.8)
print(f"Object detected ({distance:.1f}cm) - acknowledging")
else:
# At threshold - mild attention
self.reachy.head.look_at(0.0, 0.0, 0.0, duration=1.0)
print(f"Object at threshold ({distance:.1f}cm) - mild attention")
class EmotionalResponseBehavior(BaseBehavior):
def __init__(self):
super().__init__("emotional_response", priority=5)
self.emotional_state = "neutral"
self.state_history = []
def on_initialize(self):
"""Initialize emotional response system"""
self.emotional_states = {
"happy": {"head_tilt": 0.2, "duration": 2.0},
"curious": {"head_tilt": -0.1, "duration": 1.5},
"concerned": {"head_tilt": 0.0, "duration": 3.0},
"neutral": {"head_tilt": 0.0, "duration": 1.0}
}
def on_execute(self):
"""Process emotional responses"""
# Analyze recent interactions to determine emotional state
new_emotion = self.analyze_interaction_context()
if new_emotion != self.emotional_state:
self.transition_emotion(new_emotion)
return "continue"
def analyze_interaction_context(self):
"""Analyze context to determine appropriate emotional response"""
# Simplified emotion analysis - replace with more sophisticated logic
recent_events = self.get_recent_events()
if "proximity_alert" in recent_events:
return "concerned"
elif "positive_interaction" in recent_events:
return "happy"
elif "new_person" in recent_events:
return "curious"
else:
return "neutral"
def transition_emotion(self, new_emotion):
"""Smoothly transition to new emotional state"""
if new_emotion in self.emotional_states:
state_config = self.emotional_states[new_emotion]
# Execute physical expression of emotion
self.reachy.head.look_at(
0.0,
state_config["head_tilt"],
0.0,
duration=state_config["duration"]
)
self.emotional_state = new_emotion
self.state_history.append({
"emotion": new_emotion,
"timestamp": time.time()
})
print(f"Emotional transition to: {new_emotion}")
Performance Optimization and Debugging
Complex behavior systems require careful optimization and debugging tools to maintain reliable operation. Implementing proper logging, performance monitoring, and diagnostic tools ensures robust robot applications.
Behavior Monitoring System
A comprehensive monitoring system tracks behavior performance, resource usage, and system health, enabling proactive maintenance and optimization.
class BehaviorMonitor:
def __init__(self):
self.behavior_metrics = {}
self.system_metrics = {}
self.performance_log = []
def record_behavior_execution(self, behavior_name, execution_time, success):
"""Record behavior performance metrics"""
if behavior_name not in self.behavior_metrics:
self.behavior_metrics[behavior_name] = {
"total_executions": 0,
"successful_executions": 0,
"total_time": 0.0,
"average_time": 0.0,
"success_rate": 0.0
}
metrics = self.behavior_metrics[behavior_name]
metrics["total_executions"] += 1
metrics["total_time"] += execution_time
if success:
metrics["successful_executions"] += 1
metrics["average_time"] = metrics["total_time"] / metrics["total_executions"]
metrics["success_rate"] = metrics["successful_executions"] / metrics["total_executions"]
# Log performance data
self.performance_log.append({
"timestamp": time.time(),
"behavior": behavior_name,
"execution_time": execution_time,
"success": success
})
def get_system_status(self):
"""Generate comprehensive system status report"""
status = {
"total_behaviors": len(self.behavior_metrics),
"system_uptime": time.time() - self.start_time,
"behavior_summary": {}
}
for behavior_name, metrics in self.behavior_metrics.items():
status["behavior_summary"][behavior_name] = {
"executions": metrics["total_executions"],
"avg_time": round(metrics["average_time"], 3),
"success_rate": round(metrics["success_rate"] * 100, 1)
}
return status
Conclusion
Building custom behaviors transforms your Reachy Mini from a programmable robot into an intelligent, responsive system capable of complex interactions and autonomous operation. By implementing proper architecture patterns, state management, and event-driven systems, you create robust and scalable robot applications.
The techniques covered in this guide provide a solid foundation for advanced robot development. As you continue developing, focus on modular design, proper resource management, and comprehensive testing to ensure reliable operation in diverse environments.
Best Practices: Always implement proper error handling, resource cleanup, and performance monitoring in your custom behaviors. Test thoroughly in controlled environments before deploying complex behavior systems.