diff options
author | suchinton2001 <suchinton.2001@gmail.com> | 2023-10-09 16:00:58 +0530 |
---|---|---|
committer | suchinton2001 <suchinton.2001@gmail.com> | 2023-10-09 16:00:58 +0530 |
commit | db862e32df7f31e6453d7f05a6f011091b96ffab (patch) | |
tree | 6abcc3a2eceb79a43b7836d1d23a8594da375826 /Widgets | |
parent | 260bbf89836ff9ce98054e5166699a163a0d7c11 (diff) |
agl-demo-control-panel: Add grpc support for databroker
- Add grpc support for databroker (set default protocol)
- Add virtual car for script mode in IC app
- Refine UI elements
- Use specific grpc/ws jwt tokens
- Simplify settings menu
Bug-AGL: SPEC-4905
Signed-off-by: suchinton2001 <suchinton.2001@gmail.com>
Change-Id: I59c4b1de80e280fe22993b2d2f7c92d6b41a89c7
Diffstat (limited to 'Widgets')
-rw-r--r-- | Widgets/ICPage.py | 373 | ||||
-rw-r--r-- | Widgets/SteeringCtrlPage.py | 206 | ||||
-rw-r--r-- | Widgets/settings.py | 137 |
3 files changed, 455 insertions, 261 deletions
diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py index 64fa2d3..2d11cdf 100644 --- a/Widgets/ICPage.py +++ b/Widgets/ICPage.py @@ -18,10 +18,12 @@ import os import sys from PyQt5 import uic, QtCore, QtWidgets from PyQt5.QtWidgets import QApplication -from PyQt5.QtWidgets import QSlider, QLCDNumber, QPushButton from PyQt5.QtGui import QIcon, QPixmap, QPainter +from PyQt5.QtCore import QObject, pyqtSignal import time -from PyQt5.QtCore import QThread +from PyQt5.QtWidgets import QWidget +from qtwidgets import AnimatedToggle +import threading current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -49,35 +51,40 @@ class IC_Paths(): class ICWidget(Base, Form): + """ + This class represents the ICWidget which is a widget for the AGL Demo Control Panel. + It inherits from the Base and Form classes. + """ + def __init__(self, parent=None): + """ + Initializes the ICWidget object. + + Args: + - parent: The parent widget. Defaults to None. + """ super(self.__class__, self).__init__(parent) self.setupUi(self) self.IC = IC_Paths() + #self.vehicle_simulator = VehicleSimulator(self) self.feed_kuksa = FeedKuksa() self.feed_kuksa.start() + self.vehicle_simulator = VehicleSimulator() - # # load the stylesheet - # theme = open(os.path.join(current_dir, "../ui/styles/Tron/ICPage.qss"), 'r') - # self.setStyleSheet(theme.read()) - # theme.close() - - self.scriptBtn = self.findChild(QPushButton, "scriptBtn") + header_frame = self.findChild(QWidget, "header_frame") + layout = header_frame.layout() - self.Speed_slider = self.findChild(QSlider, "Speed_slider") - self.Speed_monitor = self.findChild(QLCDNumber, "Speed_monitor") - self.RPM_slider = self.findChild(QSlider, "RPM_slider") - self.RPM_monitor = self.findChild(QLCDNumber, "RPM_monitor") + self.IC_Frame = self.findChild(QWidget, "frame_1") - self.coolantTemp_slider = self.findChild(QSlider, "coolantTemp_slider") - self.fuelLevel_slider = self.findChild(QSlider, "fuelLevel_slider") + self.Script_toggle = AnimatedToggle( + checked_color="#4BD7D6", + pulse_checked_color="#00ffff" + ) - self.accelerationBtn = self.findChild(QPushButton, "accelerationBtn") - - self.leftIndicatorBtn = self.findChild(QPushButton, "leftIndicatorBtn") - self.rightIndicatorBtn = self.findChild(QPushButton, "rightIndicatorBtn") - self.hazardBtn = self.findChild(QPushButton, "hazardBtn") + layout.replaceWidget(self.demoToggle, self.Script_toggle) + self.demoToggle.deleteLater() buttons = [self.parkBtn, self.reverseBtn, @@ -85,6 +92,10 @@ class ICWidget(Base, Form): self.driveBtn] # group for the buttons for mutual exclusion + + self.vehicle_simulator.speed_changed.connect(self.set_Vehicle_Speed) + self.vehicle_simulator.rpm_changed.connect(self.set_Vehicle_RPM) + self.driveGroupBtns = QtWidgets.QButtonGroup(self) self.driveGroupBtns.setExclusive(True) @@ -93,8 +104,6 @@ class ICWidget(Base, Form): self.driveGroupBtns.buttonClicked.connect(self.driveBtnClicked) - self.scriptBtn.clicked.connect(self.scriptBtnClicked) - self.Speed_slider.valueChanged.connect(self.update_Speed_monitor) self.Speed_slider.setMinimum(0) self.Speed_slider.setMaximum(240) @@ -112,107 +121,136 @@ class ICWidget(Base, Form): self.accelerationBtn.released.connect(self.accelerationBtnReleased) # make both buttons checkable + self.Script_toggle.clicked.connect(self.handle_Script_toggle) self.leftIndicatorBtn.setCheckable(True) self.rightIndicatorBtn.setCheckable(True) self.hazardBtn.setCheckable(True) - self.leftIndicatorBtn.clicked.connect(self.leftIndicatorBtnClicked) - self.rightIndicatorBtn.clicked.connect(self.rightIndicatorBtnClicked) - self.hazardBtn.clicked.connect(self.hazardBtnClicked) + self.leftIndicatorBtn.toggled.connect(self.leftIndicatorBtnClicked) + self.rightIndicatorBtn.toggled.connect(self.rightIndicatorBtnClicked) + self.hazardBtn.toggled.connect(self.hazardBtnClicked) - def scriptBtnClicked(self): - if self.scriptBtn.isChecked(): - ICScript.start_script() + def set_Vehicle_Speed(self, speed): + self.Speed_slider.setValue(speed) - if not self.ScriptBtn.isChecked(): - ICScript.stop_script() + def set_Vehicle_RPM(self, rpm): + self.RPM_slider.setValue(rpm) def update_Speed_monitor(self): + """ + Updates the speed monitor with the current speed value. + """ speed = int(self.Speed_slider.value()) self.Speed_monitor.display(speed) - self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value') + try: self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value') + except Exception as e: print(e) def update_RPM_monitor(self): + """ + Updates the RPM monitor with the current RPM value. + """ rpm = int(self.RPM_slider.value()) self.RPM_monitor.display(rpm) - self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value') + try: self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value') + except Exception as e: print(e) def update_coolantTemp_monitor(self): + """ + Updates the coolant temperature monitor with the current coolant temperature value. + """ coolantTemp = int(self.coolantTemp_slider.value()) - self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value') + try: self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value') + except Exception as e: print(e) def update_fuelLevel_monitor(self): + """ + Updates the fuel level monitor with the current fuel level value. + """ fuelLevel = int(self.fuelLevel_slider.value()) - self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel)) + try: self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel)) + except Exception as e: print(e) def hazardBtnClicked(self): + """ + Handles the hazard button click event. + """ hazardIcon = QPixmap(":/Images/Images/hazard.png") + painter = QPainter(hazardIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) + if self.hazardBtn.isChecked(): - painter = QPainter(hazardIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(hazardIcon.rect(), QtCore.Qt.yellow) - painter.end() - self.hazardBtn.setIcon(QIcon(hazardIcon)) - - self.leftIndicatorBtn.setChecked(True) - self.rightIndicatorBtn.setChecked(True) - self.feed_kuksa.send_values(self.IC.leftIndicator, "true") - self.feed_kuksa.send_values(self.IC.rightIndicator, "true") - self.feed_kuksa.send_values(self.IC.hazard, "true") + color = QtCore.Qt.yellow + value = "true" else: - painter = QPainter(hazardIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(hazardIcon.rect(), QtCore.Qt.black) - painter.end() - self.hazardBtn.setIcon(QIcon(hazardIcon)) - - self.leftIndicatorBtn.setChecked(False) - self.rightIndicatorBtn.setChecked(False) - self.feed_kuksa.send_values(self.IC.leftIndicator, "false") - self.feed_kuksa.send_values(self.IC.rightIndicator, "false") - self.feed_kuksa.send_values(self.IC.hazard, "false") + color = QtCore.Qt.black + value = "false" + + painter.fillRect(hazardIcon.rect(), color) + painter.end() + self.hazardBtn.setIcon(QIcon(hazardIcon)) + + self.leftIndicatorBtn.setChecked(self.hazardBtn.isChecked()) + self.rightIndicatorBtn.setChecked(self.hazardBtn.isChecked()) + + try: + self.feed_kuksa.send_values(self.IC.leftIndicator, value) + self.feed_kuksa.send_values(self.IC.rightIndicator, value) + self.feed_kuksa.send_values(self.IC.hazard, value) + except Exception as e: + print(e) + def leftIndicatorBtnClicked(self): + """ + Handles the left indicator button click event. + """ leftIndicatorIcon = QPixmap(":/Images/Images/left.png") - if self.leftIndicatorBtn.isChecked(): + painter = QPainter(leftIndicatorIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter = QPainter(leftIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.green) - painter.end() - - self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.leftIndicator, "true") + if self.leftIndicatorBtn.isChecked(): + color = QtCore.Qt.green + value = "true" else: + color = QtCore.Qt.black + value = "false" - painter = QPainter(leftIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.black) - painter.end() + painter.fillRect(leftIndicatorIcon.rect(), color) + painter.end() + self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.leftIndicator, "false") + try: self.feed_kuksa.send_values(self.IC.leftIndicator, value) + except Exception as e: print(e) def rightIndicatorBtnClicked(self): + """ + Handles the right indicator button click event. + """ rightIndicatorIcon = QPixmap(":/Images/Images/right.png") - if self.rightIndicatorBtn.isChecked(): + painter = QPainter(rightIndicatorIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter = QPainter(rightIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.green) - painter.end() - self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.rightIndicator, "true") + if self.rightIndicatorBtn.isChecked(): + color = QtCore.Qt.green + value = "true" else: + color = QtCore.Qt.black + value = "false" + + painter.fillRect(rightIndicatorIcon.rect(), color) + painter.end() + self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) + + try: + self.feed_kuksa.send_values(self.IC.rightIndicator, value) + except Exception as e: + print(e) - painter = QPainter(rightIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.black) - painter.end() - self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.rightIndicator, "false") def accelerationBtnPressed(self): + """ + Handles the acceleration button press event. + """ self.startTime = QtCore.QTime.currentTime() self.acceleration_timer = QtCore.QTimer() self.acceleration_timer.timeout.connect( @@ -227,6 +265,14 @@ class ICWidget(Base, Form): lambda: self.updateSpeedAndEngineRpm("Decelerate")) self.acceleration_timer.start(100) + def handle_Script_toggle(self): + if self.Script_toggle.isChecked(): + self.set_Vehicle_RPM(1000) + self.set_Vehicle_Speed(0) + self.vehicle_simulator.start() + else: + self.vehicle_simulator.stop() + def updateSpeedAndEngineRpm(self, action, acceleration=(60/5)): if action == "Accelerate": pass @@ -257,33 +303,26 @@ class ICWidget(Base, Form): self.update_RPM_monitor() def driveBtnClicked(self): - # // Selected Gear output = > 0 = Neutral, 1/2/.. = Forward, -1/.. = Reverse, 126 = Park, 127 = Drive - # #859287 ; /* light green */ - - if self.driveGroupBtns.checkedButton() == self.driveBtn: + gear_mapping = { + self.driveBtn: "127", + self.parkBtn: "126", + self.reverseBtn: "-1", + self.neutralBtn: "0" + } + + checked_button = self.driveGroupBtns.checkedButton() + + if checked_button in gear_mapping: + gear_value = gear_mapping[checked_button] self.accelerationBtn.setEnabled(True) - self.Speed_slider.setEnabled(True) - self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "127") - - if self.driveGroupBtns.checkedButton() == self.parkBtn: - self.accelerationBtn.setEnabled(False) - self.Speed_slider.setEnabled(False) - self.RPM_slider.setEnabled(False) - self.feed_kuksa.send_values(self.IC.selectedGear, "126") - - if self.driveGroupBtns.checkedButton() == self.reverseBtn: - self.accelerationBtn.setEnabled(True) - self.Speed_slider.setEnabled(True) - self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "-1") - - if self.driveGroupBtns.checkedButton() == self.neutralBtn: - self.accelerationBtn.setEnabled(False) - self.Speed_slider.setEnabled(False) + self.Speed_slider.setEnabled(checked_button != self.neutralBtn) self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "0") - + try: + self.feed_kuksa.send_values(self.IC.selectedGear, gear_value) + except Exception as e: + print(e) + else: + print("Unknown button checked!") class AccelerationFns(): def calculate_speed(time, acceleration) -> int: @@ -314,37 +353,105 @@ class AccelerationFns(): engine_rpm = wheel_rps * gear_ratios[current_gear - 1] * 60 return int(engine_rpm) + +class VehicleSimulator(QObject): + # Define signals for updating speed and rpm + speed_changed = pyqtSignal(int) + rpm_changed = pyqtSignal(int) + DEFAULT_IDLE_RPM = 1000 -class ICScript(ICWidget): - def start_script(self): - ICWidget.reset() - - # disable all widgets in the scroll area - for widget in ICWidget.scrollAreaWidgetContents.children(): - widget.setEnabled(False) - - rates = [(60/5), (60/4), (60/3)] + def __init__(self): + super().__init__() + self.freq = 10 + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + self.running = False + self.lock = threading.Lock() + self.thread = threading.Thread(target=self.run, daemon=True) + + def start(self): + if not self.running: + self.reset() + self.running = True + self.thread.start() + + def stop(self): + self.running = False + + def reset(self): + with self.lock: + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + + def run(self): + while self.running: + if not self.running: + break - # start assigning values to the speed and rpm sliders and send them to the IC do this in a loop for each rate - for rate in rates: - ICWidget.accelerationBtnPressed() - ICWidget.acceleration_timer.timeout.connect( - lambda: ICWidget.updateSpeedAndEngineRpm("Accelerate"), rate) - ICWidget.acceleration_timer.start(100) - time.sleep(5) - ICWidget.accelerationBtnReleased() - ICWidget.acceleration_timer.timeout.connect( - lambda: ICWidget.updateSpeedAndEngineRpm("Decelerate"), rate) - ICWidget.acceleration_timer.start(100) + # Simulate acceleration and update speed and rpm + self.accelerate(60, 1800, 3) + self.accelerate(65, 1700, 1) + self.accelerate(80, 2500, 6) + self.accelerate(100, 3000, 4) + self.brake(80, 3000, 3) + self.accelerate(104, 4000, 6) + self.brake(40, 2000, 4) + self.accelerate(90, 3000, 5) + self.brake(1, 650, 5) + + # Ensure reset is called when not in cruise mode + if not self.running: + self.reset() + time.sleep(5) - def stop_script(self): - ICWidget.reset() - - # enable all widgets in the scroll area - for widget in ICWidget.scrollAreaWidgetContents.children(): - widget.setEnabled(True) + def accelerate(self, target_speed, target_rpm, duration): + if target_speed <= self.vehicle_speed: + return + v = (target_speed - self.vehicle_speed) / (duration * self.freq) + r = (target_rpm - self.engine_speed) / (duration * self.freq) + while self.vehicle_speed < target_speed and self.running: + with self.lock: + self.vehicle_speed += v + self.engine_speed += r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + time.sleep(1 / self.freq) + + def brake(self, target_speed, target_rpm, duration): + if target_speed >= self.vehicle_speed: + return + v = (self.vehicle_speed - target_speed) / (duration * self.freq) + r = (self.engine_speed - target_rpm) / (duration * self.freq) + while self.vehicle_speed > target_speed and self.running: + with self.lock: + self.vehicle_speed -= v + self.engine_speed -= r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + time.sleep(1 / self.freq) + + def increase(self, bycruise = True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed + 5 + target_rpm = self.engine_speed * 1.1 + self.accelerate(target_speed, target_rpm, 2, bycruise) + + def decrease(self, bycruise = True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed - 5 + target_rpm = self.engine_speed * 0.9 + self.brake(target_speed, target_rpm, 2, bycruise) + + def resume(self, bycruise = True): + target_speed = self.CRUISESPEED + target_rpm = self.CRUISERPM + current_speed = self.get_vehicle_speed() + if target_speed > current_speed: + self.accelerate(target_speed, target_rpm, 2, bycruise) + else: + self.brake(target_speed, target_rpm, 2, bycruise) if __name__ == '__main__': app = QApplication(sys.argv) diff --git a/Widgets/SteeringCtrlPage.py b/Widgets/SteeringCtrlPage.py index 61271c2..696c6c9 100644 --- a/Widgets/SteeringCtrlPage.py +++ b/Widgets/SteeringCtrlPage.py @@ -28,7 +28,9 @@ current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.dirname(current_dir)) -import extras.Kuksa_Instance as kuksa_instance +from extras.FeedKuksa import FeedKuksa +import extras.FeedCAN as feed_can +from . import settings Form, Base = uic.loadUiType(os.path.join(current_dir, "../ui/SteeringControls.ui")) @@ -36,31 +38,62 @@ Form, Base = uic.loadUiType(os.path.join(current_dir, "../ui/SteeringControls.ui class Steering_Paths(): def __init__(self): - self.VolumeUp = "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp" - self.VolumeDown = "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown" - self.VolumeMute = "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute" - - self.Mode = "Vehicle.Cabin.SteeringWheel.Switches.Mode" - - self.NextTrack = "Vehicle.Cabin.SteeringWheel.Switches.Next" - self.PreviousTrack = "Vehicle.Cabin.SteeringWheel.Switches.Previous" - - self.Info = "Vehicle.Cabin.SteeringWheel.Switches.Info" - - self.PhoneCall = "Vehicle.Cabin.SteeringWheel.Switches.PhoneCall" - self.PhoneHangup = "Vehicle.Cabin.SteeringWheel.Switches.PhoneHangup" - self.Voice = "Vehicle.Cabin.SteeringWheel.Switches.Voice" - self.LaneDeparture = "Vehicle.Cabin.SteeringWheel.Switches.LaneDepartureWarning" - - self.Horn = "Vehicle.Cabin.SteeringWheel.Switches.Horn" - - self.CruiseEnable = "Vehicle.Cabin.SteeringWheel.Switches.CruiseEnable" - self.CruseSet = "Vehicle.Cabin.SteeringWheel.Switches.CruiseSet" - self.CruiseResume = "Vehicle.Cabin.SteeringWheel.Switches.CruiseResume" - self.CruiseCancel = "Vehicle.Cabin.SteeringWheel.Switches.CruiseCancel" - - self.CruiseLimit = "Vehicle.Cabin.SteeringWheel.Switches.CruiseLimit" - self.CruiseDistance = "Vehicle.Cabin.SteeringWheel.Switches.CruiseDistance" + self.switches = { + "VolumeUp": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeUp", + "CAN": "021#FFFFFFFF40000000"}, + "VolumeDown": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeDown", + "CAN": "021#FFFFFFFF10000000"}, + "VolumeMute": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.VolumeMute", + "CAN": "021#FFFFFFFF01000000"}, + "Mode": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Mode", + "CAN": "021#FFFFFFFF20000000"}, + "NextTrack": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Next", + "CAN": "021#FFFFFFFF08000000"}, + "PreviousTrack": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Previous", + "CAN": "021#FFFFFFFF80000000"}, + "Info": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Info", + "CAN": "021#FFFFFFFF02000000"}, + "PhoneCall": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.PhoneCall", + "CAN": "021#FFFFFFFF00010000"}, + "PhoneHangup": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.PhoneHangup", + "CAN": "021#FFFFFFFF00020000"}, + "Voice": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Voice", + "CAN": "021#FFFFFFFF00040000"}, + "LaneDeparture": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.LaneDepartureWarning", + "CAN": "021#FFFFFFFF00000001"}, + "Horn": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.Horn", + "CAN": "021#FFFFFFFF00000080"}, + "CruiseEnable": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseEnable", + "CAN": "021#FFFFFFFF00008000"}, + "CruiseSet": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseSet", + "CAN": "021#FFFFFFFF00001000"}, + "CruiseResume": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseResume", + "CAN": "021#FFFFFFFF00004000"}, + "CruiseCancel": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseCancel", + "CAN": "021#FFFFFFFF00000800"}, + "CruiseLimit": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseLimit", + "CAN": "021#FFFFFFFF00000200"}, + "CruiseDistance": { + "Kuksa": "Vehicle.Cabin.SteeringWheel.Switches.CruiseDistance", + "CAN": "021#FFFFFFFF00000100"} + } class SteeringCtrlWidget(Base, Form): def __init__(self, parent=None): @@ -69,94 +102,49 @@ class SteeringCtrlWidget(Base, Form): self.Steering = Steering_Paths() self.feed_kuksa = FeedKuksa() + self.settings = settings + self.add_buttons() def add_buttons(self): # Define button groups and actions - LeftControlsBtns = [self.VolUpBtn, - self.VolDownBtn, - self.ModeBtn, - self.VolMuteBtn, - self.NextTrackBtn, - self.PrevTrackBtn, - self.InfoBtn] - - - PhoneBtns = [self.PhoneCallBtn, self.PhoneHangupBtn] - ExtraContolsBtns = [self.VoiceBtn, self.LaneDepartureBtn] - - RightControlsBtns = [self.CruiseEnableBtn, - self.CruiseSetBtn, - self.CruiseResumeBtn, - self.CruiseCancelBtn, - self.CruiseLimitBtn, - self.CruiseDistanceBtn] - - self.LeftControlsBtnsGroup = QButtonGroup() - self.PhoneBtnsGroup = QButtonGroup() - self.ExtraContolsBtnsGroup = QButtonGroup() - self.RiqhtControlsBtnsGroup = QButtonGroup() - - for btn in LeftControlsBtns: - self.LeftControlsBtnsGroup.addButton(btn) - - for btn in PhoneBtns: - self.PhoneBtnsGroup.addButton(btn) - - - for btn in RightControlsBtns: - self.RiqhtControlsBtnsGroup.addButton(btn) - - self.LeftControlsBtnsGroup.buttonClicked.connect(self.left_controls_clicked) - self.RiqhtControlsBtnsGroup.buttonClicked.connect(self.right_controls_clicked) - - self.HornBtn.clicked.connect(self.horn_clicked) - - def left_controls_clicked(self): - print("Left controls clicked") - - def right_controls_clicked(self): - print("Right controls clicked") - - def horn_clicked(self): - print("Horn clicked") - -class FeedKuksa(QThread): - def __init__(self, parent=None): - QThread.__init__(self,parent) - self.stop_flag = False - self.set_instance() - - def run(self): - print("Starting thread") - self.set_instance() - while not self.stop_flag: - self.send_values() - - def stop(self): - self.stop_flag = True - print("Stopping thread") - - def set_instance(self): - self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance() - self.client = self.kuksa.get_client() - - def send_values(self, Path=None, Value=None, Attribute=None): - if self.client is not None: - if self.client.checkConnection() is True: - - if Attribute is not None: - self.client.setValue(Path, Value, Attribute) - else: - self.client.setValue(Path, Value) - else: - print("Could not connect to Kuksa") - self.set_instance() - else: - print("Kuksa client is None, try reconnecting") - time.sleep(2) - self.set_instance() + ControlsBtns = [self.VolumeUp, + self.VolumeDown, + self.Mode, + self.VolumeMute, + self.NextTrack, + self.PreviousTrack, + self.Info, + self.PhoneCall, + self.PhoneHangup, + self.Voice, + self.LaneDeparture, + self.CruiseEnable, + self.CruiseSet, + self.CruiseResume, + self.CruiseCancel, + self.CruiseLimit, + self.CruiseDistance] + + self.ControlsBtnsGroup = QButtonGroup() + + for btn in ControlsBtns: + self.ControlsBtnsGroup.addButton(btn) + + self.ControlsBtnsGroup.buttonClicked.connect(self.controls_clicked) + + def controls_clicked(self, button): + button_clicked = button.objectName() + signal_type = settings.Steering_Signal_Type + if signal_type == "Kuksa": + self.feed_kuksa.send_values(self.Steering.switches[button_clicked]["Kuksa"], 1) + self.feed_kuksa.send_values(self.Steering.switches[button_clicked]["Kuksa"], 0) + elif signal_type == "CAN": + feed_can.send_can_signal(self.Steering.switches[button_clicked]["CAN"]) + # Making sure button state goes back to off + feed_can.send_can_signal("021#FFFFFFFF00000000") + print(button_clicked + " button clicked") if __name__ == '__main__': import sys diff --git a/Widgets/settings.py b/Widgets/settings.py index 11800a2..17ea7f6 100644 --- a/Widgets/settings.py +++ b/Widgets/settings.py @@ -12,12 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import extras.Kuksa_Instance as kuksa_instance + import os import sys import time from PyQt5 import uic -from PyQt5.QtWidgets import QApplication, QLineEdit, QPushButton, QLabel, QCheckBox +from PyQt5.QtWidgets import QApplication, QLineEdit, QPushButton, QLabel +from qtwidgets import AnimatedToggle +from PyQt5.QtWidgets import QWidget +from PyQt5.QtCore import QThread +from PyQt5 import QtGui +import logging current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -25,47 +30,115 @@ current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.dirname(current_dir)) +import extras.Kuksa_Instance as kuksa_instance Form, Base = uic.loadUiType(os.path.join( current_dir, "../ui/Settings_Window.ui")) # ======================================== +Steering_Signal_Type = "Kuksa" class settings(Base, Form): + """ + A class representing the settings widget of the AGL Demo Control Panel. + + Attributes: + - SSL_toggle: An AnimatedToggle object representing the SSL toggle button. + - CAN_Kuksa_toggle: An AnimatedToggle object representing the CAN/Kuksa toggle button. + - connectionStatus: A QLabel object representing the connection status label. + - connectionLogo: A QLabel object representing the connection logo label. + - IPAddrInput: A QLineEdit object representing the IP address input field. + - reconnectBtn: A QPushButton object representing the reconnect button. + - refreshBtn: A QPushButton object representing the refresh button. + - startClientBtn: A QPushButton object representing the start client button. + """ + def __init__(self, parent=None): + """ + Initializes the settings widget of the AGL Demo Control Panel. + """ super(self.__class__, self).__init__(parent) self.setupUi(self) - self.SSLToggle = self.findChild(QCheckBox, "SSLToggle") + self.SSL_toggle = AnimatedToggle( + checked_color="#4BD7D6", + pulse_checked_color="#00ffff", + ) + + self.Protocol_toggle = AnimatedToggle( + checked_color="#4BD7D6", + pulse_checked_color="#00ffff" + ) + + self.CAN_Kuksa_toggle = AnimatedToggle( + checked_color="#4BD7D6", + pulse_checked_color="#00ffff" + ) self.connectionStatus = self.findChild(QLabel, "connectionStatus") self.connectionLogo = self.findChild(QLabel, "connectionLogo") self.IPAddrInput = self.findChild(QLineEdit, "IPAddrInput") - self.tokenPathInput = self.findChild(QLineEdit, "tokenPathInput") self.reconnectBtn = self.findChild(QPushButton, "reconnectBtn") - self.refreshBtn = self.findChild(QPushButton, "refreshBtn") self.startClientBtn = self.findChild(QPushButton, "startClientBtn") self.startClientBtn.clicked.connect(self.set_instance) self.reconnectBtn.clicked.connect(self.reconnectClient) - self.refreshBtn.clicked.connect(self.refreshStatus) + self.SSL_toggle.clicked.connect(self.toggleSSL) + self.CAN_Kuksa_toggle.clicked.connect(self.toggle_CAN_Kuksa) + + Frame_GS = self.findChild(QWidget, "frame_general_settings") + Frame_PS = self.findChild(QWidget, "frame_page_settings") + GS_layout = Frame_GS.layout() + PS_layout = Frame_PS.layout() + + GS_layout.replaceWidget(self.place_holder_toggle_1, self.SSL_toggle) + GS_layout.replaceWidget(self.place_holder_toggle_2, self.Protocol_toggle) + PS_layout.replaceWidget(self.place_holder_toggle_3, self.CAN_Kuksa_toggle) + + self.place_holder_toggle_1.deleteLater() + self.place_holder_toggle_2.deleteLater() + self.place_holder_toggle_3.deleteLater() self.refreshStatus() - self.show() + + def toggleSSL(self): + """ + Toggles the SSL connection. + """ + self.kuksa_config["insecure"] = not self.SSL_toggle.isChecked() + print(self.kuksa_config) + + def toggle_CAN_Kuksa(self): + """ + Toggles the CAN/Kuksa connection. + """ + global Steering_Signal_Type + if (self.CAN_Kuksa_toggle.isChecked()): + Steering_Signal_Type = "CAN" + else: + Steering_Signal_Type = "Kuksa" + + def get_protocol(self): + if (not self.Protocol_toggle.isChecked()): + return "ws" + else: + return "grpc" def set_instance(self): - self.kuksa = kuksa_instance.KuksaClientSingleton.get_instance() + """ + Sets the instance of the Kuksa client. + """ + self.kuksa = kuksa_instance.KuksaClientSingleton.instance() self.client = self.kuksa.get_client() - self.config = self.kuksa.get_config() - self.token = self.kuksa.get_token() + self.kuksa_config = self.kuksa.get_config() - self.IPAddrInput.setText(self.config["ip"]) - self.SSLToggle.setChecked(self.config["insecure"]) - self.tokenPathInput.setText(self.token) + self.IPAddrInput.setText(self.kuksa_config["ip"]) + self.SSL_toggle.setChecked(not self.kuksa_config["insecure"]) + self.Protocol_toggle.setChecked(self.kuksa_config["protocol"] == 'grpc') time.sleep(2) @@ -76,16 +149,21 @@ class settings(Base, Form): self.refreshStatus() def refreshStatus(self): + """ + Refreshes the connection status. + """ try: if (self.client is None): self.connectionStatus.setText('Not Connected') self.connectionLogo.setStyleSheet("background-color: red") + self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal--off.svg")) return None if (self.client.checkConnection() == True): self.connectionStatus.setText('Connected') - self.connectionLogo.setPixmap(":/icons/feather/check-circle.svg") self.connectionLogo.setStyleSheet("background-color: green") + # change cnnection logo pixmap to connected.svf from resources + self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal.svg")) self.client.start() return True @@ -93,20 +171,41 @@ class settings(Base, Form): self.client.stop() self.connectionStatus.setText('Disconnected') self.connectionLogo.setStyleSheet("background-color: yellow") + self.connectionLogo.setPixmap(QtGui.QPixmap(":/Carbon_Icons/carbon_icons/connection-signal--off.svg")) return False except: pass def reconnectClient(self): + """ + Reconnects the client. + """ try: - self.config["ip"] = self.IPAddrInput.text() - self.config["insecure"] = self.SSLToggle.isChecked() - self.token = self.tokenPathInput.text() - self.client = self.kuksa.reconnect_client(self.config, self.token) + self.kuksa_config["ip"] = self.IPAddrInput.text() + self.kuksa_config["insecure"] = not self.SSL_toggle.isChecked() + self.kuksa_config["protocol"] = self.get_protocol() + if self.kuksa_config["protocol"] == 'ws': + self.kuksa_config["port"] = "8090" + if self.kuksa_config["protocol"] == 'grpc': + self.kuksa_config["port"] = "55555" + self.client = self.kuksa.reconnect(self.kuksa_config) self.client.start() self.refreshStatus() + + self.refreshThread = RefreshThread(self) + self.refreshThread.start() + except Exception as e: - print(e) + logging.error(e) + +class RefreshThread(QThread): + def __init__(self, settings): + QThread.__init__(self) + self.settings = settings + + def run(self): + time.sleep(2) + self.settings.refreshStatus() if __name__ == '__main__': import sys |