Shahaboddin Ghajari (sg2367) and William Smith (wds68)
12/12/2019
In this project we build a robot that maps its environment while keeping track of its own location within it. This is the central challenge of the project: to generate a map, the robot needs to know its location, but to know its location, it needs the map. Our goal is to map a room. Our approach is to estimate the robot's location by driving it at a roughly constant, known speed while using ultrasound sensors to detect walls and obstacles in the room.
We used the robot platform that we built in the lab, driven by two servo motors. To observe the environment, we used three RCWL-1601 ultrasound modules, and we installed an Adafruit BNO055 IMU to track the robot's heading and the room temperature; the temperature reading is used to calculate the speed of sound. The robot follows a sweeping movement pattern to map its environment.
We assembled the robot platform from Lab 3 as the base of our robot. It included the Raspberry Pi, two continuous rotation servo motors, and the PiTFT. We added three ultrasound sensors: one on the front of the robot and one on each side. Because of space limitations on the platform, we designed small breakout boards to connect the ultrasound sensors to the Raspberry Pi. Each board carried two sets of headers, one 10 kΩ resistor, and two 1 kΩ resistors; one header connected the board to the Raspberry Pi and the other connected to the sensor. We attached the boards to the platform with velcro and electrical tape. We chose the RCWL-1601 ultrasound sensors because they were available in the lab and accept a 3 V supply, which the Pi can provide. These sensors have a limited measurement range: in our experience they were only reasonably accurate up to about 65 cm, and they have a 15 degree measurement cone. We decided to use a resolution of 5 cm for our map, and because the 15 degree cone spreads to roughly one grid cell of lateral uncertainty on each side of the beam at 35 cm, we limited the maximum distance that we trust from these sensors to 35 cm; beyond that, obstacles could easily be placed in the wrong cells.
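As a rough sanity check of this trade-off (our own back-of-the-envelope sketch, not part of the robot code), the lateral spread of a 15 degree measurement cone at a given range can be computed as follows:

import math

# Half-width of the ultrasound beam at a given range, assuming a 15 degree cone.
def lateral_half_width(distance_m, cone_deg=15.0):
    return distance_m * math.tan(math.radians(cone_deg / 2))

print(lateral_half_width(0.35))  # ~0.046 m, about one 5 cm grid cell
print(lateral_half_width(0.65))  # ~0.086 m, closer to two cells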
We used the BNO055 IMU to track the heading of the robot. Its built-in temperature sensor also let us continuously monitor the room temperature and adjust the distance measurements for the temperature-dependent speed of sound.
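For reference, the conversion from echo time to distance uses the standard linear approximation for the speed of sound in air; this is the same formula that appears in ultrasound.py in the code listings below.

def speed_of_sound(temp_c):
    # Linear approximation for the speed of sound in air, in m/s.
    return 331.3 + 0.606 * temp_c

def echo_to_distance(pulse_seconds, temp_c):
    # The pulse travels to the obstacle and back, so divide by two.
    return pulse_seconds * speed_of_sound(temp_c) / 2

# Example: a 2 ms echo at 20 C corresponds to roughly 0.34 m.
print(echo_to_distance(0.002, 20.0))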
To calibrate the motor speeds, we used a software solution. The IMU monitors the robot's heading; if the robot is commanded to drive straight but its heading is drifting, it adjusts the speed of one of the motors to bring it back on course. This approach is fairly effective at making the robot drive straight, but it is not perfect. The important part of our design is that the robot does not rely on driving straight. Instead, it keeps track of its position by integrating its open-loop instantaneous velocity (an estimate based on the PWM values delivered to the motors) together with its heading from the IMU, and it makes its observations of the world relative to this position estimate. Exact movement is therefore not necessary.
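A minimal sketch of this dead-reckoning update is shown below; the full loop, including the shared-memory locking, is in position.py in the code listings. SPEED_TO_METERS is the same constant used there to convert commanded PWM speed to meters per second.

import math

SPEED_TO_METERS = 0.5  # same constant as in position.py

def integrate_position(x, y, heading, speed_left, speed_right, dt):
    # Open-loop forward velocity from the commanded motor speeds; the heading
    # comes from the IMU rather than from the wheels.
    forward_speed = (speed_left + speed_right) / 2 * SPEED_TO_METERS
    x += forward_speed * math.cos(heading) * dt
    y += forward_speed * math.sin(heading) * dt
    return x, y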
To make sure the robot visits the whole room, we chose a sweeping pattern. The robot drives forward; when it detects a wall, it turns 90 degrees, moves forward a short distance, and turns 90 degrees again, so that ideally it traverses the room in parallel lanes. The turning direction alternates at each wall so that the robot does not retrace the same lane. In practice its movements are not precise enough to cover the room exactly this way, but the pattern is still a good approximation and gives reasonable coverage of the room; a simplified sketch of the loop follows.
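The sketch below outlines the sweep as pseudocode. The robot object and its methods here are hypothetical stand-ins for the Task-based building blocks in movement.py (listed below), which is the actual implementation.

def sweep(robot):
    direction = 1
    while True:
        robot.drive_until_wall()            # forward until the front sensor sees a wall
        robot.turn_degrees(90 * direction)  # first 90 degree turn
        robot.drive_for(1.0)                # short offset into the next lane
        robot.turn_degrees(90 * direction)  # second 90 degree turn, now facing back across the room
        direction = -direction              # alternate turn direction so lanes are not retraced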
If the front ultrasound sensor detects an object within 30 cm, the robot treats it as a wall. The side sensors monitor the environment for objects out to 35 cm, and if an object on the side gets too close, the robot turns away by 30 degrees to avoid a collision.
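These thresholds correspond to simple checks on the most recent sensor readings (see collision.py in the code listings); a reading of -1 means the sensor returned nothing valid, so it is never treated as a wall. For example:

def is_wall_front(front_reading_m):
    # Treat anything closer than 30 cm as a wall; -1 means no valid reading.
    return front_reading_m != -1 and front_reading_m < 0.30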
We designed a live visualizer that displays the map either on the PiTFT or, over SSH with X forwarding, in a window on a remote monitor. The map covers roughly 5 meters in every direction from the starting point, on a grid with 5 cm resolution. The visualizer keeps the robot's heading fixed on the display, so the surroundings rotate when the robot changes heading. Obstacles are drawn as red dots, observed free space as green dots, and the robot's path as white dots.
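The rotation is done with a small helper, essentially the rotate() function in visualizer.py: each map point is rotated about the screen center by the current heading, so the robot's heading stays fixed on screen while the map turns around it.

import numpy as np

def rotate(points, theta):
    # points is an N x 2 array of screen offsets relative to the center.
    rot = np.array([[np.cos(theta), -np.sin(theta)],
                    [np.sin(theta),  np.cos(theta)]])
    return np.matmul(points, rot)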
The robot successfully navigates the room and builds a map of the world. Features such as walls, chairs, and people are recognizable in the constructed map. There are, however, a number of issues.
First, the position information is highly susceptible to drift, so when the robot passes the same wall twice, it may place it in two different locations (although the performance is better than expected). This drift is exacerbated as time passes.
The ultrasound sensors have numerous issues. For one, their usable range in our experiments was only about 60 cm; they are specified for a much longer range, but we were not able to get reliable data beyond that. Sometimes a sensor is angled down slightly too far and reports the floor as a wall. We attempted to correct this by taping the sensors firmly in place, but there are still more false positives than we would like, even with two layers of software filtering in place (multi-sample averaging in the ultrasound reader and a consecutive-reading check in the movement logic).
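One of these software filters simply requires several consecutive positive readings before the robot reacts; a simplified version of the hysteresis() helper in movement.py looks like this:

def consecutive_filter(times, raw_check):
    # Wrap a boolean check so it only returns True after `times`
    # consecutive True results from the raw check.
    counter = 0
    def filtered():
        nonlocal counter
        counter = counter + 1 if raw_check() else 0
        return counter > times
    return filtered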
Sometimes the robot runs into a wall and gets stuck. This is not supposed to happen, but occasionally the ultrasound sensors fail to detect the wall, particularly when the robot approaches it at an oblique angle. Compounding this, the sensors have a minimum range of about 5 cm, so once the robot gets too close it cannot see the wall at all. We are normally able to run the robot for several minutes before it inevitably gets stuck, but this is still an unfortunate and serious issue.
We originally planned to build encoders for the motors to more accurately determine position, but we did not have time to do this. This would have helped a bit with the drifting of the position data.
We planned to have a more sophisticated movement algorithm than what we currently have implemented. In particular, we tried to make the robot follow the walls. We ran into significant issues when attempting to do this which probably could have been remedied with more time.
As stated above, the robot is capable of building reasonably accurate maps of the world.
In the future, we are likely to stay away from ultrasound sensors because they proved too fickle for the accuracy that we needed. Perhaps laser range finders would have worked better.
We know that more sophisticated mapping techniques (i.e. actual probabilistic SLAM with Kalman filters and the like) would have produced a more accurate map of the world, but we are still pleasantly surprised by the quality of the maps we were able to produce. We did not try navigating using the maps, which would likely not be feasible given the drift.
More efficient algorithms could be used to search the room; for example, the robot could use its map to avoid visiting the same area twice.
Adding more sensors, such as bump switches or IR sensors, would let the robot detect an imminent collision and prevent it. Laser distance sensors could also replace the ultrasound sensors, since they are more accurate.
Shahab:
Will:
# collision.py
# Helper functions for deciding if there are walls near.
# We were planning to expand this to be based on the map, but
# presently it is based solely on current sensor readings.

import time
import math

import shm


def is_wall_front():
    reading = shm.ultrasound_dist[0]
    return reading != -1 and reading < 0.30


def is_wall_left():
    reading = shm.ultrasound_dist[2]
    return reading != -1 and reading < 0.12


def is_wall_right():
    reading = shm.ultrasound_dist[1]
    return reading != -1 and reading < 0.30


def is_wall_front_close():
    reading = shm.ultrasound_dist[0]
    return reading != -1 and reading < 0.25


def left_wall_too_far():
    reading = shm.ultrasound_dist[2]
    return reading == -1 or reading > 0.30


def left_wall_too_close():
    reading = shm.ultrasound_dist[2]
    return reading != -1 and reading < 0.22
# config.py
# Global configuration parameters.

import math


class Ultrasound():
    def __init__(self, gpio_in, gpio_out, offx, offy, theta):
        self.gpio_in = gpio_in
        self.gpio_out = gpio_out
        self.offx = offx
        self.offy = offy
        self.theta = theta


ULTRASOUND = [
    Ultrasound(13, 19, 0.1, 0.0, 0.0),          # forward
    Ultrasound(16, 24, 0.0, 0.1, math.pi/2),    # right
    Ultrasound(12, 21, 0.0, -0.1, -math.pi/2),  # left
]

# Channels used for motors
MOTOR_LEFT = 6
MOTOR_RIGHT = 5

# number of boxes (n x n)
GRID_SIZE = 200

# size of box, in meters
GRID_RESOLUTION = 0.05
# drive.py
#!/usr/bin/env python3
# Control PWM (based on code from earlier labs).
# Includes code for calibrating left/right tuning on the
# fly to try to drive in straight lines.

import RPi.GPIO as gpio
import sys
import time
import math

import config
import shm
from movement import *

gpio.setmode(gpio.BCM)

pins = []


def calc_freq_duty_cycle(speed):
    """ Returns (freq, duty_cycle) for the given speed in [-1, 1] """
    assert -1 <= speed <= 1
    pulse_width = 1.5 + 0.2 * speed
    freq = 1000 / (20 + pulse_width)
    duty_cycle = pulse_width / (20 + pulse_width) * 100
    return (freq, duty_cycle)


def set_speed(pin, speed, print_out=False):
    freq, duty_cycle = calc_freq_duty_cycle(speed)
    if print_out:
        print('f: {}'.format(str(freq)))
        print('dc: {}'.format(str(duty_cycle)))
    pin.ChangeFrequency(freq)
    pin.ChangeDutyCycle(duty_cycle)


def stop(pin):
    pin.ChangeDutyCycle(0)


def set_speed_or_stop(pin, speed):
    """
    Set a PWM speed, but disable motor if the speed is zero
    to prevent twitching.
    """
    if speed == 0:
        stop(pin)
    else:
        set_speed(pin, speed)


def init_motor(pin_num):
    global pins
    gpio.setup(pin_num, gpio.OUT)
    pin = gpio.PWM(pin_num, 1)
    pin.start(0)
    pins.append(pin)
    return pin


def setup_button(pin_num, callback):
    gpio.setup(pin_num, gpio.IN, pull_up_down=gpio.PUD_UP)
    gpio.add_event_detect(pin_num, gpio.FALLING, \
                          callback=lambda _: callback(), bouncetime=300)


def cleanup():
    for pin in pins:
        pin.stop()
    gpio.cleanup()


def main():
    left = init_motor(config.MOTOR_LEFT)
    right = init_motor(config.MOTOR_RIGHT)

    # Ratio between left and right output multipliers.
    # This is constantly updated to try to maintain driving straight.
    right_left_tuning_ratio = 1.0

    # From movement.py.
    task = search()

    def drive(speed_left, speed_right):
        """ Drive at a certain speed, but apply the on-the-fly calibrations. """
        if speed_left == speed_right:
            # Only apply calibration if we are trying to drive straight.
            # Make the bigger one equal to 1 and scale the other accordingly.
            if right_left_tuning_ratio > 1:
                right_tuning = 1.0
                left_tuning = 1.0 / right_left_tuning_ratio
            else:
                left_tuning = 1.0
                right_tuning = right_left_tuning_ratio
        else:
            right_tuning = 1.0
            left_tuning = 1.0
        set_speed_or_stop(left, speed_left * left_tuning)
        set_speed_or_stop(right, -speed_right * right_tuning)
        # Update current SHM values so that we can integrate
        # to find position in position.py.
        shm.motor_left.value = speed_left
        shm.motor_right.value = speed_right

    def stop_drive():
        drive(0, 0)
        # it doesn't work right away for some reason
        time.sleep(2)

    # Used for detecting deviations from straight movement
    last_heading = None

    while True:
        time.sleep(0.01)
        if task.finished:
            # Stop when we are done
            stop_drive()
            last_heading = None
        else:
            task.tick(drive)
            if shm.motor_left.value == shm.motor_right.value:
                if last_heading is not None:
                    heading = shm.heading.value
                    angle_diff = math.atan2(math.sin(heading - last_heading), \
                                            math.cos(heading - last_heading))
                    calib_coef = 0.8
                    # Update calibrations if we are drifting
                    # away from a straight line
                    if angle_diff > 0:
                        right_left_tuning_ratio *= calib_coef
                    elif angle_diff < 0:
                        right_left_tuning_ratio /= calib_coef
                last_heading = shm.heading.value
            else:
                last_heading = None


# This lets us run this file directly,
# but that was only for testing at the beginning
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        cleanup()
# imu.py
# Reads from IMU using Adafruit library and writes values to SHM.
# Note that the IMU heading only has a precision of +/- 5 degrees.

import time
import math

import adafruit_bno055
from busio import I2C
from board import SCL, SDA

import shm


def main():
    # Sometimes the IMU fails to initialize at first, so keep
    # trying until it loads correctly.
    while True:
        try:
            i2c = I2C(SCL, SDA)
            sensor = adafruit_bno055.BNO055(i2c)
            break
        except:
            print('Failed to initialize IMU, trying again')
            time.sleep(0.1)

    try:
        while True:
            heading = sensor.euler[0]
            if heading is not None:
                shm.heading.value = -math.radians(heading) + math.pi * 2

            linear_acc = sensor.linear_acceleration
            if linear_acc is not None and all([x is not None for x in linear_acc]):
                shm.linear_acc.value = math.sqrt(sum([x**2 for x in linear_acc[:]]))

            temperature = sensor.temperature
            if temperature is not None:
                shm.temperature.value = temperature

            # Update 100 times per second
            time.sleep(0.01)
    finally:
        pass
# mapper.py
# Utility functions for reading from and writing to the map.
# There are a lot of different coordinate systems at play here:
# - Position: world coordinates, relative to where robot started
#   and independent of current heading
# - Robot coordinates: relative to robot's current body frame
#   (not used in this file)
# - Coords: coordinates in the grid map
# - Index: index into the grid array, because the grid is stored
#   as a flattened 1D array

import sys
import time
import math

import shm
from config import GRID_SIZE, GRID_RESOLUTION

# For probabilities, not actually used in final.
POSITIVE_WEIGHT = 10
NEGATIVE_WEIGHT = 1


def position_to_coords(x, y):
    return (round(GRID_SIZE // 2 + x / GRID_RESOLUTION),
            round(GRID_SIZE // 2 + y / GRID_RESOLUTION))


def coords_to_position(x, y):
    return ((x - GRID_SIZE // 2) * GRID_RESOLUTION,
            (y - GRID_SIZE // 2) * GRID_RESOLUTION)


def coords_to_index(x, y):
    return int(x + y * GRID_SIZE)


def index_to_coords(i):
    return (i % GRID_SIZE, i // GRID_SIZE)


def position_to_index(x, y):
    return coords_to_index(*position_to_coords(x, y))


def index_to_position(i):
    return coords_to_position(*index_to_coords(i))


def observe(wx, wy, seen):
    """
    Make an observation at the provided position (world coordinates).
    seen=True for a wall, seen=False for the absence of a wall.
    """
    grid = shm.grid_positive if seen else shm.grid_negative
    gx, gy = position_to_coords(wx, wy)
    index = coords_to_index(gx, gy)
    # Don't make out-of-bounds observations
    if 0 <= gx < GRID_SIZE and \
       0 <= gy < GRID_SIZE and \
       0 <= index < GRID_SIZE**2:
        with grid.get_lock():
            grid[index] += 1


def totals_from_coords(x, y):
    index = coords_to_index(x, y)
    return shm.grid_positive[index], shm.grid_negative[index]


def totals_from_position(x, y):
    index = position_to_index(x, y)
    return shm.grid_positive[index], shm.grid_negative[index]


def wall_probability_from_index(index):
    """
    Calculate the approximate probability that there is a wall
    at a given index. Not used in the final, but we were planning to use it.
    """
    pos = shm.grid_positive[index] * POSITIVE_WEIGHT
    neg = shm.grid_negative[index] * NEGATIVE_WEIGHT
    return pos / (pos + neg) if (pos + neg) > 0 else -1


def wall_probability_from_coords(x, y):
    return wall_probability_from_index(coords_to_index(x, y))


def wall_probability_from_position(x, y):
    return wall_probability_from_index(position_to_index(x, y))
# master.py
#!/usr/bin/env python3
# This is the main process that starts all of the other processes.

from multiprocessing import Process
import sys
import time
import importlib

import RPi.GPIO as gpio

import shm
import config

# True to enable visualizer, False will not use the display
visualize = True
# True to run on PiTFT, False to run over ssh (with X-forwarding enabled)
touch_screen = 'ssh' not in sys.argv


class Service():
    def __init__(self, fname, args=[], main='main'):
        self.fname = fname
        self.args = args
        self.main = main


services = [
    Service('imu'),
    Service('position'),
    Service('observe'),
    Service('drive'),
]

if visualize:
    services.append(Service('visualizer'))

for i, _ in enumerate(config.ULTRASOUND):
    services.append(Service('ultrasound', args=[i]))


def init_escape(shutdown_func):
    # escape button
    gpio.setmode(gpio.BCM)
    gpio.setup(17, gpio.IN, pull_up_down=gpio.PUD_UP)
    gpio.add_event_detect(17, gpio.FALLING, \
                          callback=lambda _: shutdown_func(), \
                          bouncetime=300)


def main():
    if visualize:
        # We need to initialize pygame in the main process
        # so that it works correctly.
        import pygame
        from visualizer import init_pygame
        init_pygame(touch_screen)

    processes = []

    def shutdown():
        # There are issues with processes not shutting down correctly,
        # but the combination of these approaches seems to do the trick.
        shm.quit_flag.value = True
        if visualize:
            pygame.display.quit()
            pygame.quit()
        gpio.cleanup()
        for process in processes:
            process.terminate()
        sys.exit(0)

    init_escape(shutdown)

    for service in services:
        # Start each service as a separate process
        mod = importlib.import_module(service.fname)
        process = Process(target=getattr(mod, service.main), args=service.args)
        processes.append(process)
        process.start()

    try:
        while True:
            # We put things here for debugging purposes,
            # but it is empty in the final.
            time.sleep(1)
    except KeyboardInterrupt:
        shutdown()


if __name__ == '__main__':
    main()
# movement.py
# Contains movement logic building blocks and the main movement pattern.

import time
import math

import shm
import collision


class Task():
    """ Base class for tasks. """
    def __init__(self):
        self.finished = False

    def tick(self, callback):
        pass

    def finish(self):
        self.finished = True


class MovementTask(Task):
    """ Move each motor a certain amount for a certain number of ticks. """
    def __init__(self, left_speed, right_speed, ticks):
        super().__init__()
        self.left_speed = left_speed
        self.right_speed = right_speed
        self.ticks = ticks

    def tick(self, callback):
        callback(self.left_speed, self.right_speed)
        self.ticks -= 1
        if self.ticks <= 0:
            self.finish()


class SequentialTask(Task):
    """ Run a series of tasks in sequence. """
    def __init__(self, *tasks):
        super().__init__()
        self.tasks = list(tasks)

    def tick(self, callback):
        if len(self.tasks) == 0:
            self.finish()
            return
        elif self.tasks[0].finished:
            del self.tasks[0]
        if len(self.tasks) > 0:
            self.tasks[0].tick(callback)


class WhileTask(Task):
    """ Run a task as long as a condition is true. """
    def __init__(self, task_func, condition):
        super().__init__()
        self.task_func = task_func
        self.task = None
        self.condition = condition

    def tick(self, callback):
        if not self.condition():
            self.finish()
        else:
            if self.task is None or self.task.finished:
                self.task = self.task_func()
            self.task.tick(callback)


class LambdaTask(Task):
    """ Run a task that should not be initialized until it is started. """
    def __init__(self, task_func):
        super().__init__()
        self.task_func = task_func
        self.task = None

    def tick(self, callback):
        if self.task is None:
            self.task = self.task_func()
        self.task.tick(callback)
        if self.task.finished:
            self.finish()


class LogTask(Task):
    """ Print a log message. """
    def __init__(self, message):
        super().__init__()
        self.message = message

    def tick(self, callback):
        print(self.message)
        self.finish()


def relative_heading(relative):
    """
    Change heading by a specified amount relative to the current heading.
    Uses the actual heading readings from the IMU to check.
    Note that IMU heading resolution is only +/- 5 degrees.
    """
    def getter():
        direction = 1 if relative < 0 else -1
        current = shm.heading.value
        target = current + math.radians(relative)
        if target < 0:
            target += math.pi * 2
        elif target > math.pi * 2:
            target -= math.pi * 2

        def check():
            heading = shm.heading.value
            angle_diff = math.atan2(math.sin(target - heading), \
                                    math.cos(target - heading))
            # Stop once we get close enough to the target heading
            return abs(angle_diff) > 0.1

        return WhileTask(lambda: MovementTask(0.2 * direction, \
                                              -0.2 * direction, \
                                              math.inf), check)
    return LambdaTask(getter)


forward = lambda time: MovementTask(0.3, 0.3, time * 100)
stop = lambda: MovementTask(0, 0, 0)
turn_right = lambda time: MovementTask(0.3, -0.3, time * 100)
turn_left = lambda time: MovementTask(-0.3, 0.3, time * 100)
wait = lambda time: MovementTask(0, 0, time * 100)
ninety_left = lambda: relative_heading(90)
ninety_right = lambda: relative_heading(-90)
log = lambda msg: LogTask(msg)


def hysteresis(times, func, inv):
    """
    Turn a boolean function into one with hysteresis, i.e. one that
    checks for consistency of its output. This filter requires the
    function to consistently return True counter times in a row.
    """
    counter = 0

    def checker():
        nonlocal counter
        res = func()
        if res != inv:
            counter += 1
        else:
            counter = 0
        return (counter > times) != inv

    return checker


# Move back and forth
pattern = lambda: WhileTask(
    lambda: SequentialTask(
        ninety_right(),
        forward(5),
        ninety_left(),
        forward(0.5),
        ninety_left(),
        forward(5),
        ninety_right(),
        forward(0.5),
    ),
    lambda: True
)

# Attempt to follow a wall to the left. Doesn't work and not used in final.
follow_wall = lambda: SequentialTask(
    WhileTask(
        lambda: MovementTask(0.3, 0.3, math.inf),
        lambda: not collision.is_wall_front_close(),
    ),
    log('found a wall'),
    WhileTask(
        lambda: MovementTask(0.2, -0.2, math.inf),
        lambda: collision.left_wall_too_far(),
    ),
    log('aligned with wall'),
    WhileTask(
        lambda: SequentialTask(
            WhileTask(
                lambda: SequentialTask(
                    WhileTask(
                        lambda: MovementTask(0.3, 0.3, math.inf),
                        lambda: not collision.left_wall_too_far(),
                    ),
                    log('got too far'),
                    relative_heading(10),
                    WhileTask(
                        lambda: MovementTask(0.2, -0.2, math.inf),
                        lambda: collision.left_wall_too_far(),
                    )
                ),
                lambda: not collision.left_wall_too_close(),
            ),
            log('got too close'),
            relative_heading(-10),
            WhileTask(
                lambda: MovementTask(-0.2, 0.2, math.inf),
                lambda: collision.left_wall_too_close() or \
                        collision.is_wall_front_close(),
            )
        ),
        lambda: True,
    ),
)

# Used to alternate turning directions so that we don't
# trace out the same path repeatedly.
direction = 1


def get_turn():
    global direction
    direction = -direction
    return SequentialTask(
        relative_heading(90 * direction),
        forward(1.0),
        relative_heading(90 * direction),
    )


# Bounce between walls to try to cover the entire room.
wall_finder = lambda: WhileTask(
    lambda: SequentialTask(
        log('searching for wall'),
        WhileTask(
            lambda: MovementTask(0.3, 0.3, math.inf),
            hysteresis(3, lambda: not collision.is_wall_front(), True),
        ),
        log('saw a wall in front'),
        get_turn(),
    ),
    lambda: True
)

# Twitch the heading at the start to wait for motors, IMU etc. to load.
search = lambda: SequentialTask(
    relative_heading(10),
    relative_heading(-10),
    wall_finder(),
)
# observe.py
# Make observations based on ultrasound readings and current position estimate.
# Observes seen walls and absence of walls between robot and readings.

import sys
import time
import math

import config
import shm
import mapper
import ultrasound


def main():
    while True:
        for i, ultra in enumerate(config.ULTRASOUND):
            with shm.pos.get_lock():
                x, y = shm.pos[0], shm.pos[1]
            heading = shm.heading.value
            dist = shm.ultrasound_dist[i]

            if dist != -1:
                # We see a wall, so observe it
                # robot frame
                rtx = -ultra.offx + dist * math.cos(-ultra.theta)
                rty = -ultra.offy + dist * math.sin(-ultra.theta)
                # global frame
                tx = x + rtx * math.cos(heading) - rty * math.sin(heading)
                ty = y + rtx * math.sin(heading) + rty * math.cos(heading)
                mapper.observe(tx, ty, True)
            else:
                # We don't see a wall, but still calculate how far we can see
                rtx = -ultra.offx + ultrasound.MAX_DIST * math.cos(-ultra.theta)
                rty = -ultra.offy + ultrasound.MAX_DIST * math.sin(-ultra.theta)
                tx = x + rtx * math.cos(heading) - rty * math.sin(heading)
                ty = y + rtx * math.sin(heading) + rty * math.cos(heading)

            grid_pos = mapper.position_to_coords(*shm.pos[:])
            grid_x, grid_y = int(grid_pos[0]), int(grid_pos[1])
            target_pos = mapper.position_to_coords(tx, ty)
            tx, ty = int(target_pos[0]), int(target_pos[1])

            # Observe the absence of walls between the robot and the reading
            for xs_x in range(min(grid_x, tx), max(grid_x, tx) + 1):
                if tx - grid_x != 0:
                    tgnt = (ty - grid_y) / (tx - grid_x)
                    ys_x = (xs_x - grid_x) * tgnt + grid_y
                    posx, posy = mapper.coords_to_position(xs_x, ys_x)
                    mapper.observe(posx, posy, False)

            for ys_y in range(min(grid_y, ty), max(grid_y, ty) + 1):
                if ty - grid_y != 0:
                    tgnt = (tx - grid_x) / (ty - grid_y)
                    xs_y = (ys_y - grid_y) * tgnt + grid_x
                    posx, posy = mapper.coords_to_position(xs_y, ys_y)
                    mapper.observe(posx, posy, False)

        time.sleep(0.1)
# position.py
# Continuously estimate current position based on
# PWM values and current heading.

import sys
import time
import math

import shm

TIME_STEP = 0.01
SPEED_TO_METERS = 0.5
BASE_WIDTH = 0.10


def main():
    while True:
        with shm.pos.get_lock():
            x, y = shm.pos[0], shm.pos[1]
        sl, sr = shm.motor_left.value, shm.motor_right.value
        heading = shm.heading.value

        # differential drive, hopefully
        # robot frame
        dxr = (sl + sr) / 2 * SPEED_TO_METERS
        dyr = 0  # TODO this is not correct but I don't feel like doing math right now

        # global frame
        dx = dxr * math.cos(heading) - dyr * math.sin(heading)
        dy = dxr * math.sin(heading) + dyr * math.cos(heading)

        with shm.pos.get_lock():
            shm.pos[0] = x + dx * TIME_STEP
            shm.pos[1] = y + dy * TIME_STEP

        time.sleep(TIME_STEP)
# pwm_calibrate.py
#!/usr/bin/env python3
# We use this script for adjusting motor calibrations with a screwdriver.

import RPi.GPIO as gpio
import sys
import time

pin_nums = [5, 6]

gpio.setmode(gpio.BCM)

pins = []


def calc_freq_duty_cycle(speed):
    """ Returns (freq, duty_cycle) for the given speed in [-1, 1] """
    assert -1 <= speed <= 1
    pulse_width = 1.5 + 0.2 * speed
    freq = 1000 / (20 + pulse_width)
    duty_cycle = pulse_width / (20 + pulse_width) * 100
    return (freq, duty_cycle)


def set_speed(speed):
    freq, duty_cycle = calc_freq_duty_cycle(speed)
    print('f: {}'.format(str(freq)))
    print('dc: {}'.format(str(duty_cycle)))
    for pin in pins:
        pin.ChangeFrequency(freq)
        pin.ChangeDutyCycle(duty_cycle)


def main():
    if len(sys.argv) > 1:
        speed = float(sys.argv[1])
    else:
        speed = 0

    global pins
    for pin_num in pin_nums:
        gpio.setup(pin_num, gpio.OUT)
        pin = gpio.PWM(pin_num, 1)
        pin.start(50)
        pins.append(pin)

    set_speed(speed)

    while True:
        time.sleep(1)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        for pin in pins:
            pin.stop()
        gpio.cleanup()
# shm.py
# Shared memory between processes.

from multiprocessing import Value, Array
from ctypes import c_float, c_int, c_bool

import config

# Current distance readings from ultrasound sensors.
ultrasound_dist = Array(c_float, len(config.ULTRASOUND))

# Current readings from IMU.
heading = Value(c_float)
linear_acc = Value(c_float)
temperature = Value(c_float)

# Current PWM values.
motor_left = Value(c_float)
motor_right = Value(c_float)

# Current position (x, y).
pos = Array(c_float, 2)

# Positive observations
grid_positive = Array(c_int, config.GRID_SIZE**2)
# Negative observations
grid_negative = Array(c_int, config.GRID_SIZE**2)

# Used for killing visualizer.
quit_flag = Value(c_bool)
# ultrasound.py
#!/usr/bin/env python3
# Continuously read distances from ultrasound sensors.

import RPi.GPIO as gpio
import sys
import time

import shm
from config import ULTRASOUND

# The number of samples we take per cycle
SAMPLES = 5
# The number of samples returning readings we need to make an observation
SAMPLES_ACCEPTED = 4
# How close the sensors can see
MIN_DIST = 0.05
# How far the sensors can see
MAX_DIST = 0.35

out = None


def main(index):
    config = ULTRASOUND[index]
    shm.ultrasound_dist[index] = -1.0

    global out
    gpio.setmode(gpio.BCM)
    gpio.setup(config.gpio_out, gpio.OUT)
    gpio.setup(config.gpio_in, gpio.IN, pull_up_down=gpio.PUD_DOWN)
    out = gpio.PWM(config.gpio_out, 100)
    out.start(0.1)

    try:
        while True:
            samples = []
            # Calculate speed of sound using current temperature reading
            speed_of_sound = 331.3 + 0.606 * shm.temperature.value
            for _ in range(SAMPLES):
                gpio.wait_for_edge(config.gpio_in, gpio.RISING)
                start = time.time()
                gpio.wait_for_edge(config.gpio_in, gpio.FALLING)
                # Calculate distance based on speed of sound and time of pulse
                value = (time.time() - start) * speed_of_sound / 2
                if MIN_DIST < value < MAX_DIST:
                    samples.append(value)

            # Average multiple samples; report -1.0 if we are out of range
            avg = sum(samples) / len(samples) if len(samples) > SAMPLES_ACCEPTED else -1.0
            shm.ultrasound_dist[index] = avg

            time.sleep(0.1)
    finally:
        out.stop()
        gpio.cleanup()


if __name__ == '__main__':
    if len(sys.argv) >= 2:
        index = int(sys.argv[1])
    else:
        index = 0
    main(index)
# visualizer.py
#!/usr/bin/env python3
# Display map on PiTFT or a window over SSH using pygame.

import sys
import os
import time
import math

import numpy as np
import pygame
from pygame.locals import *

from config import GRID_SIZE, GRID_RESOLUTION
import shm
import mapper

# How many pixels per tile
TILE_SIZE = 2

screen = None
font = None
width = None
height = None


def init_pygame(touch_screen=True):
    """
    This needs to get called separately so that pygame is
    initialized in the main process
    """
    global screen, font, width, height

    if touch_screen:
        os.putenv('SDL_VIDEODRIVER', 'fbcon')   # Display on piTFT
        os.putenv('SDL_FBDEV', '/dev/fb1')
        os.putenv('SDL_MOUSEDRV', 'TSLIB')      # Track mouse clicks on piTFT
        os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

    pygame.init()
    font = pygame.font.SysFont('Arial', 15)

    if touch_screen:
        pygame.mouse.set_visible(False)

    width, height = size = (320, 240)
    screen = pygame.display.set_mode(size)


def main():
    if None in [screen, font, width, height]:
        raise Exception("Didn't call init_pygame before calling visualizer.main!")

    start = time.time()
    center = np.array([width / 2, height / 2])
    robot_color = (255, 255, 255)
    wall_color = (255, 0, 0)
    history = []

    def rotate(mat, theta):
        rot = np.matrix([[np.cos(theta), -np.sin(theta)],
                         [np.sin(theta), np.cos(theta)]])
        return np.matmul(mat, rot)

    def rect(x, y, w, h):
        return [[x, y], [x + w, y], [x + w, y + h], [x, y + h]]

    frame = 0
    while True:
        screen.fill((0, 0, 0))

        # Coordinate system is flipped on screen
        heading = -shm.heading.value
        pos = shm.pos[:]

        # Draw all grid points
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                pos_obs, neg_obs = mapper.totals_from_coords(x, y)
                # Draw a wall if present, otherwise draw absent wall if present
                if pos_obs > 0:
                    pts = rect((x - GRID_SIZE // 2 - pos[0] / GRID_RESOLUTION) * TILE_SIZE,
                               -(y - GRID_SIZE // 2 - pos[1] / GRID_RESOLUTION) * TILE_SIZE,
                               2, 2)
                    pygame.draw.polygon(screen, (255, 0, 0),
                                        center + np.array(rotate(pts, heading)))
                elif neg_obs > 0:
                    pts = rect((x - GRID_SIZE // 2 - pos[0] / GRID_RESOLUTION) * TILE_SIZE,
                               -(y - GRID_SIZE // 2 - pos[1] / GRID_RESOLUTION) * TILE_SIZE,
                               2, 2)
                    pygame.draw.polygon(screen, (0, 255, 0),
                                        center + np.array(rotate(pts, heading)))

        # Display position history
        for (x, y) in history[:]:
            pos = shm.pos[:]
            pts = rect((x - pos[0]) / GRID_RESOLUTION * TILE_SIZE,
                       -(y - pos[1]) / GRID_RESOLUTION * TILE_SIZE,
                       2, 2)
            pygame.draw.polygon(screen, (255, 255, 255),
                                center + np.array(rotate(pts, heading)))

        # Draw robot
        pts = np.matrix([[-10, -5], [-10, 5], [10, 0]])
        pygame.draw.polygon(screen, robot_color, center + np.array(pts))

        pygame.display.flip()

        # Record position history
        if frame % 2 == 0:
            history.append(shm.pos[:])

        time.sleep(0.1)
        frame += 1

        if shm.quit_flag.value:
            sys.exit(0)
            raise Exception()


if __name__ == '__main__':
    init_pygame()
    main()