Refactor UI redraw handling and improve message drawing logic

This commit is contained in:
pdxlocations
2026-03-19 14:06:01 -07:00
parent 8f376edabe
commit ecc5308dad
9 changed files with 338 additions and 152 deletions

View File

@@ -48,11 +48,8 @@ from contact.utilities.utils import (
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
request_ui_redraw,
)
from contact.utilities.db_handler import (
save_message_to_db,
@@ -121,7 +118,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
request_ui_redraw(packetlog=True)
if ui_state.current_window == 4:
menu_state.need_redraw = True
@@ -132,7 +129,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
request_ui_redraw(nodes=True)
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
@@ -186,9 +183,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
request_ui_redraw(channels=True)
if refresh_messages:
draw_messages_window(True)
request_ui_redraw(messages=True, scroll_messages_to_bottom=True)
save_message_to_db(channel_id, message_from_id, message_string)

View File

@@ -15,7 +15,7 @@ from contact.utilities.db_handler import (
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.singleton import ui_state, interface_state, app_state
from contact.utilities.utils import add_new_message
@@ -28,145 +28,141 @@ def onAckNak(packet: Dict[str, Any]) -> None:
"""
Handles incoming ACK/NAK response packets.
"""
from contact.ui.contact_ui import draw_messages_window
from contact.ui.contact_ui import request_ui_redraw
request = packet["decoded"]["requestId"]
if request not in ack_naks:
return
with app_state.lock:
request = packet["decoded"]["requestId"]
if request not in ack_naks:
return
acknak = ack_naks.pop(request)
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
acknak = ack_naks.pop(request)
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
confirm_string = config.ack_str
ack_type = "Ack"
else:
confirm_string = config.ack_str
ack_type = "Ack"
else:
confirm_string = config.nak_str
ack_type = "Nak"
confirm_string = config.nak_str
ack_type = "Nak"
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
time.strftime("[%H:%M:%S] ") + config.sent_message_prefix + confirm_string + ": ",
message,
)
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
time.strftime("[%H:%M:%S] ") + config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
draw_messages_window()
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
request_ui_redraw(messages=True)
def on_response_traceroute(packet: Dict[str, Any]) -> None:
"""
Handle traceroute response packets and render the route visually in the UI.
"""
from contact.ui.contact_ui import draw_channel_list, draw_messages_window, add_notification
from contact.ui.contact_ui import add_notification, request_ui_redraw
refresh_channels = False
refresh_messages = False
with app_state.lock:
refresh_channels = False
refresh_messages = False
UNK_SNR = -128 # Value representing unknown SNR
UNK_SNR = -128 # Value representing unknown SNR
route_discovery = mesh_pb2.RouteDiscovery()
route_discovery.ParseFromString(packet["decoded"]["payload"])
msg_dict = google.protobuf.json_format.MessageToDict(route_discovery)
route_discovery = mesh_pb2.RouteDiscovery()
route_discovery.ParseFromString(packet["decoded"]["payload"])
msg_dict = google.protobuf.json_format.MessageToDict(route_discovery)
msg_str = "Traceroute to:\n"
msg_str = "Traceroute to:\n"
route_str = (
get_name_from_database(packet["to"], "short") or f"{packet['to']:08x}"
) # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += (
" --> "
+ (get_name_from_database(node_num, "short") or f"{node_num:08x}")
+ " ("
+ (
str(msg_dict["snrTowards"][idx] / 4)
if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR
else "?"
)
+ "dB)"
)
# End with origin of response
route_str += (
" --> "
+ (get_name_from_database(packet["from"], "short") or f"{packet['from']:08x}")
+ " ("
+ (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?")
+ "dB)"
)
msg_str += route_str + "\n" # Print the route towards destination
# Only if hopStart is set and there is an SNR entry (for the origin) it's valid, even though route might be empty (direct connection)
lenBack = 0 if "routeBack" not in msg_dict else len(msg_dict["routeBack"])
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = (
get_name_from_database(packet["from"], "short") or f"{packet['from']:08x}"
) # Start with origin of response
get_name_from_database(packet["to"], "short") or f"{packet['to']:08x}"
) # Start with destination of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0:
for idx, node_num in enumerate(msg_dict["route"]):
route_str += (
" --> "
+ (get_name_from_database(node_num, "short") or f"{node_num:08x}")
+ " ("
+ (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?")
+ (
str(msg_dict["snrTowards"][idx] / 4)
if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR
else "?"
)
+ "dB)"
)
# End with destination of response (us)
route_str += (
" --> "
+ (get_name_from_database(packet["to"], "short") or f"{packet['to']:08x}")
+ (get_name_from_database(packet["from"], "short") or f"{packet['from']:08x}")
+ " ("
+ (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?")
+ (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?")
+ "dB)"
)
msg_str += route_str + "\n" # Print the route back to us
msg_str += route_str + "\n"
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
lenBack = 0 if "routeBack" not in msg_dict else len(msg_dict["routeBack"])
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_database(packet["from"], "short") or f"{packet['from']:08x}"
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
if lenBack > 0:
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += (
" --> "
+ (get_name_from_database(node_num, "short") or f"{node_num:08x}")
+ " ("
+ (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?")
+ "dB)"
)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
route_str += (
" --> "
+ (get_name_from_database(packet["to"], "short") or f"{packet['to']:08x}")
+ " ("
+ (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?")
+ "dB)"
)
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
refresh_channels = True
msg_str += route_str + "\n"
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
refresh_channels = True
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
save_message_to_db(channel_id, packet["from"], msg_str)
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if refresh_channels:
request_ui_redraw(channels=True)
if refresh_messages:
request_ui_redraw(messages=True, scroll_messages_to_bottom=True)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""

View File

@@ -15,8 +15,15 @@ from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state
from contact.ui.nav_utils import (
move_main_highlight,
draw_main_arrows,
get_msg_window_lines,
wrap_text,
truncate_with_ellipsis,
pad_to_width,
)
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses
@@ -25,6 +32,53 @@ root_win = None
nodes_pad = None
def request_ui_redraw(
*,
channels: bool = False,
messages: bool = False,
nodes: bool = False,
packetlog: bool = False,
full: bool = False,
scroll_messages_to_bottom: bool = False,
) -> None:
ui_state.redraw_channels = ui_state.redraw_channels or channels
ui_state.redraw_messages = ui_state.redraw_messages or messages
ui_state.redraw_nodes = ui_state.redraw_nodes or nodes
ui_state.redraw_packetlog = ui_state.redraw_packetlog or packetlog
ui_state.redraw_full_ui = ui_state.redraw_full_ui or full
ui_state.scroll_messages_to_bottom = ui_state.scroll_messages_to_bottom or scroll_messages_to_bottom
def process_pending_ui_updates(stdscr: curses.window) -> None:
if ui_state.redraw_full_ui:
ui_state.redraw_full_ui = False
ui_state.redraw_channels = False
ui_state.redraw_messages = False
ui_state.redraw_nodes = False
ui_state.redraw_packetlog = False
ui_state.scroll_messages_to_bottom = False
handle_resize(stdscr, False)
return
if ui_state.redraw_channels:
ui_state.redraw_channels = False
draw_channel_list()
if ui_state.redraw_nodes:
ui_state.redraw_nodes = False
draw_node_list()
if ui_state.redraw_messages:
scroll_to_bottom = ui_state.scroll_messages_to_bottom
ui_state.redraw_messages = False
ui_state.scroll_messages_to_bottom = False
draw_messages_window(scroll_to_bottom)
if ui_state.redraw_packetlog:
ui_state.redraw_packetlog = False
draw_packetlog_win()
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
def draw_window_arrows(window_id: int) -> None:
@@ -89,7 +143,7 @@ def refresh_node_selection(old_index: int = -1, highlight: bool = False) -> None
if nodes_pad is None or not ui_state.node_list:
return
width = max(0, nodes_pad.getmaxyx()[1] - 4)
width = max(0, nodes_pad.getmaxyx()[1] - 2)
if 0 <= old_index < len(ui_state.node_list):
try:
@@ -121,7 +175,7 @@ def refresh_main_window(window_id: int, selected: bool) -> None:
elif window_id == 2:
paint_frame(nodes_win, selected=selected)
if ui_state.node_list and nodes_pad is not None:
width = max(0, nodes_pad.getmaxyx()[1] - 4)
width = max(0, nodes_pad.getmaxyx()[1] - 2)
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
refresh_pad(2)
@@ -216,6 +270,7 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
win.refresh()
entry_win.keypad(True)
entry_win.timeout(200)
curses.curs_set(1)
try:
@@ -261,14 +316,19 @@ def main_ui(stdscr: curses.window) -> None:
handle_resize(stdscr, True)
while True:
with app_state.lock:
process_pending_ui_updates(stdscr)
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
if queued_char is None:
char = entry_win.get_wch()
else:
char = queued_char
queued_char = None
try:
if queued_char is None:
char = entry_win.get_wch()
else:
char = queued_char
queued_char = None
except curses.error:
continue
# draw_debug(f"Keypress: {char}")
@@ -961,7 +1021,7 @@ def draw_channel_list() -> None:
channel_pad.erase()
win_width = channel_win.getmaxyx()[1]
channel_pad.resize(len(ui_state.all_messages), channel_win.getmaxyx()[1])
channel_pad.resize(max(1, len(ui_state.channel_list)), channel_win.getmaxyx()[1])
idx = 0
for channel in ui_state.channel_list:
@@ -978,9 +1038,7 @@ def draw_channel_list() -> None:
notification = " " + config.notification_symbol if idx in ui_state.notifications else ""
# Truncate the channel name if it's too long to fit in the window
truncated_channel = (
(channel[: win_width - 5] + "-" if len(channel) > win_width - 5 else channel) + notification
).ljust(win_width - 3)
truncated_channel = truncate_with_ellipsis(f"{channel}{notification}", win_width - 3)
color = get_color("channel_list")
if idx == ui_state.selected_channel:
@@ -1075,8 +1133,7 @@ def draw_node_list() -> None:
node_name = get_node_display_name(node_num, node)
# Future node name custom formatting possible
node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
node_str = pad_to_width(f"{status_icon} {node_name}", box_width - 2)
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i))
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
@@ -1298,8 +1355,7 @@ def refresh_pad(window: int) -> None:
pad = messages_pad
box = messages_win
lines = get_msg_window_lines(messages_win, packetlog_win)
selected_item = ui_state.selected_message
start_index = ui_state.selected_message
start_index = ui_state.start_index[1]
if ui_state.display_log:
packetlog_win.box()

View File

@@ -331,6 +331,51 @@ def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def slice_to_width(text: str, max_width: int) -> str:
if max_width <= 0:
return ""
width = 0
chars = []
for char in text:
char_width = text_width(char)
if width + char_width > max_width:
break
chars.append(char)
width += char_width
return "".join(chars)
def pad_to_width(text: str, width: int) -> str:
clipped = slice_to_width(text, width)
return clipped + (" " * max(0, width - text_width(clipped)))
def truncate_with_ellipsis(text: str, width: int) -> str:
if width <= 0:
return ""
if text_width(text) <= width:
return pad_to_width(text, width)
if width == 1:
return ""
return pad_to_width(slice_to_width(text, width - 1) + "", width)
def split_text_to_width_chunks(text: str, width: int) -> List[str]:
if width <= 0:
return [""]
chunks = []
remaining = text
while remaining:
chunk = slice_to_width(remaining, width)
if not chunk:
break
chunks.append(chunk)
remaining = remaining[len(chunk) :]
return chunks or [""]
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
@@ -353,8 +398,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrapped_lines.append(line_buffer.strip())
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
wrapped_lines.append(word[i : i + wrap_width])
wrapped_lines.extend(split_text_to_width_chunks(word, wrap_width))
continue
if line_length + word_length > wrap_width and word.strip():
@@ -413,8 +457,8 @@ def highlight_line(
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 2, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 2, get_node_color(new_idx, reverse=True))
menu_win.refresh()

View File

@@ -33,6 +33,12 @@ class ChatUIState:
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
single_pane_mode: bool = False
redraw_channels: bool = False
redraw_messages: bool = False
redraw_nodes: bool = False
redraw_packetlog: bool = False
redraw_full_ui: bool = False
scroll_messages_to_bottom: bool = False
@dataclass

View File

@@ -36,3 +36,60 @@ class ContactUiTests(unittest.TestCase):
self.assertEqual(curs_set.call_args_list[0].args, (0,))
self.assertEqual(curs_set.call_args_list[-1].args, (1,))
self.assertEqual(ui_state.current_window, 1)
def test_process_pending_ui_updates_draws_requested_windows(self) -> None:
stdscr = mock.Mock()
ui_state.redraw_channels = True
ui_state.redraw_messages = True
ui_state.redraw_nodes = True
ui_state.redraw_packetlog = True
ui_state.scroll_messages_to_bottom = True
with mock.patch.object(contact_ui, "draw_channel_list") as draw_channel_list:
with mock.patch.object(contact_ui, "draw_messages_window") as draw_messages_window:
with mock.patch.object(contact_ui, "draw_node_list") as draw_node_list:
with mock.patch.object(contact_ui, "draw_packetlog_win") as draw_packetlog_win:
contact_ui.process_pending_ui_updates(stdscr)
draw_channel_list.assert_called_once_with()
draw_messages_window.assert_called_once_with(True)
draw_node_list.assert_called_once_with()
draw_packetlog_win.assert_called_once_with()
def test_process_pending_ui_updates_full_redraw_uses_handle_resize(self) -> None:
stdscr = mock.Mock()
ui_state.redraw_full_ui = True
ui_state.redraw_channels = True
ui_state.redraw_messages = True
with mock.patch.object(contact_ui, "handle_resize") as handle_resize:
contact_ui.process_pending_ui_updates(stdscr)
handle_resize.assert_called_once_with(stdscr, False)
self.assertFalse(ui_state.redraw_channels)
self.assertFalse(ui_state.redraw_messages)
def test_refresh_node_selection_highlights_full_row_width(self) -> None:
ui_state.node_list = [101, 202]
ui_state.selected_node = 1
ui_state.start_index = [0, 0, 0]
contact_ui.nodes_pad = mock.Mock()
contact_ui.nodes_pad.getmaxyx.return_value = (4, 20)
contact_ui.nodes_win = mock.Mock()
contact_ui.nodes_win.getmaxyx.return_value = (10, 20)
interface = mock.Mock()
interface.nodesByNum = {101: {}, 202: {}}
with mock.patch.object(contact_ui, "refresh_pad") as refresh_pad:
with mock.patch.object(contact_ui, "draw_window_arrows") as draw_window_arrows:
with mock.patch.object(contact_ui, "get_node_row_color", side_effect=[11, 22]):
with mock.patch("contact.ui.contact_ui.interface_state.interface", interface):
contact_ui.refresh_node_selection(old_index=0, highlight=True)
self.assertEqual(
contact_ui.nodes_pad.chgat.call_args_list,
[mock.call(0, 1, 18, 11), mock.call(1, 1, 18, 22)],
)
refresh_pad.assert_called_once_with(2)
draw_window_arrows.assert_called_once_with(2)

36
tests/test_nav_utils.py Normal file
View File

@@ -0,0 +1,36 @@
import unittest
from unittest import mock
from contact.ui import nav_utils
from contact.ui.nav_utils import truncate_with_ellipsis, wrap_text
from contact.utilities.singleton import ui_state
class NavUtilsTests(unittest.TestCase):
def setUp(self) -> None:
ui_state.current_window = 0
ui_state.node_list = []
ui_state.start_index = [0, 0, 0]
def test_wrap_text_splits_wide_characters_by_display_width(self) -> None:
self.assertEqual(wrap_text("🔐🔐🔐", 4), ["🔐", "🔐", "🔐"])
def test_truncate_with_ellipsis_respects_display_width(self) -> None:
self.assertEqual(truncate_with_ellipsis("🔐Alpha", 5), "🔐Al…")
def test_highlight_line_uses_full_node_row_width(self) -> None:
ui_state.current_window = 2
ui_state.start_index = [0, 0, 0]
menu_win = mock.Mock()
menu_win.getbegyx.return_value = (0, 0)
menu_win.getmaxyx.return_value = (8, 20)
menu_pad = mock.Mock()
menu_pad.getmaxyx.return_value = (4, 20)
with mock.patch.object(nav_utils, "get_node_color", side_effect=[11, 22]):
nav_utils.highlight_line(menu_win, menu_pad, 0, 1, 5)
self.assertEqual(
menu_pad.chgat.call_args_list,
[mock.call(0, 1, 18, 11), mock.call(1, 1, 18, 22)],
)

View File

@@ -34,17 +34,13 @@ class RxHandlerTests(unittest.TestCase):
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=True):
with mock.patch.object(rx_handler, "draw_node_list") as draw_node_list:
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
with mock.patch.object(rx_handler, "request_ui_redraw") as request_ui_redraw:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
draw_node_list.assert_called_once_with()
draw_messages_window.assert_called_once_with(True)
draw_channel_list.assert_not_called()
self.assertEqual(request_ui_redraw.call_args_list, [mock.call(nodes=True), mock.call(messages=True, scroll_messages_to_bottom=True)])
add_notification.assert_not_called()
save_message_to_db.assert_called_once_with("Primary", 222, "hello")
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
@@ -66,18 +62,16 @@ class RxHandlerTests(unittest.TestCase):
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=False):
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "update_node_info_in_db") as update_node_info_in_db:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
with mock.patch.object(rx_handler, "request_ui_redraw") as request_ui_redraw:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "update_node_info_in_db") as update_node_info_in_db:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
self.assertIn(222, ui_state.channel_list)
self.assertIn(222, ui_state.all_messages)
draw_messages_window.assert_not_called()
draw_channel_list.assert_called_once_with()
request_ui_redraw.assert_called_once_with(channels=True)
add_notification.assert_called_once_with(1)
update_node_info_in_db.assert_called_once_with(222, chat_archived=False)
save_message_to_db.assert_called_once_with(222, 222, "dm")
@@ -87,10 +81,10 @@ class RxHandlerTests(unittest.TestCase):
ui_state.display_log = True
ui_state.current_window = 4
with mock.patch.object(rx_handler, "draw_packetlog_win") as draw_packetlog_win:
with mock.patch.object(rx_handler, "request_ui_redraw") as request_ui_redraw:
rx_handler.on_receive({"id": "new"}, interface=None)
draw_packetlog_win.assert_called_once_with()
request_ui_redraw.assert_called_once_with(packetlog=True)
self.assertEqual(len(ui_state.packet_buffer), 20)
self.assertEqual(ui_state.packet_buffer[-1], {"id": "new"})
self.assertTrue(menu_state.need_redraw)

View File

@@ -81,11 +81,11 @@ class TxHandlerTests(unittest.TestCase):
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window") as draw_messages_window:
with mock.patch("contact.ui.contact_ui.request_ui_redraw") as request_ui_redraw:
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Ack")
draw_messages_window.assert_called_once_with()
request_ui_redraw.assert_called_once_with(messages=True)
self.assertIn(config.sent_message_prefix, ui_state.all_messages["Primary"][0][0])
self.assertIn(config.ack_str, ui_state.all_messages["Primary"][0][0])
@@ -100,7 +100,7 @@ class TxHandlerTests(unittest.TestCase):
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window"):
with mock.patch("contact.ui.contact_ui.request_ui_redraw"):
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Implicit")