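"""Tokenization layer for the knowledge base.

Splits raw text rows into token sequences (via the `tokenization` module)
and records which token tends to follow which, so that later calls can
propose likely tokenizations.
"""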
from ..session.org_mode import global_session as session
from ..atoms import Atom
from . import tokenization

import random
import copy


def randomized_weighted_list(elements):
    """Yield the unique elements of `elements` in a random order.

    Repeated elements are more likely to land near the front of the
    shuffle, so duplication in the input acts as a weight.
    """
    # Shuffle a copy of the input
    randomized = list(elements)
    random.shuffle(randomized)

    # ... and yield each element only once
    already_returned = set()
    for e in randomized:
        if e in already_returned:
            continue

        yield e
        already_returned.add(e)

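# Illustrative: with a duplicated input such as
#
#     list(randomized_weighted_list(['a', 'a', 'a', 'b']))
#
# the output is always ['a', 'b'] or ['b', 'a'], but 'a' comes first in
# three runs out of four on average, since each copy of 'a' is a chance
# to win the shuffle.
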
class TokenizationLayer:
    def __init__(self, knowledge_base):
        self.structural_elements = set()  # Non-Atom tokens found during tokenization
        self.token_chains = {}            # Precedent token -> list of tokens seen after it
        self.tokens = set()               # Every token seen so far
        self.knowledge_base = knowledge_base
        self.knowledge = knowledge_base.knowledge

    def integrate(self, knowledge_base, data):
        assert knowledge_base is self.knowledge_base

        assert 'text' in data
        tokens = self.tokenize(data['text'])
        data_with_row = copy.copy(data)
        data_with_row['tokens'] = tokens
        yield data_with_row

        # Alternative, kept commented out: yield one row per candidate
        # tokenization instead of committing to a single one.
        #
        # with session().log("Tokenize: {}".format(data['text'])):
        #     for tokens in tokenization.to_tokens(self, data['text']):
        #         data_with_row = copy.copy(data)
        #         data_with_row['tokens'] = tokens
        #         yield data_with_row

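    # e.g. (illustrative) integrate(kb, {'text': 'SOME TEXT'}) yields a
    # copy of the input dict extended with a 'tokens' entry.
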
    def process(self, knowledge_base, row):
        yield self.tokenize(row)

    def tokenize(self, row, return_one=True):
        row = row.lower()
        with session().log("Tokenize: {}".format(row)):
            options = list(tokenization.to_tokens(self, row))
            session().log("Results:\n{}".format('\n'.join(map(str, options))))

            if return_one:
                chosen = tokenization.pick_one_tokenization(options, self)
                session().log("Chosen: “{}”".format(chosen))
                self.train({'text': row, 'tokens': chosen})
                return chosen
            return options

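    # Illustrative: tokenize('Some Text') lowercases the row, picks one
    # tokenization and trains on it; tokenize(row, return_one=False)
    # returns every candidate instead.
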
    ## Tokenization
    def add_token_pair(self, precedent, consequent):
        self.add_token(precedent)
        self.add_token(consequent)

        if precedent not in self.token_chains:
            self.token_chains[precedent] = []
        self.token_chains[precedent].append(consequent)

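    # Illustrative: after add_token_pair('my', 'name') and
    # add_token_pair('my', 'dog'), token_chains['my'] == ['name', 'dog'].
    # Consequents are stored in a list, not a set, so repeats accumulate
    # and weight expected_token_after_precedent() below.
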
    def add_token(self, token):
        self.tokens.add(token)
        # Non-Atom tokens are tracked as structural elements
        if (not isinstance(token, Atom)) and (token not in self.structural_elements):
            session().annotate('Found new structural element “{}”'.format(token))
            self.structural_elements.add(token)

    def expected_token_after_precedent(self, precedent=None):
        # If there's no known precedent, just return all tokens
        if precedent not in self.token_chains:
            return randomized_weighted_list(self.tokens)

        return randomized_weighted_list(self.token_chains[precedent])

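    # Continuing the illustration above: expected_token_after_precedent('my')
    # yields 'name' and 'dog' once each, in a random order biased toward
    # whichever token followed 'my' more often during training.
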
    def train(self, example):
        with session().log('Training tokenizer'):
            session().annotate("Example: {}".format(example))
            tokens = tokenization.integrate_tokenization(self, example)

            # Integrate knowledge of concept: make sure every token has an
            # entry in the knowledge base
            for token in tokens:
                if token not in self.knowledge:
                    self.knowledge[token] = {}
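
# Usage sketch (hypothetical; the layer is normally driven by its knowledge
# base, but any object with a dict-like `.knowledge` attribute works):
#
#   class FakeKB:
#       knowledge = {}
#
#   layer = TokenizationLayer(FakeKB())
#   layer.train({'text': 'hello world', 'tokens': ['hello', 'world']})
#   tokens = layer.tokenize('hello world')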