Machine Learning Integration with Reachy Mini

Transform your Reachy Mini into an AI-powered robot by integrating state-of-the-art machine learning models for computer vision, natural language processing, and autonomous decision-making.

Machine learning integration elevates Reachy Mini from a programmable robot to an intelligent system capable of learning, adapting, and making autonomous decisions. This comprehensive guide explores how to implement various ML models, optimize inference performance, and create truly intelligent robotic behaviors.

Machine Learning Architecture for Robotics

Successful ML integration in robotics requires careful consideration of computational constraints, real-time performance requirements, and model accuracy trade-offs. Reachy Mini's computational capabilities allow for both edge inference and cloud-based processing depending on your application needs.

Key Considerations: Model size vs accuracy, inference latency, power consumption, network connectivity requirements, and the ability to update models over time.
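As a rough illustration of these trade-offs, a small per-model profile can help decide whether a given model should run on the robot or behind a cloud API. The ModelDeploymentProfile class and choose_backend heuristic below are purely illustrative (not part of the Reachy SDK), and the latency budget is an assumption you would tune for your application.

from dataclasses import dataclass

@dataclass
class ModelDeploymentProfile:
    """Illustrative summary of one model's deployment trade-offs."""
    name: str
    size_mb: float            # on-disk model size
    latency_ms: float         # measured per-inference latency on-device
    requires_network: bool    # True if the model only runs behind a cloud API

def choose_backend(profile: ModelDeploymentProfile, latency_budget_ms: float = 100.0) -> str:
    """Pick edge or cloud inference from a simple latency/connectivity heuristic."""
    if profile.requires_network:
        return "cloud"
    return "edge" if profile.latency_ms <= latency_budget_ms else "cloud"

# Example: a small detector stays on-device, a slower model would be sent to the cloud
detector = ModelDeploymentProfile("yolov8n", size_mb=6.5, latency_ms=45.0, requires_network=False)
print(choose_backend(detector))  # -> "edge"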

Setting Up the ML Environment

Before implementing machine learning models, establish a robust development environment that supports multiple frameworks and efficient model deployment.

# Install required ML libraries
pip install tensorflow torch torchvision opencv-python scikit-learn
pip install transformers huggingface-hub onnx onnxruntime
pip install mediapipe ultralytics

import tensorflow as tf
import torch
import cv2
import numpy as np
from reachy_sdk import ReachySDK
import threading
import time
from collections import deque


class MLIntegrationManager:
    def __init__(self, reachy):
        self.reachy = reachy
        self.models = {}
        self.inference_queue = deque(maxlen=10)
        self.results_cache = {}
        self.performance_metrics = {}

    def load_model(self, model_name, model_path, framework='tensorflow'):
        """Load ML model with specified framework"""
        try:
            if framework == 'tensorflow':
                model = tf.keras.models.load_model(model_path)
            elif framework == 'pytorch':
                model = torch.load(model_path)
                model.eval()
            elif framework == 'onnx':
                import onnxruntime
                model = onnxruntime.InferenceSession(model_path)

            self.models[model_name] = {
                'model': model,
                'framework': framework,
                'loaded_time': time.time()
            }
            print(f"Model {model_name} loaded successfully using {framework}")
            return True
        except Exception as e:
            print(f"Failed to load model {model_name}: {e}")
            return False

    def preprocess_input(self, input_data, model_name):
        """Preprocess input data for specific model requirements"""
        if model_name in ['object_detection', 'face_recognition']:
            # Image preprocessing
            if isinstance(input_data, np.ndarray):
                # Resize and normalize
                processed = cv2.resize(input_data, (224, 224))
                processed = processed.astype(np.float32) / 255.0
                return np.expand_dims(processed, axis=0)
        elif model_name in ['nlp_intent', 'conversation']:
            # Text preprocessing
            return self.tokenize_text(input_data)
        return input_data

    def tokenize_text(self, text):
        """Placeholder tokenizer -- replace with your model's own tokenizer."""
        return text

    def postprocess_output(self, predictions, model_name):
        """Placeholder post-processing -- adapt per model (e.g. argmax, NMS)."""
        return predictions

    def record_performance(self, model_name, inference_time, success):
        """Keep simple per-model latency and success statistics."""
        self.performance_metrics.setdefault(model_name, []).append(
            {'time': inference_time, 'success': success}
        )

    def run_inference(self, model_name, input_data):
        """Run inference with specified model"""
        if model_name not in self.models:
            return None

        start_time = time.time()
        model_info = self.models[model_name]
        model = model_info['model']
        framework = model_info['framework']

        # Preprocess input
        processed_input = self.preprocess_input(input_data, model_name)

        try:
            if framework == 'tensorflow':
                predictions = model.predict(processed_input)
            elif framework == 'pytorch':
                with torch.no_grad():
                    tensor_input = torch.FloatTensor(processed_input)
                    predictions = model(tensor_input).numpy()
            elif framework == 'onnx':
                input_name = model.get_inputs()[0].name
                predictions = model.run(None, {input_name: processed_input})

            # Record performance metrics
            inference_time = time.time() - start_time
            self.record_performance(model_name, inference_time, True)

            return self.postprocess_output(predictions, model_name)
        except Exception as e:
            print(f"Inference error for {model_name}: {e}")
            self.record_performance(model_name, time.time() - start_time, False)
            return None
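A minimal usage sketch of the manager above might look like the following. The model path, model name, and robot address are placeholders you would replace with your own; connecting with ReachySDK(host=...) assumes the robot is reachable on your network.

# Hypothetical usage of MLIntegrationManager -- paths and host are placeholders
reachy = ReachySDK(host='localhost')  # replace with your robot's address
manager = MLIntegrationManager(reachy)

if manager.load_model('object_detection', 'models/detector.h5', framework='tensorflow'):
    frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in for a real camera frame
    predictions = manager.run_inference('object_detection', frame)
    print(predictions)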

Computer Vision Integration

Computer vision capabilities transform how your robot perceives and interacts with its environment. From object detection to facial recognition, these models enable rich visual understanding.

Real-Time Object Detection

Implement real-time object detection using YOLO or similar models to enable your robot to identify and track objects in its environment.

from ultralytics import YOLO
import cv2
import time
from collections import deque


class ObjectDetectionSystem:
    def __init__(self, reachy):
        self.reachy = reachy
        self.model = YOLO('yolov8n.pt')  # Load YOLOv8 nano model
        self.detection_history = deque(maxlen=30)  # Store last 30 frames
        self.tracking_objects = {}

    def detect_objects_in_frame(self, frame):
        """Detect objects in a single frame"""
        results = self.model(frame)
        detections = []

        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    # Extract detection information
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    confidence = box.conf[0].cpu().numpy()
                    class_id = int(box.cls[0].cpu().numpy())
                    class_name = self.model.names[class_id]

                    if confidence > 0.5:  # Confidence threshold
                        detection = {
                            'bbox': (int(x1), int(y1), int(x2), int(y2)),
                            'confidence': float(confidence),
                            'class': class_name,
                            'class_id': class_id
                        }
                        detections.append(detection)
        return detections

    def track_objects_continuous(self):
        """Continuously track objects using robot camera"""
        cap = cv2.VideoCapture(0)  # Use appropriate camera index

        while True:
            ret, frame = cap.read()
            if not ret:
                continue

            # Detect objects in current frame
            detections = self.detect_objects_in_frame(frame)

            # Process detections for robot response
            self.process_detections(detections, frame)

            # Store detection history
            self.detection_history.append({
                'timestamp': time.time(),
                'detections': detections,
                'frame_shape': frame.shape
            })

            # Optional: Display annotated frame for debugging
            annotated_frame = self.annotate_frame(frame, detections)
            cv2.imshow('Reachy Vision', annotated_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

    def annotate_frame(self, frame, detections):
        """Draw bounding boxes and labels on the frame for debugging."""
        for detection in detections:
            x1, y1, x2, y2 = detection['bbox']
            label = f"{detection['class']} {detection['confidence']:.2f}"
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, label, (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        return frame

    def process_detections(self, detections, frame):
        """Process detections and trigger appropriate robot responses"""
        for detection in detections:
            class_name = detection['class']
            bbox = detection['bbox']
            confidence = detection['confidence']

            # Calculate object center for robot attention
            center_x = (bbox[0] + bbox[2]) // 2
            center_y = (bbox[1] + bbox[3]) // 2

            # Convert to robot coordinate system
            robot_target = self.pixel_to_robot_coordinates(
                center_x, center_y, frame.shape
            )

            # Trigger appropriate behavior based on object class
            if class_name == 'person' and confidence > 0.8:
                self.respond_to_person(robot_target)
            elif class_name in ['cup', 'bottle'] and confidence > 0.7:
                self.respond_to_object(class_name, robot_target)
            elif class_name == 'book' and confidence > 0.6:
                self.respond_to_book(robot_target)

    def pixel_to_robot_coordinates(self, pixel_x, pixel_y, frame_shape):
        """Convert pixel coordinates to robot coordinate system"""
        height, width = frame_shape[:2]

        # Normalize to [-1, 1] range
        norm_x = (pixel_x - width / 2) / (width / 2)
        norm_y = (pixel_y - height / 2) / (height / 2)

        # Convert to robot coordinate system
        robot_x = norm_x * 0.3   # Scale to robot workspace
        robot_y = -norm_y * 0.3  # Invert Y axis
        robot_z = 0.1            # Default height
        return (robot_x, robot_y, robot_z)

    def respond_to_person(self, target):
        """Respond to person detection"""
        try:
            # Look towards the person
            self.reachy.head.look_at(target[0], target[1], target[2], duration=1.0)
            print(f"Person detected, looking at {target}")
        except Exception as e:
            print(f"Error responding to person: {e}")

    def respond_to_object(self, class_name, target):
        """Placeholder response for everyday objects."""
        print(f"{class_name} detected at {target}")

    def respond_to_book(self, target):
        """Placeholder response for book detection."""
        print(f"Book detected at {target}")

Natural Language Processing Integration

NLP integration enables your robot to understand and respond to natural language commands, creating more intuitive human-robot interactions.

Performance Note: Large language models can be computationally intensive. Consider using quantized models or cloud APIs for complex NLP tasks while maintaining simple on-device processing for basic commands.
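One way to follow that advice is a small dispatcher that answers a fixed set of commands on-device and only hands everything else to a heavier model or hosted API. This is a minimal sketch: the command table and the cloud_nlp callable are illustrative assumptions, not a specific service.

# Hybrid dispatch sketch: handle a small command set locally, defer the rest
# to a cloud NLP callable supplied by the caller (placeholder, not a real API).
SIMPLE_COMMANDS = {
    "look left": ("movement", {"yaw": 0.3}),
    "look right": ("movement", {"yaw": -0.3}),
    "hello": ("greeting", {}),
}

def handle_utterance(text, cloud_nlp=None):
    """Return an (intent, params) pair locally when possible, otherwise defer."""
    key = text.strip().lower()
    if key in SIMPLE_COMMANDS:
        return SIMPLE_COMMANDS[key]
    if cloud_nlp is not None:
        return cloud_nlp(text)  # e.g. a wrapper around a hosted LLM endpoint
    return ("unknown", {})

print(handle_utterance("look left"))  # handled on-device with no model call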

Intent Recognition and Response Generation

Implement sophisticated intent recognition that can understand context and generate appropriate responses for natural conversation.

from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
import time


class NLPProcessor:
    def __init__(self):
        self.intent_classifier = None
        self.text_generator = None
        self.sentiment_analyzer = None
        self.conversation_history = []
        self.setup_fallback_models()  # keyword fallback is always available
        self.setup_models()

    def setup_models(self):
        """Initialize NLP models"""
        try:
            # Intent classification model
            self.intent_classifier = pipeline(
                "text-classification",
                model="microsoft/DialoGPT-medium",
                device=0 if torch.cuda.is_available() else -1
            )

            # Sentiment analysis
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                device=0 if torch.cuda.is_available() else -1
            )

            # Text generation for responses
            self.text_generator = pipeline(
                "text-generation",
                model="gpt2",
                max_length=100,
                device=0 if torch.cuda.is_available() else -1
            )
            print("NLP models loaded successfully")
        except Exception as e:
            print(f"Error loading NLP models: {e}")

    def setup_fallback_models(self):
        """Setup simple keyword matching used when the main models are unavailable"""
        self.simple_intents = {
            'greeting': ['hello', 'hi', 'hey', 'good morning', 'good afternoon'],
            'movement': ['move', 'go', 'turn', 'look', 'point'],
            'question': ['what', 'why', 'how', 'when', 'where', 'who'],
            'goodbye': ['bye', 'goodbye', 'see you', 'farewell'],
            'compliment': ['good', 'great', 'awesome', 'nice', 'excellent'],
            'request': ['please', 'can you', 'could you', 'would you']
        }

    def preprocess_text(self, text):
        """Basic cleanup of user input."""
        return text.strip()

    def process_user_input(self, user_text):
        """Process user input and generate appropriate response"""
        # Clean and normalize input
        cleaned_text = self.preprocess_text(user_text)

        # Analyze sentiment
        sentiment = self.analyze_sentiment(cleaned_text)

        # Classify intent
        intent = self.classify_intent(cleaned_text)

        # Generate response based on intent and sentiment
        response = self.generate_response(cleaned_text, intent, sentiment)

        # Store in conversation history
        self.conversation_history.append({
            'timestamp': time.time(),
            'user_input': user_text,
            'cleaned_text': cleaned_text,
            'intent': intent,
            'sentiment': sentiment,
            'bot_response': response
        })

        return {
            'response': response,
            'intent': intent,
            'sentiment': sentiment,
            'confidence': 0.8  # Placeholder - use actual model confidence
        }

    def classify_intent(self, text):
        """Classify the intent of user input"""
        if self.intent_classifier:
            try:
                result = self.intent_classifier(text)
                return result[0]['label']
            except Exception as e:
                print(f"Intent classification error: {e}")

        # Fallback to simple keyword matching
        text_lower = text.lower()
        for intent, keywords in self.simple_intents.items():
            if any(keyword in text_lower for keyword in keywords):
                return intent
        return 'unknown'

    def analyze_sentiment(self, text):
        """Analyze sentiment of user input"""
        if self.sentiment_analyzer:
            try:
                result = self.sentiment_analyzer(text)
                return result[0]['label']
            except Exception as e:
                print(f"Sentiment analysis error: {e}")

        # Simple fallback sentiment analysis
        positive_words = ['good', 'great', 'awesome', 'love', 'like', 'happy']
        negative_words = ['bad', 'terrible', 'hate', 'dislike', 'sad', 'angry']

        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)

        if positive_count > negative_count:
            return 'POSITIVE'
        elif negative_count > positive_count:
            return 'NEGATIVE'
        else:
            return 'NEUTRAL'

    def generate_response(self, text, intent, sentiment):
        """Generate appropriate response based on analysis"""
        responses = {
            'greeting': [
                "Hello! How can I help you today?",
                "Hi there! I'm excited to interact with you!",
                "Good day! What would you like to do?"
            ],
            'movement': [
                "I'll move as requested.",
                "Moving now!",
                "Executing movement command."
            ],
            'question': [
                "That's an interesting question. Let me think about it.",
                "I'll do my best to help answer that.",
                "Great question! Here's what I know..."
            ],
            'compliment': [
                "Thank you! That means a lot to me.",
                "I appreciate your kind words!",
                "You're very kind, thank you!"
            ],
            'goodbye': [
                "Goodbye! It was nice talking with you.",
                "See you later! Have a great day!",
                "Farewell! Come back anytime."
            ]
        }

        # Adjust response based on sentiment
        if sentiment == 'NEGATIVE':
            return "I sense you might be frustrated. How can I help make things better?"
        elif sentiment == 'POSITIVE':
            return "I'm glad you're feeling positive! " + responses.get(intent, ["How can I help?"])[0]

        # Return intent-based response
        intent_responses = responses.get(intent, ["I understand. How can I assist you?"])
        return intent_responses[0]  # Simple selection - could be randomized
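A brief usage sketch of the processor above: the first call downloads the Hugging Face model weights, so expect a delay (or rely on the keyword fallback when offline). How the reply is spoken or displayed is left to your own output path.

# Minimal usage sketch of NLPProcessor
nlp = NLPProcessor()  # downloads transformer weights on first use

analysis = nlp.process_user_input("Hello, can you look at me?")
print(analysis['intent'], analysis['sentiment'])
print(analysis['response'])  # route this string to your TTS or display of choice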

Reinforcement Learning for Adaptive Behavior

Reinforcement learning enables your robot to learn and improve its behavior through interaction with the environment, creating truly adaptive and intelligent systems.

Basic RL Implementation

Implement a simple Q-learning system that allows your robot to learn optimal behaviors for specific tasks.

import numpy as np
import pickle
import time
from collections import defaultdict


class RobotQLearning:
    def __init__(self, actions, learning_rate=0.1, discount_factor=0.9, epsilon=0.1):
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.q_table = defaultdict(lambda: defaultdict(float))
        self.episode_rewards = []

    def get_state(self, sensor_data):
        """Convert sensor data to state representation"""
        # Discretize continuous sensor values
        proximity = "close" if sensor_data.get('proximity', 100) < 30 else "far"
        person_detected = "yes" if sensor_data.get('person_detected', False) else "no"
        interaction_state = sensor_data.get('interaction_state', 'idle')
        return f"{proximity}_{person_detected}_{interaction_state}"

    def choose_action(self, state):
        """Choose action using epsilon-greedy policy"""
        if np.random.random() < self.epsilon:
            # Explore: choose random action
            return np.random.choice(self.actions)
        else:
            # Exploit: choose best known action
            q_values = [self.q_table[state][action] for action in self.actions]
            best_action_index = np.argmax(q_values)
            return self.actions[best_action_index]

    def update_q_value(self, state, action, reward, next_state):
        """Update Q-value using Q-learning update rule"""
        current_q = self.q_table[state][action]
        max_next_q = max([self.q_table[next_state][a] for a in self.actions])
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q
        )
        self.q_table[state][action] = new_q

    def calculate_reward(self, action, result, context):
        """Calculate reward based on action outcome"""
        reward = 0
        if action == "greet_person" and result == "positive_response":
            reward = 10
        elif action == "greet_person" and result == "no_response":
            reward = -2
        elif action == "avoid_obstacle" and result == "successful":
            reward = 5
        elif action == "avoid_obstacle" and result == "collision":
            reward = -10
        elif action == "idle" and context.get("person_present"):
            reward = -1  # Encourage interaction when person is present
        elif action == "idle" and not context.get("person_present"):
            reward = 1   # Reward for not wasting energy
        return reward

    def save_model(self, filename):
        """Save Q-table to file"""
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.q_table), f)

    def load_model(self, filename):
        """Load Q-table from file"""
        try:
            with open(filename, 'rb') as f:
                loaded_q_table = pickle.load(f)
            self.q_table = defaultdict(lambda: defaultdict(float), loaded_q_table)
            return True
        except FileNotFoundError:
            print(f"Model file {filename} not found. Starting with empty Q-table.")
            return False


class AdaptiveBehaviorSystem:
    def __init__(self, reachy):
        self.reachy = reachy
        self.actions = [
            "greet_person", "idle", "explore", "avoid_obstacle",
            "look_around", "approach_object", "retreat"
        ]
        self.rl_agent = RobotQLearning(self.actions)
        self.current_state = None
        self.last_action = None
        self.learning_enabled = True

    def get_sensor_data(self):
        """Placeholder sensor reading -- replace with real perception inputs."""
        return {
            'proximity': 100,
            'person_detected': False,
            'interaction_state': 'idle',
            'person_present': False,
        }

    def run_adaptive_behavior_loop(self):
        """Main loop for adaptive behavior learning"""
        while True:
            # Get current sensor data
            sensor_data = self.get_sensor_data()

            # Convert to state representation
            current_state = self.rl_agent.get_state(sensor_data)

            # Choose and execute action
            action = self.rl_agent.choose_action(current_state)
            result = self.execute_action(action)

            # Calculate reward and update Q-values if learning is enabled
            if self.learning_enabled and self.current_state is not None:
                reward = self.rl_agent.calculate_reward(
                    self.last_action, result, sensor_data
                )
                self.rl_agent.update_q_value(
                    self.current_state, self.last_action, reward, current_state
                )

            # Update state and action history
            self.current_state = current_state
            self.last_action = action

            time.sleep(0.5)  # Control loop frequency

    def execute_action(self, action):
        """Execute the chosen action and return result"""
        try:
            if action == "greet_person":
                # Implementation for greeting behavior
                self.reachy.head.look_at(0.3, 0.0, 0.0, duration=1.0)
                return "positive_response"  # Simplified - would need actual detection
            elif action == "look_around":
                # Scan environment
                self.reachy.head.look_at(0.0, 0.3, 0.0, duration=1.0)
                time.sleep(0.5)
                self.reachy.head.look_at(0.0, -0.3, 0.0, duration=1.0)
                return "successful"
            elif action == "idle":
                # Return to neutral position
                self.reachy.head.look_at(0.0, 0.0, 0.0, duration=2.0)
                return "successful"
            else:
                print(f"Action {action} not implemented")
                return "unknown"
        except Exception as e:
            print(f"Error executing action {action}: {e}")
            return "error"
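A usage sketch for the adaptive behavior system, assuming the robot is reachable at the placeholder address below: the learned Q-table is saved on exit so a later session can resume from it rather than starting cold.

# Hypothetical session wrapper -- host and file name are placeholders
from reachy_sdk import ReachySDK

reachy = ReachySDK(host='localhost')
behavior = AdaptiveBehaviorSystem(reachy)
behavior.rl_agent.load_model('q_table.pkl')      # resume earlier learning if the file exists

try:
    behavior.run_adaptive_behavior_loop()
except KeyboardInterrupt:
    behavior.rl_agent.save_model('q_table.pkl')  # persist the learned Q-values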

Model Optimization and Deployment

Optimizing ML models for robotics applications requires balancing accuracy, inference speed, and resource usage. Various optimization techniques can significantly improve performance on edge devices.

Optimization Techniques: Model quantization, pruning, knowledge distillation, ONNX conversion, and TensorRT optimization for GPU acceleration.
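As one example of the ONNX route, a PyTorch model can be exported with torch.onnx.export and then served through onnxruntime on the robot. This is a sketch only: the MobileNetV2 backbone, file name, and input shape are stand-ins to be replaced with your own trained network.

import torch
import torchvision

# Sketch: export a PyTorch model to ONNX for onnxruntime inference on-device
model = torchvision.models.mobilenet_v2(weights=None)  # stand-in for your trained model
model.eval()

dummy_input = torch.randn(1, 3, 224, 224)  # must match your model's expected input shape
torch.onnx.export(
    model,
    dummy_input,
    "mobilenet_v2.onnx",
    input_names=["input"],
    output_names=["output"],
    opset_version=13,
)
print("Exported mobilenet_v2.onnx")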

Model Quantization Example

Quantization reduces model size and inference time while maintaining acceptable accuracy levels.

import os
import time
import numpy as np
import tensorflow as tf


def quantize_tensorflow_model(model_path, quantized_path):
    """Quantize TensorFlow model for faster inference"""
    # Load the model
    model = tf.keras.models.load_model(model_path)

    # Create quantization converter
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Additional quantization settings
    converter.representative_dataset = generate_representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    # Convert the model
    quantized_model = converter.convert()

    # Save quantized model
    with open(quantized_path, 'wb') as f:
        f.write(quantized_model)
    print(f"Model quantized and saved to {quantized_path}")

    # Compare model sizes
    original_size = os.path.getsize(model_path)
    quantized_size = len(quantized_model)
    print(f"Original model size: {original_size / (1024*1024):.2f} MB")
    print(f"Quantized model size: {quantized_size / (1024*1024):.2f} MB")
    print(f"Size reduction: {((original_size - quantized_size) / original_size) * 100:.1f}%")


def generate_representative_dataset():
    """Generate representative dataset for quantization"""
    for _ in range(100):
        # Generate dummy data matching your model's input shape
        data = np.random.random((1, 224, 224, 3)).astype(np.float32)
        yield [data]


class OptimizedInferenceEngine:
    def __init__(self):
        self.models = {}
        self.benchmark_results = {}

    def load_optimized_model(self, model_name, model_path, model_type='tflite'):
        """Load optimized model for inference"""
        try:
            if model_type == 'tflite':
                interpreter = tf.lite.Interpreter(model_path=model_path)
                interpreter.allocate_tensors()
                self.models[model_name] = {
                    'interpreter': interpreter,
                    'type': 'tflite'
                }
            elif model_type == 'onnx':
                import onnxruntime
                session = onnxruntime.InferenceSession(model_path)
                self.models[model_name] = {
                    'session': session,
                    'type': 'onnx'
                }
            print(f"Optimized model {model_name} loaded successfully")
            return True
        except Exception as e:
            print(f"Failed to load optimized model: {e}")
            return False

    def benchmark_model(self, model_name, input_data, num_runs=100):
        """Benchmark model performance"""
        if model_name not in self.models:
            return None

        model_info = self.models[model_name]
        inference_times = []

        for _ in range(num_runs):
            start_time = time.time()

            if model_info['type'] == 'tflite':
                interpreter = model_info['interpreter']
                input_details = interpreter.get_input_details()
                output_details = interpreter.get_output_details()
                interpreter.set_tensor(input_details[0]['index'], input_data)
                interpreter.invoke()
                output = interpreter.get_tensor(output_details[0]['index'])
            elif model_info['type'] == 'onnx':
                session = model_info['session']
                input_name = session.get_inputs()[0].name
                output = session.run(None, {input_name: input_data})

            inference_time = time.time() - start_time
            inference_times.append(inference_time)

        # Calculate statistics
        avg_time = np.mean(inference_times)
        std_time = np.std(inference_times)
        min_time = np.min(inference_times)
        max_time = np.max(inference_times)

        benchmark_result = {
            'average_time': avg_time,
            'std_time': std_time,
            'min_time': min_time,
            'max_time': max_time,
            'fps': 1.0 / avg_time
        }
        self.benchmark_results[model_name] = benchmark_result

        print(f"Benchmark results for {model_name}:")
        print(f"  Average inference time: {avg_time*1000:.2f} ms")
        print(f"  FPS: {benchmark_result['fps']:.1f}")
        return benchmark_result
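Tying the two pieces together, a quantize-then-benchmark run could look like the sketch below. The file paths are placeholders, and because the converter above targets full int8, the benchmark input must be int8 as well.

# Hypothetical end-to-end run -- paths are placeholders
quantize_tensorflow_model('models/classifier.h5', 'models/classifier_int8.tflite')

engine = OptimizedInferenceEngine()
if engine.load_optimized_model('classifier', 'models/classifier_int8.tflite', model_type='tflite'):
    # Input dtype must match the quantized model (int8 here)
    sample = np.random.randint(-128, 128, size=(1, 224, 224, 3), dtype=np.int8)
    engine.benchmark_model('classifier', sample, num_runs=50)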

Conclusion

Machine learning integration transforms Reachy Mini into a truly intelligent robotic platform capable of perception, understanding, and adaptive behavior. By combining computer vision, natural language processing, and reinforcement learning, you create sophisticated applications that can learn and improve over time.

The key to successful ML integration lies in careful model selection, proper optimization for edge deployment, and thoughtful system design that balances capability with computational constraints. Start with simpler models and gradually increase complexity as your applications mature.

Future Directions: Explore federated learning for multi-robot systems, investigate neuromorphic computing for ultra-low power AI, and consider hybrid cloud-edge architectures for advanced capabilities.