diff options
author | suchinton2001 <suchinton.2001@gmail.com> | 2023-10-09 16:00:58 +0530 |
---|---|---|
committer | suchinton2001 <suchinton.2001@gmail.com> | 2023-10-09 16:00:58 +0530 |
commit | db862e32df7f31e6453d7f05a6f011091b96ffab (patch) | |
tree | 6abcc3a2eceb79a43b7836d1d23a8594da375826 /Widgets/ICPage.py | |
parent | 260bbf89836ff9ce98054e5166699a163a0d7c11 (diff) |
agl-demo-control-panel: Add grpc support for databroker
- Add grpc support for databroker (set default protocol)
- Add virtual car for script mode in IC app
- Refine UI elements
- Use specific grpc/ws jwt tokens
- Simplify settings menu
Bug-AGL: SPEC-4905
Signed-off-by: suchinton2001 <suchinton.2001@gmail.com>
Change-Id: I59c4b1de80e280fe22993b2d2f7c92d6b41a89c7
Diffstat (limited to 'Widgets/ICPage.py')
-rw-r--r-- | Widgets/ICPage.py | 373 |
1 files changed, 240 insertions, 133 deletions
diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py index 64fa2d3..2d11cdf 100644 --- a/Widgets/ICPage.py +++ b/Widgets/ICPage.py @@ -18,10 +18,12 @@ import os import sys from PyQt5 import uic, QtCore, QtWidgets from PyQt5.QtWidgets import QApplication -from PyQt5.QtWidgets import QSlider, QLCDNumber, QPushButton from PyQt5.QtGui import QIcon, QPixmap, QPainter +from PyQt5.QtCore import QObject, pyqtSignal import time -from PyQt5.QtCore import QThread +from PyQt5.QtWidgets import QWidget +from qtwidgets import AnimatedToggle +import threading current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -49,35 +51,40 @@ class IC_Paths(): class ICWidget(Base, Form): + """ + This class represents the ICWidget which is a widget for the AGL Demo Control Panel. + It inherits from the Base and Form classes. + """ + def __init__(self, parent=None): + """ + Initializes the ICWidget object. + + Args: + - parent: The parent widget. Defaults to None. + """ super(self.__class__, self).__init__(parent) self.setupUi(self) self.IC = IC_Paths() + #self.vehicle_simulator = VehicleSimulator(self) self.feed_kuksa = FeedKuksa() self.feed_kuksa.start() + self.vehicle_simulator = VehicleSimulator() - # # load the stylesheet - # theme = open(os.path.join(current_dir, "../ui/styles/Tron/ICPage.qss"), 'r') - # self.setStyleSheet(theme.read()) - # theme.close() - - self.scriptBtn = self.findChild(QPushButton, "scriptBtn") + header_frame = self.findChild(QWidget, "header_frame") + layout = header_frame.layout() - self.Speed_slider = self.findChild(QSlider, "Speed_slider") - self.Speed_monitor = self.findChild(QLCDNumber, "Speed_monitor") - self.RPM_slider = self.findChild(QSlider, "RPM_slider") - self.RPM_monitor = self.findChild(QLCDNumber, "RPM_monitor") + self.IC_Frame = self.findChild(QWidget, "frame_1") - self.coolantTemp_slider = self.findChild(QSlider, "coolantTemp_slider") - self.fuelLevel_slider = self.findChild(QSlider, "fuelLevel_slider") + self.Script_toggle = AnimatedToggle( + checked_color="#4BD7D6", + pulse_checked_color="#00ffff" + ) - self.accelerationBtn = self.findChild(QPushButton, "accelerationBtn") - - self.leftIndicatorBtn = self.findChild(QPushButton, "leftIndicatorBtn") - self.rightIndicatorBtn = self.findChild(QPushButton, "rightIndicatorBtn") - self.hazardBtn = self.findChild(QPushButton, "hazardBtn") + layout.replaceWidget(self.demoToggle, self.Script_toggle) + self.demoToggle.deleteLater() buttons = [self.parkBtn, self.reverseBtn, @@ -85,6 +92,10 @@ class ICWidget(Base, Form): self.driveBtn] # group for the buttons for mutual exclusion + + self.vehicle_simulator.speed_changed.connect(self.set_Vehicle_Speed) + self.vehicle_simulator.rpm_changed.connect(self.set_Vehicle_RPM) + self.driveGroupBtns = QtWidgets.QButtonGroup(self) self.driveGroupBtns.setExclusive(True) @@ -93,8 +104,6 @@ class ICWidget(Base, Form): self.driveGroupBtns.buttonClicked.connect(self.driveBtnClicked) - self.scriptBtn.clicked.connect(self.scriptBtnClicked) - self.Speed_slider.valueChanged.connect(self.update_Speed_monitor) self.Speed_slider.setMinimum(0) self.Speed_slider.setMaximum(240) @@ -112,107 +121,136 @@ class ICWidget(Base, Form): self.accelerationBtn.released.connect(self.accelerationBtnReleased) # make both buttons checkable + self.Script_toggle.clicked.connect(self.handle_Script_toggle) self.leftIndicatorBtn.setCheckable(True) self.rightIndicatorBtn.setCheckable(True) self.hazardBtn.setCheckable(True) - self.leftIndicatorBtn.clicked.connect(self.leftIndicatorBtnClicked) - self.rightIndicatorBtn.clicked.connect(self.rightIndicatorBtnClicked) - self.hazardBtn.clicked.connect(self.hazardBtnClicked) + self.leftIndicatorBtn.toggled.connect(self.leftIndicatorBtnClicked) + self.rightIndicatorBtn.toggled.connect(self.rightIndicatorBtnClicked) + self.hazardBtn.toggled.connect(self.hazardBtnClicked) - def scriptBtnClicked(self): - if self.scriptBtn.isChecked(): - ICScript.start_script() + def set_Vehicle_Speed(self, speed): + self.Speed_slider.setValue(speed) - if not self.ScriptBtn.isChecked(): - ICScript.stop_script() + def set_Vehicle_RPM(self, rpm): + self.RPM_slider.setValue(rpm) def update_Speed_monitor(self): + """ + Updates the speed monitor with the current speed value. + """ speed = int(self.Speed_slider.value()) self.Speed_monitor.display(speed) - self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value') + try: self.feed_kuksa.send_values(self.IC.speed, str(speed), 'value') + except Exception as e: print(e) def update_RPM_monitor(self): + """ + Updates the RPM monitor with the current RPM value. + """ rpm = int(self.RPM_slider.value()) self.RPM_monitor.display(rpm) - self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value') + try: self.feed_kuksa.send_values(self.IC.engineRPM, str(rpm), 'value') + except Exception as e: print(e) def update_coolantTemp_monitor(self): + """ + Updates the coolant temperature monitor with the current coolant temperature value. + """ coolantTemp = int(self.coolantTemp_slider.value()) - self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value') + try: self.feed_kuksa.send_values(self.IC.coolantTemp, str(coolantTemp), 'value') + except Exception as e: print(e) def update_fuelLevel_monitor(self): + """ + Updates the fuel level monitor with the current fuel level value. + """ fuelLevel = int(self.fuelLevel_slider.value()) - self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel)) + try: self.feed_kuksa.send_values(self.IC.fuelLevel, str(fuelLevel)) + except Exception as e: print(e) def hazardBtnClicked(self): + """ + Handles the hazard button click event. + """ hazardIcon = QPixmap(":/Images/Images/hazard.png") + painter = QPainter(hazardIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) + if self.hazardBtn.isChecked(): - painter = QPainter(hazardIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(hazardIcon.rect(), QtCore.Qt.yellow) - painter.end() - self.hazardBtn.setIcon(QIcon(hazardIcon)) - - self.leftIndicatorBtn.setChecked(True) - self.rightIndicatorBtn.setChecked(True) - self.feed_kuksa.send_values(self.IC.leftIndicator, "true") - self.feed_kuksa.send_values(self.IC.rightIndicator, "true") - self.feed_kuksa.send_values(self.IC.hazard, "true") + color = QtCore.Qt.yellow + value = "true" else: - painter = QPainter(hazardIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(hazardIcon.rect(), QtCore.Qt.black) - painter.end() - self.hazardBtn.setIcon(QIcon(hazardIcon)) - - self.leftIndicatorBtn.setChecked(False) - self.rightIndicatorBtn.setChecked(False) - self.feed_kuksa.send_values(self.IC.leftIndicator, "false") - self.feed_kuksa.send_values(self.IC.rightIndicator, "false") - self.feed_kuksa.send_values(self.IC.hazard, "false") + color = QtCore.Qt.black + value = "false" + + painter.fillRect(hazardIcon.rect(), color) + painter.end() + self.hazardBtn.setIcon(QIcon(hazardIcon)) + + self.leftIndicatorBtn.setChecked(self.hazardBtn.isChecked()) + self.rightIndicatorBtn.setChecked(self.hazardBtn.isChecked()) + + try: + self.feed_kuksa.send_values(self.IC.leftIndicator, value) + self.feed_kuksa.send_values(self.IC.rightIndicator, value) + self.feed_kuksa.send_values(self.IC.hazard, value) + except Exception as e: + print(e) + def leftIndicatorBtnClicked(self): + """ + Handles the left indicator button click event. + """ leftIndicatorIcon = QPixmap(":/Images/Images/left.png") - if self.leftIndicatorBtn.isChecked(): + painter = QPainter(leftIndicatorIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter = QPainter(leftIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.green) - painter.end() - - self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.leftIndicator, "true") + if self.leftIndicatorBtn.isChecked(): + color = QtCore.Qt.green + value = "true" else: + color = QtCore.Qt.black + value = "false" - painter = QPainter(leftIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(leftIndicatorIcon.rect(), QtCore.Qt.black) - painter.end() + painter.fillRect(leftIndicatorIcon.rect(), color) + painter.end() + self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.leftIndicatorBtn.setIcon(QIcon(leftIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.leftIndicator, "false") + try: self.feed_kuksa.send_values(self.IC.leftIndicator, value) + except Exception as e: print(e) def rightIndicatorBtnClicked(self): + """ + Handles the right indicator button click event. + """ rightIndicatorIcon = QPixmap(":/Images/Images/right.png") - if self.rightIndicatorBtn.isChecked(): + painter = QPainter(rightIndicatorIcon) + painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter = QPainter(rightIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.green) - painter.end() - self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.rightIndicator, "true") + if self.rightIndicatorBtn.isChecked(): + color = QtCore.Qt.green + value = "true" else: + color = QtCore.Qt.black + value = "false" + + painter.fillRect(rightIndicatorIcon.rect(), color) + painter.end() + self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) + + try: + self.feed_kuksa.send_values(self.IC.rightIndicator, value) + except Exception as e: + print(e) - painter = QPainter(rightIndicatorIcon) - painter.setCompositionMode(QPainter.CompositionMode_SourceIn) - painter.fillRect(rightIndicatorIcon.rect(), QtCore.Qt.black) - painter.end() - self.rightIndicatorBtn.setIcon(QIcon(rightIndicatorIcon)) - self.feed_kuksa.send_values(self.IC.rightIndicator, "false") def accelerationBtnPressed(self): + """ + Handles the acceleration button press event. + """ self.startTime = QtCore.QTime.currentTime() self.acceleration_timer = QtCore.QTimer() self.acceleration_timer.timeout.connect( @@ -227,6 +265,14 @@ class ICWidget(Base, Form): lambda: self.updateSpeedAndEngineRpm("Decelerate")) self.acceleration_timer.start(100) + def handle_Script_toggle(self): + if self.Script_toggle.isChecked(): + self.set_Vehicle_RPM(1000) + self.set_Vehicle_Speed(0) + self.vehicle_simulator.start() + else: + self.vehicle_simulator.stop() + def updateSpeedAndEngineRpm(self, action, acceleration=(60/5)): if action == "Accelerate": pass @@ -257,33 +303,26 @@ class ICWidget(Base, Form): self.update_RPM_monitor() def driveBtnClicked(self): - # // Selected Gear output = > 0 = Neutral, 1/2/.. = Forward, -1/.. = Reverse, 126 = Park, 127 = Drive - # #859287 ; /* light green */ - - if self.driveGroupBtns.checkedButton() == self.driveBtn: + gear_mapping = { + self.driveBtn: "127", + self.parkBtn: "126", + self.reverseBtn: "-1", + self.neutralBtn: "0" + } + + checked_button = self.driveGroupBtns.checkedButton() + + if checked_button in gear_mapping: + gear_value = gear_mapping[checked_button] self.accelerationBtn.setEnabled(True) - self.Speed_slider.setEnabled(True) - self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "127") - - if self.driveGroupBtns.checkedButton() == self.parkBtn: - self.accelerationBtn.setEnabled(False) - self.Speed_slider.setEnabled(False) - self.RPM_slider.setEnabled(False) - self.feed_kuksa.send_values(self.IC.selectedGear, "126") - - if self.driveGroupBtns.checkedButton() == self.reverseBtn: - self.accelerationBtn.setEnabled(True) - self.Speed_slider.setEnabled(True) - self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "-1") - - if self.driveGroupBtns.checkedButton() == self.neutralBtn: - self.accelerationBtn.setEnabled(False) - self.Speed_slider.setEnabled(False) + self.Speed_slider.setEnabled(checked_button != self.neutralBtn) self.RPM_slider.setEnabled(True) - self.feed_kuksa.send_values(self.IC.selectedGear, "0") - + try: + self.feed_kuksa.send_values(self.IC.selectedGear, gear_value) + except Exception as e: + print(e) + else: + print("Unknown button checked!") class AccelerationFns(): def calculate_speed(time, acceleration) -> int: @@ -314,37 +353,105 @@ class AccelerationFns(): engine_rpm = wheel_rps * gear_ratios[current_gear - 1] * 60 return int(engine_rpm) + +class VehicleSimulator(QObject): + # Define signals for updating speed and rpm + speed_changed = pyqtSignal(int) + rpm_changed = pyqtSignal(int) + DEFAULT_IDLE_RPM = 1000 -class ICScript(ICWidget): - def start_script(self): - ICWidget.reset() - - # disable all widgets in the scroll area - for widget in ICWidget.scrollAreaWidgetContents.children(): - widget.setEnabled(False) - - rates = [(60/5), (60/4), (60/3)] + def __init__(self): + super().__init__() + self.freq = 10 + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + self.running = False + self.lock = threading.Lock() + self.thread = threading.Thread(target=self.run, daemon=True) + + def start(self): + if not self.running: + self.reset() + self.running = True + self.thread.start() + + def stop(self): + self.running = False + + def reset(self): + with self.lock: + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + + def run(self): + while self.running: + if not self.running: + break - # start assigning values to the speed and rpm sliders and send them to the IC do this in a loop for each rate - for rate in rates: - ICWidget.accelerationBtnPressed() - ICWidget.acceleration_timer.timeout.connect( - lambda: ICWidget.updateSpeedAndEngineRpm("Accelerate"), rate) - ICWidget.acceleration_timer.start(100) - time.sleep(5) - ICWidget.accelerationBtnReleased() - ICWidget.acceleration_timer.timeout.connect( - lambda: ICWidget.updateSpeedAndEngineRpm("Decelerate"), rate) - ICWidget.acceleration_timer.start(100) + # Simulate acceleration and update speed and rpm + self.accelerate(60, 1800, 3) + self.accelerate(65, 1700, 1) + self.accelerate(80, 2500, 6) + self.accelerate(100, 3000, 4) + self.brake(80, 3000, 3) + self.accelerate(104, 4000, 6) + self.brake(40, 2000, 4) + self.accelerate(90, 3000, 5) + self.brake(1, 650, 5) + + # Ensure reset is called when not in cruise mode + if not self.running: + self.reset() + time.sleep(5) - def stop_script(self): - ICWidget.reset() - - # enable all widgets in the scroll area - for widget in ICWidget.scrollAreaWidgetContents.children(): - widget.setEnabled(True) + def accelerate(self, target_speed, target_rpm, duration): + if target_speed <= self.vehicle_speed: + return + v = (target_speed - self.vehicle_speed) / (duration * self.freq) + r = (target_rpm - self.engine_speed) / (duration * self.freq) + while self.vehicle_speed < target_speed and self.running: + with self.lock: + self.vehicle_speed += v + self.engine_speed += r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + time.sleep(1 / self.freq) + + def brake(self, target_speed, target_rpm, duration): + if target_speed >= self.vehicle_speed: + return + v = (self.vehicle_speed - target_speed) / (duration * self.freq) + r = (self.engine_speed - target_rpm) / (duration * self.freq) + while self.vehicle_speed > target_speed and self.running: + with self.lock: + self.vehicle_speed -= v + self.engine_speed -= r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + time.sleep(1 / self.freq) + + def increase(self, bycruise = True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed + 5 + target_rpm = self.engine_speed * 1.1 + self.accelerate(target_speed, target_rpm, 2, bycruise) + + def decrease(self, bycruise = True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed - 5 + target_rpm = self.engine_speed * 0.9 + self.brake(target_speed, target_rpm, 2, bycruise) + + def resume(self, bycruise = True): + target_speed = self.CRUISESPEED + target_rpm = self.CRUISERPM + current_speed = self.get_vehicle_speed() + if target_speed > current_speed: + self.accelerate(target_speed, target_rpm, 2, bycruise) + else: + self.brake(target_speed, target_rpm, 2, bycruise) if __name__ == '__main__': app = QApplication(sys.argv) |