From 8b67b96d2fe724e59c4618417ab81b8cc1daa4d6 Mon Sep 17 00:00:00 2001 From: kenkeiras Date: Sun, 15 Apr 2018 22:15:28 +0200 Subject: [PATCH] Separate tokenization module. --- naive-nlu/tree_nlu/knowledge_base.py | 7 +- naive-nlu/tree_nlu/parsing.py | 187 +-------------------------- naive-nlu/tree_nlu/tokenization.py | 186 ++++++++++++++++++++++++++ 3 files changed, 192 insertions(+), 188 deletions(-) create mode 100644 naive-nlu/tree_nlu/tokenization.py diff --git a/naive-nlu/tree_nlu/knowledge_base.py b/naive-nlu/tree_nlu/knowledge_base.py index 8e12f5e..389a70a 100644 --- a/naive-nlu/tree_nlu/knowledge_base.py +++ b/naive-nlu/tree_nlu/knowledge_base.py @@ -5,6 +5,7 @@ from .session.org_mode import global_session as session from .atoms import Atom from . import parsing +from . import tokenization from . import knowledge_evaluation from .modifiable_property import is_modifiable_property import random @@ -63,7 +64,7 @@ class KnowledgeBase(object): def train_tokenizer(self, example): with session().log('Training tokenizer'): session().annotate("Example: {}".format(example)) - tokens = parsing.integrate_tokenization(self, example) + tokens = tokenization.integrate_tokenization(self, example) # Integrate knowledge of concept for token in tokens: @@ -115,11 +116,11 @@ class KnowledgeBase(object): def tokenize(self, row, return_one=True): row = row.lower() with session().log("Tokenize: {}".format(row)): - options = list(parsing.to_tokens(self, row)) + options = list(tokenization.to_tokens(self, row)) session().log("Results:\n{}".format('\n'.join(map(str, options)))) if return_one: - chosen = parsing.pick_one_tokenization(options, self) + chosen = tokenization.pick_one_tokenization(options, self) session().log("Chosen: “{}”".format(chosen)) self.train_tokenizer({'text': row, 'tokens': chosen}) return chosen diff --git a/naive-nlu/tree_nlu/parsing.py b/naive-nlu/tree_nlu/parsing.py index 1705286..f22a4ce 100644 --- a/naive-nlu/tree_nlu/parsing.py +++ b/naive-nlu/tree_nlu/parsing.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from . import knowledge_evaluation +from . import tokenization from . import depth_meter from .session.org_mode import global_session as session @@ -13,190 +14,6 @@ from .modifiable_property import ModifiableProperty from . import parameters from .atoms import Atom, a, is_atom -def lookahead_for_tokens_or_strucutral_elements(knowledge_base, remaining): - for se in knowledge_base.structural_elements: - found_position = remaining.find(se) - found = found_position >= 0 - session().annotate('Looking for structure with “{}”, found? {}'.format(se, found)) - if found: - return [ - (remaining[:found_position], se, remaining[found_position + len(se):]) - ] - - for token in knowledge_base.knowledge.keys(): - found_position = remaining.find(token) - found = found_position >= 0 - session().annotate('Looking for token “{}”, found? 
{}'.format(token, found)) - if found: - return [ - (remaining[:found_position], token, remaining[found_position + len(token):]) - ] - - return None - - - -def to_tokens(knowledge_base, text, precedent=None): - if len(text) == 0: - session().annotate("No text remaining") - yield [''] - return - - with session().log("Tokenizing {}".format(text)): - for option in knowledge_base.expected_token_after_precedent(precedent): - with session().log("Next: “{}”".format(option)): - with session().log("Matching “{}” on “{}”".format(option, text)): - for token_match in tokenization_match(option, text, knowledge_base): - if token_match is None: - session().annotate("No match") - - match, remaining = token_match - if len(remaining) == len(text): - raise Exception('No text consumed in match') - - session().annotate('Match: “{}”'.format(match)) - with session().log('Remaining “{}”'.format(remaining)): - for sublevel in to_tokens(knowledge_base, remaining, match): - candidate = list(filter(lambda x: x != '', [match] + sublevel)) - session().annotate('Yielding candidate “{}”'.format(candidate)) - yield candidate - - -def tokenization_match(element, text, knowledge_base): - # Constant/structural string matching - if isinstance(element, str): - if text.find(element) == 0: - # This match comes from a structuring element - # It doesn't appear on the tokenization - # So we should return it as an empty string - yield ('', text[len(element):]) - return - else: - # No match found - return - - elif is_atom(element, 'token'): - yield from match_single_token(text, knowledge_base) - return - raise NotImplementedError() - - -def match_single_token(text, knowledge_base): - found_token = False - for token in knowledge_base.knowledge.keys(): - if text.find(token) == 0: - yield token, text[len(token):] - found_token = True - - if found_token: - return - - session().annotate('No token found at the start of ”{}”'.format(text)) - session().annotate('using structural elements to infer it') - # TODO: review this when multiple structural elements are available - for se in knowledge_base.structural_elements: - session().annotate('Looking for se “{}” in “{}”'.format(se, text)) - position = text.find(se, 0) - found = position > 0 # 0 is not considered a valid position for this kind of split - if found: - session().annotate('Found ”{}”, inferring “{}”'.format(se, text[:position])) - yield text[:position], text[position:] - - session().annotate('No structural element or token found, inferring only token remaining') - yield text, '' - - # Using other tokens for cutoff - for token in knowledge_base.knowledge.keys(): - session().annotate('Looking for token “{}” in “{}”'.format(token, text)) - position = text.find(token) - found = position >= 0 - if found: - session().annotate('Found ”{}”, in position ”{}”'.format(token, position)) - yield text[:position], text[position:] - - -def integrate_tokenization(knowledge_base, example): - text = example['text'] - tokens = example['tokens'] - meaning = example.get('meaning') - - return integrate_token_to_text_matching(knowledge_base, text, tokens) - - -def integrate_token_to_text_matching(knowledge_base, text, tokens): - texts = [text] - - # Convert to tokens - for token_id, token in enumerate(tokens): - # Look for token in texts - for i, text in enumerate(texts): - if isinstance(text, int): - continue - - if token in text: - before, after = text.split(token, maxsplit=1) - texts = (texts[:i] + [before] - + [a('token')] - + [after] + texts[i + 1:]) - break - else: - raise Exception('Token not found') 
- - # Remove leftovers from splits - texts = list(filter(lambda x: x != '', texts)) - session().log("Tokenized as {} over {}".format(texts, tokens)) - - for i, element in enumerate(texts[:-1]): - learn_token_pair(element, texts[i + 1], knowledge_base) - - return tokens - -def learn_token_pair(precedent, consequent, knowledge_base): - knowledge_base.add_token_pair(precedent, consequent) - - -def pick_one_tokenization(options, knowledge_base): - ''' - Heuristic function to pick the most probable tokenization. - - Just pick the one with more results. - ''' - options = list(options) - with session().log("Picking among: {} options".format(len(options))): - session().log("Options: \n{}".format('\n'.join(map(str, options)))) - return pick_by_score(options, - [ - # By number of splits without structuring elements - lambda tokenization: sum(map( - lambda split: sum(map( - lambda se: se in split, knowledge_base.structural_elements - )), tokenization)), - - # By number of unknown tokens - lambda tokenization: len(list(filter(lambda token: - (token not in knowledge_base.knowledge.keys()) and - (token not in knowledge_base.structural_elements), - tokenization))), - - # By number of splits - lambda tokenization: -len(tokenization), - ]) - -def pick_by_score(options, heuristics): - for heuristic in heuristics: - assert(len(options) > 0) - options = list(map(lambda opt: (heuristic(opt), opt), options)) - sorted_options = sorted(options, key=lambda x: x[0], reverse=False) - - heuristic_cutoff = sorted_options[0][0] - session().annotate(sorted_options) - pass_heuristic = [opt for (score, opt) in sorted_options if score <= heuristic_cutoff] - options = pass_heuristic - - session().log("{} finalists: \n{}".format(len(options), '\n'.join(map(str, options)))) - return options[0] - - def make_template(knowledge_base, tokens, parsed): matcher = list(tokens) template = list(parsed) @@ -267,7 +84,7 @@ def integrate_language(knowledge_base, example): parsed = example["parsed"] resolved_parsed = copy.deepcopy(parsed) - tokens = list(pick_one_tokenization(to_tokens(knowledge_base, text), knowledge_base)) + tokens = list(tokenization.pick_one_tokenization(tokenization.to_tokens(knowledge_base, text), knowledge_base)) while True: session().annotate("P: {}".format(resolved_parsed)) diff --git a/naive-nlu/tree_nlu/tokenization.py b/naive-nlu/tree_nlu/tokenization.py new file mode 100644 index 0000000..7322cb5 --- /dev/null +++ b/naive-nlu/tree_nlu/tokenization.py @@ -0,0 +1,186 @@ +from .session.org_mode import global_session as session +from .atoms import Atom, a, is_atom + +def lookahead_for_tokens_or_strucutral_elements(knowledge_base, remaining): + for se in knowledge_base.structural_elements: + found_position = remaining.find(se) + found = found_position >= 0 + session().annotate('Looking for structure with “{}”, found? {}'.format(se, found)) + if found: + return [ + (remaining[:found_position], se, remaining[found_position + len(se):]) + ] + + for token in knowledge_base.knowledge.keys(): + found_position = remaining.find(token) + found = found_position >= 0 + session().annotate('Looking for token “{}”, found? 
{}'.format(token, found)) + if found: + return [ + (remaining[:found_position], token, remaining[found_position + len(token):]) + ] + + return None + + + +def to_tokens(knowledge_base, text, precedent=None): + if len(text) == 0: + session().annotate("No text remaining") + yield [''] + return + + with session().log("Tokenizing {}".format(text)): + for option in knowledge_base.expected_token_after_precedent(precedent): + with session().log("Next: “{}”".format(option)): + with session().log("Matching “{}” on “{}”".format(option, text)): + for token_match in tokenization_match(option, text, knowledge_base): + if token_match is None: + session().annotate("No match") + + match, remaining = token_match + if len(remaining) == len(text): + raise Exception('No text consumed in match') + + session().annotate('Match: “{}”'.format(match)) + with session().log('Remaining “{}”'.format(remaining)): + for sublevel in to_tokens(knowledge_base, remaining, match): + candidate = list(filter(lambda x: x != '', [match] + sublevel)) + session().annotate('Yielding candidate “{}”'.format(candidate)) + yield candidate + + +def tokenization_match(element, text, knowledge_base): + # Constant/structural string matching + if isinstance(element, str): + if text.find(element) == 0: + # This match comes from a structuring element + # It doesn't appear on the tokenization + # So we should return it as an empty string + yield ('', text[len(element):]) + return + else: + # No match found + return + + elif is_atom(element, 'token'): + yield from match_single_token(text, knowledge_base) + return + raise NotImplementedError() + + +def match_single_token(text, knowledge_base): + found_token = False + for token in knowledge_base.knowledge.keys(): + if text.find(token) == 0: + yield token, text[len(token):] + found_token = True + + if found_token: + return + + session().annotate('No token found at the start of ”{}”'.format(text)) + session().annotate('using structural elements to infer it') + # TODO: review this when multiple structural elements are available + for se in knowledge_base.structural_elements: + session().annotate('Looking for se “{}” in “{}”'.format(se, text)) + position = text.find(se, 0) + found = position > 0 # 0 is not considered a valid position for this kind of split + if found: + session().annotate('Found ”{}”, inferring “{}”'.format(se, text[:position])) + yield text[:position], text[position:] + + session().annotate('No structural element or token found, inferring only token remaining') + yield text, '' + + # Using other tokens for cutoff + for token in knowledge_base.knowledge.keys(): + session().annotate('Looking for token “{}” in “{}”'.format(token, text)) + position = text.find(token) + found = position >= 0 + if found: + session().annotate('Found ”{}”, in position ”{}”'.format(token, position)) + yield text[:position], text[position:] + + +def integrate_tokenization(knowledge_base, example): + text = example['text'] + tokens = example['tokens'] + meaning = example.get('meaning') + + return integrate_token_to_text_matching(knowledge_base, text, tokens) + + +def integrate_token_to_text_matching(knowledge_base, text, tokens): + texts = [text] + + # Convert to tokens + for token_id, token in enumerate(tokens): + # Look for token in texts + for i, text in enumerate(texts): + if isinstance(text, int): + continue + + if token in text: + before, after = text.split(token, maxsplit=1) + texts = (texts[:i] + [before] + + [a('token')] + + [after] + texts[i + 1:]) + break + else: + raise Exception('Token not found') 
+ + # Remove leftovers from splits + texts = list(filter(lambda x: x != '', texts)) + session().log("Tokenized as {} over {}".format(texts, tokens)) + + for i, element in enumerate(texts[:-1]): + learn_token_pair(element, texts[i + 1], knowledge_base) + + return tokens + +def learn_token_pair(precedent, consequent, knowledge_base): + knowledge_base.add_token_pair(precedent, consequent) + + +def pick_one_tokenization(options, knowledge_base): + ''' + Heuristic function to pick the most probable tokenization. + + Just pick the one with more results. + ''' + options = list(options) + with session().log("Picking among: {} options".format(len(options))): + session().log("Options: \n{}".format('\n'.join(map(str, options)))) + return pick_by_score(options, + [ + # By number of splits without structuring elements + lambda tokenization: sum(map( + lambda split: sum(map( + lambda se: se in split, knowledge_base.structural_elements + )), tokenization)), + + # By number of unknown tokens + lambda tokenization: len(list(filter(lambda token: + (token not in knowledge_base.knowledge.keys()) and + (token not in knowledge_base.structural_elements), + tokenization))), + + # By number of splits + lambda tokenization: -len(tokenization), + ]) + +def pick_by_score(options, heuristics): + for heuristic in heuristics: + assert(len(options) > 0) + options = list(map(lambda opt: (heuristic(opt), opt), options)) + sorted_options = sorted(options, key=lambda x: x[0], reverse=False) + + heuristic_cutoff = sorted_options[0][0] + session().annotate(sorted_options) + pass_heuristic = [opt for (score, opt) in sorted_options if score <= heuristic_cutoff] + options = pass_heuristic + + session().log("{} finalists: \n{}".format(len(options), '\n'.join(map(str, options)))) + return options[0] +
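
Usage note (not part of the patch): after this change the tokenizer is driven through the new `tree_nlu.tokenization` module instead of `parsing.py`. Below is a minimal sketch of the call pattern, assuming the `naive-nlu/` directory is on the import path and that `kb` is an already-constructed `KnowledgeBase` with some trained knowledge; the training row and input text are invented for illustration.

```python
from tree_nlu import tokenization

# Teach the tokenizer one example row; KnowledgeBase.train_tokenizer now
# delegates to tokenization.integrate_tokenization() rather than parsing.
kb.train_tokenizer({'text': 'cats like milk',
                    'tokens': ['cats', 'like', 'milk']})

# Enumerate candidate tokenizations and let the scoring heuristics pick one,
# mirroring what KnowledgeBase.tokenize() does internally after the refactor.
options = list(tokenization.to_tokens(kb, 'cats like milk'))
chosen = tokenization.pick_one_tokenization(options, kb)

# Or go through the public entry point, which wraps the two calls above
# and feeds the chosen tokenization back into train_tokenizer().
chosen = kb.tokenize('cats like milk')
```

With this split, `parsing.py` keeps only the grammar-template logic (`make_template`, `integrate_language`) and calls into `tokenization` for `to_tokens`/`pick_one_tokenization`, so the scoring heuristics (`pick_by_score` and friends) now live next to the code that produces the candidates they rank.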