aboutsummaryrefslogtreecommitdiffstats
path: root/Scripts/record_playback.py
diff options
context:
space:
mode:
Diffstat (limited to 'Scripts/record_playback.py')
-rw-r--r--Scripts/record_playback.py164
1 files changed, 114 insertions, 50 deletions
diff --git a/Scripts/record_playback.py b/Scripts/record_playback.py
index 5d3956e..0e631ad 100644
--- a/Scripts/record_playback.py
+++ b/Scripts/record_playback.py
@@ -7,79 +7,147 @@ import time
from rich.console import Console
import os
import argparse
+from PyQt6.QtCore import QThread, pyqtSignal
+import cantools
+import sys
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.dirname(current_dir))
+
+from extras import config
+
+class CAN_playback(QThread):
+ finished = pyqtSignal()
+
+ speedUpdate = pyqtSignal(int)
+ gearUpdate = pyqtSignal(str)
+ engineSpeedUpdate = pyqtSignal(int)
+ throttleUpdate = pyqtSignal(int)
+ indicatorUpdate = pyqtSignal(str)
+
+ try:
+ playbackFile = config.get_playback_file()
+ except Exception as e:
+ print(e)
+ # playbackFile = os.path.join(current_dir, "can_messages.txt")
-class CAN_playback:
def __init__(self, interface='vcan0'):
- """
- Initialize the CAN Tool with the specified interface.
+ super(CAN_playback, self).__init__()
+ self._running = False
- Args:
- interface (str): The CAN interface name (default: 'vcan0')
- """
self.console_mode = False
self.interface = interface
self.bus = can.interface.Bus(interface='socketcan', channel=self.interface, bitrate=500000)
- self.output_file = os.path.join(os.path.dirname(__file__), 'can_messages.txt')
+ self.output_file = self.playbackFile
+
+ # Load the DBC file
+ self.db = cantools.database.load_file(os.path.join(os.path.dirname(__file__), 'agl-vcar.dbc'))
+
+ # Create a mapping of relevant messages
+ self.message_map = {
+ 'Vehicle_Status_1': self.decode_speed,
+ 'Transmission': self.decode_gear,
+ 'Vehicle_Status_2': self.decode_engine_speed,
+ 'Engine': self.decode_throttle,
+ 'Vehicle_Status_3': self.decode_indicators
+ }
def write_to_file(self, messages):
- """
- Write captured CAN messages to a file.
-
- Args:
- messages (list): List of can.Message objects to write
- """
with open(self.output_file, 'w') as file:
for msg in messages:
file.write(f"{msg.timestamp},{msg.arbitration_id},{msg.dlc},{','.join(map(lambda b: f'0x{b:02x}', msg.data))}\n")
def playback_messages(self):
if os.path.exists(self.output_file):
- #console.print("Replaying captured messages...")
- messages = []
- with open(self.output_file, 'r') as file:
- for line in file:
- parts = line.strip().split(',', 3) # Split into at most 4 parts
- timestamp, arbitration_id, dlc, data_str = parts
- # Extract the data bytes, removing '0x' and splitting by ','
- data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte]
-
- msg = can.Message(
- timestamp=float(timestamp),
- arbitration_id=int(arbitration_id, 0),
- dlc=int(dlc),
- data=data_bytes
- )
- messages.append(msg)
- self.replay_messages(messages)
-
+ messages = []
+ with open(self.output_file, 'r') as file:
+ for line in file:
+ parts = line.strip().split(',', 3)
+ timestamp, arbitration_id, dlc, data_str = parts
+ data_bytes = [int(byte, 16) for byte in data_str.split(',') if byte]
+
+ msg = can.Message(
+ timestamp=float(timestamp),
+ arbitration_id=int(arbitration_id, 0),
+ dlc=int(dlc),
+ data=data_bytes
+ )
+ messages.append(msg)
+ self.replay_messages(messages)
+
def stop(self):
self._running = False
if self.bus is not None:
self.bus.shutdown()
+ self.finished.emit()
def replay_messages(self, messages):
- """
- Replay CAN messages on the specified bus.
-
- Args:
- messages (list): List of can.Message objects to replay
- """
self._running = True
start_time = messages[0].timestamp
for msg in messages:
delay = msg.timestamp - start_time
self.bus.send(msg)
+
+ # Decode and emit signals for relevant messages
+ if msg.arbitration_id == self.db.get_message_by_frame_id(msg.arbitration_id).frame_id:
+ message_name = self.db.get_message_by_frame_id(msg.arbitration_id).name
+ if message_name in self.message_map:
+ decoded_data = self.decode_message(message_name, msg.data)
+ self.message_map[message_name](decoded_data)
+
time.sleep(delay)
start_time = msg.timestamp
- if self._running == False: return
+ if not self._running:
+ return
+
+ def decode_message(self, message_name, data):
+ message = self.db.get_message_by_name(message_name)
+ return message.decode(data)
+
+ def decode_speed(self, decoded_data):
+ speed = int(decoded_data['PT_VehicleAvgSpeed'])
+ self.speedUpdate.emit(speed)
+
+ def decode_gear(self, decoded_data):
+ gear_value = int(decoded_data['Gear'])
+ if gear_value == 126:
+ gear = "P"
+ elif gear_value == 127:
+ gear = "D"
+ elif gear_value == -1 or gear_value == -2:
+ gear = "R"
+ elif gear_value == 0:
+ gear = "N"
+ else:
+ gear = f"{gear_value}"
+ self.gearUpdate.emit(gear)
+
+ def decode_engine_speed(self, decoded_data):
+ engine_speed = int(decoded_data['PT_EngineSpeed'])
+ self.engineSpeedUpdate.emit(engine_speed)
+
+ def decode_throttle(self, decoded_data):
+ throttle_position = int(decoded_data['ThrottlePosition'])
+ self.throttleUpdate.emit(throttle_position)
+
+ def decode_indicators(self, decoded_data):
+ if decoded_data['PT_HazardOn'] == 1:
+ self.indicatorUpdate.emit('HazardOn')
+ if decoded_data['PT_HazardOn'] == 0:
+ self.indicatorUpdate.emit('HazardOff')
+
+ if decoded_data['PT_LeftTurnOn'] == 1:
+ self.indicatorUpdate.emit('LeftBlinkerOn')
+ print("Left Blinker On")
+ if decoded_data['PT_LeftTurnOn'] == 0:
+ self.indicatorUpdate.emit('LeftBlinkerOff')
+
+ if decoded_data['PT_RightTurnOn'] == 1:
+ self.indicatorUpdate.emit('RightBlinkerOn')
+ if decoded_data['PT_RightTurnOn'] == 0:
+ self.indicatorUpdate.emit('RightBlinkerOff')
def capture_can_messages(self):
- """
- Capture CAN messages from the specified bus.
-
- Returns:
- list: List of captured can.Message objects
- """
messages = []
if self.console_mode:
@@ -101,17 +169,16 @@ def main():
from rich.console import Console
from rich.prompt import Prompt
- parser = argparse.ArgumentParser(description='CAN Message Capture and Replay Tool')
+ parser = argparse.ArgumentParser(description='CAN Message Capture and Playback Tool')
parser.add_argument('--interface', '-i', type=str, default='vcan0', help='Specify the CAN interface (default: vcan0)')
args = parser.parse_args()
- # Initialize the CAN Tool with the specified interface
can_tool = CAN_playback(interface=args.interface)
can_tool.console_mode = True
console = Console()
while True:
- console.print("\n[bold]CAN Message Capture and Replay[/bold]")
+ console.print("\n[bold]CAN Message Capture and Playback[/bold]")
console.print("1. Capture CAN messages")
console.print("2. Replay captured messages")
console.print("3. Exit")
@@ -131,12 +198,9 @@ def main():
else:
console.print(f"No captured messages found in {can_tool.output_file}")
-
-
else:
console.print("Exiting...")
break
if __name__ == "__main__":
main()
-