#!/usr/bin/env python

import copy
import re
from functools import reduce
from typing import List, Dict

from .. import knowledge_evaluation
from .. import parameters
from ..atoms import Atom, a, is_atom
from ..modifiable_property import ModifiableProperty
from ..session.org_mode import global_session as session


def make_template(knowledge_base, tokens, parsed):
    matcher = list(tokens)
    template = list(parsed)
    session().annotate(" -- MK TEMPLATE --")
    session().annotate("MATCHR: {}".format(matcher))
    session().annotate("TEMPLT: {}".format(template))
    for i in range(len(matcher)):
        word = matcher[i]
        if word in template:
            # Words that appear in the parse tree are replaced by their
            # position in the token list, and the matcher keeps only their
            # groups. Words that don't appear stay as literal strings
            # (find_bounds and get_possible_remixes rely on this distinction).
            template[template.index(word)] = i
            matcher[i] = {
                'groups': set(knowledge_base.knowledge.get(word, {}).get('groups', set())),
            }
    return tokens, matcher, template
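

# A worked sketch of `make_template`, assuming a hypothetical knowledge base
# `kb` in which 'icecream' belongs to the 'noun' group and 'cold' is unknown
# (the data is illustrative, not taken from the training set):
#
#   >>> make_template(kb, ['icecream', 'is', 'cold'],
#   ...               ['property', 'icecream', 'cold'])
#   (['icecream', 'is', 'cold'],
#    [{'groups': {'noun'}}, 'is', {'groups': set()}],
#    ['property', 0, 2])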


def is_bottom_level(tree):
    for element in tree:
        if isinstance(element, (list, tuple)):
            return False
    return True


def get_lower_levels(parsed):
    lower = []

    def aux(subtree, path):
        nonlocal lower
        deeper = len(path) == 0
        for i, element in enumerate(subtree):
            if isinstance(element, (list, tuple)):
                aux(element, path + (i,))
                deeper = True
        if not deeper:
            lower.append((path, subtree))

    aux(parsed, path=())
    return lower
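

# For example (values illustrative), the only bottom-level subtree of
# ('exists', ('dog', 'x'), 'bark') is the ('dog', 'x') node at path (1,):
#
#   >>> get_lower_levels(('exists', ('dog', 'x'), 'bark'))
#   [((1,), ('dog', 'x'))]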


# TODO: probably optimize this, it creates lots of unnecessary tuples
def replace_position(tree, position, new_element):
    session().annotate("REPLACE POSITIONS:")
    session().annotate(" TREE : {}".format(tree))
    session().annotate("POSITION: {}".format(position))
    session().annotate("NEW ELEM: {}".format(new_element))
    session().annotate("------------------")

    def aux(current_tree, remaining_route):
        if len(remaining_route) == 0:
            return new_element
        else:
            step = remaining_route[0]
            # Recurse into the subtree at `step`, keeping the siblings
            # before and after it untouched (the closure previously indexed
            # the outer `tree` and skipped an extra element with `step + 2`)
            return (
                current_tree[:step]
                + (aux(current_tree[step], remaining_route[1:]),)
                + current_tree[step + 1:]
            )

    result = aux(tree, position)
    session().annotate("-RESULT: {}".format(result))
    return result
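

# With the recursion fixed as above, replacing the node at path (1,) with
# the token offset 0 behaves like this (values illustrative):
#
#   >>> replace_position(('exists', ('dog', 'x'), 'bark'), (1,), 0)
#   ('exists', 0, 'bark')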


def integrate_language(knowledge_base, example):
    text = example["text"].lower()
    parsed = example["parsed"]
    tokens = example['tokens']

    resolved_parsed = copy.deepcopy(parsed)

    while True:
        session().annotate("P: {}".format(resolved_parsed))
        lower_levels = get_lower_levels(resolved_parsed)
        session().annotate("Lower: {}".format(lower_levels))
        if len(lower_levels) == 0:
            break

        for position, atom in lower_levels:
            with session().log("Atom {}".format(atom)):
                result = None
                similars = get_similar_tree(knowledge_base, atom, tokens)
                for similar in similars:
                    result = build_remix_matrix(knowledge_base, tokens, atom, similar)
                    if result is not None:
                        break
                else:
                    # No known structure produced a remix matrix
                    raise Exception('Similar not found')

                remix, (start_bounds, end_bounds) = result

                # Note: use an explicit end index; a bare -len(end_bounds)
                # would yield an empty slice when end_bounds is empty
                after_remix = apply_remix(
                    tokens[len(start_bounds):len(tokens) - len(end_bounds)],
                    remix)
                session().annotate("--FIND MIX--")
                session().annotate("-MIX- | {}".format(remix))
                session().annotate("-FRM- | {}".format(tokens))
                session().annotate("-AFT- | {}".format(after_remix))

                session().annotate("--- TEMPLATE ---")

                _, matcher, result = make_template(knowledge_base, after_remix, atom)
                session().annotate("Tx: {}".format(after_remix))
                session().annotate("Mx: {}".format(matcher))
                session().annotate("Rx: {}".format(result))
                session().annotate("Sx: {}".format(start_bounds))
                session().annotate("Ex: {}".format(end_bounds))

                assert len(after_remix) + len(start_bounds) + len(end_bounds) == len(tokens)
                session().annotate(" +-> {}".format(after_remix))
                subquery_type = knowledge_evaluation.get_subquery_type(knowledge_base.knowledge, atom)
                session().annotate(r" \-> <{}>".format(subquery_type))

                # Replace the remixed tokens with a single
                # (subquery_type, remix) compound token
                new_tokens = list(tokens)
                offset = len(start_bounds)
                for _ in range(len(remix)):
                    new_tokens.pop(offset)

                # TODO: Get specific types for... types
                new_tokens.insert(offset, (subquery_type, remix))
                tokens = new_tokens

                resolved_parsed = replace_position(resolved_parsed, position, offset)
                session().annotate("RP: {}".format(resolved_parsed))
                session().annotate("AT: {}".format(atom))
                session().annotate("#########")

    tokens, matcher, result = make_template(knowledge_base, tokens, resolved_parsed)
    session().annotate("T: {}".format(tokens))
    session().annotate("M: {}".format(matcher))
    session().annotate("R: {}".format(result))
    session().annotate("---")
    yield tokens, matcher, result
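

# Sketch of the `example` shape this generator consumes, inferred from the
# keys read above (values are illustrative):
#
#   example = {
#       'text': 'icecream is cold',
#       'tokens': ['icecream', 'is', 'cold'],
#       'parsed': ('property', 'icecream', 'cold'),
#   }
#   for tokens, matcher, result in integrate_language(knowledge_base, example):
#       ...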


def apply_remix(tokens, remix):
    rebuilt = []
    for i in remix:
        if isinstance(i, int):
            if i >= len(tokens):
                return None
            rebuilt.append(tokens[i])
        else:
            assert isinstance(i, str)
            rebuilt.append(i)
    return rebuilt
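

# A remix mixes indexes into `tokens` with literal strings; for instance
# (values illustrative):
#
#   >>> apply_remix(['cold', 'is', 'icecream'], [2, 'is', 0])
#   ['icecream', 'is', 'cold']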


def build_remix_matrix(knowledge_base, tokens, atom, similar):
    tokens = list(tokens)
    with session().log("Remix matrix for {} - {}".format(tokens, atom)):
        tokens, matcher, result = make_template(knowledge_base, tokens, atom)
        similar_matcher, similar_result, similar_result_resolved, _, _ = similar

        start_bounds, end_bounds = find_bounds(knowledge_base, matcher, similar_matcher)

        # Indexes are popped from the highest to the lowest, so earlier
        # pops don't shift the positions of the remaining ones
        for i, element in (end_bounds + start_bounds[::-1]):
            matcher.pop(i)
            tokens.pop(i)

        possible_remixes = get_possible_remixes(knowledge_base, matcher, similar_matcher)
        session().annotate("Possible remixes: {}".format(possible_remixes))
        if len(possible_remixes) < 1:
            return None

        chosen_remix = possible_remixes[0]

        return chosen_remix, (start_bounds, end_bounds)


def get_possible_remixes(knowledge_base, matcher, similar_matcher):
    matrix = []
    with session().log("Possible remixes from matcher: {}".format(matcher)):
        for element in matcher:
            with session().log("Element `{}`".format(element)):
                session().annotate("Similar `{}`".format(similar_matcher))
                if element in similar_matcher or isinstance(element, dict):
                    if isinstance(element, dict):
                        indexes = all_matching_indexes(knowledge_base, similar_matcher, element)
                        session().annotate("Dict element matching: {}".format(indexes))
                    else:
                        indexes = all_indexes(similar_matcher, element)
                        session().annotate("* element matching: {}".format(indexes))
                    matrix.append(indexes)
                else:
                    session().annotate("`else` element matching: [element]")
                    matrix.append([element])

    # TODO: do some scoring to find the most "interesting combination"
    return [list(x) for x in zip(*matrix)]


def all_indexes(collection, element):
    indexes = []
    base = 0
    for _ in range(collection.count(element)):
        i = collection.index(element, base)
        base = i + 1
        indexes.append(i)
    return indexes
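

# Returns every position of `element` in `collection`, in order:
#
#   >>> all_indexes(['a', 'b', 'a', 'c'], 'a')
#   [0, 2]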


def all_matching_indexes(knowledge_base, collection, element):
    indexes = []
    with session().log('Matching “{}”'.format(element)):
        assert "groups" in element
        element = element["groups"]
        for i, instance in enumerate(collection):
            session().log('Checking “{}”'.format(instance))
            if isinstance(instance, dict):
                instance = instance["groups"]
            elif instance in knowledge_base.knowledge:
                session().log('Knowledge about “{}”: “{}”'.format(instance, knowledge_base.knowledge[instance]))

                if "groups" not in knowledge_base.knowledge[instance]:
                    # This means it's only known as a token,
                    # so we should try to avoid using it
                    continue
                instance = knowledge_base.knowledge[instance]["groups"]

            intersection = set(instance) & set(element)
            if len(intersection) > 0 or (0 == len(instance) == len(element)):
                indexes.append((i, intersection))

    # Sort by decreasing intersection size, then keep only the indexes
    return [x[0] for x in sorted(indexes, key=lambda x: len(x[1]), reverse=True)]


def element_matches_groups(knowledge, element: Dict, groups):
    with session().log("Checking if e “{}” matches groups “{}”".format(element, groups)):
        if isinstance(groups, str) and groups in knowledge:
            return len(knowledge[groups].get("groups", set()) & element['groups']) > 0
        elif isinstance(groups, dict):
            # Compare against the groups of `groups` itself (intersecting
            # `element` with its own groups would always match)
            return len(groups.get("groups", set()) & element['groups']) > 0
        return False


def find_bounds(knowledge, matcher, similar_matcher):
    start_bounds = []
    for i, element in enumerate(matcher):
        if element in similar_matcher:
            break
        else:
            start_bounds.append((i, element))

    end_bounds = []
    for i, element in enumerate(matcher[::-1]):
        in_similar = False
        if isinstance(element, str):
            in_similar = element in similar_matcher
        elif isinstance(element, dict):
            in_similar = any(map(lambda groups: element_matches_groups(knowledge.knowledge,
                                                                       element, groups),
                                 similar_matcher))

        if in_similar:
            break
        else:
            end_bounds.append((len(matcher) - (i + 1), element))

    return start_bounds, end_bounds
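

# For plain-string matchers the knowledge base is not consulted, e.g.
# (values illustrative, `kb` is any knowledge object):
#
#   >>> find_bounds(kb, ['the', 'big', 'dog'], ['dog'])
#   ([(0, 'the'), (1, 'big')], [])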


def get_similar_tree(knowledge_base, atom, tokens):
    possibilities = []

    # Find matching possibilities
    for entry, tree in knowledge_base.trained:
        if not is_bottom_level(tree):
            continue
        if tree[0] == atom[0]:
            possibilities.append((entry, tree))

    # Score by number of matching elements
    sorted_possibilities = []
    for (raw, possibility) in possibilities:
        resolved = []
        for element in atom:
            if isinstance(element, str):
                resolved.append(element)
            else:
                resolved.append(knowledge_evaluation.resolve(
                    knowledge_base.knowledge,
                    element,
                    raw))

        # TODO: Probably should take into account the categories of
        # the elements in the "intake" ([0]) element
        atom_score = sum([resolved[i] == atom[i]
                          for i
                          in range(min(len(resolved),
                                       len(atom)))])
        token_score = sum([similar_token in tokens
                           for similar_token
                           in raw])
        sorted_possibilities.append((raw, possibility, resolved, atom_score, token_score))

    sorted_possibilities = sorted(sorted_possibilities, key=lambda p: p[3] * 100 + p[4], reverse=True)
    if len(sorted_possibilities) < 1:
        return []

    for possibility in sorted_possibilities:
        similar_matcher, similar_result, similar_result_resolved, _atom_score, _token_score = possibility
        with session().log("Like {}".format(similar_matcher)):
            session().annotate('AST: {}'.format(similar_result))
            session().annotate('Results on: {}'.format(similar_result_resolved))
            session().annotate('Atom score: {}'.format(_atom_score))
            session().annotate('Token score: {}'.format(_token_score))

    return sorted_possibilities


# TODO: unroll this mess
def get_matching(sample, other):
    l = len(sample[0])
    other = list(filter(lambda x: len(x[0]) == l, other))

    for i in range(l):
        if len(other) == 0:
            return []

        if isinstance(sample[0][i], dict):  # Dictionaries are compared by groups
            other = list(filter(lambda x: isinstance(x[0][i], dict) and
                                          len(x[0][i]['groups'] & sample[0][i]['groups']) > 0,
                                other))
        elif isinstance(sample[0][i], tuple):  # Tuples are compared by their type (element [0])
            other = list(filter(lambda x: isinstance(x[0][i], tuple) and
                                          x[0][i][0] == sample[0][i][0],
                                other))

    matching = []
    for x in range(l):  # Generate the combination of this and the other matchers
        first_sample_data = sample[0][x]
        if isinstance(first_sample_data, str):
            matching.append(first_sample_data)
        elif isinstance(first_sample_data, tuple):
            matching.append(first_sample_data)
        else:
            this_groups = sample[0][x]['groups']
            if len(other) > 0:
                other_groups = reduce(lambda a, b: a & b,
                                      map(lambda y: y[0][x]['groups'],
                                          other))
                this_groups = this_groups & other_groups
            matching.append({'groups': this_groups})

    return matching
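

# Matchers agree position by position; group dictionaries are narrowed to
# their common groups. For instance (values illustrative):
#
#   >>> get_matching((['is', {'groups': {'a', 'b'}}], None),
#   ...              [(['is', {'groups': {'b', 'c'}}], None)])
#   ['is', {'groups': {'b'}}]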


def reverse_remix(tree_section, remix):
    result_section = []
    offset = 0
    for origin in remix:
        if isinstance(origin, int):
            if (origin + offset) >= len(tree_section):
                return None
            result_section.append(copy.deepcopy(tree_section[origin + offset]))
        else:
            assert isinstance(origin, str)
            # Literal strings in the remix shift the indexes of the
            # remaining elements by one
            offset += 1
    return result_section + tree_section[len(remix):]
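

# Without literal strings the remix is a plain reordering of the section,
# with any tail beyond the remix appended untouched (values illustrative):
#
#   >>> reverse_remix(['a', 'b', 'c'], [1, 0])
#   ['b', 'a', 'c']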


def get_fit(knowledge, tokens, remaining_recursions=parameters.MAX_RECURSIONS):
    results = []
    for matcher, ast in knowledge.trained:
        with session().log("{} <- {}".format(matcher, tokens)):
            result = match_fit(knowledge, tokens, matcher, ast,
                               remaining_recursions)
            if result is not None:
                with session().log("Result: {}".format(result)):
                    results.append(result)

    if len(results) > 0:
        return results[0]


def is_definite_minisegment(minisegment):
    return isinstance(minisegment, (str, dict))


def match_token(knowledge, next_token, minisegment):
    if isinstance(minisegment, dict):
        return knowledge_evaluation.can_be_used_in_place(knowledge, next_token, minisegment)
    elif isinstance(minisegment, str):
        # TODO: check if the two elements can be used in each other's place
        return next_token == minisegment
    return False


def resolve_fit(knowledge, fit, remaining_recursions):
    fitted = []
    for element in fit:
        if is_definite_minisegment(element):
            fitted.append(element)
        else:
            with session().log("Resolving fit of `{}`".format(element)):
                ((result_type, remixer), tokens) = element
                remixed_tokens = reverse_remix(tokens, remixer)
                if remixed_tokens is None:
                    return None

                minifit = get_fit(knowledge, remixed_tokens, remaining_recursions - 1)
                if minifit is None:
                    return None

                minitokens, miniast = minifit
                session().annotate(" AST | {}".format(miniast))
                subproperty = knowledge_evaluation.resolve(knowledge.knowledge, minitokens, miniast)
                fitted.append(subproperty)

    return fitted


def match_fit(knowledge, tokens, matcher, ast, remaining_recursions):
    segment_possibilities = [([], tokens)]  # Matched tokens, remaining tokens
    indent = ' ' * (parameters.MAX_RECURSIONS - remaining_recursions)
    session().annotate(indent + 'T> {}'.format(tokens))
    session().annotate(indent + 'M> {}'.format(matcher))

    for minisegment in matcher:
        with session().log("Minisegment `{}`".format(minisegment)):
            possibilities_after_round = []
            for matched_tokens, remaining_tokens in segment_possibilities:
                if len(remaining_tokens) < 1:
                    continue

                session().annotate(indent + "RT {}".format(remaining_tokens[0]))
                session().annotate(indent + "DEF {}".format(is_definite_minisegment(minisegment)))
                if is_definite_minisegment(minisegment):
                    # TODO: handle the case where the first token doesn't match
                    if match_token(knowledge, remaining_tokens[0], minisegment):
                        possibilities_after_round.append((
                            matched_tokens + [remaining_tokens[0]],
                            remaining_tokens[1:]
                        ))
                else:
                    # TODO: handle the case where no token matches
                    # TODO: optimize this with a look-ahead
                    for i in range(1, len(tokens)):
                        possibilities_after_round.append((
                            matched_tokens + [(minisegment, remaining_tokens[:i])],
                            remaining_tokens[i:]
                        ))
                session().annotate(indent + "## PA {}".format(possibilities_after_round))

            segment_possibilities = possibilities_after_round
            for possibility in segment_possibilities:
                with session().log("Possibility: `{}`".format(possibility)):
                    pass
            if len(segment_possibilities) < 1:
                with session().log("NO POSSIBILITY LEFT"):
                    pass

    fully_matched_segments = [(matched, remaining)
                              for (matched, remaining)
                              in segment_possibilities
                              if len(remaining) == 0]

    resolved_fits = []
    with session().log("Full matches"):
        for fit, _ in fully_matched_segments:
            with session().log(fit):  # NOTE: remixes have to be applied before this point!
                pass

    with session().log("Resolutions"):
        for fit, _ in fully_matched_segments:
            with session().log("Resolving {}".format(fit)):
                resolved_fit = resolve_fit(knowledge, fit, remaining_recursions)
                if resolved_fit is not None:
                    resolved_fits.append(resolved_fit)
                else:
                    session().annotate("Not resolved")

    if len(resolved_fits) == 0:
        return None

    return resolved_fits[0], ast
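

# A minimal usage sketch for the entry point above, assuming a trained
# `knowledge` object (the token list is illustrative):
#
#   fit = get_fit(knowledge, ['icecream', 'is', 'cold'])
#   if fit is not None:
#       fitted_tokens, ast = fit
#       ...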