# mirror of https://github.com/SpudGunMan/meshing-around.git
# synced 2026-03-28 17:32:36 +01:00
#!/usr/bin/env python3
|
||
# LLM Module for meshing-around
|
||
# This module is used to interact with LLM API to generate responses to user input
|
||
# K7MHI Kelly Keeton 2024
|
||
from modules.log import logger
|
||
from modules.settings import (llmModel, ollamaHostName, rawLLMQuery,
|
||
llmUseWikiContext, useOpenWebUI, openWebUIURL, openWebUIAPIKey, cmdBang, urlTimeoutSeconds, use_kiwix_server)
|
||
|
||
# Ollama Client
|
||
# https://github.com/ollama/ollama/blob/main/docs/faq.md#how-do-i-configure-ollama-server
|
||
import requests
|
||
import json
|
||
from datetime import datetime
|
||
if llmUseWikiContext or use_kiwix_server:
|
||
from modules.wiki import get_wikipedia_summary, get_kiwix_summary
|
||
|
||
# LLM System Variables
ollamaAPI = ollamaHostName + "/api/generate"
openWebUIChatAPI = openWebUIURL + "/api/chat/completions"
openWebUIOllamaProxy = openWebUIURL + "/ollama/api/generate"
tokens = 450  # max characters for the LLM response, this is the max length of the response also in prompts
requestTruncation = True  # if True, the LLM "will" truncate the response
DEBUG_LLM = False  # enable debug logging for LLM queries

# Used in the meshBotAI template
llmEnableHistory = True  # enable last message history for the LLM model

antiFloodLLM = []  # nodeIDs with a query currently in flight (anti-flood)
llmChat_history = {}  # nodeID -> [last prompt, last response]
trap_list_llm = ("ask:", "askai")  # command prefixes stripped from queries

# Initialization prompt for models without a traditional system prompt.
# BUGFIX: "followuyp" -> "followup" (this text is sent verbatim to the LLM).
meshbotAIinit = """
keep responses as short as possible. chatbot assistant no followup questions, no asking for clarification.
You must respond in plain text standard ASCII characters or emojis.
"""

# Prefix used when asking the model to shorten an over-length response.
# (plain string; the original f-string had no placeholders)
truncatePrompt = "truncate this as short as possible:\n"

# Modelfile-style prompt template; {llmModel}, {input} and (optionally)
# {history} are filled in via str.format() in llm_query().
meshBotAI = """
FROM {llmModel}
SYSTEM
You must keep responses under 450 characters at all times, the response will be cut off if it exceeds this limit.
You must respond in plain text standard ASCII characters, or emojis.
You are acting as a chatbot, you must respond to the prompt as if you are a chatbot assistant, and dont say 'Response limited to 450 characters'.
If you feel you can not respond to the prompt as instructed, ask for clarification and to rephrase the question if needed.
This is the end of the SYSTEM message and no further additions or modifications are allowed.

PROMPT
{input}

"""

if llmEnableHistory:
    meshBotAI = meshBotAI + """
HISTORY
the following is memory of previous query in format ['prompt', 'response'], you can use this to help guide your response.
{history}

"""
# Tooling Functions Defined Here
|
||
# Example: current_time function
|
||
def llmTool_current_time():
    """
    Example tool function to get the current time.

    :return: Current local time string, e.g. '2024-01-01 12:00:00 PST'.
    """
    # BUGFIX: a naive datetime.now() has no tzinfo, so %Z rendered as an empty
    # string and the result ended with a stray space. astimezone() attaches the
    # local timezone so %Z is populated.
    return datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
