Diffstat (limited to 'snips_inference_agl/intent_parser')
-rw-r--r-- snips_inference_agl/intent_parser/__init__.py                    |   4 +
-rw-r--r-- snips_inference_agl/intent_parser/deterministic_intent_parser.py | 518 ++++++++++
-rw-r--r-- snips_inference_agl/intent_parser/intent_parser.py               |  85 ++
-rw-r--r-- snips_inference_agl/intent_parser/lookup_intent_parser.py        | 509 ++++++++++
-rw-r--r-- snips_inference_agl/intent_parser/probabilistic_intent_parser.py | 250 +++++
5 files changed, 1366 insertions, 0 deletions
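The package __init__ added below exports four parser classes. As a quick orientation, here is a minimal usage sketch (hedged: the persisted-parser directory "./lookup_parser" and the input sentence are placeholder examples; from_path expects a directory produced by the matching persist method, and in practice the **shared kwargs carry the resources and entity parsers built by the engine):

    from snips_inference_agl.intent_parser import (
        DeterministicIntentParser, LookupIntentParser, ProbabilisticIntentParser)

    # Load a previously persisted parser (placeholder path).
    parser = LookupIntentParser.from_path("./lookup_parser")

    # A plain parse returns a single result dict; passing top_n instead
    # returns a list of at most top_n results, ordered by decreasing proba.
    result = parser.parse("turn up the temperature in the cabin")
    candidates = parser.parse("turn up the temperature in the cabin", top_n=3)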
diff --git a/snips_inference_agl/intent_parser/__init__.py b/snips_inference_agl/intent_parser/__init__.py new file mode 100644 index 0000000..1b0d446 --- /dev/null +++ b/snips_inference_agl/intent_parser/__init__.py @@ -0,0 +1,4 @@ +from .deterministic_intent_parser import DeterministicIntentParser +from .intent_parser import IntentParser +from .lookup_intent_parser import LookupIntentParser +from .probabilistic_intent_parser import ProbabilisticIntentParser diff --git a/snips_inference_agl/intent_parser/deterministic_intent_parser.py b/snips_inference_agl/intent_parser/deterministic_intent_parser.py new file mode 100644 index 0000000..845e59d --- /dev/null +++ b/snips_inference_agl/intent_parser/deterministic_intent_parser.py @@ -0,0 +1,518 @@ +from __future__ import unicode_literals + +import json +import logging +import re +from builtins import str +from collections import defaultdict +from pathlib import Path + +from future.utils import iteritems, itervalues + +from snips_inference_agl.common.dataset_utils import get_slot_name_mappings +from snips_inference_agl.common.log_utils import log_elapsed_time, log_result +from snips_inference_agl.common.utils import ( + check_persisted_path, deduplicate_overlapping_items, fitted_required, + json_string, ranges_overlap, regex_escape, + replace_entities_with_placeholders) +from snips_inference_agl.constants import ( + DATA, END, ENTITIES, ENTITY, + INTENTS, LANGUAGE, RES_INTENT, RES_INTENT_NAME, + RES_MATCH_RANGE, RES_SLOTS, RES_VALUE, SLOT_NAME, START, TEXT, UTTERANCES, + RES_PROBA) +from snips_inference_agl.dataset import validate_and_format_dataset +from snips_inference_agl.dataset.utils import get_stop_words_whitelist +from snips_inference_agl.entity_parser.builtin_entity_parser import is_builtin_entity +from snips_inference_agl.exceptions import IntentNotFoundError, LoadingError +from snips_inference_agl.intent_parser.intent_parser import IntentParser +from snips_inference_agl.pipeline.configs import DeterministicIntentParserConfig +from snips_inference_agl.preprocessing import normalize_token, tokenize, tokenize_light +from snips_inference_agl.resources import get_stop_words +from snips_inference_agl.result import (empty_result, extraction_result, + intent_classification_result, parsing_result, + unresolved_slot) + +WHITESPACE_PATTERN = r"\s*" + +logger = logging.getLogger(__name__) + + +@IntentParser.register("deterministic_intent_parser") +class DeterministicIntentParser(IntentParser): + """Intent parser using pattern matching in a deterministic manner + + This intent parser is very strict by nature, and tends to have a very good + precision but a low recall. For this reason, it is interesting to use it + first before potentially falling back to another parser. 
+ """ + + config_type = DeterministicIntentParserConfig + + def __init__(self, config=None, **shared): + """The deterministic intent parser can be configured by passing a + :class:`.DeterministicIntentParserConfig`""" + super(DeterministicIntentParser, self).__init__(config, **shared) + self._language = None + self._slot_names_to_entities = None + self._group_names_to_slot_names = None + self._stop_words = None + self._stop_words_whitelist = None + self.slot_names_to_group_names = None + self.regexes_per_intent = None + self.entity_scopes = None + + @property + def language(self): + return self._language + + @language.setter + def language(self, value): + self._language = value + if value is None: + self._stop_words = None + else: + if self.config.ignore_stop_words: + self._stop_words = get_stop_words(self.resources) + else: + self._stop_words = set() + + @property + def slot_names_to_entities(self): + return self._slot_names_to_entities + + @slot_names_to_entities.setter + def slot_names_to_entities(self, value): + self._slot_names_to_entities = value + if value is None: + self.entity_scopes = None + else: + self.entity_scopes = { + intent: { + "builtin": {ent for ent in itervalues(slot_mapping) + if is_builtin_entity(ent)}, + "custom": {ent for ent in itervalues(slot_mapping) + if not is_builtin_entity(ent)} + } + for intent, slot_mapping in iteritems(value)} + + @property + def group_names_to_slot_names(self): + return self._group_names_to_slot_names + + @group_names_to_slot_names.setter + def group_names_to_slot_names(self, value): + self._group_names_to_slot_names = value + if value is not None: + self.slot_names_to_group_names = { + slot_name: group for group, slot_name in iteritems(value)} + + @property + def patterns(self): + """Dictionary of patterns per intent""" + if self.regexes_per_intent is not None: + return {i: [r.pattern for r in regex_list] for i, regex_list in + iteritems(self.regexes_per_intent)} + return None + + @patterns.setter + def patterns(self, value): + if value is not None: + self.regexes_per_intent = dict() + for intent, pattern_list in iteritems(value): + regexes = [re.compile(r"%s" % p, re.IGNORECASE) + for p in pattern_list] + self.regexes_per_intent[intent] = regexes + + @property + def fitted(self): + """Whether or not the intent parser has already been trained""" + return self.regexes_per_intent is not None + + @log_elapsed_time( + logger, logging.INFO, "Fitted deterministic parser in {elapsed_time}") + def fit(self, dataset, force_retrain=True): + """Fits the intent parser with a valid Snips dataset""" + logger.info("Fitting deterministic intent parser...") + dataset = validate_and_format_dataset(dataset) + self.load_resources_if_needed(dataset[LANGUAGE]) + self.fit_builtin_entity_parser_if_needed(dataset) + self.fit_custom_entity_parser_if_needed(dataset) + self.language = dataset[LANGUAGE] + self.regexes_per_intent = dict() + entity_placeholders = _get_entity_placeholders(dataset, self.language) + self.slot_names_to_entities = get_slot_name_mappings(dataset) + self.group_names_to_slot_names = _get_group_names_to_slot_names( + self.slot_names_to_entities) + self._stop_words_whitelist = get_stop_words_whitelist( + dataset, self._stop_words) + + # Do not use ambiguous patterns that appear in more than one intent + all_patterns = set() + ambiguous_patterns = set() + intent_patterns = dict() + for intent_name, intent in iteritems(dataset[INTENTS]): + patterns = self._generate_patterns(intent_name, intent[UTTERANCES], + entity_placeholders) + patterns = [p 
for p in patterns + if len(p) < self.config.max_pattern_length] + existing_patterns = {p for p in patterns if p in all_patterns} + ambiguous_patterns.update(existing_patterns) + all_patterns.update(set(patterns)) + intent_patterns[intent_name] = patterns + + for intent_name, patterns in iteritems(intent_patterns): + patterns = [p for p in patterns if p not in ambiguous_patterns] + patterns = patterns[:self.config.max_queries] + regexes = [re.compile(p, re.IGNORECASE) for p in patterns] + self.regexes_per_intent[intent_name] = regexes + return self + + @log_result( + logger, logging.DEBUG, "DeterministicIntentParser result -> {result}") + @log_elapsed_time(logger, logging.DEBUG, "Parsed in {elapsed_time}.") + @fitted_required + def parse(self, text, intents=None, top_n=None): + """Performs intent parsing on the provided *text* + + Intent and slots are extracted simultaneously through pattern matching + + Args: + text (str): input + intents (str or list of str): if provided, reduces the scope of + intent parsing to the provided list of intents + top_n (int, optional): when provided, this method will return a + list of at most top_n most likely intents, instead of a single + parsing result. + Note that the returned list can contain less than ``top_n`` + elements, for instance when the parameter ``intents`` is not + None, or when ``top_n`` is greater than the total number of + intents. + + Returns: + dict or list: the most likely intent(s) along with the extracted + slots. See :func:`.parsing_result` and :func:`.extraction_result` + for the output format. + + Raises: + NotTrained: when the intent parser is not fitted + """ + if top_n is None: + top_intents = self._parse_top_intents(text, top_n=1, + intents=intents) + if top_intents: + intent = top_intents[0][RES_INTENT] + slots = top_intents[0][RES_SLOTS] + if intent[RES_PROBA] <= 0.5: + # return None in case of ambiguity + return empty_result(text, probability=1.0) + return parsing_result(text, intent, slots) + return empty_result(text, probability=1.0) + return self._parse_top_intents(text, top_n=top_n, intents=intents) + + def _parse_top_intents(self, text, top_n, intents=None): + if isinstance(intents, str): + intents = {intents} + elif isinstance(intents, list): + intents = set(intents) + + if top_n < 1: + raise ValueError( + "top_n argument must be greater or equal to 1, but got: %s" + % top_n) + + def placeholder_fn(entity_name): + return _get_entity_name_placeholder(entity_name, self.language) + + results = [] + + for intent, entity_scope in iteritems(self.entity_scopes): + if intents is not None and intent not in intents: + continue + builtin_entities = self.builtin_entity_parser.parse( + text, scope=entity_scope["builtin"], use_cache=True) + custom_entities = self.custom_entity_parser.parse( + text, scope=entity_scope["custom"], use_cache=True) + all_entities = builtin_entities + custom_entities + mapping, processed_text = replace_entities_with_placeholders( + text, all_entities, placeholder_fn=placeholder_fn) + cleaned_text = self._preprocess_text(text, intent) + cleaned_processed_text = self._preprocess_text(processed_text, + intent) + for regex in self.regexes_per_intent[intent]: + res = self._get_matching_result(text, cleaned_text, regex, + intent) + if res is None and cleaned_text != cleaned_processed_text: + res = self._get_matching_result( + text, cleaned_processed_text, regex, intent, mapping) + + if res is not None: + results.append(res) + break + + # In some rare cases there can be multiple ambiguous intents + # In such 
cases, priority is given to results containing fewer slots + weights = [1.0 / (1.0 + len(res[RES_SLOTS])) for res in results] + total_weight = sum(weights) + + for res, weight in zip(results, weights): + res[RES_INTENT][RES_PROBA] = weight / total_weight + + results = sorted(results, key=lambda r: -r[RES_INTENT][RES_PROBA]) + + return results[:top_n] + + @fitted_required + def get_intents(self, text): + """Returns the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + """ + nb_intents = len(self.regexes_per_intent) + top_intents = [intent_result[RES_INTENT] for intent_result in + self._parse_top_intents(text, top_n=nb_intents)] + matched_intents = {res[RES_INTENT_NAME] for res in top_intents} + for intent in self.regexes_per_intent: + if intent not in matched_intents: + top_intents.append(intent_classification_result(intent, 0.0)) + + # The None intent is not included in the regex patterns and is thus + # never matched by the deterministic parser + top_intents.append(intent_classification_result(None, 0.0)) + return top_intents + + @fitted_required + def get_slots(self, text, intent): + """Extracts slots from a text input, with the knowledge of the intent + + Args: + text (str): input + intent (str): the intent which the input corresponds to + + Returns: + list: the list of extracted slots + + Raises: + IntentNotFoundError: When the intent was not part of the training + data + """ + if intent is None: + return [] + + if intent not in self.regexes_per_intent: + raise IntentNotFoundError(intent) + + slots = self.parse(text, intents=[intent])[RES_SLOTS] + if slots is None: + slots = [] + return slots + + def _get_intent_stop_words(self, intent): + whitelist = self._stop_words_whitelist.get(intent, set()) + return self._stop_words.difference(whitelist) + + def _preprocess_text(self, string, intent): + """Replaces stop words and characters that are tokenized out by + whitespaces""" + tokens = tokenize(string, self.language) + current_idx = 0 + cleaned_string = "" + stop_words = self._get_intent_stop_words(intent) + for token in tokens: + if stop_words and normalize_token(token) in stop_words: + token.value = "".join(" " for _ in range(len(token.value))) + prefix_length = token.start - current_idx + cleaned_string += "".join((" " for _ in range(prefix_length))) + cleaned_string += token.value + current_idx = token.end + suffix_length = len(string) - current_idx + cleaned_string += "".join((" " for _ in range(suffix_length))) + return cleaned_string + + def _get_matching_result(self, text, processed_text, regex, intent, + entities_ranges_mapping=None): + found_result = regex.match(processed_text) + if found_result is None: + return None + parsed_intent = intent_classification_result(intent_name=intent, + probability=1.0) + slots = [] + for group_name in found_result.groupdict(): + ref_group_name = group_name + if "_" in group_name: + ref_group_name = group_name.split("_")[0] + slot_name = self.group_names_to_slot_names[ref_group_name] + entity = self.slot_names_to_entities[intent][slot_name] + rng = (found_result.start(group_name), + found_result.end(group_name)) + if entities_ranges_mapping is not None: + if rng in entities_ranges_mapping: + rng = entities_ranges_mapping[rng] + else: + shift = _get_range_shift( + rng, entities_ranges_mapping) + rng = {START: rng[0] + shift, END: rng[1] + shift} + else: + rng = {START: rng[0], END: rng[1]} + value = text[rng[START]:rng[END]] + 
parsed_slot = unresolved_slot( + match_range=rng, value=value, entity=entity, + slot_name=slot_name) + slots.append(parsed_slot) + parsed_slots = _deduplicate_overlapping_slots(slots, self.language) + parsed_slots = sorted(parsed_slots, + key=lambda s: s[RES_MATCH_RANGE][START]) + return extraction_result(parsed_intent, parsed_slots) + + def _generate_patterns(self, intent, intent_utterances, + entity_placeholders): + unique_patterns = set() + patterns = [] + stop_words = self._get_intent_stop_words(intent) + for utterance in intent_utterances: + pattern = self._utterance_to_pattern( + utterance, stop_words, entity_placeholders) + if pattern not in unique_patterns: + unique_patterns.add(pattern) + patterns.append(pattern) + return patterns + + def _utterance_to_pattern(self, utterance, stop_words, + entity_placeholders): + from snips_nlu_utils import normalize + + slot_names_count = defaultdict(int) + pattern = [] + for chunk in utterance[DATA]: + if SLOT_NAME in chunk: + slot_name = chunk[SLOT_NAME] + slot_names_count[slot_name] += 1 + group_name = self.slot_names_to_group_names[slot_name] + count = slot_names_count[slot_name] + if count > 1: + group_name = "%s_%s" % (group_name, count) + placeholder = entity_placeholders[chunk[ENTITY]] + pattern.append(r"(?P<%s>%s)" % (group_name, placeholder)) + else: + tokens = tokenize_light(chunk[TEXT], self.language) + pattern += [regex_escape(t.lower()) for t in tokens + if normalize(t) not in stop_words] + + pattern = r"^%s%s%s$" % (WHITESPACE_PATTERN, + WHITESPACE_PATTERN.join(pattern), + WHITESPACE_PATTERN) + return pattern + + @check_persisted_path + def persist(self, path): + """Persists the object at the given path""" + path.mkdir() + parser_json = json_string(self.to_dict()) + parser_path = path / "intent_parser.json" + + with parser_path.open(mode="w", encoding="utf8") as f: + f.write(parser_json) + self.persist_metadata(path) + + @classmethod + def from_path(cls, path, **shared): + """Loads a :class:`DeterministicIntentParser` instance from a path + + The data at the given path must have been generated using + :func:`~DeterministicIntentParser.persist` + """ + path = Path(path) + model_path = path / "intent_parser.json" + if not model_path.exists(): + raise LoadingError( + "Missing deterministic intent parser metadata file: %s" + % model_path.name) + + with model_path.open(encoding="utf8") as f: + metadata = json.load(f) + return cls.from_dict(metadata, **shared) + + def to_dict(self): + """Returns a json-serializable dict""" + stop_words_whitelist = None + if self._stop_words_whitelist is not None: + stop_words_whitelist = { + intent: sorted(values) + for intent, values in iteritems(self._stop_words_whitelist)} + return { + "config": self.config.to_dict(), + "language_code": self.language, + "patterns": self.patterns, + "group_names_to_slot_names": self.group_names_to_slot_names, + "slot_names_to_entities": self.slot_names_to_entities, + "stop_words_whitelist": stop_words_whitelist + } + + @classmethod + def from_dict(cls, unit_dict, **shared): + """Creates a :class:`DeterministicIntentParser` instance from a dict + + The dict must have been generated with + :func:`~DeterministicIntentParser.to_dict` + """ + config = cls.config_type.from_dict(unit_dict["config"]) + parser = cls(config=config, **shared) + parser.patterns = unit_dict["patterns"] + parser.language = unit_dict["language_code"] + parser.group_names_to_slot_names = unit_dict[ + "group_names_to_slot_names"] + parser.slot_names_to_entities = unit_dict["slot_names_to_entities"] + 
if parser.fitted: + whitelist = unit_dict.get("stop_words_whitelist", dict()) + # pylint:disable=protected-access + parser._stop_words_whitelist = { + intent: set(values) for intent, values in iteritems(whitelist)} + # pylint:enable=protected-access + return parser + + +def _get_range_shift(matched_range, ranges_mapping): + shift = 0 + previous_replaced_range_end = None + matched_start = matched_range[0] + for replaced_range, orig_range in iteritems(ranges_mapping): + if replaced_range[1] <= matched_start: + if previous_replaced_range_end is None \ + or replaced_range[1] > previous_replaced_range_end: + previous_replaced_range_end = replaced_range[1] + shift = orig_range[END] - replaced_range[1] + return shift + + +def _get_group_names_to_slot_names(slot_names_mapping): + slot_names = {slot_name for mapping in itervalues(slot_names_mapping) + for slot_name in mapping} + return {"group%s" % i: name + for i, name in enumerate(sorted(slot_names))} + + +def _get_entity_placeholders(dataset, language): + return { + e: _get_entity_name_placeholder(e, language) + for e in dataset[ENTITIES] + } + + +def _deduplicate_overlapping_slots(slots, language): + def overlap(lhs_slot, rhs_slot): + return ranges_overlap(lhs_slot[RES_MATCH_RANGE], + rhs_slot[RES_MATCH_RANGE]) + + def sort_key_fn(slot): + tokens = tokenize(slot[RES_VALUE], language) + return -(len(tokens) + len(slot[RES_VALUE])) + + deduplicated_slots = deduplicate_overlapping_items( + slots, overlap, sort_key_fn) + return sorted(deduplicated_slots, + key=lambda slot: slot[RES_MATCH_RANGE][START]) + + +def _get_entity_name_placeholder(entity_label, language): + return "%%%s%%" % "".join( + tokenize_light(entity_label, language)).upper() diff --git a/snips_inference_agl/intent_parser/intent_parser.py b/snips_inference_agl/intent_parser/intent_parser.py new file mode 100644 index 0000000..b269774 --- /dev/null +++ b/snips_inference_agl/intent_parser/intent_parser.py @@ -0,0 +1,85 @@ +from abc import abstractmethod, ABCMeta + +from future.utils import with_metaclass + +from snips_inference_agl.common.abc_utils import classproperty +from snips_inference_agl.pipeline.processing_unit import ProcessingUnit + + +class IntentParser(with_metaclass(ABCMeta, ProcessingUnit)): + """Abstraction which performs intent parsing + + A custom intent parser must inherit this class to be used in a + :class:`.SnipsNLUEngine` + """ + + @classproperty + def unit_name(cls): # pylint:disable=no-self-argument + return IntentParser.registered_name(cls) + + @abstractmethod + def fit(self, dataset, force_retrain): + """Fit the intent parser with a valid Snips dataset + + Args: + dataset (dict): valid Snips NLU dataset + force_retrain (bool): specify whether or not sub units of the + intent parser that may be already trained should be retrained + """ + pass + + @abstractmethod + def parse(self, text, intents, top_n): + """Performs intent parsing on the provided *text* + + Args: + text (str): input + intents (str or list of str): if provided, reduces the scope of + intent parsing to the provided list of intents + top_n (int, optional): when provided, this method will return a + list of at most top_n most likely intents, instead of a single + parsing result. + Note that the returned list can contain less than ``top_n`` + elements, for instance when the parameter ``intents`` is not + None, or when ``top_n`` is greater than the total number of + intents. + + Returns: + dict or list: the most likely intent(s) along with the extracted + slots. 
See :func:`.parsing_result` and :func:`.extraction_result` + for the output format. + """ + pass + + @abstractmethod + def get_intents(self, text): + """Performs intent classification on the provided *text* and returns + the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + + .. note:: + + The probabilities returned along with each intent are not + guaranteed to sum to 1.0. They should be considered as scores + between 0 and 1. + """ + pass + + @abstractmethod + def get_slots(self, text, intent): + """Extract slots from a text input, with the knowledge of the intent + + Args: + text (str): input + intent (str): the intent which the input corresponds to + + Returns: + list: the list of extracted slots + + Raises: + IntentNotFoundError: when the intent was not part of the training + data + """ + pass diff --git a/snips_inference_agl/intent_parser/lookup_intent_parser.py b/snips_inference_agl/intent_parser/lookup_intent_parser.py new file mode 100644 index 0000000..921dcc5 --- /dev/null +++ b/snips_inference_agl/intent_parser/lookup_intent_parser.py @@ -0,0 +1,509 @@ +from __future__ import unicode_literals + +import json +import logging +from builtins import str +from collections import defaultdict +from itertools import combinations +from pathlib import Path + +from future.utils import iteritems, itervalues +from snips_nlu_utils import normalize, hash_str + +from snips_inference_agl.common.log_utils import log_elapsed_time, log_result +from snips_inference_agl.common.utils import ( + check_persisted_path, deduplicate_overlapping_entities, fitted_required, + json_string) +from snips_inference_agl.constants import ( + DATA, END, ENTITIES, ENTITY, ENTITY_KIND, INTENTS, LANGUAGE, RES_INTENT, + RES_INTENT_NAME, RES_MATCH_RANGE, RES_SLOTS, SLOT_NAME, START, TEXT, + UTTERANCES, RES_PROBA) +from snips_inference_agl.dataset import ( + validate_and_format_dataset, extract_intent_entities) +from snips_inference_agl.dataset.utils import get_stop_words_whitelist +from snips_inference_agl.entity_parser.builtin_entity_parser import is_builtin_entity +from snips_inference_agl.exceptions import IntentNotFoundError, LoadingError +from snips_inference_agl.intent_parser.intent_parser import IntentParser +from snips_inference_agl.pipeline.configs import LookupIntentParserConfig +from snips_inference_agl.preprocessing import tokenize_light +from snips_inference_agl.resources import get_stop_words +from snips_inference_agl.result import ( + empty_result, intent_classification_result, parsing_result, + unresolved_slot, extraction_result) + +logger = logging.getLogger(__name__) + + +@IntentParser.register("lookup_intent_parser") +class LookupIntentParser(IntentParser): + """A deterministic Intent parser implementation based on a dictionary + + This intent parser is very strict by nature, and tends to have a very good + precision but a low recall. For this reason, it is interesting to use it + first before potentially falling back to another parser. 
+ """ + + config_type = LookupIntentParserConfig + + def __init__(self, config=None, **shared): + """The lookup intent parser can be configured by passing a + :class:`.LookupIntentParserConfig`""" + super(LookupIntentParser, self).__init__(config, **shared) + self._language = None + self._stop_words = None + self._stop_words_whitelist = None + self._map = None + self._intents_names = [] + self._slots_names = [] + self._intents_mapping = dict() + self._slots_mapping = dict() + self._entity_scopes = None + + @property + def language(self): + return self._language + + @language.setter + def language(self, value): + self._language = value + if value is None: + self._stop_words = None + else: + if self.config.ignore_stop_words: + self._stop_words = get_stop_words(self.resources) + else: + self._stop_words = set() + + @property + def fitted(self): + """Whether or not the intent parser has already been trained""" + return self._map is not None + + @log_elapsed_time( + logger, logging.INFO, "Fitted lookup intent parser in {elapsed_time}") + def fit(self, dataset, force_retrain=True): + """Fits the intent parser with a valid Snips dataset""" + logger.info("Fitting lookup intent parser...") + dataset = validate_and_format_dataset(dataset) + self.load_resources_if_needed(dataset[LANGUAGE]) + self.fit_builtin_entity_parser_if_needed(dataset) + self.fit_custom_entity_parser_if_needed(dataset) + self.language = dataset[LANGUAGE] + self._entity_scopes = _get_entity_scopes(dataset) + self._map = dict() + self._stop_words_whitelist = get_stop_words_whitelist( + dataset, self._stop_words) + entity_placeholders = _get_entity_placeholders(dataset, self.language) + + ambiguous_keys = set() + for (key, val) in self._generate_io_mapping(dataset[INTENTS], + entity_placeholders): + key = hash_str(key) + # handle key collisions -*- flag ambiguous entries -*- + if key in self._map and self._map[key] != val: + ambiguous_keys.add(key) + else: + self._map[key] = val + + # delete ambiguous keys + for key in ambiguous_keys: + self._map.pop(key) + + return self + + @log_result(logger, logging.DEBUG, "LookupIntentParser result -> {result}") + @log_elapsed_time(logger, logging.DEBUG, "Parsed in {elapsed_time}.") + @fitted_required + def parse(self, text, intents=None, top_n=None): + """Performs intent parsing on the provided *text* + + Intent and slots are extracted simultaneously through pattern matching + + Args: + text (str): input + intents (str or list of str): if provided, reduces the scope of + intent parsing to the provided list of intents + top_n (int, optional): when provided, this method will return a + list of at most top_n most likely intents, instead of a single + parsing result. + Note that the returned list can contain less than ``top_n`` + elements, for instance when the parameter ``intents`` is not + None, or when ``top_n`` is greater than the total number of + intents. + + Returns: + dict or list: the most likely intent(s) along with the extracted + slots. See :func:`.parsing_result` and :func:`.extraction_result` + for the output format. 
+ + Raises: + NotTrained: when the intent parser is not fitted + """ + if top_n is None: + top_intents = self._parse_top_intents(text, top_n=1, + intents=intents) + if top_intents: + intent = top_intents[0][RES_INTENT] + slots = top_intents[0][RES_SLOTS] + if intent[RES_PROBA] <= 0.5: + # return None in case of ambiguity + return empty_result(text, probability=1.0) + return parsing_result(text, intent, slots) + return empty_result(text, probability=1.0) + return self._parse_top_intents(text, top_n=top_n, intents=intents) + + def _parse_top_intents(self, text, top_n, intents=None): + if isinstance(intents, str): + intents = {intents} + elif isinstance(intents, list): + intents = set(intents) + + if top_n < 1: + raise ValueError( + "top_n argument must be greater or equal to 1, but got: %s" + % top_n) + + results_per_intent = defaultdict(list) + for text_candidate, entities in self._get_candidates(text, intents): + val = self._map.get(hash_str(text_candidate)) + if val is not None: + result = self._parse_map_output(text, val, entities, intents) + if result: + intent_name = result[RES_INTENT][RES_INTENT_NAME] + results_per_intent[intent_name].append(result) + + results = [] + for intent_results in itervalues(results_per_intent): + sorted_results = sorted(intent_results, + key=lambda res: len(res[RES_SLOTS])) + results.append(sorted_results[0]) + + # In some rare cases there can be multiple ambiguous intents + # In such cases, priority is given to results containing fewer slots + weights = [1.0 / (1.0 + len(res[RES_SLOTS])) for res in results] + total_weight = sum(weights) + + for res, weight in zip(results, weights): + res[RES_INTENT][RES_PROBA] = weight / total_weight + + results = sorted(results, key=lambda r: -r[RES_INTENT][RES_PROBA]) + return results[:top_n] + + def _get_candidates(self, text, intents): + candidates = defaultdict(list) + for grouped_entity_scope in self._entity_scopes: + entity_scope = grouped_entity_scope["entity_scope"] + intent_group = grouped_entity_scope["intent_group"] + intent_group = [intent_ for intent_ in intent_group + if intents is None or intent_ in intents] + if not intent_group: + continue + + builtin_entities = self.builtin_entity_parser.parse( + text, scope=entity_scope["builtin"], use_cache=True) + custom_entities = self.custom_entity_parser.parse( + text, scope=entity_scope["custom"], use_cache=True) + all_entities = builtin_entities + custom_entities + all_entities = deduplicate_overlapping_entities(all_entities) + + # We generate all subsets of entities to match utterances + # containing ambivalent words which can be both entity values or + # random words + for entities in _get_entities_combinations(all_entities): + processed_text = self._replace_entities_with_placeholders( + text, entities) + for intent in intent_group: + cleaned_text = self._preprocess_text(text, intent) + cleaned_processed_text = self._preprocess_text( + processed_text, intent) + + raw_candidate = cleaned_text, [] + placeholder_candidate = cleaned_processed_text, entities + intent_candidates = [raw_candidate, placeholder_candidate] + for text_input, text_entities in intent_candidates: + if text_input not in candidates \ + or text_entities not in candidates[text_input]: + candidates[text_input].append(text_entities) + yield text_input, text_entities + + def _parse_map_output(self, text, output, entities, intents): + """Parse the map output to the parser's result format""" + intent_id, slot_ids = output + intent_name = self._intents_names[intent_id] + if intents is not None and 
intent_name not in intents: + return None + + parsed_intent = intent_classification_result( + intent_name=intent_name, probability=1.0) + slots = [] + # assert invariant + assert len(slot_ids) == len(entities) + for slot_id, entity in zip(slot_ids, entities): + slot_name = self._slots_names[slot_id] + rng_start = entity[RES_MATCH_RANGE][START] + rng_end = entity[RES_MATCH_RANGE][END] + slot_value = text[rng_start:rng_end] + entity_name = entity[ENTITY_KIND] + slot = unresolved_slot( + [rng_start, rng_end], slot_value, entity_name, slot_name) + slots.append(slot) + + return extraction_result(parsed_intent, slots) + + @fitted_required + def get_intents(self, text): + """Returns the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + """ + nb_intents = len(self._intents_names) + top_intents = [intent_result[RES_INTENT] for intent_result in + self._parse_top_intents(text, top_n=nb_intents)] + matched_intents = {res[RES_INTENT_NAME] for res in top_intents} + for intent in self._intents_names: + if intent not in matched_intents: + top_intents.append(intent_classification_result(intent, 0.0)) + + # The None intent is not included in the lookup table and is thus + # never matched by the lookup parser + top_intents.append(intent_classification_result(None, 0.0)) + return top_intents + + @fitted_required + def get_slots(self, text, intent): + """Extracts slots from a text input, with the knowledge of the intent + + Args: + text (str): input + intent (str): the intent which the input corresponds to + + Returns: + list: the list of extracted slots + + Raises: + IntentNotFoundError: When the intent was not part of the training + data + """ + if intent is None: + return [] + + if intent not in self._intents_names: + raise IntentNotFoundError(intent) + + slots = self.parse(text, intents=[intent])[RES_SLOTS] + if slots is None: + slots = [] + return slots + + def _get_intent_stop_words(self, intent): + whitelist = self._stop_words_whitelist.get(intent, set()) + return self._stop_words.difference(whitelist) + + def _get_intent_id(self, intent_name): + """generate a numeric id for an intent + + Args: + intent_name (str): intent name + + Returns: + int: numeric id + + """ + intent_id = self._intents_mapping.get(intent_name) + if intent_id is None: + intent_id = len(self._intents_names) + self._intents_names.append(intent_name) + self._intents_mapping[intent_name] = intent_id + + return intent_id + + def _get_slot_id(self, slot_name): + """generate a numeric id for a slot + + Args: + slot_name (str): intent name + + Returns: + int: numeric id + + """ + slot_id = self._slots_mapping.get(slot_name) + if slot_id is None: + slot_id = len(self._slots_names) + self._slots_names.append(slot_name) + self._slots_mapping[slot_name] = slot_id + + return slot_id + + def _preprocess_text(self, txt, intent): + """Replaces stop words and characters that are tokenized out by + whitespaces""" + stop_words = self._get_intent_stop_words(intent) + tokens = tokenize_light(txt, self.language) + cleaned_string = " ".join( + [tkn for tkn in tokens if normalize(tkn) not in stop_words]) + return cleaned_string.lower() + + def _generate_io_mapping(self, intents, entity_placeholders): + """Generate input-output pairs""" + for intent_name, intent in sorted(iteritems(intents)): + intent_id = self._get_intent_id(intent_name) + for entry in intent[UTTERANCES]: + yield self._build_io_mapping( + intent_id, entry, entity_placeholders) 
+ + def _build_io_mapping(self, intent_id, utterance, entity_placeholders): + input_ = [] + output = [intent_id] + slots = [] + for chunk in utterance[DATA]: + if SLOT_NAME in chunk: + slot_name = chunk[SLOT_NAME] + slot_id = self._get_slot_id(slot_name) + entity_name = chunk[ENTITY] + placeholder = entity_placeholders[entity_name] + input_.append(placeholder) + slots.append(slot_id) + else: + input_.append(chunk[TEXT]) + output.append(slots) + + intent = self._intents_names[intent_id] + key = self._preprocess_text(" ".join(input_), intent) + + return key, output + + def _replace_entities_with_placeholders(self, text, entities): + if not entities: + return text + entities = sorted(entities, key=lambda e: e[RES_MATCH_RANGE][START]) + processed_text = "" + current_idx = 0 + for ent in entities: + start = ent[RES_MATCH_RANGE][START] + end = ent[RES_MATCH_RANGE][END] + processed_text += text[current_idx:start] + place_holder = _get_entity_name_placeholder( + ent[ENTITY_KIND], self.language) + processed_text += place_holder + current_idx = end + processed_text += text[current_idx:] + + return processed_text + + @check_persisted_path + def persist(self, path): + """Persists the object at the given path""" + path.mkdir() + parser_json = json_string(self.to_dict()) + parser_path = path / "intent_parser.json" + + with parser_path.open(mode="w", encoding="utf8") as pfile: + pfile.write(parser_json) + self.persist_metadata(path) + + @classmethod + def from_path(cls, path, **shared): + """Loads a :class:`LookupIntentParser` instance from a path + + The data at the given path must have been generated using + :func:`~LookupIntentParser.persist` + """ + path = Path(path) + model_path = path / "intent_parser.json" + if not model_path.exists(): + raise LoadingError( + "Missing lookup intent parser metadata file: %s" + % model_path.name) + + with model_path.open(encoding="utf8") as pfile: + metadata = json.load(pfile) + return cls.from_dict(metadata, **shared) + + def to_dict(self): + """Returns a json-serializable dict""" + stop_words_whitelist = None + if self._stop_words_whitelist is not None: + stop_words_whitelist = { + intent: sorted(values) + for intent, values in iteritems(self._stop_words_whitelist)} + return { + "config": self.config.to_dict(), + "language_code": self.language, + "map": self._map, + "slots_names": self._slots_names, + "intents_names": self._intents_names, + "entity_scopes": self._entity_scopes, + "stop_words_whitelist": stop_words_whitelist, + } + + @classmethod + def from_dict(cls, unit_dict, **shared): + """Creates a :class:`LookupIntentParser` instance from a dict + + The dict must have been generated with + :func:`~LookupIntentParser.to_dict` + """ + config = cls.config_type.from_dict(unit_dict["config"]) + parser = cls(config=config, **shared) + parser.language = unit_dict["language_code"] + # pylint:disable=protected-access + parser._map = _convert_dict_keys_to_int(unit_dict["map"]) + parser._slots_names = unit_dict["slots_names"] + parser._intents_names = unit_dict["intents_names"] + parser._entity_scopes = unit_dict["entity_scopes"] + if parser.fitted: + whitelist = unit_dict["stop_words_whitelist"] + parser._stop_words_whitelist = { + intent: set(values) for intent, values in iteritems(whitelist)} + # pylint:enable=protected-access + return parser + + +def _get_entity_scopes(dataset): + intent_entities = extract_intent_entities(dataset) + intent_groups = [] + entity_scopes = [] + for intent, entities in sorted(iteritems(intent_entities)): + scope = { + "builtin": list( + 
{ent for ent in entities if is_builtin_entity(ent)}), + "custom": list( + {ent for ent in entities if not is_builtin_entity(ent)}) + } + if scope in entity_scopes: + group_idx = entity_scopes.index(scope) + intent_groups[group_idx].append(intent) + else: + entity_scopes.append(scope) + intent_groups.append([intent]) + return [ + { + "intent_group": intent_group, + "entity_scope": entity_scope + } for intent_group, entity_scope in zip(intent_groups, entity_scopes) + ] + + +def _get_entity_placeholders(dataset, language): + return { + e: _get_entity_name_placeholder(e, language) for e in dataset[ENTITIES] + } + + +def _get_entity_name_placeholder(entity_label, language): + return "%%%s%%" % "".join(tokenize_light(entity_label, language)).upper() + + +def _convert_dict_keys_to_int(dct): + if isinstance(dct, dict): + return {int(k): v for k, v in iteritems(dct)} + return dct + + +def _get_entities_combinations(entities): + yield () + for nb_entities in reversed(range(1, len(entities) + 1)): + for combination in combinations(entities, nb_entities): + yield combination diff --git a/snips_inference_agl/intent_parser/probabilistic_intent_parser.py b/snips_inference_agl/intent_parser/probabilistic_intent_parser.py new file mode 100644 index 0000000..23e7829 --- /dev/null +++ b/snips_inference_agl/intent_parser/probabilistic_intent_parser.py @@ -0,0 +1,250 @@ +from __future__ import unicode_literals + +import json +import logging +from builtins import str +from copy import deepcopy +from datetime import datetime +from pathlib import Path + +from future.utils import iteritems, itervalues + +from snips_inference_agl.common.log_utils import log_elapsed_time, log_result +from snips_inference_agl.common.utils import ( + check_persisted_path, elapsed_since, fitted_required, json_string) +from snips_inference_agl.constants import INTENTS, RES_INTENT_NAME +from snips_inference_agl.dataset import validate_and_format_dataset +from snips_inference_agl.exceptions import IntentNotFoundError, LoadingError +from snips_inference_agl.intent_classifier import IntentClassifier +from snips_inference_agl.intent_parser.intent_parser import IntentParser +from snips_inference_agl.pipeline.configs import ProbabilisticIntentParserConfig +from snips_inference_agl.result import parsing_result, extraction_result +from snips_inference_agl.slot_filler import SlotFiller + +logger = logging.getLogger(__name__) + + +@IntentParser.register("probabilistic_intent_parser") +class ProbabilisticIntentParser(IntentParser): + """Intent parser which consists in two steps: intent classification then + slot filling""" + + config_type = ProbabilisticIntentParserConfig + + def __init__(self, config=None, **shared): + """The probabilistic intent parser can be configured by passing a + :class:`.ProbabilisticIntentParserConfig`""" + super(ProbabilisticIntentParser, self).__init__(config, **shared) + self.intent_classifier = None + self.slot_fillers = dict() + + @property + def fitted(self): + """Whether or not the intent parser has already been fitted""" + return self.intent_classifier is not None \ + and self.intent_classifier.fitted \ + and all(slot_filler is not None and slot_filler.fitted + for slot_filler in itervalues(self.slot_fillers)) + + @log_elapsed_time(logger, logging.INFO, + "Fitted probabilistic intent parser in {elapsed_time}") + # pylint:disable=arguments-differ + def fit(self, dataset, force_retrain=True): + """Fits the probabilistic intent parser + + Args: + dataset (dict): A valid Snips dataset + force_retrain (bool, optional): 
If *False*, will not retrain intent + classifier and slot fillers when they are already fitted. + Default to *True*. + + Returns: + :class:`ProbabilisticIntentParser`: The same instance, trained + """ + logger.info("Fitting probabilistic intent parser...") + dataset = validate_and_format_dataset(dataset) + intents = list(dataset[INTENTS]) + if self.intent_classifier is None: + self.intent_classifier = IntentClassifier.from_config( + self.config.intent_classifier_config, + builtin_entity_parser=self.builtin_entity_parser, + custom_entity_parser=self.custom_entity_parser, + resources=self.resources, + random_state=self.random_state, + ) + + if force_retrain or not self.intent_classifier.fitted: + self.intent_classifier.fit(dataset) + + if self.slot_fillers is None: + self.slot_fillers = dict() + slot_fillers_start = datetime.now() + for intent_name in intents: + # We need to copy the slot filler config as it may be mutated + if self.slot_fillers.get(intent_name) is None: + slot_filler_config = deepcopy(self.config.slot_filler_config) + self.slot_fillers[intent_name] = SlotFiller.from_config( + slot_filler_config, + builtin_entity_parser=self.builtin_entity_parser, + custom_entity_parser=self.custom_entity_parser, + resources=self.resources, + random_state=self.random_state, + ) + if force_retrain or not self.slot_fillers[intent_name].fitted: + self.slot_fillers[intent_name].fit(dataset, intent_name) + logger.debug("Fitted slot fillers in %s", + elapsed_since(slot_fillers_start)) + return self + + # pylint:enable=arguments-differ + + @log_result(logger, logging.DEBUG, + "ProbabilisticIntentParser result -> {result}") + @log_elapsed_time(logger, logging.DEBUG, + "ProbabilisticIntentParser parsed in {elapsed_time}") + @fitted_required + def parse(self, text, intents=None, top_n=None): + """Performs intent parsing on the provided *text* by first classifying + the intent and then using the correspond slot filler to extract slots + + Args: + text (str): input + intents (str or list of str): if provided, reduces the scope of + intent parsing to the provided list of intents + top_n (int, optional): when provided, this method will return a + list of at most top_n most likely intents, instead of a single + parsing result. + Note that the returned list can contain less than ``top_n`` + elements, for instance when the parameter ``intents`` is not + None, or when ``top_n`` is greater than the total number of + intents. + + Returns: + dict or list: the most likely intent(s) along with the extracted + slots. See :func:`.parsing_result` and :func:`.extraction_result` + for the output format. 
+ + Raises: + NotTrained: when the intent parser is not fitted + """ + if isinstance(intents, str): + intents = {intents} + elif isinstance(intents, list): + intents = list(intents) + + if top_n is None: + intent_result = self.intent_classifier.get_intent(text, intents) + intent_name = intent_result[RES_INTENT_NAME] + if intent_name is not None: + slots = self.slot_fillers[intent_name].get_slots(text) + else: + slots = [] + return parsing_result(text, intent_result, slots) + + results = [] + intents_results = self.intent_classifier.get_intents(text) + for intent_result in intents_results[:top_n]: + intent_name = intent_result[RES_INTENT_NAME] + if intent_name is not None: + slots = self.slot_fillers[intent_name].get_slots(text) + else: + slots = [] + results.append(extraction_result(intent_result, slots)) + return results + + @fitted_required + def get_intents(self, text): + """Returns the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + """ + return self.intent_classifier.get_intents(text) + + @fitted_required + def get_slots(self, text, intent): + """Extracts slots from a text input, with the knowledge of the intent + + Args: + text (str): input + intent (str): the intent which the input corresponds to + + Returns: + list: the list of extracted slots + + Raises: + IntentNotFoundError: When the intent was not part of the training + data + """ + if intent is None: + return [] + + if intent not in self.slot_fillers: + raise IntentNotFoundError(intent) + return self.slot_fillers[intent].get_slots(text) + + @check_persisted_path + def persist(self, path): + """Persists the object at the given path""" + path.mkdir() + sorted_slot_fillers = sorted(iteritems(self.slot_fillers)) + slot_fillers = [] + for i, (intent, slot_filler) in enumerate(sorted_slot_fillers): + slot_filler_name = "slot_filler_%s" % i + slot_filler.persist(path / slot_filler_name) + slot_fillers.append({ + "intent": intent, + "slot_filler_name": slot_filler_name + }) + + if self.intent_classifier is not None: + self.intent_classifier.persist(path / "intent_classifier") + + model = { + "config": self.config.to_dict(), + "slot_fillers": slot_fillers + } + model_json = json_string(model) + model_path = path / "intent_parser.json" + with model_path.open(mode="w") as f: + f.write(model_json) + self.persist_metadata(path) + + @classmethod + def from_path(cls, path, **shared): + """Loads a :class:`ProbabilisticIntentParser` instance from a path + + The data at the given path must have been generated using + :func:`~ProbabilisticIntentParser.persist` + """ + path = Path(path) + model_path = path / "intent_parser.json" + if not model_path.exists(): + raise LoadingError( + "Missing probabilistic intent parser model file: %s" + % model_path.name) + + with model_path.open(encoding="utf8") as f: + model = json.load(f) + + config = cls.config_type.from_dict(model["config"]) + parser = cls(config=config, **shared) + classifier = None + intent_classifier_path = path / "intent_classifier" + if intent_classifier_path.exists(): + classifier_unit_name = config.intent_classifier_config.unit_name + classifier = IntentClassifier.load_from_path( + intent_classifier_path, classifier_unit_name, **shared) + + slot_fillers = dict() + slot_filler_unit_name = config.slot_filler_config.unit_name + for slot_filler_conf in model["slot_fillers"]: + intent = slot_filler_conf["intent"] + slot_filler_path = path / slot_filler_conf["slot_filler_name"] + 
slot_filler = SlotFiller.load_from_path( + slot_filler_path, slot_filler_unit_name, **shared) + slot_fillers[intent] = slot_filler + + parser.intent_classifier = classifier + parser.slot_fillers = slot_fillers + return parser
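For reference, the parse method of ProbabilisticIntentParser reduces to a two-step pipeline: intent classification, then slot filling. A sketch of the equivalent calls, assuming parser is a fitted ProbabilisticIntentParser and text is any input string:

    from snips_inference_agl.constants import RES_INTENT_NAME

    text = "set the fan speed to high"  # hypothetical input
    intent_res = parser.intent_classifier.get_intent(text)  # step 1: classify
    intent_name = intent_res[RES_INTENT_NAME]
    # Step 2: run the slot filler trained for that intent; the None intent
    # yields no slots.
    slots = parser.slot_fillers[intent_name].get_slots(text) if intent_name else []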
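The deterministic and lookup parsers share the same tie-breaking rule when several intents match a sentence: each result is weighted by 1 / (1 + number_of_slots) and the weights are normalized into probabilities, so matches with fewer slots win. A self-contained illustration:

    def ambiguity_probas(slot_counts):
        # Probabilities assigned to ambiguous matches: fewer slots, higher proba.
        weights = [1.0 / (1.0 + n) for n in slot_counts]
        total = sum(weights)
        return [w / total for w in weights]

    print(ambiguity_probas([1, 3]))  # [0.666..., 0.333...]: the 1-slot match wins
    print(ambiguity_probas([2, 2]))  # [0.5, 0.5]: no clear winner

Because parse falls back to empty_result whenever the top probability is <= 0.5, two equally ambiguous matches cancel each other out instead of one being picked arbitrarily.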
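Finally, the lookup table in LookupIntentParser is keyed by a hash of the preprocessed utterance, in which entity values have been replaced by %UPPERCASEDENTITYNAME% placeholders, stop words removed, and the text lowercased. A sketch of the key construction, using the same snips_nlu_utils helper the parser imports (the entity label, language, and sentence are placeholder examples, and the preprocessing is approximated):

    from snips_nlu_utils import hash_str
    from snips_inference_agl.preprocessing import tokenize_light

    def entity_placeholder(entity_label, language):
        # Mirrors _get_entity_name_placeholder above.
        return "%%%s%%" % "".join(tokenize_light(entity_label, language)).upper()

    placeholder = entity_placeholder("snips/temperature", "en")  # "%SNIPSTEMPERATURE%"
    key = hash_str(("set temperature to %s" % placeholder).lower())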