#!/usr/bin/env python3
|
|
|
|
# Source file extension handled by the generator; only markdown is
# currently supported.
MARKDOWN_EXTENSION = '.md'

# Extensions considered "known" when walking the source tree; anything
# else listed here without a loader makes load_all() raise.
EXTENSIONS = [
    MARKDOWN_EXTENSION,
]


# python-markdown extensions enabled when rendering posts.
MARKDOWN_EXTRA_FEATURES = [
    # See more in: https://python-markdown.github.io/extensions/
    'markdown.extensions.fenced_code',
    'markdown.extensions.codehilite',
    'markdown.extensions.extra',
]
|
|
|
|
import copy
|
|
import json
|
|
import logging
|
|
import sys
|
|
import os
|
|
import datetime
|
|
import shutil
|
|
import traceback
|
|
import time
|
|
import re
|
|
from typing import List
|
|
|
|
from bs4 import BeautifulSoup as bs4
|
|
import bs4 as BeautifulSoup
|
|
import jinja2
|
|
import inotify.adapters
|
|
import yaml
|
|
import markdown
|
|
from unidecode import unidecode
|
|
|
|
# Rough character budget used when auto-building a post summary.
SUMMARIZE_MAX_TOKENS = 1000
# Number of most recent posts included in the RSS feed.
ITEMS_IN_RSS = 50

# Nikola-style front-matter dates: 'DD.MM.YYYY, H:MM' (no timezone).
NIKOLA_DATE_RE = re.compile(r'^([0-2]\d|30|31)\.(0\d|1[012])\.(\d{4}), (\d{1,2}):(\d{2})$')

# Complete dates: 'YYYY-MM-DD HH:MM:SS[ timezone]' — the trailing
# timezone group is optional in the pattern.
COMPLETE_DATE_RE = re.compile(r'^(\d{4})-(0\d|1[012])-([0-2]\d|30|31) '
                              + r'(\d{2}):(\d{2})(:\d{2})( .+)?$')
# Runs of whitespace/hyphens collapse into a single hyphen in slugs.
SLUG_HYPHENATE_RE = re.compile(r'[\s\-]+')
# Characters removed entirely from slugs (everything non-alphanumeric
# except whitespace and hyphens).
SLUG_REMOVE_RE = re.compile(r'[^\s\-a-zA-Z0-9]*')

# Repository root: this file is expected to live one directory below it.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Directory holding the Jinja templates and static assets.
STATIC_PATH = os.path.join(ROOT_DIR, 'static')
# Template file names, looked up inside STATIC_PATH by update_statics().
ARTICLE_TEMPLATE_NAME = 'article.tmpl.html'
BLOG_INDEX_TEMPLATE_NAME = 'blog_index.tmpl.html'
CATEGORY_LIST_TEMPLATE_NAME = 'category_list.tmpl.html'
ARTICLE_LIST_TEMPLATE_NAME = 'article_list.tmpl.html'
RSS_TEMPLATE_NAME = 'rss.tmpl.xml'
# Posts per page on the paginated blog index.
BLOG_INDEX_PAGE_SIZE = 10

# (source file, destination path[, (prefix, suffix) wrapper]) triples.
# The dark syntax stylesheet is wrapped in a prefers-color-scheme media
# query so it only applies in dark mode.
STATIC_RESOURCES = (
    ('style.css', 'css/style.css'),
    ('light-syntax.css', 'css/light-syntax.css'),
    ('dark-syntax.css', 'css/dark-syntax.css', ('@media (prefers-color-scheme: dark) {\n', '\n}')),
)

