aboutsummaryrefslogtreecommitdiffstats
path: root/Scripts/record_playback.py
diff options
context:
space:
mode:
authorSuchinton <suchinton.2001@gmail.com>2024-07-01 00:10:54 +0530
committerSuchinton <suchinton.2001@gmail.com>2024-07-15 07:52:42 +0530
commitb0844193f37f477c9e7e509e0b4eaf221886192b (patch)
treea125cc5d3d90db067eb65399b1c4d812a0f0a4b1 /Scripts/record_playback.py
parent25d451d87046a1cfbf7ac3cd47c2303fd29a22c5 (diff)
Add Python Script to Convert CARLA data into CAN messagessalmon_18.90.0salmon/18.90.018.90.0
V1: - Add carla_to_CAN.py script to convert CARLA data into CAN messages - Add README and requirements.txt V2: - Add script to record and playback messages from can interface - Fix mappings to agl-vcar.dbc file V3: - Fix playback feature for record_playback.py - Update requirements.txt - Update README to explain setup and usage of Scripts with CARLA V4: - Add file playback feature to Demo Control Panel - Remove dependency on numpy to calculate vehicle speed, use math lib instead - record_playback.py can now be imported and also be used in standalone mode - Fix: Now data is sent to CAN interface only when it is updated - Fix: Delay is now based on previous timestamp and not the starting timestamp - Fix: Send correct Gear messages, compatible with the agl-vcar signals Bug-AGL: SPEC-5161 Change-Id: I18a14e8e6ac4d24e6ed8774402fb93a36dec274e Signed-off-by: Suchinton <suchinton.2001@gmail.com>
Diffstat (limited to 'Scripts/record_playback.py')
-rw-r--r--Scripts/record_playback.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py
new file mode 100644
index 0000000..e518356
--- /dev/null
+++ b/Scripts/record_playback.py
@@ -0,0 +1,141 @@
+# Copyright (C) 2024 Suchinton Chakravarty
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import can
+import time
+from rich.console import Console
+import os
+import argparse
+
+class CAN_playback:
+ def __init__(self, interface='vcan0'):
+ """
+ Initialize the CAN Tool with the specified interface.
+
+ Args:
+ interface (str): The CAN interface name (default: 'vcan0')
+ """
+ self.console_mode = False
+ self.interface = interface
+ self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000)
+ self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt')
+
+ def write_to_file(self, messages):
+ """
+ Write captured CAN messages to a file.
+
+ Args:
+ messages (list): List of can.Message objects to write
+ """
+ with open(self.output_file, 'w') as file:
+ for msg in messages:
+ file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n")
+
+ def playback_messages(self):
+ if os.path.exists(self.output_file):
+ #console.print("Replaying captured messages...")
+ messages = []
+ with open(self.output_file, 'r') as file:
+ for line in file:
+ parts = line.strip().split(',', 3) # Split into at most 4 parts
+ timestamp, arbitration_id, dlc, data_str = parts
+ # Extract the data bytes, removing '0x' and splitting by ','
+ data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte]
+
+ msg = can.Message(
+ timestamp=float(timestamp),
+ arbitration_id=int(arbitration_id, 0),
+ dlc=int(dlc),
+ data=data_bytes
+ )
+ messages.append(msg)
+ self.replay_messages(messages)
+
+ def stop(self):
+ self._running = False
+ if self.bus is not None:
+ self.bus.shutdown()
+
+ def replay_messages(self, messages):
+ """
+ Replay CAN messages on the specified bus.
+
+ Args:
+ messages (list): List of can.Message objects to replay
+ """
+ self._running = True
+ start_time = messages[0].timestamp
+ for msg in messages:
+ delay = msg.timestamp - start_time
+ self.bus.send(msg)
+ time.sleep(delay)
+ start_time = msg.timestamp
+ if self._running == False: return
+
+ def capture_can_messages(self):
+ """
+ Capture CAN messages from the specified bus.
+
+ Returns:
+ list: List of captured can.Message objects
+ """
+ messages = []
+
+ if self.console_mode:
+ console = Console()
+ console.print(f"Capturing CAN messages on {self.interface}. Press Ctrl+C to stop.")
+
+
+ try:
+ while True:
+ message = self.bus.recv()
+ if message is not None:
+ messages.append(message)
+ except KeyboardInterrupt:
+ console.print("Capture stopped.")
+
+ return messages
+
+def main():
+ from rich.console import Console
+ from rich.prompt import Prompt
+
+ parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool')
+ parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)')
+ args = parser.parse_args()
+
+ # Initialize the CAN Tool with the specified interface
+ can_tool = CAN_playback(interface=args.interface)
+ can_tool.console_mode = True
+
+ console = Console()
+ while True:
+ console.print("\n[bold]CAN Message Capture and Replay[/bold]")
+ console.print("1. Capture CAN messages")
+ console.print("2. Replay captured messages")
+ console.print("3. Exit")
+
+ choice = Prompt.ask("Enter your choice", choices=['1', '2', '3'])
+
+ if choice == '1':
+ messages = can_tool.capture_can_messages()
+ console.print(f"Captured {len(messages)} messages.")
+ can_tool.write_to_file(messages)
+ console.print(f"CAN messages written to {can_tool.output_file}")
+ elif choice == '2':
+ if os.path.exists(can_tool.output_file):
+ console.print("Replaying captured messages...")
+ can_tool.playback_messages()
+ console.print("Replay completed.")
+ else:
+ console.print(f"No captured messages found in {can_tool.output_file}")
+
+
+
+ else:
+ console.print("Exiting...")
+ break
+
+if __name__ == "__main__":
+ main()