diff options
-rw-r--r-- | Widgets/ICPage.py | 124 | ||||
-rw-r--r-- | extras/VehicleSimulator.py | 197 |
2 files changed, 209 insertions, 112 deletions
diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py index 33a1401..4a32211 100644 --- a/Widgets/ICPage.py +++ b/Widgets/ICPage.py @@ -4,10 +4,8 @@ # SPDX-License-Identifier: Apache-2.0 import os -import sys import logging -import threading -import time +import sys from PyQt5 import uic, QtCore, QtWidgets from PyQt5.QtWidgets import QApplication from PyQt5.QtGui import QIcon, QPixmap, QPainter @@ -15,6 +13,7 @@ from PyQt5.QtCore import QObject, pyqtSignal from PyQt5.QtWidgets import QWidget from qtwidgets import AnimatedToggle from extras.KuksaClient import KuksaClient +from extras.VehicleSimulator import VehicleSimulator current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -131,10 +130,11 @@ class ICWidget(Base, Form): """ speed = int(self.Speed_slider.value()) self.Speed_monitor.display(speed) - try: - self.kuksa_client.set(self.IC.speed, str(speed), 'value') - except Exception as e: - logging.error(f"Error sending values to kuksa {e}") + if not self.simulator_running: + try: + self.kuksa_client.set(self.IC.speed, str(speed), 'value') + except Exception as e: + logging.error(f"Error sending values to kuksa {e}") def update_RPM_monitor(self): """ @@ -142,10 +142,11 @@ class ICWidget(Base, Form): """ rpm = int(self.RPM_slider.value()) self.RPM_monitor.display(rpm) - try: - self.kuksa_client.set(self.IC.engineRPM, str(rpm), 'value') - except Exception as e: - logging.error(f"Error sending values to kuksa {e}") + if not self.simulator_running: + try: + self.kuksa_client.set(self.IC.engineRPM, str(rpm), 'value') + except Exception as e: + logging.error(f"Error sending values to kuksa {e}") def update_coolantTemp_monitor(self): """ @@ -366,107 +367,6 @@ class AccelerationFns(): return int(engine_rpm) -class VehicleSimulator(QObject): - # Define signals for updating speed and rpm - speed_changed = pyqtSignal(int) - rpm_changed = pyqtSignal(int) - - DEFAULT_IDLE_RPM = 1000 - - def __init__(self): - super().__init__() - self.freq = 10 - self.vehicle_speed = 0 - self.engine_speed = self.DEFAULT_IDLE_RPM - self.running = False - self.lock = threading.Lock() - self.thread = threading.Thread(target=self.run) - - def start(self): - if not self.running: - self.reset() - self.running = True - if not self.thread.is_alive(): - self.thread.start() - - def stop(self): - self.running = False - - def reset(self): - with self.lock: - self.vehicle_speed = 0 - self.engine_speed = self.DEFAULT_IDLE_RPM - - def run(self): - while self.running: - if not self.running: - break - - # Simulate acceleration and update speed and rpm - self.accelerate(60, 1800, 3) - self.accelerate(65, 1700, 1) - self.accelerate(80, 2500, 6) - self.accelerate(100, 3000, 4) - self.brake(80, 3000, 3) - self.accelerate(104, 4000, 6) - self.brake(40, 2000, 4) - self.accelerate(90, 3000, 5) - self.brake(1, 650, 5) - - # Ensure reset is called when not in cruise mode - if not self.running: - self.reset() - - time.sleep(5) - - def accelerate(self, target_speed, target_rpm, duration): - if target_speed <= self.vehicle_speed: - return - v = (target_speed - self.vehicle_speed) / (duration * self.freq) - r = (target_rpm - self.engine_speed) / (duration * self.freq) - while self.vehicle_speed < target_speed and self.running: - with self.lock: - self.vehicle_speed += v - self.engine_speed += r - self.speed_changed.emit(int(self.vehicle_speed)) - self.rpm_changed.emit(int(self.engine_speed)) - time.sleep(1 / self.freq) - - def brake(self, target_speed, target_rpm, duration): - if target_speed >= self.vehicle_speed: - return - v = (self.vehicle_speed - target_speed) / (duration * self.freq) - r = (self.engine_speed - target_rpm) / (duration * self.freq) - while self.vehicle_speed > target_speed and self.running: - with self.lock: - self.vehicle_speed -= v - self.engine_speed -= r - self.speed_changed.emit(int(self.vehicle_speed)) - self.rpm_changed.emit(int(self.engine_speed)) - time.sleep(1 / self.freq) - - def increase(self, bycruise=True): - if self.CRUISEACTIVE: - target_speed = self.vehicle_speed + 5 - target_rpm = self.engine_speed * 1.1 - self.accelerate(target_speed, target_rpm, 2, bycruise) - - def decrease(self, bycruise=True): - if self.CRUISEACTIVE: - target_speed = self.vehicle_speed - 5 - target_rpm = self.engine_speed * 0.9 - self.brake(target_speed, target_rpm, 2, bycruise) - - def resume(self, bycruise=True): - target_speed = self.CRUISESPEED - target_rpm = self.CRUISERPM - current_speed = self.get_vehicle_speed() - if target_speed > current_speed: - self.accelerate(target_speed, target_rpm, 2, bycruise) - else: - self.brake(target_speed, target_rpm, 2, bycruise) - - if __name__ == '__main__': app = QApplication(sys.argv) w = ICWidget() diff --git a/extras/VehicleSimulator.py b/extras/VehicleSimulator.py new file mode 100644 index 0000000..cf790d2 --- /dev/null +++ b/extras/VehicleSimulator.py @@ -0,0 +1,197 @@ +# Copyright (C) 2023 Suchinton Chakravarty +# Copyright (C) 2024 Konsulko Group +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import math +import random +import time +import threading +from PyQt5.QtCore import QObject, pyqtSignal +from extras.KuksaClient import KuksaClient + +class VehicleSimulator(QObject): + # Define signals for updating speed and rpm + speed_changed = pyqtSignal(int) + rpm_changed = pyqtSignal(int) + + DEFAULT_IDLE_RPM = 1000 + # NOTE: Highway by Nuremberg Messe + DEFAULT_STARTING_LAT = 49.416410 + DEFAULT_STARTING_LON = 11.110604 + + def __init__(self): + super().__init__() + self.running = False + self.lock = threading.Lock() + self.thread = None + self.kuksa_client = KuksaClient() + self.freq = 10 + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + self.latitude = self.DEFAULT_STARTING_LAT + self.longitude = self.DEFAULT_STARTING_LON + self.count = 0 + + random.seed() + + def start(self): + if not self.running: + self.kuksa_client.set_instance() + self.reset() + self.running = True + if not self.thread.is_alive(): + self.thread.start() + + def stop(self): + self.running = False + + def reset(self): + with self.lock: + self.vehicle_speed = 0 + self.engine_speed = self.DEFAULT_IDLE_RPM + self.latitude = self.DEFAULT_STARTING_LAT + self.longitude = self.DEFAULT_STARTING_LON + self.count = 0 + self.thread = threading.Thread(target=self.run) + + def run(self): + while self.running: + if not self.running: + break + + self.set_signal("Vehicle.Powertrain.Transmission.SelectedGear", 127) + + # Simulate acceleration and update speed and rpm + self.accelerate(60, 1800, 3) + self.accelerate(65, 1700, 1) + self.accelerate(80, 2500, 6) + self.accelerate(100, 3000, 4) + self.brake(80, 3000, 3) + self.accelerate(104, 4000, 6) + self.brake(40, 2000, 4) + self.accelerate(90, 3000, 5) + self.brake(1, 650, 5) + + self.set_signal("Vehicle.Powertrain.Transmission.SelectedGear", 126) + + # Ensure reset is called when not in cruise mode + if not self.running: + self.reset() + + time.sleep(5) + + def accelerate(self, target_speed, target_rpm, duration): + if target_speed <= self.vehicle_speed: + return + v = (target_speed - self.vehicle_speed) / (duration * self.freq) + r = (target_rpm - self.engine_speed) / (duration * self.freq) + while self.vehicle_speed < target_speed and self.running: + with self.lock: + self.vehicle_speed += v + self.engine_speed += r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + updates = {} + updates["Vehicle.Speed"] = self.vehicle_speed + updates["Vehicle.Powertrain.CombustionEngine.Speed"] = self.engine_speed + self.simulate_position(self.vehicle_speed, v, False, updates) + self.count = self.count + 1 + if self.count > 1: + self.set_signals(updates) + self.count = 0 + time.sleep(1 / self.freq) + + def brake(self, target_speed, target_rpm, duration): + if target_speed >= self.vehicle_speed: + return + v = (self.vehicle_speed - target_speed) / (duration * self.freq) + r = (self.engine_speed - target_rpm) / (duration * self.freq) + while self.vehicle_speed > target_speed and self.running: + with self.lock: + self.vehicle_speed -= v + self.engine_speed -= r + self.speed_changed.emit(int(self.vehicle_speed)) + self.rpm_changed.emit(int(self.engine_speed)) + updates = {} + updates["Vehicle.Speed"] = self.vehicle_speed + updates["Vehicle.Powertrain.CombustionEngine.Speed"] = self.engine_speed + self.simulate_position(self.vehicle_speed, v, True, updates) + self.count = self.count + 1 + if self.count > 1: + self.set_signals(updates) + self.count = 0 + time.sleep(1 / self.freq) + + def increase(self, bycruise=True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed + 5 + target_rpm = self.engine_speed * 1.1 + self.accelerate(target_speed, target_rpm, 2, bycruise) + + def decrease(self, bycruise=True): + if self.CRUISEACTIVE: + target_speed = self.vehicle_speed - 5 + target_rpm = self.engine_speed * 0.9 + self.brake(target_speed, target_rpm, 2, bycruise) + + def resume(self, bycruise=True): + target_speed = self.CRUISESPEED + target_rpm = self.CRUISERPM + current_speed = self.get_vehicle_speed() + if target_speed > current_speed: + self.accelerate(target_speed, target_rpm, 2, bycruise) + else: + self.brake(target_speed, target_rpm, 2, bycruise) + + def set_signal(self, signal, value): + try: + self.kuksa_client.set(signal, value, 'value') + except Exception as e: + logging.error(f"Error sending value to kuksa {e}") + + def set_signals(self, values): + try: + self.kuksa_client.setValues(values) + except Exception as e: + logging.error(f"Error sending values to kuksa {e}") + + def simulate_position(self, speed_current, speed_delta, braking, updates): + # From https://stackoverflow.com/questions/1253499/simple-calculations-for-working-with-lat-lon-and-km-distance + latKmPerDegree = 110.574 + lonKmPerDegree = 111.320 * math.cos((self.latitude * math.pi) / (180)) + time_seconds = 1 / self.freq + self.latitude += (speed_current * 0.7 * time_seconds / 3600) / latKmPerDegree + self.longitude += (speed_current * 0.3 * time_seconds / 3600) / lonKmPerDegree + + accelX = (speed_delta * 1000 / 3600) * self.freq + throttlePosition = accelX / 6 * 100 + brakePosition = 0 + if throttlePosition > 100: + throttlePosition = 100 + if braking: + brakePosition = throttlePosition + throttlePosition = 0 + accelX = -accelX + accelY = random.randrange(-150, 150) / 100.0 + accelZ = random.randrange(-100, 100) / 100.0 + + pitch = random.randrange(-1000, 1000) / 300.0 + roll = random.randrange(-1000, 1000) / 300.0 + yaw = random.randrange(-1000, 1000) / 300.0 + + steeringAngle = random.randrange(-90, 90) + + updates["Vehicle.OBD.ThrottlePosition"] = throttlePosition + updates["Vehicle.Chassis.Brake.PedalPosition"] = brakePosition + updates["Vehicle.Chassis.SteeringWheel.Angle"] = steeringAngle + updates["Vehicle.CurrentLocation.Latitude"] = self.latitude + updates["Vehicle.CurrentLocation.Longitude"] = self.longitude + updates["Vehicle.Acceleration.Lateral"] = accelX + updates["Vehicle.Acceleration.Longitudinal"] = accelY + updates["Vehicle.Acceleration.Vertical"] = accelZ + updates["Vehicle.AngularVelocity.Pitch"] = pitch + updates["Vehicle.AngularVelocity.Roll"] = roll + updates["Vehicle.AngularVelocity.Yaw"] = yaw + |