A Tactile-based Feedback Navigation System for Bikers
A Project By Sofya Pugach (sp2535)
and Darian Nwankwo (don4)
Current GPS solutions require users to look at a screen, which distracts them from the constantly changing situation on the road. Most common GPS applications also offer audio directions that announce upcoming turns. However, a bicycle rider who relies on audio is forced to wear earphones to hear the directions, which makes riding on the road unsafe. Here we propose an interactive system consisting of two wristbands that inform the rider about upcoming turns. Once the Raspberry Pi knows that a turn is coming up, it informs the user by actuating a vibration in the wristbands. The increasing frequency of the vibration tells the user that the turn is getting closer. The right wristband signals right turns, and the left wristband signals left turns.
Successful communication of navigation instructions via vibration-based wristbands that signal when to turn left or right.
We designed the system in an object-oriented style, using Python as our primary tool. We created custom-built
classes for each subsystem, such as motor, monitor, bluetoothctl, etc.
To receive the user's GPS location and desired destination, we tailored an existing iOS application to communicate with the HM-10 Bluetooth module connected to the central hub (RPi 3). After converting the application's Swift 3 code to Swift 4, we integrated Google's Directions API into it. The data returned by the Directions API describes the directions between locations. The API supports several modes of transportation, including transit, driving, walking, and cycling.
Once the information was received on the device, we preprocessed it and reduced our payload by a factor of 20. A payload that was previously ~4000 bytes became ~200 bytes once we stripped all newline, carriage return, and space characters, along with irrelevant information found in the JSON response. Stripping the whitespace is safe because the structure of JSON does not depend on it, so the integrity of the data is not compromised.
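The whitespace-stripping step can be illustrated in Python, even though the project performed it inside the iOS app. This is a minimal sketch, not the app's actual code: the function name `compact_payload` and the sample response are hypothetical. Rather than deleting characters by hand, it re-serializes the parsed JSON with compact separators, which removes whitespace between tokens while leaving string values intact.

```python
import json


def compact_payload(raw_json: str) -> bytes:
    """Re-serialize JSON without insignificant whitespace (hypothetical helper)."""
    parsed = json.loads(raw_json)
    # separators=(",", ":") drops the spaces json.dumps inserts by default.
    return json.dumps(parsed, separators=(",", ":")).encode("utf-8")


# Made-up response fragment, pretty-printed the way an HTTP API might return it.
raw = """
{
    "routes": [
        {
            "legs": [
                {
                    "distance": {"text": "0.3 mi"},
                    "steps": [
                        {"distance": {"text": "500 ft"}, "maneuver": "turn-left"}
                    ]
                }
            ]
        }
    ]
}
"""

compact = compact_payload(raw)
print(len(raw.encode("utf-8")), "->", len(compact), "bytes")
```

Dropping fields the hub never reads, as described above, would shrink the payload further; that filtering is omitted here because the exact set of fields the app kept is not documented.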
After the payload was sent from the iPhone and received by the RPi 3, a basic parsing algorithm walked the structure of the data and extracted the relevant information. From this point, we dispatched the distance until the next turn to the respective wristband. The central hub followed a basic iterative process: listen, parse, send. Once the information is sent, the central hub continues to listen for the next payload, and the wristband either actuates or remains still depending on how far the rider is from the next turn. To sustain our connection to the RPi Zeros, we used PyBluez, a Python library that handles much of the Bluetooth communication necessary to establish and maintain a connection.
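The parse and dispatch steps can be sketched as two small pure functions. This is a simplified illustration modeled on the hub listing further down, not a replacement for it: the function names, the placeholder wristband addresses, and the sample response are assumptions, and distances are assumed to arrive as text ending in "ft".

```python
from typing import Tuple

# Maneuver names treated as right turns (taken from the hub listing).
RIGHT_OPTIONS = (
    "turn-slight-right", "turn-sharp-right", "uturn-right", "turn-right",
    "ramp-right", "fork-right", "roundabout-right",
)

# Placeholder addresses for illustration only.
LEFT_WRISTBAND = "00:00:00:00:00:01"
RIGHT_WRISTBAND = "00:00:00:00:00:02"


def parse_next_turn(response: dict) -> Tuple[str, int]:
    """Return (direction, feet until the next maneuver)."""
    steps = response["routes"][0]["legs"][0]["steps"]
    distance = 0
    direction = ""
    for step in steps:
        text = step["distance"]["text"]      # e.g. "300 ft"
        distance += int(text[:-2])           # strip the two-letter unit
        maneuver = step.get("maneuver")
        if maneuver is not None:
            is_right = any(option in maneuver for option in RIGHT_OPTIONS)
            direction = "R" if is_right else "L"
            break
    return direction, distance


def recipient_for(direction: str) -> str:
    """Pick the wristband that should receive the distance."""
    return LEFT_WRISTBAND if direction.upper() == "L" else RIGHT_WRISTBAND


sample = {"routes": [{"legs": [{"steps": [
    {"distance": {"text": "200 ft"}},
    {"distance": {"text": "300 ft"}, "maneuver": "turn-right"},
    {"distance": {"text": "100 ft"}, "maneuver": "turn-left"},
]}]}]}

direction, distance = parse_next_turn(sample)
print(direction, distance, recipient_for(direction))
```

Keeping parsing separate from the Bluetooth calls makes this step testable without any hardware attached.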
To see what information was being transmitted to the wristbands, we integrated the piTFT screen into our overall system. The screen outputs the direction and distance in the form "Turning L/R in {number} feet." The screen also lets the user exit the application and shut down the respective devices. To create this interface, we used Python's pygame library, and we enabled shutdown of the entire system by using the RPi.GPIO library to set up edge detection on the piTFT buttons.
To warn riders about upcoming turns, we used a vibrating mini motor from Adafruit. To drive the motor, we used a transistor, a diode, and a resistor according to this tutorial. In our case we used an N2222 transistor, a 1k Ohm resistor to protect the GPIO pin, and a 1N4001 diode. The motors are PWM controlled, and we used the RPi.GPIO library to control them. We controlled the duration of the buzzing by keeping the frequency constant and varying the duty cycle. We designed 4 discrete vibration levels to signal different distances as the user approaches the turn. The closer the biker gets to the turning location, the longer the motor vibrates.
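The mapping from distance to vibration level can be written as a pure function that needs no GPIO hardware. The thresholds and duty cycles below are copied from the Motor listing at the end of this page; the function names and the standalone form are illustrative assumptions.

```python
AVG_BIKE_RATE = 15   # ft/sec, value used in the Motor class
TIME_DELTA = 5       # seconds between updates, as set in the wristband script
PWM_FREQ_HZ = 1      # constant PWM frequency used in the Motor class


def duty_cycle_for(dist_ft, time_delta=TIME_DELTA, rate=AVG_BIKE_RATE):
    """Return the PWM duty cycle (percent) for a distance to the next turn."""
    bound = time_delta * rate  # distance covered in one update interval
    if dist_ft < bound:
        return 90
    # Remaining levels, from nearest to farthest.
    for multiple, duty in ((3, 70), (6, 40), (9, 20)):
        if dist_ft <= multiple * bound:
            return duty
    return 0  # too far away: motor stays off


def on_time_seconds(duty_cycle, freq_hz=PWM_FREQ_HZ):
    """Length of the buzz within each PWM period."""
    return (duty_cycle / 100.0) / freq_hz


for feet in (50, 200, 400, 600, 1000):
    duty = duty_cycle_for(feet)
    print(feet, "ft ->", duty, "% duty,", on_time_seconds(duty), "s per period")
```

With a 1 Hz frequency, a 90% duty cycle keeps the motor on for 0.9 s of every second, which is why a higher duty cycle is felt as a longer buzz.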
When sending trivial messages to the HM-10 from the iOS application, we noticed that the payload had to be sent in packets of 64 bytes or less; otherwise the information was simply discarded. Originally, we tried to send the entire payload received from querying the API directly to the RPi 3 in 64-byte chunks. This proved problematic due to the nature of Bluetooth Low Energy communication and introduced an enormous delay in our system.
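The 64-byte limit implies some form of framing and chunking. The sketch below shows one way to do it in Python, assuming the framing that the hub listing appears to expect: the payload length in ASCII digits followed directly by the JSON object. The actual sender was the Swift app, and all function names here are hypothetical.

```python
from typing import Iterator

PACKET_SIZE = 64  # bytes, the limit observed with the HM-10


def frame_payload(payload: bytes) -> bytes:
    """Prefix the payload with its length in ASCII digits (assumed framing)."""
    return str(len(payload)).encode("ascii") + payload


def chunk(data: bytes, size: int = PACKET_SIZE) -> Iterator[bytes]:
    """Yield consecutive slices of at most `size` bytes."""
    for start in range(0, len(data), size):
        yield data[start:start + size]


def read_frame(stream: bytes) -> bytes:
    """Recover the payload from a reassembled stream shaped like b'<length>{...}'."""
    brace = stream.index(b"{")
    length = int(stream[:brace])
    payload = stream[brace:brace + length]
    if len(payload) != length:
        raise ValueError("payload truncated")
    return payload


# A 200-byte stand-in for the compacted JSON payload.
payload = b'{"pad":"' + b"x" * 190 + b'"}'
packets = list(chunk(frame_payload(payload)))
print(len(packets), "packets, sizes:", [len(p) for p in packets])
```

The length prefix lets the receiver detect a truncated transfer instead of silently parsing a partial object.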
For the Bluetooth communication, we encountered several errors when using the bluetoothctl command in Linux because we performed some actions in the wrong order. We discovered that, to pair the devices, we had to run these commands sequentially before the connection could be established: power on, pairable on, discoverable on, agent on, default-agent, trust <MAC_ADDRESS>.
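Encoding the order in code helps avoid repeating the mistake. The sketch below only builds the ordered list of command strings; feeding them to the `bluetoothctl` prompt (for example through the pexpect-based wrapper listed later) is left out. The function name and the MAC validation are assumptions added for illustration.

```python
import re

# Order matters: these must run before the device is trusted.
SETUP_COMMANDS = (
    "power on",
    "pairable on",
    "discoverable on",
    "agent on",
    "default-agent",
)

MAC_RE = re.compile(r"(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}")


def pairing_commands(mac_address):
    """Return the bluetoothctl commands in the order that worked for us."""
    if not MAC_RE.fullmatch(mac_address):
        raise ValueError("not a MAC address: {!r}".format(mac_address))
    return list(SETUP_COMMANDS) + ["trust {}".format(mac_address)]


for command in pairing_commands("B8:27:EB:2D:D7:36"):
    print(command)
```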
A major issue that slowed development was getting Python 2 and Python 3 code to communicate. Our primary system was built using Python 3; however, the interface for the piTFT was built using Python 2. To work around this, we used FIFOs to pass direction and distance information to the piTFT process, but this forced a lot of hacky workarounds whenever the two processes needed to communicate with one another. Eventually, we found a resource (listed below) that helped us install the necessary dependencies to use PyGame in Python 3.
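For reference, the FIFO workaround can be reduced to a few lines. The sketch below assumes the "direction,distance" line format that the monitor listing reads; the helper names are hypothetical, and a writer thread stands in for the second process so that the example runs on its own. It requires a POSIX system because it relies on `os.mkfifo`.

```python
import os
import tempfile
import threading


def encode_update(direction, distance_ft):
    """Format one update as a 'direction,distance' line."""
    return "{},{}\n".format(direction, distance_ft)


def decode_update(line):
    """Parse a line produced by encode_update."""
    direction, distance = line.strip().split(",")
    return direction, float(distance)


def fifo_round_trip(message):
    """Write `message` into a FIFO from one thread and read it from another."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "log_fifo")
        os.mkfifo(path)

        def writer():
            # Opening for writing blocks until a reader opens the FIFO.
            with open(path, "w") as fifo:
                fifo.write(message)

        thread = threading.Thread(target=writer)
        thread.start()
        with open(path) as fifo:  # blocks until the writer opens its end
            line = fifo.readline()
        thread.join()
        return line


print(decode_update(fifo_round_trip(encode_update("L", 300))))
```

The blocking behavior of `open` on a FIFO is one source of the awkwardness mentioned above: each side stalls until the other side is present.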
Initially, we tested individual components of the system and iteratively increased the complexity of our tests. We started by testing the communication between the wristbands and the main RPi 3; if this had failed, the entire project would have crumbled. The information we sent followed the structure of the real data to be received, but came from generated fake data. To generate fake instructions, we wrote a simple script (found here). After successfully simulating a fake joy ride, we began testing communication between the iPhone app and the HM-10 Bluetooth module, ensuring that no information was lost and that the data could be correctly interpreted. Finally, we moved on to testing the whole system indoors. To determine the stability of our system and get an idea of how long it could keep sending and receiving information end to end, we let it run for about 30 minutes and encountered zero errors.
We were able to create a functional prototype of the navigation system and meet the original objectives proposed. Our final system works in the following sequence. First, we turn on the wristband Pi Zeros, which notify us that they are waiting for a connection by vibrating for 3 seconds. Once they are ready, the main RPi is turned on and automatically searches for the wristbands' Bluetooth service. Then, we connect the iPhone to the RPi's HM-10 Bluetooth module and specify the desired destination in the app. The system is then ready, and we only need to follow the directions.
During the indoor tests, we were able to obtain navigation directions from an iPhone, transmit them to the RPi 3, and send them to the respective wristbands for 30 minutes continuously, all with a working interface. The interface gave us a visual representation of the upcoming maneuvers and let the user shut down the program when done. However, during the outdoor tests, the wristbands did not receive meaningful signals. We believe that either the structure of the received data was different or the HM-10 module's buffer exceeded its capacity, which resulted in incorrect parsing of the data.
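We did not confirm the root cause, but both suspected failures (a truncated transfer and an unexpected structure) can be detected before parsing. The following is a hedged sketch of such a check, not code from the project: the function name and the `expected_size` argument are assumptions, and it presumes the sender transmits the payload length alongside the data.

```python
import json
from typing import Optional


def validate_payload(data: bytes, expected_size: int) -> Optional[dict]:
    """Return the parsed payload, or None if it looks truncated or malformed."""
    if len(data) != expected_size:
        return None  # bytes were lost or added in transit
    try:
        parsed = json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid UTF-8 or not valid JSON
    try:
        # Touch the keys the parser relies on.
        parsed["routes"][0]["legs"][0]["steps"]
    except (KeyError, IndexError, TypeError):
        return None  # valid JSON, but not the structure we expect
    return parsed


good = b'{"routes":[{"legs":[{"steps":[]}]}]}'
print(validate_payload(good, len(good)) is not None)        # complete payload
print(validate_payload(good[:-5], len(good)) is not None)   # truncated payload
```

Returning `None` lets the hub skip a bad payload and keep listening, and distinguishing the three failure cases in a log would help narrow down which one occurs outdoors.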
What results did we achieve?
We successfully realized an embedded system by configuring our application to start as soon as the device receives power. We were able to query the Directions API using the iPhone's GPS information, send the result via Bluetooth, and translate it into mechanical vibrations that notify the biker of an impending turn.
What definitely did not work?
Most of our debugging happened while each RPi was connected to a monitor and intermediate logs were printed to the terminal. We could track progress and understand what was going on in our system ONLY when we were tethered to a monitor. When running the system untethered, we ran into issues that we could not identify because no error or log output was shown on the piTFT.
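One option we did not implement is to write logs to a file so that failures during untethered runs can be inspected afterwards. The sketch below uses Python's standard `logging` module with a rotating file handler; the logger name, file location, and size limits are arbitrary choices made for illustration.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler


def make_logger(log_path, name="guidance"):
    """Create a logger that appends timestamped records to log_path."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = RotatingFileHandler(log_path, maxBytes=1000000, backupCount=3)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


# Example location; on the Pi this could be any writable path.
log_path = os.path.join(tempfile.gettempdir(), "guidance_demo.log")
logger = make_logger(log_path)

try:
    int("not a distance")
except ValueError:
    # logger.exception records the message together with the traceback.
    logger.exception("Could not parse distance")
```

Replacing the bare `print` calls in the exception handlers with `logger.exception` would preserve the traceback that is currently lost when no terminal is attached.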
sp2535@cornell.edu
Designed the interface between the RPi and piTFT
Designed the circuit for the vibrational motor
Integrated vibrational motor into overall system
Created the design of the overall system
don4@cornell.edu
Modified existing open-source iOS app with Directions API
Integrated existing python wrapper for bluetoothctl
Created parsing and dispatch algorithm of API information
Created software architecture to support system design
All code can be found here: @DarianNwankwo
import os
from random import randint
from random import choice

if __name__ == "__main__":
    # Make sure the output directory exists next to this script
    os.makedirs(os.path.dirname(os.path.realpath(__file__)) + "/data", exist_ok=True)
    turns = ["L", "R"]
    distances = [d*20 for d in range(30, 60)]  # 600 to 1180 feet
    with open("./data/dummy_travel_data.csv", "w") as f:
        f.write("Direction,Distance(feet)\n")
        # Simulate 20 turns, counting down to each one in 100 ft steps
        for _ in range(20):
            cur_turn = choice(turns)
            cur_dist = choice(distances)
            for dist in range(cur_dist, 0, -100):
                f.write(f"{cur_turn},{dist}\n")
"""
Serves as the primary controller for setting up a device for bluetooth communication
with neighboring devices. This file needs to be tailored towards your device specific
mac address
"""
import csv
import json
import os
import random
import serial
import subprocess
import sys

from guidance.bluetoothctl import Bluetoothctl
from guidance.device import Device
from guidance.monitor import Monitor
from guidance.monitor import *
from bluetooth import *
from time import sleep
from time import time

MOTOR_PIN = 27
BLUETOOTH_PORT = 1
BLUETOOTH_PORT2 = 2
PAYLOAD = 1024
END_TRANSMISSION = b""
PI_ZERO_ADDRESS1 = "B8:27:EB:2D:D7:36"
PI_ZERO_ADDRESS2 = "B8:27:EB:D2:45:EF"
QUERY_TIME_DELTA = 1  # seconds
API_IS_WORKING = True
PATH_TO_FIFO = "/home/pi/Development/guidance/log_fifo"
PACKET_SIZE = 64
RIGHT_OPTIONS = (
    "turn-slight-right", "turn-sharp-right", "uturn-right", "turn-right",
    "ramp-right", "fork-right", "roundabout-right",
)


def get_direction(sock):
    """Queries API every QUERY_TIME_DELTA seconds"""
    if not sock:
        with open("./data/dummy_travel_data.csv") as directions:
            for direction in csv.reader(directions):
                # yield direction
                return direction
    else:
        ser = serial.Serial("/dev/ttyUSB0")  # default baud rate 9600
        print("Serial Port Initialized...")
        # The payload size arrives as digits, followed by the opening "{"
        incoming_data = ser.read(1)
        while chr(incoming_data[-1]) != "{":
            print("Incoming Data: ", incoming_data)
            incoming_data += ser.read(1)
        payload_size = incoming_data[:-1]
        print("Payload Size: {}".format(payload_size))
        # payload_size = int(ser.read(4))
        payload_size = int(payload_size)
        primary_chunk_size = payload_size // PACKET_SIZE
        remaining_chunk_size = payload_size - primary_chunk_size - 1
        data = bytes(chr(incoming_data[-1]), "utf-8")
        data += ser.read(primary_chunk_size + remaining_chunk_size)
        print("\nData: {}\n".format(data))
        try:
            data_as_dict = json.loads( data.decode("utf-8") )
        except:
            print("Trouble parsing string as JSON.")
        stepsArray = []
        remaining_distance = 9999
        try:
            stepsArray = data_as_dict["routes"][0]["legs"][0]["steps"]
            remaining_distance = data_as_dict["routes"][0]["legs"][0]["distance"]["text"]
            metric = remaining_distance[-2:]
            print("Metric: ", metric)
            s = remaining_distance[: len(remaining_distance) - 2]
            print("Value to convert and type: {} - {}".format(s, type(s)))
            if metric == "mi":
                print("Inside miles...")
                remaining_distance = float(remaining_distance[: len(remaining_distance) - 2] ) * 5280
            else:
                print("Here...")
                remaining_distance = float(remaining_distance[: len(remaining_distance) - 2] )
        except:
            print("Encountered error searching for keys.")
        # Within 100 feet of the destination: signal arrival
        if remaining_distance < 100:
            ser.close()
            return ["A", "-1"]
        distance = 0
        direction = ""
        if not isinstance(stepsArray, list):
            stepsArray = [stepsArray]  # convert to iterable
        # Accumulate distance up to the first step that carries a maneuver
        for step in stepsArray:
            dist = step["distance"]["text"]
            distance += int(dist[: len(dist) - 2 ])  # expecting distance in ft
            if "maneuver" in step:
                direction = "R" if any(right_word in step["maneuver"] for right_word in RIGHT_OPTIONS) else "L"
                break
        ser.close()
        return [direction, str(distance)]


def get_recipient(direction):
    """Returns the address of the motor to receive some signal.

    :param: direction - should be a single character indicating left or right motor
    """
    return PI_ZERO_ADDRESS1 if direction.upper() == "L" else PI_ZERO_ADDRESS2


def process_data(data):
    """When the data is received from the iPhone, process it before sending to pi zeros."""
    if isinstance(data, list):
        # Get data from dummy data source
        return bytes(" ".join(data), "utf-8")
    else:
        # Data is coming in from some api
        # Do some processing
        return data


if __name__ == "__main__":
    btctl = Bluetoothctl()
    device = Device(btctl.get_address(), BLUETOOTH_PORT)
    device2 = Device(btctl.get_address(), BLUETOOTH_PORT2)
    direction = ""
    monitor = Monitor()
    monitor.update_screen()
    while device.is_active():
        try:
            # Listen for data
            now = time()
            data = get_direction(True)
            # Process data
            print("Data from api: {}".format(data))
            direction, distance = process_data(data).split(b" ")
            direction = direction.decode("utf-8")
            # direction = random.choice(["L", "R"])  # delete after
            distance = distance.decode("utf-8")
            monitor.set_direction(direction)
            monitor.set_distance(distance)
            monitor.update_screen()
            device.active = monitor.check_screen()
            # Send data
            recipient = get_recipient(direction)
            # print("Here...")
            if not device.is_active() or direction == "A":
                # Shutting down or arrived: notify both wristbands
                device.active = False
                device.connect(PI_ZERO_ADDRESS1)
                device.send(distance)
                device.close_connection_to_peer()
                device2.connect(PI_ZERO_ADDRESS2)
                device2.send(distance)
                device2.close_connection_to_peer()
            else:
                if recipient == PI_ZERO_ADDRESS1:
                    device.connect(recipient)
                    device.send(distance)
                    device.close_connection_to_peer()
                else:
                    device2.connect(recipient)
                    device2.send(distance)
                    device2.close_connection_to_peer()
            end = time()
            print("Time Delta: {}".format(end - now))
        except KeyboardInterrupt:
            sys.exit(0)
        except:
            # Send the error message to the TFT and retry
            print("There was an error...")
"""
Serves as the primary controller for setting up a device for bluetooth communication
with neighboring devices. This file needs to be tailored
"""
import sys

from guidance.bluetoothctl import Bluetoothctl
from guidance.device import Device
from guidance.motor import Motor

MOTOR_PIN = 27
BLUETOOTH_PORT = 1
PAYLOAD = 1024
END_TRANSMISSION = b"-1"
SLEEP_TIME_DELTA = 5


def signal_handler(sig, frame):
    print("You pressed Ctrl+C")
    sys.exit(0)


if __name__ == "__main__":
    btctl = Bluetoothctl()
    device = Device(btctl.get_address(), BLUETOOTH_PORT)
    motor = Motor(MOTOR_PIN, SLEEP_TIME_DELTA)
    while device.is_active():
        try:
            print("Waiting for connection...")
            # Listen for data
            client_sock, client_info = device.accept()
            data = client_sock.recv(PAYLOAD)
            client_sock.close()
            # Translate data to motor command
            distance = int(data)
            print("Data: {}".format(distance))
            if distance < 0:
                # A negative distance is the hub's signal to shut down
                device.active = False
                motor.stop_vibrating()
                motor.stop()
            else:
                motor.vibrate(distance)
        except KeyboardInterrupt:
            print("Ending program...")
            sys.exit(0)
        except:
            print("Something bad happened. Trying again.")
# ReachView code is placed under the GPL license. # Written by Egor Fedorov (egor.fedorov@emlid.com) # Copyright (c) 2015, Emlid Limited # All rights reserved. # If you are interested in using ReachView code as a part of a # closed source project, please contact Emlid Limited (info@emlid.com). # This file is part of ReachView. # ReachView is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # ReachView is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with ReachView. If not, see <http://www.gnu.org/licenses/>. import time import pexpect import subprocess import sys import re class BluetoothctlError(Exception): """This exception is raised, when bluetoothctl fails to start.""" pass class Bluetoothctl: """A wrapper for bluetoothctl utility.""" def __init__(self): out = subprocess.check_output("rfkill unblock bluetooth", shell = True) self.child = pexpect.spawn("bluetoothctl", echo = False) self._setup() def _setup(self): """Run a sequence of commands to enable bluetooth communication.""" # power on, pairable on, discoverable on, agent on, default-agent, trust cmds = ["power on", "pairable on", "discoverable on", "agent on", "default-agent"] for cmd in cmds: try: out = self.get_output(cmd) except BluetoothctlError as e: print(e) return None def get_address(self): return "" def get_output(self, command, pause = 0): """Run a command in bluetoothctl prompt, return output as a list of lines.""" self.child.send(command + "\n") time.sleep(pause) start_failed = self.child.expect(["bluetooth", pexpect.EOF]) if start_failed: raise 
BluetoothctlError("Bluetoothctl failed after running " + command) return self.child.before.split(b"\r\n") def start_scan(self): """Start bluetooth scanning process.""" try: out = self.get_output("scan on") except BluetoothctlError as e: print(e) return None def make_discoverable(self): """Make device discoverable.""" try: out = self.get_output("discoverable on") except BluetoothctlError as e: print(e) return None def parse_device_info(self, info_string): """Parse a string corresponding to a device.""" device = {} # block_list = [b"[\x1b[0;", b"removed"] block_list = [] print("Info String: ", info_string) string_valid = not any(keyword in info_string for keyword in block_list) if string_valid: try: device_position = info_string.index(b"Device") except ValueError: pass else: if device_position > -1: attribute_list = info_string[device_position:].split(b" ", 2) device = { "mac_address": attribute_list[1], "name": attribute_list[2] } return device def get_available_devices(self): """Return a list of tuples of paired and discoverable devices.""" try: out = self.get_output("devices") except BluetoothctlError as e: print(e) return None else: available_devices = [] for line in out: device = self.parse_device_info(line) if device: available_devices.append(device) return available_devices def get_paired_devices(self): """Return a list of tuples of paired devices.""" try: out = self.get_output("paired-devices") except BluetoothctlError as e: print(e) return None else: paired_devices = [] for line in out: device = self.parse_device_info(line) if device: paired_devices.append(device) return paired_devices def get_discoverable_devices(self): """Filter paired devices out of available.""" available = self.get_available_devices() paired = self.get_paired_devices() return [d for d in available if d not in paired] def get_device_info(self, mac_address): """Get device info by mac address.""" try: out = self.get_output("info " + mac_address) except BluetoothctlError as e: print(e) 
return None else: return out def get_connectable_devices(self): """Get a list of connectable devices. Must install 'sudo apt-get install bluez blueztools' to use this""" try: res = [] out = subprocess.check_output(["hcitool", "scan"]) # Requires 'apt-get install bluez' out = out.split("\n") device_name_re = re.compile("^\t([0-9,:,A-F]{17})\t(.*)$") for line in out: device_name = device_name_re.match(line) if device_name != None: res.append({ "mac_address": device_name.group(1), "name": device_name.group(2) }) except BluetoothctlError as e: print(e) return None else: return res def is_connected(self): """Returns True if there is a current connection to any device, otherwise returns False""" try: res = False out = subprocess.check_output(["hcitool", "con"]) # Requires 'apt-get install bluez' out = out.split("\n") mac_addr_re = re.compile("^.*([0-9,:,A-F]{17}).*$") for line in out: mac_addr = mac_addr_re.match(line) if mac_addr != None: res = True except BluetoothctlError as e: print(e) return None else: return res def pair(self, mac_address): """Try to pair with a device by mac address.""" try: out = self.get_output("pair " + mac_address, 4) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF]) success = True if res == 1 else False return success def remove(self, mac_address): """Remove paired device by mac address, return success of the operation.""" try: out = self.get_output("remove " + mac_address, 3) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["not available", "Device has been removed", pexpect.EOF]) success = True if res == 1 else False return success def connect(self, mac_address): """Try to connect to a device by mac address.""" try: out = self.get_output("connect " + mac_address, 2) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF]) success = True 
if res == 1 else False return success def disconnect(self, mac_address): """Try to disconnect to a device by mac address.""" try: out = self.get_output("disconnect " + mac_address, 2) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF]) success = True if res == 1 else False return success def trust(self, mac_address): """Trust the device with the given MAC address""" try: out = self.get_output("trust " + mac_address, 4) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["not available", "trust succeeded", pexpect.EOF]) success = True if res == 1 else False return success def start_agent(self): """Start agent""" try: out = self.get_output("agent on") except BluetoothctlError as e: print(e) return None def default_agent(self): """Start default agent""" try: out = self.get_output("default-agent") except BluetoothctlError as e: print(e) return None if __name__ == "__main__": print("Init bluetooth...") bl = Bluetoothctl() bl.setup() print("Ready!") bl.start_scan() print("Scanning for 10 seconds...") for i in range(0, 10): print(i) time.sleep(1) # print(bl.get_discoverable_devices()) for dev in bl.get_discoverable_devices(): print("Name: {} - Address: {}".format(dev["name"], dev["mac_address"]))
import sys

from bluetooth import *


class Device:
    def __init__(self, addr, port_num):
        self.port_num = port_num
        self.uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
        self._setup(port_num)
        self._broadcast_service()
        self.addr = addr
        self.active = True

    def is_active(self):
        return self.active

    def _setup(self, port_num):
        """Opens a port socket communication."""
        self.port_num = port_num
        self.server_sock = BluetoothSocket(RFCOMM)
        self.server_sock.bind(("", port_num))
        self.server_sock.listen(1)

    def _broadcast_service(self):
        """Advertises device service to enable peers to see host."""
        advertise_service(
            self.server_sock,
            "<ServerName>",
            service_id=self.uuid,
            service_classes=[self.uuid, SERIAL_PORT_CLASS],
            profiles=[SERIAL_PORT_PROFILE]
        )

    def accept(self):
        client_sock, client_info = self.server_sock.accept()
        return client_sock, client_info

    def find(self, addr):
        """Search for device with mac address addr."""
        return find_service(uuid=self.uuid, address=addr)

    def connect(self, addr):
        service_match = find_service(uuid=self.uuid, address=addr)
        # the UUID are the same for both devices at the moment
        if len(service_match) == 0:
            print("Couldn't find the <ServerName> service.")
            sys.exit(0)
        match = service_match[0]
        port, name, host = match["port"], match["name"], match["host"]
        sock = BluetoothSocket(RFCOMM)
        sock.connect((host, port))
        self.peer_server_sock = sock

    def send(self, payload):
        self.peer_server_sock.sendall(payload)

    def close_connection_to_peer(self):
        self.peer_server_sock.close()


if __name__ == "__main__":
    # Device takes an address and a port; the controllers pass the (empty)
    # address returned by Bluetoothctl.get_address().
    local = Device("", 2)
    # _setup() already put the socket in listening mode, so wait for a peer.
    client_sock, client_info = local.accept()
import RPi.GPIO as GPIO
import os
import pygame
import sys
import subprocess
import time
from pygame.locals import *

isOnTFT = True
path = "/home/pi/Development/guidance/log_fifo"

if isOnTFT:
    # Route pygame output and touch input to the piTFT
    os.putenv("SDL_VIDEODRIVER","fbcon")
    os.putenv("SDL_FBDEV","/dev/fb1")
    os.putenv("SDL_MOUSEDRV","TSLIB")
    os.putenv("SDL_MOUSEDEV","/dev/input/touchscreen")

BLACK = (0 , 0, 0)
WHITE = (255, 255, 255)
EXIT_POS = (260, 220)
CENTER_POS = (160, 120)


def quit_it(channel):
    subprocess.call(["sudo", "shutdown", "-h", "now"])
    # sys.exit(0)


class Monitor:
    def __init__(self):
        pygame.init()
        self.display = pygame.display.set_mode( (320,240) )
        pygame.mouse.set_visible(not isOnTFT)
        self.direction = ""
        self.distance = 0
        # Physical button on GPIO 27 shuts the system down
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(27, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.add_event_detect(27, GPIO.FALLING, callback=quit_it)

    def set_direction(self, L_R):
        self.direction = L_R
        return self

    def set_distance(self, feet):
        self.distance = feet
        return self

    def _add_text(self, text, position, size, color):
        font = pygame.font.Font(None, size)
        text_surf = font.render(text, True, color)
        rect = text_surf.get_rect(center=position)
        return (text_surf, rect)

    def update_screen(self, color=(255,255,255), size=20, message=None):
        # Add text and the exit button
        if message is None:
            text = ["Turning {} in {} feet".format(self.direction, self.distance), "Shutdown"]
        else:
            text = [message, "Shutdown"]
        position = [CENTER_POS, EXIT_POS]
        txt_pos = [(text[0], position[0]), (text[1], position[1])]
        self.display.fill(BLACK)
        for txt, pos in txt_pos:
            text_surf, rect = self._add_text(txt, pos, size, color)
            self.display.blit(text_surf, rect)
        pygame.display.flip()

    def _inside_quit(self, tl, br, x, y):
        return tl[0] <= x and x <= br[0] and tl[1] <= y and y <= br[1]

    def check_screen(self):
        """Return False once the on-screen Shutdown area has been touched."""
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                sys.exit()
            if event.type == MOUSEBUTTONUP:
                x, y = pygame.mouse.get_pos()
                print("Touch: {},{}".format(x, y))
                tl = (EXIT_POS[0] - 80, EXIT_POS[1] - 20)
                br = (320, 240)
                return not self._inside_quit(tl, br, x, y)
        return True


if __name__ == "__main__":
    # Monitor() initializes pygame, the display, and the GPIO shutdown button,
    # so none of that setup is repeated here.
    nav = Monitor()
    nav.set_direction("-")
    nav.set_distance(0)
    nav.update_screen(WHITE, 30, "Welcome")
    running = True
    while running:
        running = nav.check_screen()
        # Each FIFO line is either "Quit" or "direction,distance"
        with open(path) as fifo:
            myline = fifo.readline()
            print("myline: {}".format(myline))
            if str(myline.strip()) == "Quit":
                running = False
            else:
                myline = myline.split(",")
                direction = myline[0]
                if direction == "A":
                    nav.update_screen(WHITE, 30, "Arrived")
                else:
                    nav.set_direction(direction)
                    nav.set_distance(float(myline[1]))
                    nav.update_screen(WHITE, 20)
        time.sleep(.02)
    GPIO.cleanup()
    # Leave an empty marker file named "shutdown"
    with open("shutdown", "w+"):
        pass
    #execute("Quit", PATH_TO_FIFO)
import RPi.GPIO as GPIO
from time import sleep


class Motor:
    """Controls the mini vibrating motor."""
    avg_bike_rate = 15  # ft/sec
    vibration_levels = 4

    def __init__(self, gpio_pin, time_delta):
        self.pin = gpio_pin
        self.freq = 1
        self.time_delta = time_delta
        self._setup()
        self.duty_cycle = 50

    def _setup(self):
        """Setup PWM connection to motor"""
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.OUT)
        self.pwm = GPIO.PWM(self.pin, self.freq)
        self.pwm.start(1)

    def vibrate(self, dist):
        """Adjust the vibration based on the user's distance from next turn.

        TODO: Consider using a loop and making this method more general. The
        values chosen for the conditional checks are rather arbitrary.
        """
        # Distance covered in one update interval at average biking speed
        bound = self.time_delta * Motor.avg_bike_rate
        duty_cycle = 0
        if dist < bound:
            duty_cycle = 90
        elif dist <= 3 * bound:
            duty_cycle = 70
        elif dist <= 6 * bound:
            duty_cycle = 40
        elif dist <= 9 * bound:
            duty_cycle = 20
        else:
            duty_cycle = 0
        self.pwm.ChangeDutyCycle(duty_cycle)
        sleep(self.time_delta)
        self.stop_vibrating()
        return self

    def stop_vibrating(self):
        self.pwm.ChangeDutyCycle(0)
        return self

    def stop(self):
        self.pwm.stop()

    def change_freq(self, freq):
        """Change the motors frequency."""
        self.pwm.ChangeFrequency(freq)
        return self