1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
|
from __future__ import unicode_literals
import json
import logging
from builtins import str
from pathlib import Path
from future.utils import itervalues
from snips_inference_agl.__about__ import __model_version__, __version__
from snips_inference_agl.common.log_utils import log_elapsed_time
from snips_inference_agl.common.utils import (fitted_required)
from snips_inference_agl.constants import (
AUTOMATICALLY_EXTENSIBLE, BUILTIN_ENTITY_PARSER, CUSTOM_ENTITY_PARSER,
ENTITIES, ENTITY_KIND, LANGUAGE, RESOLVED_VALUE, RES_ENTITY,
RES_INTENT, RES_INTENT_NAME, RES_MATCH_RANGE, RES_PROBA, RES_SLOTS,
RES_VALUE, RESOURCES, BYPASS_VERSION_CHECK)
# from snips_inference_agl.dataset import validate_and_format_dataset
from snips_inference_agl.entity_parser import CustomEntityParser
from snips_inference_agl.entity_parser.builtin_entity_parser import (
BuiltinEntityParser, is_builtin_entity)
from snips_inference_agl.exceptions import (
InvalidInputError, IntentNotFoundError, LoadingError,
IncompatibleModelError)
from snips_inference_agl.intent_parser import IntentParser
from snips_inference_agl.pipeline.configs import NLUEngineConfig
from snips_inference_agl.pipeline.processing_unit import ProcessingUnit
from snips_inference_agl.resources import load_resources_from_dir
from snips_inference_agl.result import (
builtin_slot, custom_slot, empty_result, extraction_result, is_empty,
parsing_result)
# Module-level logger: handed to the ``log_elapsed_time`` decorators on the
# engine's public methods and used for the model-version warning at load time.
logger = logging.getLogger(__name__)
@ProcessingUnit.register("nlu_engine")
class SnipsNLUEngine(ProcessingUnit):
    """Main class to use for intent parsing

    A :class:`SnipsNLUEngine` relies on a list of :class:`.IntentParser`
    objects to parse intents, by calling them successively and using the
    first positive output.

    With the default parameters, it will use the two following intent parsers
    in this order:

    - a :class:`.DeterministicIntentParser`
    - a :class:`.ProbabilisticIntentParser`

    The logic behind is to first use a conservative parser which has a very
    good precision while its recall is modest, so simple patterns will be
    caught, and then fallback on a second parser which is machine-learning
    based and will be able to parse unseen utterances while ensuring a good
    precision and recall.
    """

    config_type = NLUEngineConfig

    def __init__(self, config=None, **shared):
        """The NLU engine can be configured by passing a
        :class:`.NLUEngineConfig`"""
        super(SnipsNLUEngine, self).__init__(config, **shared)
        self.intent_parsers = []
        """list of :class:`.IntentParser`"""
        # Stays ``None`` until a model is loaded; see :attr:`fitted`.
        self.dataset_metadata = None

    @classmethod
    def default_config(cls):
        """Returns ``None``: there is no global default configuration"""
        # Do not use the global default config, and use per-language default
        # configs instead
        return None

    @property
    def fitted(self):
        """Whether or not the nlu engine has already been fitted"""
        return self.dataset_metadata is not None

    @log_elapsed_time(logger, logging.DEBUG, "Parsed input in {elapsed_time}")
    @fitted_required
    def parse(self, text, intents=None, top_n=None):
        """Performs intent parsing on the provided *text* by calling its intent
        parsers successively

        Args:
            text (str): Input
            intents (str or iterable of str, optional): If provided, reduces
                the scope of intent parsing to the provided intents.
                The ``None`` intent is never filtered out, meaning that it can
                be returned even when using an intents scope.
            top_n (int, optional): when provided, this method will return a
                list of at most ``top_n`` most likely intents, instead of a
                single parsing result.
                Note that the returned list can contain less than ``top_n``
                elements, for instance when the parameter ``intents`` is not
                None, or when ``top_n`` is greater than the total number of
                intents.

        Returns:
            dict or list: the most likely intent(s) along with the extracted
            slots. See :func:`.parsing_result` and :func:`.extraction_result`
            for the output format.

        Raises:
            NotTrained: When the nlu engine is not fitted
            InvalidInputError: When input type is not unicode
            IntentNotFoundError: When one of the requested *intents* is not
                part of the loaded dataset metadata
        """
        if not isinstance(text, str):
            raise InvalidInputError("Expected unicode but received: %s"
                                    % type(text))

        # Normalise the scope into a set. Any non-string iterable is
        # materialised once, so that one-shot iterables (e.g. generators)
        # are not exhausted by the validation loop before reaching the
        # parsers.
        if isinstance(intents, str):
            intents = {intents}
        elif intents is not None:
            intents = set(intents)

        if intents is not None:
            known_intents = self.dataset_metadata["slot_name_mappings"]
            for intent in intents:
                if intent not in known_intents:
                    raise IntentNotFoundError(intent)

        if top_n is None:
            # Single-result mode: the first parser with a non-empty result
            # wins. Otherwise the None-intent probability of the last parser
            # that ran is reported.
            none_proba = 0.0
            for parser in self.intent_parsers:
                res = parser.parse(text, intents)
                if is_empty(res):
                    none_proba = res[RES_INTENT][RES_PROBA]
                    continue
                resolved_slots = self._resolve_slots(text, res[RES_SLOTS])
                return parsing_result(text, intent=res[RES_INTENT],
                                      slots=resolved_slots)
            return empty_result(text, none_proba)

        # Top-n mode: rank every intent, restrict to the requested scope
        # (the None intent is always kept), then extract slots per intent.
        intents_results = self.get_intents(text)
        if intents is not None:
            intents_results = [res for res in intents_results
                               if res[RES_INTENT_NAME] is None
                               or res[RES_INTENT_NAME] in intents]
        intents_results = intents_results[:top_n]
        results = []
        for intent_res in intents_results:
            slots = self.get_slots(text, intent_res[RES_INTENT_NAME])
            results.append(extraction_result(intent_res, slots))
        return results

    @log_elapsed_time(logger, logging.DEBUG, "Got intents in {elapsed_time}")
    @fitted_required
    def get_intents(self, text):
        """Performs intent classification on the provided *text* and returns
        the list of intents ordered by decreasing probability

        The length of the returned list is exactly the number of intents in the
        dataset + 1 for the None intent. An engine without any intent parser
        returns an empty list.

        .. note::

            The probabilities returned along with each intent are not
            guaranteed to sum to 1.0. They should be considered as scores
            between 0 and 1.
        """
        # Merge the outputs of all parsers, keeping for each intent the
        # highest probability reported by any of them. Starting from an empty
        # dict (rather than ``None``) keeps this safe when there is no parser,
        # and ``dict.get`` tolerates a parser reporting an intent that the
        # previous ones did not.
        results = {}
        for parser in self.intent_parsers:
            for res in parser.get_intents(text):
                intent = res[RES_INTENT_NAME]
                merged = results.get(intent)
                if merged is None:
                    results[intent] = res
                else:
                    merged[RES_PROBA] = max(res[RES_PROBA], merged[RES_PROBA])
        return sorted(itervalues(results), key=lambda res: -res[RES_PROBA])

    @log_elapsed_time(logger, logging.DEBUG, "Parsed slots in {elapsed_time}")
    @fitted_required
    def get_slots(self, text, intent):
        """Extracts slots from a text input, with the knowledge of the intent

        Args:
            text (str): input
            intent (str): the intent which the input corresponds to

        Returns:
            list: the list of extracted slots

        Raises:
            IntentNotFoundError: When the intent was not part of the training
                data
            InvalidInputError: When input type is not unicode
        """
        if not isinstance(text, str):
            raise InvalidInputError("Expected unicode but received: %s"
                                    % type(text))

        # The None intent carries no slots
        if intent is None:
            return []

        if intent not in self.dataset_metadata["slot_name_mappings"]:
            raise IntentNotFoundError(intent)

        # The first parser returning a non-empty list of slots wins
        for parser in self.intent_parsers:
            slots = parser.get_slots(text, intent)
            if not slots:
                continue
            return self._resolve_slots(text, slots)
        return []

    @classmethod
    def from_path(cls, path, **shared):
        """Loads a :class:`SnipsNLUEngine` instance from a directory path

        The data at the given path must have been generated using
        :func:`~SnipsNLUEngine.persist`

        Args:
            path (str): The path where the nlu engine is stored
            **shared: Shared resources and options. Resources, builtin entity
                parser and custom entity parser are loaded from *path* only
                when they are not already provided here.

        Raises:
            LoadingError: when some files are missing
            IncompatibleModelError: when trying to load an engine model which
                is not compatible with the current version of the lib
        """
        directory_path = Path(path)
        model_path = directory_path / "nlu_engine.json"
        if not model_path.exists():
            raise LoadingError("Missing nlu engine model file: %s"
                               % model_path.name)

        with model_path.open(encoding="utf8") as f:
            model = json.load(f)

        model_version = model.get("model_version")
        if model_version is None or model_version != __model_version__:
            bypass_version_check = shared.get(BYPASS_VERSION_CHECK, False)
            if bypass_version_check:
                logger.warning(
                    "Incompatible model version found. The library expected "
                    "'%s' but the loaded engine is '%s'. The NLU engine may "
                    "not load correctly.", __model_version__, model_version)
            else:
                raise IncompatibleModelError(model_version)

        dataset_metadata = model["dataset_metadata"]
        if shared.get(RESOURCES) is None and dataset_metadata is not None:
            language = dataset_metadata["language_code"]
            resources_dir = directory_path / "resources" / language
            if resources_dir.is_dir():
                resources = load_resources_from_dir(resources_dir)
                shared[RESOURCES] = resources

        # Distinct local names are used below so that the ``path`` argument
        # is not shadowed by the relative sub-paths read from the model file.
        if shared.get(BUILTIN_ENTITY_PARSER) is None:
            builtin_parser_subpath = model["builtin_entity_parser"]
            if builtin_parser_subpath is not None:
                parser_path = directory_path / builtin_parser_subpath
                shared[BUILTIN_ENTITY_PARSER] = BuiltinEntityParser.from_path(
                    parser_path)

        if shared.get(CUSTOM_ENTITY_PARSER) is None:
            custom_parser_subpath = model["custom_entity_parser"]
            if custom_parser_subpath is not None:
                parser_path = directory_path / custom_parser_subpath
                shared[CUSTOM_ENTITY_PARSER] = CustomEntityParser.from_path(
                    parser_path)

        config = cls.config_type.from_dict(model["config"])
        nlu_engine = cls(config=config, **shared)
        nlu_engine.dataset_metadata = dataset_metadata

        # Parser names and parser configs are paired by position
        intent_parsers = []
        for parser_idx, parser_name in enumerate(model["intent_parsers"]):
            parser_config = config.intent_parsers_configs[parser_idx]
            intent_parser_path = directory_path / parser_name
            intent_parser = IntentParser.load_from_path(
                intent_parser_path, parser_config.unit_name, **shared)
            intent_parsers.append(intent_parser)
        nlu_engine.intent_parsers = intent_parsers
        return nlu_engine

    def _resolve_slots(self, text, slots):
        """Attaches a resolved value to each of the raw *slots*

        For each slot, resolution is attempted in this order:

        1. an entity found in the whole *text* with the same entity kind and
           the same match range as the slot
        2. the first entity found when parsing the raw slot value alone; for
           custom entities it must span the whole raw value
        3. for custom entities flagged as automatically extensible, the slot
           is built without a resolved value

        Slots which cannot be resolved by any of these steps are dropped.

        Args:
            text (str): the input the slots were extracted from
            slots (list): the raw slots, as returned by an intent parser

        Returns:
            list: the resolved slots, in the same order as *slots*
        """
        builtin_scope = [slot[RES_ENTITY] for slot in slots
                         if is_builtin_entity(slot[RES_ENTITY])]
        custom_scope = [slot[RES_ENTITY] for slot in slots
                        if not is_builtin_entity(slot[RES_ENTITY])]
        # Do not use cached entities here as datetimes must be computed using
        # current context
        builtin_entities = self.builtin_entity_parser.parse(
            text, builtin_scope, use_cache=False)
        custom_entities = self.custom_entity_parser.parse(
            text, custom_scope, use_cache=True)

        resolved_slots = []
        for slot in slots:
            entity_name = slot[RES_ENTITY]
            raw_value = slot[RES_VALUE]
            is_builtin = is_builtin_entity(entity_name)
            if is_builtin:
                entities = builtin_entities
                parser = self.builtin_entity_parser
                slot_builder = builtin_slot
                use_cache = False
                extensible = False
            else:
                entities = custom_entities
                parser = self.custom_entity_parser
                slot_builder = custom_slot
                use_cache = True
                extensible = self.dataset_metadata[ENTITIES][entity_name][
                    AUTOMATICALLY_EXTENSIBLE]

            # Step 1: exact kind + range match in the full-text entities
            resolved_slot = None
            for ent in entities:
                if ent[ENTITY_KIND] == entity_name and \
                        ent[RES_MATCH_RANGE] == slot[RES_MATCH_RANGE]:
                    resolved_slot = slot_builder(slot, ent[RESOLVED_VALUE])
                    break

            # Step 2: parse the raw slot value on its own
            if resolved_slot is None:
                matches = parser.parse(
                    raw_value, scope=[entity_name], use_cache=use_cache)
                if matches:
                    match = matches[0]
                    if is_builtin or len(match[RES_VALUE]) == len(raw_value):
                        resolved_slot = slot_builder(
                            slot, match[RESOLVED_VALUE])

            # Step 3: keep unresolved values of extensible custom entities
            if resolved_slot is None and extensible:
                resolved_slot = slot_builder(slot)

            if resolved_slot is not None:
                resolved_slots.append(resolved_slot)

        return resolved_slots
|