I started this subsumption chat on the tail end of another thread, but thought it better to start a new thread.
My first attempt at a subsumption behavioural model is in the video below: the dog is trying to navigate itself away from a wall, scanning for the best route out.
Subsumption is a very early, pre-AI behavioural control model for robots, and it requires minimal processing power.
It’s not ROS based, as I had originally planned; that’s too much for me to integrate whilst I’m learning. I’ll possibly do that later.
At its lowest level it’s a wanderer, overridden at the next level by voice commands, and also by not walking into walls! There’s some memory in there so it doesn’t repeat failed escape patterns. It’s not a serious bit of code, just a dev step for my own education.
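For anyone new to the idea, here’s the layering in a nutshell. This is an illustrative sketch only (the layer names and state here are made up, not taken from the real code below): each layer either claims control by returning an action or defers to the layer beneath it, and the highest-priority claimant wins.

# Illustrative subsumption sketch; names are made up, not from the code below.
# Each layer returns an action to claim control, or None to defer downwards.
def stop_layer(state):                  # highest priority: don't hit things
    return "stop" if state["distance_cm"] < 30 else None

def voice_layer(state):                 # next: obey a pending voice command
    return state["voice_cmd"]           # e.g. "sit", or None if nothing heard

def wander_layer(state):                # lowest priority: the default behaviour
    return "forward"

LAYERS = [stop_layer, voice_layer, wander_layer]

def arbitrate(state):
    for layer in LAYERS:                # first layer to answer wins
        action = layer(state)
        if action is not None:
            return action

print(arbitrate({"distance_cm": 100, "voice_cmd": None}))   # forward
print(arbitrate({"distance_cm": 100, "voice_cmd": "sit"}))  # sit
print(arbitrate({"distance_cm": 10, "voice_cmd": "sit"}))   # stop

The real code below achieves the same effect with a threading.Event (OKToWalk) instead of a single arbitration loop: the higher layers clear it to seize control from the wanderer.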
There are a few random sounds thrown in to give it a bit of personality. I used a cheap USB microphone from Amazon (other resellers are available!) to gather voice commands. If you don’t have a mic, just comment out the voice init and threads.
I’ll add more subsumption levels and see how it progresses. If there’s any interest, I’ll share the new code as I go. If not, I’ll just keep it for the grandkids!
There seems to be a bug in the threading somewhere, where voice commands don’t always get a response... just like a real dog!
And here’s the code if anyone wishes to have a play. Apologies to any SW engineers out there: this would be far better as a Python package than a single file, but I have no idea how to post that on a forum like this.
Careful on stairs and tables, it will fall off!
See my 2nd posting below for a slightly more realistic version.
You’ll also need to install any missing libraries referenced in the Python import statements, plus the Vosk model referenced therein, which you can download from here: VOSK Models
And don’t forget to back up your SD card first! Unfortunately it’s necessary to use pip’s --break-system-packages flag to install vosk (e.g. pip3 install vosk --break-system-packages) unless you use a venv.
Hopefully I’ve managed to keep the indentation correct through the cut and paste...
import random
import queue
import threading
import time
from collections import deque
from pidog import Pidog
from vosk import Model, KaldiRecognizer
import pygame
from vilib import Vilib  # not actually used in this version
import sounddevice as sd
from scipy.signal import resample_poly
import numpy as np
import json
# Constants
USB_MIC_INDEX = 2 # USB mic device id
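# NB: the mic index varies by setup; sounddevice's sd.query_devices() lists the devices on yours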
INPUT_SR = 44100 # input sample rate from mic
VOSK_SR = 16000 # Vosk expects 16kHz audio
CHANNELS = 1
BLOCKSIZE = 8020 # frames per callback
SOUNDS_PATH = "/home/robot/pidog/sounds/"
OBSTACLE_DISTANCE_CM = 30 # Stop and avoid if object is closer than this
FORWARD_SPEED = 98 #Walk speed, slow him down when debugging!!
TURN_SPEED = 98
BACKWARD_TIME = 1.0 # Time in seconds to move backward
TURN_TIME = 0.6 # Time in seconds to turn
FORWARD_INTERVAL = 0.2 # How often to check while moving forward
# Initialize pygame mixer for playing mp3 sounds
pygame.mixer.init()
def load_sound(filename):
    return pygame.mixer.Sound(SOUNDS_PATH + filename)
# Sounds dictionary for emotions
SOUNDS = {
    "happy": load_sound("single_bark_1.mp3"),
    "curious": load_sound("woohoo.mp3"),
    "startled": load_sound("growl_1.mp3"),
    "bored": load_sound("snoring.mp3"),
    "lonely": load_sound("howling.mp3"),
}
# Global inits
dog = Pidog()
OKToWalk = threading.Event()          # the subsumption gate: higher layers clear this to pause wandering
OKToWalk.set()
voice_command_queue = queue.Queue()   # recognised text flows from the recogniser thread to the handler thread
###############################################
def play_sound(sound):
    if sound:
        sound.play()
###############################################
def voice_recognition_worker():
    # Load the small English Vosk model downloaded earlier
    model = Model("/home/robot/pidog/examples/vosk-model-small-en-us-0.15")
    rec = KaldiRecognizer(model, VOSK_SR)
    audio_queue = queue.Queue()

    def audio_callback(indata, frames, time_info, status):
        if status:
            print("Sounddevice status:", status)
        audio_queue.put(indata.copy())

    with sd.InputStream(device=USB_MIC_INDEX, channels=CHANNELS,
                        samplerate=INPUT_SR, blocksize=BLOCKSIZE,
                        dtype='int16', callback=audio_callback):
        print("Voice recognition started")
        while True:
            raw_audio = audio_queue.get()
            # May need to add some filters here as servos can be noisy at high speed, saturating voice commands
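            # One possible (untested) fix sketch: a Butterworth high-pass via scipy
            # to knock down low-frequency servo rumble before resampling. The
            # 300 Hz cutoff is a guess, not a measured value:
            #   from scipy.signal import butter, sosfilt
            #   sos = butter(4, 300, btype='highpass', fs=INPUT_SR, output='sos')
            #   raw_audio = sosfilt(sos, raw_audio.flatten())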
            # Resample from the mic's 44.1 kHz down to the 16 kHz Vosk expects
            audio_16k = resample_poly(raw_audio.flatten(), VOSK_SR, INPUT_SR)
            audio_16k_int16 = np.int16(audio_16k)
            if rec.AcceptWaveform(audio_16k_int16.tobytes()):
                result = json.loads(rec.Result())
                text = result.get("text", "")
                if text:
                    print("Voice input recognized:", text)
                    voice_command_queue.put(text)
            else:
                # partial = rec.PartialResult()  # Uncomment to debug partial results
                pass
###############################################
# Just a mini function to minimise typing in the voice command handler
def indiv_commands(operation):
    OKToWalk.clear()        # subsume: pause the wander layer
    dog.legs_stop()
    time.sleep(1)
    print("Command: ", operation)
    dog.do_action(operation)
    dog.wait_legs_done()
    time.sleep(5)
    OKToWalk.set()          # hand control back to the wanderer
######################################################
# Servo noise makes it difficult for PiDog to hear clearly; will try a different
# Vosk model (huge!) or some kind of filtering, TBD
def voice_command_handler():
    while True:
        command = voice_command_queue.get()
        cmd_lower = command.lower()
        # "six"/"sick"/"fit" are common mishearings of "sit"
        if "sit" in cmd_lower or "six" in cmd_lower or "sick" in cmd_lower or "fit" in cmd_lower:
            indiv_commands('sit')
        elif "stand" in cmd_lower:
            indiv_commands('stand')
        elif "walk" in cmd_lower:
            indiv_commands('forward')
        elif "stretch" in cmd_lower:
            indiv_commands('stretch')
        elif "push up" in cmd_lower or "push-up" in cmd_lower:
            indiv_commands('push_up')
        elif "hand" in cmd_lower or "shake" in cmd_lower:
            indiv_commands('handshake')
        elif "stop" in cmd_lower:
            # Emergency stop: clear OKToWalk and never set it again
            OKToWalk.clear()
            dog.legs_stop()
            time.sleep(1)
            print("Command: Emergency stop, not resuming wandering")
        else:
            print("Unknown command:", command)
######################################################
# The default low-hierarchy behaviour in my subsumption model
class PiDogWanderWrapper:
    def __init__(self):
        self.dog = dog
        self.running = True
        self.turn_history = deque(maxlen=5)   # memory of recent escape turns
        self.emotions = ["happy", "curious", "startled", "bored", "lonely"]
        self.last_emotion_time = time.time()
        self.emotion_interval = 30  # seconds between emotion triggers

    def get_distance(self):
        dist = self.dog.read_distance()
        if dist is None or dist < 0 or dist > 150:
            return None   # treat out-of-range readings as "no reading"
        return dist
    def scan_direction(self, head_yaw):
        self.dog.head_move([(head_yaw, 0, 0)], immediately=True, speed=50)
        time.sleep(0.8)
        distance = self.get_distance()
        if distance is None:
            distance = 999.0
        print(f"Scanned {head_yaw:+}°: {distance:.1f} cm")
        return distance
    def turn_smart(self):
        left_distance = self.scan_direction(-30)
        right_distance = self.scan_direction(30)
        # Re-centre the head so subsequent distance reads look straight ahead
        self.dog.head_move([(0, 0, 0)], immediately=True, speed=50)
        left_count = self.turn_history.count("left")
        right_count = self.turn_history.count("right")
        print(f"[Smart Turn] Left: {left_distance:.1f} cm, Right: {right_distance:.1f} cm")
        print(f"[Turn History] Left: {left_count}, Right: {right_count}")
        preferred = "left" if left_distance > right_distance else "right"
        # The memory bit: if we've picked the same way 3+ times recently,
        # it probably isn't working, so try the other way
        if self.turn_history.count(preferred) >= 3:
            preferred = "left" if preferred == "right" else "right"
            print("[Smart Turn] Switching direction to avoid repeating turns.")
        if preferred == "left":
            print("Smart turning left")
            self.dog.do_action("turn_left", speed=TURN_SPEED)
            self.turn_history.append("left")
        else:
            print("Smart turning right")
            self.dog.do_action("turn_right", speed=TURN_SPEED)
            self.turn_history.append("right")
        self.dog.wait_legs_done()
        time.sleep(TURN_TIME)
        self.stop()
    def stand_up(self):
        self.dog.do_action("stand", speed=70)
        self.dog.wait_all_done()

    def move_forward(self):
        self.dog.do_action("forward", speed=FORWARD_SPEED)

    def stop(self):
        self.dog.body_stop()
        self.dog.wait_all_done()

    def backup(self, duration=BACKWARD_TIME):
        self.dog.do_action("backward", speed=FORWARD_SPEED)
        time.sleep(duration)
        self.dog.wait_legs_done()
    def play_emotion(self, emotion):
        sound = SOUNDS.get(emotion)
        if sound:
            print(f"Emotion triggered: {emotion}")
            play_sound(sound)
        # Simple servo reactions for emotion (optional)
        if emotion == "happy":
            self.dog.head_move([(0, 0, 10)], immediately=True, speed=100)
        elif emotion == "curious":
            self.dog.head_move([(-20, 0, 10)], immediately=True, speed=80)
            time.sleep(0.5)
            self.dog.head_move([(20, 0, 10)], immediately=True, speed=80)
            time.sleep(0.5)
            self.dog.head_move([(0, 0, 10)], immediately=True, speed=80)
        elif emotion == "startled":
            self.dog.do_action("shake_head")
        elif emotion == "bored":
            # maybe a slow nod?
            pass
        elif emotion == "lonely":
            # maybe a slow wag tail or look around?
            pass
    def wander(self):
        print("Starting autonomous wandering mode...")
        self.stand_up()
        while self.running:
            # Suspend walk behaviour while higher priorities such as voice are executing
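            # NB: OKToWalk is only sampled here at the top of the loop, so a voice
            # command arriving mid-step waits for the current action to finish;
            # I suspect this is related to the missed-response bug mentioned above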
            OKToWalk.wait()
            distance = self.get_distance()
            if distance is not None and distance < OBSTACLE_DISTANCE_CM:
                print("Obstacle detected! Avoiding...")
                self.stop()
                self.backup()
                self.turn_smart()
                self.play_emotion("startled")
            else:
                self.move_forward()
                self.dog.wait_legs_done()
            # Occasionally trigger random emotions during idle wandering
            now = time.time()
            if now - self.last_emotion_time > self.emotion_interval:
                emotion = random.choice(self.emotions)
                self.play_emotion(emotion)
                self.last_emotion_time = now
            time.sleep(FORWARD_INTERVAL)
    def shutdown(self):
        print("Shutting down dog...")
        self.stop()
        self.dog.do_action("sit", speed=70)
        self.dog.wait_legs_done()   # let the sit finish before closing
        self.dog.close()
######################################################
def start_dog():
    print("Starting PiDog subsumption system...")
    wanderer = PiDogWanderWrapper()
    dog.head_move([(0, 0, 10)], immediately=True, speed=50)

    # Start threads: recogniser, command handler, and the wander layer
    threading.Thread(target=voice_recognition_worker, daemon=True).start()
    threading.Thread(target=voice_command_handler, daemon=True).start()
    threading.Thread(target=wanderer.wander, daemon=True).start()
    print("PiDog is ready and listening...")

    # Keep main thread alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        wanderer.shutdown()


if __name__ == "__main__":
    start_dog()