Skip to content
Snippets Groups Projects
Commit 15707985 authored by pintard's avatar pintard
Browse files

added backend code

parent 5edbfffd
No related branches found
No related tags found
No related merge requests found
Showing
with 1660 additions and 17 deletions
import time
import ros_robot_controller_sdk as rrc
print('''
**********************************************************
********功能:幻尔科技树莓派扩展板,蜂鸣器控制例程(function: Hiwonder Raspberry Pi expansion board, buzzer control routine)*********
**********************************************************
----------------------------------------------------------
Official website:https://www.hiwonder.com
Online mall:https://hiwonder.tmall.com
----------------------------------------------------------
Tips:
* 按下Ctrl+C可关闭此次程序运行,若失败请多次尝试!(press Ctrl+C to close the running program, please try multiple times if failed)
----------------------------------------------------------
''')
board = rrc.Board()
board.set_buzzer(1900, 0.1, 0.9, 1) # 以1900Hz的频率,持续响0.1秒,关闭0.9秒,重复1次(at a frequency of 1900Hz, continuously sound for 0.1 seconds, then silence for 0.9 seconds, repeat once)
time.sleep(2)
board.set_buzzer(1000, 0.5, 0.5, 0) # 以1000Hz的频率,持续响0.5秒,关闭0.5秒,一直重复(at a frequency of 1000Hz, continuously sound for 0.5 seconds, then silence for 0.5 seconds, repeat once)
time.sleep(3)
board.set_buzzer(1000, 0.0, 0.0, 1) # 关闭(close)
#!/usr/bin/python3
# coding=utf8
import time
import smbus
#四路巡线传感器使用例程(4-channel line patrol sensor usage routine)
class FourInfrared:
def __init__(self, address=0x78, bus=1):
self.address = address
self.bus = smbus.SMBus(bus)
def readData(self, register=0x01):
value = self.bus.read_byte_data(self.address, register)
return [True if value & v > 0 else False for v in [0x01, 0x02, 0x04, 0x08]]
if __name__ == "__main__":
line = FourInfrared()
while True:
data = line.readData()
#True表示识别到黑线,False表示没有识别到黑线(True means recognize the black line, False means does not recognize the black line)
print("Sensor1:", data[0], " Sensor2:", data[1], " Sensor3:", data[2], " Sensor4:", data[3])
time.sleep(0.5)
#!/usr/bin/env python3
def map(x, in_min, in_max, out_min, out_max):
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
def emptyFunc(img = None):
return img
def setRange(x, x_min, x_max):
tmp = x if x > x_min else x_min
tmp = tmp if tmp < x_max else x_max
return tmp
#!/usr/bin/python3
# coding=utf8
import sys
sys.path.append('/home/pi/TurboPi/')
import time
import signal
import ros_robot_controller_sdk as rrc
if sys.version_info.major == 2:
print('Please run this program with python3!')
sys.exit(0)
print('''
**********************************************************
********功能:幻尔科技树莓派扩展板,电机控制例程(function: Hiwonder Raspberry Pi expansion board, motor control routine)**********
**********************************************************
----------------------------------------------------------
Official website:https://www.hiwonder.com
Online mall:https://hiwonder.tmall.com
----------------------------------------------------------
Tips:
* 按下Ctrl+C可关闭此次程序运行,若失败请多次尝试!(press Ctrl+C to close the running program, please try multiple times if failed)
----------------------------------------------------------
''')
board = rrc.Board()
start = True
#关闭前处理(process program before closing)
def Stop(signum, frame):
global start
start = False
print('关闭中...')
board.set_motor_duty([[1, 0], [2, 0], [3, 0], [4, 0]]) # 关闭所有电机(close all motors)
signal.signal(signal.SIGINT, Stop)
if __name__ == '__main__':
while True:
board.set_motor_duty([[1, 35]]) #设置1号电机速度35(set motor 1 speed to 35)
time.sleep(0.2)
board.set_motor_duty([[1, 90]]) #设置1号电机速度90(set motor 1 speed to 90)
time.sleep(0.2)
if not start:
board.set_motor_duty([[1, 0], [2, 0], [3, 0], [4, 0]]) # 关闭所有电机(close all motors)
print('已关闭')
break
"""Ivmech PID Controller is simple implementation of a Proportional-Integral-Derivative (PID) Controller in the Python Programming Language.
More information about PID Controller: http://en.wikipedia.org/wiki/PID_controller
"""
import sys
import time
if sys.version_info.major == 2:
print('Please run this program with python3!')
sys.exit(0)
class PID:
"""PID Controller
"""
def __init__(self, P=0.2, I=0.0, D=0.0):
self.Kp = P
self.Ki = I
self.Kd = D
self.sample_time = 0.00
self.current_time = time.time()
self.last_time = self.current_time
self.clear()
def clear(self):
"""Clears PID computations and coefficients"""
self.SetPoint = 0.0
self.PTerm = 0.0
self.ITerm = 0.0
self.DTerm = 0.0
self.last_error = 0.0
# Windup Guard
self.int_error = 0.0
self.windup_guard = 20.0
self.output = 0.0
def update(self, feedback_value):
"""Calculates PID value for given reference feedback
.. math::
u(t) = K_p e(t) + K_i \int_{0}^{t} e(t)dt + K_d {de}/{dt}
.. figure:: images/pid_1.png
:align: center
Test PID with Kp=1.2, Ki=1, Kd=0.001 (test_pid.py)
"""
error = self.SetPoint - feedback_value
self.current_time = time.time()
delta_time = self.current_time - self.last_time
delta_error = error - self.last_error
if (delta_time >= self.sample_time):
self.PTerm = self.Kp * error
self.ITerm += error * delta_time
if (self.ITerm < -self.windup_guard):
self.ITerm = -self.windup_guard
elif (self.ITerm > self.windup_guard):
self.ITerm = self.windup_guard
self.DTerm = 0.0
if delta_time > 0:
self.DTerm = delta_error / delta_time
# Remember last time and last error for next calculation
self.last_time = self.current_time
self.last_error = error
self.output = self.PTerm + (self.Ki * self.ITerm) + (self.Kd * self.DTerm)
def setKp(self, proportional_gain):
"""Determines how aggressively the PID reacts to the current error with setting Proportional Gain"""
self.Kp = proportional_gain
def setKi(self, integral_gain):
"""Determines how aggressively the PID reacts to the current error with setting Integral Gain"""
self.Ki = integral_gain
def setKd(self, derivative_gain):
"""Determines how aggressively the PID reacts to the current error with setting Derivative Gain"""
self.Kd = derivative_gain
def setWindup(self, windup):
"""Integral windup, also known as integrator windup or reset windup,
refers to the situation in a PID feedback controller where
a large change in setpoint occurs (say a positive change)
and the integral terms accumulates a significant error
during the rise (windup), thus overshooting and continuing
to increase as this accumulated error is unwound
(offset by errors in the other direction).
The specific problem is the excess overshooting.
"""
self.windup_guard = windup
def setSampleTime(self, sample_time):
"""PID that should be updated at a regular interval.
Based on a pre-determined sampe time, the PID decides if it should compute or return immediately.
"""
self.sample_time = sample_time
if __name__ == '__main__':
x_pid = PID(P=0.2, I=0, D=0)
x_pid.SetPoint = 5
x_pid.update(10)
out = x_pid.output
print (out)
#!/usr/bin/python3
# coding=utf8
import sys
sys.path.append('/home/pi/TurboPi/')
import time
import signal
import threading
import ros_robot_controller_sdk as rrc
if sys.version_info.major == 2:
print('Please run this program with python3!')
sys.exit(0)
print('''
**********************************************************
********功能:幻尔科技树莓派扩展板,PWM舵机控制例程(function: Hiwonder Raspberry Pi expansion board, PWM servo control routine)**********
**********************************************************
----------------------------------------------------------
Official website:https://www.hiwonder.com
Online mall:https://hiwonder.tmall.com
----------------------------------------------------------
Tips:
* 按下Ctrl+C可关闭此次程序运行,若失败请多次尝试!(press Ctrl+C to close the running program, please try multiple times if failed)
----------------------------------------------------------
''')
board = rrc.Board()
start = True
#关闭前处理(process program before closing)
def Stop(signum, frame):
global start
start = False
print('关闭中...')
signal.signal(signal.SIGINT, Stop)
if __name__ == '__main__':
while True:
board.pwm_servo_set_position(1, [[1, 1100]]) # 设置1号舵机脉宽为2500,运行时间为1000毫秒(set servo 1 pulse width to 2500, running time to 1000 milliseconds)
time.sleep(1)
board.pwm_servo_set_position(1, [[1, 1500]]) # 设置1号舵机脉宽为1500,运行时间为1000毫秒(set servo 1 pulse width to 1500, running time to 1000 milliseconds)
time.sleep(1)
board.pwm_servo_set_position(1, [[1, 1900], [2, 1000]]) #设置1号舵机脉宽为1000,设置2号舵机脉宽为1000, 运行时间为1000毫秒(set servo 1 pulse width to 1000, set servo 2 pulse width to 1000)
time.sleep(1)
board.pwm_servo_set_position(1, [[1, 1500], [2, 2000]]) #设置1号舵机脉宽为2000,设置2号舵机脉宽为2000, 运行时间为1000毫秒(set servo 1 pulse width to 2000, set servo 2 pulse width to 2000, running time to 1000 milliseconds)
time.sleep(1)
if not start:
board.pwm_servo_set_position(1, [[1, 1500], [2, 1500]]) # 设置1号舵机脉宽为1500,设置2号舵机脉宽为1500, 运行时间为1000毫秒(set servo 1 pulse width to 1500, set servo 2 pulse width to 1500, running time to 1000 milliseconds)
time.sleep(1)
print('已关闭')
break
import time
import signal
import ros_robot_controller_sdk as rrc
print('''
**********************************************************
********功能:幻尔科技树莓派扩展板,RGB灯控制例程(function: Hiwonder Raspberry Pi expansion board, RGB light control routine)**********
**********************************************************
----------------------------------------------------------
Official website:https://www.hiwonder.com
Online mall:https://hiwonder.tmall.com
----------------------------------------------------------
Tips:
* 按下Ctrl+C可关闭此次程序运行,若失败请多次尝试!(press Ctrl+C to close the running program, please try multiple times if failed)
----------------------------------------------------------
''')
start = True
#关闭前处理(process program before closing)
def Stop(signum, frame):
global start
start = False
print('关闭中...')
board = rrc.Board()
#先将所有灯关闭(turn all lights off firstly)
board.set_rgb([[1, 0, 0, 0], [2, 0, 0, 0]])
signal.signal(signal.SIGINT, Stop)
while True:
#设置2个灯为红色(set two lights to red)
board.set_rgb([[1, 255, 0, 0], [2, 255, 0, 0]])
time.sleep(1)
#设置2个灯为绿色(set two lights to green)
board.set_rgb([[1, 0, 255, 0], [2, 0, 255, 0]])
time.sleep(1)
#设置2个灯为蓝色(set two lights to blue)
board.set_rgb([[1, 0, 0, 255], [2, 0, 0, 255]])
time.sleep(1)
#设置2个灯为黄色(set two lights to yellow)
board.set_rgb([[1, 255, 255, 0], [2, 255, 255, 0]])
time.sleep(1)
if not start:
#所有灯关闭(turn all lights off)
board.set_rgb([[1, 0, 0, 0], [2, 0, 0, 0]])
print('已关闭')
break
import sys
import time
from smbus2 import SMBus, i2c_msg
# 幻尔科技iic超声波库(Hiwonder I2C ultrasonic library)
if sys.version_info.major == 2:
print('Please run this program with python3!')
sys.exit(0)
class Sonar:
__units = {"mm":0, "cm":1}
__dist_reg = 0
__RGB_MODE = 2
__RGB1_R = 3
__RGB1_G = 4
__RGB1_B = 5
__RGB2_R = 6
__RGB2_G = 7
__RGB2_B = 8
__RGB1_R_BREATHING_CYCLE = 9
__RGB1_G_BREATHING_CYCLE = 10
__RGB1_B_BREATHING_CYCLE = 11
__RGB2_R_BREATHING_CYCLE = 12
__RGB2_G_BREATHING_CYCLE = 13
__RGB2_B_BREATHING_CYCLE = 14
def __init__(self):
self.i2c_addr = 0x77
self.i2c = 1
self.Pixels = [0,0]
self.RGBMode = 0
def __getattr(self, attr):
if attr in self.__units:
return self.__units[attr]
if attr == "Distance":
return self.getDistance()
else:
raise AttributeError('Unknow attribute : %s'%attr)
def setRGBMode(self, mode):
try:
with SMBus(self.i2c) as bus:
bus.write_byte_data(self.i2c_addr, self.__RGB_MODE, mode)
except BaseException as e:
print(e)
def show(self): #占位,与扩展板RGB保持调用一致(occupied, consistent with the call to the expansion board RGB)
pass
def numPixels(self):
return 2
def setPixelColor(self, index, rgb):
color = (rgb[0] << 16) | (rgb[1] << 8) | rgb[2]
try:
if index != 0 and index != 1:
return
start_reg = 3 if index == 0 else 6
with SMBus(self.i2c) as bus:
bus.write_byte_data(self.i2c_addr, start_reg, 0xFF & (color >> 16))
bus.write_byte_data(self.i2c_addr, start_reg+1, 0xFF & (color >> 8))
bus.write_byte_data(self.i2c_addr, start_reg+2, 0xFF & color)
self.Pixels[index] = color
except BaseException as e:
print(e)
def getPixelColor(self, index):
if index != 0 and index != 1:
raise ValueError("Invalid pixel index", index)
return ((self.Pixels[index] >> 16) & 0xFF,
(self.Pixels[index] >> 8) & 0xFF,
self.Pixels[index] & 0xFF)
def setBreathCycle(self, index, rgb, cycle):
try:
if index != 0 and index != 1:
return
if rgb < 0 or rgb > 2:
return
start_reg = 9 if index == 0 else 12
cycle = int(cycle / 100)
with SMBus(self.i2c) as bus:
bus.write_byte_data(self.i2c_addr, start_reg + rgb, cycle)
except BaseException as e:
print(e)
def startSymphony(self):
self.setRGBMode(1)
self.setBreathCycle(1,0, 2000)
self.setBreathCycle(1,1, 3300)
self.setBreathCycle(1,2, 4700)
self.setBreathCycle(2,0, 4600)
self.setBreathCycle(2,1, 2000)
self.setBreathCycle(2,2, 3400)
def getDistance(self):
dist = 99999
try:
with SMBus(self.i2c) as bus:
msg = i2c_msg.write(self.i2c_addr, [0,])
bus.i2c_rdwr(msg)
read = i2c_msg.read(self.i2c_addr, 2)
bus.i2c_rdwr(read)
dist = int.from_bytes(bytes(list(read)), byteorder='little', signed=False)
if dist > 5000:
dist = 5000
except BaseException as e:
print(e)
return dist
if __name__ == '__main__':
s = Sonar()
s.setRGBMode(0)
s.setPixelColor(0, (0, 0, 0))
s.setPixelColor(1, (0, 0, 0))
s.show()
time.sleep(0.1)
s.setPixelColor(0, (255, 0, 0))
s.setPixelColor(1, (255, 0, 0))
s.show()
time.sleep(1)
s.setPixelColor(0, (0, 255, 0))
s.setPixelColor(1, (0, 255, 0))
s.show()
time.sleep(1)
s.setPixelColor(0, (0, 0, 255))
s.setPixelColor(1, (0, 0, 255))
s.show()
time.sleep(1)
s.startSymphony()
while True:
time.sleep(0.1)
print(s.getDistance())
#!/usr/bin/python3
# coding=utf8
import sys
import time
sys.path.append('/home/pi/TurboPi/')
import HiwonderSDK.ros_robot_controller_sdk as rrc
if sys.version_info.major == 2:
print('Please run this program with python3!')
sys.exit(0)
print('''
**********************************************************
********************PWM舵机和电机测试(PWM servo and motor test)************************
**********************************************************
----------------------------------------------------------
Official website:https://www.hiwonder.com
Online mall:https://hiwonder.tmall.com
----------------------------------------------------------
Tips:
* 按下Ctrl+C可关闭此次程序运行,若失败请多次尝试!(press Ctrl+C to close the running program, please try multiple times if failed)
----------------------------------------------------------
''')
board = rrc.Board()
board.pwm_servo_set_position(0.3, [[1, 1800]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[1, 1500]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[1, 1200]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[1, 1500]])
time.sleep(1.5)
board.pwm_servo_set_position(0.3, [[2, 1200]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[2, 1500]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[2, 1800]])
time.sleep(0.3)
board.pwm_servo_set_position(0.3, [[2, 1500]])
time.sleep(1.5)
board.set_motor_duty([[1, -45]])
time.sleep(0.5)
board.set_motor_duty([[1, 0]])
time.sleep(1)
board.set_motor_duty([[2, 45]])
time.sleep(0.5)
board.set_motor_duty([[2, 0]])
time.sleep(1)
board.set_motor_duty([[3, -45]])
time.sleep(0.5)
board.set_motor_duty([[3, 0]])
time.sleep(1)
board.set_motor_duty([[4, 45]])
time.sleep(0.5)
board.set_motor_duty([[4, 0]])
time.sleep(1)
#!/usr/bin/env python3
# encoding:utf-8
import time
import gpiod
try:
key1_pin = 13
chip = gpiod.Chip("gpiochip4")
key1 = chip.get_line(key1_pin)
key1.request(consumer="key1", type=gpiod.LINE_REQ_DIR_IN, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
key2_pin = 23
key2 = chip.get_line(key2_pin)
key2.request(consumer="key2", type=gpiod.LINE_REQ_DIR_IN, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
while True:
print('\rkey1: {} key2: {}'.format(key1.get_value(), key2.get_value()), end='', flush=True) # 打印key状态(press key state)
time.sleep(0.001)
chip.close()
except:
print('按键默认被hw_button_scan占用,需要先关闭服务')
print('sudo systemctl stop hw_button_scan.service')
#!/usr/bin/env python3
# encoding:utf-8
import time
import gpiod
try:
print('led闪烁 0.1/s')
led1_pin = 16 # 蓝色led(blue led)
led2_pin = 26
chip = gpiod.Chip('gpiochip4')
led1 = chip.get_line(led1_pin)
led1.request(consumer="led1", type=gpiod.LINE_REQ_DIR_OUT)
led2 = chip.get_line(led2_pin)
led2.request(consumer="led2", type=gpiod.LINE_REQ_DIR_OUT)
while True:
led1.set_value(0)
led2.set_value(0)
time.sleep(0.1)
led1.set_value(1)
led2.set_value(1)
time.sleep(0.1)
except:
print('led默认被hw_wifi占用,需要自行注释掉相关代码')
print('然后重启服务sudo systemctl restart hw_wifi.service')
#!/usr/bin/python3
# coding=utf8
import sys
sys.path.append('/home/pi/TurboPi/')
import math
import HiwonderSDK.ros_robot_controller_sdk as rrc
board = rrc.Board()
class MecanumChassis:
# A = 67 # mm
# B = 59 # mm
# WHEEL_DIAMETER = 65 # mm
def __init__(self, a=67, b=59, wheel_diameter=65):
self.a = a
self.b = b
self.wheel_diameter = wheel_diameter
self.velocity = 0
self.direction = 0
self.angular_rate = 0
def reset_motors(self):
board.set_motor_duty([[1, 0], [2, 0], [3, 0], [4, 0]])
self.velocity = 0
self.direction = 0
self.angular_rate = 0
def set_velocity(self, velocity, direction, angular_rate, fake=False):
"""
Use polar coordinates to control moving
motor1 v1| ↑ |v2 motor2
| |
motor3 v3| |v4 motor4
:param velocity: mm/s
:param direction: Moving direction 0~360deg, 180deg<--- ↑ ---> 0deg
:param angular_rate: The speed at which the chassis rotates
:param fake:
:return:
"""
rad_per_deg = math.pi / 180
vx = velocity * math.cos(direction * rad_per_deg)
vy = velocity * math.sin(direction * rad_per_deg)
vp = -angular_rate * (self.a + self.b)
v1 = int(vy + vx - vp)
v2 = int(vy - vx + vp)
v3 = int(vy - vx - vp)
v4 = int(vy + vx + vp)
if fake:
return
board.set_motor_duty([[1, -v1], [2, v2], [3, -v3], [4, v4]])
self.velocity = velocity
self.direction = direction
self.angular_rate = angular_rate
def translation(self, velocity_x, velocity_y, fake=False):
velocity = math.sqrt(velocity_x ** 2 + velocity_y ** 2)
if velocity_x == 0:
direction = 90 if velocity_y >= 0 else 270 # pi/2 90deg, (pi * 3) / 2 270deg
else:
if velocity_y == 0:
direction = 0 if velocity_x > 0 else 180
else:
direction = math.atan(velocity_y / velocity_x) # θ=arctan(y/x) (x!=0)
direction = direction * 180 / math.pi
if velocity_x < 0:
direction += 180
else:
if velocity_y < 0:
direction += 360
if fake:
return velocity, direction
else:
return self.set_velocity(velocity, direction, 0)
This diff is collapsed.
import sys
sys.path.append("/home/pi/TurboPi/")
import threading
import time
import cv2
import serial
import HiwonderSDK.mecanum as mecanum
import HiwonderSDK.ros_robot_controller_sdk as rrc
import Camera
state = {"battery": "N/A", "temperature": "N/A", "current_command": "stopped"}
SPEED = 50
ROTATION_SPEED = 0.2
car = mecanum.MecanumChassis()
board = None
camera = Camera.Camera()
sensor_lock = threading.Lock()
board_lock = threading.Lock()
def initialize_board():
global board
with board_lock:
try:
if board and board.port.is_open:
print("🔌 Closing existing serial port...")
board.port.close()
time.sleep(1)
board = rrc.Board()
if not board.port.is_open:
board.port.open()
board.enable_reception()
print("✅ Board initialized successfully")
except serial.SerialException as e:
print(f"❌ Serial error during board initialization: {e}")
board = None
time.sleep(1)
except Exception as e:
print(f"❌ Failed to initialize board: {e}")
board = None
def update_sensors():
global board
while True:
with sensor_lock:
if not board:
initialize_board()
batt = None
retries = 5
while retries > 0 and batt is None:
try:
with board_lock: # <-- Lock serial port
if board.port.is_open:
batt = board.get_battery()
except serial.SerialException as e:
print(f"❌ Serial exception: {e}")
initialize_board()
time.sleep(0.5)
break
except Exception as e:
print(f"❌ Battery sensor error: {e}")
time.sleep(0.5)
retries -= 1
time.sleep(0.05)
if batt is not None:
voltage = batt / 1000.0
state["battery"] = f"{voltage:.1f}V"
else:
state["battery"] = "N/A"
try:
with open("/sys/class/thermal/thermal_zone0/temp", "r") as f:
temp_str = f.read().strip()
temp_c = int(temp_str) / 1000.0
state["temperature"] = f"{temp_c:.1f}°C"
except Exception as e:
print(f"❌ Temperature sensor error: {e}")
state["temperature"] = "N/A"
time.sleep(5)
def stop_robot() -> None:
car.set_velocity(0, 0, 0)
state["current_command"] = "stopped"
print("🛑 Robot stopped.")
def move(direction: str) -> None:
if direction == "forward":
car.set_velocity(SPEED, 90, 0)
elif direction == "backward":
car.set_velocity(SPEED, 270, 0)
elif direction == "left":
car.set_velocity(SPEED, 0, 0)
elif direction == "right":
car.set_velocity(SPEED, 180, 0)
elif direction == "rotate_left":
car.set_velocity(0, 90, -ROTATION_SPEED)
elif direction == "rotate_right":
car.set_velocity(0, 90, ROTATION_SPEED)
else:
stop_robot()
return
state["current_command"] = direction
print(f"🚗 Moving {direction}")
def generate_frames():
while True:
frame = camera.frame
if frame is not None:
ret, buffer = cv2.imencode(".jpg", frame)
if ret:
yield (
b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + buffer.tobytes() + b"\r\n"
)
time.sleep(0.05)
initialize_board()
import threading
import time
from flask import Flask, Response
from flask_cors import CORS
from flask_socketio import SocketIO, emit
import serial
from robot import (
initialize_board,
move,
generate_frames,
camera,
update_sensors,
state,
)
app = Flask(__name__)
CORS(app)
app.config["SECRET_KEY"] = "secret!"
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="threading")
sensor_lock = threading.Lock()
state["bluetooth_connected"] = False
def get_status_update():
return {
"command": state["current_command"],
"sender": "server",
"battery": state["battery"],
"temperature": state["temperature"],
"bluetooth_connected": state["bluetooth_connected"],
}
@app.route("/")
def index():
return "Try /video_feed or connect via WebSocket."
@app.route("/video_feed")
def video_feed():
return Response(
generate_frames(), mimetype="multipart/x-mixed-replace; boundary=frame"
)
@socketio.on("bluetooth_status")
def handle_bluetooth_status(data):
state["bluetooth_connected"] = data.get("connected", False)
print(f"Bluetooth connected: {state['bluetooth_connected']}")
emit("status_update", get_status_update(), broadcast=True)
@socketio.on("connect")
def on_connect():
print("Client connected")
emit("status_update", get_status_update())
@socketio.on("disconnect")
def on_disconnect():
print("Bluetooth server disconnected")
state["bluetooth_connected"] = False
emit("status_update", get_status_update(), broadcast=True)
@socketio.on("move")
def on_move(data):
command = data.get("command")
if command:
move(command)
emit("status_update", get_status_update(), broadcast=True)
def emit_sensor_update_loop():
while True:
try:
print(f"Battery: {state['battery']} | Temperature: {state['temperature']}")
socketio.emit("status_update", get_status_update())
socketio.sleep(5)
except serial.SerialException as e:
print(f"❌ Serial error during sensor update: {e}")
initialize_board()
time.sleep(1)
except Exception as e:
print(f"❌ Sensor update error: {e}")
time.sleep(1)
if __name__ == "__main__":
threading.Thread(target=update_sensors, daemon=True).start()
threading.Thread(
target=lambda: camera.camera_open(correction=True), daemon=True
).start()
socketio.start_background_task(emit_sensor_update_loop)
print("Starting Flask server on 192.168.149.1:5000")
socketio.run(
app,
host="0.0.0.0",
port=5000,
debug=False,
use_reloader=False,
allow_unsafe_werkzeug=True,
)
import sys
sys.path.append("/home/pi/TurboPi/")
import asyncio
from evdev import InputDevice, ecodes, list_devices
from socketio import Client
import lab2.backend.robot as robot
sio = Client()
connected = False
SERVER_URL = "http://192.168.149.1:5000"
GAMEPAD_DEVICE = "/dev/input/event8"
async def connect_to_server():
retries = 5
delay = 2
for attempt in range(retries):
try:
print(
f"🔌 Attempting to connect to Flask server... (try {attempt + 1}/{retries})"
)
sio.connect(SERVER_URL)
print("✅ Connected to Flask server.")
return
except Exception as e:
print(f"❌ Connection attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
await asyncio.sleep(delay)
else:
print(
"🚨 Could not connect to Flask server. Please check if Flask is running."
)
return
async def handle_input(event):
command = None
if event.type == ecodes.EV_KEY:
with robot.board_lock:
if event.code == ecodes.BTN_SOUTH and event.value == 1: # A Button
command = "forward"
robot.move(command)
elif event.code == ecodes.BTN_EAST and event.value == 1: # B Button
command = "backward"
robot.move(command)
elif event.code == ecodes.BTN_WEST and event.value == 1: # X Button
command = "rotate_right"
robot.move(command)
elif event.code == ecodes.BTN_NORTH and event.value == 1: # Y Button
command = "rotate_left"
robot.move(command)
elif event.code == ecodes.BTN_TL and event.value == 1: # Left bumper
command = "left"
robot.move(command)
elif event.code == ecodes.BTN_TR and event.value == 1: # Right bumper
command = "right"
robot.move(command)
elif event.value == 0:
command = "stop"
robot.stop_robot()
if command:
await broadcast_state(command)
async def broadcast_state(command):
global connected
if sio.connected:
print(f"Broadcasting state: {command or 'no command'}")
sio.emit(
"bluetooth_status",
{
"connected": connected,
"command": command or robot.state["current_command"],
"battery": robot.state["battery"],
"temperature": robot.state["temperature"],
},
)
async def listen_controller():
global connected
try:
gamepad = InputDevice(GAMEPAD_DEVICE)
connected = True
print(f"🎮 Connected to {gamepad.name}")
await broadcast_state("connected")
async for event in gamepad.async_read_loop():
await handle_input(event)
except OSError as e:
print(f"❌ Controller connection lost: {e}")
connected = False
await broadcast_state("disconnected")
await asyncio.sleep(2)
await try_reconnect_controller()
async def try_reconnect_controller():
global connected
while not connected:
try:
devices = list_devices()
if GAMEPAD_DEVICE in devices:
print("✅ Reconnecting to controller...")
await listen_controller()
else:
print("🎮 Waiting for controller to reconnect...")
await asyncio.sleep(2)
except Exception as e:
print(f"❌ Error while reconnecting: {e}")
await asyncio.sleep(2)
async def main():
await connect_to_server()
print("Starting Bluetooth server...")
await try_reconnect_controller()
if __name__ == "__main__":
asyncio.run(main())
...@@ -91,7 +91,7 @@ export default function CarController(): JSX.Element { ...@@ -91,7 +91,7 @@ export default function CarController(): JSX.Element {
</div> </div>
<div className="flex-1 flex flex-row items-center overflow-hidden p-4 pt-0 gap-5"> <div className="flex-1 flex flex-row items-center overflow-hidden p-4 pt-0 gap-5">
<LogViewer logs={logs} /> <LogViewer logs={logs} setLogs={setLogs} />
<DirectionalControls onMove={handleMove} /> <DirectionalControls onMove={handleMove} />
</div> </div>
</div> </div>
......
import { FaArrowUp, FaArrowDown, FaArrowLeft, FaArrowRight, FaUndo, FaRedo } from 'react-icons/fa' import {
FaArrowUp,
FaArrowDown,
FaArrowLeft,
FaArrowRight,
FaUndo,
FaRedo,
FaStop
} from 'react-icons/fa'
interface DirectionalControlsProps { interface DirectionalControlsProps {
onMove: (command: string) => void onMove: (command: string) => void
...@@ -18,9 +26,12 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -18,9 +26,12 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
</defs> </defs>
</svg> </svg>
<div className="absolute w-48 h-48 bg-blue-200" style={{ clipPath: 'url(#myClippy)' }} /> <div
className="absolute w-48 h-48 bg-blue-200"
style={{ clipPath: 'url(#myClippy)', top: '10%' }}
/>
<div className="absolute bg-blue-200 p-4 rounded-full" style={{ top: '-15%', left: '-5%' }}> <div className="absolute bg-blue-200 p-4 rounded-full" style={{ top: '-14%', left: '-5%' }}>
<button <button
onClick={() => onMove('rotate_left')} onClick={() => onMove('rotate_left')}
className="relative z-10 w-10 h-10 bg-blue-400 text-white rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="relative z-10 w-10 h-10 bg-blue-400 text-white rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
...@@ -29,7 +40,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -29,7 +40,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
</button> </button>
</div> </div>
<div className="absolute bg-blue-200 p-4 rounded-full" style={{ top: '-15%', right: '-5%' }}> <div className="absolute bg-blue-200 p-4 rounded-full" style={{ top: '-14%', right: '-5%' }}>
<button <button
onClick={() => onMove('rotate_right')} onClick={() => onMove('rotate_right')}
className="relative z-10 w-10 h-10 bg-blue-400 text-white rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="relative z-10 w-10 h-10 bg-blue-400 text-white rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
...@@ -40,15 +51,16 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -40,15 +51,16 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
<button <button
onClick={() => onMove('stop')} onClick={() => onMove('stop')}
className="z-10 w-12 h-12 bg-red-600 text-white font-bold text-xs flex items-center justify-center hover:bg-red-700 transition rounded-full" className="absolute z-10 w-12 h-12 bg-red-500 text-white font-bold text-xs flex items-center justify-center hover:bg-red-600 transition rounded-full"
style={{ top: '48%', left: '50%', transform: 'translateX(-50%)' }}
> >
stop <FaStop className="text-lg" />
</button> </button>
<button <button
onClick={() => onMove('forward')} onClick={() => onMove('forward')}
className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
style={{ top: '8%', left: '50%', transform: 'translateX(-50%)' }} style={{ top: '18%', left: '50%', transform: 'translateX(-50%)' }}
> >
<FaArrowUp /> <FaArrowUp />
</button> </button>
...@@ -56,7 +68,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -56,7 +68,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
<button <button
onClick={() => onMove('backward')} onClick={() => onMove('backward')}
className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
style={{ bottom: '8%', left: '50%', transform: 'translateX(-50%)' }} style={{ bottom: '-3%', left: '50%', transform: 'translateX(-50%)' }}
> >
<FaArrowDown /> <FaArrowDown />
</button> </button>
...@@ -64,7 +76,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -64,7 +76,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
<button <button
onClick={() => onMove('left')} onClick={() => onMove('left')}
className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
style={{ left: '8%', top: '50%', transform: 'translateY(-50%)' }} style={{ left: '8%', top: '60%', transform: 'translateY(-50%)' }}
> >
<FaArrowLeft /> <FaArrowLeft />
</button> </button>
...@@ -72,7 +84,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps ...@@ -72,7 +84,7 @@ export default function DirectionalControls({ onMove }: DirectionalControlsProps
<button <button
onClick={() => onMove('right')} onClick={() => onMove('right')}
className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition" className="absolute z-10 w-10 h-10 bg-blue-400 text-blue-100 rounded-full flex items-center justify-center hover:bg-blue-600 active:scale-95 transition"
style={{ right: '8%', top: '50%', transform: 'translateY(-50%)' }} style={{ right: '8%', top: '60%', transform: 'translateY(-50%)' }}
> >
<FaArrowRight /> <FaArrowRight />
</button> </button>
......
import { useEffect, useRef } from 'react' import { useEffect, useRef } from 'react'
import { LogEntry } from './CarController' import { LogEntry } from './CarController'
import { FaTrash } from 'react-icons/fa'
interface LogViewerProps { interface LogViewerProps {
logs: LogEntry[] logs: LogEntry[]
setLogs: (logs: LogEntry[]) => void
} }
export default function LogViewer({ logs }: LogViewerProps): JSX.Element { export default function LogViewer({ logs, setLogs }: LogViewerProps): JSX.Element {
const containerRef = useRef<HTMLDivElement>(null) const containerRef = useRef<HTMLDivElement>(null)
useEffect(() => { useEffect(() => {
...@@ -14,19 +16,24 @@ export default function LogViewer({ logs }: LogViewerProps): JSX.Element { ...@@ -14,19 +16,24 @@ export default function LogViewer({ logs }: LogViewerProps): JSX.Element {
} }
}, [logs]) }, [logs])
function deleteLogs(): void {
setLogs([])
}
return ( return (
<div className="flex-1 flex flex-col gap-2 h-full"> <div className="flex-1 flex flex-col gap-2 h-full">
<div className="bg-blue-200 border border-blue-500 p-2 pl-3 rounded-lg"> <div className="bg-blue-200 border border-blue-500 p-2 px-4 rounded-lg flex flex-row justify-between items-center">
<h2 className="font-bold text-blue-500">LOGS</h2> <h2 className="font-bold text-blue-500">LOGS</h2>
<FaTrash className="text-blue-500 hover:text-blue-600" onClick={deleteLogs} />
</div> </div>
<div className="flex-1 bg-blue-200 border border-blue-500 rounded-lg p-4 flex flex-col min-h-0"> <div className="flex-1 bg-blue-200 border border-blue-500 rounded-lg p-4 flex flex-col min-h-0">
<div ref={containerRef} className="flex-1 overflow-y-auto text-sm rounded-lg"> <div ref={containerRef} className="flex-1 overflow-y-auto text-sm rounded-lg">
<table className="w-full table-auto border-collapse"> <table className="w-full table-auto border-collapse">
<thead className="sticky top-0 bg-blue-400 text-blue-100"> <thead className="sticky top-0 bg-blue-400 text-blue-100">
<tr> <tr>
<th className="py-1 px-2 text-left">Time</th> <th className="py-2 px-2 text-left">Time</th>
<th className="py-1 px-2 text-left">Sender</th> <th className="py-2 px-2 text-left">Sender</th>
<th className="py-1 px-2 text-left">Message</th> <th className="py-2 px-2 text-left">Message</th>
</tr> </tr>
</thead> </thead>
......
...@@ -80,7 +80,7 @@ export default function SidePanel({ ...@@ -80,7 +80,7 @@ export default function SidePanel({
return ( return (
<div className="w-80 flex flex-col gap-2 p-4 pr-0 pb-0"> <div className="w-80 flex flex-col gap-2 p-4 pr-0 pb-0">
<div className="bg-blue-200 border border-blue-500 p-3 rounded-lg"> <div className="bg-blue-200 border border-blue-500 p-2 px-4 rounded-lg">
<h2 className="font-bold text-blue-500">STATUS</h2> <h2 className="font-bold text-blue-500">STATUS</h2>
</div> </div>
<div className="flex-1 p-3 space-y-2 bg-blue-200 border border-blue-500 rounded-lg"> <div className="flex-1 p-3 space-y-2 bg-blue-200 border border-blue-500 rounded-lg">
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment