lang-model/naive-nlu/parsing.py

417 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python
import knowledge_evaluation
import re
import copy
from functools import reduce
from typing import List
# Recursion budget for matching: get_fit uses it as the default value of
# `remaining_recursions`, and match_token gives up once that budget drops below 1.
MAX_RECURSIONS = 10
# TODO: more flexible tokenization
def to_tokens(text):
    """Split ``text`` into a list of tokens.

    Each run of word characters becomes one token and every other
    non-whitespace character becomes a token of its own; whitespace is
    discarded.
    """
    token_pattern = r'(\w+|[^\s])'
    return [found.group(1) for found in re.finditer(token_pattern, text)]
def make_template(knowledge_base, tokens, parsed):
    """Build a matcher and a positional template for ``tokens``.

    For every token that also appears in ``parsed``:

    * its first remaining occurrence in the template (a list copy of
      ``parsed``) is replaced by the token's index, and
    * its slot in the matcher (a list copy of ``tokens``) is replaced by
      ``{'groups': set(...)}`` taken from ``knowledge_base.knowledge[token]``.

    Returns ``(tokens, matcher, template)``; ``tokens`` itself is returned
    unmodified.
    """
    matcher = list(tokens)
    template = list(parsed)

    # Only the slot currently being visited is reassigned, so iterating the
    # matcher while updating it is safe.
    for position, word in enumerate(matcher):
        if word not in template:
            continue
        template[template.index(word)] = position
        groups = set(knowledge_base.knowledge[word]['groups'])
        matcher[position] = {'groups': groups}

    return tokens, matcher, template
def is_bottom_level(tree):
    """Return True when ``tree`` has no list or tuple among its direct elements."""
    return not any(isinstance(element, (list, tuple)) for element in tree)
def get_lower_levels(parsed):
    """List the innermost nested sub-trees of ``parsed``.

    Returns ``(path, subtree)`` pairs, where ``path`` is the tuple of child
    indexes leading from the root to ``subtree``. Only sub-trees that contain
    no further list/tuple are reported, in left-to-right order. The root
    itself is never reported, so a flat tree yields an empty list.
    """
    def walk(subtree, path):
        nested = [
            (index, child)
            for index, child in enumerate(subtree)
            if isinstance(child, (list, tuple))
        ]
        # An empty path means the root, which is never reported.
        if path and not nested:
            yield (path, subtree)
        for index, child in nested:
            yield from walk(child, path + (index,))

    return list(walk(parsed, ()))
# TODO: probably optimize this, it creates lots of unnecessary tuples
def replace_position(tree, position, new_element):
    """Return a copy of ``tree`` with the node at ``position`` replaced.

    ``position`` is a sequence of child indexes leading from the root to the
    node to replace (the same path format produced by ``get_lower_levels``).
    An empty ``position`` replaces the whole tree, so ``new_element`` itself
    is returned.

    Every level along the path is rebuilt as a tuple; siblings that are not
    on the path are reused as-is. The input tree is not modified.
    """
    def aux(current_tree, remaining_route):
        if len(remaining_route) == 0:
            return new_element

        step = remaining_route[0]
        # Slice the level being visited (not the root) and resume right after
        # the replaced child, so no sibling is lost.
        return (
            tuple(current_tree[:step])
            + (aux(current_tree[step], remaining_route[1:]),)
            + tuple(current_tree[step + 1:])
        )

    return aux(tree, position)
