Files
contact/curses-client.py
2024-03-31 00:09:17 -07:00

384 lines
13 KiB
Python

#!/usr/bin/env python3
import curses
import meshtastic.serial_interface
from pubsub import pub
from meshtastic import config_pb2, BROADCAST_NUM
# Initialize Meshtastic interface
interface = meshtastic.serial_interface.SerialInterface()
myinfo = interface.getMyNodeInfo()
myNodeNum = myinfo['num']
all_messages = {}
channel_list = []
selected_channel = 0
selected_node = 0
direct_message = False
def get_channels():
global channel_list
node = interface.getNode('^local')
device_channels = node.channels
channel_output = []
for device_channel in device_channels:
if device_channel.role:
if device_channel.settings.name:
channel_output.append(device_channel.settings.name)
all_messages[device_channel.settings.name] = []
else:
# If channel name is blank, use the modem preset
lora_config = node.localConfig.lora
modem_preset_enum = lora_config.modem_preset
modem_preset_string = config_pb2._CONFIG_LORACONFIG_MODEMPRESET.values_by_number[modem_preset_enum].name
channel_output.append(convert_to_camel_case(modem_preset_string))
all_messages[convert_to_camel_case(modem_preset_string)] = []
channel_list = list(all_messages.keys())
def get_node_list():
node_list = []
if interface.nodes:
for node in interface.nodes.values():
# node_list.append(node["user"]["longName"])
node_list.append(node['num'])
return node_list
def decimal_to_hex(decimal_number):
return f"!{decimal_number:08x}"
def convert_to_camel_case(string):
words = string.split('_')
camel_case_string = ''.join(word.capitalize() for word in words)
return camel_case_string
def get_name_from_number(number, type='long'):
name = ""
for node in interface.nodes.values():
if number == node['num']:
if type == 'long':
name = node['user']['longName']
return name
elif type == 'short':
name = node['user']['shortName']
return name
else:
pass
else:
name = str(decimal_to_hex(number)) # If long name not found, use the ID as string
return name
def on_receive(packet, interface):
global all_messages, selected_channel, channel_list
try:
if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
message_bytes = packet['decoded']['payload']
message_string = message_bytes.decode('utf-8')
if packet.get('channel'):
channel_number = packet['channel']
else:
channel_number = 0
if packet['to'] == myNodeNum:
if packet['from'] in channel_list:
pass
else:
channel_list.append(packet['from'])
all_messages[packet['from']] = []
draw_channel_list()
channel_number = channel_list.index(packet['from'])
if channel_list[channel_number] != channel_list[selected_channel]:
add_notification(channel_number)
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = ""
for node in interface.nodes.values():
if message_from_id == node['num']:
message_from_string = node["user"]["longName"] # Get the long name using the node ID
break
else:
message_from_string = str(decimal_to_hex(message_from_id)) # If long name not found, use the ID as string
if channel_list[channel_number] in all_messages:
all_messages[channel_list[channel_number]].append((f">> {message_from_string} ", message_string))
else:
all_messages[channel_list[channel_number]] = [(f">> {message_from_string} ", message_string)]
draw_channel_list()
channel_win.refresh()
update_messages_window()
except KeyError as e:
print(f"Error processing packet: {e}")
def send_message(message, destination=BROADCAST_NUM, channel=0):
global all_messages, channel_list
# FIXME if sending a DM, always send on channel 0
send_on_channel = 0
if isinstance(channel_list[channel], int):
send_on_channel = 0
destination = channel_list[channel]
elif isinstance(channel_list[channel], str):
send_on_channel = channel
interface.sendText(
text=message,
destinationId=destination,
wantAck=False,
wantResponse=False,
onResponse=None,
channelIndex=send_on_channel,
)
# Add sent message to the messages dictionary
if channel_list[channel] in all_messages:
all_messages[channel_list[channel]].append((">> Sent: ", message))
else:
all_messages[channel_list[channel]] = [(">> Sent: ", message)]
update_messages_window()
messages_win.refresh()
def add_notification(channel_number):
if isinstance(channel_list[channel_number], str):
channel_win.addstr(channel_number+1, len(channel_list[channel_number])+1, " *", curses.color_pair(4))
# this is a DM
if isinstance(channel_list[channel_number], int):
channel_win.addstr(channel_number+1, len(get_name_from_number(channel_list[channel_number]))+1, " *", curses.color_pair(4))
channel_win.refresh()
def remove_notification(channel_number):
if isinstance(channel_list[channel_number], str):
channel_win.addstr(channel_number+1, len(channel_list[channel_number])+1, " ", curses.color_pair(4))
# this is a DM
if isinstance(channel_list[channel_number], int):
channel_win.addstr(channel_number+1, len(get_name_from_number(channel_list[channel_number]))+1, " ", curses.color_pair(4))
channel_win.refresh()
def update_messages_window():
global all_messages, selected_channel
messages_win.clear()
# Calculate how many messages can fit in the window
max_messages = messages_win.getmaxyx()[0] - 2 # Subtract 2 for the top and bottom border
# Determine the starting index for displaying messages
if channel_list[selected_channel] in all_messages:
start_index = max(0, len(all_messages[channel_list[selected_channel]]) - max_messages)
else:
# Handle the case where selected_channel does not exist
start_index = 0 # Set start_index to 0 or any other appropriate value
# Display messages starting from the calculated start index
# Check if selected_channel exists in all_messages before accessing it
if channel_list[selected_channel] in all_messages:
for row, (prefix, message) in enumerate(all_messages[channel_list[selected_channel]][start_index:], start=1):
messages_win.addstr(row, 1, prefix, curses.color_pair(1) if prefix.startswith(">> Sent:") else curses.color_pair(2))
messages_win.addstr(row, len(prefix) + 1, message + '\n')
else:
# Handle the case where selected_channel does not exist
pass
messages_win.box()
messages_win.refresh()
def draw_text_field(win, text):
win.clear()
win.border()
win.addstr(1, 1, text)
def draw_channel_list():
global direct_message
for i, (channel, message_list) in enumerate(all_messages.items()):
#convert node number to long name
if isinstance(channel,int):
channel = get_name_from_number(channel, type='long')
if selected_channel == i and not direct_message:
channel_win.addstr(i+1, 1, str(channel), curses.color_pair(3))
remove_notification(selected_channel)
else:
channel_win.addstr(i+1, 1, str(channel), curses.color_pair(4))
channel_win.refresh()
def draw_node_list(height):
global selected_node, direct_message
for i, node in enumerate(get_node_list(), start=1):
if i < height - 8 : # Check if there is enough space in the window
# if i==1: draw_debug(f"{get_node_list()} {get_name_from_number(node, 'long')}")
if selected_node + 1 == i and direct_message:
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(3))
else:
nodes_win.addstr(i, 1, get_name_from_number(node, "long"), curses.color_pair(4))
nodes_win.refresh()
def draw_debug(value):
function_win.addstr(1, 100, f"debug: {value} ")
function_win.refresh()
def input_tab_channels():
global selected_channel
if selected_channel < len(channel_list)-1:
selected_channel += 1
else:
selected_channel = 0
draw_channel_list()
channel_win.refresh()
update_messages_window()
def input_tab_nodes(height):
global selected_node
if selected_node < len(get_node_list())-1:
selected_node += 1
else:
selected_node = 0
draw_node_list(height)
nodes_win.refresh()
def main(stdscr):
global messages_win, nodes_win, channel_win, function_win, selected_node, selected_channel, direct_message
# Initialize colors
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(4, curses.COLOR_WHITE, curses.COLOR_BLACK)
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
entry_win = curses.newwin(3, width, 0, 0)
channel_width = width // 8
messages_width = 4 * (width // 8)
nodes_width = 3 * (width // 8)
channel_win = curses.newwin(height - 6, channel_width, 3, 0)
messages_win = curses.newwin(height - 6, messages_width, 3, channel_width)
nodes_win = curses.newwin(height - 6, nodes_width, 3, channel_width + messages_width)
function_win = curses.newwin(3, width, height - 3, 0)
draw_text_field(function_win, f"TAB = Switch Channels CTRL-D = DM ENTER = Send Message ESC = Quit")
# Enable scrolling for messages and nodes windows
messages_win.scrollok(True)
nodes_win.scrollok(True)
channel_win.scrollok(True)
get_channels()
draw_channel_list()
channel_win.refresh()
draw_node_list(height)
# Draw boxes around windows
channel_win.box()
entry_win.box()
messages_win.box()
nodes_win.box()
function_win.box()
# Refresh all windows
entry_win.refresh()
messages_win.refresh()
nodes_win.refresh()
channel_win.refresh()
function_win.refresh()
input_text = ""
direct_message = False
while True:
draw_text_field(entry_win, f"Input: {input_text}")
# Get user input from entry window
entry_win.move(1, len(input_text) + 8)
char = entry_win.getch()
# Check for Esc
if char == 27:
break
# Check for Ctrl-D
elif char == 4:
if direct_message == False:
direct_message = True
draw_channel_list()
draw_node_list(height)
channel_win.refresh()
nodes_win.refresh()
else:
direct_message = False
draw_channel_list()
draw_node_list(height)
channel_win.refresh()
nodes_win.refresh()
# Check for Tab
elif char == ord('\t'):
if direct_message:
draw_channel_list()
channel_win.refresh()
input_tab_nodes(height)
else:
input_tab_channels()
elif char == curses.KEY_ENTER or char == 10 or char == 13:
if direct_message:
node_list = get_node_list()
if node_list[selected_node] not in channel_list:
channel_list.append(node_list[selected_node])
all_messages[node_list[selected_node]] = []
selected_channel = channel_list.index(node_list[selected_node])
selected_node = 0
direct_message = False
draw_debug(direct_message)
draw_node_list(height)
draw_channel_list()
nodes_win.refresh()
channel_win.refresh()
update_messages_window()
else:
# Enter key pressed, send user input as message
send_message(input_text, channel=selected_channel)
# Clear entry window and reset input text
input_text = ""
entry_win.clear()
entry_win.refresh()
elif char == curses.KEY_BACKSPACE or char == 127:
input_text = input_text[:-1]
else:
# Append typed character to input text
input_text += chr(char)
pub.subscribe(on_receive, 'meshtastic.receive')
if __name__ == "__main__":
curses.wrapper(main)