Files
kitty-mirror/kitty_tests/notifications.py

332 lines
10 KiB
Python

#!/usr/bin/env python
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
import os
import re
import tempfile
from base64 import standard_b64encode
from kitty.notifications import Channel, DesktopIntegration, IconDataCache, NotificationManager, UIState, Urgency
from . import BaseTest
def n(
title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_names=(), icon_path='',
application_name='', notification_types=(), timeout=-1, sound='system',
):
return {
'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_names': icon_names, 'icon_path': icon_path,
'application_name': application_name, 'notification_types': notification_types, 'timeout': timeout, 'sound': sound,
}
class DesktopIntegration(DesktopIntegration):
def initialize(self):
self.reset()
def reset(self):
self.notifications = []
self.close_events = []
self.new_version_activated = False
self.close_succeeds = True
self.counter = 0
def query_live_notifications(self, channel_id, client_id):
ids = [n['id'] for n in self.notifications]
self.notification_manager.send_live_response(channel_id, client_id, tuple(ids))
def on_new_version_notification_activation(self, cmd, which) -> None:
self.new_version_activated = which + 1
def close_notification(self, desktop_notification_id: int) -> bool:
self.close_events.append(desktop_notification_id)
if self.close_succeeds:
self.notification_manager.notification_closed(desktop_notification_id)
return self.close_succeeds
def notify(self, cmd, existing_desktop_notification_id) -> int:
if existing_desktop_notification_id:
did = existing_desktop_notification_id
else:
self.counter += 1
did = self.counter
title, body, urgency = cmd.title, cmd.body, (Urgency.Normal if cmd.urgency is None else cmd.urgency)
ans = n(title, body, urgency, did, cmd.icon_names, os.path.basename(cmd.icon_path), cmd.application_name,
cmd.notification_types, timeout=cmd.timeout, sound=cmd.sound_name)
self.notifications.append(ans)
return self.counter
class Channel(Channel):
focused = visible = True
def __init__(self, *a):
super().__init__(*a)
self.reset()
def reset(self):
self.responses = []
self.focus_events = []
def ui_state(self, channel_id):
return UIState(self.focused, self.visible)
def focus(self, channel_id: int, activation_token: str) -> None:
self.focus_events.append(activation_token)
def send(self, channel_id: int, osc_escape_code: str) -> bool:
self.responses.append(osc_escape_code)
class NotificationManager(NotificationManager):
@property
def filter_rules(self):
yield from ('title:filterme',)
def do_test(self: 'TestNotifications', tdir: str) -> None:
di = DesktopIntegration(None)
ch = Channel()
nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir)
di.notification_manager = nm
def reset():
di.reset()
ch.reset()
nm.reset()
def h(raw_data, osc_code=99, channel_id=1):
nm.handle_notification_cmd(channel_id, osc_code, raw_data)
def activate(which=0, button=0):
n = di.notifications[which]
nm.notification_activated(n['id'], button)
def close(which=0):
n = di.notifications[which]
di.close_notification(n['id'])
def assert_events(focus=True, close=0, report='', close_response='', live=''):
self.ae(ch.focus_events, [''] if focus else [])
if report:
self.assertIn(f'99;i={report};', ch.responses)
else:
for r in ch.responses:
m = re.match(r'99;i=[a-z0-9]+;', r)
self.assertIsNone(m, f'Unexpectedly found report response: {r}')
if close_response:
self.assertIn(f'99;i={close_response}:p=close;', ch.responses)
else:
for r in ch.responses:
m = re.match(r'99;i=[a-z0-9]+:p=close;', r)
self.assertIsNone(m, f'Unexpectedly found close response: {r}')
if live:
self.assertIn(f'99;i=live:p=alive;{live}', ch.responses)
else:
for r in ch.responses:
m = re.match(r'99;i=[a-z0-9]+:p=alive;', r)
self.assertIsNone(m, f'Unexpectedly found alive response: {r}')
self.ae(di.close_events, [close] if close else [])
h('test it', osc_code=9)
self.ae(di.notifications, [n(title='test it')])
activate()
assert_events()
reset()
h('d=0:u=2:i=x;title')
h('d=1:i=x:p=body;body')
self.ae(di.notifications, [n(body='body', urgency=Urgency.Critical)])
activate()
assert_events()
reset()
h('i=x:p=body:a=-focus;body')
self.ae(di.notifications, [n(title='body')])
activate()
assert_events(focus=False)
reset()
nm.send_new_version_notification('moose')
self.ae(di.notifications, [n('kitty update available!', 'kitty version moose released')])
activate()
self.assertTrue(di.new_version_activated)
reset()
h('i=x:e=1;' + standard_b64encode(b'title').decode('ascii'))
self.ae(di.notifications, [n()])
activate()
assert_events()
reset()
h('e=1;' + standard_b64encode(b'title').decode('ascii'))
self.ae(di.notifications, [n()])
activate()
assert_events()
reset()
h('d=0:i=x:a=-report;title')
h('d=1:i=x:a=report;body')
self.ae(di.notifications, [n(title='titlebody')])
activate()
assert_events(report='x')
reset()
h('a=report;title')
self.ae(di.notifications, [n()])
activate()
assert_events(report='0')
reset()
h('d=0:i=y;title')
h('d=1:i=y:p=xxx;title')
self.ae(di.notifications, [n()])
reset()
# test filtering
h(';title')
h(';filterme please')
self.ae(di.notifications, [n()])
reset()
# test closing interactions with reporting and activation
h('i=c;title')
self.ae(di.notifications, [n()])
close()
assert_events(focus=False, close=True)
reset()
h('i=c:c=1;title')
self.ae(di.notifications, [n()])
h('i=c:p=close')
self.ae(di.notifications, [n()])
assert_events(focus=False, close=True, close_response='c')
reset()
h('i=c:c=1;title')
h('i=c:p=close')
self.ae(di.notifications, [n()])
assert_events(focus=False, close=True, close_response='c')
reset()
h('i=c;title')
activate()
close()
h('i=c:p=close')
self.ae(di.notifications, [n()])
assert_events(focus=True, close=True)
reset()
h('i=c:a=report:c=1;title')
activate()
h('i=c:p=close')
self.ae(di.notifications, [n()])
assert_events(focus=True, report='c', close=True, close_response='c')
reset()
h('i=a[;title')
h('i=b;title')
h('i=live:p=alive;')
assert_events(focus=False, live='a,b')
reset()
h(';title')
self.ae(di.notifications, [n()])
activate()
assert_events()
reset()
# test sounds
def enc(x):
return standard_b64encode(x.encode()).decode()
h(f's={enc("silent")};title')
self.ae(di.notifications, [n(sound='silent')])
h(f's={enc("custom")};title')
self.ae(di.notifications[-1], n(desktop_notification_id=2, sound='custom'))
reset()
# Test querying
h('i=xyz:p=?')
self.assertFalse(di.notifications)
qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon,alive,buttons:c=1:w=1:s=system,silent,error,info,question,warn,warning'
self.ae(ch.responses, [f'99;i=xyz:p=?;{qr}'])
reset()
h('p=?')
self.assertFalse(di.notifications)
self.ae(ch.responses, [f'99;i=0:p=?;{qr}'])
# Test MIME streaming
for padding in (True, False):
for extra in ('a', 'ab', 'abc', 'abcd'):
text = 'some reasonably long text to test MIME streaming with: '
encoded = standard_b64encode(text.encode()).decode()
if not padding:
encoded = encoded.rstrip('=')
for t in encoded:
h(f'i=s:e=1:d=0;{t}')
h(f'i=s:e=1:d=0:p=body;{encoded[:13]}')
h(f'i=s:e=1:d=0:p=body;{encoded[13:]}')
h('i=s')
self.ae(di.notifications, [n(text, text)])
reset()
# Test application name and notification type
def e(x):
return standard_b64encode(x.encode()).decode()
h(f'i=t:d=0:f={e("app")}:t={e("1")};title')
h(f'i=t:t={e("test")}')
self.ae(di.notifications, [n(application_name='app', notification_types=('1', 'test',))])
reset()
# Test timeout
h('w=3;title')
self.ae(di.notifications, [n(timeout=3)])
reset()
# Test Disk Cache
dc = IconDataCache(base_cache_dir=tdir, max_cache_size=4)
cache_dir = dc._ensure_state()
for i in range(5):
dc.add_icon(str(i), str(i).encode())
self.ae(set(dc.keys()), set(map(str, range(1, 5))))
del dc
self.assertFalse(os.path.exists(cache_dir))
# Test icons
def send_with_icon(data='', n='', g=''):
m = ''
if n:
for x in n.split(','):
m += f'n={standard_b64encode(x.encode()).decode()}:'
if g:
m += f'g=({g}:'
h(f'i=9:d=0:{m};title')
h(f'i=9:p=icon;{data}')
dc = nm.icon_data_cache
send_with_icon(n='mycon,ic2')
self.ae(di.notifications, [n(icon_names=('mycon', 'ic2'))])
reset()
send_with_icon(g='gid')
self.ae(di.notifications, [n()])
reset()
send_with_icon(g='gid', data='1')
self.ae(di.notifications, [n(icon_path=dc.hash(b'1'))])
send_with_icon(g='gid', n='moose')
self.ae(di.notifications[-1], n(icon_names=('moose',), icon_path=dc.hash(b'1')))
send_with_icon(g='gid2', data='2')
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2')))
send_with_icon(data='3')
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'3')))
reset()
class TestNotifications(BaseTest):
def test_desktop_notify(self):
with tempfile.TemporaryDirectory() as tdir:
do_test(self, tdir)