def integrate_language(knowledge_base, example):
    """Decompose a training example into tokens, matcher and template.

    ``example`` is a mapping with a ``"text"`` entry (lower-cased and
    tokenized here) and a ``"parsed"`` entry (the expected tree).

    While the tree still has nested sub-trees, each innermost sub-tree
    ("atom") is matched against the most similar trained tree. The tokens it
    covers are collapsed into a single ``(subquery_type, remix)`` tuple, and
    the sub-tree is replaced by ``subquery_type`` in the working copy of the
    tree.

    Returns the result of ``make_template`` for the final token list and the
    original ``parsed`` tree: ``(tokens, matcher, result)``.
    """
    text = example["text"].lower()
    parsed = example["parsed"]
    resolved_parsed = copy.deepcopy(parsed)
    tokens = to_tokens(text)

    while True:
        print("P:", resolved_parsed)
        lower_levels = get_lower_levels(resolved_parsed)
        print("Lower:", lower_levels)
        if not lower_levels:
            break

        for position, atom in lower_levels:
            print("\x1b[1mSelecting\x1b[0m:", atom)
            similar = get_similar_tree(knowledge_base, atom)
            print("___>", similar)
            remix, (start_bounds, end_bounds) = build_remix_matrix(knowledge_base, tokens, atom, similar)
            _, matcher, result = make_template(knowledge_base, tokens, atom)
            print("Tx:", tokens)
            print("Mx:", matcher)
            print("Rx:", result)
            print("Remix:", remix)

            # Use an explicit end index: a negative-length slice such as
            # tokens[a:-len(end_bounds)] collapses to an empty list when
            # end_bounds is empty (tokens[a:-0] == tokens[a:0]).
            inner_start = len(start_bounds)
            inner_end = len(tokens) - len(end_bounds)
            after_remix = apply_remix(tokens[inner_start:inner_end], remix)
            assert(len(after_remix) + len(start_bounds) + len(end_bounds) == len(tokens))
            print(" +->", after_remix)
            subquery_type = knowledge_evaluation.get_subquery_type(knowledge_base.knowledge, atom)
            print(r" \-> <{}>".format(subquery_type))

            # Clean remaining tokens: drop the ones consumed by the remix and
            # put a single (type, remix) placeholder where they were.
            new_tokens = list(tokens)
            offset = len(start_bounds)
            for _ in range(len(remix)):
                new_tokens.pop(offset)

            # TODO: Get a specific types for... types
            new_tokens.insert(offset, (subquery_type, remix))
            tokens = new_tokens

            resolved_parsed = replace_position(resolved_parsed, position, subquery_type)

    print("#########")
    tokens, matcher, result = make_template(knowledge_base, tokens, parsed)
    print("T:", tokens)
    print("M:", matcher)
    print("R:", result)
    print()
    return tokens, matcher, result
2017-05-13 18:33:09 +00:00
def apply_remix(tokens, remix):
    """Return the elements of ``tokens`` picked in the order given by ``remix``.

    ``remix`` is an iterable of indexes into ``tokens``.
    """
    return [tokens[index] for index in remix]
def build_remix_matrix(knowledge_base, tokens, atom, similar):
    """Choose how the tokens of ``atom`` map onto a similar trained pattern.

    ``similar`` is a 4-item sequence whose first item is the matcher of the
    trained pattern (the shape returned by ``get_similar_tree``).

    The matcher built for ``tokens``/``atom`` is trimmed of the leading and
    trailing elements that do not appear in the similar matcher, and the first
    candidate from ``get_possible_remixes`` is chosen.

    Returns ``(chosen_remix, (start_bounds, end_bounds))``.
    """
    own_tokens, matcher, _ = make_template(knowledge_base, list(tokens), atom)
    similar_matcher, _, _, _ = similar

    start_bounds, end_bounds = find_bounds(matcher, similar_matcher)

    # end_bounds runs from the last index downwards and start_bounds is
    # reversed, so indexes are removed from high to low.
    for index, _ in end_bounds + start_bounds[::-1]:
        matcher.pop(index)
        own_tokens.pop(index)

    candidates = get_possible_remixes(matcher, similar_matcher)
    return candidates[0], (start_bounds, end_bounds)
def get_possible_remixes(matcher, similar_matcher):
    """List candidate index mappings from ``matcher`` onto ``similar_matcher``.

    Every element of ``matcher`` must be present in ``similar_matcher``
    (asserted). The k-th candidate maps each matcher element to the index of
    its k-th occurrence in ``similar_matcher``; the number of candidates is
    bounded by the element with the fewest occurrences.
    """
    assert all(element in similar_matcher for element in matcher)

    occurrences = [all_indexes(similar_matcher, element) for element in matcher]

    # TODO: do some scoring to find the most "interesting combination"
    return [list(combination) for combination in zip(*occurrences)]
def all_indexes(collection, element):
    """Return, in ascending order, every index at which ``element`` occurs in ``collection``."""
    return [
        position
        for position, candidate in enumerate(collection)
        if candidate == element
    ]
def find_bounds(matcher, similar_matcher):
    """Find the leading and trailing elements of ``matcher`` missing from ``similar_matcher``.

    Returns ``(start_bounds, end_bounds)``, both lists of ``(index, element)``
    pairs using indexes into ``matcher``:

    * ``start_bounds`` scans from the front and stops at the first element
      present in ``similar_matcher`` (ascending indexes);
    * ``end_bounds`` scans from the back the same way (descending indexes).
    """
    def unmatched_run(indexed_elements):
        run = []
        for index, element in indexed_elements:
            if element in similar_matcher:
                break
            run.append((index, element))
        return run

    indexed = list(enumerate(matcher))
    start_bounds = unmatched_run(indexed)
    end_bounds = unmatched_run(reversed(indexed))
    return start_bounds, end_bounds
