"""A collection of websocket routes"""
import base64
from flask_socketio import SocketIO, emit
from adafruit_mpu6050 import MPU6050
from adafruit_lsm9ds1 import LSM9DS1_I2C
from .inputs.check_platform import ON_WINDOWS
from .inputs.config import d_train, IMUs, gps, nav
from .inputs.imu import MAG3110, calc_heading, calc_yaw_pitch_roll
from .inputs.camera_manager import CameraManager
from .utils.virtual_terminal import VTerminal
from .utils.super_logger import logger
socketio = SocketIO(logger=False, engineio_logger=False, async_mode='eventlet')
# for virtual terminal access
if not ON_WINDOWS:
vterm = VTerminal(socketio)
vterm.register_output_listener(
lambda output: socketio.emit("terminal-output", {"output": output}, namespace="/pty")
)
# Initialize the camera
camera_manager = CameraManager()
[docs]def getHYPR():
"""
This function will try to determine the robot's Heading, Yaw, Pitch, & Roll (HYPR)
dependent on the specific data returned from the IMU device connected to the robot.
"""
heading = []
yaw = 0
pitch = 0
roll = 0
for imu in IMUs:
if type(imu, LSM9DS1_I2C):
heading.append(calc_heading(imu.magnetic))
yaw, pitch, roll = calc_yaw_pitch_roll(imu.acceleration, imu.gyro)
elif type(imu, MAG3110):
heading.append(imu.get_heading())
if not heading:
heading.append(0)
logger.debug('sensor data', f'Heading: {heading[0]}, Yaw: {yaw}, Pitch: {pitch}, Roll: {roll}')
return [heading[0], yaw, pitch, roll]
[docs]def get_imu_data():
"""Returns a 2d array containing the following
* ``senses[0] = accel[x, y, z]`` for accelerometer data
* ``senses[1] = gyro[x, y, z]`` for gyroscope data
* ``senses[2] = mag[x, y, z]`` for magnetometer data
.. note:: Not all data may be aggregated depending on the IMU device connected to the robot.
"""
senses = [
[100, 50, 25],
[-100, -50, -25],
[100, -50, 25]
]
for imu in IMUs:
if isinstance(imu, LSM9DS1_I2C):
senses[0] = list(imu.acceleration)
senses[1] = list(imu.gyro)
senses[2] = list(imu.magnetic)
elif isinstance(imu, MPU6050):
senses[0] = list(imu.acceleration)
senses[1] = list(imu.gryo)
return senses
[docs]@socketio.on('connect')
def handle_connect():
"""This event fired when a websocket client establishes a connection to the server"""
logger.debug('web sockets', 'websocket Client connected!')
[docs]@socketio.on('disconnect')
def handle_disconnect():
"""This event fired when a websocket client breaks connection to the server"""
logger.debug('web sockets', 'websocket Client disconnected')
# If the camera was recently opened, then close it and reopen it to free the resource for
# future use. The reason for "rebooting" the camera is that the camera device will be
# considered "in use" until the corresponding resource is freed, for which we can re-initialize
# the camera resource again.
# if camera_manager.initialized:
# camera_manager.close_camera()
# camera_manager.open_camera()
# If the vterm was initialized and/or running, close file descriptor and kill child process
if not ON_WINDOWS:
if vterm.running or vterm.initialized:
vterm.cleanup()
[docs]@socketio.on('webcam-init')
def handle_webcam_init():
"""Initialize the camera when the user goes to the remote control page."""
if not camera_manager.initialized:
camera_manager.open_camera()
[docs]@socketio.on('webcam')
def handle_webcam_request():
"""This event is to stream the webcam over websockets."""
if camera_manager.initialized:
buffer = camera_manager.capture_image()
b64 = base64.b64encode(buffer)
emit('webcam-response', b64)
[docs]@socketio.on('webcam-cleanup')
def handle_webcam_cleanup():
"""Cleanup the camera when the user leaves the remote control page."""
if camera_manager.initialized:
camera_manager.close_camera()
[docs]@socketio.on('WaypointList')
def build_wapypoints(waypoints, clear):
"""Builds a list of waypoints based on the order they were created on
the 'automode.html' page
:param list waypoints: A list of GPS latitude & longitude pairs for the robot to
travel to in sequence.
:param bool clear: A flag that will clear the existing list of GPS waypoints before appending to
it.
"""
if nav is not None:
if clear:
nav.clear()
logger.debug('navigation', 'received waypoints')
for point in waypoints:
nav.insert(point)
nav.printWP()
[docs]@socketio.on('gps')
def handle_gps_request():
"""This event fired when a websocket client's response to the server about GPS coordinates."""
logger.debug('navigation', 'gps data sent')
NESW = (0, 0)
if gps:
gps[0].get_data()
NESW = (gps[0].lat, gps[0].lng)
else:
NESW = (37.967135, -122.071210)
emit('gps-response', [NESW[0], NESW[1]])
[docs]@socketio.on('sensorDoF')
def handle_DoF_request():
"""This event fired when a websocket client a response to the server about IMU
device's data."""
senses = get_imu_data()
emit('sensorDoF-response', senses)
logger.debug('sensor data', 'DoF sensor data sent')
[docs]@socketio.on('remoteOut')
def handle_remoteOut(args):
"""This event gets fired when the client sends data to the server about remote controls
(via remote control page) specific to the robot's drivetrain.
:param list args: The list of motor inputs received from the remote control page.
"""
logger.debug('navigation', f'remote = {repr(args)}')
if d_train: # if there is a drivetrain connected
d_train[0].go([args[0] * 655.35, args[1] * 655.35])
# NOTE: Source for virtual terminal functions: https://github.com/cs01/pyxterm.js
# virtual terminal handlers
[docs]@socketio.on("terminal-resize", namespace="/pty")
def on_terminal_resize(data):
"""This event is fired when a websocket clients' window gets resized."""
if not ON_WINDOWS:
vterm.resize_terminal(data["rows"], data["cols"])
[docs]@socketio.on("connect", namespace="/pty")
def on_terminal_connect():
"""This event is fired when a new client has connected to the server's terminal."""
if not ON_WINDOWS:
vterm.init_connect(["/bin/bash", "./webapp/bash_scripts/ask_pass_before_bash.sh"])