path: root/snips_inference_agl/nlu_engine/nlu_engine.py
from __future__ import unicode_literals

import json
import logging
from builtins import str
from pathlib import Path

from future.utils import itervalues

from snips_inference_agl.__about__ import __model_version__, __version__
from snips_inference_agl.common.log_utils import log_elapsed_time
from snips_inference_agl.common.utils import fitted_required
from snips_inference_agl.constants import (
    AUTOMATICALLY_EXTENSIBLE, BUILTIN_ENTITY_PARSER, CUSTOM_ENTITY_PARSER,
    ENTITIES, ENTITY_KIND, LANGUAGE, RESOLVED_VALUE, RES_ENTITY,
    RES_INTENT, RES_INTENT_NAME, RES_MATCH_RANGE, RES_PROBA, RES_SLOTS,
    RES_VALUE, RESOURCES, BYPASS_VERSION_CHECK)
from snips_inference_agl.entity_parser import CustomEntityParser
from snips_inference_agl.entity_parser.builtin_entity_parser import (
    BuiltinEntityParser, is_builtin_entity)
from snips_inference_agl.exceptions import (
    InvalidInputError, IntentNotFoundError, LoadingError,
    IncompatibleModelError)
from snips_inference_agl.intent_parser import IntentParser
from snips_inference_agl.pipeline.configs import NLUEngineConfig
from snips_inference_agl.pipeline.processing_unit import ProcessingUnit
from snips_inference_agl.resources import load_resources_from_dir
from snips_inference_agl.result import (
    builtin_slot, custom_slot, empty_result, extraction_result, is_empty,
    parsing_result)

logger = logging.getLogger(__name__)


@ProcessingUnit.register("nlu_engine")
class SnipsNLUEngine(ProcessingUnit):
    """Main class to use for intent parsing

    A :class:`SnipsNLUEngine` relies on a list of :class:`.IntentParser`
    object to parse intents, by calling them successively using the first
    positive output.

    With the default parameters, it will use the two following intent parsers
    in this order:

    - a :class:`.DeterministicIntentParser`
    - a :class:`.ProbabilisticIntentParser`

    The logic behind is to first use a conservative parser which has a very
    good precision while its recall is modest, so simple patterns will be
    caught, and then fallback on a second parser which is machine-learning
    based and will be able to parse unseen utterances while ensuring a good
    precision and recall.
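
    Example (a minimal usage sketch; the engine directory and the input
    sentence below are hypothetical)::

        engine = SnipsNLUEngine.from_path("path/to/trained_engine")
        result = engine.parse("Turn the volume up")
        # `result` is a dict; see :func:`.parsing_result` for its exact keys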
    """

    config_type = NLUEngineConfig

    def __init__(self, config=None, **shared):
        """The NLU engine can be configured by passing a
        :class:`.NLUEngineConfig`"""
        super(SnipsNLUEngine, self).__init__(config, **shared)
        self.intent_parsers = []
        """list of :class:`.IntentParser`"""
        self.dataset_metadata = None

    @classmethod
    def default_config(cls):
        # Do not use the global default config, and use per-language default
        # configs instead
        return None

    @property
    def fitted(self):
        """Whether or not the nlu engine has already been fitted"""
        return self.dataset_metadata is not None

    @log_elapsed_time(logger, logging.DEBUG, "Parsed input in {elapsed_time}")
    @fitted_required
    def parse(self, text, intents=None, top_n=None):
        """Performs intent parsing on the provided *text* by calling its intent
        parsers successively

        Args:
            text (str): input text to parse
            intents (str or list of str, optional): if provided, reduces the
                scope of intent parsing to the given list of intents.
                The ``None`` intent is never filtered out, meaning that it can
                be returned even when an intents scope is used.
            top_n (int, optional): when provided, this method will return a
                list of at most ``top_n`` most likely intents, instead of a
                single parsing result.
                Note that the returned list can contain fewer than ``top_n``
                elements, for instance when the ``intents`` parameter is not
                None, or when ``top_n`` is greater than the total number of
                intents.

        Returns:
            dict or list: the most likely intent(s) along with the extracted
            slots. See :func:`.parsing_result` and :func:`.extraction_result`
            for the output format.

        Raises:
            NotTrained: When the nlu engine is not fitted
            InvalidInputError: When input type is not unicode
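
        Example (sketch; the input sentence below is hypothetical)::

            # Single most likely intent, with resolved slots
            result = engine.parse("Turn the volume up")
            # At most 3 intent candidates, each with its own slots
            results = engine.parse("Turn the volume up", top_n=3)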
        """
        if not isinstance(text, str):
            raise InvalidInputError("Expected unicode but received: %s"
                                    % type(text))

        if isinstance(intents, str):
            intents = {intents}
        elif isinstance(intents, list):
            intents = set(intents)

        if intents is not None:
            for intent in intents:
                if intent not in self.dataset_metadata["slot_name_mappings"]:
                    raise IntentNotFoundError(intent)

        if top_n is None:
            none_proba = 0.0
            for parser in self.intent_parsers:
                res = parser.parse(text, intents)
                if is_empty(res):
                    none_proba = res[RES_INTENT][RES_PROBA]
                    continue
                resolved_slots = self._resolve_slots(text, res[RES_SLOTS])
                return parsing_result(text, intent=res[RES_INTENT],
                                      slots=resolved_slots)
            return empty_result(text, none_proba)

        intents_results = self.get_intents(text)
        if intents is not None:
            intents_results = [res for res in intents_results
                               if res[RES_INTENT_NAME] is None
                               or res[RES_INTENT_NAME] in intents]
        intents_results = intents_results[:top_n]
        results = []
        for intent_res in intents_results:
            slots = self.get_slots(text, intent_res[RES_INTENT_NAME])
            results.append(extraction_result(intent_res, slots))
        return results

    @log_elapsed_time(logger, logging.DEBUG, "Got intents in {elapsed_time}")
    @fitted_required
    def get_intents(self, text):
        """Performs intent classification on the provided *text* and returns
        the list of intents ordered by decreasing probability

        The length of the returned list is exactly the number of intents in
        the dataset, plus one for the None intent

        .. note::

            The probabilities returned along with each intent are not
            guaranteed to sum to 1.0. They should be considered as scores
            between 0 and 1.
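
        Example (sketch; the intent names and scores are purely
        illustrative)::

            intents = engine.get_intents("Turn the volume up")
            # e.g. [{"intentName": "volumeUp", "probability": 0.85}, ...,
            #       {"intentName": None, "probability": 0.05}]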
        """
        results = None
        for parser in self.intent_parsers:
            parser_results = parser.get_intents(text)
            if results is None:
                results = {res[RES_INTENT_NAME]: res for res in parser_results}
                continue

            for res in parser_results:
                intent = res[RES_INTENT_NAME]
                proba = max(res[RES_PROBA], results[intent][RES_PROBA])
                results[intent][RES_PROBA] = proba

        return sorted(itervalues(results), key=lambda res: -res[RES_PROBA])

    @log_elapsed_time(logger, logging.DEBUG, "Parsed slots in {elapsed_time}")
    @fitted_required
    def get_slots(self, text, intent):
        """Extracts slots from a text input, with the knowledge of the intent

        Args:
            text (str): input text to extract slots from
            intent (str): the intent which the input corresponds to

        Returns:
            list: the list of extracted slots

        Raises:
            IntentNotFoundError: When the intent was not part of the training
                data
            InvalidInputError: When input type is not unicode
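
        Example (sketch; the input sentence and intent name are
        hypothetical)::

            slots = engine.get_slots("Set the temperature to 21 degrees",
                                     intent="setTemperature")
            # `slots` is empty if no slot could be extracted and resolved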
        """
        if not isinstance(text, str):
            raise InvalidInputError("Expected unicode but received: %s"
                                    % type(text))

        if intent is None:
            return []

        if intent not in self.dataset_metadata["slot_name_mappings"]:
            raise IntentNotFoundError(intent)

        for parser in self.intent_parsers:
            slots = parser.get_slots(text, intent)
            if not slots:
                continue
            return self._resolve_slots(text, slots)
        return []

    @classmethod
    def from_path(cls, path, **shared):
        """Loads a :class:`SnipsNLUEngine` instance from a directory path

        The data at the given path must have been generated using
        :func:`~SnipsNLUEngine.persist`

        Args:
            path (str): The path where the nlu engine is stored

        Raises:
            LoadingError: when some files are missing
            IncompatibleModelError: when trying to load an engine model which
                is not compatible with the current version of the lib
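
        Example (sketch; the directory below is hypothetical)::

            engine = SnipsNLUEngine.from_path("path/to/trained_engine")
            result = engine.parse("Turn the volume up")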
        """
        directory_path = Path(path)
        model_path = directory_path / "nlu_engine.json"
        if not model_path.exists():
            raise LoadingError("Missing nlu engine model file: %s"
                               % model_path.name)

        with model_path.open(encoding="utf8") as f:
            model = json.load(f)
        model_version = model.get("model_version")
        if model_version is None or model_version != __model_version__:
            bypass_version_check = shared.get(BYPASS_VERSION_CHECK, False)
            if bypass_version_check:
                logger.warning(
                    "Incompatible model version found. The library expected "
                    "'%s' but the loaded engine is '%s'. The NLU engine may "
                    "not load correctly.", __model_version__, model_version)
            else:
                raise IncompatibleModelError(model_version)

        dataset_metadata = model["dataset_metadata"]
        if shared.get(RESOURCES) is None and dataset_metadata is not None:
            language = dataset_metadata["language_code"]
            resources_dir = directory_path / "resources" / language
            if resources_dir.is_dir():
                resources = load_resources_from_dir(resources_dir)
                shared[RESOURCES] = resources

        if shared.get(BUILTIN_ENTITY_PARSER) is None:
            # Use dedicated names here to avoid shadowing the `path` argument
            builtin_parser_path = model["builtin_entity_parser"]
            if builtin_parser_path is not None:
                shared[BUILTIN_ENTITY_PARSER] = BuiltinEntityParser.from_path(
                    directory_path / builtin_parser_path)

        if shared.get(CUSTOM_ENTITY_PARSER) is None:
            custom_parser_path = model["custom_entity_parser"]
            if custom_parser_path is not None:
                shared[CUSTOM_ENTITY_PARSER] = CustomEntityParser.from_path(
                    directory_path / custom_parser_path)

        config = cls.config_type.from_dict(model["config"])
        nlu_engine = cls(config=config, **shared)
        nlu_engine.dataset_metadata = dataset_metadata
        intent_parsers = []
        for parser_idx, parser_name in enumerate(model["intent_parsers"]):
            parser_config = config.intent_parsers_configs[parser_idx]
            intent_parser_path = directory_path / parser_name
            intent_parser = IntentParser.load_from_path(
                intent_parser_path, parser_config.unit_name, **shared)
            intent_parsers.append(intent_parser)
        nlu_engine.intent_parsers = intent_parsers
        return nlu_engine

    def _resolve_slots(self, text, slots):
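        """Resolves the raw slot values against the builtin and custom
        entity parsers

        Resolution is attempted in three steps: first by matching an entity
        parsed from the full *text* on the slot's character range, then by
        re-parsing the raw slot value alone, and finally, for automatically
        extensible custom entities only, by keeping the raw value as-is.
        Slots that cannot be resolved are dropped.
        """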
        builtin_scope = [slot[RES_ENTITY] for slot in slots
                         if is_builtin_entity(slot[RES_ENTITY])]
        custom_scope = [slot[RES_ENTITY] for slot in slots
                        if not is_builtin_entity(slot[RES_ENTITY])]
        # Do not use cached entities here as datetimes must be computed using
        # current context
        builtin_entities = self.builtin_entity_parser.parse(
            text, builtin_scope, use_cache=False)
        custom_entities = self.custom_entity_parser.parse(
            text, custom_scope, use_cache=True)

        resolved_slots = []
        for slot in slots:
            entity_name = slot[RES_ENTITY]
            raw_value = slot[RES_VALUE]
            is_builtin = is_builtin_entity(entity_name)
            if is_builtin:
                entities = builtin_entities
                parser = self.builtin_entity_parser
                slot_builder = builtin_slot
                use_cache = False
                extensible = False
            else:
                entities = custom_entities
                parser = self.custom_entity_parser
                slot_builder = custom_slot
                use_cache = True
                extensible = self.dataset_metadata[ENTITIES][entity_name][
                    AUTOMATICALLY_EXTENSIBLE]

            resolved_slot = None
            for ent in entities:
                if (ent[ENTITY_KIND] == entity_name
                        and ent[RES_MATCH_RANGE] == slot[RES_MATCH_RANGE]):
                    resolved_slot = slot_builder(slot, ent[RESOLVED_VALUE])
                    break
            if resolved_slot is None:
                matches = parser.parse(
                    raw_value, scope=[entity_name], use_cache=use_cache)
                if matches:
                    match = matches[0]
                    if is_builtin or len(match[RES_VALUE]) == len(raw_value):
                        resolved_slot = slot_builder(
                            slot, match[RESOLVED_VALUE])

            if resolved_slot is None and extensible:
                resolved_slot = slot_builder(slot)

            if resolved_slot is not None:
                resolved_slots.append(resolved_slot)

        return resolved_slots