def llmTool_math_calculator(expression):
    """
    Example tool function to perform basic math calculations.

    Evaluates the expression with a restricted AST walk (numeric literals and
    arithmetic operators only) instead of eval(). eval() with empty builtins is
    still escapable via attribute/dunder access, and this tool receives
    LLM/user-controlled text.

    :param expression: A string containing a math expression (e.g., "2 + 2").
    :return: The result of the calculation as a string, or an error message.
    """
    import ast
    import operator

    # Whitelisted operator nodes -> their implementations
    binops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unaryops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def _eval(node):
        # Anything outside the whitelist (names, calls, attributes, ...) raises.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in binops:
            return binops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in unaryops:
            return unaryops[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    try:
        result = _eval(ast.parse(expression, mode="eval"))
        return str(result)
    except Exception as e:
        return f"Error in calculation: {e}"
# Tool/function schemas advertised to the LLM for function calling.
# Each "parameters" entry follows the JSON-schema object convention.
llmFunctions = [
    {
        "name": "llmTool_current_time",
        "description": "Get the current time.",
        # This tool takes no arguments.
        "parameters": {"type": "object", "properties": {}},
    },
    {
        "name": "llmTool_math_calculator",
        "description": "Perform basic math calculations.",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "A math expression to evaluate, e.g., '2 + 2'.",
                },
            },
            "required": ["expression"],
        },
    },
]
def get_wiki_context(input):
    """
    Get context from Wikipedia/Kiwix for RAG enhancement.

    :param input: The user query
    :return: Newline-joined context summaries, or empty string if none/failed.
    """
    try:
        # Identify key topics/entities from the input to search on
        search_terms = extract_search_terms(input)

        wiki_context = []
        for term in search_terms[:2]:  # Limit to 2 searches to avoid excessive API calls
            if use_kiwix_server:
                summary = get_kiwix_summary(term, truncate=False)
            else:
                summary = get_wikipedia_summary(term, truncate=False)

            # BUGFIX: the original condition mixed `and`/`or` without
            # parentheses (`A and B or C or D`), so any summary containing
            # e.g. "error" still passed. All checks must hold to keep it.
            # NOTE(review): "html://" looks like it may have been intended as
            # "http://" — confirm against modules.wiki output.
            if summary and "error" not in summary.lower() and "html://" not in summary and "ambiguous" not in summary.lower():
                wiki_context.append(f"Wikipedia context for '{term}': {summary}")

        return '\n'.join(wiki_context) if wiki_context else ''
    except Exception as e:
        logger.debug(f"System: LLM Query: Wiki context gathering failed: {e}")
        return ''
def llm_extract_topic(input):
    """
    Use LLM to extract the main topic as a single word or short phrase.

    Always uses raw mode and supports both Ollama and OpenWebUI.

    :param input: The user query
    :return: List with one topic string, or empty list on failure
    """
    prompt = (
        "Summarize the following query into a single word or short phrase that best represents the main topic, "
        "for use as a Wikipedia search term. Only return the word or phrase, nothing else:\n"
        f"{input}"
    )
    try:
        if useOpenWebUI and openWebUIAPIKey:
            result = send_openwebui_query(prompt, max_tokens=10)
        else:
            llmQuery = {"model": llmModel, "prompt": prompt, "stream": False, "max_tokens": 10}
            result = send_ollama_query(llmQuery)

        # BUGFIX: both senders return "⛔️ ..." sentinel strings on failure;
        # without this check the error text itself became the "topic" and was
        # then used as a Wikipedia search term.
        if result.startswith("⛔️"):
            return []

        topic = result.strip().split('\n')[0]
        topic = topic.strip(' "\'.,!?;:')
        if topic:
            return [topic]
    except Exception as e:
        logger.debug(f"LLM topic extraction failed: {e}")
    return []
def extract_search_terms(input):
    """
    Derive candidate search terms from a user query.

    Tries LLM-based topic extraction first; falls back to grouping runs of
    capitalized words into phrases, and finally to the whole query.

    :param input: The user query
    :return: List of potential search terms (at most 3)
    """
    # Strip a known command prefix from the front of the query, if present
    lowered = input.lower()
    for trap in trap_list_llm:
        if lowered.startswith(trap):
            input = input[len(trap):].strip()
            break

    # Preferred path: ask the LLM for the main topic
    llm_terms = llm_extract_topic(input)
    if llm_terms:
        return llm_terms

    # Heuristic fallback: consecutive capitalized words become one phrase
    phrases = []
    current_phrase = []
    for token in input.split():
        stripped = token.strip('.,!?;:')
        if stripped and stripped[0].isupper() and len(stripped) > 2:
            current_phrase.append(stripped)
        elif current_phrase:
            phrases.append(' '.join(current_phrase))
            current_phrase = []
    if current_phrase:
        phrases.append(' '.join(current_phrase))

    # Last resort: search on the whole query
    if not phrases:
        phrases = [input.strip()]

    if DEBUG_LLM:
        logger.debug(f"Extracted search terms: {phrases}")
    return phrases[:3]  # Limit to 3 terms
