#!/usr/bin/env python

from . import knowledge_evaluation
from . import depth_meter
from .session.org_mode import global_session as session
import re
import copy

from functools import reduce
from typing import List, Dict

from .modifiable_property import ModifiableProperty
from . import parameters
from .atoms import Atom, a, is_atom


def lookahead_for_tokens_or_strucutral_elements(knowledge_base, remaining):
    for se in knowledge_base.structural_elements:
        found_position = remaining.find(se)
        found = found_position >= 0
        session().annotate('Looking for structure with “{}”, found? {}'.format(se, found))
        if found:
            return [(remaining[:found_position],
                     se,
                     remaining[found_position + len(se):])]

    for token in knowledge_base.knowledge.keys():
        found_position = remaining.find(token)
        found = found_position >= 0
        session().annotate('Looking for token “{}”, found? {}'.format(token, found))
        if found:
            return [(remaining[:found_position],
                     token,
                     remaining[found_position + len(token):])]

    return None


def to_tokens(knowledge_base, text, precedent=None):
    if len(text) == 0:
        session().annotate("No text remaining")
        yield ['']
        return

    with session().log("Tokenizing {}".format(text)):
        for option in knowledge_base.expected_token_after_precedent(precedent):
            with session().log("Next: “{}”".format(option)):
                with session().log("Matching “{}” on “{}”".format(option, text)):
                    for token_match in tokenization_match(option, text, knowledge_base):
                        if token_match is None:
                            session().annotate("No match")
                            continue

                        match, remaining = token_match
                        if len(remaining) == len(text):
                            raise Exception('No text consumed in match')

                        session().annotate('Match: “{}”'.format(match))
                        with session().log('Remaining “{}”'.format(remaining)):
                            for sublevel in to_tokens(knowledge_base, remaining, match):
                                candidate = list(filter(lambda x: x != '',
                                                        [match] + sublevel))
                                session().annotate('Yielding candidate “{}”'.format(candidate))
                                yield candidate


def tokenization_match(element, text, knowledge_base):
    # Constant/structural string matching
    if isinstance(element, str):
        if text.find(element) == 0:
            # This match comes from a structuring element,
            # so it doesn't appear in the tokenization
            # and is returned as an empty string.
            yield ('', text[len(element):])
            return
        else:
            # No match found
            return

    elif is_atom(element, 'token'):
        yield from match_single_token(text, knowledge_base)
        return

    raise NotImplementedError()


def match_single_token(text, knowledge_base):
    found_token = False
    for token in knowledge_base.knowledge.keys():
        if text.find(token) == 0:
            yield token, text[len(token):]
            found_token = True

    if found_token:
        return

    session().annotate('No token found at the start of “{}”'.format(text))
    session().annotate('using structural elements to infer it')
    # TODO: review this when multiple structural elements are available
    for se in knowledge_base.structural_elements:
        session().annotate('Looking for se “{}” in “{}”'.format(se, text))
        position = text.find(se, 0)
        found = position > 0  # 0 is not considered a valid position for this kind of split
        if found:
            session().annotate('Found “{}”, inferring “{}”'.format(se, text[:position]))
            yield text[:position], text[position:]

    session().annotate('No structural element or token found, inferring only token remaining')
    yield text, ''

    # Using other tokens for cutoff
    for token in knowledge_base.knowledge.keys():
        session().annotate('Looking for token “{}” in “{}”'.format(token, text))
        position = text.find(token)
        found = position >= 0
        if found:
            session().annotate('Found “{}”, in position “{}”'.format(token, position))
            yield text[:position], text[position:]
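
# Illustrative sketch (hypothetical data): to_tokens() lazily yields every
# tokenization consistent with the token pairs learnt so far. For a
# knowledge base whose known tokens include 'spain' and 'is', something like
#
#   candidates = list(to_tokens(knowledge_base, 'spain is a country'))
#   # e.g. -> [['spain', 'is', 'a', 'country'], ...]
#
# pick_one_tokenization() (defined below) then chooses among the candidates.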


def integrate_tokenization(knowledge_base, example):
    text = example['text']
    tokens = example['tokens']
    meaning = example.get('meaning')

    return integrate_token_to_text_matching(knowledge_base, text, tokens)


def integrate_token_to_text_matching(knowledge_base, text, tokens):
    texts = [text]

    # Convert to tokens
    for token_id, token in enumerate(tokens):
        # Look for token in texts
        for i, text in enumerate(texts):
            if isinstance(text, int):
                continue

            if token in text:
                before, after = text.split(token, maxsplit=1)
                texts = (texts[:i]
                         + [before]
                         + [a('token')]
                         + [after]
                         + texts[i + 1:])
                break
        else:
            raise Exception('Token not found')

    # Remove leftovers from splits
    texts = list(filter(lambda x: x != '', texts))
    session().log("Tokenized as {} over {}".format(texts, tokens))

    for i, element in enumerate(texts[:-1]):
        learn_token_pair(element, texts[i + 1], knowledge_base)

    return tokens


def learn_token_pair(precedent, consequent, knowledge_base):
    knowledge_base.add_token_pair(precedent, consequent)


def pick_one_tokenization(options, knowledge_base):
    '''
    Heuristic function to pick the most probable tokenization.

    Runs the candidates through a list of scoring heuristics,
    keeping the best-scoring ones at each stage.
    '''
    options = list(options)
    with session().log("Picking among: {} options".format(len(options))):
        session().log("Options: \n{}".format('\n'.join(map(str, options))))
        return pick_by_score(options,
                             [
                                 # Penalize splits that contain structuring elements
                                 lambda tokenization: sum(map(
                                     lambda split: sum(map(
                                         lambda se: se in split,
                                         knowledge_base.structural_elements)),
                                     tokenization)),

                                 # Penalize unknown tokens
                                 lambda tokenization: len(list(filter(
                                     lambda token: ((token not in knowledge_base.knowledge.keys())
                                                    and (token not in knowledge_base.structural_elements)),
                                     tokenization))),

                                 # Prefer tokenizations with more splits
                                 lambda tokenization: -len(tokenization),
                             ])


def pick_by_score(options, heuristics):
    for heuristic in heuristics:
        assert len(options) > 0
        options = list(map(lambda opt: (heuristic(opt), opt), options))
        sorted_options = sorted(options, key=lambda x: x[0], reverse=False)

        heuristic_cutoff = sorted_options[0][0]
        session().annotate(sorted_options)
        pass_heuristic = [opt for (score, opt) in sorted_options
                          if score <= heuristic_cutoff]
        options = pass_heuristic

    session().log("{} finalists: \n{}".format(len(options), '\n'.join(map(str, options))))
    return options[0]


def make_template(knowledge_base, tokens, parsed):
    matcher = list(tokens)
    template = list(parsed)
    session().annotate("  -- MK TEMPLATE --")
    session().annotate("MATCHR: {}".format(matcher))
    session().annotate("TEMPLT: {}".format(template))
    for i in range(len(matcher)):
        word = matcher[i]
        if word in template:
            template[template.index(word)] = i
            matcher[i] = {
                'groups': set(knowledge_base.knowledge.get(word, {}).get('groups', set())),
            }
    return tokens, matcher, template


def is_bottom_level(tree):
    for element in tree:
        if isinstance(element, list) or isinstance(element, tuple):
            return False
    return True


def get_lower_levels(parsed):
    lower = []

    def aux(subtree, path):
        nonlocal lower
        deeper = len(path) == 0  # The root itself is never yielded
        for i, element in enumerate(subtree):
            if isinstance(element, list) or isinstance(element, tuple):
                aux(element, path + (i,))
                deeper = True

        if not deeper:
            lower.append((path, subtree))

    aux(parsed, path=())
    return lower
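
# Illustrative sketch: get_lower_levels() collects the paths (index tuples)
# of the deepest fully-atomic subtrees, e.g. with hypothetical data:
#
#   get_lower_levels(('exists', ('property', 'icecream', 'flavour')))
#   # -> [((1,), ('property', 'icecream', 'flavour'))]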


# TODO: probably optimize this, it creates lots of unnecessary tuples
def replace_position(tree, position, new_element):
    session().annotate("REPLACE POSITIONS:")
    session().annotate("  TREE  : {}".format(tree))
    session().annotate("POSITION: {}".format(position))
    session().annotate("NEW ELEM: {}".format(new_element))
    session().annotate("------------------")

    def aux(current_tree, remaining_route):
        if len(remaining_route) == 0:
            return new_element

        step = remaining_route[0]
        return (current_tree[:step]
                + (aux(current_tree[step], remaining_route[1:]),)
                + current_tree[step + 1:])

    result = aux(tree, position)
    session().annotate("-RESULT: {}".format(result))
    return result


def integrate_language(knowledge_base, example):
    text = example["text"].lower()
    parsed = example["parsed"]

    resolved_parsed = copy.deepcopy(parsed)
    tokens = list(pick_one_tokenization(to_tokens(knowledge_base, text),
                                        knowledge_base))

    while True:
        session().annotate("P: {}".format(resolved_parsed))
        lower_levels = get_lower_levels(resolved_parsed)
        session().annotate("Lower: {}".format(lower_levels))
        if len(lower_levels) == 0:
            break

        for position, atom in lower_levels:
            with session().log("Atom {}".format(atom)):
                result = None
                similars = get_similar_tree(knowledge_base, atom, tokens) or []
                for similar in similars:
                    result = build_remix_matrix(knowledge_base, tokens, atom, similar)
                    if result is not None:
                        break

                if result is None:
                    raise Exception("No match found")

                remix, (start_bounds, end_bounds) = result

                # The slice end is computed explicitly so that an empty
                # end_bounds keeps the full tail of the token list
                after_remix = apply_remix(
                    tokens[len(start_bounds):len(tokens) - len(end_bounds)],
                    remix)
                session().annotate("--FIND MIX--")
                session().annotate("-MIX- | {}".format(remix))
                session().annotate("-FRM- | {}".format(tokens))
                session().annotate("-AFT- | {}".format(after_remix))

                session().annotate("--- TEMPLATE ---")

                _, matcher, result = make_template(knowledge_base, after_remix, atom)
                session().annotate("Tx: {}".format(after_remix))
                session().annotate("Mx: {}".format(matcher))
                session().annotate("Rx: {}".format(result))
                session().annotate("Sx: {}".format(start_bounds))
                session().annotate("Ex: {}".format(end_bounds))

                assert len(after_remix) + len(start_bounds) + len(end_bounds) == len(tokens)

                session().annotate(" +-> {}".format(after_remix))
                subquery_type = knowledge_evaluation.get_subquery_type(
                    knowledge_base.knowledge, atom)
                session().annotate(r" \-> <{}>".format(subquery_type))

                # Clean remaining tokens
                new_tokens = list(tokens)
                offset = len(start_bounds)
                for _ in range(len(remix)):
                    new_tokens.pop(offset)

                # TODO: get specific types for... types
                new_tokens.insert(offset, (subquery_type, remix))
                tokens = new_tokens

                resolved_parsed = replace_position(resolved_parsed, position, offset)
                session().annotate("RP: {}".format(resolved_parsed))
                session().annotate("AT: {}".format(atom))
                session().annotate("#########")

    tokens, matcher, result = make_template(knowledge_base, tokens, resolved_parsed)
    session().annotate("T: {}".format(tokens))
    session().annotate("M: {}".format(matcher))
    session().annotate("R: {}".format(result))
    session().annotate("---")
    return tokens, matcher, result
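
# Illustrative sketch (hypothetical example data): integrate_language()
# pairs a sentence with its parse tree and repeatedly folds the deepest
# subtrees into (subquery_type, remix) tokens until the tree is flat:
#
#   example = {"text": "Paris is a city", "parsed": ("exists", ("city", "paris"))}
#   tokens, matcher, template = integrate_language(knowledge_base, example)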


def apply_remix(tokens, remix):
    rebuilt = []
    for i in remix:
        if isinstance(i, int):
            if i >= len(tokens):
                return None
            rebuilt.append(tokens[i])
        else:
            assert isinstance(i, str)
            rebuilt.append(i)
    return rebuilt


def build_remix_matrix(knowledge_base, tokens, atom, similar):
    tokens = list(tokens)
    with session().log("Remix matrix for {} - {}".format(tokens, atom)):
        tokens, matcher, result = make_template(knowledge_base, tokens, atom)
        similar_matcher, similar_result, similar_result_resolved, _, _ = similar

        start_bounds, end_bounds = find_bounds(knowledge_base, matcher, similar_matcher)

        # Pop from the highest index to the lowest so the remaining
        # indexes stay valid
        for i, _element in (end_bounds + start_bounds[::-1]):
            matcher.pop(i)
            tokens.pop(i)

        possible_remixes = get_possible_remixes(knowledge_base, matcher, similar_matcher)
        session().annotate("Possible remixes: {}".format(possible_remixes))
        if len(possible_remixes) < 1:
            return None

        chosen_remix = possible_remixes[0]
        return chosen_remix, (start_bounds, end_bounds)


def get_possible_remixes(knowledge_base, matcher, similar_matcher):
    matrix = []
    with session().log("Possible remixes from matcher: {}".format(matcher)):
        for element in matcher:
            with session().log("Element `{}`".format(element)):
                session().annotate("Similar `{}`".format(similar_matcher))
                if element in similar_matcher or isinstance(element, dict):
                    if isinstance(element, dict):
                        indexes = all_matching_indexes(knowledge_base, similar_matcher, element)
                        session().annotate("Dict element matching: {}".format(indexes))
                    else:
                        indexes = all_indexes(similar_matcher, element)
                        session().annotate("* element matching: {}".format(indexes))
                    matrix.append(indexes)
                else:
                    session().annotate("`else` element matching: [element]")
                    matrix.append([element])

    # TODO: do some scoring to find the most "interesting combination"
    return [list(x) for x in zip(*matrix)]


def all_indexes(collection, element):
    indexes = []
    base = 0

    for _ in range(collection.count(element)):
        i = collection.index(element, base)
        base = i + 1
        indexes.append(i)

    return indexes


def all_matching_indexes(knowledge_base, collection, element):
    indexes = []

    assert "groups" in element
    element = element["groups"]
    for i, instance in enumerate(collection):
        if isinstance(instance, dict):
            instance = instance["groups"]
        elif instance in knowledge_base.knowledge:
            instance = knowledge_base.knowledge[instance]["groups"]

        intersection = set(instance) & set(element)
        if (len(intersection) > 0
                or (0 == len(instance) == len(element))):
            indexes.append((i, intersection))

    # Most overlapping groups first
    return [x[0] for x in sorted(indexes, key=lambda x: len(x[1]), reverse=True)]


def element_matches_groups(knowledge, element: Dict, groups):
    if isinstance(groups, str) and groups in knowledge:
        return len(knowledge[groups].get("groups", set()) & element['groups']) > 0
    elif isinstance(groups, dict):
        return len(groups.get("groups", set()) & element['groups']) > 0
    return False


def find_bounds(knowledge, matcher, similar_matcher):
    start_bounds = []
    for i, element in enumerate(matcher):
        if element in similar_matcher:
            break
        else:
            start_bounds.append((i, element))

    end_bounds = []
    for i, element in enumerate(matcher[::-1]):
        in_similar = False
        if isinstance(element, str):
            in_similar = element in similar_matcher
        elif isinstance(element, dict):
            in_similar = any(map(lambda groups: element_matches_groups(
                knowledge.knowledge, element, groups),
                similar_matcher))

        if in_similar:
            break
        else:
            end_bounds.append((len(matcher) - (i + 1), element))

    return start_bounds, end_bounds
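
# Illustrative sketch: all_indexes() lists every position of an element,
# e.g. all_indexes(['a', 'b', 'a'], 'a') -> [0, 2]. find_bounds() relies on
# the same matching notions to trim the matcher prefix/suffix that has no
# counterpart in the similar matcher.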


def get_similar_tree(knowledge_base, atom, tokens):
    possibilities = []

    # Find matching possibilities
    for entry, tree in knowledge_base.trained:
        if not is_bottom_level(tree):
            continue
        if tree[0] == atom[0]:
            possibilities.append((entry, tree))

    # Sort by number of matching elements
    sorted_possibilities = []
    for (raw, possibility) in possibilities:
        resolved = []
        for element in atom:
            if isinstance(element, str):
                resolved.append(element)
            else:
                resolved.append(knowledge_evaluation.resolve(
                    knowledge_base.knowledge,
                    element,
                    raw))

        # TODO: probably should take into account the categories of the
        # elements in the "intake" ([0]) element
        atom_score = sum([resolved[i] == atom[i]
                          for i in range(min(len(resolved), len(atom)))])
        token_score = sum([similar_token in tokens
                           for similar_token in raw])
        sorted_possibilities.append((raw, possibility, resolved, atom_score, token_score))

    sorted_possibilities = sorted(sorted_possibilities,
                                  key=lambda p: p[3] * 100 + p[4],
                                  reverse=True)
    if len(sorted_possibilities) < 1:
        return None

    for i, possibility in enumerate(sorted_possibilities):
        (similar_matcher, similar_result, similar_result_resolved,
         _atom_score, _token_score) = possibility
        with session().log("Like {}".format(similar_matcher)):
            session().annotate('AST: {}'.format(similar_result))
            session().annotate('Results on: {}'.format(similar_result_resolved))
            session().annotate('Atom score: {}'.format(_atom_score))
            session().annotate('Token score: {}'.format(_token_score))

    return sorted_possibilities


# TODO: unroll this mess
def get_matching(sample, other):
    length = len(sample[0])
    other = list(filter(lambda x: len(x[0]) == length, other))

    for i in range(length):
        if len(other) == 0:
            return []

        # Dictionaries are compared by groups
        if isinstance(sample[0][i], dict):
            other = list(filter(lambda x: isinstance(x[0][i], dict)
                                and len(x[0][i]['groups'] & sample[0][i]['groups']) > 0,
                                other))

        # Tuples are compared by type ([0])
        elif isinstance(sample[0][i], tuple):
            other = list(filter(lambda x: isinstance(x[0][i], tuple)
                                and x[0][i][0] == sample[0][i][0],
                                other))

    matching = []
    # Generate the combination of this and the other matchers
    for x in range(length):
        first_sample_data = sample[0][x]
        if isinstance(first_sample_data, str):
            matching.append(first_sample_data)
        elif isinstance(first_sample_data, tuple):
            matching.append(first_sample_data)
        else:
            this_groups = sample[0][x]['groups']
            if len(other) > 0:
                other_groups = reduce(lambda a, b: a & b,
                                      map(lambda y: y[0][x]['groups'], other))
                this_groups = this_groups & other_groups

            matching.append({'groups': this_groups})

    return matching


def reprocess_language_knowledge(knowledge_base, examples):
    examples = knowledge_base.examples + examples

    pattern_examples = []
    for i, sample in enumerate(examples):
        other = examples[:i] + examples[i + 1:]
        match = get_matching(sample, other)
        if len(match) > 0:
            sample = (match, sample[1],)
            pattern_examples.append(sample)

    return pattern_examples


def reverse_remix(tree_section, remix):
    result_section = []
    offset = 0
    for origin in remix:
        if isinstance(origin, int):
            if (origin + offset) >= len(tree_section):
                return None
            result_section.append(copy.deepcopy(tree_section[origin + offset]))
        else:
            assert isinstance(origin, str)
            offset += 1
    return result_section + tree_section[len(remix):]
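
# Illustrative sketch: reverse_remix() maps integer origins back onto a
# tree section (string origins only shift the offset), e.g.:
#
#   reverse_remix(['a', 'b', 'c'], [2, 0, 1])  # -> ['c', 'a', 'b']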


def get_fit(knowledge, tokens, remaining_recursions=parameters.MAX_RECURSIONS):
    results = []
    for matcher, ast in knowledge.trained:
        with session().log("{} <- {}".format(matcher, tokens)):
            result = match_fit(knowledge, tokens, matcher, ast, remaining_recursions)
            if result is not None:
                with session().log("Result: {}".format(result)):
                    results.append(result)

    if len(results) > 0:
        return results[0]


def is_definite_minisegment(minisegment):
    return isinstance(minisegment, str) or isinstance(minisegment, dict)


def match_token(knowledge, next_token, minisegment):
    if isinstance(minisegment, dict):
        return knowledge_evaluation.can_be_used_in_place(knowledge, next_token, minisegment)
    elif isinstance(minisegment, str):
        # TODO: check if the two elements can be used in each other's place
        return next_token == minisegment

    return False


def resolve_fit(knowledge, fit, remaining_recursions):
    fitted = []
    for element in fit:
        if is_definite_minisegment(element):
            fitted.append(element)
        else:
            with session().log("Resolving fit of `{}`".format(element)):
                ((result_type, remixer), tokens) = element
                remixed_tokens = reverse_remix(tokens, remixer)
                if remixed_tokens is None:
                    return None

                minifit = get_fit(knowledge, remixed_tokens, remaining_recursions - 1)
                if minifit is None:
                    return None

                minitokens, miniast = minifit
                session().annotate(" AST | {}".format(miniast))
                subproperty = knowledge_evaluation.resolve(knowledge.knowledge, minitokens, miniast)
                fitted.append(subproperty)

    return fitted


def match_fit(knowledge, tokens, matcher, ast, remaining_recursions):
    segment_possibilities = [([], tokens)]  # Matched tokens, remaining tokens
    indent = ' ' * (parameters.MAX_RECURSIONS - remaining_recursions)
    session().annotate(indent + 'T> {}'.format(tokens))
    session().annotate(indent + 'M> {}'.format(matcher))

    for minisegment in matcher:
        with session().log("Minisegment `{}`".format(minisegment)):
            possibilities_after_round = []
            for matched_tokens, remaining_tokens in segment_possibilities:
                if len(remaining_tokens) < 1:
                    continue

                session().annotate(indent + "RT {}".format(remaining_tokens[0]))
                session().annotate(indent + "DEF {}".format(is_definite_minisegment(minisegment)))
                if is_definite_minisegment(minisegment):
                    # TODO: handle the case where the token doesn't match
                    if match_token(knowledge, remaining_tokens[0], minisegment):
                        possibilities_after_round.append((
                            matched_tokens + [remaining_tokens[0]],
                            remaining_tokens[1:]
                        ))
                else:
                    # TODO: handle the non-matching case, and optimize
                    # this with a lookahead
                    for i in range(1, len(tokens)):
                        possibilities_after_round.append((
                            matched_tokens + [(minisegment, remaining_tokens[:i])],
                            remaining_tokens[i:]
                        ))

                session().annotate(indent + "## PA {}".format(possibilities_after_round))

            segment_possibilities = possibilities_after_round

    for possibility in segment_possibilities:
        with session().log("Possibility: `{}`".format(possibility)):
            pass

    if len(segment_possibilities) < 1:
        with session().log("NO POSSIBLE"):
            pass

    fully_matched_segments = [(matched, remaining)
                              for (matched, remaining) in segment_possibilities
                              if len(remaining) == 0]

    resolved_fits = []
    with session().log("Full matches"):
        for fit, _ in fully_matched_segments:
            with session().log(fit):
                pass

    with session().log("Resolutions"):
        for fit, _ in fully_matched_segments:
            with session().log("Resolving {}".format(fit)):
                # NOTE: remixes have to be applied before resolving
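                # At this point `fit` may still hold unresolved
                # (minisegment, tokens) leaves: resolve_fit() re-applies the
                # stored remix (via reverse_remix) and recurses via get_fit().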
                resolved_fit = resolve_fit(knowledge, fit, remaining_recursions)
                if resolved_fit is not None:
                    resolved_fits.append(resolved_fit)
                else:
                    session().annotate("Not resolved")

    if len(resolved_fits) == 0:
        return None

    return resolved_fits[0], ast
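

# Illustrative usage sketch (assumed flow, not part of this module): once
# trained, new input is parsed by tokenizing it and asking for a fit:
#
#   tokens = pick_one_tokenization(to_tokens(knowledge_base, text),
#                                  knowledge_base)
#   fit = get_fit(knowledge_base, tokens)
#   if fit is not None:
#       matched_tokens, ast = fit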