#!/usr/bin/env python3
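"""Static blog generator for codigoparallevar.com.

Usage: blog.py SOURCE_TOP DEST_TOP

Reads Markdown posts with YAML front matter from SOURCE_TOP, renders them
through the Jinja2 templates in static/, and writes the HTML tree (articles,
paginated index, tag pages, archive, rss.xml) to DEST_TOP, indexing posts in
a SQLite FTS5 table along the way. Unless WATCH_AND_REBUILD=0 is set, it then
watches the sources with inotify and re-renders whatever changes.
"""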
import copy
import datetime
import logging
import os
import re
import sqlite3
import sys
import time
import traceback
from typing import List

# Note the deliberately swapped aliases: `bs4` is the soup constructor and
# `BeautifulSoup` is the module (needed for BeautifulSoup.Comment and
# BeautifulSoup.NavigableString below).
from bs4 import BeautifulSoup as bs4
import bs4 as BeautifulSoup
import inotify.adapters
import jinja2
import markdown
import yaml
from unidecode import unidecode

MARKDOWN_EXTENSION = '.md'
EXTENSIONS = [
    MARKDOWN_EXTENSION,
]
MARKDOWN_EXTRA_FEATURES = [
    # See more in: https://python-markdown.github.io/extensions/
    'markdown.extensions.fenced_code',
    'markdown.extensions.codehilite',
    'markdown.extensions.extra',
]

SUMMARIZE_MAX_TOKENS = 1000
ITEMS_IN_RSS = 50

# Nikola-style dates look like "31.12.2023, 18:30"; complete dates look like
# "2023-12-31 18:30:00 UTC+0000".
NIKOLA_DATE_RE = re.compile(r'^([0-2]\d|30|31)\.(0\d|1[012])\.(\d{4}), (\d{1,2}):(\d{2})$')
COMPLETE_DATE_RE = re.compile(r'^(\d{4})-(0\d|1[012])-([0-2]\d|30|31) '
                              r'(\d{2}):(\d{2})(:\d{2})( .+)?$')
SLUG_HYPHENATE_RE = re.compile(r'[\s\-]+')
SLUG_REMOVE_RE = re.compile(r'[^\s\-a-zA-Z0-9]*')

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATIC_PATH = os.path.join(ROOT_DIR, 'static')

ARTICLE_TEMPLATE_NAME = 'article.tmpl.html'
BLOG_INDEX_TEMPLATE_NAME = 'blog_index.tmpl.html'
CATEGORY_LIST_TEMPLATE_NAME = 'category_list.tmpl.html'
ARTICLE_LIST_TEMPLATE_NAME = 'article_list.tmpl.html'
RSS_TEMPLATE_NAME = 'rss.tmpl.xml'

BLOG_INDEX_PAGE_SIZE = 10

# (source file, destination path[, (prefix, suffix) wrapped around the copy])
STATIC_RESOURCES = (
    ('style.css', 'css/style.css'),
    ('light-syntax.css', 'css/light-syntax.css'),
    ('dark-syntax.css', 'css/dark-syntax.css',
     ('@media (prefers-color-scheme: dark) {\n', '\n}')),
)

JINJA_ENV = jinja2.Environment(
    loader=jinja2.FileSystemLoader(STATIC_PATH),
    autoescape=jinja2.select_autoescape()
)

PARSER_NAMESPACE = 'codigoparallevar.com/blog'

WATCH = True
if os.getenv('WATCH_AND_REBUILD', '1') == '0':
    WATCH = False

def update_statics():
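    """(Re)load all Jinja2 templates from STATIC_PATH into module globals."""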
    global ARTICLE_TEMPLATE
    ARTICLE_TEMPLATE = JINJA_ENV.get_template(ARTICLE_TEMPLATE_NAME)
    global BLOG_INDEX_TEMPLATE
    BLOG_INDEX_TEMPLATE = JINJA_ENV.get_template(BLOG_INDEX_TEMPLATE_NAME)
    global CATEGORY_LIST_TEMPLATE
    CATEGORY_LIST_TEMPLATE = JINJA_ENV.get_template(CATEGORY_LIST_TEMPLATE_NAME)
    global ARTICLE_LIST_TEMPLATE
    ARTICLE_LIST_TEMPLATE = JINJA_ENV.get_template(ARTICLE_LIST_TEMPLATE_NAME)
    global RSS_TEMPLATE
    RSS_TEMPLATE = JINJA_ENV.get_template(RSS_TEMPLATE_NAME)

update_statics()

MONITORED_EVENT_TYPES = (
    'IN_CREATE',
    # 'IN_MODIFY',
    'IN_CLOSE_WRITE',
    'IN_DELETE',
    'IN_MOVED_FROM',
    'IN_MOVED_TO',
    'IN_DELETE_SELF',
    'IN_MOVE_SELF',
)

LANG_PRIORITY = ('en', 'es', 'gl')

def parse_nikola_date(match):
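    """Parse a Nikola-style 'DD.MM.YYYY, H:MM' date; the timezone is assumed UTC."""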
    return datetime.datetime(year=int(match.group(3)),
                             month=int(match.group(2)),
                             day=int(match.group(1)),
                             hour=int(match.group(4)),
                             minute=int(match.group(5)),
                             # Note this final assumption is not good
                             # and might get you in trouble if trying
                             # to sort closely-published posts
                             # when others are in complete-date format
                             tzinfo=datetime.timezone.utc,
                             )

def parse_complete_date(match):
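    """Parse a complete 'YYYY-MM-DD HH:MM:SS TZ±HHMM' date into an aware datetime."""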
    return datetime.datetime.strptime(match.group(0), '%Y-%m-%d %H:%M:%S %Z%z')

def split_tags(tags: str) -> List[str]:
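    """Normalize a front-matter tags value into a list of tag strings."""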
    if isinstance(tags, str):
        return [tag.strip() for tag in tags.split(',')]
    elif isinstance(tags, list):
        return tags
    else:
        raise NotImplementedError("Unknown tag type: {}".format(type(tags)))

def slugify(title):
"""
Made for compatibility with Nikola's slugify within CodigoParaLlevar blog.
"""
slug = unidecode(title).lower()
slug = SLUG_REMOVE_RE.sub('', slug)
slug = SLUG_HYPHENATE_RE.sub('-', slug)
slug = slug.strip('-')
return slug.strip()
def read_markdown(path):
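    """Read a Markdown post, returning (rendered HTML, front-matter dict)."""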
    with open(path, 'rt') as f:
        data = f.read()
    if data.startswith('---'):
        # Split the YAML front matter (between the opening and closing
        # '---' lines) from the Markdown body.
        start = data.index('\n')
        if '---\n' not in data[start:]:
            raise Exception('Front matter not finished on: {}'.format(path))
        front_matter_str, content = data[start:].split('---\n', 1)
        front_matter = yaml.load(front_matter_str, Loader=yaml.SafeLoader)
    else:
        raise Exception('Front matter is needed for proper rendering. Not found on: {}'.format(
            path
        ))
    doc = markdown.markdown(content, extensions=MARKDOWN_EXTRA_FEATURES)
    return doc, front_matter

def get_out_path(front_matter):
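    """Derive a post's output directory from its front matter, parsing the
    date and generating a slug from the title if one is not set."""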
    if 'date' in front_matter:
        if m := NIKOLA_DATE_RE.match(front_matter['date']):
            front_matter['date'] = parse_nikola_date(m)
        elif m := COMPLETE_DATE_RE.match(front_matter['date']):
            front_matter['date'] = parse_complete_date(m)
        else:
            raise NotImplementedError('Unknown date format: {}'.format(
                front_matter['date']))
    else:
        raise Exception('No date found in front matter: {}'.format(front_matter))
    if 'slug' not in front_matter:
        if 'title' not in front_matter:
            raise Exception('No title found in front matter: {}'.format(front_matter))
        front_matter['slug'] = slugify(front_matter['title'])
    # Posts in the default language live at <year>/<slug>; other languages
    # are prefixed with their language code.
    out_path = os.path.join(str(front_matter['date'].year), front_matter['slug'])
    if front_matter.get('lang', LANG_PRIORITY[0]) != LANG_PRIORITY[0]:
        out_path = os.path.join(front_matter['lang'], str(front_matter['date'].year),
                                front_matter['slug'])
    return out_path

def create_db(path):
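    """Open (or create) the FTS5 search database and drop this blog's old rows."""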
    db = sqlite3.connect(path)
    db.execute(
        'CREATE VIRTUAL TABLE IF NOT EXISTS note_search USING fts5('
        'note_id, title, body, top_level_title, is_done, is_todo, '
        'parser_namespace, url, tokenize="trigram");'
    )
    db.execute('DELETE FROM note_search WHERE parser_namespace = ?;', (PARSER_NAMESPACE,))
    return db

def load_all(top_dir_relative):
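    """Walk `top_dir_relative` and load every Markdown post found under it."""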
    top = os.path.abspath(top_dir_relative)
    docs = {}
    count = 0
    for root, dirs, files in os.walk(top):
        for name in files:
            if not any(name.endswith(ext) for ext in EXTENSIONS):
                continue
            if name.endswith(MARKDOWN_EXTENSION):
                path = os.path.join(root, name)
                doc, front_matter = read_markdown(path)
                out_path = get_out_path(front_matter)
                docs[path] = (doc, front_matter, out_path)
                print('\rLoading posts... {}'.format(count), end='', flush=True)
                count += 1
            else:
                raise NotImplementedError('Unknown filetype: {}'.format(name))
    print(" [DONE]")
    return docs

def load_doc(filepath):
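    """Load a single post, returning (doc, front_matter, out_path)."""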
    doc, front_matter = read_markdown(filepath)
    out_path = get_out_path(front_matter)
    return (doc, front_matter, out_path)

def render_article(doc, front_matter, f, out_path):
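    """Render one article through the article template into file object `f`."""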
    # Compute a relative path back to the site root: one '..' per directory
    # level in the post's output path.
    sep = '/' if '/' in out_path else '\\'
    subdirs = len(out_path.split(sep))
    base_path = os.path.join(*(['..'] * subdirs))
    result = ARTICLE_TEMPLATE.render(
        content=doc,
        title=front_matter['title'],
        post_publication_date=front_matter['date'],
        post_tags=split_tags(front_matter['tags']),
        base_path=base_path,
    )
    f.write(result)

def summarize(doc):
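    """Build a teaser summary of `doc`: everything before the TEASER_END
    comment if present, otherwise roughly the first SUMMARIZE_MAX_TOKENS
    characters of content."""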
    tree = bs4(doc, features='lxml')
    html = list(tree.children)[0]
    body = list(html.children)[0]
    # An explicit '<!-- TEASER_END -->' comment marks where the summary
    # should stop; otherwise we fall back to a length-based cut.
    comments = tree.find_all(string=lambda text: isinstance(text, BeautifulSoup.Comment))
    teaser_end = None
    for comment in comments:
        if 'TEASER_END' in comment:
            teaser_end = comment
            break
    def recur_select_to_summarize(source, dest, num_tokens):
        # Iterate over a snapshot: appending an element to `dest` detaches it
        # from `source`, which would otherwise break the live iterator.
        for item in list(source.children):
            if num_tokens + len(item.text) < SUMMARIZE_MAX_TOKENS:
                # All source fits
                num_tokens += len(item.text)
                dest.append(item)
            else:
                if not isinstance(item, BeautifulSoup.NavigableString):
                    # Let's take as much source as we can and then stop
                    subsect = bs4()
                    recur_select_to_summarize(item, subsect, num_tokens)
                    if len(list(subsect.children)) > 0:
                        dest.append(subsect)
                break

    def cut_after_element(reference):
        # Climb until `reference` has a following sibling to cut from.
        while reference.next_sibling is None:
            if reference.parent is None:
                logging.warning("Reached root when looking for cutting point for teaser. Doc: {}".format(doc[:100]))
                return
            reference = reference.parent
        # Drop everything after `reference` in document order: first its
        # following siblings, then the following siblings of each ancestor.
        node = reference
        while node is not None:
            nxt = node.next_sibling
            while nxt is not None:
                following = nxt.next_sibling
                nxt.extract()
                nxt = following
            node = node.parent

    if teaser_end is None:
        result = bs4()
        recur_select_to_summarize(body, result, 0)
    else:
        summary = copy.copy(body)
        # Re-find the teaser comment inside the copy, then cut after it.
        comments = summary.find_all(string=lambda text: isinstance(text, BeautifulSoup.Comment))
        teaser_end = None
        for comment in comments:
            if 'TEASER_END' in comment:
                teaser_end = comment
                break
        assert teaser_end is not None, 'Error finding teaser end on copy'
        cut_after_element(teaser_end)
        result = bs4()
        # Snapshot the children: append() moves them out of `summary`.
        for child in list(summary.children):
            result.append(child)
    return result

def select_canonical_docs(docs):
    """For each slug keep only the version in the highest-priority language,
    and return the survivors sorted by date (newest first) together with the
    language alternatives available for each one."""
    # Collect all languages available for each slug
    docs_by_slug = {}
    for (doc, front_matter, out_path) in docs.values():
        slug = front_matter['slug']
        lang = front_matter.get('lang', LANG_PRIORITY[0])
        docs_by_slug.setdefault(slug, {})[lang] = (doc, front_matter, out_path)
    # Drop translations whose language has lower priority than a sibling's
    selected_docs = []
    for (doc, front_matter, out_path) in docs.values():
        langs = docs_by_slug[front_matter['slug']]
        lang_priority = LANG_PRIORITY.index(front_matter.get('lang', LANG_PRIORITY[0]))
        min_lang_priority = min(LANG_PRIORITY.index(lang) for lang in langs)
        if lang_priority == min_lang_priority:
            selected_docs.append((doc, front_matter, out_path, langs))
    return sorted(selected_docs, key=lambda x: x[1]['date'], reverse=True)

def render_index(docs, dest_top):
    """Render the paginated blog index: index.html, index-1.html, ..."""
    docs = select_canonical_docs(docs)
    index_ranges = range(0, len(docs), BLOG_INDEX_PAGE_SIZE)
    for off in index_ranges:
        page = docs[off: off + BLOG_INDEX_PAGE_SIZE]
        posts = [
            {
                "doc": doc,
                "title": front_matter['title'],
                "post_publication_date": front_matter['date'],
                "post_tags": split_tags(front_matter['tags']),
                "summary": summarize(doc),
                "link": out_path.rstrip('/') + '/',
            }
            for (doc, front_matter, out_path, _alternatives) in page
        ]
        prev_index_num = None
        next_index_num = off // BLOG_INDEX_PAGE_SIZE + 1
        if off > 0:
            prev_index_num = off // BLOG_INDEX_PAGE_SIZE - 1
        if next_index_num >= len(index_ranges):
            next_index_num = None
        result = BLOG_INDEX_TEMPLATE.render(
            posts=posts,
            prev_index_num=prev_index_num,
            next_index_num=next_index_num,
        )
        if off == 0:
            fname = 'index.html'
        else:
            fname = 'index-{}.html'.format(off // BLOG_INDEX_PAGE_SIZE)
        with open(os.path.join(dest_top, fname), 'wt') as f:
            f.write(result)

def render_categories(docs, dest_top):
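    """Render one listing page per tag at tags/<tag>/index.html."""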
    categories = {}
    for (doc, front_matter, out_path) in docs.values():
        for tag in split_tags(front_matter['tags']):
            if tag not in categories:
                categories[tag] = []
            categories[tag].append((doc, front_matter, out_path))
    print("Found {} tags: {}".format(len(categories), sorted(categories)))
    for tag, tag_docs in categories.items():
        tag_docs = sorted(tag_docs, key=lambda x: x[1]['date'], reverse=True)
        posts = [
            {
                # "doc": doc,
                "title": front_matter['title'],
                "post_publication_date": front_matter['date'],
                "post_tags": split_tags(front_matter['tags']),
                # "summary": summarize(doc),
                "link": out_path.rstrip('/') + '/',
            }
            for (doc, front_matter, out_path) in tag_docs
        ]
        result = CATEGORY_LIST_TEMPLATE.render(
            posts=posts,
        )
        path = os.path.join(dest_top, "tags", tag.replace('/', '_'), "index.html")
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wt') as f:
            f.write(result)

def render_archive(docs, dest_top):
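    """Render the full chronological article list at articles/index.html."""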
    docs = sorted(docs.values(), key=lambda x: x[1]['date'], reverse=True)
    posts = [
        {
            # "doc": doc,
            "title": front_matter['title'],
            "post_publication_date": front_matter['date'],
            "post_tags": split_tags(front_matter['tags']),
            # "summary": summarize(doc),
            "link": out_path.rstrip('/') + '/',
        }
        for (doc, front_matter, out_path) in docs
    ]
    result = ARTICLE_LIST_TEMPLATE.render(
        posts=posts,
    )
    path = os.path.join(dest_top, "articles", "index.html")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wt') as f:
        f.write(result)

def render_rss(docs, dest_top):
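    """Render rss.xml with the ITEMS_IN_RSS most recent posts."""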
    docs = select_canonical_docs(docs)
    posts = [
        {
            # "doc": doc,
            "title": front_matter['title'],
            "post_publication_date": front_matter['date'],
            "post_tags": split_tags(front_matter['tags']),
            "summary": summarize(doc),
            "link": out_path.rstrip('/') + '/',
        }
        for (doc, front_matter, out_path, langs) in docs[:ITEMS_IN_RSS]
    ]
    result = RSS_TEMPLATE.render(
        posts=posts,
        last_build_date=datetime.datetime.utcnow(),
    )
    path = os.path.join(dest_top, "rss.xml")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wt') as f:
        f.write(result)

def regen_all(source_top, dest_top, docs=None, db=None):
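    """Regenerate everything: refresh the search DB, render all articles,
    copy statics, and rebuild the index, tag pages, archive, and RSS feed."""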
    if docs is None:
        docs = load_all(source_top)
    cur = db.cursor()
    try:
        cur.execute('DELETE FROM note_search WHERE parser_namespace = ?;', (PARSER_NAMESPACE,))
    except sqlite3.OperationalError:
        if WATCH:
            logging.warning("Error pre-cleaning DB, search won't be updated")
        else:
            raise
    # Save posts to DB
    for (doc, front_matter, out_path) in docs.values():
        cur.execute('''INSERT INTO note_search(note_id, title, body, top_level_title, is_done, is_todo, parser_namespace, url)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?);''',
                    (
                        out_path,
                        front_matter['title'],
                        doc,
                        front_matter['title'],
                        False,
                        False,
                        PARSER_NAMESPACE,
                        out_path + '/index.html',
                    ))
    cur.close()
    db.commit()
    # Render posts
    for (doc, front_matter, out_path) in docs.values():
        full_out_path = os.path.join(dest_top, out_path, 'index.html')
        os.makedirs(os.path.dirname(full_out_path), exist_ok=True)
        with open(full_out_path, 'wt') as f:
            try:
                render_article(doc, front_matter, f, out_path)
            except Exception:
                logging.error(traceback.format_exc())
                logging.error("Rendering failed 😿")
                continue
    # Copy static resources, optionally wrapped in a prefix/suffix
    # (e.g. the dark syntax theme is wrapped in a media query)
    for static in STATIC_RESOURCES:
        src_path = static[0]
        dest_path = static[1]
        if len(static) > 2:
            before, after = static[2]
        else:
            before, after = '', ''
        target_dest = os.path.join(dest_top, dest_path)
        os.makedirs(os.path.dirname(target_dest), exist_ok=True)
        with open(os.path.join(STATIC_PATH, src_path), 'rt') as src:
            data = before + src.read() + after
        with open(target_dest, 'wt') as f:
            f.write(data)
    # Render index, tag pages, archive and RSS
    render_index(docs, dest_top)
    render_categories(docs, dest_top)
    render_archive(docs, dest_top)
    render_rss(docs, dest_top)
    return docs

def main(source_top, dest_top):
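    """Build the whole site, then watch for changes and rebuild incrementally
    (unless WATCH_AND_REBUILD=0, in which case build once and exit)."""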
    # Set up the inotify watches before the initial build so that changes
    # made while building are not missed.
    notifier = inotify.adapters.InotifyTrees([source_top, STATIC_PATH])

    ## Initial load
    t0 = time.time()
    logging.info("Initial load...")
    db = create_db(os.path.join(dest_top, '..', 'db.sqlite3'))
    docs = regen_all(source_top, dest_top, db=db)
    logging.info("Initial load completed in {:.2f}s".format(time.time() - t0))
    if not WATCH:
        logging.info("Build completed in {:.2f}s".format(time.time() - t0))
        return 0

    ## Updating
    for event in notifier.event_gen(yield_nones=False):
        (_ev, event_types, directory, filename) = event
        if not any(event_type in MONITORED_EVENT_TYPES for event_type in event_types):
            continue
        filepath = os.path.join(directory, filename)
        if filepath.startswith(STATIC_PATH):
            # A template or static resource changed: reload the templates,
            # then re-copy the resource or re-render everything.
            t0 = time.time()
            try:
                update_statics()
            except Exception:
                logging.error(traceback.format_exc())
                logging.error("Loading new templates failed 😿")
                continue
            is_static_resource = False
            for static in STATIC_RESOURCES:
                src_path = static[0]
                dest_path = static[1]
                if filename == os.path.basename(src_path):
                    is_static_resource = True
                    if len(static) > 2:
                        before, after = static[2]
                    else:
                        before, after = '', ''
                    target_dest = os.path.join(dest_top, dest_path)
                    os.makedirs(os.path.dirname(target_dest), exist_ok=True)
                    with open(os.path.join(STATIC_PATH, src_path), 'rt') as src:
                        data = before + src.read() + after
                    with open(target_dest, 'wt') as f:
                        f.write(data)
            if is_static_resource:
                logging.info("Updated static resources in {:.2f}s".format(time.time() - t0))
            else:
                docs = regen_all(source_top, dest_top, docs, db=db)
                logging.info("Updated all in {:.2f}s".format(time.time() - t0))
        else:
            # A post changed: re-render that article plus the archive page.
            try:
                print("Reloading: {}".format(filepath))
                (doc, front_matter, out_path) = load_doc(filepath)
            except Exception:
                logging.error(traceback.format_exc())
                logging.error("Skipping update 😿")
                continue
            t0 = time.time()
            docs[filepath] = (doc, front_matter, out_path)
            full_out_path = os.path.join(dest_top, out_path, 'index.html')
            os.makedirs(os.path.dirname(full_out_path), exist_ok=True)
            with open(full_out_path, 'wt') as f:
                try:
                    render_article(doc, front_matter, f, out_path)
                    render_archive(docs, dest_top)
                except Exception:
                    logging.error(traceback.format_exc())
                    logging.error("Rendering failed 😿")
                    continue
            print("Updated: {}".format(full_out_path))
            logging.info("Updated {} in {:.2f}s".format(out_path, time.time() - t0))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: {} SOURCE_TOP DEST_TOP".format(sys.argv[0]))
        sys.exit(1)
    logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
    sys.exit(main(sys.argv[1], sys.argv[2]))
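# Example invocation (paths are illustrative):
#   WATCH_AND_REBUILD=0 ./scripts/blog.py posts/ rendered/blog/
# builds the site once into rendered/blog/ and exits; without the environment
# variable the process stays alive and rebuilds whenever a post or template
# changes.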