SLAM

Shahaboddin Ghajari (sg2367) and William Smith (wds68)

12/12/2019

Video

Objective

In this project we are going to build a robot that can map its environment and keep track of its own location in that area. In order to generate a map, the robot needs to know its location, and to know its location, it should have the map. That is one of the main challenges of this project. We are planning to map a room. One of the possible solutions is to keep track of the robot location by keeping its speed constant. Meanwhile, we can use ultrasound sensors to detect walls and obstacles in the room.

Introduction

We have used the robot platform that we built in the lab, including two servo motors. To observe the environment, we have used three RCWL-1601 ultrasound modules. Moreover, we installed an Adafruit IMU (BNO055) to keep track of the robot heading and room temperature. The temperature reading is used to calculate the speed of sound. A sweeping algorithm is implemented for robot movement to map its environment.

Design & Testing

Robot from the side
Robot from the top

We assembled the robot platform that we had in Lab 3 as the base of our robot. It included the Raspberry Pi, two continuous rotation servo motors, and the PiTFT. We added three ultrasound sensors, one in front of the robot and two on its sides. Since we had space limitations on the platform we had to design tiny boards for the ultrasound sensors to connect them to the Raspberry Pi. We had two sets of headers, one 10k resistor and two 1k resistors on the tiny boards. One of the headers was used to connect the board to Raspberry Pi and the other was connected to the sensor. We used velcro and electrical tape to connect the tiny boards to the platform. RCWL-1601 ultrasound sensors were used in this design since these were available in the lab. Moreover, they required a 3 V supply voltage which was provided by the Pi. These sensors have a limited range of measurements. They were accurate up to 65 cm to some extent. They also have a 15 degree measurement cone. We decided to use a resolution of 5 cm for our map; therefore the maximum distance that we could measure with these sensors was limited to 35 cm. Otherwise, it was possible to make errors in positioning the obstacles.


We used BNO055 (IMU) to check the heading of the robot. Moreover, using the built-in temperature sensor in this chip, it was possible to continuously monitor the temperature and adjust the distance measurement based on the speed of sound.


In order to calibrate the motor speeds, we tried to come up with a software solution. Using the IMU we check the heading of the robot. If it is moving forward and the heading is changing, the robot changes the speed of one of the motors to make the robot move straight forward. This approach is fairly effective at making the robot drive straight, but it is not perfect. The important part of our design was that the robot does not rely on driving straight. Instead, it keeps track of its position by integrating its open-loop instantaneous velocity (i.e. an estimate based on the PWM values delivered to the motors) in combination with its heading from the IMU. The robot makes observations of the world relative to its current position estimate. Thus exact movement is not necessary.


In order to make sure that the robot has visited all the room, we decided to sweep the whole room. The robot starts to move forward. When it detects a wall it turns by 90 degrees, moves forward a little bit and turns 90 degrees again. Therefore, the robot ideally traverses the entire room. In practice the robot does not cover the entire room in the desired manner because its movements are not sufficiently precise, but this movement pattern is still a good approximation for achieving reasonable coverage of the room.


If the front ultrasound sensor senses an object in 30 cm or less, it considers it as a wall. The side sensors also monitor the environment to detect objects as far as 35 cm. If the side objects are too close to the robot, it tries to avoid collision by turning 30 degrees.


Visualizer display

We have designed a live visualizer to display the map both on the PiTFT or on a monitor if someone SSH into the Pi. We have a grid of 5 meters by 5 meters with a resolution of 5 cm. The visualizer tries to keep the heading of the robot on the display constant; therefore, its surrounding rotates when the robot changes its heading. The obstacles are displayed by red dots. The free space is displayed by green dots, and the robot path is displayed by white dots.


Results

The robot successfully navigates the room and builds a map of the world. Features such as walls, chairs, and people are recognizable in the constructed map. There are lots of issues though.


First, the position information is highly susceptible to drift, so when the robot passes the same wall twice, it may place it in two different locations (although the performance is better than expected). This drift is exacerbated as time passes.


The ultrasound sensors have numerous issues. For one, they have a maximum experimental range of about 60 cm. They are supposed to have a much longer range but we were not able to get reliable data over larger ranges. Sometimes there are issues where the sensor is angled down too much and this causes the sensor to see the floor as a wall. We attempted to correct this by taping the sensors in place, but there are still more false positives that we would like even with two layers of software filtering in place.


