import curses import textwrap import logging import traceback from utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from settings import settings_menu from message_handlers.tx_handler import send_message, send_traceroute from ui.colors import setup_colors, get_color from db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived import default_config as config import ui.dialog import globals def handle_resize(stdscr, firstrun): global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win # Calculate window max dimensions height, width = stdscr.getmaxyx() # Define window dimensions and positions channel_width = 3 * (width // 16) nodes_width = 5 * (width // 16) messages_width = width - channel_width - nodes_width if firstrun: entry_win = curses.newwin(3, width, 0, 0) channel_win = curses.newwin(height - 6, channel_width, 3, 0) messages_win = curses.newwin(height - 6, messages_width, 3, channel_width) nodes_win = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width) function_win = curses.newwin(3, width, height - 3, 0) packetlog_win = curses.newwin(int(height / 3), messages_width, height - int(height / 3) - 3, channel_width) # Will be resized to what we need when drawn messages_pad = curses.newpad(1,1) nodes_pad = curses.newpad(1,1) channel_pad = curses.newpad(1,1) # Set background colors for windows for win in [entry_win, channel_win, messages_win, nodes_win, function_win, packetlog_win]: win.bkgd(get_color("background")) # Set background colors for pads for pad in [messages_pad, nodes_pad, channel_pad]: pad.bkgd(get_color("background")) # Set colors for window frames for win in [channel_win, entry_win, nodes_win, messages_win, function_win]: win.attrset(get_color("window_frame")) else: for win in [entry_win, channel_win, messages_win, nodes_win, function_win, packetlog_win]: win.erase() entry_win.resize(3, width) channel_win.resize(height - 6, channel_width) messages_win.resize(height - 6, messages_width) messages_win.mvwin(3, channel_width) nodes_win.resize(height - 6, nodes_width) nodes_win.mvwin(3, channel_width + messages_width) function_win.resize(3, width) function_win.mvwin(height - 3, 0) packetlog_win.resize(int(height / 3), messages_width) packetlog_win.mvwin(height - int(height / 3) - 3, channel_width) # Draw window borders for win in [channel_win, entry_win, nodes_win, messages_win, function_win]: win.box() win.refresh() entry_win.keypad(True) curses.curs_set(1) try: draw_function_win() draw_channel_list() draw_messages_window(True) draw_node_list() except: # Resize events can come faster than we can re-draw, which can cause a curses error. # In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later. pass def main_ui(stdscr): stdscr.keypad(True) get_channels() handle_resize(stdscr, True) input_text = "" while True: draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) # Get user input from entry window char = entry_win.get_wch() # draw_debug(f"Keypress: {char}") if char == curses.KEY_UP: if globals.current_window == 0: scroll_channels(-1) elif globals.current_window == 1: scroll_messages(-1) elif globals.current_window == 2: scroll_nodes(-1) elif char == curses.KEY_DOWN: if globals.current_window == 0: scroll_channels(1) elif globals.current_window == 1: scroll_messages(1) elif globals.current_window == 2: scroll_nodes(1) elif char == curses.KEY_HOME: if globals.current_window == 0: select_channel(0) elif globals.current_window == 1: globals.selected_message = 0 refresh_pad(1) elif globals.current_window == 2: select_node(0) elif char == curses.KEY_END: if globals.current_window == 0: select_channel(len(globals.channel_list) - 1) elif globals.current_window == 1: msg_line_count = messages_pad.getmaxyx()[0] globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0) refresh_pad(1) elif globals.current_window == 2: select_node(len(globals.node_list) - 1) elif char == curses.KEY_PPAGE: if globals.current_window == 0: select_channel(globals.selected_channel - (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us elif globals.current_window == 1: globals.selected_message = max(globals.selected_message - get_msg_window_lines(), 0) refresh_pad(1) elif globals.current_window == 2: select_node(globals.selected_node - (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us elif char == curses.KEY_NPAGE: if globals.current_window == 0: select_channel(globals.selected_channel + (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us elif globals.current_window == 1: msg_line_count = messages_pad.getmaxyx()[0] globals.selected_message = min(globals.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines()) refresh_pad(1) elif globals.current_window == 2: select_node(globals.selected_node + (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT: delta = -1 if char == curses.KEY_LEFT else 1 old_window = globals.current_window globals.current_window = (globals.current_window + delta) % 3 if old_window == 0: channel_win.attrset(get_color("window_frame")) channel_win.box() channel_win.refresh() highlight_line(False, 0, globals.selected_channel) refresh_pad(0) if old_window == 1: messages_win.attrset(get_color("window_frame")) messages_win.box() messages_win.refresh() refresh_pad(1) elif old_window == 2: draw_function_win() nodes_win.attrset(get_color("window_frame")) nodes_win.box() nodes_win.refresh() highlight_line(False, 2, globals.selected_node) refresh_pad(2) if globals.current_window == 0: channel_win.attrset(get_color("window_frame_selected")) channel_win.box() channel_win.attrset(get_color("window_frame")) channel_win.refresh() highlight_line(True, 0, globals.selected_channel) refresh_pad(0) elif globals.current_window == 1: messages_win.attrset(get_color("window_frame_selected")) messages_win.box() messages_win.attrset(get_color("window_frame")) messages_win.refresh() refresh_pad(1) elif globals.current_window == 2: draw_function_win() nodes_win.attrset(get_color("window_frame_selected")) nodes_win.box() nodes_win.attrset(get_color("window_frame")) nodes_win.refresh() highlight_line(True, 2, globals.selected_node) refresh_pad(2) # Check for Esc elif char == chr(27): break # Check for Ctrl + t elif char == chr(20): send_traceroute() curses.curs_set(0) # Hide cursor ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.") curses.curs_set(1) # Show cursor again handle_resize(stdscr, False) elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)): if globals.current_window == 2: node_list = globals.node_list if node_list[globals.selected_node] not in globals.channel_list: globals.channel_list.append(node_list[globals.selected_node]) if(node_list[globals.selected_node] not in globals.all_messages): globals.all_messages[node_list[globals.selected_node]] = [] globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node]) if(is_chat_archived(globals.channel_list[globals.selected_channel])): update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False) globals.selected_node = 0 globals.current_window = 0 draw_node_list() draw_channel_list() draw_messages_window(True) elif len(input_text) > 0: # Enter key pressed, send user input as message send_message(input_text, channel=globals.selected_channel) draw_messages_window(True) # Clear entry window and reset input text input_text = "" entry_win.erase() elif char in (curses.KEY_BACKSPACE, chr(127)): if input_text: input_text = input_text[:-1] y, x = entry_win.getyx() entry_win.move(y, x - 1) entry_win.addch(' ') # entry_win.move(y, x - 1) entry_win.refresh() elif char == "`": # ` Launch the settings interface curses.curs_set(0) settings_menu(stdscr, globals.interface) curses.curs_set(1) refresh_node_list() handle_resize(stdscr, False) elif char == chr(16): # Display packet log if globals.display_log is False: globals.display_log = True draw_messages_window(True) else: globals.display_log = False packetlog_win.erase() draw_messages_window(True) elif char == curses.KEY_RESIZE: input_text = "" handle_resize(stdscr, False) # ^D elif char == chr(4): if(globals.current_window == 0): if(isinstance(globals.channel_list[globals.selected_channel], int)): update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=True) # Shift notifications up to account for deleted item for i in range(len(globals.notifications)): if globals.notifications[i] > globals.selected_channel: globals.notifications[i] -= 1 del globals.channel_list[globals.selected_channel] globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1) select_channel(globals.selected_channel) draw_channel_list() draw_messages_window() elif char == chr(31): if(globals.current_window == 2 or globals.current_window == 0): search(globals.current_window) else: # Append typed character to input text if(isinstance(char, str)): input_text += char else: input_text += chr(char) def draw_splash(stdscr): setup_colors() curses.curs_set(0) stdscr.clear() stdscr.bkgd(get_color("background")) height, width = stdscr.getmaxyx() message_1 = "/ Λ" message_2 = "/ / \\" message_3 = "P W R D" message_4 = "connecting..." start_x = width // 2 - len(message_1) // 2 start_x2 = width // 2 - len(message_4) // 2 start_y = height // 2 - 1 stdscr.addstr(start_y, start_x, message_1, get_color("splash_logo", bold=True)) stdscr.addstr(start_y+1, start_x-1, message_2, get_color("splash_logo", bold=True)) stdscr.addstr(start_y+2, start_x-2, message_3, get_color("splash_logo", bold=True)) stdscr.addstr(start_y+4, start_x2, message_4, get_color("splash_text")) stdscr.attrset(get_color("window_frame")) stdscr.box() stdscr.refresh() curses.napms(500) def draw_channel_list(): channel_pad.erase() win_height, win_width = channel_win.getmaxyx() start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders channel_pad.resize(len(globals.all_messages), channel_win.getmaxyx()[1]) idx = 0 for channel in globals.channel_list: # Convert node number to long name if it's an integer if isinstance(channel, int): if is_chat_archived(channel): continue channel = get_name_from_database(channel, type='long') # Determine whether to add the notification notification = " " + config.notification_symbol if idx in globals.notifications else "" # Truncate the channel name if it's too long to fit in the window truncated_channel = ((channel[:win_width - 5] + '-' if len(channel) > win_width - 5 else channel) + notification).ljust(win_width - 3) color = get_color("channel_list") if idx == globals.selected_channel: if globals.current_window == 0: color = get_color("channel_list", reverse=True) remove_notification(globals.selected_channel) else: color = get_color("channel_selected") channel_pad.addstr(idx, 1, truncated_channel, color) idx += 1 channel_win.attrset(get_color("window_frame_selected") if globals.current_window == 0 else get_color("window_frame")) channel_win.box() channel_win.attrset((get_color("window_frame"))) channel_win.refresh() refresh_pad(0) def draw_messages_window(scroll_to_bottom = False): """Update the messages window based on the selected channel and scroll position.""" messages_pad.erase() channel = globals.channel_list[globals.selected_channel] if channel in globals.all_messages: messages = globals.all_messages[channel] msg_line_count = 0 row = 0 for (prefix, message) in messages: full_message = f"{prefix}{message}" wrapped_lines = textwrap.wrap(full_message, messages_win.getmaxyx()[1] - 2) msg_line_count += len(wrapped_lines) messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1]) for line in wrapped_lines: if prefix.startswith("--"): color = get_color("timestamps") elif prefix.startswith(config.sent_message_prefix): color = get_color("tx_messages") else: color = get_color("rx_messages") messages_pad.addstr(row, 1, line, color) row += 1 messages_win.attrset(get_color("window_frame_selected") if globals.current_window == 1 else get_color("window_frame")) messages_win.box() messages_win.attrset(get_color("window_frame")) messages_win.refresh() if(scroll_to_bottom): globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0) else: globals.selected_message = max(min(globals.selected_message, msg_line_count - get_msg_window_lines()), 0) refresh_pad(1) draw_packetlog_win() def draw_node_list(): global nodes_pad # This didn't work, for some reason an error is thown on startup, so we just create the pad every time # if nodes_pad is None: # nodes_pad = curses.newpad(1, 1) nodes_pad = curses.newpad(1, 1) try: nodes_pad.erase() box_width = nodes_win.getmaxyx()[1] nodes_pad.resize(len(globals.node_list) + 1, box_width) except Exception as e: logging.error(f"Error Drawing Nodes List: {e}") logging.error("Traceback: %s", traceback.format_exc()) for i, node_num in enumerate(globals.node_list): node = globals.interface.nodesByNum[node_num] secure = 'user' in node and 'publicKey' in node['user'] and node['user']['publicKey'] node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[:box_width - 2] nodes_pad.addstr(i, 1, node_str, get_color("node_list", reverse=globals.selected_node == i and globals.current_window == 2)) nodes_win.attrset(get_color("window_frame_selected") if globals.current_window == 2 else get_color("window_frame")) nodes_win.box() nodes_win.attrset(get_color("window_frame")) nodes_win.refresh() refresh_pad(2) def select_channel(idx): old_selected_channel = globals.selected_channel globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1)) draw_messages_window(True) # For now just re-draw channel list when clearing notifications, we can probably make this more efficient if globals.selected_channel in globals.notifications: remove_notification(globals.selected_channel) draw_channel_list() return highlight_line(False, 0, old_selected_channel) highlight_line(True, 0, globals.selected_channel) refresh_pad(0) def scroll_channels(direction): new_selected_channel = globals.selected_channel + direction if new_selected_channel < 0: new_selected_channel = len(globals.channel_list) - 1 elif new_selected_channel >= len(globals.channel_list): new_selected_channel = 0 select_channel(new_selected_channel) def scroll_messages(direction): globals.selected_message += direction msg_line_count = messages_pad.getmaxyx()[0] globals.selected_message = max(0, min(globals.selected_message, msg_line_count - get_msg_window_lines())) refresh_pad(1) def select_node(idx): old_selected_node = globals.selected_node globals.selected_node = max(0, min(idx, len(globals.node_list) - 1)) highlight_line(False, 2, old_selected_node) highlight_line(True, 2, globals.selected_node) refresh_pad(2) draw_function_win() def scroll_nodes(direction): new_selected_node = globals.selected_node + direction if new_selected_node < 0: new_selected_node = len(globals.node_list) - 1 elif new_selected_node >= len(globals.node_list): new_selected_node = 0 select_node(new_selected_node) def draw_packetlog_win(): columns = [10,10,15,30] span = 0 if globals.display_log: packetlog_win.erase() height, width = packetlog_win.getmaxyx() for column in columns[:-1]: span += column # Add headers headers = f"{'From':<{columns[0]}} {'To':<{columns[1]}} {'Port':<{columns[2]}} {'Payload':<{width-span}}" packetlog_win.addstr(1, 1, headers[:width - 2],get_color("log_header", underline=True)) # Truncate headers if they exceed window width for i, packet in enumerate(reversed(globals.packet_buffer)): if i >= height - 3: # Skip if exceeds the window height break # Format each field from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0]) to_id = ( "BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295" else get_name_from_database(packet['to'], 'short').ljust(columns[1]) ) if 'decoded' in packet: port = packet['decoded']['portnum'].ljust(columns[2]) payload = (packet['decoded']['payload']).ljust(columns[3]) else: port = "NO KEY".ljust(columns[2]) payload = "NO KEY".ljust(columns[3]) # Combine and truncate if necessary logString = f"{from_id} {to_id} {port} {payload}" logString = logString[:width - 3] # Add to the window packetlog_win.addstr(i + 2, 1, logString, get_color("log")) packetlog_win.attrset(get_color("window_frame")) packetlog_win.box() packetlog_win.refresh() def search(win): start_idx = globals.selected_node select_func = select_node if win == 0: start_idx = globals.selected_channel select_func = select_channel search_text = "" entry_win.erase() while True: draw_centered_text_field(entry_win, f"Search: {search_text}", 0, get_color("input")) char = entry_win.get_wch() if char in (chr(27), chr(curses.KEY_ENTER), chr(10), chr(13)): break elif char == "\t": start_idx = globals.selected_node + 1 if win == 2 else globals.selected_channel + 1 elif char in (curses.KEY_BACKSPACE, chr(127)): if search_text: search_text = search_text[:-1] y, x = entry_win.getyx() entry_win.move(y, x - 1) entry_win.addch(' ') # entry_win.move(y, x - 1) entry_win.erase() entry_win.refresh() elif isinstance(char, str): search_text += char search_text_caseless = search_text.casefold() l = globals.node_list if win == 2 else globals.channel_list for i, n in enumerate(l[start_idx:] + l[:start_idx]): if isinstance(n, int) and search_text_caseless in get_name_from_database(n, 'long').casefold() \ or isinstance(n, int) and search_text_caseless in get_name_from_database(n, 'short').casefold() \ or search_text_caseless in str(n).casefold(): select_func((i + start_idx) % len(l)) break entry_win.erase() def draw_node_details(): node = None try: node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]] except KeyError: return function_win.erase() function_win.box() nodestr = "" width = function_win.getmaxyx()[1] node_details_list = [f"{node['user']['longName']} " if 'user' in node and 'longName' in node['user'] else "", f"({node['user']['shortName']})" if 'user' in node and 'shortName' in node['user'] else "", f" | {node['user']['hwModel']}" if 'user' in node and 'hwModel' in node['user'] else "", f" | {node['user']['role']}" if 'user' in node and 'role' in node['user'] else ""] if globals.node_list[globals.selected_node] == globals.myNodeNum: node_details_list.extend([f" | Bat: {node['deviceMetrics']['batteryLevel']}% ({node['deviceMetrics']['voltage']}v)" if 'deviceMetrics' in node and 'batteryLevel' in node['deviceMetrics'] and 'voltage' in node['deviceMetrics'] else "", f" | Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}" if 'deviceMetrics' in node and 'uptimeSeconds' in node['deviceMetrics'] else "", f" | ChUtil: {node['deviceMetrics']['channelUtilization']:.2f}%" if 'deviceMetrics' in node and 'channelUtilization' in node['deviceMetrics'] else "", f" | AirUtilTX: {node['deviceMetrics']['airUtilTx']:.2f}%" if 'deviceMetrics' in node and 'airUtilTx' in node['deviceMetrics'] else "", ]) else: node_details_list.extend([f" | {get_time_ago(node['lastHeard'])}" if ('lastHeard' in node and node['lastHeard']) else "", f" | Hops: {node['hopsAway']}" if 'hopsAway' in node else "", f" | SNR: {node['snr']}dB" if ('snr' in node and 'hopsAway' in node and node['hopsAway'] == 0) else "", ]) for s in node_details_list: if len(nodestr) + len(s) < width - 2: nodestr = nodestr + s draw_centered_text_field(function_win, nodestr, 0, get_color("commands")) def draw_help(): cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat"] function_str = "" for s in cmds: if(len(function_str) + len(s) < function_win.getmaxyx()[1] - 2): function_str += s draw_centered_text_field(function_win, function_str, 0, get_color("commands")) def draw_function_win(): if(globals.current_window == 2): draw_node_details() else: draw_help() def get_msg_window_lines(): packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0 return messages_win.getmaxyx()[0] - 2 - packetlog_height def refresh_pad(window): # global messages_pad, nodes_pad, channel_pad win_height = channel_win.getmaxyx()[0] if(window == 1): pad = messages_pad box = messages_win lines = get_msg_window_lines() selected_item = globals.selected_message start_index = globals.selected_message if globals.display_log: packetlog_win.box() packetlog_win.refresh() elif(window == 2): pad = nodes_pad box = nodes_win lines = box.getmaxyx()[0] - 2 selected_item = globals.selected_node start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders else: pad = channel_pad box = channel_win lines = box.getmaxyx()[0] - 2 selected_item = globals.selected_channel start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders pad.refresh(start_index, 0, box.getbegyx()[0] + 1, box.getbegyx()[1] + 1, box.getbegyx()[0] + lines, box.getbegyx()[1] + box.getmaxyx()[1] - 2) def highlight_line(highlight, window, line): pad = nodes_pad color = get_color("node_list") select_len = nodes_win.getmaxyx()[1] - 2 if(window == 0): pad = channel_pad color = get_color("channel_selected" if (line == globals.selected_channel and highlight == False) else "channel_list") select_len = channel_win.getmaxyx()[1] - 2 pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color) def add_notification(channel_number): if channel_number not in globals.notifications: globals.notifications.append(channel_number) def remove_notification(channel_number): if channel_number in globals.notifications: globals.notifications.remove(channel_number) def draw_text_field(win, text, color): win.border() win.addstr(1, 1, text, color) def draw_centered_text_field(win, text, y_offset, color): height, width = win.getmaxyx() x = (width - len(text)) // 2 y = (height // 2) + y_offset win.addstr(y, x, text, color) win.refresh() def draw_debug(value): function_win.addstr(1, 1, f"debug: {value} ") function_win.refresh()