forked from iarv/contact
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30402f4906 | ||
|
|
324e0b03e7 | ||
|
|
056db12911 | ||
|
|
685a2d4bf8 | ||
|
|
6ed0cc8c9f | ||
|
|
fc208a9258 | ||
|
|
eaf9381bca | ||
|
|
367af9044c | ||
|
|
d8183d9009 | ||
|
|
3fb1335be3 | ||
|
|
8b05072786 | ||
|
|
4455781e6c | ||
|
|
0c8aaee415 | ||
|
|
b97d9f4649 | ||
|
|
4152fb6a21 | ||
|
|
384e36dac2 | ||
|
|
65bca84fe6 |
@@ -53,22 +53,21 @@ logging.basicConfig(
|
|||||||
|
|
||||||
app_state.lock = threading.Lock()
|
app_state.lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Main Program Logic
|
# Main Program Logic
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
|
def prompt_region_if_unset(args: object) -> None:
|
||||||
|
"""Prompt user to set region if it is unset."""
|
||||||
|
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
||||||
|
if confirmation == "Yes":
|
||||||
|
set_region(interface_state.interface)
|
||||||
|
interface_state.interface.close()
|
||||||
|
interface_state.interface = initialize_interface(args)
|
||||||
|
|
||||||
|
|
||||||
def initialize_globals(args) -> None:
|
def initialize_globals(args: object) -> None:
|
||||||
"""Initializes interface and shared globals."""
|
"""Initializes interface and shared globals."""
|
||||||
interface_state.interface = initialize_interface(args)
|
|
||||||
|
|
||||||
# Prompt for region if unset
|
|
||||||
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
|
||||||
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
|
|
||||||
if confirmation == "Yes":
|
|
||||||
set_region(interface_state.interface)
|
|
||||||
interface_state.interface.close()
|
|
||||||
interface_state.interface = initialize_interface(args)
|
|
||||||
|
|
||||||
interface_state.myNodeNum = get_nodeNum()
|
interface_state.myNodeNum = get_nodeNum()
|
||||||
ui_state.channel_list = get_channels()
|
ui_state.channel_list = get_channels()
|
||||||
@@ -81,55 +80,63 @@ def initialize_globals(args) -> None:
|
|||||||
|
|
||||||
def main(stdscr: curses.window) -> None:
|
def main(stdscr: curses.window) -> None:
|
||||||
"""Main entry point for the curses UI."""
|
"""Main entry point for the curses UI."""
|
||||||
|
|
||||||
output_capture = io.StringIO()
|
output_capture = io.StringIO()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
setup_colors()
|
||||||
setup_colors()
|
draw_splash(stdscr)
|
||||||
draw_splash(stdscr)
|
|
||||||
|
|
||||||
args = setup_parser().parse_args()
|
args = setup_parser().parse_args()
|
||||||
|
|
||||||
if getattr(args, "settings", False):
|
if getattr(args, "settings", False):
|
||||||
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
|
||||||
return
|
return
|
||||||
|
|
||||||
logging.info("Initializing interface...")
|
logging.info("Initializing interface...")
|
||||||
with app_state.lock:
|
with app_state.lock:
|
||||||
initialize_globals(args)
|
interface_state.interface = initialize_interface(args)
|
||||||
logging.info("Starting main UI")
|
|
||||||
|
|
||||||
main_ui(stdscr)
|
if interface_state.interface.localNode.localConfig.lora.region == 0:
|
||||||
|
prompt_region_if_unset(args)
|
||||||
|
|
||||||
except Exception as e:
|
initialize_globals(args)
|
||||||
console_output = output_capture.getvalue()
|
logging.info("Starting main UI")
|
||||||
logging.error("Uncaught exception: %s", e)
|
|
||||||
logging.error("Traceback: %s", traceback.format_exc())
|
try:
|
||||||
logging.error("Console output:\n%s", console_output)
|
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
|
||||||
|
main_ui(stdscr)
|
||||||
|
except Exception:
|
||||||
|
console_output = output_capture.getvalue()
|
||||||
|
logging.error("Uncaught exception inside main_ui")
|
||||||
|
logging.error("Traceback:\n%s", traceback.format_exc())
|
||||||
|
logging.error("Console output:\n%s", console_output)
|
||||||
|
return
|
||||||
|
|
||||||
|
except Exception:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
def start() -> None:
|
def start() -> None:
|
||||||
"""Launch curses wrapper and redirect logs to file."""
|
"""Entry point for the application."""
|
||||||
|
|
||||||
if "--help" in sys.argv or "-h" in sys.argv:
|
if "--help" in sys.argv or "-h" in sys.argv:
|
||||||
setup_parser().print_help()
|
setup_parser().print_help()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
with open(config.log_file_path, "a", buffering=1) as log_f:
|
try:
|
||||||
sys.stdout = log_f
|
curses.wrapper(main)
|
||||||
sys.stderr = log_f
|
except KeyboardInterrupt:
|
||||||
|
logging.info("User exited with Ctrl+C")
|
||||||
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
|
sys.exit(0)
|
||||||
try:
|
except Exception as e:
|
||||||
curses.wrapper(main)
|
logging.critical("Fatal error", exc_info=True)
|
||||||
except KeyboardInterrupt:
|
try:
|
||||||
logging.info("User exited with Ctrl+C")
|
curses.endwin()
|
||||||
sys.exit(0)
|
except Exception:
|
||||||
except Exception as e:
|
pass
|
||||||
logging.error("Fatal error: %s", e)
|
print("Fatal error:", e)
|
||||||
logging.error("Traceback: %s", traceback.format_exc())
|
traceback.print_exc()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -3,11 +3,12 @@ import os
|
|||||||
import platform
|
import platform
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from contact.utilities.utils import refresh_node_list
|
from contact.utilities.utils import (
|
||||||
|
refresh_node_list,
|
||||||
|
add_new_message,
|
||||||
|
)
|
||||||
from contact.ui.contact_ui import (
|
from contact.ui.contact_ui import (
|
||||||
draw_packetlog_win,
|
draw_packetlog_win,
|
||||||
draw_node_list,
|
draw_node_list,
|
||||||
@@ -29,37 +30,42 @@ from contact.utilities.singleton import ui_state, interface_state, app_state
|
|||||||
def play_sound():
|
def play_sound():
|
||||||
try:
|
try:
|
||||||
system = platform.system()
|
system = platform.system()
|
||||||
|
sound_path = None
|
||||||
|
executable = None
|
||||||
|
|
||||||
if system == "Darwin": # macOS
|
if system == "Darwin": # macOS
|
||||||
sound_path = "/System/Library/Sounds/Ping.aiff"
|
sound_path = "/System/Library/Sounds/Ping.aiff"
|
||||||
if os.path.exists(sound_path):
|
executable = "afplay"
|
||||||
subprocess.run(["afplay", sound_path], check=True)
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
logging.warning(f"macOS sound file not found: {sound_path}")
|
|
||||||
|
|
||||||
elif system == "Linux":
|
elif system == "Linux":
|
||||||
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
|
||||||
if os.path.exists(sound_path):
|
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
|
||||||
if shutil.which("paplay"):
|
|
||||||
subprocess.run(["paplay", sound_path], check=True)
|
if shutil.which("paplay") and os.path.exists(ogg_path):
|
||||||
return
|
executable = "paplay"
|
||||||
elif shutil.which("aplay"):
|
sound_path = ogg_path
|
||||||
subprocess.run(["aplay", sound_path], check=True)
|
elif shutil.which("ffplay") and os.path.exists(ogg_path):
|
||||||
return
|
executable = "ffplay"
|
||||||
else:
|
sound_path = ogg_path
|
||||||
logging.warning("No sound player found (paplay/aplay)")
|
elif shutil.which("aplay") and os.path.exists(wav_path):
|
||||||
|
executable = "aplay"
|
||||||
|
sound_path = wav_path
|
||||||
else:
|
else:
|
||||||
logging.warning(f"Linux sound file not found: {sound_path}")
|
logging.warning("No suitable sound player or sound file found on Linux")
|
||||||
|
|
||||||
|
if executable and sound_path:
|
||||||
|
cmd = [executable, sound_path]
|
||||||
|
if executable == "ffplay":
|
||||||
|
cmd = [executable, "-nodisp", "-autoexit", sound_path]
|
||||||
|
|
||||||
|
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||||
|
return
|
||||||
|
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
logging.error(f"Sound playback failed: {e}")
|
logging.error(f"Sound playback failed: {e}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Unexpected error: {e}")
|
logging.error(f"Unexpected error: {e}")
|
||||||
|
|
||||||
# Final fallback: terminal beep
|
|
||||||
print("\a")
|
|
||||||
|
|
||||||
|
|
||||||
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -119,7 +125,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
|
|
||||||
channel_number = ui_state.channel_list.index(packet["from"])
|
channel_number = ui_state.channel_list.index(packet["from"])
|
||||||
|
|
||||||
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
|
channel_id = ui_state.channel_list[channel_number]
|
||||||
|
|
||||||
|
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
|
||||||
add_notification(channel_number)
|
add_notification(channel_number)
|
||||||
refresh_channels = True
|
refresh_channels = True
|
||||||
else:
|
else:
|
||||||
@@ -129,40 +137,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
|||||||
message_from_id = packet["from"]
|
message_from_id = packet["from"]
|
||||||
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
|
||||||
|
|
||||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
|
||||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
|
||||||
|
|
||||||
# Timestamp handling
|
|
||||||
current_timestamp = time.time()
|
|
||||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
|
||||||
|
|
||||||
# Retrieve the last timestamp if available
|
|
||||||
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
|
|
||||||
if channel_messages:
|
|
||||||
# Check the last entry for a timestamp
|
|
||||||
for entry in reversed(channel_messages):
|
|
||||||
if entry[0].startswith("--"):
|
|
||||||
last_hour = entry[0].strip("- ").strip()
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
|
|
||||||
# Add a new timestamp if it's a new hour
|
|
||||||
if last_hour != current_hour:
|
|
||||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
|
|
||||||
|
|
||||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
|
||||||
(f"{config.message_prefix} {message_from_string} ", message_string)
|
|
||||||
)
|
|
||||||
|
|
||||||
if refresh_channels:
|
if refresh_channels:
|
||||||
draw_channel_list()
|
draw_channel_list()
|
||||||
if refresh_messages:
|
if refresh_messages:
|
||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
|
|
||||||
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
|
save_message_to_db(channel_id, message_from_id, message_string)
|
||||||
|
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logging.error(f"Error processing packet: {e}")
|
logging.error(f"Error processing packet: {e}")
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
from datetime import datetime
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
import google.protobuf.json_format
|
import google.protobuf.json_format
|
||||||
@@ -16,6 +15,8 @@ import contact.ui.default_config as config
|
|||||||
|
|
||||||
from contact.utilities.singleton import ui_state, interface_state
|
from contact.utilities.singleton import ui_state, interface_state
|
||||||
|
|
||||||
|
from contact.utilities.utils import add_new_message
|
||||||
|
|
||||||
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
|
||||||
|
|
||||||
|
|
||||||
@@ -146,8 +147,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
|||||||
update_node_info_in_db(packet["from"], chat_archived=False)
|
update_node_info_in_db(packet["from"], chat_archived=False)
|
||||||
|
|
||||||
channel_number = ui_state.channel_list.index(packet["from"])
|
channel_number = ui_state.channel_list.index(packet["from"])
|
||||||
|
channel_id = ui_state.channel_list[channel_number]
|
||||||
|
|
||||||
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
|
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
|
||||||
refresh_messages = True
|
refresh_messages = True
|
||||||
else:
|
else:
|
||||||
add_notification(channel_number)
|
add_notification(channel_number)
|
||||||
@@ -155,18 +157,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
|
|||||||
|
|
||||||
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
|
||||||
|
|
||||||
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
|
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
|
||||||
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
|
|
||||||
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
|
|
||||||
(f"{config.message_prefix} {message_from_string}", msg_str)
|
|
||||||
)
|
|
||||||
|
|
||||||
if refresh_channels:
|
if refresh_channels:
|
||||||
draw_channel_list()
|
draw_channel_list()
|
||||||
if refresh_messages:
|
if refresh_messages:
|
||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
|
|
||||||
|
|
||||||
|
save_message_to_db(channel_id, packet["from"], msg_str)
|
||||||
|
|
||||||
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -190,32 +188,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
|
|||||||
channelIndex=send_on_channel,
|
channelIndex=send_on_channel,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add sent message to the messages dictionary
|
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
|
||||||
if channel_id not in ui_state.all_messages:
|
|
||||||
ui_state.all_messages[channel_id] = []
|
|
||||||
|
|
||||||
# Handle timestamp logic
|
|
||||||
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
|
|
||||||
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
|
||||||
|
|
||||||
# Retrieve the last timestamp if available
|
|
||||||
channel_messages = ui_state.all_messages[channel_id]
|
|
||||||
if channel_messages:
|
|
||||||
# Check the last entry for a timestamp
|
|
||||||
for entry in reversed(channel_messages):
|
|
||||||
if entry[0].startswith("--"):
|
|
||||||
last_hour = entry[0].strip("- ").strip()
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
else:
|
|
||||||
last_hour = None
|
|
||||||
|
|
||||||
# Add a new timestamp if it's a new hour
|
|
||||||
if last_hour != current_hour:
|
|
||||||
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
|
||||||
|
|
||||||
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
|
|
||||||
|
|
||||||
timestamp = save_message_to_db(channel_id, myid, message)
|
timestamp = save_message_to_db(channel_id, myid, message)
|
||||||
|
|
||||||
@@ -230,10 +203,14 @@ def send_traceroute() -> None:
|
|||||||
"""
|
"""
|
||||||
Sends a RouteDiscovery protobuf to the selected node.
|
Sends a RouteDiscovery protobuf to the selected node.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
channel_id = ui_state.node_list[ui_state.selected_node]
|
||||||
|
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
|
||||||
|
|
||||||
r = mesh_pb2.RouteDiscovery()
|
r = mesh_pb2.RouteDiscovery()
|
||||||
interface_state.interface.sendData(
|
interface_state.interface.sendData(
|
||||||
r,
|
r,
|
||||||
destinationId=ui_state.node_list[ui_state.selected_node],
|
destinationId=channel_id,
|
||||||
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
|
||||||
wantResponse=True,
|
wantResponse=True,
|
||||||
onResponse=on_response_traceroute,
|
onResponse=on_response_traceroute,
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
handle_resize(stdscr, True)
|
handle_resize(stdscr, True)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
draw_text_field(entry_win, f"Input: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||||
|
|
||||||
# Get user input from entry window
|
# Get user input from entry window
|
||||||
char = entry_win.get_wch()
|
char = entry_win.get_wch()
|
||||||
@@ -137,8 +137,7 @@ def main_ui(stdscr: curses.window) -> None:
|
|||||||
handle_leftright(char)
|
handle_leftright(char)
|
||||||
|
|
||||||
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
|
||||||
handle_enter(input_text)
|
input_text = handle_enter(input_text)
|
||||||
input_text = ""
|
|
||||||
|
|
||||||
elif char == chr(20): # Ctrl + t for Traceroute
|
elif char == chr(20): # Ctrl + t for Traceroute
|
||||||
handle_ctrl_t(stdscr)
|
handle_ctrl_t(stdscr)
|
||||||
@@ -298,7 +297,7 @@ def handle_leftright(char: int) -> None:
|
|||||||
refresh_pad(2)
|
refresh_pad(2)
|
||||||
|
|
||||||
|
|
||||||
def handle_enter(input_text: str) -> None:
|
def handle_enter(input_text: str) -> str:
|
||||||
"""Handle Enter key events to send messages or select channels."""
|
"""Handle Enter key events to send messages or select channels."""
|
||||||
if ui_state.current_window == 2:
|
if ui_state.current_window == 2:
|
||||||
node_list = ui_state.node_list
|
node_list = ui_state.node_list
|
||||||
@@ -318,6 +317,7 @@ def handle_enter(input_text: str) -> None:
|
|||||||
draw_node_list()
|
draw_node_list()
|
||||||
draw_channel_list()
|
draw_channel_list()
|
||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
|
return input_text
|
||||||
|
|
||||||
elif len(input_text) > 0:
|
elif len(input_text) > 0:
|
||||||
# Enter key pressed, send user input as message
|
# Enter key pressed, send user input as message
|
||||||
@@ -325,8 +325,9 @@ def handle_enter(input_text: str) -> None:
|
|||||||
draw_messages_window(True)
|
draw_messages_window(True)
|
||||||
|
|
||||||
# Clear entry window and reset input text
|
# Clear entry window and reset input text
|
||||||
input_text = ""
|
|
||||||
entry_win.erase()
|
entry_win.erase()
|
||||||
|
return ""
|
||||||
|
return input_text
|
||||||
|
|
||||||
|
|
||||||
def handle_ctrl_t(stdscr: curses.window) -> None:
|
def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||||
@@ -335,14 +336,14 @@ def handle_ctrl_t(stdscr: curses.window) -> None:
|
|||||||
curses.curs_set(0) # Hide cursor
|
curses.curs_set(0) # Hide cursor
|
||||||
contact.ui.dialog.dialog(
|
contact.ui.dialog.dialog(
|
||||||
stdscr,
|
stdscr,
|
||||||
"Traceroute Sent",
|
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
|
||||||
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
|
||||||
)
|
)
|
||||||
curses.curs_set(1) # Show cursor again
|
curses.curs_set(1) # Show cursor again
|
||||||
handle_resize(stdscr, False)
|
handle_resize(stdscr, False)
|
||||||
|
|
||||||
|
|
||||||
def handle_backspace(entry_win: curses.window, input_text: str) -> None:
|
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
|
||||||
"""Handle backspace key events to remove the last character from input text."""
|
"""Handle backspace key events to remove the last character from input text."""
|
||||||
if input_text:
|
if input_text:
|
||||||
input_text = input_text[:-1]
|
input_text = input_text[:-1]
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import datetime
|
import datetime
|
||||||
|
import time
|
||||||
from meshtastic.protobuf import config_pb2
|
from meshtastic.protobuf import config_pb2
|
||||||
import contact.ui.default_config as config
|
import contact.ui.default_config as config
|
||||||
|
|
||||||
@@ -134,3 +135,31 @@ def get_time_ago(timestamp):
|
|||||||
if unit != "s":
|
if unit != "s":
|
||||||
return f"{value} {unit} ago"
|
return f"{value} {unit} ago"
|
||||||
return "now"
|
return "now"
|
||||||
|
|
||||||
|
def add_new_message(channel_id, prefix, message):
|
||||||
|
if channel_id not in ui_state.all_messages:
|
||||||
|
ui_state.all_messages[channel_id] = []
|
||||||
|
|
||||||
|
# Timestamp handling
|
||||||
|
current_timestamp = time.time()
|
||||||
|
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
|
||||||
|
|
||||||
|
# Retrieve the last timestamp if available
|
||||||
|
channel_messages = ui_state.all_messages[channel_id]
|
||||||
|
if channel_messages:
|
||||||
|
# Check the last entry for a timestamp
|
||||||
|
for entry in reversed(channel_messages):
|
||||||
|
if entry[0].startswith("--"):
|
||||||
|
last_hour = entry[0].strip("- ").strip()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
else:
|
||||||
|
last_hour = None
|
||||||
|
|
||||||
|
# Add a new timestamp if it's a new hour
|
||||||
|
if last_hour != current_hour:
|
||||||
|
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
|
||||||
|
|
||||||
|
# Add the message
|
||||||
|
ui_state.all_messages[channel_id].append((prefix,message))
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "contact"
|
name = "contact"
|
||||||
version = "1.3.13"
|
version = "1.3.15"
|
||||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||||
authors = [
|
authors = [
|
||||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||||
|
|||||||
Reference in New Issue
Block a user