aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Widgets/ICPage.py124
-rw-r--r--extras/VehicleSimulator.py197
2 files changed, 209 insertions, 112 deletions
diff --git a/Widgets/ICPage.py b/Widgets/ICPage.py
index 33a1401..4a32211 100644
--- a/Widgets/ICPage.py
+++ b/Widgets/ICPage.py
@@ -4,10 +4,8 @@
# SPDX-License-Identifier: Apache-2.0
import os
-import sys
import logging
-import threading
-import time
+import sys
from PyQt5 import uic, QtCore, QtWidgets
from PyQt5.QtWidgets import QApplication
from PyQt5.QtGui import QIcon, QPixmap, QPainter
@@ -15,6 +13,7 @@ from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtWidgets import QWidget
from qtwidgets import AnimatedToggle
from extras.KuksaClient import KuksaClient
+from extras.VehicleSimulator import VehicleSimulator
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -131,10 +130,11 @@ class ICWidget(Base, Form):
"""
speed = int(self.Speed_slider.value())
self.Speed_monitor.display(speed)
- try:
- self.kuksa_client.set(self.IC.speed, str(speed), 'value')
- except Exception as e:
- logging.error(f"Error sending values to kuksa {e}")
+ if not self.simulator_running:
+ try:
+ self.kuksa_client.set(self.IC.speed, str(speed), 'value')
+ except Exception as e:
+ logging.error(f"Error sending values to kuksa {e}")
def update_RPM_monitor(self):
"""
@@ -142,10 +142,11 @@ class ICWidget(Base, Form):
"""
rpm = int(self.RPM_slider.value())
self.RPM_monitor.display(rpm)
- try:
- self.kuksa_client.set(self.IC.engineRPM, str(rpm), 'value')
- except Exception as e:
- logging.error(f"Error sending values to kuksa {e}")
+ if not self.simulator_running:
+ try:
+ self.kuksa_client.set(self.IC.engineRPM, str(rpm), 'value')
+ except Exception as e:
+ logging.error(f"Error sending values to kuksa {e}")
def update_coolantTemp_monitor(self):
"""
@@ -366,107 +367,6 @@ class AccelerationFns():
return int(engine_rpm)
-class VehicleSimulator(QObject):
- # Define signals for updating speed and rpm
- speed_changed = pyqtSignal(int)
- rpm_changed = pyqtSignal(int)
-
- DEFAULT_IDLE_RPM = 1000
-
- def __init__(self):
- super().__init__()
- self.freq = 10
- self.vehicle_speed = 0
- self.engine_speed = self.DEFAULT_IDLE_RPM
- self.running = False
- self.lock = threading.Lock()
- self.thread = threading.Thread(target=self.run)
-
- def start(self):
- if not self.running:
- self.reset()
- self.running = True
- if not self.thread.is_alive():
- self.thread.start()
-
- def stop(self):
- self.running = False
-
- def reset(self):
- with self.lock:
- self.vehicle_speed = 0
- self.engine_speed = self.DEFAULT_IDLE_RPM
-
- def run(self):
- while self.running:
- if not self.running:
- break
-
- # Simulate acceleration and update speed and rpm
- self.accelerate(60, 1800, 3)
- self.accelerate(65, 1700, 1)
- self.accelerate(80, 2500, 6)
- self.accelerate(100, 3000, 4)
- self.brake(80, 3000, 3)
- self.accelerate(104, 4000, 6)
- self.brake(40, 2000, 4)
- self.accelerate(90, 3000, 5)
- self.brake(1, 650, 5)
-
- # Ensure reset is called when not in cruise mode
- if not self.running:
- self.reset()
-
- time.sleep(5)
-
- def accelerate(self, target_speed, target_rpm, duration):
- if target_speed <= self.vehicle_speed:
- return
- v = (target_speed - self.vehicle_speed) / (duration * self.freq)
- r = (target_rpm - self.engine_speed) / (duration * self.freq)
- while self.vehicle_speed < target_speed and self.running:
- with self.lock:
- self.vehicle_speed += v
- self.engine_speed += r
- self.speed_changed.emit(int(self.vehicle_speed))
- self.rpm_changed.emit(int(self.engine_speed))
- time.sleep(1 / self.freq)
-
- def brake(self, target_speed, target_rpm, duration):
- if target_speed >= self.vehicle_speed:
- return
- v = (self.vehicle_speed - target_speed) / (duration * self.freq)
- r = (self.engine_speed - target_rpm) / (duration * self.freq)
- while self.vehicle_speed > target_speed and self.running:
- with self.lock:
- self.vehicle_speed -= v
- self.engine_speed -= r
- self.speed_changed.emit(int(self.vehicle_speed))
- self.rpm_changed.emit(int(self.engine_speed))
- time.sleep(1 / self.freq)
-
- def increase(self, bycruise=True):
- if self.CRUISEACTIVE:
- target_speed = self.vehicle_speed + 5
- target_rpm = self.engine_speed * 1.1
- self.accelerate(target_speed, target_rpm, 2, bycruise)
-
- def decrease(self, bycruise=True):
- if self.CRUISEACTIVE:
- target_speed = self.vehicle_speed - 5
- target_rpm = self.engine_speed * 0.9
- self.brake(target_speed, target_rpm, 2, bycruise)
-
- def resume(self, bycruise=True):
- target_speed = self.CRUISESPEED
- target_rpm = self.CRUISERPM
- current_speed = self.get_vehicle_speed()
- if target_speed > current_speed:
- self.accelerate(target_speed, target_rpm, 2, bycruise)
- else:
- self.brake(target_speed, target_rpm, 2, bycruise)
-
-
if __name__ == '__main__':
app = QApplication(sys.argv)
w = ICWidget()
diff --git a/extras/VehicleSimulator.py b/extras/VehicleSimulator.py
new file mode 100644
index 0000000..cf790d2
--- /dev/null
+++ b/extras/VehicleSimulator.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2023 Suchinton Chakravarty
+# Copyright (C) 2024 Konsulko Group
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import math
+import random
+import time
+import threading
+from PyQt5.QtCore import QObject, pyqtSignal
+from extras.KuksaClient import KuksaClient
+
+class VehicleSimulator(QObject):
+ # Define signals for updating speed and rpm
+ speed_changed = pyqtSignal(int)
+ rpm_changed = pyqtSignal(int)
+
+ DEFAULT_IDLE_RPM = 1000
+ # NOTE: Highway by Nuremberg Messe
+ DEFAULT_STARTING_LAT = 49.416410
+ DEFAULT_STARTING_LON = 11.110604
+
+ def __init__(self):
+ super().__init__()
+ self.running = False
+ self.lock = threading.Lock()
+ self.thread = None
+ self.kuksa_client = KuksaClient()
+ self.freq = 10
+ self.vehicle_speed = 0
+ self.engine_speed = self.DEFAULT_IDLE_RPM
+ self.latitude = self.DEFAULT_STARTING_LAT
+ self.longitude = self.DEFAULT_STARTING_LON
+ self.count = 0
+
+ random.seed()
+
+ def start(self):
+ if not self.running:
+ self.kuksa_client.set_instance()
+ self.reset()
+ self.running = True
+ if not self.thread.is_alive():
+ self.thread.start()
+
+ def stop(self):
+ self.running = False
+
+ def reset(self):
+ with self.lock:
+ self.vehicle_speed = 0
+ self.engine_speed = self.DEFAULT_IDLE_RPM
+ self.latitude = self.DEFAULT_STARTING_LAT
+ self.longitude = self.DEFAULT_STARTING_LON
+ self.count = 0
+ self.thread = threading.Thread(target=self.run)
+
+ def run(self):
+ while self.running:
+ if not self.running:
+ break
+
+ self.set_signal("Vehicle.Powertrain.Transmission.SelectedGear", 127)
+
+ # Simulate acceleration and update speed and rpm
+ self.accelerate(60, 1800, 3)
+ self.accelerate(65, 1700, 1)
+ self.accelerate(80, 2500, 6)
+ self.accelerate(100, 3000, 4)
+ self.brake(80, 3000, 3)
+ self.accelerate(104, 4000, 6)
+ self.brake(40, 2000, 4)
+ self.accelerate(90, 3000, 5)
+ self.brake(1, 650, 5)
+
+ self.set_signal("Vehicle.Powertrain.Transmission.SelectedGear", 126)
+
+ # Ensure reset is called when not in cruise mode
+ if not self.running:
+ self.reset()
+
+ time.sleep(5)
+
+ def accelerate(self, target_speed, target_rpm, duration):
+ if target_speed <= self.vehicle_speed:
+ return
+ v = (target_speed - self.vehicle_speed) / (duration * self.freq)
+ r = (target_rpm - self.engine_speed) / (duration * self.freq)
+ while self.vehicle_speed < target_speed and self.running:
+ with self.lock:
+ self.vehicle_speed += v
+ self.engine_speed += r
+ self.speed_changed.emit(int(self.vehicle_speed))
+ self.rpm_changed.emit(int(self.engine_speed))
+ updates = {}
+ updates["Vehicle.Speed"] = self.vehicle_speed
+ updates["Vehicle.Powertrain.CombustionEngine.Speed"] = self.engine_speed
+ self.simulate_position(self.vehicle_speed, v, False, updates)
+ self.count = self.count + 1
+ if self.count > 1:
+ self.set_signals(updates)
+ self.count = 0
+ time.sleep(1 / self.freq)
+
+ def brake(self, target_speed, target_rpm, duration):
+ if target_speed >= self.vehicle_speed:
+ return
+ v = (self.vehicle_speed - target_speed) / (duration * self.freq)
+ r = (self.engine_speed - target_rpm) / (duration * self.freq)
+ while self.vehicle_speed > target_speed and self.running:
+ with self.lock:
+ self.vehicle_speed -= v
+ self.engine_speed -= r
+ self.speed_changed.emit(int(self.vehicle_speed))
+ self.rpm_changed.emit(int(self.engine_speed))
+ updates = {}
+ updates["Vehicle.Speed"] = self.vehicle_speed
+ updates["Vehicle.Powertrain.CombustionEngine.Speed"] = self.engine_speed
+ self.simulate_position(self.vehicle_speed, v, True, updates)
+ self.count = self.count + 1
+ if self.count > 1:
+ self.set_signals(updates)
+ self.count = 0
+ time.sleep(1 / self.freq)
+
+ def increase(self, bycruise=True):
+ if self.CRUISEACTIVE:
+ target_speed = self.vehicle_speed + 5
+ target_rpm = self.engine_speed * 1.1
+ self.accelerate(target_speed, target_rpm, 2, bycruise)
+
+ def decrease(self, bycruise=True):
+ if self.CRUISEACTIVE:
+ target_speed = self.vehicle_speed - 5
+ target_rpm = self.engine_speed * 0.9
+ self.brake(target_speed, target_rpm, 2, bycruise)
+
+ def resume(self, bycruise=True):
+ target_speed = self.CRUISESPEED
+ target_rpm = self.CRUISERPM
+ current_speed = self.get_vehicle_speed()
+ if target_speed > current_speed:
+ self.accelerate(target_speed, target_rpm, 2, bycruise)
+ else:
+ self.brake(target_speed, target_rpm, 2, bycruise)
+
+ def set_signal(self, signal, value):
+ try:
+ self.kuksa_client.set(signal, value, 'value')
+ except Exception as e:
+ logging.error(f"Error sending value to kuksa {e}")
+
+ def set_signals(self, values):
+ try:
+ self.kuksa_client.setValues(values)
+ except Exception as e:
+ logging.error(f"Error sending values to kuksa {e}")
+
+ def simulate_position(self, speed_current, speed_delta, braking, updates):
+ # From https://stackoverflow.com/questions/1253499/simple-calculations-for-working-with-lat-lon-and-km-distance
+ latKmPerDegree = 110.574
+ lonKmPerDegree = 111.320 * math.cos((self.latitude * math.pi) / (180))
+ time_seconds = 1 / self.freq
+ self.latitude += (speed_current * 0.7 * time_seconds / 3600) / latKmPerDegree
+ self.longitude += (speed_current * 0.3 * time_seconds / 3600) / lonKmPerDegree
+
+ accelX = (speed_delta * 1000 / 3600) * self.freq
+ throttlePosition = accelX / 6 * 100
+ brakePosition = 0
+ if throttlePosition > 100:
+ throttlePosition = 100
+ if braking:
+ brakePosition = throttlePosition
+ throttlePosition = 0
+ accelX = -accelX
+ accelY = random.randrange(-150, 150) / 100.0
+ accelZ = random.randrange(-100, 100) / 100.0
+
+ pitch = random.randrange(-1000, 1000) / 300.0
+ roll = random.randrange(-1000, 1000) / 300.0
+ yaw = random.randrange(-1000, 1000) / 300.0
+
+ steeringAngle = random.randrange(-90, 90)
+
+ updates["Vehicle.OBD.ThrottlePosition"] = throttlePosition
+ updates["Vehicle.Chassis.Brake.PedalPosition"] = brakePosition
+ updates["Vehicle.Chassis.SteeringWheel.Angle"] = steeringAngle
+ updates["Vehicle.CurrentLocation.Latitude"] = self.latitude
+ updates["Vehicle.CurrentLocation.Longitude"] = self.longitude
+ updates["Vehicle.Acceleration.Lateral"] = accelX
+ updates["Vehicle.Acceleration.Longitudinal"] = accelY
+ updates["Vehicle.Acceleration.Vertical"] = accelZ
+ updates["Vehicle.AngularVelocity.Pitch"] = pitch
+ updates["Vehicle.AngularVelocity.Roll"] = roll
+ updates["Vehicle.AngularVelocity.Yaw"] = yaw
+