JINJA_ENV = jinja2.Environment(
    loader=jinja2.FileSystemLoader(STATIC_PATH),
    autoescape=jinja2.select_autoescape()
)
|
|
|
|
def update_statics():
    """(Re)load every Jinja template from disk into module-level globals.

    Called once at import time and again whenever a file under
    STATIC_PATH changes, so edited templates take effect without a
    restart.
    """
    template_files = {
        'ARTICLE_TEMPLATE': ARTICLE_TEMPLATE_NAME,
        'BLOG_INDEX_TEMPLATE': BLOG_INDEX_TEMPLATE_NAME,
        'CATEGORY_LIST_TEMPLATE': CATEGORY_LIST_TEMPLATE_NAME,
        'ARTICLE_LIST_TEMPLATE': ARTICLE_LIST_TEMPLATE_NAME,
        'RSS_TEMPLATE': RSS_TEMPLATE_NAME,
    }
    for global_name, template_file in template_files.items():
        # Equivalent to `global NAME; NAME = ...` for each template.
        globals()[global_name] = JINJA_ENV.get_template(template_file)


update_statics()
|
|
|
|
# inotify event types that trigger a rebuild; anything else is ignored.
MONITORED_EVENT_TYPES = (
    'IN_CREATE',
    # 'IN_MODIFY',
    'IN_CLOSE_WRITE',
    'IN_DELETE',
    'IN_MOVED_FROM',
    'IN_MOVED_TO',
    'IN_DELETE_SELF',
    'IN_MOVE_SELF',
)

# Languages in preference order; the first entry is the site default
# (used when front matter has no 'lang' and for URL layout).
LANG_PRIORITY = ('en', 'es', 'gl')
|
|
|
|
|
|
def parse_nikola_date(match):
    """Build a datetime from a NIKOLA_DATE_RE match ('DD.MM.YYYY, H:MM').

    The source format carries no timezone, so UTC is assumed.  Note this
    assumption is not good and might get you in trouble if trying to
    sort closely-published posts when others are in complete-date
    format.
    """
    day, month, year, hour, minute = (int(match.group(i)) for i in range(1, 6))
    return datetime.datetime(
        year=year,
        month=month,
        day=day,
        hour=hour,
        minute=minute,
        tzinfo=datetime.timezone.utc,
    )
|
|
|
|
|
|
def parse_complete_date(match):
    """Parse a COMPLETE_DATE_RE match ('YYYY-MM-DD HH:MM:SS[ TZ]').

    BUG FIX: COMPLETE_DATE_RE makes the timezone suffix optional
    (group 7), but the original always parsed with '%Z%z' and raised
    ValueError whenever the suffix was absent.  When there is no
    timezone, parse without it and pin UTC so results stay comparable
    (and sortable) with parse_nikola_date().
    """
    if match.group(7) is None:
        # No timezone suffix present: parse naive, then assume UTC.
        naive = datetime.datetime.strptime(match.group(0), '%Y-%m-%d %H:%M:%S')
        return naive.replace(tzinfo=datetime.timezone.utc)
    return datetime.datetime.strptime(match.group(0), '%Y-%m-%d %H:%M:%S %Z%z')
|
|
|
|
def split_tags(tags: str) -> List[str]:
    """Normalize a front-matter tags value into a list of tag strings.

    A comma-separated string is split and each entry stripped of
    surrounding whitespace; a list is returned unchanged; any other
    type is rejected.
    """
    if isinstance(tags, list):
        return tags
    if isinstance(tags, str):
        cleaned = []
        for raw_tag in tags.split(','):
            cleaned.append(raw_tag.strip())
        return cleaned
    raise NotImplementedError("Unknown tag type: {}".format(type(tags)))
|
|
|
|
def slugify(title):
    """
    Made for compatibility with Nikola's slugify within CodigoParaLlevar blog.
    """
    # ASCII-fold accented characters and lowercase, drop every character
    # that is not alphanumeric/whitespace/hyphen, then collapse runs of
    # whitespace and hyphens into single hyphens.
    ascii_lower = unidecode(title).lower()
    no_specials = SLUG_REMOVE_RE.sub('', ascii_lower)
    hyphenated = SLUG_HYPHENATE_RE.sub('-', no_specials)

    return hyphenated.strip()
