Guidance

A Tactile-based Feedback Navigation System for Bikers
A Project By Sofya Pugach (sp2535) and Darian Nwankwo (don4)


Demonstration Video


Introduction

Current GPS solutions require riders to look at a screen, which distracts them from the constantly changing situation on the road. Most common GPS applications also offer audio directions to announce upcoming turns. However, a bicycle rider has to wear earphones to hear those directions, which makes riding on the road unsafe. We propose an interactive system consisting of two wristbands that inform the rider about upcoming turns. Once the Raspberry Pi knows that a turn is coming, it signals the rider by actuating the vibration motor in the appropriate wristband. An increasing vibration frequency would tell the rider that the turn is getting closer. The right wristband signals right turns, and the left wristband signals left turns.


Project Objective

Successful communication of navigation instructions via vibration-based wristbands that signal when to turn left or right.


Design and Testing of Components

We designed the system in an object-oriented style, using Python as our primary tool. We created custom classes for each subsystem, such as motor, monitor, and bluetoothctl. To receive the user's GPS location and desired destination, we tailored an existing iOS application to communicate with the HM-10 Bluetooth module connected to the central hub (RPi 3). After converting the app's Swift 3 code to Swift 4, we integrated Google's Directions API into the application. The data returned by the Directions API describes the route between two locations. The API supports several modes of transportation, including transit, driving, walking, and cycling.

Once the information was received on the device, we did some preprocessing and reduced our payload by a factor of 20. A payload that was previously ~4000 bytes became ~200 bytes when we stripped all newline, carriage return, and space characters, along with irrelevant information, from the JSON received. Stripping whitespace is safe because whitespace between JSON tokens carries no meaning, so the integrity of the data is not compromised.
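The preprocessing described above ran in the iOS app, whose source is not reproduced on this page. The following is a minimal Python sketch of the same idea, assuming the only fields worth keeping are the ones that get_direction in main.py reads (the leg distance text, each step's distance text, and the optional maneuver). The function name compact_directions and the sample response are hypothetical.

```python
import json


def compact_directions(response):
    """Keep only the fields the hub reads and serialize without whitespace."""
    leg = response["routes"][0]["legs"][0]
    slim_steps = []
    for step in leg["steps"]:
        slim = {"distance": {"text": step["distance"]["text"]}}
        if "maneuver" in step:
            slim["maneuver"] = step["maneuver"]
        slim_steps.append(slim)
    slim_response = {
        "routes": [{"legs": [{
            "distance": {"text": leg["distance"]["text"]},
            "steps": slim_steps,
        }]}]
    }
    # separators=(",", ":") drops the spaces json.dumps inserts by default.
    return json.dumps(slim_response, separators=(",", ":"))


# Hypothetical, heavily shortened response used only for illustration.
sample = {
    "routes": [{
        "legs": [{
            "distance": {"text": "0.4 mi", "value": 644},
            "duration": {"text": "3 mins", "value": 180},
            "steps": [
                {"distance": {"text": "300 ft", "value": 91},
                 "html_instructions": "Head <b>north</b>",
                 "travel_mode": "BICYCLING"},
                {"distance": {"text": "0.3 mi", "value": 553},
                 "html_instructions": "Turn <b>right</b>",
                 "maneuver": "turn-right",
                 "travel_mode": "BICYCLING"},
            ],
        }],
    }],
}

original = json.dumps(sample, indent=3)
compact = compact_directions(sample)
print(len(original), "->", len(compact), "bytes")
```

The compact string still parses to the same nested structure, so the hub-side parser does not need to change.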

After the payload was sent from the iPhone and received by the RPi3, a basic parsing algorithm walked the structure of the data and extracted the relevant information. From this point, we dispatched the distance until the next turn to the respective wristband. The central hub followed a simple iterative process: listen--parse--send. Once the information is sent, the central hub continues to listen for the next payload, and the wristband either actuates or remains still depending on how far the rider is from the next turn. To sustain our connection to the RPi Zeros, we used PyBluez, a Python library that handles much of the Bluetooth communication needed to establish and maintain a connection.

To see what information was being transmitted to the wristbands, we integrated the piTFT screen into the system. The screen shows the direction and distance in the form "Turning L/R in {number} feet." It also lets the user exit the application and shut down the respective devices. We built the interface with Python's pygame library and enabled shutdown of the entire system by using the RPi.GPIO library to set up edge detection on the piTFT buttons.

To warn riders about upcoming turns, we used a vibrating mini motor from Adafruit. The motor is driven through a transistor, diode, and resistor, wired according to this tutorial. In our case we used an N2222 transistor, a 1k Ohm resistor to protect the GPIO pin, and a 1N4001 diode. The motors are PWM controlled through the RPi.GPIO library. We controlled the duration of the buzzing by keeping the frequency constant and varying the duty cycle. We designed 4 discrete levels that correspond to different distances from the upcoming turn. The closer the biker gets to the turn, the longer the motor vibrates.
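The four levels can be illustrated without any hardware. The sketch below mirrors the thresholds in Motor.vibrate from motor.py, using the values found in the appendix (15 ft/sec average bike rate, 5 second time delta, 1 Hz PWM frequency). The function name duty_cycle_for is hypothetical. At 1 Hz, a duty cycle of d percent means the motor buzzes for d/100 of every second.

```python
AVG_BIKE_RATE = 15  # ft/sec, Motor.avg_bike_rate in motor.py
TIME_DELTA = 5      # seconds, SLEEP_TIME_DELTA in zero.py


def duty_cycle_for(distance_ft):
    """Return the PWM duty cycle (percent) for a distance to the next turn."""
    bound = TIME_DELTA * AVG_BIKE_RATE  # 75 ft: distance covered in one time delta
    if distance_ft < bound:
        return 90
    if distance_ft <= 3 * bound:
        return 70
    if distance_ft <= 6 * bound:
        return 40
    if distance_ft <= 9 * bound:
        return 20
    return 0  # too far away: no vibration


for distance in (50, 200, 400, 600, 1000):
    print("{} ft -> {}% duty cycle".format(distance, duty_cycle_for(distance)))
```

With these constants the levels switch at 75, 225, 450, and 675 feet, and anything beyond 675 feet produces no vibration.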

Issues

When sending trivial messages to the HM-10 from the iOS application, we noticed that the payload had to be sent in packets of 64 bytes or fewer; otherwise the information was simply discarded.

Originally, we tried to send the entire payload received from querying the API directly to the RPi3 in 64-byte chunks. This proved problematic due to the nature of Bluetooth Low Energy communication and introduced an enormous delay in our system.
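To make the packet limit concrete, here is a minimal sketch of length-prefixed framing and 64-byte chunking. The framing (payload length in ASCII digits, followed directly by the JSON) is inferred from how get_direction in main.py reads the serial port; the sender side lives in the iOS app and is not shown on this page, so the function names frame, split_into_packets, and unframe are hypothetical.

```python
import json
import math

PACKET_SIZE = 64  # bytes, the limit observed with the HM-10


def frame(payload):
    """Prefix the JSON payload with its length in ASCII digits."""
    return str(len(payload)).encode("ascii") + payload


def split_into_packets(message, size=PACKET_SIZE):
    """Cut a framed message into packets of at most `size` bytes."""
    return [message[i:i + size] for i in range(0, len(message), size)]


def unframe(stream):
    """Recover the payload: digits up to the first '{' give its length."""
    brace = stream.index(b"{")
    length = int(stream[:brace])
    return stream[brace:brace + length]


payload = json.dumps({"steps": ["turn-right"] * 15},
                     separators=(",", ":")).encode("utf-8")
packets = split_into_packets(frame(payload))
received = unframe(b"".join(packets))
print(len(payload), "byte payload sent as", len(packets), "packets")

# Packet counts for the payload sizes quoted in the text.
for size in (4000, 200):
    print(size, "bytes ->", math.ceil(size / PACKET_SIZE), "packets")
```

A ~4000 byte response needs 63 packets per update, while the reduced ~200 byte payload needs 4, which is consistent with the delay disappearing once the payload was stripped down.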

For the Bluetooth communication, we encountered several errors when using the bluetoothctl command in Linux because we performed some actions in the wrong order. We discovered that, to pair the devices, the following commands had to be issued in sequence before a connection could be established: power on, pairable on, discoverable on, agent on, default-agent, trust <MAC_ADDRESS>.

A major issue that slowed development was getting Python 2 and Python 3 code to communicate. Our primary system was built in Python 3, but the interface for the piTFT was built in Python 2. To work around this, we used FIFOs to pass direction and distance information to the piTFT process, but this forced a lot of hacky workarounds whenever the two processes needed to talk to each other. Eventually, we found a resource (listed below) that helped us install the dependencies needed to use PyGame with Python 3.
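For reference, the FIFO approach can be sketched as follows. The "direction,distance" line format matches what the standalone loop in monitor.py reads from its FIFO; the function names send_update and read_update are hypothetical, and a thread stands in for the second process so the example runs on its own (on Linux or another system that supports os.mkfifo).

```python
import os
import tempfile
import threading


def send_update(fifo_path, direction, distance_ft):
    """Writer side: open() blocks until a reader has the FIFO open."""
    with open(fifo_path, "w") as fifo:
        fifo.write("{},{}\n".format(direction, distance_ft))


def read_update(fifo_path):
    """Reader side: parse one 'direction,distance' line."""
    with open(fifo_path) as fifo:
        direction, distance = fifo.readline().strip().split(",")
    return direction, float(distance)


# Create the named pipe in a throwaway directory.
fifo_dir = tempfile.mkdtemp()
fifo_path = os.path.join(fifo_dir, "log_fifo")
os.mkfifo(fifo_path)

# The writer runs in a thread here; in the project it was a separate process.
writer = threading.Thread(target=send_update, args=(fifo_path, "L", 300))
writer.start()
update = read_update(fifo_path)
writer.join()
print(update)

os.remove(fifo_path)
os.rmdir(fifo_dir)
```

Because each side blocks until the other opens the pipe, the two processes become tightly coupled, which is one reason a single Python 3 process was simpler in the end.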


Drawings and Pictures


Testing of System

Initially, we tested individual components of the system and iteratively increased the complexity of our tests. We started by testing communication between the wristbands and the main RPi3, since the entire project depended on it. The information we sent followed the structure of the real data but came from generated fake data, produced by a simple script (found here). After successfully simulating a fake joy ride, we tested communication between the iPhone app and the HM-10 Bluetooth module and verified that no information was lost and that the data could be interpreted correctly. Finally, we moved on to testing the whole system indoors. To determine the stability of the system and get an idea of how long it could keep sending and receiving information end to end, we let it run for about 30 minutes and encountered zero errors.


Result

We were able to create a functional prototype of the navigation system and meet the original objectives. The final system works in the following sequence. First, we turn on the wristband Pi Zeros, which signal that they are waiting for a connection by vibrating for 3 seconds. Once they are ready, the main RPi is turned on and automatically searches for the wristbands' Bluetooth service. Then we connect the RPi's HM-10 Bluetooth module to the iPhone and specify the desired destination in the app. The system is then ready, and the rider only needs to follow the directions.

During the indoor tests, we were able to obtain navigation directions from an iPhone, transmit them to the RPi3, and send them to the respective wristbands for 30 minutes continuously, with a working interface. The interface gave us a visual representation of upcoming maneuvers and let the user shut down the program when done. During the outdoor tests, however, we believe that either the structure of the received data was different or the buffer on the HM-10 module exceeded its capacity. This resulted in incorrect parsing of the data and an inability to interpret it and send meaningful signals to the wristbands.
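The cause of the outdoor failure was never confirmed. One possibility that is consistent with the appendix code, offered as an assumption rather than a diagnosis, is that some per-step distances arrived in miles (for example "0.3 mi"), while the step loop in get_direction expects feet and calls int() on the number. A minimal sketch of a more tolerant parser follows; the function name distance_text_to_feet is hypothetical.

```python
import re

FEET_PER_MILE = 5280

# Accepts "300 ft", "0.3 mi", and the same without the space ("300ft").
_DISTANCE_RE = re.compile(r"\s*([\d.,]+)\s*(ft|mi)\s*")


def distance_text_to_feet(text):
    """Convert a distance string in feet or miles to a number of feet."""
    match = _DISTANCE_RE.fullmatch(text)
    if match is None:
        raise ValueError("Unrecognized distance text: {!r}".format(text))
    value = float(match.group(1).replace(",", ""))
    return value * FEET_PER_MILE if match.group(2) == "mi" else value


for text in ("300 ft", "0.5 mi", "300ft"):
    print(text, "->", distance_text_to_feet(text), "feet")
```

Raising on anything unrecognized, instead of silently slicing off the last two characters, would also have surfaced a structural change in the data as a clear error message.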


Conclusion

What results did we achieve?

We successfully realized an embedded system by enabling our application to start as soon as it received power. We were able to query the Directions API with the iPhone's GPS information, send the resulting directions over Bluetooth, and translate them into mechanical vibrations that notify the biker of an impending turn.



What definitely did not work?

Most of the debugging happened while each RPi was connected to a monitor, with intermediate logs printed to the terminal. We could track progress and understand what was going on with our system ONLY when we were tethered to a monitor. When running untethered, we ran into issues that we could not identify because no error or log output reached the piTFT.
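One way to keep diagnostics when no monitor is attached is to write them to a file with Python's standard logging module. This is a minimal sketch, not something the project used: the logger name "guidance", the file name, and the size limits are assumptions chosen for illustration.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler


def make_logger(log_path, max_bytes=256 * 1024, backups=2):
    """Return a logger that appends to a size-limited file."""
    logger = logging.getLogger("guidance")
    logger.setLevel(logging.DEBUG)
    handler = RotatingFileHandler(log_path, maxBytes=max_bytes,
                                  backupCount=backups)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger


# A temporary directory keeps the example self-contained.
log_path = os.path.join(tempfile.mkdtemp(), "guidance.log")
logger = make_logger(log_path)

try:
    int("12 ft")  # stands in for a parsing failure in the main loop
except ValueError:
    # logger.exception records the message plus the full traceback.
    logger.exception("Could not parse distance")

with open(log_path) as log_file:
    contents = log_file.read()
print(contents)
```

Calling logger.exception inside the bare except blocks of main.py and zero.py would preserve the traceback that the current print statements discard, and the file can be read after the ride.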


Future Work


Work Distribution


Project group picture


Sofya Pugach

sp2535@cornell.edu

Designed the interface between the RPi and piTFT

Designed the circuit for the vibrational motor

Integrated vibrational motor into overall system

Created the design of the overall system


Darian Nwankwo

don4@cornell.edu

Modified existing open-source iOS app with Directions API

Integrated existing python wrapper for bluetoothctl

Created parsing and dispatch algorithm of API information

Created software architecture to support system design


Parts List

Total: ~$71.00


References

Install PyGame w/Python3
Vibrating Motor Wiring
Bootstrap
Pi-GPIO Library
R-Pi GPIO Documentation
Pybluez Library
Pexpect Library
Vibrating Motor Specifications
Bluetoothctl Wrapper in Python
PyGame
Directions API

Code Appendix

All code can be found here: @DarianNwankwo

dummy_data.py

import os

from random import randint
from random import choice

if __name__ == "__main__":
    os.makedirs(os.path.dirname(os.path.realpath(__file__)) + "/data", exist_ok=True)

    turns = ["L", "R"]
    distances = [d*20 for d in range(30, 60)]

    with open("./data/dummy_travel_data.csv", "w") as f:
        f.write("Direction,Distance(feet)\n")
        for _ in range(20):
            cur_turn = choice(turns)
            cur_dist = choice(distances)
            for dist in range(cur_dist, 0, -100):
                f.write(f"{cur_turn},{dist}\n")

main.py

"""
Serves as the primary controller for setting up a device for bluetooth communication with
neighboring devices.

This file needs to be tailored towards your device specific mac address
"""
import csv
import json
import os
import random
import serial
import subprocess
import sys


from guidance.bluetoothctl import Bluetoothctl
from guidance.device import Device
from guidance.monitor import Monitor
from guidance.monitor import *

from bluetooth import *
from time import sleep
from time import time

MOTOR_PIN = 27
BLUETOOTH_PORT = 1
BLUETOOTH_PORT2 = 2
PAYLOAD = 1024
END_TRANSMISSION = b""
PI_ZERO_ADDRESS1 = "B8:27:EB:2D:D7:36"
PI_ZERO_ADDRESS2 = "B8:27:EB:D2:45:EF"
QUERY_TIME_DELTA = 1 # seconds
API_IS_WORKING = True 
PATH_TO_FIFO = "/home/pi/Development/guidance/log_fifo"
PACKET_SIZE = 64
RIGHT_OPTIONS = (
        "turn-slight-right", "turn-sharp-right", "uturn-right",
        "turn-right", "ramp-right", "fork-right", "roundabout-right",
        )


def get_direction(sock):
    """Queries API every QUERY_TIME_DELTA seconds"""
    if not sock:
        with open("./data/dummy_travel_data.csv") as directions:
            for direction in csv.reader(directions):
                # yield direction
                return direction
    else:
        ser = serial.Serial("/dev/ttyUSB0") # default baud rate 9600
        print("Serial Port Initialized...")

        incoming_data = ser.read(1)
        while chr(incoming_data[-1]) != "{":
            print("Incoming Data: ", incoming_data)
            incoming_data += ser.read(1)

        payload_size = incoming_data[:-1]
        print("Payload Size: {}".format(payload_size))
        # payload_size = int(ser.read(4))
        payload_size = int(payload_size)
        # The size prefix counts the whole JSON payload. Its opening "{" has
        # already been read, so payload_size - 1 bytes remain on the wire.
        data = bytes(chr(incoming_data[-1]), "utf-8")
        data += ser.read(payload_size - 1)
        print("\nData: {}\n".format(data))

        try:
            data_as_dict = json.loads( data.decode("utf-8") )
        except ValueError:
            # Covers malformed JSON as well as undecodable bytes.
            print("Trouble parsing string as JSON.")
            data_as_dict = {}

        stepsArray = []
        remaining_distance = 9999
        try:
            leg = data_as_dict["routes"][0]["legs"][0]
            stepsArray = leg["steps"]
            distance_text = leg["distance"]["text"]  # e.g. "0.4 mi" or "300 ft"

            metric = distance_text[-2:]
            print("Metric: ", metric)
            value = float(distance_text[:-2])
            # Work in feet throughout. Assign only after parsing succeeds so
            # remaining_distance is always a number in the comparison below.
            remaining_distance = value * 5280 if metric == "mi" else value
        except (KeyError, IndexError, TypeError, ValueError):
            print("Encountered error searching for keys.")

        if remaining_distance < 100:
            ser.close()
            return ["A", "-1"]

        distance = 0
        direction = ""
        if not isinstance(stepsArray, list):
            stepsArray = [stepsArray] # convert to iterable
        
        for step in stepsArray:
            dist = step["distance"]["text"]
            distance += int(dist[: len(dist) - 2 ]) # expecting distance in ft
            if "maneuver" in step:
                direction = "R" if any(right_word in step["maneuver"] for right_word in RIGHT_OPTIONS) else "L" 
                break
        ser.close()
        return [direction, str(distance)]


def get_recipient(direction):
    """Returns the address of the motor to receive some signal.
    
    :param: direction - should be a single character indicating left or right motor
    """
    return PI_ZERO_ADDRESS1 if direction.upper() == "L" else PI_ZERO_ADDRESS2


def process_data(data):
    """When the data is received from the iPhone, process it before sending to pi zeros."""
    if isinstance(data, list):
        # Get data from dummy data source
        return bytes(" ".join(data), "utf-8")
    else:
        # Data is coming in from some api
        # Do some processing
        return data


if __name__ == "__main__":
    btctl = Bluetoothctl()
    device = Device(btctl.get_address(), BLUETOOTH_PORT)
    device2 = Device(btctl.get_address(), BLUETOOTH_PORT2)
    direction = ""
        
    monitor = Monitor()
    monitor.update_screen()

    while device.is_active():
        try:
            # Listen for data
            now = time()
            data = get_direction(True)

            # Process data
            print("Data from api: {}".format(data))
            direction, distance = process_data(data).split(b" ")
            direction = direction.decode("utf-8")
            # direction = random.choice(["L", "R"]) # delete after
            distance = distance.decode("utf-8")
            monitor.set_direction(direction)
            monitor.set_distance(distance)
            monitor.update_screen()
            device.active = monitor.check_screen()

            # Send data
            recipient = get_recipient(direction)
            # print("Here...")
            if not device.is_active() or direction == "A":
                device.active = False
                device.connect(PI_ZERO_ADDRESS1)
                device.send(distance)
                device.close_connection_to_peer()
                device2.connect(PI_ZERO_ADDRESS2)
                device2.send(distance)
                device2.close_connection_to_peer()
            else:
                if recipient == PI_ZERO_ADDRESS1:
                    device.connect(recipient)
                    device.send(distance)
                    device.close_connection_to_peer()
                else:
                    device2.connect(recipient)
                    device2.send(distance)
                    device2.close_connection_to_peer()
            end = time()
            print("Time Delta: {}".format(end - now))
        except KeyboardInterrupt:
            sys.exit(0)
        except:
            # Send the error message to the TFT and retry
            print("There was an error...")

zero.py

"""
Serves as the primary controller for setting up a device for bluetooth communication with
neighboring devices.

This file needs to be tailored
"""
import sys


from guidance.bluetoothctl import Bluetoothctl
from guidance.device import Device
from guidance.motor import Motor


MOTOR_PIN = 27
BLUETOOTH_PORT = 1
PAYLOAD = 1024
END_TRANSMISSION = b"-1"
SLEEP_TIME_DELTA = 5


def signal_handler(sig, frame):
    print("You pressed Ctrl+C")
    sys.exit(0)


if __name__ == "__main__":
    btctl = Bluetoothctl()
    device = Device(btctl.get_address(), BLUETOOTH_PORT)
    motor = Motor(MOTOR_PIN, SLEEP_TIME_DELTA)
    
    while device.is_active():
        try:
            print("Waiting for connection...")
            # Listen for data
            client_sock, client_info = device.accept()
            data = client_sock.recv(PAYLOAD)
            client_sock.close()

            # Translate data to motor command
            distance = int(data)
            print("Data: {}".format(distance))
            if distance < 0:
                device.active = False
                motor.stop_vibrating()
                motor.stop()
            else:
                motor.vibrate(distance)

        except KeyboardInterrupt:
            print("Ending program...")
            sys.exit(0)
        except:
            print("Something bad happened. Trying again.")

bluetoothctl.py

# ReachView code is placed under the GPL license.
# Written by Egor Fedorov (egor.fedorov@emlid.com)
# Copyright (c) 2015, Emlid Limited
# All rights reserved.

# If you are interested in using ReachView code as a part of a
# closed source project, please contact Emlid Limited (info@emlid.com).

# This file is part of ReachView.

# ReachView is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# ReachView is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with ReachView.  If not, see <http://www.gnu.org/licenses/>.

import time
import pexpect
import subprocess
import sys
import re

class BluetoothctlError(Exception):
    """This exception is raised, when bluetoothctl fails to start."""
    pass


class Bluetoothctl:
    """A wrapper for bluetoothctl utility."""

    def __init__(self):
        out = subprocess.check_output("rfkill unblock bluetooth", shell = True)
        self.child = pexpect.spawn("bluetoothctl", echo = False)
        self._setup()


    def _setup(self):
        """Run a sequence of commands to enable bluetooth communication."""
        # power on, pairable on, discoverable on, agent on, default-agent, trust
        cmds = ["power on", "pairable on", "discoverable on", "agent on", "default-agent"]
        for cmd in cmds:
            try:
                out = self.get_output(cmd)
            except BluetoothctlError as e:
                print(e)
                return None


    def get_address(self):
        return ""
    
    
    def get_output(self, command, pause = 0):
        """Run a command in bluetoothctl prompt, return output as a list of lines."""
        self.child.send(command + "\n")
        time.sleep(pause)
        start_failed = self.child.expect(["bluetooth", pexpect.EOF])

        if start_failed:
            raise BluetoothctlError("Bluetoothctl failed after running " + command)

        return self.child.before.split(b"\r\n")

    def start_scan(self):
        """Start bluetooth scanning process."""
        try:
            out = self.get_output("scan on")
        except BluetoothctlError as e:
            print(e)
            return None

    def make_discoverable(self):
        """Make device discoverable."""
        try:
            out = self.get_output("discoverable on")
        except BluetoothctlError as e:
            print(e)
            return None

    def parse_device_info(self, info_string):
        """Parse a string corresponding to a device."""
        device = {}
        # block_list = [b"[\x1b[0;", b"removed"]
        block_list = []
        print("Info String: ", info_string)
        string_valid = not any(keyword in info_string for keyword in block_list)

        if string_valid:
            try:
                device_position = info_string.index(b"Device")
            except ValueError:
                pass
            else:
                if device_position > -1:
                    attribute_list = info_string[device_position:].split(b" ", 2)
                    device = {
                        "mac_address": attribute_list[1],
                        "name": attribute_list[2]
                    }

        return device

    def get_available_devices(self):
        """Return a list of tuples of paired and discoverable devices."""
        try:
            out = self.get_output("devices")
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            available_devices = []
            for line in out:
                device = self.parse_device_info(line)
                if device:
                    available_devices.append(device)

            return available_devices

    def get_paired_devices(self):
        """Return a list of tuples of paired devices."""
        try:
            out = self.get_output("paired-devices")
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            paired_devices = []
            for line in out:
                device = self.parse_device_info(line)
                if device:
                    paired_devices.append(device)

            return paired_devices

    def get_discoverable_devices(self):
        """Filter paired devices out of available."""
        available = self.get_available_devices()
        paired = self.get_paired_devices()

        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Get device info by mac address."""
        try:
            out = self.get_output("info " + mac_address)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            return out

    def get_connectable_devices(self):
        """Get a  list of connectable devices.
        Must install 'sudo apt-get install bluez blueztools' to use this"""
        try:
            res = []
            out = subprocess.check_output(["hcitool", "scan"])  # Requires 'apt-get install bluez'
            # check_output returns bytes under Python 3; decode before splitting.
            out = out.decode("utf-8").split("\n")
            device_name_re = re.compile("^\t([0-9,:,A-F]{17})\t(.*)$")
            for line in out:
                device_name = device_name_re.match(line)
                if device_name != None:
                    res.append({
                            "mac_address": device_name.group(1),
                            "name": device_name.group(2)
                        })
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            return res

    def is_connected(self):
        """Returns True if there is a current connection to any device, otherwise returns False"""
        try:
            res = False
            out = subprocess.check_output(["hcitool", "con"])  # Requires 'apt-get install bluez'
            # check_output returns bytes under Python 3; decode before splitting.
            out = out.decode("utf-8").split("\n")
            mac_addr_re = re.compile("^.*([0-9,:,A-F]{17}).*$")
            for line in out:
                mac_addr = mac_addr_re.match(line)
                if mac_addr != None:
                    res = True
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            return res

    def pair(self, mac_address):
        """Try to pair with a device by mac address."""
        try:
            out = self.get_output("pair " + mac_address, 4)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF])
            success = True if res == 1 else False
            return success

    def remove(self, mac_address):
        """Remove paired device by mac address, return success of the operation."""
        try:
            out = self.get_output("remove " + mac_address, 3)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            res = self.child.expect(["not available", "Device has been removed", pexpect.EOF])
            success = True if res == 1 else False
            return success

    def connect(self, mac_address):
        """Try to connect to a device by mac address."""
        try:
            out = self.get_output("connect " + mac_address, 2)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF])
            success = True if res == 1 else False
            return success

    def disconnect(self, mac_address):
        """Try to disconnect to a device by mac address."""
        try:
            out = self.get_output("disconnect " + mac_address, 2)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF])
            success = True if res == 1 else False
            return success

    def trust(self, mac_address):
        """Trust the device with the given MAC address"""
        try:
            out = self.get_output("trust " + mac_address, 4)
        except BluetoothctlError as e:
            print(e)
            return None
        else:
            res = self.child.expect(["not available", "trust succeeded", pexpect.EOF])
            success = True if res == 1 else False
            return success

    def start_agent(self):
        """Start agent"""
        try:
            out = self.get_output("agent on")
        except BluetoothctlError as e:
            print(e)
            return None

    def default_agent(self):
        """Start default agent"""
        try:
            out = self.get_output("default-agent")
        except BluetoothctlError as e:
            print(e)
            return None

if __name__ == "__main__":
    print("Init bluetooth...")
    # The constructor already runs _setup(); there is no public setup() method.
    bl = Bluetoothctl()
    print("Ready!")
    bl.start_scan()
    print("Scanning for 10 seconds...")
    for i in range(0, 10):
        print(i)
        time.sleep(1)

    # print(bl.get_discoverable_devices())
    for dev in bl.get_discoverable_devices():
        print("Name: {} - Address: {}".format(dev["name"], dev["mac_address"]))

device.py

import sys

from bluetooth import *


class Device:
    def __init__(self, addr, port_num):
        self.port_num = port_num
        self.uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
        self._setup(port_num)
        self._broadcast_service()
        self.addr = addr
        self.active = True
    

    def is_active(self):
        return self.active
    
    
    def _setup(self, port_num):
        """Opens a port socket communication."""
        self.port_num = port_num
        self.server_sock = BluetoothSocket(RFCOMM)
        self.server_sock.bind(("", port_num))
        self.server_sock.listen(1)

    
    def _broadcast_service(self):
        """Advertises device service to enable peers to see host."""
        advertise_service(
            self.server_sock, "<ServerName>",
            service_id=self.uuid,
            service_classes=[self.uuid, SERIAL_PORT_CLASS],
            profiles=[SERIAL_PORT_PROFILE]
        )


    def accept(self):
        client_sock, client_info = self.server_sock.accept()
        return client_sock, client_info


    def find(self, addr):
        """Search for device with mac address addr."""
        return find_service(uuid=self.uuid, address=addr)


    def connect(self, addr):
        service_match = find_service(uuid=self.uuid, address=addr) # the UUID are the same for both devices at the moment
        
        if len(service_match) == 0:
            print("Couldn't find the <ServerName> service.")
            sys.exit(0)

        match = service_match[0]
        port, name, host = match["port"], match["name"], match["host"]

        sock = BluetoothSocket(RFCOMM)
        sock.connect((host, port))
        self.peer_server_sock = sock


    def send(self, payload):
        self.peer_server_sock.sendall(payload)


    def close_connection_to_peer(self):
        self.peer_server_sock.close()


if __name__ == "__main__":
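    # Device takes an address and a port; accept() blocks until a peer connects.
    local = Device("", 2)
    client_sock, client_info = local.accept()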

monitor.py

import RPi.GPIO as GPIO

import os
import pygame
import sys
import subprocess
import time

from pygame.locals import *

isOnTFT = True

path = "/home/pi/Development/guidance/log_fifo"

if isOnTFT:
    os.putenv("SDL_VIDEODRIVER","fbcon")
    os.putenv("SDL_FBDEV","/dev/fb1")
    os.putenv("SDL_MOUSEDRV","TSLIB")
    os.putenv("SDL_MOUSEDEV","/dev/input/touchscreen")

BLACK = (0 , 0, 0)
WHITE = (255, 255, 255)
EXIT_POS = (260, 220)
CENTER_POS = (160, 120)


def quit_it(channel):
    subprocess.call(["sudo", "shutdown", "-h", "now"])
    # sys.exit(0)


class Monitor:

    def __init__(self):
        pygame.init()
        self.display = pygame.display.set_mode( (320,240) )
        pygame.mouse.set_visible(not isOnTFT)
        self.direction = ""
        self.distance = 0
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(27, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.add_event_detect(27, GPIO.FALLING, callback=quit_it)

        
    def set_direction(self, L_R):
        self.direction = L_R
        return self


    def set_distance(self, feet):
        self.distance = feet
        return self
    

    def _add_text(self, text, position, size, color):
        font = pygame.font.Font(None, size)
        text_surf = font.render(text, True, color)
        rect = text_surf.get_rect(center=position)
        return (text_surf, rect)


    def update_screen(self, color=(255,255,255), size=20, message=None):
        # Add text and the exit button
        if message == None:
            text=["Turning {} in {} feet".format(self.direction, self.distance), "Shutdown"]
        else:
            text=[message, "Shutdown"]

        position=[CENTER_POS,EXIT_POS]
        txt_pos=[(text[0], position[0]),(text[1], position[1])]
        self.display.fill(BLACK)
        
        display_objects=[]
        for txt,pos in txt_pos:
            text_surf,rect = self._add_text(txt, pos, size, color)
            self.display.blit(text_surf, rect)
        pygame.display.flip()


    def _inside_quit(self, tl, br, x, y):
        return tl[0] <= x and x<=br[0] and tl[1] <= y and y<= br[1]


    def check_screen(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                sys.exit()
            if event.type == MOUSEBUTTONUP:  # compare by value, not identity
                x,y = pygame.mouse.get_pos()
                print("Touch: {},{}".format(x, y))
                tl=(EXIT_POS[0] - 80, EXIT_POS[1] - 20)
                br = (320, 240)
                return not self._inside_quit(tl, br, x, y)
        return True


if __name__ == "__main__":
    # Monitor.__init__ takes no arguments and already initializes pygame,
    # the display, and the GPIO shutdown button, so none of that is repeated
    # here (registering edge detection on pin 27 twice raises an error).
    nav = Monitor()

    nav.set_direction("-")
    nav.set_distance(0)
    nav.update_screen(WHITE,30,"Welcome")

    running = True
    while running:
        running = nav.check_screen()
        with open(path) as fifo:
            myline = fifo.readline()
            print("myline: {}".format(myline))
            if str(myline.strip()) == "Quit":
                running = False
            else:
                myline = myline.split(",")
                direction = myline[0]
                if direction == "A":
                    nav.update_screen(WHITE,30,"Arrived")
                else:
                    nav.set_direction(direction)
                    nav.set_distance(float(myline[1]))
                    nav.update_screen(WHITE,20)
        time.sleep(.02)
    GPIO.cleanup()
    f = open("shutdown","w+")
    #execute("Quit", PATH_TO_FIFO)

motor.py

import RPi.GPIO as GPIO

from time import sleep

class Motor:
    """Controls the mini vibrating motor."""
    avg_bike_rate = 15 # ft/sec
    vibration_levels = 4

    def __init__(self, gpio_pin, time_delta):
        self.pin = gpio_pin
        self.freq = 1
        self.time_delta = time_delta
        self._setup()
        self.duty_cycle = 50

    
    def _setup(self):
        """Setup PWM connection to motor"""
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.OUT)
        self.pwm = GPIO.PWM(self.pin, self.freq)
        self.pwm.start(1)


    def vibrate(self, dist):
        """Adjust the vibration based on the user's distance from next turn.
        
        TODO: Consider using a loop and making this method more general.
        
        The values chosen for the conditional checks are rather arbitrary.
        """
        bound = self.time_delta * Motor.avg_bike_rate
        duty_cycle = 0
        if dist < bound:        duty_cycle = 90
        elif dist <= 3 * bound: duty_cycle = 70
        elif dist <= 6 * bound: duty_cycle = 40
        elif dist <= 9 * bound: duty_cycle = 20
        else:                   duty_cycle = 0
        self.pwm.ChangeDutyCycle(duty_cycle)
        sleep(self.time_delta)
        self.stop_vibrating()
        return self


    def stop_vibrating(self):
        self.pwm.ChangeDutyCycle(0)
        return self


    def stop(self):
        self.pwm.stop()


    def change_freq(self, freq):
        """Change the motors frequency."""
        self.pwm.ChangeFrequency(freq)
        return self