Diffstat (limited to 'snips_inference_agl/intent_classifier')
-rw-r--r--  snips_inference_agl/intent_classifier/__init__.py                    3
-rw-r--r--  snips_inference_agl/intent_classifier/featurizer.py                452
-rw-r--r--  snips_inference_agl/intent_classifier/intent_classifier.py          51
-rw-r--r--  snips_inference_agl/intent_classifier/log_reg_classifier.py        211
-rw-r--r--  snips_inference_agl/intent_classifier/log_reg_classifier_utils.py   94
5 files changed, 811 insertions, 0 deletions
diff --git a/snips_inference_agl/intent_classifier/__init__.py b/snips_inference_agl/intent_classifier/__init__.py new file mode 100644 index 0000000..89ccf95 --- /dev/null +++ b/snips_inference_agl/intent_classifier/__init__.py @@ -0,0 +1,3 @@ +from .intent_classifier import IntentClassifier +from .log_reg_classifier import LogRegIntentClassifier +from .featurizer import Featurizer, CooccurrenceVectorizer, TfidfVectorizer diff --git a/snips_inference_agl/intent_classifier/featurizer.py b/snips_inference_agl/intent_classifier/featurizer.py new file mode 100644 index 0000000..116837f --- /dev/null +++ b/snips_inference_agl/intent_classifier/featurizer.py @@ -0,0 +1,452 @@ +from __future__ import division, unicode_literals + +import json +from builtins import str, zip +from copy import deepcopy +from pathlib import Path + +from future.utils import iteritems + +from snips_inference_agl.common.utils import ( + fitted_required, replace_entities_with_placeholders) +from snips_inference_agl.constants import ( + DATA, ENTITY, ENTITY_KIND, NGRAM, TEXT) +from snips_inference_agl.dataset import get_text_from_chunks +from snips_inference_agl.entity_parser.builtin_entity_parser import ( + is_builtin_entity) +from snips_inference_agl.exceptions import (LoadingError) +from snips_inference_agl.languages import get_default_sep +from snips_inference_agl.pipeline.configs import FeaturizerConfig +from snips_inference_agl.pipeline.configs.intent_classifier import ( + CooccurrenceVectorizerConfig, TfidfVectorizerConfig) +from snips_inference_agl.pipeline.processing_unit import ProcessingUnit +from snips_inference_agl.preprocessing import stem, tokenize_light +from snips_inference_agl.resources import get_stop_words, get_word_cluster +from snips_inference_agl.slot_filler.features_utils import get_all_ngrams + + +@ProcessingUnit.register("featurizer") +class Featurizer(ProcessingUnit): + """Feature extractor for text classification relying on ngrams tfidf and + optionally word cooccurrences features""" + + config_type = FeaturizerConfig + + def __init__(self, config=None, **shared): + super(Featurizer, self).__init__(config, **shared) + self.language = None + self.tfidf_vectorizer = None + self.cooccurrence_vectorizer = None + + @property + def fitted(self): + if not self.tfidf_vectorizer or not self.tfidf_vectorizer.vocabulary: + return False + return True + + def transform(self, utterances): + import scipy.sparse as sp + + x = self.tfidf_vectorizer.transform(utterances) + if self.cooccurrence_vectorizer: + x_cooccurrence = self.cooccurrence_vectorizer.transform(utterances) + x = sp.hstack((x, x_cooccurrence)) + return x + + @classmethod + def from_path(cls, path, **shared): + path = Path(path) + + model_path = path / "featurizer.json" + if not model_path.exists(): + raise LoadingError("Missing featurizer model file: %s" + % model_path.name) + with model_path.open("r", encoding="utf-8") as f: + featurizer_dict = json.load(f) + + featurizer_config = featurizer_dict["config"] + featurizer = cls(featurizer_config, **shared) + + featurizer.language = featurizer_dict["language_code"] + + tfidf_vectorizer = featurizer_dict["tfidf_vectorizer"] + if tfidf_vectorizer: + vectorizer_path = path / featurizer_dict["tfidf_vectorizer"] + tfidf_vectorizer = TfidfVectorizer.from_path( + vectorizer_path, **shared) + featurizer.tfidf_vectorizer = tfidf_vectorizer + + cooccurrence_vectorizer = featurizer_dict["cooccurrence_vectorizer"] + if cooccurrence_vectorizer: + vectorizer_path = path / 
featurizer_dict["cooccurrence_vectorizer"] + cooccurrence_vectorizer = CooccurrenceVectorizer.from_path( + vectorizer_path, **shared) + featurizer.cooccurrence_vectorizer = cooccurrence_vectorizer + + return featurizer + + +@ProcessingUnit.register("tfidf_vectorizer") +class TfidfVectorizer(ProcessingUnit): + """Wrapper of the scikit-learn TfidfVectorizer""" + + config_type = TfidfVectorizerConfig + + def __init__(self, config=None, **shared): + super(TfidfVectorizer, self).__init__(config, **shared) + self._tfidf_vectorizer = None + self._language = None + self.builtin_entity_scope = None + + @property + def fitted(self): + return self._tfidf_vectorizer is not None and hasattr( + self._tfidf_vectorizer, "vocabulary_") + + @fitted_required + def transform(self, x): + """Featurizes the given utterances after enriching them with builtin + entities matches, custom entities matches and the potential word + clusters matches + + Args: + x (list of dict): list of utterances + + Returns: + :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape + (len(x), len(self.vocabulary)) where X[i, j] contains tfdif of + the ngram of index j of the vocabulary in the utterance i + + Raises: + NotTrained: when the vectorizer is not fitted: + """ + utterances = [self._enrich_utterance(*data) + for data in zip(*self._preprocess(x))] + return self._tfidf_vectorizer.transform(utterances) + + def _preprocess(self, utterances): + normalized_utterances = deepcopy(utterances) + for u in normalized_utterances: + nb_chunks = len(u[DATA]) + for i, chunk in enumerate(u[DATA]): + chunk[TEXT] = _normalize_stem( + chunk[TEXT], self.language, self.resources, + self.config.use_stemming) + if i < nb_chunks - 1: + chunk[TEXT] += " " + + # Extract builtin entities on unormalized utterances + builtin_ents = [ + self.builtin_entity_parser.parse( + get_text_from_chunks(u[DATA]), + self.builtin_entity_scope, use_cache=True) + for u in utterances + ] + # Extract builtin entities on normalized utterances + custom_ents = [ + self.custom_entity_parser.parse( + get_text_from_chunks(u[DATA]), use_cache=True) + for u in normalized_utterances + ] + if self.config.word_clusters_name: + # Extract world clusters on unormalized utterances + original_utterances_text = [get_text_from_chunks(u[DATA]) + for u in utterances] + w_clusters = [ + _get_word_cluster_features( + tokenize_light(u.lower(), self.language), + self.config.word_clusters_name, + self.resources) + for u in original_utterances_text + ] + else: + w_clusters = [None for _ in normalized_utterances] + + return normalized_utterances, builtin_ents, custom_ents, w_clusters + + def _enrich_utterance(self, utterance, builtin_entities, custom_entities, + word_clusters): + custom_entities_features = [ + _entity_name_to_feature(e[ENTITY_KIND], self.language) + for e in custom_entities] + + builtin_entities_features = [ + _builtin_entity_to_feature(ent[ENTITY_KIND], self.language) + for ent in builtin_entities + ] + + # We remove values of builtin slots from the utterance to avoid + # learning specific samples such as '42' or 'tomorrow' + filtered_tokens = [ + chunk[TEXT] for chunk in utterance[DATA] + if ENTITY not in chunk or not is_builtin_entity(chunk[ENTITY]) + ] + + features = get_default_sep(self.language).join(filtered_tokens) + + if builtin_entities_features: + features += " " + " ".join(sorted(builtin_entities_features)) + if custom_entities_features: + features += " " + " ".join(sorted(custom_entities_features)) + if word_clusters: + features += " " + " 
".join(sorted(word_clusters)) + + return features + + @property + def language(self): + # Create this getter to prevent the language from being set elsewhere + # than in the fit + return self._language + + @property + def vocabulary(self): + if self._tfidf_vectorizer and hasattr( + self._tfidf_vectorizer, "vocabulary_"): + return self._tfidf_vectorizer.vocabulary_ + return None + + @property + def idf_diag(self): + if self._tfidf_vectorizer and hasattr( + self._tfidf_vectorizer, "vocabulary_"): + return self._tfidf_vectorizer.idf_ + return None + + @classmethod + # pylint: disable=W0212 + def from_path(cls, path, **shared): + import numpy as np + import scipy.sparse as sp + from sklearn.feature_extraction.text import ( + TfidfTransformer, TfidfVectorizer as SklearnTfidfVectorizer) + + path = Path(path) + + model_path = path / "vectorizer.json" + if not model_path.exists(): + raise LoadingError("Missing vectorizer model file: %s" + % model_path.name) + with model_path.open("r", encoding="utf-8") as f: + vectorizer_dict = json.load(f) + + vectorizer = cls(vectorizer_dict["config"], **shared) + vectorizer._language = vectorizer_dict["language_code"] + + builtin_entity_scope = vectorizer_dict["builtin_entity_scope"] + if builtin_entity_scope is not None: + builtin_entity_scope = set(builtin_entity_scope) + vectorizer.builtin_entity_scope = builtin_entity_scope + + vectorizer_ = vectorizer_dict["vectorizer"] + if vectorizer_: + vocab = vectorizer_["vocab"] + idf_diag_data = vectorizer_["idf_diag"] + idf_diag_data = np.array(idf_diag_data) + + idf_diag_shape = (len(idf_diag_data), len(idf_diag_data)) + row = list(range(idf_diag_shape[0])) + col = list(range(idf_diag_shape[0])) + idf_diag = sp.csr_matrix( + (idf_diag_data, (row, col)), shape=idf_diag_shape) + + tfidf_transformer = TfidfTransformer() + tfidf_transformer._idf_diag = idf_diag + + vectorizer_ = SklearnTfidfVectorizer( + tokenizer=lambda x: tokenize_light(x, vectorizer._language)) + vectorizer_.vocabulary_ = vocab + + vectorizer_._tfidf = tfidf_transformer + + vectorizer._tfidf_vectorizer = vectorizer_ + return vectorizer + + +@ProcessingUnit.register("cooccurrence_vectorizer") +class CooccurrenceVectorizer(ProcessingUnit): + """Featurizer that takes utterances and extracts ordered word cooccurrence + features matrix from them""" + + config_type = CooccurrenceVectorizerConfig + + def __init__(self, config=None, **shared): + super(CooccurrenceVectorizer, self).__init__(config, **shared) + self._word_pairs = None + self._language = None + self.builtin_entity_scope = None + + @property + def language(self): + # Create this getter to prevent the language from being set elsewhere + # than in the fit + return self._language + + @property + def word_pairs(self): + return self._word_pairs + + @property + def fitted(self): + """Whether or not the vectorizer is fitted""" + return self.word_pairs is not None + + @fitted_required + def transform(self, x): + """Computes the cooccurrence feature matrix. 
+ + Args: + x (list of dict): list of utterances + + Returns: + :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape + (len(x), len(self.word_pairs)) where X[i, j] = 1.0 if + x[i][0] contains the words cooccurrence (w1, w2) and if + self.word_pairs[(w1, w2)] = j + + Raises: + NotTrained: when the vectorizer is not fitted + """ + import numpy as np + import scipy.sparse as sp + + preprocessed = self._preprocess(x) + utterances = [ + self._enrich_utterance(utterance, builtin_ents, custom_ent) + for utterance, builtin_ents, custom_ent in zip(*preprocessed)] + + x_coo = sp.dok_matrix((len(x), len(self.word_pairs)), dtype=np.int32) + for i, u in enumerate(utterances): + for p in self._extract_word_pairs(u): + if p in self.word_pairs: + x_coo[i, self.word_pairs[p]] = 1 + + return x_coo.tocsr() + + def _preprocess(self, x): + # Extract all entities on unnormalized data + builtin_ents = [ + self.builtin_entity_parser.parse( + get_text_from_chunks(u[DATA]), + self.builtin_entity_scope, + use_cache=True + ) for u in x + ] + custom_ents = [ + self.custom_entity_parser.parse( + get_text_from_chunks(u[DATA]), use_cache=True) + for u in x + ] + return x, builtin_ents, custom_ents + + def _extract_word_pairs(self, utterance): + if self.config.filter_stop_words: + stop_words = get_stop_words(self.resources) + utterance = [t for t in utterance if t not in stop_words] + pairs = set() + for j, w1 in enumerate(utterance): + max_index = None + if self.config.window_size is not None: + max_index = j + self.config.window_size + 1 + for w2 in utterance[j + 1:max_index]: + key = (w1, w2) + if not self.config.keep_order: + key = tuple(sorted(key)) + pairs.add(key) + return pairs + + def _enrich_utterance(self, x, builtin_ents, custom_ents): + utterance = get_text_from_chunks(x[DATA]) + all_entities = builtin_ents + custom_ents + placeholder_fn = self._placeholder_fn + # Replace entities with placeholders + enriched_utterance = replace_entities_with_placeholders( + utterance, all_entities, placeholder_fn)[1] + # Tokenize + enriched_utterance = tokenize_light(enriched_utterance, self.language) + # Remove the unknownword strings if needed + if self.config.unknown_words_replacement_string: + enriched_utterance = [ + t for t in enriched_utterance + if t != self.config.unknown_words_replacement_string + ] + return enriched_utterance + + def _extract_word_pairs(self, utterance): + if self.config.filter_stop_words: + stop_words = get_stop_words(self.resources) + utterance = [t for t in utterance if t not in stop_words] + pairs = set() + for j, w1 in enumerate(utterance): + max_index = None + if self.config.window_size is not None: + max_index = j + self.config.window_size + 1 + for w2 in utterance[j + 1:max_index]: + key = (w1, w2) + if not self.config.keep_order: + key = tuple(sorted(key)) + pairs.add(key) + return pairs + + def _placeholder_fn(self, entity_name): + return "".join( + tokenize_light(str(entity_name), str(self.language))).upper() + + @classmethod + # pylint: disable=protected-access + def from_path(cls, path, **shared): + path = Path(path) + model_path = path / "vectorizer.json" + if not model_path.exists(): + raise LoadingError("Missing vectorizer model file: %s" + % model_path.name) + + with model_path.open(encoding="utf8") as f: + vectorizer_dict = json.load(f) + config = vectorizer_dict.pop("config") + + self = cls(config, **shared) + self._language = vectorizer_dict["language_code"] + self._word_pairs = None + + builtin_entity_scope = vectorizer_dict["builtin_entity_scope"] + if 
builtin_entity_scope is not None: + builtin_entity_scope = set(builtin_entity_scope) + self.builtin_entity_scope = builtin_entity_scope + + if vectorizer_dict["word_pairs"]: + self._word_pairs = { + tuple(p): int(i) + for i, p in iteritems(vectorizer_dict["word_pairs"]) + } + return self + +def _entity_name_to_feature(entity_name, language): + return "entityfeature%s" % "".join(tokenize_light( + entity_name.lower(), language)) + + +def _builtin_entity_to_feature(builtin_entity_label, language): + return "builtinentityfeature%s" % "".join(tokenize_light( + builtin_entity_label.lower(), language)) + + +def _normalize_stem(text, language, resources, use_stemming): + from snips_nlu_utils import normalize + + if use_stemming: + return stem(text, language, resources) + return normalize(text) + + +def _get_word_cluster_features(query_tokens, clusters_name, resources): + if not clusters_name: + return [] + ngrams = get_all_ngrams(query_tokens) + cluster_features = [] + for ngram in ngrams: + cluster = get_word_cluster(resources, clusters_name).get( + ngram[NGRAM].lower(), None) + if cluster is not None: + cluster_features.append(cluster) + return cluster_features diff --git a/snips_inference_agl/intent_classifier/intent_classifier.py b/snips_inference_agl/intent_classifier/intent_classifier.py new file mode 100644 index 0000000..f9a7952 --- /dev/null +++ b/snips_inference_agl/intent_classifier/intent_classifier.py @@ -0,0 +1,51 @@ +from abc import ABCMeta + +from future.utils import with_metaclass + +from snips_inference_agl.pipeline.processing_unit import ProcessingUnit +from snips_inference_agl.common.abc_utils import classproperty + + +class IntentClassifier(with_metaclass(ABCMeta, ProcessingUnit)): + """Abstraction which performs intent classification + + A custom intent classifier must inherit this class to be used in a + :class:`.ProbabilisticIntentParser` + """ + + @classproperty + def unit_name(cls): # pylint:disable=no-self-argument + return IntentClassifier.registered_name(cls) + + # @abstractmethod + def get_intent(self, text, intents_filter): + """Performs intent classification on the provided *text* + + Args: + text (str): Input + intents_filter (str or list of str): When defined, it will find + the most likely intent among the list, otherwise it will use + the whole list of intents defined in the dataset + + Returns: + dict or None: The most likely intent along with its probability or + *None* if no intent was found. See + :func:`.intent_classification_result` for the output format. + """ + pass + + # @abstractmethod + def get_intents(self, text): + """Performs intent classification on the provided *text* and returns + the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + + .. note:: + + The probabilities returned along with each intent are not + guaranteed to sum to 1.0. They should be considered as scores + between 0 and 1. 
+ """ + pass diff --git a/snips_inference_agl/intent_classifier/log_reg_classifier.py b/snips_inference_agl/intent_classifier/log_reg_classifier.py new file mode 100644 index 0000000..09e537c --- /dev/null +++ b/snips_inference_agl/intent_classifier/log_reg_classifier.py @@ -0,0 +1,211 @@ +from __future__ import unicode_literals + +import json +import logging +from builtins import str, zip +from pathlib import Path + +from snips_inference_agl.common.log_utils import DifferedLoggingMessage +from snips_inference_agl.common.utils import (fitted_required) +from snips_inference_agl.constants import RES_PROBA +from snips_inference_agl.exceptions import LoadingError +from snips_inference_agl.intent_classifier.featurizer import Featurizer +from snips_inference_agl.intent_classifier.intent_classifier import IntentClassifier +from snips_inference_agl.intent_classifier.log_reg_classifier_utils import (text_to_utterance) +from snips_inference_agl.pipeline.configs import LogRegIntentClassifierConfig +from snips_inference_agl.result import intent_classification_result + +logger = logging.getLogger(__name__) + +# We set tol to 1e-3 to silence the following warning with Python 2 ( +# scikit-learn 0.20): +# +# FutureWarning: max_iter and tol parameters have been added in SGDClassifier +# in 0.19. If max_iter is set but tol is left unset, the default value for tol +# in 0.19 and 0.20 will be None (which is equivalent to -infinity, so it has no +# effect) but will change in 0.21 to 1e-3. Specify tol to silence this warning. + +LOG_REG_ARGS = { + "loss": "log", + "penalty": "l2", + "max_iter": 1000, + "tol": 1e-3, + "n_jobs": -1 +} + + +@IntentClassifier.register("log_reg_intent_classifier") +class LogRegIntentClassifier(IntentClassifier): + """Intent classifier which uses a Logistic Regression underneath""" + + config_type = LogRegIntentClassifierConfig + + def __init__(self, config=None, **shared): + """The LogReg intent classifier can be configured by passing a + :class:`.LogRegIntentClassifierConfig`""" + super(LogRegIntentClassifier, self).__init__(config, **shared) + self.classifier = None + self.intent_list = None + self.featurizer = None + + @property + def fitted(self): + """Whether or not the intent classifier has already been fitted""" + return self.intent_list is not None + + @fitted_required + def get_intent(self, text, intents_filter=None): + """Performs intent classification on the provided *text* + + Args: + text (str): Input + intents_filter (str or list of str): When defined, it will find + the most likely intent among the list, otherwise it will use + the whole list of intents defined in the dataset + + Returns: + dict or None: The most likely intent along with its probability or + *None* if no intent was found + + Raises: + :class:`snips_nlu.exceptions.NotTrained`: When the intent + classifier is not fitted + + """ + return self._get_intents(text, intents_filter)[0] + + @fitted_required + def get_intents(self, text): + """Performs intent classification on the provided *text* and returns + the list of intents ordered by decreasing probability + + The length of the returned list is exactly the number of intents in the + dataset + 1 for the None intent + + Raises: + :class:`snips_nlu.exceptions.NotTrained`: when the intent + classifier is not fitted + """ + return self._get_intents(text, intents_filter=None) + + def _get_intents(self, text, intents_filter): + if isinstance(intents_filter, str): + intents_filter = {intents_filter} + elif isinstance(intents_filter, list): + intents_filter = 
set(intents_filter) + + if not text or not self.intent_list or not self.featurizer: + results = [intent_classification_result(None, 1.0)] + results += [intent_classification_result(i, 0.0) + for i in self.intent_list if i is not None] + return results + + if len(self.intent_list) == 1: + return [intent_classification_result(self.intent_list[0], 1.0)] + + # pylint: disable=C0103 + X = self.featurizer.transform([text_to_utterance(text)]) + # pylint: enable=C0103 + proba_vec = self._predict_proba(X) + logger.debug( + "%s", DifferedLoggingMessage(self.log_activation_weights, text, X)) + results = [ + intent_classification_result(i, proba) + for i, proba in zip(self.intent_list, proba_vec[0]) + if intents_filter is None or i is None or i in intents_filter] + + return sorted(results, key=lambda res: -res[RES_PROBA]) + + def _predict_proba(self, X): # pylint: disable=C0103 + import numpy as np + + self.classifier._check_proba() # pylint: disable=W0212 + + prob = self.classifier.decision_function(X) + prob *= -1 + np.exp(prob, prob) + prob += 1 + np.reciprocal(prob, prob) + if prob.ndim == 1: + return np.vstack([1 - prob, prob]).T + return prob + + @classmethod + def from_path(cls, path, **shared): + """Loads a :class:`LogRegIntentClassifier` instance from a path + + The data at the given path must have been generated using + :func:`~LogRegIntentClassifier.persist` + """ + import numpy as np + from sklearn.linear_model import SGDClassifier + + path = Path(path) + model_path = path / "intent_classifier.json" + if not model_path.exists(): + raise LoadingError("Missing intent classifier model file: %s" + % model_path.name) + + with model_path.open(encoding="utf8") as f: + model_dict = json.load(f) + + # Create the classifier + config = LogRegIntentClassifierConfig.from_dict(model_dict["config"]) + intent_classifier = cls(config=config, **shared) + intent_classifier.intent_list = model_dict['intent_list'] + + # Create the underlying SGD classifier + sgd_classifier = None + coeffs = model_dict['coeffs'] + intercept = model_dict['intercept'] + t_ = model_dict["t_"] + if coeffs is not None and intercept is not None: + sgd_classifier = SGDClassifier(**LOG_REG_ARGS) + sgd_classifier.coef_ = np.array(coeffs) + sgd_classifier.intercept_ = np.array(intercept) + sgd_classifier.t_ = t_ + intent_classifier.classifier = sgd_classifier + + # Add the featurizer + featurizer = model_dict['featurizer'] + if featurizer is not None: + featurizer_path = path / featurizer + intent_classifier.featurizer = Featurizer.from_path( + featurizer_path, **shared) + + return intent_classifier + + def log_activation_weights(self, text, x, top_n=50): + import numpy as np + + if not hasattr(self.featurizer, "feature_index_to_feature_name"): + return None + + log = "\n\nTop {} feature activations for: \"{}\":\n".format( + top_n, text) + activations = np.multiply( + self.classifier.coef_, np.asarray(x.todense())) + abs_activation = np.absolute(activations).flatten().squeeze() + + if top_n > activations.size: + top_n = activations.size + + top_n_activations_ix = np.argpartition(abs_activation, -top_n, + axis=None)[-top_n:] + top_n_activations_ix = np.unravel_index( + top_n_activations_ix, activations.shape) + + index_to_feature = self.featurizer.feature_index_to_feature_name + features_intent_and_activation = [ + (self.intent_list[i], index_to_feature[f], activations[i, f]) + for i, f in zip(*top_n_activations_ix)] + + features_intent_and_activation = sorted( + features_intent_and_activation, key=lambda x: abs(x[2]), + reverse=True) + + 
for intent, feature, activation in features_intent_and_activation: + log += "\n\n\"{}\" -> ({}, {:.2f})".format( + intent, feature, float(activation)) + log += "\n\n" + return log diff --git a/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py b/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py new file mode 100644 index 0000000..75a8ab1 --- /dev/null +++ b/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py @@ -0,0 +1,94 @@ +from __future__ import division, unicode_literals + +import itertools +import re +from builtins import next, range, str +from copy import deepcopy +from uuid import uuid4 + +from future.utils import iteritems, itervalues + +from snips_inference_agl.constants import (DATA, ENTITY, INTENTS, TEXT, + UNKNOWNWORD, UTTERANCES) +from snips_inference_agl.data_augmentation import augment_utterances +from snips_inference_agl.dataset import get_text_from_chunks +from snips_inference_agl.entity_parser.builtin_entity_parser import is_builtin_entity +from snips_inference_agl.preprocessing import tokenize_light +from snips_inference_agl.resources import get_noise + +NOISE_NAME = str(uuid4()) +WORD_REGEX = re.compile(r"\w+(\s+\w+)*") +UNKNOWNWORD_REGEX = re.compile(r"%s(\s+%s)*" % (UNKNOWNWORD, UNKNOWNWORD)) + + +def get_noise_it(noise, mean_length, std_length, random_state): + it = itertools.cycle(noise) + while True: + noise_length = int(random_state.normal(mean_length, std_length)) + # pylint: disable=stop-iteration-return + yield " ".join(next(it) for _ in range(noise_length)) + # pylint: enable=stop-iteration-return + + +def generate_smart_noise(noise, augmented_utterances, replacement_string, + language): + text_utterances = [get_text_from_chunks(u[DATA]) + for u in augmented_utterances] + vocab = [w for u in text_utterances for w in tokenize_light(u, language)] + vocab = set(vocab) + return [w if w in vocab else replacement_string for w in noise] + + +def generate_noise_utterances(augmented_utterances, noise, num_intents, + data_augmentation_config, language, + random_state): + import numpy as np + + if not augmented_utterances or not num_intents: + return [] + avg_num_utterances = len(augmented_utterances) / float(num_intents) + if data_augmentation_config.unknown_words_replacement_string is not None: + noise = generate_smart_noise( + noise, augmented_utterances, + data_augmentation_config.unknown_words_replacement_string, + language) + + noise_size = min( + int(data_augmentation_config.noise_factor * avg_num_utterances), + len(noise)) + utterances_lengths = [ + len(tokenize_light(get_text_from_chunks(u[DATA]), language)) + for u in augmented_utterances] + mean_utterances_length = np.mean(utterances_lengths) + std_utterances_length = np.std(utterances_lengths) + noise_it = get_noise_it(noise, mean_utterances_length, + std_utterances_length, random_state) + # Remove duplicate 'unknownword unknownword' + return [ + text_to_utterance(UNKNOWNWORD_REGEX.sub(UNKNOWNWORD, next(noise_it))) + for _ in range(noise_size)] + + +def add_unknown_word_to_utterances(utterances, replacement_string, + unknown_word_prob, max_unknown_words, + random_state): + if not max_unknown_words: + return utterances + + new_utterances = deepcopy(utterances) + for u in new_utterances: + if random_state.rand() < unknown_word_prob: + num_unknown = random_state.randint(1, max_unknown_words + 1) + # We choose to put the noise at the end of the sentence and not + # in the middle so that it doesn't impact to much ngrams + # computation + extra_chunk = { + TEXT: " " + " 
".join( + replacement_string for _ in range(num_unknown)) + } + u[DATA].append(extra_chunk) + return new_utterances + + +def text_to_utterance(text): + return {DATA: [{TEXT: text}]} |