Diffstat (limited to 'snips_inference_agl/intent_classifier')
-rw-r--r--  snips_inference_agl/intent_classifier/__init__.py                    3
-rw-r--r--  snips_inference_agl/intent_classifier/featurizer.py                452
-rw-r--r--  snips_inference_agl/intent_classifier/intent_classifier.py          51
-rw-r--r--  snips_inference_agl/intent_classifier/log_reg_classifier.py        211
-rw-r--r--  snips_inference_agl/intent_classifier/log_reg_classifier_utils.py   94
5 files changed, 811 insertions, 0 deletions
diff --git a/snips_inference_agl/intent_classifier/__init__.py b/snips_inference_agl/intent_classifier/__init__.py
new file mode 100644
index 0000000..89ccf95
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/__init__.py
@@ -0,0 +1,3 @@
+from .intent_classifier import IntentClassifier
+from .log_reg_classifier import LogRegIntentClassifier
+from .featurizer import Featurizer, CooccurrenceVectorizer, TfidfVectorizer
diff --git a/snips_inference_agl/intent_classifier/featurizer.py b/snips_inference_agl/intent_classifier/featurizer.py
new file mode 100644
index 0000000..116837f
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/featurizer.py
@@ -0,0 +1,452 @@
+from __future__ import division, unicode_literals
+
+import json
+from builtins import str, zip
+from copy import deepcopy
+from pathlib import Path
+
+from future.utils import iteritems
+
+from snips_inference_agl.common.utils import (
+ fitted_required, replace_entities_with_placeholders)
+from snips_inference_agl.constants import (
+ DATA, ENTITY, ENTITY_KIND, NGRAM, TEXT)
+from snips_inference_agl.dataset import get_text_from_chunks
+from snips_inference_agl.entity_parser.builtin_entity_parser import (
+ is_builtin_entity)
+from snips_inference_agl.exceptions import (LoadingError)
+from snips_inference_agl.languages import get_default_sep
+from snips_inference_agl.pipeline.configs import FeaturizerConfig
+from snips_inference_agl.pipeline.configs.intent_classifier import (
+ CooccurrenceVectorizerConfig, TfidfVectorizerConfig)
+from snips_inference_agl.pipeline.processing_unit import ProcessingUnit
+from snips_inference_agl.preprocessing import stem, tokenize_light
+from snips_inference_agl.resources import get_stop_words, get_word_cluster
+from snips_inference_agl.slot_filler.features_utils import get_all_ngrams
+
+
+@ProcessingUnit.register("featurizer")
+class Featurizer(ProcessingUnit):
+    """Feature extractor for text classification relying on ngram tf-idf
+    features and, optionally, word cooccurrence features"""
+
+ config_type = FeaturizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(Featurizer, self).__init__(config, **shared)
+ self.language = None
+ self.tfidf_vectorizer = None
+ self.cooccurrence_vectorizer = None
+
+ @property
+ def fitted(self):
+ if not self.tfidf_vectorizer or not self.tfidf_vectorizer.vocabulary:
+ return False
+ return True
+
+ def transform(self, utterances):
+ import scipy.sparse as sp
+
+ x = self.tfidf_vectorizer.transform(utterances)
+ if self.cooccurrence_vectorizer:
+ x_cooccurrence = self.cooccurrence_vectorizer.transform(utterances)
+ x = sp.hstack((x, x_cooccurrence))
+ return x
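+
+    # Illustrative usage sketch (hypothetical sizes, not taken from the
+    # original code, and assuming DATA == "data" and TEXT == "text"): with a
+    # fitted featurizer whose tf-idf vocabulary has 5000 ngrams and whose
+    # cooccurrence vectorizer tracks 300 word pairs, transforming a single
+    # utterance yields a 1 x 5300 sparse row:
+    #
+    #   utterances = [{"data": [{"text": "turn on the lights"}]}]
+    #   x = featurizer.transform(utterances)  # x.shape == (1, 5300)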
+
+ @classmethod
+ def from_path(cls, path, **shared):
+ path = Path(path)
+
+ model_path = path / "featurizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing featurizer model file: %s"
+ % model_path.name)
+ with model_path.open("r", encoding="utf-8") as f:
+ featurizer_dict = json.load(f)
+
+ featurizer_config = featurizer_dict["config"]
+ featurizer = cls(featurizer_config, **shared)
+
+ featurizer.language = featurizer_dict["language_code"]
+
+ tfidf_vectorizer = featurizer_dict["tfidf_vectorizer"]
+ if tfidf_vectorizer:
+ vectorizer_path = path / featurizer_dict["tfidf_vectorizer"]
+ tfidf_vectorizer = TfidfVectorizer.from_path(
+ vectorizer_path, **shared)
+ featurizer.tfidf_vectorizer = tfidf_vectorizer
+
+ cooccurrence_vectorizer = featurizer_dict["cooccurrence_vectorizer"]
+ if cooccurrence_vectorizer:
+ vectorizer_path = path / featurizer_dict["cooccurrence_vectorizer"]
+ cooccurrence_vectorizer = CooccurrenceVectorizer.from_path(
+ vectorizer_path, **shared)
+ featurizer.cooccurrence_vectorizer = cooccurrence_vectorizer
+
+ return featurizer
+
+
+@ProcessingUnit.register("tfidf_vectorizer")
+class TfidfVectorizer(ProcessingUnit):
+ """Wrapper of the scikit-learn TfidfVectorizer"""
+
+ config_type = TfidfVectorizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(TfidfVectorizer, self).__init__(config, **shared)
+ self._tfidf_vectorizer = None
+ self._language = None
+ self.builtin_entity_scope = None
+
+ @property
+ def fitted(self):
+ return self._tfidf_vectorizer is not None and hasattr(
+ self._tfidf_vectorizer, "vocabulary_")
+
+ @fitted_required
+ def transform(self, x):
+        """Featurizes the given utterances after enriching them with builtin
+        entity matches, custom entity matches and, when configured, word
+        cluster matches
+
+ Args:
+ x (list of dict): list of utterances
+
+ Returns:
+            :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape
+            (len(x), len(self.vocabulary)) where X[i, j] contains the tf-idf
+            of the ngram of index j of the vocabulary in the utterance i
+
+ Raises:
+            NotTrained: when the vectorizer is not fitted
+ """
+ utterances = [self._enrich_utterance(*data)
+ for data in zip(*self._preprocess(x))]
+ return self._tfidf_vectorizer.transform(utterances)
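+
+    # Minimal sketch of the expected utterance format (assuming the DATA,
+    # TEXT and ENTITY constants map to "data", "text" and "entity"):
+    #
+    #   {"data": [{"text": "what will the weather be "},
+    #             {"text": "tomorrow", "entity": "snips/datetime"}]}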
+
+ def _preprocess(self, utterances):
+ normalized_utterances = deepcopy(utterances)
+ for u in normalized_utterances:
+ nb_chunks = len(u[DATA])
+ for i, chunk in enumerate(u[DATA]):
+ chunk[TEXT] = _normalize_stem(
+ chunk[TEXT], self.language, self.resources,
+ self.config.use_stemming)
+ if i < nb_chunks - 1:
+ chunk[TEXT] += " "
+
+        # Extract builtin entities on unnormalized utterances
+ builtin_ents = [
+ self.builtin_entity_parser.parse(
+ get_text_from_chunks(u[DATA]),
+ self.builtin_entity_scope, use_cache=True)
+ for u in utterances
+ ]
+        # Extract custom entities on normalized utterances
+ custom_ents = [
+ self.custom_entity_parser.parse(
+ get_text_from_chunks(u[DATA]), use_cache=True)
+ for u in normalized_utterances
+ ]
+ if self.config.word_clusters_name:
+            # Extract word clusters on unnormalized utterances
+ original_utterances_text = [get_text_from_chunks(u[DATA])
+ for u in utterances]
+ w_clusters = [
+ _get_word_cluster_features(
+ tokenize_light(u.lower(), self.language),
+ self.config.word_clusters_name,
+ self.resources)
+ for u in original_utterances_text
+ ]
+ else:
+ w_clusters = [None for _ in normalized_utterances]
+
+ return normalized_utterances, builtin_ents, custom_ents, w_clusters
+
+ def _enrich_utterance(self, utterance, builtin_entities, custom_entities,
+ word_clusters):
+ custom_entities_features = [
+ _entity_name_to_feature(e[ENTITY_KIND], self.language)
+ for e in custom_entities]
+
+ builtin_entities_features = [
+ _builtin_entity_to_feature(ent[ENTITY_KIND], self.language)
+ for ent in builtin_entities
+ ]
+
+ # We remove values of builtin slots from the utterance to avoid
+ # learning specific samples such as '42' or 'tomorrow'
+ filtered_tokens = [
+ chunk[TEXT] for chunk in utterance[DATA]
+ if ENTITY not in chunk or not is_builtin_entity(chunk[ENTITY])
+ ]
+
+ features = get_default_sep(self.language).join(filtered_tokens)
+
+ if builtin_entities_features:
+ features += " " + " ".join(sorted(builtin_entities_features))
+ if custom_entities_features:
+ features += " " + " ".join(sorted(custom_entities_features))
+ if word_clusters:
+ features += " " + " ".join(sorted(word_clusters))
+
+ return features
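+
+    # For illustration (approximate output, assuming a matched
+    # snips/datetime builtin entity): the utterance "set an alarm tomorrow"
+    # would be enriched into something like
+    # "set an alarm builtinentityfeaturesnipsdatetime", i.e. the builtin
+    # value is dropped and an entity feature is appended instead.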
+
+ @property
+ def language(self):
+        # This getter prevents the language from being set anywhere other
+        # than during fit
+ return self._language
+
+ @property
+ def vocabulary(self):
+ if self._tfidf_vectorizer and hasattr(
+ self._tfidf_vectorizer, "vocabulary_"):
+ return self._tfidf_vectorizer.vocabulary_
+ return None
+
+ @property
+ def idf_diag(self):
+ if self._tfidf_vectorizer and hasattr(
+ self._tfidf_vectorizer, "vocabulary_"):
+ return self._tfidf_vectorizer.idf_
+ return None
+
+ @classmethod
+ # pylint: disable=W0212
+ def from_path(cls, path, **shared):
+ import numpy as np
+ import scipy.sparse as sp
+ from sklearn.feature_extraction.text import (
+ TfidfTransformer, TfidfVectorizer as SklearnTfidfVectorizer)
+
+ path = Path(path)
+
+ model_path = path / "vectorizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing vectorizer model file: %s"
+ % model_path.name)
+ with model_path.open("r", encoding="utf-8") as f:
+ vectorizer_dict = json.load(f)
+
+ vectorizer = cls(vectorizer_dict["config"], **shared)
+ vectorizer._language = vectorizer_dict["language_code"]
+
+ builtin_entity_scope = vectorizer_dict["builtin_entity_scope"]
+ if builtin_entity_scope is not None:
+ builtin_entity_scope = set(builtin_entity_scope)
+ vectorizer.builtin_entity_scope = builtin_entity_scope
+
+ vectorizer_ = vectorizer_dict["vectorizer"]
+ if vectorizer_:
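+            # The persisted model keeps only the tf-idf vocabulary and the
+            # flat list of idf weights; the lines below rebuild the sparse
+            # diagonal idf matrix and plug it into a fresh TfidfTransformer
+            # before restoring the scikit-learn vectorizer.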
+ vocab = vectorizer_["vocab"]
+ idf_diag_data = vectorizer_["idf_diag"]
+ idf_diag_data = np.array(idf_diag_data)
+
+ idf_diag_shape = (len(idf_diag_data), len(idf_diag_data))
+ row = list(range(idf_diag_shape[0]))
+ col = list(range(idf_diag_shape[0]))
+ idf_diag = sp.csr_matrix(
+ (idf_diag_data, (row, col)), shape=idf_diag_shape)
+
+ tfidf_transformer = TfidfTransformer()
+ tfidf_transformer._idf_diag = idf_diag
+
+ vectorizer_ = SklearnTfidfVectorizer(
+ tokenizer=lambda x: tokenize_light(x, vectorizer._language))
+ vectorizer_.vocabulary_ = vocab
+
+ vectorizer_._tfidf = tfidf_transformer
+
+ vectorizer._tfidf_vectorizer = vectorizer_
+ return vectorizer
+
+
+@ProcessingUnit.register("cooccurrence_vectorizer")
+class CooccurrenceVectorizer(ProcessingUnit):
+    """Featurizer that takes utterances and extracts an ordered word
+    cooccurrence feature matrix from them"""
+
+ config_type = CooccurrenceVectorizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(CooccurrenceVectorizer, self).__init__(config, **shared)
+ self._word_pairs = None
+ self._language = None
+ self.builtin_entity_scope = None
+
+ @property
+ def language(self):
+        # This getter prevents the language from being set anywhere other
+        # than during fit
+ return self._language
+
+ @property
+ def word_pairs(self):
+ return self._word_pairs
+
+ @property
+ def fitted(self):
+ """Whether or not the vectorizer is fitted"""
+ return self.word_pairs is not None
+
+ @fitted_required
+ def transform(self, x):
+ """Computes the cooccurrence feature matrix.
+
+ Args:
+ x (list of dict): list of utterances
+
+ Returns:
+            :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape
+            (len(x), len(self.word_pairs)) where X[i, j] = 1.0 if the
+            utterance x[i] contains the word cooccurrence (w1, w2) and if
+            self.word_pairs[(w1, w2)] = j
+
+ Raises:
+ NotTrained: when the vectorizer is not fitted
+ """
+ import numpy as np
+ import scipy.sparse as sp
+
+ preprocessed = self._preprocess(x)
+ utterances = [
+ self._enrich_utterance(utterance, builtin_ents, custom_ent)
+ for utterance, builtin_ents, custom_ent in zip(*preprocessed)]
+
+ x_coo = sp.dok_matrix((len(x), len(self.word_pairs)), dtype=np.int32)
+ for i, u in enumerate(utterances):
+ for p in self._extract_word_pairs(u):
+ if p in self.word_pairs:
+ x_coo[i, self.word_pairs[p]] = 1
+
+ return x_coo.tocsr()
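+
+    # Illustrative sketch (hypothetical pairs): with
+    #   self.word_pairs == {("turn", "lights"): 0, ("play", "music"): 1}
+    # the utterance "turn on the lights" yields the row [1, 0], assuming
+    # "on" and "the" are filtered out as stop words and no entity
+    # placeholders apply.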
+
+ def _preprocess(self, x):
+ # Extract all entities on unnormalized data
+ builtin_ents = [
+ self.builtin_entity_parser.parse(
+ get_text_from_chunks(u[DATA]),
+ self.builtin_entity_scope,
+ use_cache=True
+ ) for u in x
+ ]
+ custom_ents = [
+ self.custom_entity_parser.parse(
+ get_text_from_chunks(u[DATA]), use_cache=True)
+ for u in x
+ ]
+ return x, builtin_ents, custom_ents
+
+ def _enrich_utterance(self, x, builtin_ents, custom_ents):
+ utterance = get_text_from_chunks(x[DATA])
+ all_entities = builtin_ents + custom_ents
+ placeholder_fn = self._placeholder_fn
+ # Replace entities with placeholders
+ enriched_utterance = replace_entities_with_placeholders(
+ utterance, all_entities, placeholder_fn)[1]
+ # Tokenize
+ enriched_utterance = tokenize_light(enriched_utterance, self.language)
+        # Remove the unknown words replacement string if needed
+ if self.config.unknown_words_replacement_string:
+ enriched_utterance = [
+ t for t in enriched_utterance
+ if t != self.config.unknown_words_replacement_string
+ ]
+ return enriched_utterance
+
+ def _extract_word_pairs(self, utterance):
+ if self.config.filter_stop_words:
+ stop_words = get_stop_words(self.resources)
+ utterance = [t for t in utterance if t not in stop_words]
+ pairs = set()
+ for j, w1 in enumerate(utterance):
+ max_index = None
+ if self.config.window_size is not None:
+ max_index = j + self.config.window_size + 1
+ for w2 in utterance[j + 1:max_index]:
+ key = (w1, w2)
+ if not self.config.keep_order:
+ key = tuple(sorted(key))
+ pairs.add(key)
+ return pairs
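+
+    # Example (ignoring stop-word filtering): with window_size=2 and
+    # keep_order=True, the tokens ["set", "an", "alarm", "tomorrow"] yield
+    # {("set", "an"), ("set", "alarm"), ("an", "alarm"), ("an", "tomorrow"),
+    #  ("alarm", "tomorrow")}.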
+
+ def _placeholder_fn(self, entity_name):
+ return "".join(
+ tokenize_light(str(entity_name), str(self.language))).upper()
+
+ @classmethod
+ # pylint: disable=protected-access
+ def from_path(cls, path, **shared):
+ path = Path(path)
+ model_path = path / "vectorizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing vectorizer model file: %s"
+ % model_path.name)
+
+ with model_path.open(encoding="utf8") as f:
+ vectorizer_dict = json.load(f)
+ config = vectorizer_dict.pop("config")
+
+ self = cls(config, **shared)
+ self._language = vectorizer_dict["language_code"]
+ self._word_pairs = None
+
+ builtin_entity_scope = vectorizer_dict["builtin_entity_scope"]
+ if builtin_entity_scope is not None:
+ builtin_entity_scope = set(builtin_entity_scope)
+ self.builtin_entity_scope = builtin_entity_scope
+
+ if vectorizer_dict["word_pairs"]:
+ self._word_pairs = {
+ tuple(p): int(i)
+ for i, p in iteritems(vectorizer_dict["word_pairs"])
+ }
+ return self
+
+
+def _entity_name_to_feature(entity_name, language):
+ return "entityfeature%s" % "".join(tokenize_light(
+ entity_name.lower(), language))
+
+
+def _builtin_entity_to_feature(builtin_entity_label, language):
+ return "builtinentityfeature%s" % "".join(tokenize_light(
+ builtin_entity_label.lower(), language))
+
+
+def _normalize_stem(text, language, resources, use_stemming):
+ from snips_nlu_utils import normalize
+
+ if use_stemming:
+ return stem(text, language, resources)
+ return normalize(text)
+
+
+def _get_word_cluster_features(query_tokens, clusters_name, resources):
+ if not clusters_name:
+ return []
+ ngrams = get_all_ngrams(query_tokens)
+ cluster_features = []
+ for ngram in ngrams:
+ cluster = get_word_cluster(resources, clusters_name).get(
+ ngram[NGRAM].lower(), None)
+ if cluster is not None:
+ cluster_features.append(cluster)
+ return cluster_features
diff --git a/snips_inference_agl/intent_classifier/intent_classifier.py b/snips_inference_agl/intent_classifier/intent_classifier.py
new file mode 100644
index 0000000..f9a7952
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/intent_classifier.py
@@ -0,0 +1,51 @@
+from abc import ABCMeta
+
+from future.utils import with_metaclass
+
+from snips_inference_agl.pipeline.processing_unit import ProcessingUnit
+from snips_inference_agl.common.abc_utils import classproperty
+
+
+class IntentClassifier(with_metaclass(ABCMeta, ProcessingUnit)):
+ """Abstraction which performs intent classification
+
+ A custom intent classifier must inherit this class to be used in a
+ :class:`.ProbabilisticIntentParser`
+ """
+
+ @classproperty
+ def unit_name(cls): # pylint:disable=no-self-argument
+ return IntentClassifier.registered_name(cls)
+
+ # @abstractmethod
+ def get_intent(self, text, intents_filter):
+ """Performs intent classification on the provided *text*
+
+ Args:
+ text (str): Input
+ intents_filter (str or list of str): When defined, it will find
+ the most likely intent among the list, otherwise it will use
+ the whole list of intents defined in the dataset
+
+ Returns:
+ dict or None: The most likely intent along with its probability or
+ *None* if no intent was found. See
+ :func:`.intent_classification_result` for the output format.
+ """
+ pass
+
+ # @abstractmethod
+ def get_intents(self, text):
+ """Performs intent classification on the provided *text* and returns
+ the list of intents ordered by decreasing probability
+
+ The length of the returned list is exactly the number of intents in the
+ dataset + 1 for the None intent
+
+ .. note::
+
+ The probabilities returned along with each intent are not
+ guaranteed to sum to 1.0. They should be considered as scores
+ between 0 and 1.
+ """
+ pass
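+
+
+# Minimal subclass sketch (hypothetical name, shown for illustration only):
+#
+#   @IntentClassifier.register("my_intent_classifier")
+#   class MyIntentClassifier(IntentClassifier):
+#       def get_intent(self, text, intents_filter=None):
+#           ...
+#
+#       def get_intents(self, text):
+#           ...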
diff --git a/snips_inference_agl/intent_classifier/log_reg_classifier.py b/snips_inference_agl/intent_classifier/log_reg_classifier.py
new file mode 100644
index 0000000..09e537c
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/log_reg_classifier.py
@@ -0,0 +1,211 @@
+from __future__ import unicode_literals
+
+import json
+import logging
+from builtins import str, zip
+from pathlib import Path
+
+from snips_inference_agl.common.log_utils import DifferedLoggingMessage
+from snips_inference_agl.common.utils import (fitted_required)
+from snips_inference_agl.constants import RES_PROBA
+from snips_inference_agl.exceptions import LoadingError
+from snips_inference_agl.intent_classifier.featurizer import Featurizer
+from snips_inference_agl.intent_classifier.intent_classifier import IntentClassifier
+from snips_inference_agl.intent_classifier.log_reg_classifier_utils import (text_to_utterance)
+from snips_inference_agl.pipeline.configs import LogRegIntentClassifierConfig
+from snips_inference_agl.result import intent_classification_result
+
+logger = logging.getLogger(__name__)
+
+# We set tol to 1e-3 to silence the following warning with Python 2 (
+# scikit-learn 0.20):
+#
+# FutureWarning: max_iter and tol parameters have been added in SGDClassifier
+# in 0.19. If max_iter is set but tol is left unset, the default value for tol
+# in 0.19 and 0.20 will be None (which is equivalent to -infinity, so it has no
+# effect) but will change in 0.21 to 1e-3. Specify tol to silence this warning.
+
+LOG_REG_ARGS = {
+ "loss": "log",
+ "penalty": "l2",
+ "max_iter": 1000,
+ "tol": 1e-3,
+ "n_jobs": -1
+}
+
+
+@IntentClassifier.register("log_reg_intent_classifier")
+class LogRegIntentClassifier(IntentClassifier):
+ """Intent classifier which uses a Logistic Regression underneath"""
+
+ config_type = LogRegIntentClassifierConfig
+
+ def __init__(self, config=None, **shared):
+ """The LogReg intent classifier can be configured by passing a
+ :class:`.LogRegIntentClassifierConfig`"""
+ super(LogRegIntentClassifier, self).__init__(config, **shared)
+ self.classifier = None
+ self.intent_list = None
+ self.featurizer = None
+
+ @property
+ def fitted(self):
+ """Whether or not the intent classifier has already been fitted"""
+ return self.intent_list is not None
+
+ @fitted_required
+ def get_intent(self, text, intents_filter=None):
+ """Performs intent classification on the provided *text*
+
+ Args:
+ text (str): Input
+ intents_filter (str or list of str): When defined, it will find
+ the most likely intent among the list, otherwise it will use
+ the whole list of intents defined in the dataset
+
+ Returns:
+ dict or None: The most likely intent along with its probability or
+ *None* if no intent was found
+
+ Raises:
+            :class:`snips_inference_agl.exceptions.NotTrained`: When the
+                intent classifier is not fitted
+
+ """
+ return self._get_intents(text, intents_filter)[0]
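+
+    # Illustrative call (hypothetical intent name and probability, result
+    # keys as produced by intent_classification_result):
+    #   classifier.get_intent("turn on the lights", ["smarthome.lights"])
+    #   -> {"intentName": "smarthome.lights", "probability": 0.87}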
+
+ @fitted_required
+ def get_intents(self, text):
+ """Performs intent classification on the provided *text* and returns
+ the list of intents ordered by decreasing probability
+
+ The length of the returned list is exactly the number of intents in the
+ dataset + 1 for the None intent
+
+ Raises:
+            :class:`snips_inference_agl.exceptions.NotTrained`: when the
+                intent classifier is not fitted
+ """
+ return self._get_intents(text, intents_filter=None)
+
+ def _get_intents(self, text, intents_filter):
+ if isinstance(intents_filter, str):
+ intents_filter = {intents_filter}
+ elif isinstance(intents_filter, list):
+ intents_filter = set(intents_filter)
+
+ if not text or not self.intent_list or not self.featurizer:
+ results = [intent_classification_result(None, 1.0)]
+ results += [intent_classification_result(i, 0.0)
+ for i in self.intent_list if i is not None]
+ return results
+
+ if len(self.intent_list) == 1:
+ return [intent_classification_result(self.intent_list[0], 1.0)]
+
+ # pylint: disable=C0103
+ X = self.featurizer.transform([text_to_utterance(text)])
+ # pylint: enable=C0103
+ proba_vec = self._predict_proba(X)
+ logger.debug(
+ "%s", DifferedLoggingMessage(self.log_activation_weights, text, X))
+ results = [
+ intent_classification_result(i, proba)
+ for i, proba in zip(self.intent_list, proba_vec[0])
+ if intents_filter is None or i is None or i in intents_filter]
+
+ return sorted(results, key=lambda res: -res[RES_PROBA])
+
+ def _predict_proba(self, X): # pylint: disable=C0103
+ import numpy as np
+
+ self.classifier._check_proba() # pylint: disable=W0212
+
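+        # Apply the logistic sigmoid 1 / (1 + exp(-decision)) in place on the
+        # decision values, mirroring scikit-learn's SGDClassifier
+        # predict_proba behaviour for the log loss.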
+ prob = self.classifier.decision_function(X)
+ prob *= -1
+ np.exp(prob, prob)
+ prob += 1
+ np.reciprocal(prob, prob)
+ if prob.ndim == 1:
+ return np.vstack([1 - prob, prob]).T
+ return prob
+
+ @classmethod
+ def from_path(cls, path, **shared):
+ """Loads a :class:`LogRegIntentClassifier` instance from a path
+
+ The data at the given path must have been generated using
+ :func:`~LogRegIntentClassifier.persist`
+ """
+ import numpy as np
+ from sklearn.linear_model import SGDClassifier
+
+ path = Path(path)
+ model_path = path / "intent_classifier.json"
+ if not model_path.exists():
+ raise LoadingError("Missing intent classifier model file: %s"
+ % model_path.name)
+
+ with model_path.open(encoding="utf8") as f:
+ model_dict = json.load(f)
+
+ # Create the classifier
+ config = LogRegIntentClassifierConfig.from_dict(model_dict["config"])
+ intent_classifier = cls(config=config, **shared)
+ intent_classifier.intent_list = model_dict['intent_list']
+
+ # Create the underlying SGD classifier
+ sgd_classifier = None
+ coeffs = model_dict['coeffs']
+ intercept = model_dict['intercept']
+ t_ = model_dict["t_"]
+ if coeffs is not None and intercept is not None:
+ sgd_classifier = SGDClassifier(**LOG_REG_ARGS)
+ sgd_classifier.coef_ = np.array(coeffs)
+ sgd_classifier.intercept_ = np.array(intercept)
+ sgd_classifier.t_ = t_
+ intent_classifier.classifier = sgd_classifier
+
+ # Add the featurizer
+ featurizer = model_dict['featurizer']
+ if featurizer is not None:
+ featurizer_path = path / featurizer
+ intent_classifier.featurizer = Featurizer.from_path(
+ featurizer_path, **shared)
+
+ return intent_classifier
+
+ def log_activation_weights(self, text, x, top_n=50):
+ import numpy as np
+
+ if not hasattr(self.featurizer, "feature_index_to_feature_name"):
+ return None
+
+ log = "\n\nTop {} feature activations for: \"{}\":\n".format(
+ top_n, text)
+ activations = np.multiply(
+ self.classifier.coef_, np.asarray(x.todense()))
+ abs_activation = np.absolute(activations).flatten().squeeze()
+
+ if top_n > activations.size:
+ top_n = activations.size
+
+ top_n_activations_ix = np.argpartition(abs_activation, -top_n,
+ axis=None)[-top_n:]
+ top_n_activations_ix = np.unravel_index(
+ top_n_activations_ix, activations.shape)
+
+ index_to_feature = self.featurizer.feature_index_to_feature_name
+ features_intent_and_activation = [
+ (self.intent_list[i], index_to_feature[f], activations[i, f])
+ for i, f in zip(*top_n_activations_ix)]
+
+ features_intent_and_activation = sorted(
+ features_intent_and_activation, key=lambda x: abs(x[2]),
+ reverse=True)
+
+ for intent, feature, activation in features_intent_and_activation:
+ log += "\n\n\"{}\" -> ({}, {:.2f})".format(
+ intent, feature, float(activation))
+ log += "\n\n"
+ return log
diff --git a/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py b/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py
new file mode 100644
index 0000000..75a8ab1
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/log_reg_classifier_utils.py
@@ -0,0 +1,94 @@
+from __future__ import division, unicode_literals
+
+import itertools
+import re
+from builtins import next, range, str
+from copy import deepcopy
+from uuid import uuid4
+
+from future.utils import iteritems, itervalues
+
+from snips_inference_agl.constants import (DATA, ENTITY, INTENTS, TEXT,
+ UNKNOWNWORD, UTTERANCES)
+from snips_inference_agl.data_augmentation import augment_utterances
+from snips_inference_agl.dataset import get_text_from_chunks
+from snips_inference_agl.entity_parser.builtin_entity_parser import is_builtin_entity
+from snips_inference_agl.preprocessing import tokenize_light
+from snips_inference_agl.resources import get_noise
+
+NOISE_NAME = str(uuid4())
+WORD_REGEX = re.compile(r"\w+(\s+\w+)*")
+UNKNOWNWORD_REGEX = re.compile(r"%s(\s+%s)*" % (UNKNOWNWORD, UNKNOWNWORD))
+
+
+def get_noise_it(noise, mean_length, std_length, random_state):
+ it = itertools.cycle(noise)
+ while True:
+ noise_length = int(random_state.normal(mean_length, std_length))
+ # pylint: disable=stop-iteration-return
+ yield " ".join(next(it) for _ in range(noise_length))
+ # pylint: enable=stop-iteration-return
+
+
+def generate_smart_noise(noise, augmented_utterances, replacement_string,
+ language):
+ text_utterances = [get_text_from_chunks(u[DATA])
+ for u in augmented_utterances]
+ vocab = [w for u in text_utterances for w in tokenize_light(u, language)]
+ vocab = set(vocab)
+ return [w if w in vocab else replacement_string for w in noise]
+
+
+def generate_noise_utterances(augmented_utterances, noise, num_intents,
+ data_augmentation_config, language,
+ random_state):
+ import numpy as np
+
+ if not augmented_utterances or not num_intents:
+ return []
+ avg_num_utterances = len(augmented_utterances) / float(num_intents)
+ if data_augmentation_config.unknown_words_replacement_string is not None:
+ noise = generate_smart_noise(
+ noise, augmented_utterances,
+ data_augmentation_config.unknown_words_replacement_string,
+ language)
+
+ noise_size = min(
+ int(data_augmentation_config.noise_factor * avg_num_utterances),
+ len(noise))
+ utterances_lengths = [
+ len(tokenize_light(get_text_from_chunks(u[DATA]), language))
+ for u in augmented_utterances]
+ mean_utterances_length = np.mean(utterances_lengths)
+ std_utterances_length = np.std(utterances_lengths)
+ noise_it = get_noise_it(noise, mean_utterances_length,
+ std_utterances_length, random_state)
+    # Collapse consecutive 'unknownword' tokens into a single occurrence
+ return [
+ text_to_utterance(UNKNOWNWORD_REGEX.sub(UNKNOWNWORD, next(noise_it)))
+ for _ in range(noise_size)]
+
+
+def add_unknown_word_to_utterances(utterances, replacement_string,
+ unknown_word_prob, max_unknown_words,
+ random_state):
+ if not max_unknown_words:
+ return utterances
+
+ new_utterances = deepcopy(utterances)
+ for u in new_utterances:
+ if random_state.rand() < unknown_word_prob:
+ num_unknown = random_state.randint(1, max_unknown_words + 1)
+            # We choose to put the noise at the end of the sentence and not
+            # in the middle so that it does not impact the ngram computation
+            # too much
+ extra_chunk = {
+ TEXT: " " + " ".join(
+ replacement_string for _ in range(num_unknown))
+ }
+ u[DATA].append(extra_chunk)
+ return new_utterances
+
+
+def text_to_utterance(text):
+ return {DATA: [{TEXT: text}]}
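+
+
+# e.g. (assuming DATA == "data" and TEXT == "text"):
+#   text_to_utterance("turn on the lights")
+#   -> {"data": [{"text": "turn on the lights"}]}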