aboutsummaryrefslogtreecommitdiffstats
path: root/Scripts
diff options
context:
space:
mode:
authorSuchinton Chakravarty <suchinton.2001@gmail.com>2024-09-29 23:38:41 +0530
committerSuchinton Chakravarty <suchinton.2001@gmail.com>2024-10-10 18:31:58 +0530
commit554ec4cd07d68f4bcb569277881e368c450d993a (patch)
tree0f17a3498c3e7881157fee69a3e306ce100c7119 /Scripts
parent65d4619371979c8921ff155a6fe1d7de0e1d3598 (diff)
Update Carla Playback Mode
- Now file playback changes values on control panel - Added Config file path to ini file - Fixed signal mapping for Indicator and Hazard lights - Fixed crash of carla_to_CAN due to `Break` signal. - can_messages.txt is now stored in assets dir - Script Toggle shows error when can interface is not available - Added cantools as new dependency - Fixed default paths for can_messages playback file Bug-AGL: SPEC-5161 Change-Id: I7b51ff3db1238e0c8addc19152d24d4ce2c8574e Signed-off-by: Suchinton Chakravarty <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts')
-rw-r--r--Scripts/carla_to_CAN.py90
-rw-r--r--Scripts/record_playback.py164
2 files changed, 177 insertions, 77 deletions
diff --git a/Scripts/carla_to_CAN.py b/Scripts/carla_to_CAN.py
index 951d29b..4ca6e32 100644
--- a/Scripts/carla_to_CAN.py
+++ b/Scripts/carla_to_CAN.py
@@ -7,12 +7,13 @@ import math
import can
import cantools
import argparse
+import subprocess
+import sys
# ==============================================================================
# -- CAN -----------------------------------------------------------------------
# ==============================================================================
-
class CAN(object):
"""
Represents a Controller Area Network (CAN) interface for sending messages to a vehicle.
@@ -32,9 +33,9 @@ class CAN(object):
lights_cache (str): The cached indicator lights state.
"""
- def __init__(self):
+ def __init__(self, can_bus):
self.db = cantools.database.load_file('agl-vcar.dbc')
- self.can_bus = can.interface.Bus('vcan0', interface='socketcan')
+ self.can_bus = can_bus
self.speed_message = self.db.get_message_by_name('Vehicle_Status_1')
self.gear_message = self.db.get_message_by_name('Transmission')
@@ -51,6 +52,12 @@ class CAN(object):
self.engine_speed_cache = 0
self.gear_cache = "P"
self.lights_cache = None
+ self.indicator_signals = {
+ 'LeftBlinker': {'PT_LeftTurnOn': 1, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0},
+ 'RightBlinker': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 1, 'PT_HazardOn': 0},
+ 'carla.libcarla.VehicleLightState(48)': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 1},
+ 'NONE': {'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0, 'PT_HazardOn': 0}
+ }
def send_car_speed(self, speed):
"""
@@ -122,21 +129,10 @@ class CAN(object):
Args:
indicator (str): The indicator lights state ('LeftBlinker', 'RightBlinker', 'HazardLights').
"""
- # Mapping indicator names to signal values
- indicators_mapping = {
- 'LeftBlinker': {'PT_LeftTurnOn': 1},
- 'RightBlinker': {'PT_RightTurnOn': 1},
- 'HazardLights': {'PT_HazardOn': 1}
- }
-
- # Default signal values
- signals = {'PT_HazardOn': 0, 'PT_LeftTurnOn': 0, 'PT_RightTurnOn': 0}
-
- # Update signals based on the indicator argument
- signals.update(indicators_mapping.get(indicator, {}))
+ signal = self.indicator_signals[str(indicator)]
# Encode and send the CAN message
- data = self.Vehicle_Status_3_message.encode(signals)
+ data = self.Vehicle_Status_3_message.encode(signal)
message = can.Message(
arbitration_id=self.Vehicle_Status_3_message.frame_id, data=data)
self.can_bus.send(message)
@@ -173,14 +169,45 @@ class CAN(object):
self.engine_speed_cache = rpm
if lights is not None and lights != self.lights_cache:
- self.send_indicator(lights)
- self.lights_cache = lights
+ try:
+ self.send_indicator(lights)
+ self.lights_cache = lights
+ except Exception as e:
+ print(f"Error sending indicator lights: {e}")
+def check_interface(interface):
+ """Check if the given interface is up."""
+ try:
+ # Use ip link show to check if the interface is up
+ subprocess.check_output(['ip', 'link', 'show', interface])
+ return True
+ except subprocess.CalledProcessError:
+ return False
+
+def get_default_interface():
+ """Get the default CAN interface from the config file."""
+ try:
+ # Import config from the parent directory
+ sys.path.append('../')
+ from extras import config
+ return config.get_can_interface()
+ except ImportError:
+ # If extras module is not found, return None
+ return None
+
+def create_can_bus(can_interface):
+ """Create and return a CAN bus object for the given can_interface."""
+ try:
+ return can.interface.Bus(channel=can_interface, interface='socketcan')
+ except OSError as e:
+ raise RuntimeError(f'Failed to open CAN interface "{can_interface}": {e}')
+
def main(host='127.0.0.1', port=2000):
parser = argparse.ArgumentParser(description='Carla to CAN Converter')
parser.add_argument('--host', default='127.0.0.1', help='IP of the host server')
parser.add_argument('--port', default=2000, type=int, help='TCP port to listen to')
+ parser.add_argument('--interface', required=False, help='CAN interface to use')
args = parser.parse_args()
client = carla.Client(args.host, args.port)
@@ -188,7 +215,18 @@ def main(host='127.0.0.1', port=2000):
world = client.get_world()
- can = CAN()
+ # Determine the CAN interfaces
+ can_interface = args.interface or get_default_interface() or 'vcan0'
+
+ print(f"CAN interface: {can_interface}")
+
+ # Check interfaces
+ if check_interface(can_interface):
+ can_bus = create_can_bus(can_interface)
+ else:
+ raise RuntimeError("No available CAN interface found. To setup vcan0, run `sudo ./vcan.sh`.")
+
+ can = CAN(can_bus)
player_vehicle = None
@@ -202,7 +240,6 @@ def main(host='127.0.0.1', port=2000):
return
try:
-
speed_kmh_cache = None
engine_rpm_cache = None
throttle_cache = None
@@ -228,7 +265,6 @@ def main(host='127.0.0.1', port=2000):
lights = player_vehicle.get_light_state()
- # if any values have changed, try to send the CAN message
if (speed_kmh != speed_kmh_cache or
engine_rpm != engine_rpm_cache or
control.throttle != throttle_cache or
@@ -240,18 +276,18 @@ def main(host='127.0.0.1', port=2000):
throttle_cache = control.throttle
gear_cache = gear
lights_cache = lights
-
- can.send_can_message(speed_kmh, engine_rpm,
+ try:
+ can.send_can_message(speed_kmh, engine_rpm,
control.throttle, gear, lights)
+ except Exception as e:
+ print(f"New error: {e}")
except Exception as e:
- print(
- f"An error occurred: {e}. The CARLA simulation might have stopped.")
+ print(f"An error occurred: {e}. The CARLA simulation might have stopped.")
finally:
- if can.can_bus is not None:
+ if hasattr(can, 'can_bus') and can.can_bus is not None:
can.can_bus.shutdown()
print("CAN bus properly shut down.")
-
if __name__ == "__main__":
main()
diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py
index 5d3956e..0e631ad 100644
--- a/Scripts/record_playback.py
+++ b/Scripts/record_playback.py
@@ -7,79 +7,147 @@ import time
from rich.console import Console
import os
import argparse
+from PyQt6.QtCore import QThread, pyqtSignal
+import cantools
+import sys
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.dirname(current_dir))
+
+from extras import config
+
+class CAN_playback(QThread):
+ finished = pyqtSignal()
+
+ speedUpdate = pyqtSignal(int)
+ gearUpdate = pyqtSignal(str)
+ engineSpeedUpdate = pyqtSignal(int)
+ throttleUpdate = pyqtSignal(int)
+ indicatorUpdate = pyqtSignal(str)
+
+ try:
+ playbackFile = config.get_playback_file()
+ except Exception as e:
+ print(e)
+ # playbackFile = os.path.join(current_dir, "can_messages.txt")
-class CAN_playback:
def __init__(self, interface='vcan0'):
- """
- Initialize the CAN Tool with the specified interface.
+ super(CAN_playback, self).__init__()
+ self._running = False
- Args:
- interface (str): The CAN interface name (default: 'vcan0')
- """
self.console_mode = False
self.interface = interface
self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000)
- self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt')
+ self.output_file = self.playbackFile
+
+ # Load the DBC file
+ self.db = cantools.database.load_file(os.path.join(os.path.dirname(__file__), 'agl-vcar.dbc'))
+
+ # Create a mapping of relevant messages
+ self.message_map = {
+ 'Vehicle_Status_1': self.decode_speed,
+ 'Transmission': self.decode_gear,
+ 'Vehicle_Status_2': self.decode_engine_speed,
+ 'Engine': self.decode_throttle,
+ 'Vehicle_Status_3': self.decode_indicators
+ }
def write_to_file(self, messages):
- """
- Write captured CAN messages to a file.
-
- Args:
- messages (list): List of can.Message objects to write
- """
with open(self.output_file, 'w') as file:
for msg in messages:
file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n")
def playback_messages(self):
if os.path.exists(self.output_file):
- #console.print("Replaying captured messages...")
- messages = []
- with open(self.output_file, 'r') as file:
- for line in file:
- parts = line.strip().split(',', 3) # Split into at most 4 parts
- timestamp, arbitration_id, dlc, data_str = parts
- # Extract the data bytes, removing '0x' and splitting by ','
- data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte]
-
- msg = can.Message(
- timestamp=float(timestamp),
- arbitration_id=int(arbitration_id, 0),
- dlc=int(dlc),
- data=data_bytes
- )
- messages.append(msg)
- self.replay_messages(messages)
-
+ messages = []
+ with open(self.output_file, 'r') as file:
+ for line in file:
+ parts = line.strip().split(',', 3)
+ timestamp, arbitration_id, dlc, data_str = parts
+ data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte]
+
+ msg = can.Message(
+ timestamp=float(timestamp),
+ arbitration_id=int(arbitration_id, 0),
+ dlc=int(dlc),
+ data=data_bytes
+ )
+ messages.append(msg)
+ self.replay_messages(messages)
+
def stop(self):
self._running = False
if self.bus is not None:
self.bus.shutdown()
+ self.finished.emit()
def replay_messages(self, messages):
- """
- Replay CAN messages on the specified bus.
-
- Args:
- messages (list): List of can.Message objects to replay
- """
self._running = True
start_time = messages[0].timestamp
for msg in messages:
delay = msg.timestamp - start_time
self.bus.send(msg)
+
+ # Decode and emit signals for relevant messages
+ if msg.arbitration_id == self.db.get_message_by_frame_id(msg.arbitration_id).frame_id:
+ message_name = self.db.get_message_by_frame_id(msg.arbitration_id).name
+ if message_name in self.message_map:
+ decoded_data = self.decode_message(message_name, msg.data)
+ self.message_map[message_name](decoded_data)
+
time.sleep(delay)
start_time = msg.timestamp
- if self._running == False: return
+ if not self._running:
+ return
+
+ def decode_message(self, message_name, data):
+ message = self.db.get_message_by_name(message_name)
+ return message.decode(data)
+
+ def decode_speed(self, decoded_data):
+ speed = int(decoded_data['PT_VehicleAvgSpeed'])
+ self.speedUpdate.emit(speed)
+
+ def decode_gear(self, decoded_data):
+ gear_value = int(decoded_data['Gear'])
+ if gear_value == 126:
+ gear = "P"
+ elif gear_value == 127:
+ gear = "D"
+ elif gear_value == -1 or gear_value == -2:
+ gear = "R"
+ elif gear_value == 0:
+ gear = "N"
+ else:
+ gear = f"{gear_value}"
+ self.gearUpdate.emit(gear)
+
+ def decode_engine_speed(self, decoded_data):
+ engine_speed = int(decoded_data['PT_EngineSpeed'])
+ self.engineSpeedUpdate.emit(engine_speed)
+
+ def decode_throttle(self, decoded_data):
+ throttle_position = int(decoded_data['ThrottlePosition'])
+ self.throttleUpdate.emit(throttle_position)
+
+ def decode_indicators(self, decoded_data):
+ if decoded_data['PT_HazardOn'] == 1:
+ self.indicatorUpdate.emit('HazardOn')
+ if decoded_data['PT_HazardOn'] == 0:
+ self.indicatorUpdate.emit('HazardOff')
+
+ if decoded_data['PT_LeftTurnOn'] == 1:
+ self.indicatorUpdate.emit('LeftBlinkerOn')
+ print("Left Blinker On")
+ if decoded_data['PT_LeftTurnOn'] == 0:
+ self.indicatorUpdate.emit('LeftBlinkerOff')
+
+ if decoded_data['PT_RightTurnOn'] == 1:
+ self.indicatorUpdate.emit('RightBlinkerOn')
+ if decoded_data['PT_RightTurnOn'] == 0:
+ self.indicatorUpdate.emit('RightBlinkerOff')
def capture_can_messages(self):
- """
- Capture CAN messages from the specified bus.
-
- Returns:
- list: List of captured can.Message objects
- """
messages = []
if self.console_mode:
@@ -101,17 +169,16 @@ def main():
from rich.console import Console
from rich.prompt import Prompt
- parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool')
+ parser = argparse.ArgumentParser(description='CAN Message Capture and Playback Tool')
parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)')
args = parser.parse_args()
- # Initialize the CAN Tool with the specified interface
can_tool = CAN_playback(interface=args.interface)
can_tool.console_mode = True
console = Console()
while True:
- console.print("\n[bold]CAN Message Capture and Replay[/bold]")
+ console.print("\n[bold]CAN Message Capture and Playback[/bold]")
console.print("1. Capture CAN messages")
console.print("2. Replay captured messages")
console.print("3. Exit")
@@ -131,12 +198,9 @@ def main():
else:
console.print(f"No captured messages found in {can_tool.output_file}")
-
-
else:
console.print("Exiting...")
break
if __name__ == "__main__":
main()
-