|
|
|
|
|
|
def read_markdown(path):
    """Load a markdown post with YAML front matter.

    Returns (rendered_html, front_matter_dict).  Raises when the file
    has no leading '---' front-matter block or the block never closes.
    """
    with open(path, 'rt') as f:
        data = f.read()

    if not data.startswith('---'):
        raise Exception('Front matter is needed for proper rendering. Not found on: {}'.format(
            path
        ))

    # Skip the opening '---' line; the rest must contain the closing
    # delimiter on its own line.
    start = data.index('\n')
    remainder = data[start:]
    if '---\n' not in remainder:
        raise Exception('Front matter not finished on: {}'.format(path))

    front_matter_str, content = remainder.split('---\n', 1)
    front_matter = yaml.load(front_matter_str, Loader=yaml.SafeLoader)

    doc = markdown.markdown(content, extensions=MARKDOWN_EXTRA_FEATURES)
    return doc, front_matter
|
|
|
|
|
|
def get_out_path(front_matter):
    """Compute a post's output directory (relative path) from its front matter.

    Mutates front_matter in place: 'date' is parsed from a string into a
    datetime, and 'slug' is derived from 'title' when absent.  Layout is
    'YEAR/slug' for the default language and 'YEAR/lang/slug' otherwise.

    Raises Exception when date or title is missing, NotImplementedError
    for unrecognized date formats.
    """
    if 'date' not in front_matter:
        # BUG FIX: the original formatted an undefined name `path` here,
        # so this branch raised NameError instead of the intended error.
        raise Exception('No date found on: {}'.format(front_matter))

    if m := NIKOLA_DATE_RE.match(front_matter['date']):
        front_matter['date'] = parse_nikola_date(m)
    elif m := COMPLETE_DATE_RE.match(front_matter['date']):
        front_matter['date'] = parse_complete_date(m)
    else:
        raise NotImplementedError('Unknown date format: {}'.format(
            front_matter['date']))

    if 'slug' not in front_matter:
        if 'title' not in front_matter:
            # BUG FIX: same undefined-`path` problem as above.
            raise Exception('No title found on: {}'.format(front_matter))

        front_matter['slug'] = slugify(front_matter['title'])

    out_path = os.path.join(str(front_matter['date'].year), front_matter['slug'])
    # Non-default languages get a language segment in the URL.
    if front_matter.get('lang', LANG_PRIORITY[0]) != LANG_PRIORITY[0]:
        out_path = os.path.join(str(front_matter['date'].year), front_matter['lang'], front_matter['slug'])
    return out_path
|
|
|
|
|
|
def load_all(top_dir_relative):
    """Walk a directory tree and load every markdown post in it.

    Returns {absolute_source_path: (rendered_html, front_matter, out_path)}.
    Files whose extension is not in EXTENSIONS are skipped; a listed
    extension without a loader raises NotImplementedError.
    """
    top = os.path.abspath(top_dir_relative)

    docs = {}
    loaded_count = 0

    for root, _dirs, files in os.walk(top):
        for name in files:
            # Skip files with unknown extensions.
            if not any(name.endswith(ext) for ext in EXTENSIONS):
                continue

            if not name.endswith(MARKDOWN_EXTENSION):
                raise NotImplementedError('Unknown filetype: {}'.format(name))

            path = os.path.join(root, name)
            doc, front_matter = read_markdown(path)
            out_path = get_out_path(front_matter)
            docs[path] = (doc, front_matter, out_path)
            # Progress indicator on a single line.
            print('\rLoading posts... {}'.format(loaded_count), end='', flush=True)
            loaded_count += 1

    print(" [DONE]")
    return docs
|
|
|
|
|
|
def load_doc(filepath):
    """Load one markdown post; returns (rendered_html, front_matter, out_path)."""
    rendered, meta = read_markdown(filepath)
    return rendered, meta, get_out_path(meta)
|
|
|
|
|
|
def render_article(doc, front_matter, f, out_path):
    """Render a single article page and write it to file object f.

    base_path is a '../..' chain back up to the site root, one '..' per
    path component of out_path, so template asset links stay relative.
    """
    # Separator detection mirrors how out_path was joined (os.path.join);
    # falls back to '\\' on Windows-style paths.
    separator = '/' if '/' in out_path else '\\'
    depth = out_path.count(separator) + 1
    base_path = os.path.join(*(['..'] * depth))

    f.write(ARTICLE_TEMPLATE.render(
        content=doc,
        title=front_matter['title'],
        post_publication_date=front_matter['date'],
        post_tags=split_tags(front_matter['tags']),
        base_path=base_path,
    ))