Sometimes the robot runs into a wall and gets stuck. This is not supposed to happen, but occasionally the ultrasound sensors fail to detect the wall for whatever reason, particularly when the robot approaches the wall at oblique angles. A problem is that the sensors also have a minimum range of about 5 cm, so once the robot gets too close it can’t see the wall at all. We are normally able to run the robot for several minutes before it inevitably gets stuck, but this is still an unfortunate and serious issue.


We originally planned to build encoders for the motors to more accurately determine position, but we did not have time to do this. This would have helped a bit with the drifting of the position data.


We planned to have a more sophisticated movement algorithm than what we currently have implemented. In particular, we tried to make the robot follow the walls. We ran into significant issues when attempting to do this which probably could have been remedied with more time.

Conclusion

As stated above, the robot is capable of building reasonably accurate maps of the world.


In the future, we are likely to stay away from ultrasound sensors because they proved too fickle for the accuracy that we needed. Perhaps laser range finders would have worked better.


We know that more sophisticated mapping techniques (i.e. actual probabilistic SLAM with Kalman filters and the like) would have produced a more accurate map of the world, but we are still presently surprised with the relative quality of maps that we were able to produce. We did not try navigating using the maps, which would likely prove infeasible.


Future Work

It is possible to use more efficient algorithms to search the room. For example, the robot can use its map to make sure it does not visit one area twice.


By adding more sensors, such as some switches, or some IR sensors, the robot will be able to detect a possible collision and prevent it. Moreover, instead of ultrasound sensors, laser distance meters can be used, which are more accurate.

Budget

  • Raspberry Pi (provided in lab) ($35)
  • Servo motors (provided in lab) ($17.99 x2)
  • Ultrasound sensors (provided in lab) ($3.95 x3)
  • IMU (provided in lab) ($33.99)
  • Wires and resistors (provided in lab)
  • All the parts were provided in the lab, and we did not spend anything.

Contributions

Shahab:

  • Hardware implementation
  • Ultrasound board implementation
  • Ultrasound sensor setup
  • Sweeping algorithm

Will:

  • Software implementation
  • Visualizer
  • Multiprocessing setup
  • Python coding

Code Appendix

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# collision.py

# Helper functions for deciding if there are walls near.
# We were planning to expand this to be based on the map, but
# presently it is based solely on current sensor readings.

import time
import math

import shm

def is_wall_front():
    reading = shm.ultrasound_dist[0]
    return reading != -1 and reading < 0.30

def is_wall_left():
    reading = shm.ultrasound_dist[2]
    return reading != -1 and reading < 0.12

def is_wall_right():
    reading = shm.ultrasound_dist[1]
    return reading != -1 and reading < 0.30

def is_wall_front_close():
    reading = shm.ultrasound_dist[0]
    return reading != -1 and reading < 0.25

def left_wall_too_far():
    reading = shm.ultrasound_dist[2]
    return reading == -1 or reading > 0.30

def left_wall_too_close():
    reading = shm.ultrasound_dist[2]
    return reading != -1 and reading < 0.22

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# config.py

# Global configuration parameters.

import math

class Ultrasound():
    def __init__(self, gpio_in, gpio_out, offx, offy, theta):
        self.gpio_in = gpio_in
        self.gpio_out = gpio_out
        self.offx = offx
        self.offy = offy
        self.theta = theta

ULTRASOUND = [
    Ultrasound(13, 19, 0.1, 0.0, 0.0), # forward
    Ultrasound(16, 24, 0.0, 0.1, math.pi/2), # right
    Ultrasound(12, 21, 0.0, -0.1, -math.pi/2), # left
]

# Channels used for motors
MOTOR_LEFT = 6
MOTOR_RIGHT = 5

# number of boxes (n x n)
GRID_SIZE = 200
# size of box, in meters
GRID_RESOLUTION = 0.05

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# drive.py

#!/usr/bin/env python3

# Control PWM (based on code from earlier labs).
# Includes code for calibrating left/right tuning on the
# fly to try to drive in straight lines.

import RPi.GPIO as gpio
import sys
import time
import math

import config
import shm

from movement import *

gpio.setmode(gpio.BCM)
pins = []

def calc_freq_duty_cycle(speed):
    """
    Returns (freq, duty_cycle) for the given speed in [-1, 1]
    """
    assert -1 <= speed <= 1
    pulse_width = 1.5 + 0.2 * speed
    freq = 1000 / (20 + pulse_width)
    duty_cycle = pulse_width / (20 + pulse_width) * 100
    return (freq, duty_cycle)

