#!/usr/bin/env python

import copy
import logging
import re
from functools import reduce
from typing import List

import depth_meter
import knowledge_evaluation
from modifiable_property import ModifiableProperty

MAX_RECURSIONS = 5


# TODO: more flexible tokenization
def to_tokens(text):
    return re.findall(r'(\w+|[^\s])', text)


def make_template(knowledge_base, tokens, parsed):
    # Replace each token that also appears in the parse with its index,
    # and abstract that token in the matcher to its knowledge groups
    matcher = list(tokens)
    template = list(parsed)
    for i in range(len(matcher)):
        word = matcher[i]
        if word in template:
            template[template.index(word)] = i
            matcher[i] = {
                'groups': set(knowledge_base.knowledge[word]['groups']),
            }
    return tokens, matcher, template


def is_bottom_level(tree):
    for element in tree:
        if isinstance(element, (list, tuple)):
            return False
    return True


def get_lower_levels(parsed):
    # Collect (path, subtree) pairs for the deepest subtrees, i.e. those
    # containing no nested list or tuple. The root itself is never
    # reported; the caller handles it once no lower levels remain
    lower = []

    def aux(subtree, path):
        nonlocal lower
        deeper = len(path) == 0
        for i, element in enumerate(subtree):
            if isinstance(element, (list, tuple)):
                aux(element, path + (i,))
                deeper = True

        if not deeper:
            lower.append((path, subtree))

    aux(parsed, path=())
    return lower


# TODO: probably optimize this, it creates lots of unnecessary tuples
def replace_position(tree, position, new_element):
    # Rebuild the tuple tree, swapping the element at `position`
    # (a path of indexes) for `new_element`
    def aux(current_tree, remaining_route):
        if len(remaining_route) == 0:
            return new_element

        step = remaining_route[0]
        return (
            current_tree[:step]
            + (aux(current_tree[step], remaining_route[1:]),)
            + current_tree[step + 1:]
        )

    return aux(tree, position)


def integrate_language(knowledge_base, example):
    text = example["text"].lower()
    parsed = example["parsed"]

    resolved_parsed = copy.deepcopy(parsed)
    tokens = to_tokens(text)

    while True:
        logging.debug("P: %s", resolved_parsed)
        lower_levels = get_lower_levels(resolved_parsed)
        logging.debug("Lower: %s", lower_levels)
        if len(lower_levels) == 0:
            break

        for position, atom in lower_levels:
            logging.debug("\x1b[1mSelecting\x1b[0m: %s", atom)
            similar = get_similar_tree(knowledge_base, atom)
            logging.debug("___> %s", similar)
            remix, (start_bounds, end_bounds) = build_remix_matrix(
                knowledge_base, tokens, atom, similar)

            _, matcher, result = make_template(knowledge_base, tokens, atom)
            logging.debug("Tx: %s", tokens)
            logging.debug("Mx: %s", matcher)
            logging.debug("Rx: %s", result)
            logging.debug("Remix: %s", remix)

            # Note: tokens[len(start_bounds):-len(end_bounds)] would yield
            # an empty slice when there are no end bounds, so compute the
            # end index explicitly
            end = len(tokens) - len(end_bounds)
            after_remix = apply_remix(tokens[len(start_bounds):end], remix)
            assert len(after_remix) + len(start_bounds) + len(end_bounds) == len(tokens)
            logging.debug(" +-> %s", after_remix)
            subquery_type = knowledge_evaluation.get_subquery_type(
                knowledge_base.knowledge, atom)
            logging.debug(r" \-> <%s>", subquery_type)

            # Clean remaining tokens
            new_tokens = list(tokens)
            offset = len(start_bounds)
            for _ in range(len(remix)):
                new_tokens.pop(offset)

            # TODO: get specific types for... types
            new_tokens.insert(offset, (subquery_type, remix))
            tokens = new_tokens

            resolved_parsed = replace_position(resolved_parsed,
                                               position, offset)
            logging.debug("#########")

    tokens, matcher, result = make_template(knowledge_base, tokens,
                                            resolved_parsed)
    logging.debug("T: %s", tokens)
    logging.debug("M: %s", matcher)
    logging.debug("R: %s", result)
    logging.debug("---")
    return tokens, matcher, result
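
# --- Illustrative sketch (not part of the original module) -----------------
# A minimal stand-in knowledge base showing what `to_tokens` and
# `make_template` produce. `_FakeKnowledgeBase` and its group assignments
# are hypothetical, chosen only for this example.
class _FakeKnowledgeBase:
    knowledge = {
        'icecream': {'groups': {'noun', 'object'}},
        'cold': {'groups': {'property'}},
    }


def _make_template_demo():
    tokens = to_tokens("icecream is cold")  # -> ['icecream', 'is', 'cold']
    _, matcher, template = make_template(
        _FakeKnowledgeBase(), tokens,
        ('exists-property-with-value', 'icecream', 'cold'))
    # matcher  -> [{'groups': {'noun', 'object'}}, 'is',
    #              {'groups': {'property'}}]
    # template -> ['exists-property-with-value', 0, 2]
    return matcher, template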
def apply_remix(tokens, remix):
    # Reorder `tokens` following the indexes in `remix`
    return [tokens[i] for i in remix]


def build_remix_matrix(knowledge_base, tokens, atom, similar):
    tokens = list(tokens)
    tokens, matcher, result = make_template(knowledge_base, tokens, atom)
    similar_matcher, similar_result, similar_result_resolved, _ = similar

    start_bounds, end_bounds = find_bounds(matcher, similar_matcher)

    # Pop from the highest index to the lowest so the remaining
    # indexes stay valid
    for i, element in (end_bounds + start_bounds[::-1]):
        matcher.pop(i)
        tokens.pop(i)

    possible_remixes = get_possible_remixes(knowledge_base, matcher,
                                            similar_matcher)
    chosen_remix = possible_remixes[0]

    return chosen_remix, (start_bounds, end_bounds)


def get_possible_remixes(knowledge_base, matcher, similar_matcher):
    matrix = []
    for element in matcher:
        logging.debug("- %s", element)
        logging.debug("+ %s", similar_matcher)
        assert element in similar_matcher or isinstance(element, dict)

        if isinstance(element, dict):
            indexes = all_matching_indexes(knowledge_base, similar_matcher,
                                           element)
        else:
            indexes = all_indexes(similar_matcher, element)
        matrix.append(indexes)

    # TODO: do some scoring to find the most "interesting combination"
    return [list(x) for x in zip(*matrix)]


def all_indexes(collection, element):
    # Every index at which `element` appears in `collection`
    indexes = []
    base = 0
    for _ in range(collection.count(element)):
        i = collection.index(element, base)
        base = i + 1
        indexes.append(i)
    return indexes


def all_matching_indexes(knowledge_base, collection, element):
    # Indexes of `collection` whose groups overlap those of `element`,
    # sorted by the size of the intersection, largest first
    indexes = []
    assert "groups" in element
    element = element["groups"]
    for i, instance in enumerate(collection):
        if isinstance(instance, dict):
            instance = instance["groups"]
        elif instance in knowledge_base.knowledge:
            instance = knowledge_base.knowledge[instance]["groups"]

        intersection = set(instance) & set(element)
        if len(intersection) > 0:
            indexes.append((i, intersection))

    return [x[0] for x in sorted(indexes, key=lambda x: len(x[1]),
                                 reverse=True)]


def find_bounds(matcher, similar_matcher):
    # Leading and trailing (index, element) pairs of `matcher` that have
    # no counterpart in `similar_matcher`
    start_bounds = []
    for i, element in enumerate(matcher):
        if element in similar_matcher:
            break
        start_bounds.append((i, element))

    end_bounds = []
    for i, element in enumerate(matcher[::-1]):
        if element in similar_matcher:
            break
        end_bounds.append((len(matcher) - (i + 1), element))

    return start_bounds, end_bounds
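
# --- Illustrative sketch (not part of the original module) -----------------
# `find_bounds` splits off the leading/trailing matcher elements that have
# no counterpart in the similar matcher; `apply_remix` reorders tokens by
# index. The sample values are hypothetical.
def _bounds_and_remix_demo():
    start, end = find_bounds(['the', 'dog', 'is', 'happy', '!'],
                             ['dog', 'is', 'happy'])
    # start -> [(0, 'the')]    end -> [(4, '!')]
    reordered = apply_remix(['cold', 'is', 'icecream'], [2, 1, 0])
    # reordered -> ['icecream', 'is', 'cold']
    return start, end, reordered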
def get_similar_tree(knowledge_base, atom):
    possibilities = []

    # Find matching possibilities
    for entry, tree in knowledge_base.trained:
        if not is_bottom_level(tree):
            continue
        if tree[0] == atom[0]:
            possibilities.append((entry, tree))

    # Sort by number of matching elements
    sorted_possibilities = []
    for (raw, possibility) in possibilities:
        resolved = []
        for element in atom:
            if isinstance(element, str):
                resolved.append(element)
            else:
                resolved.append(knowledge_evaluation.resolve(
                    knowledge_base.knowledge, element, raw))

        # TODO: probably should take into account the categories of the
        # elements in the "intake" ([0]) element
        score = sum([resolved[i] == atom[i]
                     for i in range(min(len(resolved), len(atom)))])
        sorted_possibilities.append((raw, possibility, resolved, score))

    sorted_possibilities = sorted(sorted_possibilities,
                                  key=lambda p: p[3], reverse=True)
    if len(sorted_possibilities) < 1:
        return None
    return sorted_possibilities[0]


# TODO: unroll this mess
def get_matching(sample, other):
    length = len(sample[0])
    other = [x for x in other if len(x[0]) == length]

    for i in range(length):
        if len(other) == 0:
            return []

        if isinstance(sample[0][i], dict):
            # Dictionaries are compared by groups
            other = [x for x in other
                     if isinstance(x[0][i], dict)
                     and len(x[0][i]['groups'] & sample[0][i]['groups']) > 0]
        elif isinstance(sample[0][i], tuple):
            # Tuples are compared by type ([0] element)
            other = [x for x in other
                     if isinstance(x[0][i], tuple)
                     and x[0][i][0] == sample[0][i][0]]
        # Note: plain-string positions are kept verbatim and do not
        # filter `other`

    return [sample[0][x]
            if isinstance(sample[0][x], (str, tuple))
            else {'groups': sample[0][x]['groups']
                  & reduce(lambda a, b: a & b,
                           map(lambda y: y[0][x]['groups'], other))}
            for x in range(length)]


def reprocess_language_knowledge(knowledge_base, examples):
    examples = knowledge_base.examples + examples
    logging.debug('\n'.join(map(str, knowledge_base.examples)))
    logging.debug("--")

    pattern_examples = []
    for i, sample in enumerate(examples):
        other = examples[:i] + examples[i + 1:]
        match = get_matching(sample, other)
        logging.debug("-> %s", match)
        if len(match) > 0:
            sample = (match, sample[1],)
            pattern_examples.append(sample)
        logging.debug("---")

    logging.debug("\x1b[7m--\x1b[0m")
    return pattern_examples


def fitting_return_type(knowledge,
                        return_type, remixer,
                        input_stream, tail_of_output_stream,
                        remaining_recursions: int):
    indent = " " + " " * (MAX_RECURSIONS - remaining_recursions)
    for sample, ast in knowledge.trained:
        try:
            parsed_input = []
            parsed_output = []

            remaining_input = reverse_remix(input_stream, remixer)
            logging.debug(indent + "RMXin: %s", remaining_input)

            remaining_output = copy.deepcopy(sample)
            logging.debug(indent + "S: %s", sample)
            logging.debug(indent + "A: %s", ast)
            logging.debug("---")

            while len(remaining_output) > 0:
                for (elements,
                     (remaining_input,
                      remaining_output)) in match_token(knowledge,
                                                        remaining_input,
                                                        remaining_output,
                                                        remaining_recursions - 1):
                    parsed_input += elements
                    logging.debug(indent + "Elements: %s", elements)
                    break

            logging.debug(indent + "Pi: %s", parsed_input)
            logging.debug(indent + "Po: %s", parsed_output)
            logging.debug("\x1b[7m" + indent + "Ri: %s\x1b[0m",
                          remaining_input)
            logging.debug("\x1b[7m" + indent + "Ro: %s\x1b[0m",
                          remaining_output + tail_of_output_stream)
            logging.debug("---")

            resolved_input = knowledge_evaluation.resolve(
                knowledge.knowledge, parsed_input, ast)
            if isinstance(resolved_input, ModifiableProperty):
                resolved_input = resolved_input.getter()
            yield ([resolved_input],
                   (remaining_input,
                    remaining_output + tail_of_output_stream))
        except (TypeError, ValueError, IndexError, KeyError) as e:
            # This sample does not fit; try the next one
            logging.debug(indent + "X %s", e)
    raise TypeError("No matching type found")
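
# --- Illustrative sketch (not part of the original module) -----------------
# `get_matching` generalizes one example against the others: positions whose
# group dictionaries overlap are narrowed to the intersection, while plain
# string positions are kept verbatim. The sample data are hypothetical.
def _get_matching_demo():
    a = (['icecream', 'is', {'groups': {'property', 'temperature'}}], None)
    b = (['icecream', 'is', {'groups': {'property', 'color'}}], None)
    merged = get_matching(a, [b])
    # merged -> ['icecream', 'is', {'groups': {'property'}}]
    return merged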
def reverse_remix(tree_section, remix):
    # Undo a remix: put the first len(remix) elements back in their
    # original positions and keep the tail as-is
    result_section = []
    for origin in remix:
        result_section.append(copy.deepcopy(tree_section[origin]))
    return result_section + tree_section[len(remix):]


def match_token(knowledge,
                input: List[str],
                trained: List[str],
                remaining_recursions: int):
    if remaining_recursions < 1:
        # Recursion limit reached; signal "no match" and stop descending
        yield None
        return

    depth_meter.show_depth(MAX_RECURSIONS - remaining_recursions)
    indent = " " + " " * (MAX_RECURSIONS - remaining_recursions)

    first_input = input[0]
    expected_first = trained[0]
    logging.debug(indent + "Ex? %s", expected_first)
    logging.debug(indent + "Fo! %s", first_input)

    if isinstance(expected_first, dict):
        # TODO: check if the dictionary matches the values
        yield ([first_input], (input[1:], trained[1:]))
    elif isinstance(expected_first, tuple):
        return_type, remixer = expected_first
        for r in fitting_return_type(knowledge,
                                     return_type, remixer,
                                     input, trained[1:],
                                     remaining_recursions):
            logging.debug("--> %s", r)
            yield r
    elif expected_first == first_input:
        yield ([first_input], (input[1:], trained[1:]))

    # A trailing None makes the caller's unpacking raise TypeError,
    # which is used as the "no more matches" signal
    yield None


def get_fit_onwards(knowledge, ast, remaining_input, remaining_output,
                    remaining_recursions):
    indent = "." + " " * (MAX_RECURSIONS - remaining_recursions)
    try:
        # TODO: merge with fitting_return_type, as it uses the same mechanism
        if len(remaining_output) > 0:
            for (elements,
                 (input_for_next_level,
                  output_for_next_level)) in match_token(knowledge,
                                                         remaining_input,
                                                         remaining_output,
                                                         remaining_recursions):
                logging.debug("Nli: %s", input_for_next_level)
                logging.debug("Nlo: %s", output_for_next_level)
                logging.debug(indent + "E %s", elements)

                try:
                    result = get_fit_onwards(knowledge, ast,
                                             input_for_next_level,
                                             output_for_next_level,
                                             remaining_recursions)
                    logging.debug(indent + "→ %s", result)
                    lower_elements, _ = result
                    logging.debug("<<<<< ELM: %s %s",
                                  elements, lower_elements)
                    return elements + lower_elements, ast
                except (TypeError, IndexError) as e:
                    logging.debug(indent + "X %s", e)
            else:
                logging.debug(indent + "Ri: %s", remaining_input)
                logging.debug(indent + "Ro: %s", remaining_output)
                logging.debug("OK")
        elif len(remaining_input) == 0 and len(remaining_output) == 0:
            # Both streams fully consumed: the AST fits
            logging.debug("<<<<< AST: %s", ast)
            return [], ast
    except (TypeError, IndexError) as e:
        logging.debug(indent + "X %s", e)
    return None


def get_fit(knowledge, row, remaining_recursions=MAX_RECURSIONS):
    tokens = to_tokens(row)
    indent = " " * (MAX_RECURSIONS - remaining_recursions)
    for sample, ast in knowledge.trained:
        logging.debug("-----")
        logging.debug("TOK: %s", tokens)
        try:
            remaining_input = copy.deepcopy(tokens)
            remaining_output = copy.deepcopy(sample)
            logging.debug(indent + "AST: %s", ast)
            logging.debug(indent + "S: %s", sample)

            result = get_fit_onwards(knowledge, ast, remaining_input,
                                     remaining_output, remaining_recursions)
            if result is not None:
                return result
        except (TypeError, IndexError) as e:
            logging.debug(indent + "X %s", e)
        logging.debug("---")
    return None
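
# --- Illustrative sketch (not part of the original module) -----------------
# `reverse_remix` undoes a remix over the first len(remix) elements and
# leaves the tail untouched. The sample values are hypothetical.
def _reverse_remix_demo():
    restored = reverse_remix(['cold', 'is', 'icecream', '?'], [2, 1, 0])
    # restored -> ['icecream', 'is', 'cold', '?']
    return restored

# Typical entry point, assuming a hypothetical trained knowledge base `kb`
# (an object exposing `.knowledge` and `.trained` as used above):
#
#     result = get_fit(kb, "icecream is cold")
#     if result is not None:
#         elements, ast = result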