Machine learning integration elevates Reachy Mini from a programmable robot to an intelligent system capable of learning, adapting, and making autonomous decisions. This comprehensive guide explores how to implement various ML models, optimize inference performance, and create truly intelligent robotic behaviors.
Machine Learning Architecture for Robotics
Successful ML integration in robotics requires careful consideration of computational constraints, real-time performance requirements, and model accuracy trade-offs. Reachy Mini's computational capabilities allow for both edge inference and cloud-based processing depending on your application needs.
Key Considerations: Model size vs accuracy, inference latency, power consumption, network connectivity requirements, and the ability to update models over time.
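To make these trade-offs concrete, a small per-model deployment profile can drive edge-versus-cloud routing. This is an illustrative sketch: the ModelProfile structure and choose_backend policy below are assumptions made for this guide, not part of any Reachy SDK.

from dataclasses import dataclass

@dataclass
class ModelProfile:
    """Deployment profile for one model (illustrative)."""
    name: str
    size_mb: float          # size vs accuracy trade-off
    max_latency_ms: float   # real-time budget for this task
    edge_capable: bool      # small enough to run on the robot's compute
    version: str            # supports updating models over time

def choose_backend(profile: ModelProfile, network_available: bool) -> str:
    """Simple routing policy: prefer the edge, use the cloud when reachable."""
    if profile.edge_capable:
        return "edge"
    return "cloud" if network_available else "skip"  # degrade gracefully offline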
Setting Up the ML Environment
Before implementing machine learning models, establish a robust development environment that supports multiple frameworks and efficient model deployment.
# Install required ML libraries
pip install tensorflow torch torchvision opencv-python scikit-learn
pip install transformers huggingface-hub onnx onnxruntime
pip install mediapipe ultralytics
import tensorflow as tf
import torch
import cv2
import numpy as np
from reachy_sdk import ReachySDK
import threading
import time
from collections import deque

class MLIntegrationManager:
    def __init__(self, reachy):
        self.reachy = reachy
        self.models = {}
        self.inference_queue = deque(maxlen=10)
        self.results_cache = {}
        self.performance_metrics = {}

    def load_model(self, model_name, model_path, framework='tensorflow'):
        """Load ML model with specified framework"""
        try:
            if framework == 'tensorflow':
                model = tf.keras.models.load_model(model_path)
            elif framework == 'pytorch':
                model = torch.load(model_path, map_location='cpu')
                model.eval()
            elif framework == 'onnx':
                import onnxruntime
                model = onnxruntime.InferenceSession(model_path)
            else:
                raise ValueError(f"Unsupported framework: {framework}")
            self.models[model_name] = {
                'model': model,
                'framework': framework,
                'loaded_time': time.time()
            }
            print(f"Model {model_name} loaded successfully using {framework}")
            return True
        except Exception as e:
            print(f"Failed to load model {model_name}: {e}")
            return False

    def preprocess_input(self, input_data, model_name):
        """Preprocess input data for specific model requirements"""
        if model_name in ['object_detection', 'face_recognition']:
            # Image preprocessing: resize and normalize to [0, 1]
            if isinstance(input_data, np.ndarray):
                processed = cv2.resize(input_data, (224, 224))
                processed = processed.astype(np.float32) / 255.0
                return np.expand_dims(processed, axis=0)
        elif model_name in ['nlp_intent', 'conversation']:
            # Text preprocessing
            return self.tokenize_text(input_data)
        return input_data

    def tokenize_text(self, text):
        """Minimal whitespace tokenizer; swap in a real tokenizer per model"""
        return text.lower().split()

    def run_inference(self, model_name, input_data):
        """Run inference with specified model"""
        if model_name not in self.models:
            return None
        start_time = time.time()
        model_info = self.models[model_name]
        model = model_info['model']
        framework = model_info['framework']
        # Preprocess input
        processed_input = self.preprocess_input(input_data, model_name)
        try:
            if framework == 'tensorflow':
                predictions = model.predict(processed_input)
            elif framework == 'pytorch':
                with torch.no_grad():
                    tensor_input = torch.FloatTensor(processed_input)
                    predictions = model(tensor_input).numpy()
            elif framework == 'onnx':
                input_name = model.get_inputs()[0].name
                predictions = model.run(None, {input_name: processed_input})
            else:
                raise ValueError(f"Unsupported framework: {framework}")
            # Record performance metrics
            inference_time = time.time() - start_time
            self.record_performance(model_name, inference_time, True)
            return self.postprocess_output(predictions, model_name)
        except Exception as e:
            print(f"Inference error for {model_name}: {e}")
            self.record_performance(model_name, time.time() - start_time, False)
            return None

    def postprocess_output(self, predictions, model_name):
        """Pass predictions through unchanged; override per model as needed"""
        return predictions

    def record_performance(self, model_name, inference_time, success):
        """Accumulate simple per-model latency and success statistics"""
        stats = self.performance_metrics.setdefault(
            model_name, {'calls': 0, 'failures': 0, 'total_time': 0.0}
        )
        stats['calls'] += 1
        stats['total_time'] += inference_time
        if not success:
            stats['failures'] += 1
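A minimal usage sketch for the manager above; the host, model path, and model name are placeholders rather than values from the Reachy documentation:

reachy = ReachySDK(host='localhost')  # adjust for your robot's address
manager = MLIntegrationManager(reachy)

# Load a hypothetical Keras classifier and run it on a dummy camera frame
if manager.load_model('object_detection', 'models/detector.h5', framework='tensorflow'):
    frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in for a real frame
    predictions = manager.run_inference('object_detection', frame)
    print(predictions)
    print(manager.performance_metrics)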
Computer Vision Integration
Computer vision capabilities transform how your robot perceives and interacts with its environment. From object detection to facial recognition, these models enable rich visual understanding.
Real-Time Object Detection
Implement real-time object detection using YOLO or similar models to enable your robot to identify and track objects in its environment.
from ultralytics import YOLO
import cv2
import time
from collections import deque

class ObjectDetectionSystem:
    def __init__(self, reachy):
        self.reachy = reachy
        self.model = YOLO('yolov8n.pt')  # Load YOLOv8 nano model
        self.detection_history = deque(maxlen=30)  # Store last 30 frames
        self.tracking_objects = {}

    def detect_objects_in_frame(self, frame):
        """Detect objects in a single frame"""
        results = self.model(frame)
        detections = []
        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    # Extract detection information
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    confidence = float(box.conf[0].cpu().numpy())
                    class_id = int(box.cls[0].cpu().numpy())
                    class_name = self.model.names[class_id]
                    if confidence > 0.5:  # Confidence threshold
                        detections.append({
                            'bbox': (int(x1), int(y1), int(x2), int(y2)),
                            'confidence': confidence,
                            'class': class_name,
                            'class_id': class_id
                        })
        return detections

    def track_objects_continuous(self):
        """Continuously track objects using robot camera"""
        cap = cv2.VideoCapture(0)  # Use appropriate camera index
        while True:
            ret, frame = cap.read()
            if not ret:
                continue
            # Detect objects in current frame
            detections = self.detect_objects_in_frame(frame)
            # Process detections for robot response
            self.process_detections(detections, frame)
            # Store detection history
            self.detection_history.append({
                'timestamp': time.time(),
                'detections': detections,
                'frame_shape': frame.shape
            })
            # Optional: Display annotated frame for debugging
            annotated_frame = self.annotate_frame(frame, detections)
            cv2.imshow('Reachy Vision', annotated_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()

    def annotate_frame(self, frame, detections):
        """Draw bounding boxes and class labels for the debug preview"""
        annotated = frame.copy()
        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            label = f"{det['class']} {det['confidence']:.2f}"
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(annotated, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        return annotated

    def process_detections(self, detections, frame):
        """Process detections and trigger appropriate robot responses"""
        for detection in detections:
            class_name = detection['class']
            bbox = detection['bbox']
            confidence = detection['confidence']
            # Calculate object center for robot attention
            center_x = (bbox[0] + bbox[2]) // 2
            center_y = (bbox[1] + bbox[3]) // 2
            # Convert to robot coordinate system
            robot_target = self.pixel_to_robot_coordinates(
                center_x, center_y, frame.shape
            )
            # Trigger appropriate behavior based on object class
            if class_name == 'person' and confidence > 0.8:
                self.respond_to_person(robot_target)
            elif class_name in ['cup', 'bottle'] and confidence > 0.7:
                self.respond_to_object(class_name, robot_target)
            elif class_name == 'book' and confidence > 0.6:
                self.respond_to_object('book', robot_target)

    def pixel_to_robot_coordinates(self, pixel_x, pixel_y, frame_shape):
        """Convert pixel coordinates to robot coordinate system"""
        height, width = frame_shape[:2]
        # Normalize to [-1, 1] range
        norm_x = (pixel_x - width / 2) / (width / 2)
        norm_y = (pixel_y - height / 2) / (height / 2)
        # Convert to robot coordinate system
        robot_x = norm_x * 0.3   # Scale to robot workspace
        robot_y = -norm_y * 0.3  # Invert Y axis
        robot_z = 0.1            # Default height
        return (robot_x, robot_y, robot_z)

    def respond_to_person(self, target):
        """Respond to person detection"""
        try:
            # Look towards the person
            self.reachy.head.look_at(target[0], target[1], target[2], duration=1.0)
            print(f"Person detected, looking at {target}")
        except Exception as e:
            print(f"Error responding to person: {e}")

    def respond_to_object(self, class_name, target):
        """Glance at a detected everyday object"""
        try:
            self.reachy.head.look_at(target[0], target[1], target[2], duration=1.0)
            print(f"{class_name} detected, looking at {target}")
        except Exception as e:
            print(f"Error responding to {class_name}: {e}")
Natural Language Processing Integration
NLP integration enables your robot to understand and respond to natural language commands, creating more intuitive human-robot interactions.
Performance Note: Large language models can be computationally intensive. Consider using quantized models or cloud APIs for complex NLP tasks while maintaining simple on-device processing for basic commands.
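One way to apply that split is to route trivial commands through a keyword table on-device and defer everything else to a remote service. A minimal sketch; the endpoint URL below is hypothetical, so substitute your own API:

import requests

# On-device shortcuts for latency-critical commands
BASIC_COMMANDS = {"stop": "stop_motion", "look at me": "face_user", "wave": "wave"}

def route_utterance(text, cloud_url="https://example.com/nlp"):  # hypothetical endpoint
    """Handle trivial commands locally; defer complex language to a cloud API."""
    key = text.strip().lower()
    if key in BASIC_COMMANDS:  # cheap on-device path
        return BASIC_COMMANDS[key]
    try:  # heavier NLP in the cloud
        reply = requests.post(cloud_url, json={"text": text}, timeout=2.0)
        return reply.json().get("intent", "unknown")
    except requests.RequestException:
        return "unknown"  # degrade gracefully when offline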
Intent Recognition and Response Generation
Implement sophisticated intent recognition that can understand context and generate appropriate responses for natural conversation.
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
import time

class NLPProcessor:
    def __init__(self):
        self.intent_classifier = None
        self.text_generator = None
        self.sentiment_analyzer = None
        self.conversation_history = []
        # Keyword fallback is always available, even when the pipelines load fine
        self.setup_fallback_models()
        self.setup_models()

    def setup_models(self):
        """Initialize NLP models"""
        try:
            device = 0 if torch.cuda.is_available() else -1
            # Zero-shot intent classification: an NLI model scores the input
            # against our intent labels (DialoGPT is a dialogue generator,
            # not a classifier, so it cannot fill this role)
            self.intent_classifier = pipeline(
                "zero-shot-classification",
                model="facebook/bart-large-mnli",
                device=device
            )
            # Sentiment analysis
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                device=device
            )
            # Text generation for responses
            self.text_generator = pipeline(
                "text-generation",
                model="gpt2",
                max_length=100,
                device=device
            )
            print("NLP models loaded successfully")
        except Exception as e:
            print(f"Error loading NLP models: {e}")

    def setup_fallback_models(self):
        """Setup simple keyword matching used when the pipelines are unavailable"""
        self.simple_intents = {
            'greeting': ['hello', 'hi', 'hey', 'good morning', 'good afternoon'],
            'movement': ['move', 'go', 'turn', 'look', 'point'],
            'question': ['what', 'why', 'how', 'when', 'where', 'who'],
            'goodbye': ['bye', 'goodbye', 'see you', 'farewell'],
            'compliment': ['good', 'great', 'awesome', 'nice', 'excellent'],
            'request': ['please', 'can you', 'could you', 'would you']
        }

    def preprocess_text(self, text):
        """Normalize user input before analysis"""
        return text.strip()

    def process_user_input(self, user_text):
        """Process user input and generate appropriate response"""
        # Clean and normalize input
        cleaned_text = self.preprocess_text(user_text)
        # Analyze sentiment
        sentiment = self.analyze_sentiment(cleaned_text)
        # Classify intent
        intent = self.classify_intent(cleaned_text)
        # Generate response based on intent and sentiment
        response = self.generate_response(cleaned_text, intent, sentiment)
        # Store in conversation history
        self.conversation_history.append({
            'timestamp': time.time(),
            'user_input': user_text,
            'cleaned_text': cleaned_text,
            'intent': intent,
            'sentiment': sentiment,
            'bot_response': response
        })
        return {
            'response': response,
            'intent': intent,
            'sentiment': sentiment,
            'confidence': 0.8  # Placeholder - use actual model confidence
        }

    def classify_intent(self, text):
        """Classify the intent of user input"""
        if self.intent_classifier:
            try:
                labels = list(self.simple_intents.keys())
                result = self.intent_classifier(text, candidate_labels=labels)
                return result['labels'][0]  # highest-scoring intent
            except Exception as e:
                print(f"Intent classification error: {e}")
        # Fallback to simple keyword matching
        text_lower = text.lower()
        for intent, keywords in self.simple_intents.items():
            if any(keyword in text_lower for keyword in keywords):
                return intent
        return 'unknown'

    def analyze_sentiment(self, text):
        """Analyze sentiment of user input"""
        if self.sentiment_analyzer:
            try:
                result = self.sentiment_analyzer(text)
                return result[0]['label'].upper()  # normalize label casing
            except Exception as e:
                print(f"Sentiment analysis error: {e}")
        # Simple fallback sentiment analysis
        positive_words = ['good', 'great', 'awesome', 'love', 'like', 'happy']
        negative_words = ['bad', 'terrible', 'hate', 'dislike', 'sad', 'angry']
        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        if positive_count > negative_count:
            return 'POSITIVE'
        elif negative_count > positive_count:
            return 'NEGATIVE'
        return 'NEUTRAL'

    def generate_response(self, text, intent, sentiment):
        """Generate appropriate response based on analysis"""
        responses = {
            'greeting': [
                "Hello! How can I help you today?",
                "Hi there! I'm excited to interact with you!",
                "Good day! What would you like to do?"
            ],
            'movement': [
                "I'll move as requested.",
                "Moving now!",
                "Executing movement command."
            ],
            'question': [
                "That's an interesting question. Let me think about it.",
                "I'll do my best to help answer that.",
                "Great question! Here's what I know..."
            ],
            'compliment': [
                "Thank you! That means a lot to me.",
                "I appreciate your kind words!",
                "You're very kind, thank you!"
            ],
            'goodbye': [
                "Goodbye! It was nice talking with you.",
                "See you later! Have a great day!",
                "Farewell! Come back anytime."
            ]
        }
        # Adjust response based on sentiment
        if sentiment == 'NEGATIVE':
            return "I sense you might be frustrated. How can I help make things better?"
        elif sentiment == 'POSITIVE':
            return "I'm glad you're feeling positive! " + responses.get(intent, ["How can I help?"])[0]
        # Return intent-based response
        intent_responses = responses.get(intent, ["I understand. How can I assist you?"])
        return intent_responses[0]  # Simple selection - could be randomized
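A quick usage sketch; on a machine without the transformer models, the keyword fallback handles the same call:

nlp = NLPProcessor()
analysis = nlp.process_user_input("Hello! Could you look at the red cup, please?")
print(analysis['intent'], analysis['sentiment'])
print(analysis['response'])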
Reinforcement Learning for Adaptive Behavior
Reinforcement learning enables your robot to learn and improve its behavior through interaction with the environment, creating truly adaptive and intelligent systems.
Basic RL Implementation
Implement a simple Q-learning system that allows your robot to learn optimal behaviors for specific tasks.
import numpy as np
import pickle
import time
from collections import defaultdict

class RobotQLearning:
    def __init__(self, actions, learning_rate=0.1, discount_factor=0.9, epsilon=0.1):
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.q_table = defaultdict(lambda: defaultdict(float))
        self.episode_rewards = []

    def get_state(self, sensor_data):
        """Convert sensor data to state representation"""
        # Discretize continuous sensor values
        proximity = "close" if sensor_data.get('proximity', 100) < 30 else "far"
        person_detected = "yes" if sensor_data.get('person_detected', False) else "no"
        interaction_state = sensor_data.get('interaction_state', 'idle')
        return f"{proximity}_{person_detected}_{interaction_state}"

    def choose_action(self, state):
        """Choose action using epsilon-greedy policy"""
        if np.random.random() < self.epsilon:
            # Explore: choose random action
            return np.random.choice(self.actions)
        # Exploit: choose best known action
        q_values = [self.q_table[state][action] for action in self.actions]
        best_action_index = np.argmax(q_values)
        return self.actions[best_action_index]

    def update_q_value(self, state, action, reward, next_state):
        """Update Q-value using the Q-learning update rule"""
        current_q = self.q_table[state][action]
        max_next_q = max(self.q_table[next_state][a] for a in self.actions)
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q
        )
        self.q_table[state][action] = new_q

    def calculate_reward(self, action, result, context):
        """Calculate reward based on action outcome"""
        reward = 0
        if action == "greet_person" and result == "positive_response":
            reward = 10
        elif action == "greet_person" and result == "no_response":
            reward = -2
        elif action == "avoid_obstacle" and result == "successful":
            reward = 5
        elif action == "avoid_obstacle" and result == "collision":
            reward = -10
        elif action == "idle" and context.get("person_present"):
            reward = -1  # Encourage interaction when a person is present
        elif action == "idle" and not context.get("person_present"):
            reward = 1   # Reward for not wasting energy
        return reward

    def save_model(self, filename):
        """Save Q-table to file"""
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.q_table), f)

    def load_model(self, filename):
        """Load Q-table from file"""
        try:
            with open(filename, 'rb') as f:
                loaded_q_table = pickle.load(f)
            self.q_table = defaultdict(lambda: defaultdict(float), loaded_q_table)
            return True
        except FileNotFoundError:
            print(f"Model file {filename} not found. Starting with empty Q-table.")
            return False

class AdaptiveBehaviorSystem:
    def __init__(self, reachy):
        self.reachy = reachy
        self.actions = [
            "greet_person", "idle", "explore", "avoid_obstacle",
            "look_around", "approach_object", "retreat"
        ]
        self.rl_agent = RobotQLearning(self.actions)
        self.current_state = None
        self.last_action = None
        self.learning_enabled = True

    def get_sensor_data(self):
        """Placeholder sensor snapshot; wire real perception (vision, proximity) in here"""
        return {
            'proximity': 100,
            'person_detected': False,
            'interaction_state': 'idle'
        }

    def run_adaptive_behavior_loop(self):
        """Main loop for adaptive behavior learning"""
        last_result = None
        while True:
            # Get current sensor data
            sensor_data = self.get_sensor_data()
            # Convert to state representation
            current_state = self.rl_agent.get_state(sensor_data)
            # Update Q-values for the previous action, now that we can observe
            # both its result and the state it led to
            if self.learning_enabled and self.current_state is not None:
                reward = self.rl_agent.calculate_reward(
                    self.last_action, last_result, sensor_data
                )
                self.rl_agent.update_q_value(
                    self.current_state, self.last_action, reward, current_state
                )
            # Choose and execute the next action
            action = self.rl_agent.choose_action(current_state)
            last_result = self.execute_action(action)
            # Update state and action history
            self.current_state = current_state
            self.last_action = action
            time.sleep(0.5)  # Control loop frequency

    def execute_action(self, action):
        """Execute the chosen action and return result"""
        try:
            if action == "greet_person":
                # Implementation for greeting behavior
                self.reachy.head.look_at(0.3, 0.0, 0.0, duration=1.0)
                return "positive_response"  # Simplified - would need actual detection
            elif action == "look_around":
                # Scan environment
                self.reachy.head.look_at(0.0, 0.3, 0.0, duration=1.0)
                time.sleep(0.5)
                self.reachy.head.look_at(0.0, -0.3, 0.0, duration=1.0)
                return "successful"
            elif action == "idle":
                # Return to neutral position
                self.reachy.head.look_at(0.0, 0.0, 0.0, duration=2.0)
                return "successful"
            else:
                print(f"Action {action} not implemented")
                return "unknown"
        except Exception as e:
            print(f"Error executing action {action}: {e}")
            return "error"
Model Optimization and Deployment
Optimizing ML models for robotics applications requires balancing accuracy, inference speed, and resource usage. Various optimization techniques can significantly improve performance on edge devices.
Optimization Techniques: Model quantization, pruning, knowledge distillation, ONNX conversion, and TensorRT optimization for GPU acceleration.
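Of these, ONNX conversion is often a useful first step, since onnxruntime can run the same file on CPU, GPU, or TensorRT backends. A minimal export sketch for a PyTorch model, assuming a fixed 224x224 RGB input:

import torch

def export_to_onnx(model, onnx_path, input_shape=(1, 3, 224, 224)):
    """Export a PyTorch model to ONNX for use with onnxruntime."""
    model.eval()
    dummy_input = torch.randn(*input_shape)  # tracing input
    torch.onnx.export(
        model,
        dummy_input,
        onnx_path,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': {0: 'batch'}},  # allow variable batch size
        opset_version=13,
    )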
Model Quantization Example
Quantization reduces model size and inference time while maintaining acceptable accuracy levels.
import os
import time
import numpy as np
import tensorflow as tf

def quantize_tensorflow_model(model_path, quantized_path):
    """Quantize TensorFlow model for faster inference"""
    # Load the model
    model = tf.keras.models.load_model(model_path)
    # Create quantization converter
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Additional settings for full integer quantization
    converter.representative_dataset = generate_representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    # Convert the model
    quantized_model = converter.convert()
    # Save quantized model
    with open(quantized_path, 'wb') as f:
        f.write(quantized_model)
    print(f"Model quantized and saved to {quantized_path}")
    # Compare model sizes
    original_size = os.path.getsize(model_path)
    quantized_size = len(quantized_model)
    print(f"Original model size: {original_size / (1024*1024):.2f} MB")
    print(f"Quantized model size: {quantized_size / (1024*1024):.2f} MB")
    print(f"Size reduction: {((original_size - quantized_size) / original_size) * 100:.1f}%")

def generate_representative_dataset():
    """Generate representative dataset for quantization"""
    for _ in range(100):
        # Generate dummy data matching your model's input shape
        data = np.random.random((1, 224, 224, 3)).astype(np.float32)
        yield [data]

class OptimizedInferenceEngine:
    def __init__(self):
        self.models = {}
        self.benchmark_results = {}

    def load_optimized_model(self, model_name, model_path, model_type='tflite'):
        """Load optimized model for inference"""
        try:
            if model_type == 'tflite':
                interpreter = tf.lite.Interpreter(model_path=model_path)
                interpreter.allocate_tensors()
                self.models[model_name] = {
                    'interpreter': interpreter,
                    'type': 'tflite'
                }
            elif model_type == 'onnx':
                import onnxruntime
                session = onnxruntime.InferenceSession(model_path)
                self.models[model_name] = {
                    'session': session,
                    'type': 'onnx'
                }
            else:
                raise ValueError(f"Unsupported model type: {model_type}")
            print(f"Optimized model {model_name} loaded successfully")
            return True
        except Exception as e:
            print(f"Failed to load optimized model: {e}")
            return False

    def benchmark_model(self, model_name, input_data, num_runs=100):
        """Benchmark model performance"""
        if model_name not in self.models:
            return None
        model_info = self.models[model_name]
        inference_times = []
        for _ in range(num_runs):
            start_time = time.time()
            if model_info['type'] == 'tflite':
                interpreter = model_info['interpreter']
                input_details = interpreter.get_input_details()
                output_details = interpreter.get_output_details()
                interpreter.set_tensor(input_details[0]['index'], input_data)
                interpreter.invoke()
                output = interpreter.get_tensor(output_details[0]['index'])
            elif model_info['type'] == 'onnx':
                session = model_info['session']
                input_name = session.get_inputs()[0].name
                output = session.run(None, {input_name: input_data})
            inference_times.append(time.time() - start_time)
        # Calculate statistics
        avg_time = np.mean(inference_times)
        benchmark_result = {
            'average_time': avg_time,
            'std_time': np.std(inference_times),
            'min_time': np.min(inference_times),
            'max_time': np.max(inference_times),
            'fps': 1.0 / avg_time
        }
        self.benchmark_results[model_name] = benchmark_result
        print(f"Benchmark results for {model_name}:")
        print(f"  Average inference time: {avg_time*1000:.2f} ms")
        print(f"  FPS: {benchmark_result['fps']:.1f}")
        return benchmark_result
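A usage sketch for a model produced by the quantization function above; the file name is a placeholder, and the input dtype must match interpreter.get_input_details()[0]['dtype'], which is int8 for the fully quantized model:

engine = OptimizedInferenceEngine()
if engine.load_optimized_model('detector', 'models/detector_quant.tflite'):
    # Dummy int8 input matching the quantized model's expected shape and dtype
    dummy = np.random.randint(-128, 128, size=(1, 224, 224, 3), dtype=np.int8)
    engine.benchmark_model('detector', dummy, num_runs=50)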
Conclusion
Machine learning integration transforms Reachy Mini into a truly intelligent robotic platform capable of perception, understanding, and adaptive behavior. By combining computer vision, natural language processing, and reinforcement learning, you create sophisticated applications that can learn and improve over time.
The key to successful ML integration lies in careful model selection, proper optimization for edge deployment, and thoughtful system design that balances capability with computational constraints. Start with simpler models and gradually increase complexity as your applications mature.
Future Directions: Explore federated learning for multi-robot systems, investigate neuromorphic computing for ultra-low power AI, and consider hybrid cloud-edge architectures for advanced capabilities.