|
|
|
|
def summarize(doc):
    """Build a summary (teaser) fragment from a rendered post's HTML.

    If the HTML contains a <!-- TEASER_END --> comment, everything after
    it is cut; otherwise the leading elements are kept greedily up to a
    SUMMARIZE_MAX_TOKENS character budget.  Relative video/image 'src'
    and anchor 'href' URLs in the result are rewritten under /blog/.
    Returns a BeautifulSoup fragment.
    """
    tree = bs4(doc, features='lxml')

    # lxml wraps fragments as <html><body>...</body></html>; unwrap both.
    html = list(tree.children)[0]
    body = list(html.children)[0]

    comments = tree.find_all(string=lambda text: isinstance(text, BeautifulSoup.Comment))

    teaser_end = None
    for comment in comments:
        if 'TEASER_END' in comment:
            teaser_end = comment
            break

    # NOTE(review): looks like a leftover post-specific sanity check —
    # posts mentioning 'gnucash' are expected to carry a teaser marker.
    if 'gnucash' in doc:
        assert teaser_end is not None

    def recur_select_to_summarize(source, dest, num_tokens):
        # Greedily copy children of `source` into `dest` until the
        # character budget runs out; descends one level when a child
        # only partially fits.
        for item in source.children:
            if num_tokens + len(item.text) < SUMMARIZE_MAX_TOKENS:
                # All source fits
                num_tokens += len(item.text)
                dest.append(item)

            else:
                if not isinstance(item, BeautifulSoup.NavigableString):
                    # Let's take as much source as we can and then stop
                    subsect = bs4()
                    recur_select_to_summarize(item, subsect, num_tokens)

                    if len(list(subsect.children)) > 0:
                        dest.append(subsect)
                break

    def cut_after_element(reference):
        # Remove every node that follows `reference` in document order.
        # First climb until the node has a following sibling.
        while reference.next_sibling is None:
            if reference.parent is None:
                logging.warning("Reached root when looking for cutting point for teaser. Doc: {}".format(doc[:100]))
                return
            reference = reference.parent

        nxt = reference.next_sibling
        while nxt is not None:
            was = nxt
            if reference.next_sibling is not None:
                # Move to the "right"
                nxt = reference.next_sibling

            else:
                # Move "up and right"
                nxt = reference.parent
                if nxt is not None:
                    nxt = nxt.next_sibling
            was.extract()

    if teaser_end is None:
        # No explicit teaser marker: budget-based selection.
        result = bs4()

        recur_select_to_summarize(body, result, 0)
    else:
        # Work on a copy so the original tree is left intact, and
        # re-find the marker inside the copy (the node found above
        # belongs to the original tree, not this copy).
        summary = copy.copy(body)
        comments = summary.find_all(string=lambda text: isinstance(text, BeautifulSoup.Comment))

        teaser_end = None
        for comment in comments:
            if 'TEASER_END' in comment:
                teaser_end = comment
                break
        assert teaser_end is not None, 'Error finding teaser end on copy'

        cut_after_element(teaser_end)
        result = bs4()
        for child in summary.children:
            result.append(child)

    # Update summary links and hrefs
    # (only relative URLs — anything containing ':' is assumed absolute)
    for v in result.find_all('video') + result.find_all('image'):
        if 'src' in v.attrs and ':' not in v['src']:
            v['src'] = '/blog/' + v['src'].lstrip('/')

    for v in result.find_all('a'):
        if 'href' in v.attrs and ':' not in v['href']:
            v['href'] = '/blog/' + v['href'].lstrip('/')

    return result