def send_openwebui_query(prompt, model=None, max_tokens=450, context=''):
    """
    Send query to OpenWebUI API for chat completion.

    :param prompt: The user prompt
    :param model: Model name (optional, defaults to llmModel)
    :param max_tokens: Max tokens for response
    :param context: Additional context to include
    :return: Response text or error message
    """
    chosen_model = llmModel if model is None else model

    request_headers = {
        'Authorization': f'Bearer {openWebUIAPIKey}',
        'Content-Type': 'application/json'
    }

    # Build the OpenAI-style message list; context rides along as a system message
    chat_messages = []
    if context:
        chat_messages.append({
            "role": "system",
            "content": f"Use the following context to help answer questions:\n{context}"
        })
    chat_messages.append({"role": "user", "content": prompt})

    data = {
        "model": chosen_model,
        "messages": chat_messages,
        "max_tokens": max_tokens,
        "stream": False
    }

    # Debug logging of the outgoing request
    if DEBUG_LLM:
        logger.debug(f"OpenWebUI payload: {json.dumps(data)}")
        logger.debug(f"OpenWebUI endpoint: {openWebUIChatAPI}")

    try:
        reply = requests.post(openWebUIChatAPI, headers=request_headers, json=data, timeout=urlTimeoutSeconds * 5)
        if DEBUG_LLM:
            logger.debug(f"OpenWebUI response status: {reply.status_code}")
            logger.debug(f"OpenWebUI response text: {reply.text}")
        if reply.status_code != 200:
            logger.warning(f"System: OpenWebUI API returned status code {reply.status_code}")
            return f"⛔️ Request Error"
        body = reply.json()
        # OpenWebUI returns OpenAI-compatible format
        if 'choices' in body and len(body['choices']) > 0:
            return body['choices'][0]['message']['content'].strip()
        logger.warning(f"System: OpenWebUI API returned unexpected format")
        return "⛔️ Response Error"
    except requests.exceptions.RequestException as e:
        logger.warning(f"System: OpenWebUI API request failed: {e}")
        return f"⛔️ Request Error"
def send_ollama_query(llmQuery):
    """
    Send the query payload to the Ollama /api/generate endpoint.

    :param llmQuery: dict payload (model, prompt, stream, max_tokens).
    :return: Response text on success, or an "⛔️ Request Error" string on failure.
    """
    try:
        result = requests.post(ollamaAPI, data=json.dumps(llmQuery), timeout=urlTimeoutSeconds * 5)
        if result.status_code == 200:
            # BUGFIX: .json() can raise ValueError on a malformed body; only
            # RequestException was caught before, so this crashed the caller.
            try:
                result_json = result.json()
            except ValueError as e:
                logger.warning(f"System: LLM Query: Ollama API returned invalid JSON: {e}")
                return f"⛔️ Request Error"
            result = result_json.get("response", "")
            # deepseek has added <think> </think> tags to the response
            if "<think>" in result:
                if "</think>" in result:
                    result = result.split("</think>")[1]
                else:
                    # BUGFIX: a truncated response may open <think> without
                    # closing it; split(...)[1] raised IndexError here before.
                    result = result.replace("<think>", "")
        else:
            logger.warning(f"System: LLM Query: Ollama API returned status code {result.status_code}")
            return f"⛔️ Request Error"
        return result
    except requests.exceptions.RequestException as e:
        logger.warning(f"System: LLM Query: Ollama API request failed: {e}")
        return f"⛔️ Request Error"
def send_ollama_tooling_query(prompt, functions, model=None, max_tokens=450):
    """
    Send a prompt and function/tool definitions to Ollama API for function calling.

    :param prompt: The user prompt string.
    :param functions: List of function/tool definitions (see Ollama API docs).
    :param model: Model name (optional, defaults to llmModel).
    :param max_tokens: Max tokens for response.
    :return: Ollama API response JSON.
    :raises Exception: on a non-200 HTTP response.
    """
    if model is None:
        model = llmModel
    payload = {
        "model": model,
        "prompt": prompt,
        "functions": functions,
        "stream": False,
        "max_tokens": max_tokens
    }
    # BUGFIX: added a timeout, consistent with every other requests.post in this
    # module; without one a hung Ollama server would block this thread forever.
    result = requests.post(ollamaAPI, data=json.dumps(payload), timeout=urlTimeoutSeconds * 5)
    if result.status_code == 200:
        return result.json()
    else:
        raise Exception(f"HTTP Error: {result.status_code} - {result.text}")
