Diffstat (limited to 'snips_inference_agl/intent_classifier/featurizer.py')
-rw-r--r--  snips_inference_agl/intent_classifier/featurizer.py  452
1 files changed, 452 insertions, 0 deletions
diff --git a/snips_inference_agl/intent_classifier/featurizer.py b/snips_inference_agl/intent_classifier/featurizer.py
new file mode 100644
index 0000000..116837f
--- /dev/null
+++ b/snips_inference_agl/intent_classifier/featurizer.py
@@ -0,0 +1,452 @@
+from __future__ import division, unicode_literals
+
+import json
+from builtins import str, zip
+from copy import deepcopy
+from pathlib import Path
+
+from future.utils import iteritems
+
+from snips_inference_agl.common.utils import (
+ fitted_required, replace_entities_with_placeholders)
+from snips_inference_agl.constants import (
+ DATA, ENTITY, ENTITY_KIND, NGRAM, TEXT)
+from snips_inference_agl.dataset import get_text_from_chunks
+from snips_inference_agl.entity_parser.builtin_entity_parser import (
+ is_builtin_entity)
+from snips_inference_agl.exceptions import LoadingError
+from snips_inference_agl.languages import get_default_sep
+from snips_inference_agl.pipeline.configs import FeaturizerConfig
+from snips_inference_agl.pipeline.configs.intent_classifier import (
+ CooccurrenceVectorizerConfig, TfidfVectorizerConfig)
+from snips_inference_agl.pipeline.processing_unit import ProcessingUnit
+from snips_inference_agl.preprocessing import stem, tokenize_light
+from snips_inference_agl.resources import get_stop_words, get_word_cluster
+from snips_inference_agl.slot_filler.features_utils import get_all_ngrams
+
+
+@ProcessingUnit.register("featurizer")
+class Featurizer(ProcessingUnit):
+ """Feature extractor for text classification relying on ngrams tfidf and
+ optionally word cooccurrences features"""
+
+ config_type = FeaturizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(Featurizer, self).__init__(config, **shared)
+ self.language = None
+ self.tfidf_vectorizer = None
+ self.cooccurrence_vectorizer = None
+
+ @property
+ def fitted(self):
+ if not self.tfidf_vectorizer or not self.tfidf_vectorizer.vocabulary:
+ return False
+ return True
+
+ def transform(self, utterances):
+ import scipy.sparse as sp
+
+ x = self.tfidf_vectorizer.transform(utterances)
+ if self.cooccurrence_vectorizer:
+ x_cooccurrence = self.cooccurrence_vectorizer.transform(utterances)
+ x = sp.hstack((x, x_cooccurrence))
+ return x
+
+ @classmethod
+ def from_path(cls, path, **shared):
+ path = Path(path)
+
+ model_path = path / "featurizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing featurizer model file: %s"
+ % model_path.name)
+ with model_path.open("r", encoding="utf-8") as f:
+ featurizer_dict = json.load(f)
+
+ featurizer_config = featurizer_dict["config"]
+ featurizer = cls(featurizer_config, **shared)
+
+ featurizer.language = featurizer_dict["language_code"]
+
+ tfidf_vectorizer = featurizer_dict["tfidf_vectorizer"]
+ if tfidf_vectorizer:
+ vectorizer_path = path / featurizer_dict["tfidf_vectorizer"]
+ tfidf_vectorizer = TfidfVectorizer.from_path(
+ vectorizer_path, **shared)
+ featurizer.tfidf_vectorizer = tfidf_vectorizer
+
+ cooccurrence_vectorizer = featurizer_dict["cooccurrence_vectorizer"]
+ if cooccurrence_vectorizer:
+ vectorizer_path = path / featurizer_dict["cooccurrence_vectorizer"]
+ cooccurrence_vectorizer = CooccurrenceVectorizer.from_path(
+ vectorizer_path, **shared)
+ featurizer.cooccurrence_vectorizer = cooccurrence_vectorizer
+
+ return featurizer
+
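+
+# NOTE (illustrative sketch, not part of the original module): the snippet
+# below shows how a persisted featurizer is typically restored and applied.
+# The directory layout follows ``Featurizer.from_path`` above; the utterance
+# structure and the ``shared`` contents (entity parsers, resources) are
+# assumptions based on how ``transform`` consumes them.
+def _example_featurizer_usage(featurizer_dir, **shared):
+    # ``shared`` is expected to provide the builtin/custom entity parsers and
+    # the resources that the enclosing intent classifier normally passes along.
+    featurizer = Featurizer.from_path(featurizer_dir, **shared)
+    # Assumed dataset chunk format: a list of {DATA: [{TEXT: ...}, ...]} dicts
+    utterances = [{DATA: [{TEXT: "turn on the lights"}]}]
+    # Returns a scipy sparse matrix with one row per utterance
+    return featurizer.transform(utterances)
+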
+
+@ProcessingUnit.register("tfidf_vectorizer")
+class TfidfVectorizer(ProcessingUnit):
+ """Wrapper of the scikit-learn TfidfVectorizer"""
+
+ config_type = TfidfVectorizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(TfidfVectorizer, self).__init__(config, **shared)
+ self._tfidf_vectorizer = None
+ self._language = None
+ self.builtin_entity_scope = None
+
+ @property
+ def fitted(self):
+ return self._tfidf_vectorizer is not None and hasattr(
+ self._tfidf_vectorizer, "vocabulary_")
+
+ @fitted_required
+ def transform(self, x):
+ """Featurizes the given utterances after enriching them with builtin
+ entities matches, custom entities matches and the potential word
+ clusters matches
+
+ Args:
+ x (list of dict): list of utterances
+
+ Returns:
+ :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape
+            (len(x), len(self.vocabulary)) where X[i, j] contains the tf-idf
+            value of the vocabulary ngram of index j in utterance i
+
+ Raises:
+            NotTrained: when the vectorizer is not fitted
+ """
+ utterances = [self._enrich_utterance(*data)
+ for data in zip(*self._preprocess(x))]
+ return self._tfidf_vectorizer.transform(utterances)
+
+ def _preprocess(self, utterances):
+ normalized_utterances = deepcopy(utterances)
+ for u in normalized_utterances:
+ nb_chunks = len(u[DATA])
+ for i, chunk in enumerate(u[DATA]):
+ chunk[TEXT] = _normalize_stem(
+ chunk[TEXT], self.language, self.resources,
+ self.config.use_stemming)
+ if i < nb_chunks - 1:
+ chunk[TEXT] += " "
+
+        # Extract builtin entities on unnormalized utterances
+ builtin_ents = [
+ self.builtin_entity_parser.parse(
+ get_text_from_chunks(u[DATA]),
+ self.builtin_entity_scope, use_cache=True)
+ for u in utterances
+ ]
+        # Extract custom entities on normalized utterances
+ custom_ents = [
+ self.custom_entity_parser.parse(
+ get_text_from_chunks(u[DATA]), use_cache=True)
+ for u in normalized_utterances
+ ]
+ if self.config.word_clusters_name:
+            # Extract word clusters on unnormalized utterances
+ original_utterances_text = [get_text_from_chunks(u[DATA])
+ for u in utterances]
+ w_clusters = [
+ _get_word_cluster_features(
+ tokenize_light(u.lower(), self.language),
+ self.config.word_clusters_name,
+ self.resources)
+ for u in original_utterances_text
+ ]
+ else:
+ w_clusters = [None for _ in normalized_utterances]
+
+ return normalized_utterances, builtin_ents, custom_ents, w_clusters
+
+ def _enrich_utterance(self, utterance, builtin_entities, custom_entities,
+ word_clusters):
+ custom_entities_features = [
+ _entity_name_to_feature(e[ENTITY_KIND], self.language)
+ for e in custom_entities]
+
+ builtin_entities_features = [
+ _builtin_entity_to_feature(ent[ENTITY_KIND], self.language)
+ for ent in builtin_entities
+ ]
+
+ # We remove values of builtin slots from the utterance to avoid
+ # learning specific samples such as '42' or 'tomorrow'
+ filtered_tokens = [
+ chunk[TEXT] for chunk in utterance[DATA]
+ if ENTITY not in chunk or not is_builtin_entity(chunk[ENTITY])
+ ]
+
+ features = get_default_sep(self.language).join(filtered_tokens)
+
+ if builtin_entities_features:
+ features += " " + " ".join(sorted(builtin_entities_features))
+ if custom_entities_features:
+ features += " " + " ".join(sorted(custom_entities_features))
+ if word_clusters:
+ features += " " + " ".join(sorted(word_clusters))
+
+ return features
+
+ @property
+ def language(self):
+        # Getter only: the language is set internally (when fitting or
+        # loading) and should not be modified from elsewhere
+ return self._language
+
+ @property
+ def vocabulary(self):
+ if self._tfidf_vectorizer and hasattr(
+ self._tfidf_vectorizer, "vocabulary_"):
+ return self._tfidf_vectorizer.vocabulary_
+ return None
+
+ @property
+ def idf_diag(self):
+ if self._tfidf_vectorizer and hasattr(
+ self._tfidf_vectorizer, "vocabulary_"):
+ return self._tfidf_vectorizer.idf_
+ return None
+
+ @classmethod
+ # pylint: disable=W0212
+ def from_path(cls, path, **shared):
+ import numpy as np
+ import scipy.sparse as sp
+ from sklearn.feature_extraction.text import (
+ TfidfTransformer, TfidfVectorizer as SklearnTfidfVectorizer)
+
+ path = Path(path)
+
+ model_path = path / "vectorizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing vectorizer model file: %s"
+ % model_path.name)
+ with model_path.open("r", encoding="utf-8") as f:
+ vectorizer_dict = json.load(f)
+
+ vectorizer = cls(vectorizer_dict["config"], **shared)
+ vectorizer._language = vectorizer_dict["language_code"]
+
+ builtin_entity_scope = vectorizer_dict["builtin_entity_scope"]
+ if builtin_entity_scope is not None:
+ builtin_entity_scope = set(builtin_entity_scope)
+ vectorizer.builtin_entity_scope = builtin_entity_scope
+
+ vectorizer_ = vectorizer_dict["vectorizer"]
+ if vectorizer_:
+ vocab = vectorizer_["vocab"]
+ idf_diag_data = vectorizer_["idf_diag"]
+ idf_diag_data = np.array(idf_diag_data)
+
+ idf_diag_shape = (len(idf_diag_data), len(idf_diag_data))
+ row = list(range(idf_diag_shape[0]))
+ col = list(range(idf_diag_shape[0]))
+ idf_diag = sp.csr_matrix(
+ (idf_diag_data, (row, col)), shape=idf_diag_shape)
+
+ tfidf_transformer = TfidfTransformer()
+ tfidf_transformer._idf_diag = idf_diag
+
+ vectorizer_ = SklearnTfidfVectorizer(
+ tokenizer=lambda x: tokenize_light(x, vectorizer._language))
+ vectorizer_.vocabulary_ = vocab
+
+ vectorizer_._tfidf = tfidf_transformer
+
+ vectorizer._tfidf_vectorizer = vectorizer_
+ return vectorizer
+
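+
+# NOTE (illustrative sketch, not part of the original module):
+# ``TfidfVectorizer.from_path`` rebuilds the scikit-learn transformer by
+# turning the persisted list of idf values into a square diagonal sparse
+# matrix. The helper below reproduces that single step in isolation; its
+# name and signature are made up for documentation purposes.
+def _example_rebuild_idf_diag(idf_values):
+    import numpy as np
+    import scipy.sparse as sp
+
+    idf_values = np.asarray(idf_values, dtype=float)
+    n = len(idf_values)
+    indices = list(range(n))
+    # One idf weight per vocabulary entry, placed on the diagonal of an
+    # (n, n) CSR matrix, mirroring the construction in ``from_path`` above
+    return sp.csr_matrix((idf_values, (indices, indices)), shape=(n, n))
+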
+
+@ProcessingUnit.register("cooccurrence_vectorizer")
+class CooccurrenceVectorizer(ProcessingUnit):
+ """Featurizer that takes utterances and extracts ordered word cooccurrence
+ features matrix from them"""
+
+ config_type = CooccurrenceVectorizerConfig
+
+ def __init__(self, config=None, **shared):
+ super(CooccurrenceVectorizer, self).__init__(config, **shared)
+ self._word_pairs = None
+ self._language = None
+ self.builtin_entity_scope = None
+
+ @property
+ def language(self):
+        # Getter only: the language is set internally (when fitting or
+        # loading) and should not be modified from elsewhere
+ return self._language
+
+ @property
+ def word_pairs(self):
+ return self._word_pairs
+
+ @property
+ def fitted(self):
+ """Whether or not the vectorizer is fitted"""
+ return self.word_pairs is not None
+
+ @fitted_required
+ def transform(self, x):
+ """Computes the cooccurrence feature matrix.
+
+ Args:
+ x (list of dict): list of utterances
+
+ Returns:
+ :class:`.scipy.sparse.csr_matrix`: A sparse matrix X of shape
+            (len(x), len(self.word_pairs)) where X[i, j] = 1.0 if the
+            utterance x[i] contains the word cooccurrence (w1, w2) and if
+            self.word_pairs[(w1, w2)] = j
+
+ Raises:
+ NotTrained: when the vectorizer is not fitted
+ """
+ import numpy as np
+ import scipy.sparse as sp
+
+ preprocessed = self._preprocess(x)
+ utterances = [
+ self._enrich_utterance(utterance, builtin_ents, custom_ent)
+ for utterance, builtin_ents, custom_ent in zip(*preprocessed)]
+
+ x_coo = sp.dok_matrix((len(x), len(self.word_pairs)), dtype=np.int32)
+ for i, u in enumerate(utterances):
+ for p in self._extract_word_pairs(u):
+ if p in self.word_pairs:
+ x_coo[i, self.word_pairs[p]] = 1
+
+ return x_coo.tocsr()
+
+ def _preprocess(self, x):
+ # Extract all entities on unnormalized data
+ builtin_ents = [
+ self.builtin_entity_parser.parse(
+ get_text_from_chunks(u[DATA]),
+ self.builtin_entity_scope,
+ use_cache=True
+ ) for u in x
+ ]
+ custom_ents = [
+ self.custom_entity_parser.parse(
+ get_text_from_chunks(u[DATA]), use_cache=True)
+ for u in x
+ ]
+ return x, builtin_ents, custom_ents
+
+ def _extract_word_pairs(self, utterance):
+ if self.config.filter_stop_words:
+ stop_words = get_stop_words(self.resources)
+ utterance = [t for t in utterance if t not in stop_words]
+ pairs = set()
+ for j, w1 in enumerate(utterance):
+ max_index = None
+ if self.config.window_size is not None:
+ max_index = j + self.config.window_size + 1
+ for w2 in utterance[j + 1:max_index]:
+ key = (w1, w2)
+ if not self.config.keep_order:
+ key = tuple(sorted(key))
+ pairs.add(key)
+ return pairs
+
+ def _enrich_utterance(self, x, builtin_ents, custom_ents):
+ utterance = get_text_from_chunks(x[DATA])
+ all_entities = builtin_ents + custom_ents
+ placeholder_fn = self._placeholder_fn
+ # Replace entities with placeholders
+ enriched_utterance = replace_entities_with_placeholders(
+ utterance, all_entities, placeholder_fn)[1]
+ # Tokenize
+ enriched_utterance = tokenize_light(enriched_utterance, self.language)
+        # Remove the unknown word replacement string if needed
+ if self.config.unknown_words_replacement_string:
+ enriched_utterance = [
+ t for t in enriched_utterance
+ if t != self.config.unknown_words_replacement_string
+ ]
+ return enriched_utterance
+
+ def _placeholder_fn(self, entity_name):
+ return "".join(
+ tokenize_light(str(entity_name), str(self.language))).upper()
+
+ @classmethod
+ # pylint: disable=protected-access
+ def from_path(cls, path, **shared):
+ path = Path(path)
+ model_path = path / "vectorizer.json"
+ if not model_path.exists():
+ raise LoadingError("Missing vectorizer model file: %s"
+ % model_path.name)
+
+ with model_path.open(encoding="utf8") as f:
+ vectorizer_dict = json.load(f)
+ config = vectorizer_dict.pop("config")
+
+ self = cls(config, **shared)
+ self._language = vectorizer_dict["language_code"]
+ self._word_pairs = None
+
+ builtin_entity_scope = vectorizer_dict["builtin_entity_scope"]
+ if builtin_entity_scope is not None:
+ builtin_entity_scope = set(builtin_entity_scope)
+ self.builtin_entity_scope = builtin_entity_scope
+
+ if vectorizer_dict["word_pairs"]:
+ self._word_pairs = {
+ tuple(p): int(i)
+ for i, p in iteritems(vectorizer_dict["word_pairs"])
+ }
+ return self
+
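+
+# NOTE (illustrative sketch, not part of the original module): the pair
+# extraction in ``CooccurrenceVectorizer._extract_word_pairs`` keeps, for each
+# token, the tokens appearing within ``window_size`` positions after it (all
+# following tokens when ``window_size`` is None), and sorts each pair when
+# ``keep_order`` is False. The standalone helper below mirrors that logic for
+# documentation only; its name is made up. For example,
+# _example_word_pairs(["turn", "on", "lights"], window_size=1) yields
+# {("turn", "on"), ("on", "lights")}.
+def _example_word_pairs(tokens, window_size=None, keep_order=True):
+    pairs = set()
+    for j, w1 in enumerate(tokens):
+        max_index = None
+        if window_size is not None:
+            max_index = j + window_size + 1
+        for w2 in tokens[j + 1:max_index]:
+            key = (w1, w2)
+            if not keep_order:
+                key = tuple(sorted(key))
+            pairs.add(key)
+    return pairs
+
+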
+def _entity_name_to_feature(entity_name, language):
+ return "entityfeature%s" % "".join(tokenize_light(
+ entity_name.lower(), language))
+
+
+def _builtin_entity_to_feature(builtin_entity_label, language):
+ return "builtinentityfeature%s" % "".join(tokenize_light(
+ builtin_entity_label.lower(), language))
+
+
+def _normalize_stem(text, language, resources, use_stemming):
+ from snips_nlu_utils import normalize
+
+ if use_stemming:
+ return stem(text, language, resources)
+ return normalize(text)
+
+
+def _get_word_cluster_features(query_tokens, clusters_name, resources):
+ if not clusters_name:
+ return []
+ ngrams = get_all_ngrams(query_tokens)
+ cluster_features = []
+ for ngram in ngrams:
+ cluster = get_word_cluster(resources, clusters_name).get(
+ ngram[NGRAM].lower(), None)
+ if cluster is not None:
+ cluster_features.append(cluster)
+ return cluster_features
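+
+
+# NOTE (illustrative sketch, not part of the original module): example of the
+# on-disk layout read by the ``from_path`` methods above. Only the key names
+# are taken from the loading code; every concrete value is made up.
+_EXAMPLE_FEATURIZER_JSON = {
+    "config": {},  # serialized FeaturizerConfig (contents omitted)
+    "language_code": "en",
+    "tfidf_vectorizer": "tfidf_vectorizer",                # sub-directory with vectorizer.json
+    "cooccurrence_vectorizer": "cooccurrence_vectorizer",  # sub-directory, or null when absent
+}
+_EXAMPLE_TFIDF_VECTORIZER_JSON = {
+    "config": {},  # serialized TfidfVectorizerConfig (contents omitted)
+    "language_code": "en",
+    "builtin_entity_scope": ["snips/datetime"],
+    "vectorizer": {
+        "vocab": {"light": 0, "turn": 1},  # ngram -> column index
+        "idf_diag": [1.3, 1.7],            # one idf weight per vocabulary entry
+    },
+}
+_EXAMPLE_COOCCURRENCE_VECTORIZER_JSON = {
+    "config": {},  # serialized CooccurrenceVectorizerConfig (contents omitted)
+    "language_code": "en",
+    "builtin_entity_scope": ["snips/datetime"],
+    "word_pairs": {"0": ["turn", "light"]},  # column index -> ordered word pair
+}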