various fixes, new code

arbtt_wayland_toplevel no longer gets caught in weird loops upon disconnect
smtp2rss now actually sanitizes HTML properly
This commit is contained in:
osmarks 2023-08-10 01:51:44 +01:00
parent dacc2b19e5
commit c37b6a6af0
3 changed files with 106 additions and 8 deletions

View File

@ -6,6 +6,7 @@ from wl_framework.protocols.base import UnsupportedProtocolError
from wl_framework.protocols.foreign_toplevel import ForeignTopLevel
from wl_framework.protocols.data_control import DataControl
from wl_framework.protocols.idle_notify import ( IdleNotifyManager, IdleNotifier as _IdleNotifier )
from wl_framework.network.connection import WaylandDisconnected
import asyncio.subprocess as subprocess
import orjson
@ -108,7 +109,12 @@ if __name__ == '__main__':
import asyncio
from wl_framework.loop_integrations import AsyncIOIntegration
# without this, WaylandDisconnected exceptions are thrown forever and problems occur
def handler(loop, context):
    """Event-loop exception handler that ends the process with status 0.

    Both arguments are ignored: whichever exception reaches the loop's
    handler, the reaction is the same clean exit.
    """
    raise SystemExit(0)
async def init():
asyncio.get_event_loop().set_exception_handler(handler)
arbtt_importer = await subprocess.create_subprocess_exec("arbtt-import", "-a", "-t", "JSON", stdin=subprocess.PIPE)
loop = AsyncIOIntegration()
try:
@ -125,4 +131,4 @@ if __name__ == '__main__':
try:
asyncio.run(init())
except KeyboardInterrupt:
print()
print()

68
block_scope.py Normal file
View File

@ -0,0 +1,68 @@
import ast
import builtins
import inspect
import textwrap
import types
# AST statement types whose bodies get a block scope of their own. They are
# looked up by name so that types missing from older interpreters (ast.Match
# needs 3.10, ast.TryStar needs 3.11) are skipped instead of raising
# AttributeError at import time.
BLOCKS = tuple(
    getattr(ast, name)
    for name in (
        "If",
        "For", "AsyncFor", "While",
        "Try", "TryStar",
        "With", "AsyncWith",
        "Match",
    )
    if hasattr(ast, name)
)

# Expressions that bind their loop targets in a scope of their own.
_COMPREHENSIONS = (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp)


def _parameter_names(arguments):
    """Return the set of all names bound by an ``ast.arguments`` node."""
    params = list(getattr(arguments, "posonlyargs", []))
    params += arguments.args
    params += arguments.kwonlyargs
    params += [p for p in (arguments.vararg, arguments.kwarg) if p is not None]
    return {p.arg for p in params}