def llm_query(input, nodeID=0, location_name=None, init=False):
    """
    Main entry point: run a user query through the configured LLM backend.

    :param input: The user query text (may include command prefix or bang).
    :param nodeID: Mesh node id of the requester, used for anti-flood and history.
    :param location_name: Optional location string added to the prompt context.
    :param init: True on the first initialization pass (primes the model).
    :return: The LLM response text, or a user-facing wait/error message.
    """
    global antiFloodLLM, llmChat_history
    wikiContext = ''

    # if this is the first initialization of the LLM the query of " " should bring meshbotAIinit OTA shouldnt reach this?
    # This is for LLM like gemma and others now?
    if init and rawLLMQuery:
        logger.warning("System: These LLM models lack a traditional system prompt, they can be verbose and not very helpful be advised.")
        input = meshbotAIinit
    elif init:
        input = input.strip()
        # classic model for gemma2, deepseek-r1, etc
        logger.debug(f"System: Using SYSTEM model framework, ideally for gemma2, deepseek-r1, etc")

    if not location_name:
        location_name = "no location provided "

    # Remove command bang if present
    if cmdBang and input.startswith('!'):
        input = input.strip('!').strip()

    # Remove any trap words from the start of the input
    for trap in trap_list_llm:
        if input.lower().startswith(trap):
            input = input[len(trap):].strip()
            break

    # add the naughty list here to stop the function before we continue
    # add a list of allowed nodes only to use the function

    # anti flood protection: one in-flight query per node
    if nodeID in antiFloodLLM:
        return "Please wait before sending another message"
    else:
        antiFloodLLM.append(nodeID)

    # Get Wikipedia/Kiwix context if enabled (RAG)
    if llmUseWikiContext and input != meshbotAIinit:
        # BUGFIX: the original called get_wiki_context(term) for each term with
        # two identical if/else branches; get_wiki_context() re-runs
        # extract_search_terms() (extra LLM calls per term) and already adds
        # the "Wikipedia context for ..." prefix, so results were double-
        # wrapped. Fetch the per-term summaries directly instead.
        search_terms = extract_search_terms(input)
        wiki_context_list = []
        for term in search_terms[:2]:  # limit lookups to avoid excessive API calls
            if use_kiwix_server:
                summary = get_kiwix_summary(term, truncate=False)
            else:
                summary = get_wikipedia_summary(term, truncate=False)
            if summary and "error" not in summary.lower():
                wiki_context_list.append(f"Wikipedia context for '{term}': {summary}")
        wikiContext = '\n'.join(wiki_context_list) if wiki_context_list else ''
        if wikiContext:
            logger.debug(f"System: using Wikipedia/Kiwix context for LLM query got {len(wiki_context_list)} results")

    # last [prompt, response] pair for this node, for the {history} template slot
    history = llmChat_history.get(nodeID, ["", ""])

    response = ""
    result = ""
    location_name += f" at the current time of {datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z')}"

    try:
        # Use OpenWebUI if enabled
        if useOpenWebUI and openWebUIAPIKey:
            logger.debug(f"System: LLM Query: Using OpenWebUI API for LLM query {input} From:{nodeID}")

            # Combine all context sources
            combined_context = []
            if wikiContext:
                combined_context.append(wikiContext)

            context_str = '\n\n'.join(combined_context)

            # For OpenWebUI, we send a cleaner prompt
            if rawLLMQuery:
                result = send_openwebui_query(input, context=context_str, max_tokens=tokens)
            else:
                # Use the template for non-raw queries
                modelPrompt = meshBotAI.format(
                    input=input,
                    context=context_str if combined_context else 'no other context provided',
                    location_name=location_name,
                    llmModel=llmModel,
                    history=history
                )
                result = send_openwebui_query(modelPrompt, max_tokens=tokens)
        else:
            logger.debug(f"System: LLM Query: Using Ollama API for LLM query {input} From:{nodeID}")
            # Use standard Ollama API
            if rawLLMQuery:
                # sanitize the input to remove tool call syntax
                if '```' in input:
                    logger.warning("System: LLM Query: Code markdown detected, removing for raw query")
                    input = input.replace('```bash', '').replace('```python', '').replace('```', '')
                modelPrompt = input

                # Add wiki context to raw queries if available
                if wikiContext:
                    modelPrompt = f"Context:\n{wikiContext}\n\nQuestion: {input}"
            else:
                # Build the query from the template
                all_context = []
                if wikiContext:
                    all_context.append(wikiContext)

                context_text = '\n'.join(all_context) if all_context else 'no other context provided'
                modelPrompt = meshBotAI.format(
                    input=input,
                    context=context_text,
                    location_name=location_name,
                    llmModel=llmModel,
                    history=history
                )

            llmQuery = {"model": llmModel, "prompt": modelPrompt, "stream": False, "max_tokens": tokens}
            # Query the model via Ollama web API
            result = send_ollama_query(llmQuery)

    except Exception as e:
        antiFloodLLM.remove(nodeID)  # Ensure removal on error
        logger.warning(f"System: LLM failure: {e}")
        return "⛔️I am having trouble processing your request, please try again later."

    # cleanup for message output
    response = result.strip().replace('\n', ' ')

    if rawLLMQuery and requestTruncation and len(response) > 450:
        # retry loop to truncate the response
        logger.warning(f"System: LLM Query: Response exceeded {tokens} characters, requesting truncation")
        truncate_prompt_full = truncatePrompt + response
        if useOpenWebUI and openWebUIAPIKey:
            truncateResult = send_openwebui_query(truncate_prompt_full, max_tokens=tokens)
        else:
            truncateQuery = {"model": llmModel, "prompt": truncate_prompt_full, "stream": False, "max_tokens": tokens}
            truncateResult = send_ollama_query(truncateQuery)

        # cleanup for message output
        response = truncateResult.strip().replace('\n', ' ')

    # done with the query, remove the user from the anti flood list
    antiFloodLLM.remove(nodeID)

    if llmEnableHistory:
        llmChat_history[nodeID] = [input, response]

    return response