def get_similar_tree(knowledge_base, atom):
    """Pick the trained bottom-level tree that best resembles ``atom``.

    Candidates are the ``(entry, tree)`` pairs in ``knowledge_base.trained``
    whose tree has no nested sub-trees and shares its first element with
    ``atom``. Each candidate is scored by how many elements of ``atom`` equal
    their resolved form (strings resolve to themselves, anything else goes
    through ``knowledge_evaluation.resolve`` with the candidate's entry).

    Returns ``(entry, tree, resolved, score)`` for the first candidate with
    the highest score, or ``None`` when there are no candidates.
    """
    # Find matching possibilities
    candidates = [
        (entry, tree)
        for entry, tree in knowledge_base.trained
        if is_bottom_level(tree) and tree[0] == atom[0]
    ]

    # Score by number of matching elements
    scored = []
    for raw, possibility in candidates:
        resolved = [
            element
            if isinstance(element, str)
            else knowledge_evaluation.resolve(knowledge_base.knowledge, element, raw)
            for element in atom
        ]

        # TODO: Probably should take into account the categories of the elements in the "intake" ([0]) element
        score = sum(found == expected for found, expected in zip(resolved, atom))
        scored.append((raw, possibility, resolved, score))

    if not scored:
        return None

    # max() keeps the earliest of several equally scored candidates, which is
    # what a stable descending sort followed by [0] would select.
    return max(scored, key=lambda candidate: candidate[3])
# TODO: unroll this mess
def get_matching(sample, other):
    """Generalize the matcher of ``sample`` using structurally compatible examples.

    ``sample`` and every entry of ``other`` are sequences whose first item is
    a matcher: a list of strings, ``{'groups': set}`` dictionaries and tuples.

    An entry of ``other`` is compatible when its matcher has the same length
    as the sample's and, position by position:

    * where the sample has a dictionary, the entry has a dictionary sharing at
      least one group;
    * where the sample has a tuple, the entry has a tuple with the same first
      item (its type).

    String positions are not compared.

    Returns the sample matcher with each dictionary narrowed to the groups
    common to the sample and all compatible entries, or ``[]`` when no entry
    is compatible. This includes the case where the last position eliminates
    the remaining candidates.
    """
    pattern = sample[0]
    size = len(pattern)
    candidates = [entry for entry in other if len(entry[0]) == size]

    for position, element in enumerate(pattern):
        if not candidates:
            return []

        if isinstance(element, dict):  # Dictionaries are compared by groups
            candidates = [
                entry for entry in candidates
                if isinstance(entry[0][position], dict)
                and len(entry[0][position]['groups'] & element['groups']) > 0
            ]
        elif isinstance(element, tuple):  # Tuples are compared by types [0]
            candidates = [
                entry for entry in candidates
                if isinstance(entry[0][position], tuple)
                and entry[0][position][0] == element[0]
            ]

    # The check inside the loop runs before each position is filtered, so the
    # outcome of the last filter has to be checked here.
    if not candidates:
        return []

    generalized = []
    for position, element in enumerate(pattern):
        if isinstance(element, (str, tuple)):
            generalized.append(element)
            continue

        shared_groups = element['groups']
        for entry in candidates:
            shared_groups = shared_groups & entry[0][position]['groups']
        generalized.append({'groups': shared_groups})
    return generalized
def reprocess_language_knowledge(knowledge_base, examples):
    """Recompute generalized patterns from stored plus new examples.

    ``knowledge_base.examples`` is concatenated with ``examples``; each sample
    is then compared against all the others with ``get_matching``. A sample
    for which a non-empty match is found contributes ``(match, sample[1])`` to
    the returned list.

    NOTE(review): the source this was recovered from had lost its indentation.
    Appending only matched samples, and printing the blank separator once
    after the loop, is the reading assumed here -- confirm against the
    original file.
    """
    all_examples = knowledge_base.examples + examples

    print('\n'.join(str(stored) for stored in knowledge_base.examples))
    print("--")

    pattern_examples = []
    for index, sample in enumerate(all_examples):
        others = all_examples[:index] + all_examples[index + 1:]
        match = get_matching(sample, others)
        print("->", match)
        if len(match) == 0:
            continue
        pattern_examples.append((match, sample[1]))

    print()
    print("\x1b[7m--\x1b[0m")
    return pattern_examples