def set_speed(pin, speed, print_out=False):
    freq, duty_cycle = calc_freq_duty_cycle(speed)
    if print_out:
        print('f: {}'.format(str(freq)))
        print('dc: {}'.format(str(duty_cycle)))
    pin.ChangeFrequency(freq)
    pin.ChangeDutyCycle(duty_cycle)

def stop(pin):
    pin.ChangeDutyCycle(0)

def set_speed_or_stop(pin, speed):
    """
    Set a PWM speed, but disable motor if the speed is zero
    to prevent twitching.
    """
    if speed == 0:
        stop(pin)
    else:
        set_speed(pin, speed)

def init_motor(pin_num):
    global pins
    gpio.setup(pin_num, gpio.OUT)
    pin = gpio.PWM(pin_num, 1)
    pin.start(0)
    pins.append(pin)
    return pin

def setup_button(pin_num, callback):
    gpio.setup(pin_num, gpio.IN, pull_up_down=gpio.PUD_UP)
    gpio.add_event_detect(pin_num, gpio.FALLING, \
                          callback=lambda _: callback(), bouncetime=300)

def cleanup():
    for pin in pins:
        pin.stop()
    gpio.cleanup()

def main():
    left = init_motor(config.MOTOR_LEFT)
    right = init_motor(config.MOTOR_RIGHT)

    # Ratio between left and right output multipliers.
    # This is constantly updated to try to maintain driving straight.
    right_left_tuning_ratio = 1.0

    # From movement.py.
    task = search()

    def drive(speed_left, speed_right):
        """
        Drive at a certain speed, but apply the on-the-fly calibrations.
        """
        if speed_left == speed_right:
            # Only apply calibration if we are trying to drive straight.
            # Make the bigger one equal to 1 and scale the other accordingly.
            if right_left_tuning_ratio > 1:
                right_tuning = 1.0
                left_tuning = 1.0 / right_left_tuning_ratio
            else:
                left_tuning = 1.0
                right_tuning = right_left_tuning_ratio
        else:
            right_tuning = 1.0
            left_tuning = 1.0
        set_speed_or_stop(left, speed_left * left_tuning)
        set_speed_or_stop(right, -speed_right * right_tuning)
        # Update current SHM values so that we can integrate
        # to find position in position.py.
        shm.motor_left.value = speed_left
        shm.motor_right.value = speed_right

    def stop_drive():
        drive(0, 0)

    # it doesn't work right away for some reason
    time.sleep(2)

    # Used for detecting deviations from straight movement
    last_heading = None

    while True:
        time.sleep(0.01)
        if task.finished:
            # Stop when we are done
            stop_drive()
            last_heading = None
        else:
            task.tick(drive)

            if shm.motor_left.value == shm.motor_right.value:
                if last_heading is not None:
                    heading = shm.heading.value
                    angle_diff = math.atan2(math.sin(heading - last_heading), \
                                            math.cos(heading - last_heading))
                    calib_coef = 0.8
                    # Update calibrations if we are drifting
                    # away from a straight line
                    if angle_diff > 0:
                        right_left_tuning_ratio *= calib_coef
                    elif angle_diff < 0:
                        right_left_tuning_ratio /= calib_coef

                last_heading = shm.heading.value
            else:
                last_heading = None

# This lets us run this file directly,
# but that was only for testing at the beginning
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        cleanup()
                  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# imu.py

# Reads from IMU using Adafruit library and writes values to SHM.
# Note that the IMU heading only has a precision of +/- 5 degrees.

import time
import math

import adafruit_bno055
from busio import I2C
from board import SCL, SDA

import shm

def main():
    # Sometimes the IMU fails to initialize at first, so keep
    # trying until it loads correctly.
    while True:
        try:
            i2c = I2C(SCL, SDA)
            sensor = adafruit_bno055.BNO055(i2c)
            break
        except:
            print('Failed to initialize IMU, trying again')
            time.sleep(0.1)

    try:
        while True:
            heading = sensor.euler[0]
            if heading is not None:
                shm.heading.value = -math.radians(heading) + math.pi * 2
            linear_acc = sensor.linear_acceleration
            if linear_acc is not None and all([x is not None for x in linear_acc]):
                shm.linear_acc.value = math.sqrt(sum([x**2 for x in linear_acc[:]]))
            temperature = sensor.temperature
            if temperature is not None:
                shm.temperature.value = temperature
            # Update 100 times per second
            time.sleep(0.01)
    finally:
        pass
                  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# mapper.py