def block_scope(f):
    """Rebuild *f* so that names assigned inside a block are not visible outside it.

    The source of *f* is re-parsed and walked with a stack of name sets, one
    per enclosing scope (module globals and builtins, the function's
    parameters, then one set per nested block listed in ``BLOCKS``). A name
    that is read while it is not present in any set on the stack is renamed
    by appending zero-width spaces, so the read raises ``NameError`` when it
    is executed.

    Limitations: module globals are snapshotted when the decorator runs, so
    globals defined later are treated as unknown; capture patterns of
    ``match`` statements are not tracked; the function is recompiled as
    top-level code, so variables of an enclosing function are not available.

    Args:
        f: a function defined with ``def`` whose source file is readable.

    Returns:
        A new function using the globals, defaults and keyword-only defaults
        of *f*.

    Raises:
        TypeError: if the source of *f* does not start with a function
            definition (for example a lambda).
        ValueError: if the recompiled code object cannot be located.
    """
    source_lines, pos = inspect.getsourcelines(f)
    # Dedent so that methods and nested functions parse as top-level code.
    tree = ast.parse(textwrap.dedent("".join(source_lines)))
    func_def = tree.body[0]
    if not isinstance(func_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
        raise TypeError("block_scope can only be applied to functions defined with def")
    # The compiled module is never executed, so decorators would not run
    # anyway; dropping them in the AST replaces the fragile "skip the first
    # source line" approach and copes with any number of decorator lines.
    func_def.decorator_list = []

    def rewrite(node, varstack):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            # "import a.b" binds only "a"; an alias binds the alias itself.
            varstack[-1].update(x.asname or x.name.split(".")[0] for x in node.names)
        if isinstance(node, BLOCKS + _COMPREHENSIONS):
            varstack = varstack + [set()]
        if isinstance(node, ast.ExceptHandler) and node.name:
            # "except E as name" stores the name as a plain string, not a Name node.
            varstack = varstack + [{node.name}]
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            varstack[-1].add(node.name)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
            varstack = varstack + [_parameter_names(node.args)]
        elif isinstance(node, ast.ClassDef):
            varstack = varstack + [set()]
        if isinstance(node, (ast.Nonlocal, ast.Global)):
            varstack[-1].update(node.names)
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                if all(node.id not in s for s in varstack):
                    # Unknown here: make the lookup fail at run time.
                    node.id += "\u200b" * len(varstack)
            elif isinstance(node.ctx, ast.Store):
                varstack[-1].add(node.id)
        children = list(ast.iter_child_nodes(node))
        if isinstance(node, _COMPREHENSIONS):
            # The generators bind the targets, but the AST lists them after
            # the element expression that reads them; visit them first
            # (the sort is stable, so everything else keeps its order).
            children.sort(key=lambda child: not isinstance(child, ast.comprehension))
        for child in children:
            rewrite(child, varstack)
        return node

    # dir(builtins) rather than dir(__builtins__): in an imported module
    # __builtins__ is a dict, whose dir() lists dict methods, not builtins.
    rewrite(tree, [set(f.__globals__) | set(dir(builtins))])
    # Line 1 of the parsed source corresponds to line `pos` of the file.
    ast.increment_lineno(tree, pos - 1)
    module_code = compile(tree, f.__code__.co_filename, "exec")
    # The function's code object is not necessarily the first constant
    # (default values come before it), so search for it by name.
    for const in module_code.co_consts:
        if isinstance(const, types.CodeType) and const.co_name == func_def.name:
            func_code = const
            break
    else:
        raise ValueError("could not locate recompiled code for %r" % func_def.name)
    new_f = types.FunctionType(func_code, f.__globals__, argdefs=f.__defaults__)
    new_f.__kwdefaults__ = f.__kwdefaults__
    new_f.__qualname__ = f.__qualname__
    return new_f
# Demo for block_scope: the decorator rewrites the function at definition time.
# NOTE(review): the original indentation is not visible in this view, so the
# nesting of the statements below is an assumption — confirm against the file.
@block_scope
def example(demo1, demo2):
# Three import forms, each binding a different local name (rand, random, randint).
import random as rand
import random
from random import randint
if demo1:
if random.randint(0, 1) or randint(0, 3) == 3 or rand.randint(0, 5) == 5:
x = 3
else:
x = 4
# Presumably outside the inner if/else, where block_scope no longer knows x.
print(x) # error
x = 723
def test():
# nonlocal registers x in test's own scope, so the assignment is accepted.
nonlocal x
x = 4
if demo2:
test()
print(x)
# Show the rebuilt function object, then run it with demo1=False, demo2=True.
print(example)
example(False, True)

View File

@ -12,6 +12,7 @@ import json
import feedparser.sanitizer
import rfeed
import base64
from lxml.html.clean import Cleaner
def now(): return datetime.now(tz=timezone.utc)
def decode_mime(subject): return str(make_header(decode_header(subject)))
@ -80,6 +81,26 @@ preference = {
"text/plain": 1
}
def clean_html(html):
    """Sanitize an HTML string taken from an email body.

    The markup goes through two passes: feedparser's sanitizer first, then
    an lxml ``Cleaner`` configured below (scripts, embedded content, forms,
    frames, comments and unknown tags enabled for removal; ``style=False``
    combined with ``inline_style=True``).

    Args:
        html: the HTML source as ``str``.

    Returns:
        The cleaned markup as ``str``; an empty string when nothing is left
        after the first pass.
    """
    cleaner = Cleaner(
        page_structure=True,
        meta=True,
        embedded=True,
        links=True,
        style=False,
        processing_instructions=True,
        inline_style=True,
        scripts=True,
        javascript=True,
        comments=True,
        frames=True,
        forms=True,
        annoying_tags=True,
        remove_unknown_tags=True,
        safe_attrs_only=True
    )
    sanitized = feedparser.sanitizer._sanitize_html(html.replace("<!doctype html>", ""), "utf-8", "text/html")
    # lxml cannot parse an empty document (it raises ParserError), which
    # happens for empty parts or markup the first pass strips entirely.
    if not sanitized.strip():
        return ""
    return cleaner.clean_html(sanitized)
def email_to_html(emsg, debug_info=False):
if isinstance(emsg, Message):
payload = emsg.get_payload()
@ -90,14 +111,17 @@ def email_to_html(emsg, debug_info=False):
else:
html = [ email_to_html(thing, debug_info) for thing in payload ]
else:
try:
payload = emsg.get_payload(decode=True).decode("utf-8")
except:
payload = emsg.get_payload(decode=True).decode("latin1")
if emsg.get_content_subtype() == "html":
html = div(dominate.util.raw(feedparser.sanitizer._sanitize_html(payload.replace("<!doctype html>", ""), "utf-8", "text/html")))
if "attachment" in emsg.get("content-disposition", ""):
html = div("[attachment]")
else:
html = pre(payload)
try:
payload = emsg.get_payload(decode=True).decode("utf-8")
except:
payload = emsg.get_payload(decode=True).decode("latin1")
if emsg.get_content_subtype() == "html":
html = div(dominate.util.raw(clean_html(payload)))
else:
html = pre(payload)
else:
html = [ email_to_html(thing, debug_info) for thing in emsg.get_body(list(preference.keys())) ]
return div([