aboutsummaryrefslogtreecommitdiffstats
path: root/agl_service_voiceagent/utils/mapper.py
diff options
context:
space:
mode:
Diffstat (limited to 'agl_service_voiceagent/utils/mapper.py')
-rw-r--r--agl_service_voiceagent/utils/mapper.py261
1 files changed, 261 insertions, 0 deletions
diff --git a/agl_service_voiceagent/utils/mapper.py b/agl_service_voiceagent/utils/mapper.py
new file mode 100644
index 0000000..7529645
--- /dev/null
+++ b/agl_service_voiceagent/utils/mapper.py
@@ -0,0 +1,261 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# Copyright (c) 2023 Malik Talha
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from agl_service_voiceagent.utils.config import get_config_value
+from agl_service_voiceagent.utils.common import load_json_file, words_to_number
+
+
+class Intent2VSSMapper:
+ """
+ Intent2VSSMapper is a class that facilitates the mapping of natural language intent to
+ corresponding vehicle signal specifications (VSS) for automated vehicle control systems.
+ """
+
+ def __init__(self):
+ """
+ Initializes the Intent2VSSMapper class by loading Intent-to-VSS signal mappings
+ and VSS signal specifications from external configuration files.
+ """
+ intents_vss_map_file = get_config_value("intents_vss_map", "Mapper")
+ vss_signals_spec_file = get_config_value("vss_signals_spec", "Mapper")
+ self.intents_vss_map = load_json_file(intents_vss_map_file).get("intents", {})
+ self.vss_signals_spec = load_json_file(vss_signals_spec_file).get("signals", {})
+
+ if not self.validate_signal_spec_structure():
+ raise ValueError("[-] Invalid VSS signal specification structure.")
+
+ def validate_signal_spec_structure(self):
+ """
+ Validates the structure of the VSS signal specification data.
+ """
+
+ signals = self.vss_signals_spec
+
+ # Iterate over each signal in the 'signals' dictionary
+ for signal_name, signal_data in signals.items():
+ # Check if the required keys are present in the signal data
+ if not all(key in signal_data for key in ['default_value', 'default_change_factor', 'actions', 'values', 'default_fallback', 'value_set_intents']):
+ print(f"[-] {signal_name}: Missing required keys in signal data.")
+ return False
+
+ actions = signal_data['actions']
+
+ # Check if 'actions' is a dictionary with at least one action
+ if not isinstance(actions, dict) or not actions:
+ print(f"[-] {signal_name}: Invalid 'actions' key in signal data. Must be an object with at least one action.")
+ return False
+
+ # Check if the actions match the allowed actions ["set", "increase", "decrease"]
+ for action in actions.keys():
+ if action not in ["set", "increase", "decrease"]:
+ print(f"[-] {signal_name}: Invalid action in signal data. Allowed actions: ['set', 'increase', 'decrease']")
+ return False
+
+ # Check if the 'synonyms' list is present for each action and is either a list or None
+ for action_data in actions.values():
+ synonyms = action_data.get('synonyms')
+ if synonyms is not None and (not isinstance(synonyms, list) or not all(isinstance(synonym, str) for synonym in synonyms)):
+ print(f"[-] {signal_name}: Invalid 'synonyms' value in signal data. Must be a list of strings.")
+ return False
+
+ values = signal_data['values']
+
+ # Check if 'values' is a dictionary with the required keys
+ if not isinstance(values, dict) or not all(key in values for key in ['ranged', 'start', 'end', 'ignore', 'additional']):
+ print(f"[-] {signal_name}: Invalid 'values' key in signal data. Required keys: ['ranged', 'start', 'end', 'ignore', 'additional']")
+ return False
+
+ # Check if 'ranged' is a boolean
+ if not isinstance(values['ranged'], bool):
+ print(f"[-] {signal_name}: Invalid 'ranged' value in signal data. Allowed values: [true, false]")
+ return False
+
+ default_fallback = signal_data['default_fallback']
+
+ # Check if 'default_fallback' is a boolean
+ if not isinstance(default_fallback, bool):
+ print(f"[-] {signal_name}: Invalid 'default_fallback' value in signal data. Allowed values: [true, false]")
+ return False
+
+ # If all checks pass, the self.vss_signals_spec structure is valid
+ return True
+
+
+ def map_intent_to_signal(self, intent_name):
+ """
+ Maps an intent name to the corresponding VSS signals and their specifications.
+
+ Args:
+ intent_name (str): The name of the intent to be mapped.
+
+ Returns:
+ dict: A dictionary containing VSS signals as keys and their specifications as values.
+ """
+
+ intent_data = self.intents_vss_map.get(intent_name, None)
+ result = {}
+ if intent_data:
+ signals = intent_data.get("signals", [])
+
+ for signal in signals:
+ signal_info = self.vss_signals_spec.get(signal, {})
+ if signal_info:
+ result.update({signal: signal_info})
+
+ return result
+
+
+ def parse_intent(self, intent_name, intent_slots = []):
+ """
+ Parses an intent, extracting relevant VSS signals, actions, modifiers, and values
+ based on the intent and its associated slots.
+
+ Args:
+ intent_name (str): The name of the intent to be parsed.
+ intent_slots (list): A list of dictionaries representing intent slots.
+
+ Returns:
+ list: A list of dictionaries describing actions and signal-related details for execution.
+
+ Note:
+ - If no relevant VSS signals are found for the intent, an empty list is returned.
+ - If no specific action or modifier is determined, default values are used.
+ """
+ vss_signal_data = self.map_intent_to_signal(intent_name)
+ execution_list = []
+ for signal_name, signal_data in vss_signal_data.items():
+ action = self.determine_action(signal_data, intent_slots)
+ modifier = self.determine_modifier(signal_data, intent_slots)
+ value = self.determine_value(signal_data, intent_slots)
+
+ if value != None and not self.verify_value(signal_data, value):
+ value = None
+
+ change_factor = signal_data["default_change_factor"]
+
+ if action in ["increase", "decrease"]:
+ if value and modifier == "to":
+ execution_list.append({"action": action, "signal": signal_name, "value": str(value)})
+
+ elif value and modifier == "by":
+ execution_list.append({"action": action, "signal": signal_name, "factor": str(value)})
+
+ elif value:
+ execution_list.append({"action": action, "signal": signal_name, "value": str(value)})
+
+ elif signal_data["default_fallback"]:
+ execution_list.append({"action": action, "signal": signal_name, "factor": str(change_factor)})
+
+ # if no value found set the default value
+ if value == None and signal_data["default_fallback"]:
+ value = signal_data["default_value"]
+
+ if action == "set" and value != None:
+ execution_list.append({"action": action, "signal": signal_name, "value": str(value)})
+
+
+ return execution_list
+
+
+ def determine_action(self, signal_data, intent_slots):
+ """
+ Determines the action (e.g., set, increase, decrease) based on the intent slots
+ and VSS signal data.
+
+ Args:
+ signal_data (dict): The specification data for a VSS signal.
+ intent_slots (list): A list of dictionaries representing intent slots.
+
+ Returns:
+ str: The determined action or None if no action can be determined.
+ """
+ action_res = None
+ for intent_slot in intent_slots:
+ for action, action_data in signal_data["actions"].items():
+ if intent_slot["name"] in action_data["intents"] and intent_slot["value"] in action_data["synonyms"]:
+ action_res = action
+ break
+
+ return action_res
+
+
+ def determine_modifier(self, signal_data, intent_slots):
+ """
+ Determines the modifier (e.g., 'to' or 'by') based on the intent slots
+ and VSS signal data.
+
+ Args:
+ signal_data (dict): The specification data for a VSS signal.
+ intent_slots (list): A list of dictionaries representing intent slots.
+
+ Returns:
+ str: The determined modifier or None if no modifier can be determined.
+ """
+ modifier_res = None
+ for intent_slot in intent_slots:
+ for _, action_data in signal_data["actions"].items():
+ intent_val = intent_slot["value"]
+ if "modifier_intents" in action_data and intent_slot["name"] in action_data["modifier_intents"] and ("to" in intent_val or "by" in intent_val):
+ modifier_res = "to" if "to" in intent_val else "by" if "by" in intent_val else None
+ break
+
+ return modifier_res
+
+
+ def determine_value(self, signal_data, intent_slots):
+ """
+ Determines the value associated with the intent slot, considering the data type
+ and converting it to a numeric string representation if necessary.
+
+ Args:
+ signal_data (dict): The specification data for a VSS signal.
+ intent_slots (list): A list of dictionaries representing intent slots.
+
+ Returns:
+ str: The determined value or None if no value can be determined.
+ """
+ result = None
+ for intent_slot in intent_slots:
+ for value, value_data in signal_data["value_set_intents"].items():
+ if intent_slot["name"] == value:
+ result = intent_slot["value"]
+
+ if value_data["datatype"] == "number":
+ result = words_to_number(result) # we assume our model will always return a number in words
+
+ # the value should always returned as str because Kuksa expects str values
+ return str(result) if result != None else None
+
+
+ def verify_value(self, signal_data, value):
+ """
+ Verifies that the value is valid based on the VSS signal data.
+
+ Args:
+ signal_data (dict): The specification data for a VSS signal.
+ value (str): The value to be verified.
+
+ Returns:
+ bool: True if the value is valid, False otherwise.
+ """
+ if value in signal_data["values"]["ignore"]:
+ return False
+
+ elif signal_data["values"]["ranged"] and isinstance(value, (int, float)):
+ return value >= signal_data["values"]["start"] and value <= signal_data["values"]["end"]
+
+ else:
+ return value in signal_data["values"]["additional"]