summaryrefslogtreecommitdiffstats
path: root/Widgets
diff options
context:
space:
mode:
authorsuchinton2001 <suchinton.2001@gmail.com>2023-10-09 16:00:58 +0530
committersuchinton2001 <suchinton.2001@gmail.com>2023-10-09 16:00:58 +0530
commitdb862e32df7f31e6453d7f05a6f011091b96ffab (patch)
tree6abcc3a2eceb79a43b7836d1d23a8594da375826 /Widgets
parent260bbf89836ff9ce98054e5166699a163a0d7c11 (diff)
agl-demo-control-panel: Add grpc support for databroker
- Add grpc support for databroker (set default protocol) - Add virtual car for script mode in IC app - Refine UI elements - Use specific grpc/ws jwt tokens - Simplify settings menu Bug-AGL: SPEC-4905 Signed-off-by: suchinton2001 <suchinton.2001@gmail.com> Change-Id: I59c4b1de80e280fe22993b2d2f7c92d6b41a89c7
Diffstat (limited to 'Widgets')
-rw-r--r--Widgets/ICPage.py373
-rw-r--r--Widgets/SteeringCtrlPage.py206
-rw-r--r--Widgets/settings.py137
3 files changed, 455 insertions, 261 deletions
diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py
index 64fa2d3..2d11cdf 100644
--- a/Widgets/ICPage.py
+++ b/Widgets/ICPage.py
@@ -18,10 +18,12 @@ import os
import sys
from PyQt5 import uic, QtCore, QtWidgets
from PyQt5.QtWidgets import QApplication
-from PyQt5.QtWidgets import QSlider, QLCDNumber, QPushButton
from PyQt5.QtGui import QIcon, QPixmap, QPainter
+from PyQt5.QtCore import QObject, pyqtSignal
import time
-from PyQt5.QtCore import QThread
+from PyQt5.QtWidgets import QWidget
+from qtwidgets import AnimatedToggle
+import threading
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -49,35 +51,40 @@ class IC_Paths():
class ICWidget(Base, Form):
+ """
+ This class represents the ICWidget which is a widget for the AGL Demo Control Panel.
+ It inherits from the Base and Form classes.
+ """
+
def __init__(self, parent=None):
+ """
+ Initializes the ICWidget object.
+
+ Args:
+ - parent: The parent widget. Defaults to None.
+ """
super(self.__class__, self).__init__(parent)
self.setupUi(self)
self.IC = IC_Paths()
+ #self.vehicle_simulator = VehicleSimulator(self)
self.feed_kuksa = FeedKuksa()
self.feed_kuksa.start()
+ self.vehicle_simulator = VehicleSimulator()
- # # load the stylesheet
- # theme = open(os.path.join(current_dir, "../ui/styles/Tron/ICPage.qss"), 'r')
- # self.setStyleSheet(theme.read())
- # theme.close()
-
- self.scriptBtn = self.findChild(QPushButton, "scriptBtn")
+ header_frame = self.findChild(QWidget, "header_frame")
+ layout = header_frame.layout()
- self.Speed_slider = self.findChild(QSlider, "Speed_slider")
- self.Speed_monitor = self.findChild(QLCDNumber, "Speed_monitor")
- self.RPM_slider = self.findChild(QSlider, "RPM_slider")
- self.RPM_monitor = self.findChild(QLCDNumber, "RPM_monitor")
+ self.IC_Frame = self.findChild(QWidget, "frame_1")
- self.coolantTemp_slider = self.findChild(QSlider, "coolantTemp_slider")
- self.fuelLevel_slider = self.findChild(QSlider, "fuelLevel_slider")
+ self.Script_toggle = AnimatedToggle(
+ checked_color="#4BD7D6",
+ pulse_checked_color="#00ffff"
+ )
- self.accelerationBtn = self.findChild(QPushButton, "accelerationBtn")
-
- self.leftIndicatorBtn = self.findChild(QPushButton, "leftIndicatorBtn")
- self.rightIndicatorBtn = self.findChild(QPushButton, "rightIndicatorBtn")
- self.hazardBtn = self.findChild(QPushButton, "hazardBtn")
+ layout.replaceWidget(self.demoToggle, self.Script_toggle)
+ self.demoToggle.deleteLater()
buttons = [self.parkBtn,
self.reverseBtn,
@@ -85,6 +92,10 @@ class ICWidget(Base, Form):
self.driveBtn]
# group for the buttons for mutual exclusion
+
+ self.vehicle_simulator.speed_changed.connect(self.set_Vehicle_Speed)
+ self.vehicle_simulator.rpm_changed.connect(self.set_Vehicle_RPM)
+
self.driveGroupBtns = QtWidgets.QButtonGroup(self)
self.driveGroupBtns.setExclusive(True)
@@ -93,8 +104,6 @@ class ICWidget(Base, Form):
self.driveGroupBtns.buttonClicked.connect(self.driveBtnClicked)
- self.scriptBtn.clicked.connect(self.scriptBtnClicked)
-
self.Speed_slider.valueChanged.connect(self.update_Speed_monitor)
self.Speed_slider.setMinimum(0)
self.Speed_slider.setMaximum(240)
@@ -112,107 +121,136 @@ class ICWidget(Base, Form):
self.accelerationBtn.released.connect(self.accelerationBtnReleased)
# make both buttons checkable
+ self.Script_toggle.clicked.connect(self.handle_Script_toggle)
self.leftIndicatorBtn.setCheckable(True)
self.rightIndicatorBtn.setCheckable(True)
self.hazardBtn.setCheckable(True)
- self.leftIndicatorBtn.clicked.connect(self.leftIndicatorBtnClicked)
- self.rightIndicatorBtn.clicked.connect(self.rightIndicatorBtnClicked)
- self.hazardBtn.clicked.connect(self.hazardBtnClicked)
+ self.leftIndicatorBtn.toggled.connect(self.leftIndicatorBtnClicked)
+ self.rightIndicatorBtn.toggled.connect(self.rightIndicatorBtnClicked)
+ self.hazardBtn.toggled.connect(self.hazardBtnClicked)
- def scriptBtnClicked(self):
- if self.scriptBtn.isChecked():
- ICScript.start_script()
+ def set_Vehicle_Speed(self, speed):
+ self.Speed_slider.setValue(speed)
- if not self.ScriptBtn.isChecked():
- ICScript.stop_script()
+ def set_Vehicle_RPM(self, rpm):
+ self.RPM_slider.setValue(rpm)
def update_Speed_monitor(self):
+ """
+ Updates the speed monitor with the current speed value.
+ """
speed = int(self.Speed_slider.value())
self.Speed_monitor.display(speed)
- self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value')
+ try: self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value')
+ except Exception as e: print(e)
def update_RPM_monitor(self):
+ """
+ Updates the RPM monitor with the current RPM value.
+ """
rpm = int(self.RPM_slider.value())
self.RPM_monitor.display(rpm)
- self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value')
+ try: self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value')
+ except Exception as e: print(e)
def update_coolantTemp_monitor(self):
+ """
+ Updates the coolant temperature monitor with the current coolant temperature value.
+ """
coolantTemp = int(self.coolantTemp_slider.value())
- self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value')
+ try: self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value')
+ except Exception as e: print(e)
def update_fuelLevel_monitor(self):
+ """
+ Updates the fuel level monitor with the current fuel level value.
+ """
fuelLevel = int(self.fuelLevel_slider.value())
- self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel))
+ try: self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel))
+ except Exception as e: print(e)
def hazardBtnClicked(self):
+ """
+ Handles the hazard button click event.
+ """
hazardIcon = QPixmap(":/Images/Images/hazard.png")
+ painter = QPainter(hazardIcon)
+ painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
+
if self.hazardBtn.isChecked():
- painter = QPainter(hazardIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(hazardIcon.rect(), QtCore.Qt.yellow)
- painter.end()
- self.hazardBtn.setIcon(QIcon(hazardIcon))
-
- self.leftIndicatorBtn.setChecked(True)
- self.rightIndicatorBtn.setChecked(True)
- self.feed_kuksa.send_values(self.IC.leftIndicator, "true")
- self.feed_kuksa.send_values(self.IC.rightIndicator, "true")
- self.feed_kuksa.send_values(self.IC.hazard, "true")
+ color = QtCore.Qt.yellow
+ value = "true"
else:
- painter = QPainter(hazardIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(hazardIcon.rect(), QtCore.Qt.black)
- painter.end()
- self.hazardBtn.setIcon(QIcon(hazardIcon))
-
- self.leftIndicatorBtn.setChecked(False)
- self.rightIndicatorBtn.setChecked(False)
- self.feed_kuksa.send_values(self.IC.leftIndicator, "false")
- self.feed_kuksa.send_values(self.IC.rightIndicator, "false")
- self.feed_kuksa.send_values(self.IC.hazard, "false")
+ color = QtCore.Qt.black
+ value = "false"
+
+ painter.fillRect(hazardIcon.rect(), color)
+ painter.end()
+ self.hazardBtn.setIcon(QIcon(hazardIcon))
+
+ self.leftIndicatorBtn.setChecked(self.hazardBtn.isChecked())
+ self.rightIndicatorBtn.setChecked(self.hazardBtn.isChecked())
+
+ try:
+ self.feed_kuksa.send_values(self.IC.leftIndicator, value)
+ self.feed_kuksa.send_values(self.IC.rightIndicator, value)
+ self.feed_kuksa.send_values(self.IC.hazard, value)
+ except Exception as e:
+ print(e)
+
def leftIndicatorBtnClicked(self):
+ """
+ Handles the left indicator button click event.
+ """
leftIndicatorIcon = QPixmap(":/Images/Images/left.png")
- if self.leftIndicatorBtn.isChecked():
+ painter = QPainter(leftIndicatorIcon)
+ painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter = QPainter(leftIndicatorIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.green)
- painter.end()
-
- self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon))
- self.feed_kuksa.send_values(self.IC.leftIndicator, "true")
+ if self.leftIndicatorBtn.isChecked():
+ color = QtCore.Qt.green
+ value = "true"
else:
+ color = QtCore.Qt.black
+ value = "false"
- painter = QPainter(leftIndicatorIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.black)
- painter.end()
+ painter.fillRect(leftIndicatorIcon.rect(), color)
+ painter.end()
+ self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon))
- self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon))
- self.feed_kuksa.send_values(self.IC.leftIndicator, "false")
+ try: self.feed_kuksa.send_values(self.IC.leftIndicator, value)
+ except Exception as e: print(e)
def rightIndicatorBtnClicked(self):
+ """
+ Handles the right indicator button click event.
+ """
rightIndicatorIcon = QPixmap(":/Images/Images/right.png")
- if self.rightIndicatorBtn.isChecked():
+ painter = QPainter(rightIndicatorIcon)
+ painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter = QPainter(rightIndicatorIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.green)
- painter.end()
- self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon))
- self.feed_kuksa.send_values(self.IC.rightIndicator, "true")
+ if self.rightIndicatorBtn.isChecked():
+ color = QtCore.Qt.green
+ value = "true"
else:
+ color = QtCore.Qt.black
+ value = "false"
+
+ painter.fillRect(rightIndicatorIcon.rect(), color)
+ painter.end()
+ self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon))
+
+ try:
+ self.feed_kuksa.send_values(self.IC.rightIndicator, value)
+ except Exception as e:
+ print(e)
- painter = QPainter(rightIndicatorIcon)
- painter.setCompositionMode(QPainter.CompositionMode_SourceIn)
- painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.black)
- painter.end()
- self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon))
- self.feed_kuksa.send_values(self.IC.rightIndicator, "false")
def accelerationBtnPressed(self):
+ """
+ Handles the acceleration button press event.
+ """
self.startTime = QtCore.QTime.currentTime()
self.acceleration_timer = QtCore.QTimer()
self.acceleration_timer.timeout.connect(
@@ -227,6 +265,14 @@ class ICWidget(Base, Form):
lambda: self.updateSpeedAndEngineRpm("Decelerate"))
self.acceleration_timer.start(100)
+ def handle_Script_toggle(self):
+ if self.Script_toggle.isChecked():
+ self.set_Vehicle_RPM(1000)
+ self.set_Vehicle_Speed(0)
+ self.vehicle_simulator.start()
+ else:
+ self.vehicle_simulator.stop()
+
def updateSpeedAndEngineRpm(self, action, acceleration=(60/5)):
if action == "Accelerate":
pass
@@ -257,33 +303,26 @@ class ICWidget(Base, Form):
self.update_RPM_monitor()
def driveBtnClicked(self):
- # // Selected Gear output = > 0 = Neutral, 1/2/.. = Forward, -1/.. = Reverse, 126 = Park, 127 = Drive
- # #859287 ; /* light green */
-
- if self.driveGroupBtns.checkedButton() == self.driveBtn:
+ gear_mapping = {
+ self.driveBtn: "127",
+ self.parkBtn: "126",
+ self.reverseBtn: "-1",
+ self.neutralBtn: "0"
+ }
+
+ checked_button = self.driveGroupBtns.checkedButton()
+
+ if checked_button in gear_mapping:
+ gear_value = gear_mapping[checked_button]
self.accelerationBtn.setEnabled(True)
- self.Speed_slider.setEnabled(True)
- self.RPM_slider.setEnabled(True)
- self.feed_kuksa.send_values(self.IC.selectedGear, "127")
-
- if self.driveGroupBtns.checkedButton() == self.parkBtn:
- self.accelerationBtn.setEnabled(False)
- self.Speed_slider.setEnabled(False)
- self.RPM_slider.setEnabled(False)
- self.feed_kuksa.send_values(self.IC.selectedGear, "126")
-
- if self.driveGroupBtns.checkedButton() == self.reverseBtn:
- self.accelerationBtn.setEnabled(True)
- self.Speed_slider.setEnabled(True)
- self.RPM_slider.setEnabled(True)
- self.feed_kuksa.send_values(self.IC.selectedGear, "-1")
-
- if self.driveGroupBtns.checkedButton() == self.neutralBtn:
- self.accelerationBtn.setEnabled(False)
- self.Speed_slider.setEnabled(False)
+ self.Speed_slider.setEnabled(checked_button != self.neutralBtn)
self.RPM_slider.setEnabled(True)
- self.feed_kuksa.send_values(self.IC.selectedGear, "0")
-
+ try:
+ self.feed_kuksa.send_values(self.IC.selectedGear, gear_value)
+ except Exception as e:
+ print(e)
+ else:
+ print("Unknown button checked!")
class AccelerationFns():
def calculate_speed(time, acceleration) -> int:
@@ -314,37 +353,105 @@ class AccelerationFns():
engine_rpm = wheel_rps * gear_ratios[current_gear - 1] * 60
return int(engine_rpm)
+
+class VehicleSimulator(QObject):
+ # Define signals for updating speed and rpm
+ speed_changed = pyqtSignal(int)
+ rpm_changed = pyqtSignal(int)
+ DEFAULT_IDLE_RPM = 1000
-class ICScript(ICWidget):
- def start_script(self):
- ICWidget.reset()
-
- # disable all widgets in the scroll area
- for widget in ICWidget.scrollAreaWidgetContents.children():
- widget.setEnabled(False)
-
- rates = [(60/5), (60/4), (60/3)]
+ def __init__(self):
+ super().__init__()
+ self.freq = 10
+ self.vehicle_speed = 0
+ self.engine_speed = self.DEFAULT_IDLE_RPM
+ self.running = False
+ self.lock = threading.Lock()
+ self.thread = threading.Thread(target=self.run, daemon=True)
+
+ def start(self):
+ if not self.running:
+ self.reset()
+ self.running = True
+ self.thread.start()
+
+ def stop(self):
+ self.running = False
+
+ def reset(self):
+ with self.lock:
+ self.vehicle_speed = 0
+ self.engine_speed = self.DEFAULT_IDLE_RPM
+
+ def run(self):
+ while self.running:
+ if not self.running:
+ break
- # start assigning values to the speed and rpm sliders and send them to the IC do this in a loop for each rate
- for rate in rates:
- ICWidget.accelerationBtnPressed()
- ICWidget.acceleration_timer.timeout.connect(
- lambda: ICWidget.updateSpeedAndEngineRpm("Accelerate"), rate)
- ICWidget.acceleration_timer.start(100)
- time.sleep(5)
- ICWidget.accelerationBtnReleased()
- ICWidget.acceleration_timer.timeout.connect(
- lambda: ICWidget.updateSpeedAndEngineRpm("Decelerate"), rate)
- ICWidget.acceleration_timer.start(100)
+ # Simulate acceleration and update speed and rpm
+ self.accelerate(60, 1800, 3)
+ self.accelerate(65, 1700, 1)
+ self.accelerate(80, 2500, 6)
+ self.accelerate(100, 3000, 4)
+ self.brake(80, 3000, 3)
+ self.accelerate(104, 4000, 6)
+ self.brake(40, 2000, 4)
+ self.accelerate(90, 3000, 5)
+ self.brake(1, 650, 5)
+
+ # Ensure reset is called when not in cruise mode
+ if not self.running:
+ self.reset()
+
time.sleep(5)
- def stop_script(self):
- ICWidget.reset()
-
- # enable all widgets in the scroll area
- for widget in ICWidget.scrollAreaWidgetContents.children():
- widget.setEnabled(True)
+ def accelerate(self, target_speed, target_rpm, duration):
+ if target_speed <= self.vehicle_speed:
+ return
+ v = (target_speed - self.vehicle_speed) / (duration * self.freq)
+ r = (target_rpm - self.engine_speed) / (duration * self.freq)
+ while self.vehicle_speed < target_speed and self.running:
+ with self.lock:
+ self.vehicle_speed += v
+ self.engine_speed += r
+ self.speed_changed.emit(int(self.vehicle_speed))
+ self.rpm_changed.emit(int(self.engine_speed))
+ time.sleep(1 / self.freq)
+
+ def brake(self, target_speed, target_rpm, duration):
+ if target_speed >= self.vehicle_speed:
+ return
+ v = (self.vehicle_speed - target_speed) / (duration * self.freq)
+ r = (self.engine_speed - target_rpm) / (duration * self.freq)
+ while self.vehicle_speed > target_speed and self.running:
+ with self.lock:
+ self.vehicle_speed -= v
+ self.engine_speed -= r
+ self.speed_changed.emit(int(self.vehicle_speed))
+ self.rpm_changed.emit(int(self.engine_speed))
+ time.sleep(1 / self.freq)
+
+ def increase(self, bycruise = True):
+ if self.CRUISEACTIVE:
+ target_speed = self.vehicle_speed + 5
+ target_rpm = self.engine_speed * 1.1
+ self.accelerate(target_speed, target_rpm, 2, bycruise)
+
+ def decrease(self, bycruise = True):
+ if self.CRUISEACTIVE:
+ target_speed = self.vehicle_speed - 5
+ target_rpm = self.engine_speed * 0.9
+ self.brake(target_speed, target_rpm, 2, bycruise)
+
+ def resume(self, bycruise = True):
+ target_speed = self.CRUISESPEED
+ target_rpm = self.CRUISERPM
+ current_speed = self.get_vehicle_speed()
+ if target_speed > current_speed:
+ self.accelerate(target_speed, target_rpm, 2, bycruise)
+ else:
+ self.brake(target_speed, target_rpm, 2, bycruise)
if __name__ == '__main__':
app = QApplication(sys.argv)
diff --git a/Widgets/SteeringCtrlPage.py b/Widgets/SteeringCtrlPage.py
index 61271c2..696c6c9 100644
--- a/Widgets/SteeringCtrlPage.py
+++ b/Widgets/SteeringCtrlPage.py
@@ -28,7 +28,9 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(current_dir))
-import extras.Kuksa_Instance as kuksa_instance
+from extras.FeedKuksa import FeedKuksa
+import extras.FeedCAN as feed_can
+from . import settings
Form, Base = uic.loadUiType(os.path.join(current_dir, "../ui/SteeringControls.ui"))
@@ -36,31 +38,62 @@ Form, Base = uic.loadUiType(os.path.join(current_dir, "../ui/SteeringControls.ui
class Steering_Paths():
def __init__(self):
- self.VolumeUp = "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp"
- self.VolumeDown = "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown"
- self.VolumeMute = "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute"
-
- self.Mode = "Vehicle.Cabin.SteeringWheel.Switches.Mode"
-
- self.NextTrack = "Vehicle.Cabin.SteeringWheel.Switches.Next"
- self.PreviousTrack = "Vehicle.Cabin.SteeringWheel.Switches.Previous"
-
- self.Info = "Vehicle.Cabin.SteeringWheel.Switches.Info"
-
- self.PhoneCall = "Vehicle.Cabin.SteeringWheel.Switches.PhoneCall"
- self.PhoneHangup = "Vehicle.Cabin.SteeringWheel.Switches.PhoneHangup"
- self.Voice = "Vehicle.Cabin.SteeringWheel.Switches.Voice"
- self.LaneDeparture = "Vehicle.Cabin.SteeringWheel.Switches.LaneDepartureWarning"
-
- self.Horn = "Vehicle.Cabin.SteeringWheel.Switches.Horn"
-
- self.CruiseEnable = "Vehicle.Cabin.SteeringWheel.Switches.CruiseEnable"
- self.CruseSet = "Vehicle.Cabin.SteeringWheel.Switches.CruiseSet"
- self.CruiseResume = "Vehicle.Cabin.SteeringWheel.Switches.CruiseResume"
- self.CruiseCancel = "Vehicle.Cabin.SteeringWheel.Switches.CruiseCancel"
-
- self.CruiseLimit = "Vehicle.Cabin.SteeringWheel.Switches.CruiseLimit"
- self.CruiseDistance = "Vehicle.Cabin.SteeringWheel.Switches.CruiseDistance"
+ self.switches = {
+ "VolumeUp": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp",
+ "CAN": "021#FFFFFFFF40000000"},
+ "VolumeDown": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown",
+ "CAN": "021#FFFFFFFF10000000"},
+ "VolumeMute": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute",
+ "CAN": "021#FFFFFFFF01000000"},
+ "Mode": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Mode",
+ "CAN": "021#FFFFFFFF20000000"},
+ "NextTrack": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Next",
+ "CAN": "021#FFFFFFFF08000000"},
+ "PreviousTrack": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Previous",
+ "CAN": "021#FFFFFFFF80000000"},
+ "Info": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Info",
+ "CAN": "021#FFFFFFFF02000000"},
+ "PhoneCall": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.PhoneCall",
+ "CAN": "021#FFFFFFFF00010000"},
+ "PhoneHangup": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.PhoneHangup",
+ "CAN": "021#FFFFFFFF00020000"},
+ "Voice": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Voice",
+ "CAN": "021#FFFFFFFF00040000"},
+ "LaneDeparture": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.LaneDepartureWarning",
+ "CAN": "021#FFFFFFFF00000001"},
+ "Horn": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Horn",
+ "CAN": "021#FFFFFFFF00000080"},
+ "CruiseEnable": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseEnable",
+ "CAN": "021#FFFFFFFF00008000"},
+ "CruiseSet": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseSet",
+ "CAN": "021#FFFFFFFF00001000"},
+ "CruiseResume": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseResume",
+ "CAN": "021#FFFFFFFF00004000"},
+ "CruiseCancel": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseCancel",
+ "CAN": "021#FFFFFFFF00000800"},
+ "CruiseLimit": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseLimit",
+ "CAN": "021#FFFFFFFF00000200"},
+ "CruiseDistance": {
+ "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseDistance",
+ "CAN": "021#FFFFFFFF00000100"}
+ }
class SteeringCtrlWidget(Base, Form):
def __init__(self, parent=None):
@@ -69,94 +102,49 @@ class SteeringCtrlWidget(Base, Form):
self.Steering = Steering_Paths()
self.feed_kuksa = FeedKuksa()
+ self.settings = settings
+
self.add_buttons()
def add_buttons(self):
# Define button groups and actions
- LeftControlsBtns = [self.VolUpBtn,
- self.VolDownBtn,
- self.ModeBtn,
- self.VolMuteBtn,
- self.NextTrackBtn,
- self.PrevTrackBtn,
- self.InfoBtn]
-
-
- PhoneBtns = [self.PhoneCallBtn, self.PhoneHangupBtn]
- ExtraContolsBtns = [self.VoiceBtn, self.LaneDepartureBtn]
-
- RightControlsBtns = [self.CruiseEnableBtn,
- self.CruiseSetBtn,
- self.CruiseResumeBtn,
- self.CruiseCancelBtn,
- self.CruiseLimitBtn,
- self.CruiseDistanceBtn]
-
- self.LeftControlsBtnsGroup = QButtonGroup()
- self.PhoneBtnsGroup = QButtonGroup()
- self.ExtraContolsBtnsGroup = QButtonGroup()
- self.RiqhtControlsBtnsGroup = QButtonGroup()
-
- for btn in LeftControlsBtns:
- self.LeftControlsBtnsGroup.addButton(btn)
-
- for btn in PhoneBtns:
- self.PhoneBtnsGroup.addButton(btn)
-
-
- for btn in RightControlsBtns:
- self.RiqhtControlsBtnsGroup.addButton(btn)
-
- self.LeftControlsBtnsGroup.buttonClicked.connect(self.left_controls_clicked)
- self.RiqhtControlsBtnsGroup.buttonClicked.connect(self.right_controls_clicked)
-
- self.HornBtn.clicked.connect(self.horn_clicked)
-
- def left_controls_clicked(self):
- print("Left controls clicked")
-
- def right_controls_clicked(self):
- print("Right controls clicked")
-
- def horn_clicked(self):
- print("Horn clicked")
-
-class FeedKuksa(QThread):
- def __init__(self, parent=None):
- QThread.__init__(self,parent)
- self.stop_flag = False
- self.set_instance()
-
- def run(self):
- print("Starting thread")
- self.set_instance()
- while not self.stop_flag:
- self.send_values()
-
- def stop(self):
- self.stop_flag = True
- print("Stopping thread")
-
- def set_instance(self):
- self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance()
- self.client = self.kuksa.get_client()
-
- def send_values(self, Path=None, Value=None, Attribute=None):
- if self.client is not None:
- if self.client.checkConnection() is True:
-
- if Attribute is not None:
- self.client.setValue(Path, Value, Attribute)
- else:
- self.client.setValue(Path, Value)
- else:
- print("Could not connect to Kuksa")
- self.set_instance()
- else:
- print("Kuksa client is None, try reconnecting")
- time.sleep(2)
- self.set_instance()
+ ControlsBtns = [self.VolumeUp,
+ self.VolumeDown,
+ self.Mode,
+ self.VolumeMute,
+ self.NextTrack,
+ self.PreviousTrack,
+ self.Info,
+ self.PhoneCall,
+ self.PhoneHangup,
+ self.Voice,
+ self.LaneDeparture,
+ self.CruiseEnable,
+ self.CruiseSet,
+ self.CruiseResume,
+ self.CruiseCancel,
+ self.CruiseLimit,
+ self.CruiseDistance]
+
+ self.ControlsBtnsGroup = QButtonGroup()
+
+ for btn in ControlsBtns:
+ self.ControlsBtnsGroup.addButton(btn)
+
+ self.ControlsBtnsGroup.buttonClicked.connect(self.controls_clicked)
+
+ def controls_clicked(self, button):
+ button_clicked = button.objectName()
+ signal_type = settings.Steering_Signal_Type
+ if signal_type == "Kuksa":
+ self.feed_kuksa.send_values(self.Steering.switches[button_clicked]["Kuksa"], 1)
+ self.feed_kuksa.send_values(self.Steering.switches[button_clicked]["Kuksa"], 0)
+ elif signal_type == "CAN":
+ feed_can.send_can_signal(self.Steering.switches[button_clicked]["CAN"])
+ # Making sure button state goes back to off
+ feed_can.send_can_signal("021#FFFFFFFF00000000")
+ print(button_clicked + " button clicked")
if __name__ == '__main__':
import sys
diff --git a/Widgets/settings.py b/Widgets/settings.py
index 11800a2..17ea7f6 100644
--- a/Widgets/settings.py
+++ b/Widgets/settings.py
@@ -12,12 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import extras.Kuksa_Instance as kuksa_instance
+
import os
import sys
import time
from PyQt5 import uic
-from PyQt5.QtWidgets import QApplication, QLineEdit, QPushButton, QLabel, QCheckBox
+from PyQt5.QtWidgets import QApplication, QLineEdit, QPushButton, QLabel
+from qtwidgets import AnimatedToggle
+from PyQt5.QtWidgets import QWidget
+from PyQt5.QtCore import QThread
+from PyQt5 import QtGui
+import logging
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -25,47 +30,115 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(current_dir))
+import extras.Kuksa_Instance as kuksa_instance
Form, Base = uic.loadUiType(os.path.join(
current_dir, "../ui/Settings_Window.ui"))
# ========================================
+Steering_Signal_Type = "Kuksa"
class settings(Base, Form):
+ """
+ A class representing the settings widget of the AGL Demo Control Panel.
+
+ Attributes:
+ - SSL_toggle: An AnimatedToggle object representing the SSL toggle button.
+ - CAN_Kuksa_toggle: An AnimatedToggle object representing the CAN/Kuksa toggle button.
+ - connectionStatus: A QLabel object representing the connection status label.
+ - connectionLogo: A QLabel object representing the connection logo label.
+ - IPAddrInput: A QLineEdit object representing the IP address input field.
+ - reconnectBtn: A QPushButton object representing the reconnect button.
+ - refreshBtn: A QPushButton object representing the refresh button.
+ - startClientBtn: A QPushButton object representing the start client button.
+ """
+
def __init__(self, parent=None):
+ """
+ Initializes the settings widget of the AGL Demo Control Panel.
+ """
super(self.__class__, self).__init__(parent)
self.setupUi(self)
- self.SSLToggle = self.findChild(QCheckBox, "SSLToggle")
+ self.SSL_toggle = AnimatedToggle(
+ checked_color="#4BD7D6",
+ pulse_checked_color="#00ffff",
+ )
+
+ self.Protocol_toggle = AnimatedToggle(
+ checked_color="#4BD7D6",
+ pulse_checked_color="#00ffff"
+ )
+
+ self.CAN_Kuksa_toggle = AnimatedToggle(
+ checked_color="#4BD7D6",
+ pulse_checked_color="#00ffff"
+ )
self.connectionStatus = self.findChild(QLabel, "connectionStatus")
self.connectionLogo = self.findChild(QLabel, "connectionLogo")
self.IPAddrInput = self.findChild(QLineEdit, "IPAddrInput")
- self.tokenPathInput = self.findChild(QLineEdit, "tokenPathInput")
self.reconnectBtn = self.findChild(QPushButton, "reconnectBtn")
- self.refreshBtn = self.findChild(QPushButton, "refreshBtn")
self.startClientBtn = self.findChild(QPushButton, "startClientBtn")
self.startClientBtn.clicked.connect(self.set_instance)
self.reconnectBtn.clicked.connect(self.reconnectClient)
- self.refreshBtn.clicked.connect(self.refreshStatus)
+ self.SSL_toggle.clicked.connect(self.toggleSSL)
+ self.CAN_Kuksa_toggle.clicked.connect(self.toggle_CAN_Kuksa)
+
+ Frame_GS = self.findChild(QWidget, "frame_general_settings")
+ Frame_PS = self.findChild(QWidget, "frame_page_settings")
+ GS_layout = Frame_GS.layout()
+ PS_layout = Frame_PS.layout()
+
+ GS_layout.replaceWidget(self.place_holder_toggle_1, self.SSL_toggle)
+ GS_layout.replaceWidget(self.place_holder_toggle_2, self.Protocol_toggle)
+ PS_layout.replaceWidget(self.place_holder_toggle_3, self.CAN_Kuksa_toggle)
+
+ self.place_holder_toggle_1.deleteLater()
+ self.place_holder_toggle_2.deleteLater()
+ self.place_holder_toggle_3.deleteLater()
self.refreshStatus()
- self.show()
+
+ def toggleSSL(self):
+ """
+ Toggles the SSL connection.
+ """
+ self.kuksa_config["insecure"] = not self.SSL_toggle.isChecked()
+ print(self.kuksa_config)
+
+ def toggle_CAN_Kuksa(self):
+ """
+ Toggles the CAN/Kuksa connection.
+ """
+ global Steering_Signal_Type
+ if (self.CAN_Kuksa_toggle.isChecked()):
+ Steering_Signal_Type = "CAN"
+ else:
+ Steering_Signal_Type = "Kuksa"
+
+ def get_protocol(self):
+ if (not self.Protocol_toggle.isChecked()):
+ return "ws"
+ else:
+ return "grpc"
def set_instance(self):
- self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance()
+ """
+ Sets the instance of the Kuksa client.
+ """
+ self.kuksa = kuksa_instance.KuksaClientSingleton.instance()
self.client = self.kuksa.get_client()
- self.config = self.kuksa.get_config()
- self.token = self.kuksa.get_token()
+ self.kuksa_config = self.kuksa.get_config()
- self.IPAddrInput.setText(self.config["ip"])
- self.SSLToggle.setChecked(self.config["insecure"])
- self.tokenPathInput.setText(self.token)
+ self.IPAddrInput.setText(self.kuksa_config["ip"])
+ self.SSL_toggle.setChecked(not self.kuksa_config["insecure"])
+ self.Protocol_toggle.setChecked(self.kuksa_config["protocol"] == 'grpc')
time.sleep(2)
@@ -76,16 +149,21 @@ class settings(Base, Form):
self.refreshStatus()
def refreshStatus(self):
+ """
+ Refreshes the connection status.
+ """
try:
if (self.client is None):
self.connectionStatus.setText('Not Connected')
self.connectionLogo.setStyleSheet("background-color: red")
+ self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal--off.svg"))
return None
if (self.client.checkConnection() == True):
self.connectionStatus.setText('Connected')
- self.connectionLogo.setPixmap(":/icons/feather/check-circle.svg")
self.connectionLogo.setStyleSheet("background-color: green")
+ # change cnnection logo pixmap to connected.svf from resources
+ self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal.svg"))
self.client.start()
return True
@@ -93,20 +171,41 @@ class settings(Base, Form):
self.client.stop()
self.connectionStatus.setText('Disconnected')
self.connectionLogo.setStyleSheet("background-color: yellow")
+ self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal--off.svg"))
return False
except:
pass
def reconnectClient(self):
+ """
+ Reconnects the client.
+ """
try:
- self.config["ip"] = self.IPAddrInput.text()
- self.config["insecure"] = self.SSLToggle.isChecked()
- self.token = self.tokenPathInput.text()
- self.client = self.kuksa.reconnect_client(self.config, self.token)
+ self.kuksa_config["ip"] = self.IPAddrInput.text()
+ self.kuksa_config["insecure"] = not self.SSL_toggle.isChecked()
+ self.kuksa_config["protocol"] = self.get_protocol()
+ if self.kuksa_config["protocol"] == 'ws':
+ self.kuksa_config["port"] = "8090"
+ if self.kuksa_config["protocol"] == 'grpc':
+ self.kuksa_config["port"] = "55555"
+ self.client = self.kuksa.reconnect(self.kuksa_config)
self.client.start()
self.refreshStatus()
+
+ self.refreshThread = RefreshThread(self)
+ self.refreshThread.start()
+
except Exception as e:
- print(e)
+ logging.error(e)
+
+class RefreshThread(QThread):
+ def __init__(self, settings):
+ QThread.__init__(self)
+ self.settings = settings
+
+ def run(self):
+ time.sleep(2)
+ self.settings.refreshStatus()
if __name__ == '__main__':
import sys