# Utility functions for reading from and writing to the map.

# There are a lot of different coordinate systems at play here:
#  - Position: world coordinates, relative to where robot started
#      and independent of current heading
#  - Robot coordinates: relative to robot's current body frame
#      (not used in this file)
#  - Coords: coordinates in the grid map
#  - Index: index into the grid array, because the grid is stored
#      as a flattened 1D array

import sys
import time
import math

import shm
from config import GRID_SIZE, GRID_RESOLUTION

# For probabilities, not actually used in final.
POSITIVE_WEIGHT = 10
NEGATIVE_WEIGHT = 1

def position_to_coords(x, y):
    return (round(GRID_SIZE // 2 + x / GRID_RESOLUTION),
            round(GRID_SIZE // 2 + y / GRID_RESOLUTION))

def coords_to_position(x, y):
    return ((x - GRID_SIZE // 2) * GRID_RESOLUTION,
            (y - GRID_SIZE // 2) * GRID_RESOLUTION)

def coords_to_index(x, y):
    return int(x + y * GRID_SIZE)

def index_to_coords(i):
    return (i % GRID_SIZE, i // GRID_SIZE)

def position_to_index(x, y):
    return coords_to_index(*position_to_coords(x, y))

def index_to_position(i):
    return coords_to_position(*index_to_coords(i))

def observe(wx, wy, seen):
    """
    Make an observation at the provided position (world coordinates).
    seen=True for a wall, seen=False for the absence of a wall.
    """
    grid = shm.grid_positive if seen else shm.grid_negative
    gx, gy = position_to_coords(wx, wy)
    index = coords_to_index(gx, gy)
    # Don't make out-of-bounds observations
    if 0 <= gx < GRID_SIZE and \
       0 <= gy < GRID_SIZE and \
       0 <= index < GRID_SIZE**2:
        with grid.get_lock():
            grid[index] += 1

def totals_from_coords(x, y):
    index = coords_to_index(x, y)
    return shm.grid_positive[index], shm.grid_negative[index]

def totals_from_position(x, y):
    index = position_to_index(x, y)
    return shm.grid_positive[index], shm.grid_negative[index]

def wall_probability_from_index(index):
    """
    Calculate the approximate probability that there is a wall at a given
    index. Not used in the final, but we were planning to use it.
    """
    pos = shm.grid_positive[index] * POSITIVE_WEIGHT
    neg = shm.grid_negative[index] * NEGATIVE_WEIGHT

    return pos / (pos + neg) if (pos + neg) > 0 else -1

def wall_probability_from_coords(x, y):
    return wall_probability_from_index(coords_to_index(x, y))

def wall_probability_from_position(x, y):
    return wall_probability_from_index(position_to_index(x, y))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# master.py

#!/usr/bin/env python3

# This is the main process that starts all of the other processes.

from multiprocessing import Process
import sys
import time

import importlib

import RPi.GPIO as gpio

import shm
import config

# True to enable visualizer, False will not use the display 
visualize = True
# True to run on PiTFT, False to run over ssh (with X-forwarding enabled)
touch_screen = 'ssh' not in sys.argv

class Service():
    def __init__(self, fname, args=[], main='main'):
        self.fname = fname
        self.args = args
        self.main = main

services = [
    Service('imu'),
    Service('position'),
    Service('observe'),
    Service('drive'),
]
if visualize:
    services.append(Service('visualizer'))
for i, _ in enumerate(config.ULTRASOUND):
    services.append(Service('ultrasound', args=[i]))

def init_escape(shutdown_func):
    # escape button

    gpio.setmode(gpio.BCM)
    gpio.setup(17, gpio.IN, pull_up_down=gpio.PUD_UP)

    gpio.add_event_detect(17, gpio.FALLING, \
                          callback=lambda _: shutdown_func(), \
                          bouncetime=300)

def main():
    if visualize:
        # We need to initialize pygame in the main process
        # so that it works correctly.
        import pygame
        from visualizer import init_pygame
        init_pygame(touch_screen)

    processes = []

    def shutdown():
        # There are issues with processes not shutting down correctly,
        # but the combination of these approaches seems to do the trick.
        shm.quit_flag.value = True
        if visualize:
            pygame.display.quit()
            pygame.quit()
        gpio.cleanup()
        for process in processes:
            process.terminate()
        sys.exit(0)

    init_escape(shutdown)

    for service in services:
        # Start each service as a separate process
        mod = importlib.import_module(service.fname)
        process = Process(target=getattr(mod, service.main), args=service.args)
        processes.append(process)
        process.start()

    try:
        while True:
            # We put things here for debugging purposes,
            # but it is empty in the final.
            time.sleep(1)
    except KeyboardInterrupt:
        shutdown()

if __name__ == '__main__':
    main()

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# movement.py

# Contains movement logic building blocks and the main movement pattern.

import time
import math

import shm
import collision

class Task():
    """
    Base class for tasks.
    """

    def __init__(self):
        self.finished = False

    def tick(self, callback):
        pass

    def finish(self):
        self.finished = True

class MovementTask(Task):
    """
    Move each motor a certain amount for a certain number of ticks.
    """

    def __init__(self, left_speed, right_speed, ticks):
        super().__init__()
        self.left_speed = left_speed
        self.right_speed = right_speed
        self.ticks = ticks

    def tick(self, callback):
        callback(self.left_speed, self.right_speed)
        self.ticks -= 1
        if self.ticks <= 0:
            self.finish()

class SequentialTask(Task):
    """
    Run a series of tasks in sequence.
    """

    def __init__(self, *tasks):
        super().__init__()
        self.tasks = list(tasks)

    def tick(self, callback):
        if len(self.tasks) == 0:
            self.finish()
            return
        elif self.tasks[0].finished:
            del self.tasks[0]
        
        if len(self.tasks) > 0:
            self.tasks[0].tick(callback)

class WhileTask(Task):
    """
    Run a task as long as a condition is true.
    """

    def __init__(self, task_func, condition):
        super().__init__()
        self.task_func = task_func
        self.task = None
        self.condition = condition

    def tick(self, callback):
        if not self.condition():
            self.finish()
        else:
            if self.task is None or self.task.finished:
                self.task = self.task_func()
            self.task.tick(callback)

class LambdaTask(Task):
    """
    Run a task that should not be initialized until it is started.
    """

    def __init__(self, task_func):
        super().__init__()
        self.task_func = task_func
        self.task = None

    def tick(self, callback):
        if self.task is None:
            self.task = self.task_func()
        self.task.tick(callback)
        if self.task.finished:
            self.finish()

class LogTask(Task):
    """
    Print a log message.
    """

    def __init__(self, message):
        super().__init__()
        self.message = message

    def tick(self, callback):
        print(self.message)
        self.finish()

def relative_heading(relative):
    """
    Change heading by a specified amount relative to the current heading.
    Uses the actual heading readings from the IMU to check.
    Note that IMU heading resolution is only +/- 5 degrees.
    """
    def getter():
        direction = 1 if relative < 0 else -1
        current = shm.heading.value
        target = current + math.radians(relative)
        if target < 0:
            target += math.pi * 2
        elif target > math.pi * 2:
            target -= math.pi * 2
        def check():
            heading = shm.heading.value
            angle_diff = math.atan2(math.sin(target - heading), \
                                    math.cos(target - heading))
            # Stop once we get close enough to the target heading
            return abs(angle_diff) > 0.1
        return WhileTask(lambda: MovementTask(0.2 * direction, \
                                              -0.2 * direction, \
                                              math.inf), check)
    return LambdaTask(getter)

forward = lambda time: MovementTask(0.3, 0.3, time * 100)
stop = lambda: MovementTask(0, 0, 0)
turn_right = lambda time: MovementTask(0.3, -0.3, time * 100)
turn_left = lambda time: MovementTask(-0.3, 0.3, time * 100)
wait = lambda time: MovementTask(0, 0, time * 100)
ninety_left = lambda: relative_heading(90)
ninety_right = lambda: relative_heading(-90)
log = lambda msg: LogTask(msg)

def hysteresis(times, func, inv):
    """
    Turn a boolean function into one with hysteresis, i.e. one that checks
    for consistency of its output. This filter requires the function to
    consistently return True counter times in a row.
    """
    counter = 0
    def checker():
        nonlocal counter
        res = func()
        if res != inv:
            counter += 1
        else:
            counter = 0
        return (counter > times) != inv
    return checker

# Move back and forth
pattern = lambda: WhileTask(
    lambda: SequentialTask(
        ninety_right(),
        forward(5),
        ninety_left(),
        forward(0.5),
        ninety_left(),
        forward(5),
        ninety_right(),
        forward(0.5),
    ), lambda: True
)

# Attempt to follow a wall to the left. Doesn't work and not used in final.
follow_wall = lambda: SequentialTask(
    WhileTask(
        lambda: MovementTask(0.3, 0.3, math.inf),
        lambda: not collision.is_wall_front_close(),
    ),
    log('found a wall'),
    WhileTask(
        lambda: MovementTask(0.2, -0.2, math.inf),
        lambda: collision.left_wall_too_far(),
    ),
    log('aligned with wall'),
    WhileTask(
        lambda: SequentialTask(
            WhileTask(
                lambda: SequentialTask(
                    WhileTask(
                        lambda: MovementTask(0.3, 0.3, math.inf),
                        lambda: not collision.left_wall_too_far(),
                    ),
                    log('got too far'),
                    relative_heading(10),
                    WhileTask(
                        lambda: MovementTask(0.2, -0.2, math.inf),
                        lambda: collision.left_wall_too_far(),
                    )
                ),
                lambda: not collision.left_wall_too_close(),
            ),
            log('got too close'),
            relative_heading(-10),
            WhileTask(
                lambda: MovementTask(-0.2, 0.2, math.inf),
                lambda: collision.left_wall_too_close() or \
                collision.is_wall_front_close(),
            )
        ),
        lambda: True,
    ),
)

# Used to alternate turning directions so that we don't
# trace out the same path repeatedly.
direction = 1
def get_turn():
    global direction
    direction = -direction
    return SequentialTask(
        relative_heading(90 * direction),
        forward(1.0),
        relative_heading(90 * direction),
    )

# Bounce between walls to try to cover the entire room.
wall_finder = lambda: WhileTask(
    lambda: SequentialTask(
        log('searching for wall'),
        WhileTask(
            lambda: MovementTask(0.3, 0.3, math.inf),
            hysteresis(3, lambda: not collision.is_wall_front(), True),
        ),
        log('saw a wall in front'),
        get_turn(),
    ), lambda: True
)

# Twitch the heading at the start to wait for motors, IMU etc. to load.
search = lambda: SequentialTask(
    relative_heading(10),
    relative_heading(-10),
    wall_finder(),
)
                  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# observe.py

# Make observations based on ultrasound readings and current position estimate.
# Observes seen walls and absence of walls between robot and readings.

import sys
import time
import math

import config
import shm
import mapper
import ultrasound

def main():
    while True:
        for i, ultra in enumerate(config.ULTRASOUND):
            with shm.pos.get_lock():
                x, y = shm.pos[0], shm.pos[1]
                heading = shm.heading.value

            dist = shm.ultrasound_dist[i]

            if dist != -1:
                # We see a wall, so observe it

                # robot frame
                rtx = -ultra.offx + dist * math.cos(-ultra.theta)
                rty = -ultra.offy + dist * math.sin(-ultra.theta)

                # global frame
                tx = x + rtx * math.cos(heading) - rty * math.sin(heading)
                ty = y + rtx * math.sin(heading) + rty * math.cos(heading)

                mapper.observe(tx, ty, True)
            else:
                # We don't see a wall, but still calculate how far we can see

                rtx = -ultra.offx + ultrasound.MAX_DIST * math.cos(-ultra.theta)
                rty = -ultra.offy + ultrasound.MAX_DIST * math.sin(-ultra.theta)
                tx = x + rtx * math.cos(heading) - rty * math.sin(heading)
                ty = y + rty * math.sin(heading) + rty * math.cos(heading)

            grid_pos = mapper.position_to_coords(*shm.pos[:])
            grid_x, grid_y = int(grid_pos[0]), int(grid_pos[1])
            target_pos = mapper.position_to_coords(tx, ty)
            tx, ty = int(target_pos[0]), int(target_pos[1])

            # Observe the absence of walls
            for xs_x in range(min(grid_x, tx), max(grid_x, tx) + 1):
                if tx - grid_x != 0:
                    tgnt = (ty - grid_y) / (tx - grid_x)
                    ys_x = (xs_x - grid_x) * tgnt + grid_y
                    posx, posy = mapper.coords_to_position(xs_x, ys_x)
                    mapper.observe(posx, posy, False)
            for ys_y in range(min(grid_y, ty), max(grid_y, ty) + 1):
                if ty - grid_y != 0:
                    tgnt = (tx - grid_x) / (ty - grid_y)
                    xs_y = (ys_y - grid_y) * tgnt + grid_x
                    posx, posy = mapper.coords_to_position(xs_y, ys_y)
                    mapper.observe(posx, posy, False)

        time.sleep(0.1)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# position.py

# Continuously estimate current position based on
# PWM values and current heading.

import sys
import time
import math

import shm

TIME_STEP = 0.01
SPEED_TO_METERS = 0.5
BASE_WIDTH = 0.10

def main():
    while True:
        with shm.pos.get_lock():
            x, y = shm.pos[0], shm.pos[1]
        sl, sr = shm.motor_left.value, shm.motor_right.value
        heading = shm.heading.value

        # differential drive, hopefully

        # robot frame
        dxr = (sl + sr) / 2 * SPEED_TO_METERS
        dyr = 0 # TODO this is not correct but I don't feel like doing math right now

        # global frame
        dx = dxr * math.cos(heading) - dyr * math.sin(heading)
        dy = dxr * math.sin(heading) + dyr * math.cos(heading)

        with shm.pos.get_lock():
            shm.pos[0] = x + dx * TIME_STEP
            shm.pos[1] = y + dy * TIME_STEP

        time.sleep(TIME_STEP)
                  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# pwm_calibrate.py

#!/usr/bin/env python3

# We use this script for adjusting motor calibrations with a screwdriver.

import RPi.GPIO as gpio
import sys
import time

pin_nums = [5, 6]

gpio.setmode(gpio.BCM)
pins = []

def calc_freq_duty_cycle(speed):
    """
    Returns (freq, duty_cycle) for the given speed in [-1, 1]
    """
    assert -1 <= speed <= 1
    pulse_width = 1.5 + 0.2 * speed
    freq = 1000 / (20 + pulse_width)
    duty_cycle = pulse_width / (20 + pulse_width) * 100
    return (freq, duty_cycle)

def set_speed(speed):
    freq, duty_cycle = calc_freq_duty_cycle(speed)
    print('f: {}'.format(str(freq)))
    print('dc: {}'.format(str(duty_cycle)))
    for pin in pins:
        pin.ChangeFrequency(freq)
        pin.ChangeDutyCycle(duty_cycle)

def main():
    if len(sys.argv) > 1:
        speed = float(sys.argv[1])
    else:
        speed = 0

    global pins
    for pin_num in pin_nums:
        gpio.setup(pin_num, gpio.OUT)
        pin = gpio.PWM(pin_num, 1)
        pin.start(50)
        pins.append(pin)
    set_speed(speed)

    while True:
        time.sleep(1)

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        for pin in pins:
            pin.stop()
        gpio.cleanup()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# shm.py

# Shared memory between processes.

from multiprocessing import Value, Array
from ctypes import c_float, c_int, c_bool

import config

# Current distance readings from ultrasound sensors.
ultrasound_dist = Array(c_float, len(config.ULTRASOUND))

# Current readings from IMU.
heading = Value(c_float)
linear_acc = Value(c_float)
temperature = Value(c_float)

# Current PWM values.
motor_left = Value(c_float)
motor_right = Value(c_float)

# Current position (x, y).
pos = Array(c_float, 2)

# Positive observations
grid_positive = Array(c_int, config.GRID_SIZE**2)
# Negative observations
grid_negative = Array(c_int, config.GRID_SIZE**2)

# Used for killing visualizer.
quit_flag = Value(c_bool)
                  

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# ultrasound.py

#!/usr/bin/env python3

# Continuously read distances from ultrasound sensors.

import RPi.GPIO as gpio
import sys
import time

import shm
from config import ULTRASOUND

# The number of samples we take per cycle
SAMPLES = 5
# The number of samples returning readings we need to make an observation
SAMPLES_ACCEPTED = 4

# How close the sensors can see
MIN_DIST = 0.05
# How far the sensors can see
MAX_DIST = 0.35

out = None

def main(index):
    config = ULTRASOUND[index]

    shm.ultrasound_dist[index] = -1.0

    global out
    gpio.setmode(gpio.BCM)
    gpio.setup(config.gpio_out, gpio.OUT)
    gpio.setup(config.gpio_in, gpio.IN, pull_up_down=gpio.PUD_DOWN)
    out = gpio.PWM(config.gpio_out, 100)
    out.start(0.1)

    try:
        while True:
            samples = []
            # Calculate speed of sound using current temperature reading
            speed_of_sound = 331.3 + 0.606 * shm.temperature.value
            for _ in range(SAMPLES):
                gpio.wait_for_edge(config.gpio_in, gpio.RISING)
                start = time.time()
                gpio.wait_for_edge(config.gpio_in, gpio.FALLING)
                # Calculate distance based on speed of sound and time of pulse
                value = (time.time() - start) * speed_of_sound / 2
                if MIN_DIST < value < MAX_DIST:
                    samples.append(value)
            # Average multiple samples; report -1.0 if we are out of range
            avg = sum(samples) / len(samples) if len(samples) > SAMPLES_ACCEPTED else -1.0
            shm.ultrasound_dist[index] = avg
            time.sleep(0.1)
    finally:
        out.stop()
        gpio.cleanup()

if __name__ == '__main__':
    if len(sys.argv) >= 2:
        index = int(sys.argv[1])
    else:
        index = 0
    wrapper(index)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# visualizer.py

#!/usr/bin/env python3

# Display map on PiTFT or a window over SSH using pygame.

import sys
import os
import time
import math

import numpy as np

import pygame
from pygame.locals import *

from config import GRID_SIZE, GRID_RESOLUTION
import shm

import mapper

# How many pixels per tile
TILE_SIZE = 2

screen = None
font = None
width = None
height = None

def init_pygame(touch_screen=True):
    """
    This needs to get called separately so that pygame is initialized in the main process
    """
    global screen, font, width, height

    if touch_screen:
        os.putenv('SDL_VIDEODRIVER', 'fbcon') # Display on piTFT
        os.putenv('SDL_FBDEV', '/dev/fb1')
        os.putenv('SDL_MOUSEDRV', 'TSLIB') # Track mouse clicks on piTFT
        os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

    pygame.init()

    font = pygame.font.SysFont('Arial', 15)

    if touch_screen:
        pygame.mouse.set_visible(False)

    width, height = size = (320, 240)

    screen = pygame.display.set_mode(size)

def main():
    if None in [screen, font, width, height]:
        raise Exception("Didn't call init_pygame before calling visualizer.main!")

    start = time.time()

    center = np.array([width / 2, height / 2])

    robot_color = (255, 255, 255)
    wall_color = (255, 0, 0)

    history = []

    def rotate(mat, theta):
        rot = np.matrix([[np.cos(theta), -np.sin(theta)],
                         [np.sin(theta), np.cos(theta)]])
        return np.matmul(mat, rot)

    def rect(x, y, w, h):
        return [[x, y], [x + w, y], [x + w, y + h], [x, y + h]]

    frame = 0

    while True:
        screen.fill((0, 0, 0))

        # Coordinate system is flipped on screen
        heading = -shm.heading.value
        pos = shm.pos[:]

        # Draw all grid points
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                pos_obs, neg_obs = mapper.totals_from_coords(x, y)
                # Draw a wall if present, otherwise draw absent wall if present
                if pos_obs > 0:
                    pts = rect((x - GRID_SIZE // 2 - pos[0] / GRID_RESOLUTION) * TILE_SIZE,
                               -(y - GRID_SIZE // 2 - pos[1] / GRID_RESOLUTION) * TILE_SIZE, 2, 2)
                    pygame.draw.polygon(screen, (255, 0, 0), center + np.array(rotate(pts, heading)))
                elif neg_obs > 0:
                    pts = rect((x - GRID_SIZE // 2 - pos[0] / GRID_RESOLUTION) * TILE_SIZE,
                               -(y - GRID_SIZE // 2 - pos[1] / GRID_RESOLUTION) * TILE_SIZE, 2, 2)
                    pygame.draw.polygon(screen, (0, 255, 0), center + np.array(rotate(pts, heading)))

        # Display position history
        for (x, y) in history[:]:
            pos = shm.pos[:]
            pts = rect((x - pos[0]) / GRID_RESOLUTION * TILE_SIZE,
                       -(y - pos[1]) / GRID_RESOLUTION * TILE_SIZE, 2, 2)
            pygame.draw.polygon(screen, (255, 255, 255), center + np.array(rotate(pts, heading)))

        # Draw robot
        pts = np.matrix([[-10, -5], [-10, 5], [10, 0]])
        pygame.draw.polygon(screen, robot_color, center + np.array(pts))

        pygame.display.flip()

        # Record position history
        if frame % 2 == 0:
            history.append(shm.pos[:])

        time.sleep(0.1)

        frame += 1

        if shm.quit_flag.value:
            sys.exit(0)
            raise Exception()

if __name__ == '__main__':
    init_pygame()
    main()