def fitting_return_type(knowledge,
                        return_type, remixer,
                        input_stream,
                        tail_of_ouput_stream,
                        remaining_recursions: int):
    """Try to consume the start of ``input_stream`` with one trained pattern.

    ``input_stream`` is first reordered with ``reverse_remix`` using
    ``remixer``. Each ``(sample, ast)`` in ``knowledge.trained`` is then tried
    in turn: its sample is matched token by token through ``match_token``
    (with one less remaining recursion) until the sample is exhausted.

    On the first sample that matches, returns
    ``((parsed_input, parsed_output), (remaining_input, remaining_output + tail_of_ouput_stream))``.
    A ``TypeError`` or ``IndexError`` while matching a sample (for example
    unpacking a ``None`` result from ``match_token``) discards that sample.

    ``return_type`` is accepted but not used to filter the samples.

    Raises ``TypeError`` when no trained sample fits.
    """
    indent = " " + " " * (MAX_RECURSIONS - remaining_recursions)

    for sample, ast in knowledge.trained:
        try:
            parsed_input = []
            parsed_output = []

            remaining_input = reverse_remix(input_stream, remixer)
            print(indent + "RMXin:", remaining_input)
            remaining_output = copy.deepcopy(sample)

            print(indent + "S:", sample)
            print(indent + "A:", ast)
            print()

            while len(remaining_output) > 0:
                matched = match_token(knowledge,
                                      remaining_input,
                                      remaining_output,
                                      remaining_recursions - 1)
                # A None result fails to unpack and is handled as a mismatch below.
                (consumed, produced), (remaining_input, remaining_output) = matched
                parsed_input += consumed
                parsed_output += produced
                print(indent + "INP:", consumed)
                print(indent + "OUT:", produced)

            print(indent + "Pi:", parsed_input)
            print(indent + "Po:", parsed_output)
            print("\x1b[7m", end='')
            print(indent + "Ri:", remaining_input)
            print(indent + "Ro:", remaining_output)
            print("\x1b[0m")
            return ((parsed_input, parsed_output),
                    (remaining_input, remaining_output + tail_of_ouput_stream))
        except (TypeError, IndexError) as error:
            print(indent + "X " + str(error))

    raise TypeError("No matching type found")
def reverse_remix(tree_section, remix):
    """Reorder the head of ``tree_section`` according to ``remix``.

    The result starts with deep copies of ``tree_section[i]`` for each index
    ``i`` in ``remix``, followed by the elements of ``tree_section`` from
    position ``len(remix)`` onwards.
    """
    reordered_head = [copy.deepcopy(tree_section[origin]) for origin in remix]
    untouched_tail = tree_section[len(remix):]
    return reordered_head + untouched_tail
def match_token(knowledge,
                input: List[str],
                trained: List[str],
                remaining_recursions: int):
    """Match the first element of ``trained`` against the start of ``input``.

    * A dictionary in ``trained`` accepts the first input token as-is.
    * A tuple ``(return_type, remixer)`` delegates to ``fitting_return_type``
      with the whole input and the rest of ``trained``.
    * Anything else must be equal to the first input token.

    Returns ``((consumed_input, consumed_trained), (rest_of_input, rest_of_trained))``
    on success, or ``None`` when the elements do not match or
    ``remaining_recursions`` is below 1.
    """
    if remaining_recursions < 1:
        return None

    indent = " " + " " * (MAX_RECURSIONS - remaining_recursions)
    first_input, expected_first = input[0], trained[0]
    print(indent + "Ex?", expected_first)
    print(indent + "Fo!", first_input)

    if isinstance(expected_first, tuple):
        return_type, remixer = expected_first
        return fitting_return_type(knowledge,
                                   return_type, remixer,
                                   input, trained[1:],
                                   remaining_recursions)

    # TODO: check if the dictionary matches the values
    if isinstance(expected_first, dict) or expected_first == first_input:
        return (([first_input], [expected_first]), (input[1:], trained[1:]))

    return None
def get_fit(knowledge, row, remaining_recursions=MAX_RECURSIONS):
    """Find the first trained pattern that fully matches ``row``.

    ``row`` is tokenized with ``to_tokens`` and each ``(sample, ast)`` in
    ``knowledge.trained`` is tried in order, consuming the sample element by
    element through ``match_token``.

    Returns ``(tokens, sample, ast)`` for the first sample that consumes both
    the whole input and the whole sample, or ``None`` when none does. A
    ``TypeError`` or ``IndexError`` raised while matching a sample (for
    example unpacking a ``None`` result from ``match_token``) only discards
    that sample.
    """
    tokens = to_tokens(row)
    indent = " " * (MAX_RECURSIONS - remaining_recursions)

    for sample, ast in knowledge.trained:
        print("-----")
        print("TOK:", tokens)
        try:
            remaining_input = copy.deepcopy(tokens)
            remaining_output = copy.deepcopy(sample)
            print(indent + "AST:", ast)
            print(indent + "S:", sample)

            # TODO: merge with get_return type, as uses the same mechanism
            while len(remaining_output) > 0:
                ((_, _), (remaining_input, remaining_output)) = match_token(knowledge,
                                                                            remaining_input,
                                                                            remaining_output,
                                                                            remaining_recursions)
                print(indent + "Ri:", remaining_input)
                print(indent + "Ro:", remaining_output)

            # Both streams must be exhausted for the sample to fit the row.
            if len(remaining_input) == 0 and len(remaining_output) == 0:
                print("!!!", tokens, sample, ast)
                return tokens, sample, ast
        except (TypeError, IndexError) as e:
            print(indent + "X " + str(e))
        print()

    return None