|
|
|
|
def render_index(docs, dest_top):
    """Render the paginated blog index (index.html, index-1.html, ...).

    Posts that exist in several languages (same slug) are listed once,
    in the highest-priority language per LANG_PRIORITY; the full
    language map is kept alongside each selected entry.
    """
    # Collect all languages accepted for all docs
    docs_by_slug = {}
    for (doc, front_matter, out_path) in docs.values():
        if front_matter['slug'] not in docs_by_slug:
            docs_by_slug[front_matter['slug']] = {}
        docs_by_slug[front_matter['slug']][front_matter.get('lang', LANG_PRIORITY[0])] = (doc, front_matter, out_path)

    # Remove duplicated for langs with less priority
    selected_docs = []
    for (doc, front_matter, out_path) in docs.values():
        langs = docs_by_slug[front_matter['slug']]
        lang_priority = LANG_PRIORITY.index(front_matter.get('lang', LANG_PRIORITY[0]))
        min_lang_priority = min([
            LANG_PRIORITY.index(lang)
            for lang in langs.keys()
        ])
        if lang_priority == min_lang_priority:
            selected_docs.append((doc, front_matter, out_path, langs))

    # Newest first.
    docs = sorted(selected_docs, key=lambda x: x[1]['date'], reverse=True)

    index_ranges = range(0, len(docs), BLOG_INDEX_PAGE_SIZE)

    for off in index_ranges:
        page = docs[off: off + BLOG_INDEX_PAGE_SIZE]

        posts = [
            {
                "doc": doc,
                "title": front_matter['title'],
                "post_publication_date": front_matter['date'],
                "post_tags": split_tags(front_matter['tags']),
                "summary": summarize(doc),
                "link": out_path.rstrip('/') + '/',
            }
            for (doc, front_matter, out_path, _alternatives) in page
        ]

        # Pagination numbers: page 0 is index.html, page N is
        # index-N.html.  NOTE(review): prev_index_num for page 1 comes
        # out as 0 — the template presumably maps 0 back to index.html;
        # confirm there.
        prev_index_num = None
        next_index_num = off // BLOG_INDEX_PAGE_SIZE + 1
        if off > 0:
            prev_index_num = off // BLOG_INDEX_PAGE_SIZE - 1
        if next_index_num >= len(index_ranges):
            next_index_num = None

        result = BLOG_INDEX_TEMPLATE.render(
            posts=posts,
            prev_index_num=prev_index_num,
            next_index_num=next_index_num,
        )

        if off == 0:
            fname = 'index.html'
        else:
            fname = 'index-{}.html'.format(off // BLOG_INDEX_PAGE_SIZE)
        with open(os.path.join(dest_top, fname), 'wt') as f:
            f.write(result)
|
|
|
|
def render_categories(docs, dest_top):
    """Render one article-list page per tag under dest_top/tags/<tag>/.

    Posts are grouped by tag and each group is rendered newest-first
    with CATEGORY_LIST_TEMPLATE into tags/<tag>/index.html.
    """
    categories = {}
    for (doc, front_matter, out_path) in docs.values():
        for tag in split_tags(front_matter['tags']):
            if tag not in categories:
                categories[tag] = []
            categories[tag].append((doc, front_matter, out_path))

    # BUG FIX: the original passed `categories` as a second format()
    # argument with no matching placeholder; it was silently dropped.
    print("Found {} tags".format(len(categories)))
    # Renamed loop variable so the `docs` parameter is not shadowed.
    for tag, tagged_docs in categories.items():
        tagged_docs = sorted(tagged_docs, key=lambda x: x[1]['date'], reverse=True)

        posts = [
            {
                # "doc": doc,
                "title": front_matter['title'],
                "post_publication_date": front_matter['date'],
                "post_tags": split_tags(front_matter['tags']),
                # "summary": summarize(doc),
                "link": out_path.rstrip('/') + '/',
            }
            for (doc, front_matter, out_path) in tagged_docs
        ]

        result = CATEGORY_LIST_TEMPLATE.render(
            posts=posts,
        )
        path = os.path.join(dest_top, "tags", tag, "index.html")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wt') as f:
            f.write(result)
|
|
|
|
def render_archive(docs, dest_top):
    """Render the full chronological article list to dest_top/articles/."""
    ordered = sorted(docs.values(), key=lambda entry: entry[1]['date'], reverse=True)

    posts = []
    for (doc, front_matter, out_path) in ordered:
        posts.append({
            # "doc": doc,
            "title": front_matter['title'],
            "post_publication_date": front_matter['date'],
            "post_tags": split_tags(front_matter['tags']),
            # "summary": summarize(doc),
            "link": out_path.rstrip('/') + '/',
        })

    rendered = ARTICLE_LIST_TEMPLATE.render(
        posts=posts,
    )

    out_file = os.path.join(dest_top, "articles", "index.html")
    os.makedirs(os.path.dirname(out_file), exist_ok=True)
    with open(out_file, 'wt') as f:
        f.write(rendered)
|
|
|
|
def render_rss(docs, dest_top):
    """Render the RSS feed (dest_top/rss.xml) with the newest posts.

    Uses the same language de-duplication as render_index(): posts
    sharing a slug appear once, in their highest-priority language.
    Only the ITEMS_IN_RSS most recent posts are included.
    """
    # Collect all languages accepted for all docs
    docs_by_slug = {}
    for (doc, front_matter, out_path) in docs.values():
        if front_matter['slug'] not in docs_by_slug:
            docs_by_slug[front_matter['slug']] = {}
        docs_by_slug[front_matter['slug']][front_matter.get('lang', LANG_PRIORITY[0])] = (doc, front_matter, out_path)

    # Remove duplicated for langs with less priority
    selected_docs = []
    for (doc, front_matter, out_path) in docs.values():
        langs = docs_by_slug[front_matter['slug']]
        lang_priority = LANG_PRIORITY.index(front_matter.get('lang', LANG_PRIORITY[0]))
        min_lang_priority = min([
            LANG_PRIORITY.index(lang)
            for lang in langs.keys()
        ])
        if lang_priority == min_lang_priority:
            selected_docs.append((doc, front_matter, out_path, langs))

    # Newest first.
    docs = sorted(selected_docs, key=lambda x: x[1]['date'], reverse=True)

    posts = [
        {
            # "doc": doc,
            "title": front_matter['title'],
            "post_publication_date": front_matter['date'],
            "post_tags": split_tags(front_matter['tags']),
            "summary": summarize(doc),
            "link": out_path.rstrip('/') + '/',
        }
        for (doc, front_matter, out_path, langs) in docs[:ITEMS_IN_RSS]
    ]

    result = RSS_TEMPLATE.render(
        posts=posts,
        # NOTE(review): utcnow() is naive and deprecated in 3.12+;
        # the template may rely on the naive formatting — confirm
        # before switching to datetime.now(datetime.timezone.utc).
        last_build_date=datetime.datetime.utcnow(),
    )
    path = os.path.join(dest_top, "rss.xml")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wt') as f:
        f.write(result)
|
|
|
|
|
|
def regen_all(source_top, dest_top, docs=None):
    """Regenerate the entire site: posts, statics, index, tags, archive, RSS.

    An already-loaded `docs` mapping may be passed to skip re-reading
    the sources; otherwise everything under source_top is loaded.
    Returns the docs mapping so callers can reuse it for incremental
    updates.  Individual post render failures are logged and skipped.
    """
    if docs is None:
        docs = load_all(source_top)

    # Render posts
    for (doc, front_matter, out_path) in docs.values():
        doc_full_path = os.path.join(dest_top, out_path)
        os.makedirs(os.path.dirname(doc_full_path), exist_ok=True)
        # print("==", doc_full_path)
        full_out_path = doc_full_path + '/index.html'
        os.makedirs(os.path.dirname(full_out_path), exist_ok=True)
        with open(full_out_path, 'wt') as f:
            try:
                render_article(doc, front_matter, f, out_path)
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit and made Ctrl-C unreliable.
            except Exception:
                logging.error(traceback.format_exc())
                logging.error("Rendering failed 😿")
                continue

    # Render statics; entries may carry a (before, after) wrapper pair
    # (e.g. the dark CSS wrapped in a media query).
    for static in STATIC_RESOURCES:
        src_path = static[0]
        dest_path = static[1]

        if len(static) > 2:
            before, after = static[2]
        else:
            before, after = '', ''
        target_dest = os.path.join(dest_top, dest_path)
        os.makedirs(os.path.dirname(target_dest), exist_ok=True)
        with open(os.path.join(STATIC_PATH, src_path), 'rt') as src:
            data = before + src.read() + after

        with open(target_dest, 'wt') as f:
            f.write(data)

    # Render index
    render_index(docs, dest_top)

    # Render categories
    render_categories(docs, dest_top)

    # Render archive
    render_archive(docs, dest_top)

    # Render RSS
    render_rss(docs, dest_top)

    return docs
|
|
|
|
|
|
def main(source_top, dest_top):
    """Build the site once, then watch sources and statics for changes.

    Watches source_top and STATIC_PATH recursively with inotify and
    rebuilds incrementally on every monitored event.  Never returns.
    """
    notifier = inotify.adapters.InotifyTrees([source_top, STATIC_PATH])

    ## Initial load
    t0 = time.time()
    logging.info("Initial load...")
    docs = regen_all(source_top, dest_top)
    logging.info("Initial load completed in {:.2f}s".format(time.time() - t0))

    ## Updating
    for event in notifier.event_gen(yield_nones=False):
        (ev, types, directory, file) = event
        # Ignore events we are not interested in (e.g. IN_ACCESS).
        if not any([type in MONITORED_EVENT_TYPES for type in types]):
            continue
        filepath = os.path.join(directory, file)
        if filepath.startswith(STATIC_PATH):
            # Something under the templates/statics dir changed:
            # reload the templates first.
            t0 = time.time()
            try:
                update_statics()
            except:
                logging.error(traceback.format_exc())
                logging.error("Loading new templates failed 😿")
                continue

            # If the changed file is one of the plain static resources,
            # just re-copy it (with its optional wrapper).  Otherwise a
            # template changed and the whole site must be regenerated.
            is_static_resource = False
            for static in STATIC_RESOURCES:
                src_path = static[0]
                dest_path = static[1]
                if file == os.path.basename(src_path):
                    is_static_resource = True

                    if len(static) > 2:
                        before, after = static[2]
                    else:
                        before, after = '', ''
                    target_dest = os.path.join(dest_top, dest_path)
                    os.makedirs(os.path.dirname(target_dest), exist_ok=True)
                    with open(os.path.join(STATIC_PATH, src_path), 'rt') as src:
                        data = before + src.read() + after

                    with open(target_dest, 'wt') as f:
                        f.write(data)

            if is_static_resource:
                logging.info("Updated static resources in {:.2f}s".format(time.time() - t0))
            else:
                docs = regen_all(source_top, dest_top, docs)
                logging.info("Updated all in {:.2f}s".format(time.time() - t0))

        else:
            # A post source changed: reload and re-render just that one.
            try:
                print("Reloading: {}".format(filepath))
                (doc, front_matter, out_path) = load_doc(filepath)
            except:
                logging.error(traceback.format_exc())
                logging.error("Skipping update 😿")
                continue

            t0 = time.time()
            docs[filepath] = (doc, front_matter, out_path)
            doc_full_path = os.path.join(dest_top, out_path)
            print("Updated: {}.html".format(doc_full_path))
            os.makedirs(os.path.dirname(doc_full_path), exist_ok=True)
            # print("==", doc_full_path)
            with open(doc_full_path + '/index.html', 'wt') as f:
                try:
                    render_article(doc, front_matter, f, out_path)
                except:
                    logging.error(traceback.format_exc())
                    logging.error("Rendering failed 😿")
                    continue

            logging.info("Updated all in {:.2f}s".format(time.time() - t0))
|
|
|
|
|
|
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: {} SOURCE_TOP DEST_TOP".format(sys.argv[0]))
        # BUG FIX: a usage error must exit with a nonzero status; the
        # original exit(0) signalled success to callers and scripts.
        sys.exit(1)

    logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
    main(sys.argv[1], sys.argv[2])
|