2024-10-06 21:23:55 -04:00
import configparser
import json
import os
import pprint
import queue
2024-10-06 23:19:57 -04:00
import random
2024-10-06 21:23:55 -04:00
import re
import secrets
import sqlite3
import threading
import time
import uuid
from datetime import datetime
from typing import List , Optional
2024-10-06 23:19:57 -04:00
import enum
2024-10-06 21:23:55 -04:00
import GPUtil
import ollama
import psutil
import structlog
2024-10-06 23:19:57 -04:00
import logging
2024-10-06 21:23:55 -04:00
from flask import Flask , g , jsonify , request , send_from_directory
2024-09-26 13:38:25 -04:00
from flask_socketio import SocketIO , emit
from pydantic import BaseModel
2024-10-06 23:51:01 -04:00
from werkzeug . utils import secure_filename
import base64
2024-10-06 21:23:55 -04:00
2024-09-26 13:38:25 -04:00
from models import model_manager
2024-09-29 12:18:44 -04:00
from tools import DefaultToolManager
2024-10-06 21:23:55 -04:00
2024-10-06 23:19:57 -04:00
# Logging: stdlib logging prints bare messages at INFO level; structlog
# supplies the module-wide logger used by the rest of this file.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# NOTE(review): this handler is configured but never attached to a logger in
# the code visible here — confirm whether it is still needed.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

logger = structlog.get_logger()
2024-10-06 21:23:55 -04:00
# Path of the INI file that holds the server settings.
CONFIG_FILE = "config.ini"

# Handle and started-flag of the background query worker; both are managed by
# start_processing_thread().
processing_thread_started = False
processing_thread = None

# Image upload limits: maximum size in bytes and accepted file extensions.
MAX_IMAGE_SIZE = 1024 * 1024  # 1MB
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif"}
2024-10-06 21:23:55 -04:00
def create_default_config():
    """Write ``CONFIG_FILE`` filled with the default server settings.

    A fresh random admin key is generated on every call, and whatever is
    already stored at ``CONFIG_FILE`` is overwritten.
    """
    defaults = configparser.ConfigParser()
    defaults.read_dict(
        {
            "DEFAULT": {
                "AdminKey": secrets.token_urlsafe(32),
                "DatabasePath": "llm_chat_server.db",
            },
            "SERVER_FEATURES": {
                "EnableFrontend": "false",
                "EnableChatEndpoints": "false",
                "EnableAPIEndpoints": "true",
            },
            "MODEL": {"PrimaryModel": "qwen2.5:14b"},
            "PERFORMANCE": {"UpdateInterval": "0.1"},
        }
    )

    with open(CONFIG_FILE, "w") as handle:
        defaults.write(handle)
def load_config():
    """Return the parsed settings, creating the file with defaults if it is missing."""
    config_missing = not os.path.exists(CONFIG_FILE)
    if config_missing:
        create_default_config()

    parser = configparser.ConfigParser()
    parser.read(CONFIG_FILE)
    return parser
# Settings are read once at import time and exposed as module-level constants.
config = load_config()

_defaults = config["DEFAULT"]
ADMIN_KEY = _defaults["AdminKey"]
DATABASE = _defaults["DatabasePath"]

_features = config["SERVER_FEATURES"]
ENABLE_FRONTEND = _features.getboolean("EnableFrontend")
ENABLE_CHAT_ENDPOINTS = _features.getboolean("EnableChatEndpoints")
ENABLE_API_ENDPOINTS = _features.getboolean("EnableAPIEndpoints")

PRIMARY_MODEL = config["MODEL"]["PrimaryModel"]
UPDATE_INTERVAL = config["PERFORMANCE"].getfloat("UpdateInterval")

# Web application and its Socket.IO layer.
app = Flask(__name__)
# NOTE(review): "*" accepts Socket.IO connections from any origin — confirm
# this is intended outside of local development.
socketio = SocketIO(app, cors_allowed_origins="*")

# Tool registry shared by the chat loops.
tool_manager = DefaultToolManager()
2024-10-06 21:23:55 -04:00
# Database setup
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it on first use.

    A newly opened connection uses ``sqlite3.Row`` as its row factory.
    """
    connection = getattr(g, "_database", None)
    if connection is not None:
        return connection

    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    g._database = connection
    return connection
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``flask.g`` when the app context ends.

    ``exception`` is accepted because the teardown hook supplies it; it is not
    used here.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        return
    connection.close()
2024-10-06 23:19:57 -04:00
class QueryStatus(enum.Enum):
    """Values stored in the ``status`` column of the ``Queries`` table."""

    # Inserted by the API endpoints; not yet picked up by the worker.
    QUEUED = "queued"
    # Claimed by the worker loop; an answer is being generated.
    PROCESSING = "processing"
    # The conversation history has been stored and can be fetched.
    DONE = "done"
2024-10-06 21:23:55 -04:00
def init_db():
    """Create the ``Keys`` and ``Queries`` tables if they do not exist yet."""
    create_keys_table = """
        CREATE TABLE IF NOT EXISTS Keys (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            api_key TEXT NOT NULL UNIQUE
        );
    """
    create_queries_table = """
        CREATE TABLE IF NOT EXISTS Queries (
            id TEXT PRIMARY KEY,
            ip TEXT NOT NULL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            query TEXT NOT NULL,
            api_key_id INTEGER,
            status TEXT NOT NULL,
            conversation_history TEXT,
            FOREIGN KEY (api_key_id) REFERENCES Keys (id)
        )
    """

    # get_db() relies on flask.g, so an application context is required.
    with app.app_context():
        connection = get_db()
        for statement in (create_keys_table, create_queries_table):
            connection.execute(statement)
        connection.commit()
# Reference copy of the schema that init_db() creates. This bare string is not
# passed to the database anywhere in the code visible here; save it as
# schema.sql if a standalone schema file is needed.
"""
CREATE TABLE IF NOT EXISTS Keys (
id INTEGER PRIMARY KEY AUTOINCREMENT ,
username TEXT NOT NULL UNIQUE ,
api_key TEXT NOT NULL UNIQUE
) ;
CREATE TABLE IF NOT EXISTS Queries (
2024-10-06 21:43:49 -04:00
id TEXT PRIMARY KEY ,
2024-10-06 21:23:55 -04:00
ip TEXT NOT NULL ,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ,
query TEXT NOT NULL ,
api_key_id INTEGER ,
2024-10-06 23:19:57 -04:00
status TEXT NOT NULL ,
2024-10-06 21:23:55 -04:00
conversation_history TEXT ,
FOREIGN KEY ( api_key_id ) REFERENCES Keys ( id )
) ;
"""
def validate_api_key(api_key):
    """Return the ``Keys.id`` that owns ``api_key``, or ``None`` if no row matches."""
    lookup = get_db().cursor()
    lookup.execute("SELECT id FROM Keys WHERE api_key = ?", (api_key,))
    row = lookup.fetchone()
    if not row:
        return None
    return row[0]
@app.route("/")
def index():
    """Serve ``index.html``, or answer with a JSON 404 when the frontend is disabled."""
    if not ENABLE_FRONTEND:
        return jsonify({"error": "Frontend is disabled"}), 404

    logger.info("Serving index.html")
    return send_from_directory(".", "index.html")
2024-09-23 18:20:00 -04:00
2024-09-26 13:38:25 -04:00
class ChatRequest(BaseModel):
    """Schema of a chat request: a single user message."""

    # Text sent by the user.
    message: str
2024-09-23 18:20:00 -04:00
2024-10-06 21:23:55 -04:00
2024-09-26 13:38:25 -04:00
class ChatResponse(BaseModel):
    """Schema of a chat response: the assistant's answer text."""

    # Answer text returned to the user.
    response: str
2024-10-06 21:23:55 -04:00
@socketio.on("chat_request")
def handle_chat_request(data):
    """Handle a ``chat_request`` Socket.IO event.

    Args:
        data: Event payload. Must be a mapping with a ``"message"`` entry and
            may carry a ``"conversation_history"`` list of prior messages.

    Emits:
        ``chat_response`` with ``response`` and ``thinking_time`` (seconds,
        rounded to two decimals) on success, or ``error`` with a ``message``
        when chat endpoints are disabled, the payload is malformed, or
        answering fails.
    """
    if not ENABLE_CHAT_ENDPOINTS:
        emit("error", {"message": "Chat endpoints are disabled"})
        return

    # Reject malformed payloads up front: an exception raised here would
    # escape the handler and leave the client without any reply.
    if not isinstance(data, dict) or "message" not in data:
        emit("error", {"message": "Invalid request: 'message' is required"})
        return

    prior_history = data.get("conversation_history")
    if prior_history is None:
        prior_history = []
    if not isinstance(prior_history, list):
        emit(
            "error",
            {"message": "Invalid request: 'conversation_history' must be a list"},
        )
        return

    user_input = data["message"]
    conversation_history = [
        {"role": "system", "content": ANSWER_QUESTION_PROMPT}
    ] + prior_history

    logger.info(
        "Received chat request",
        user_input=user_input,
        conversation_history=conversation_history,
    )

    start_time = time.time()
    try:
        final_response = answer_question_tools(user_input, conversation_history)
        thinking_time = round(time.time() - start_time, 2)
        emit(
            "chat_response",
            {"response": final_response, "thinking_time": thinking_time},
        )
    except Exception as e:
        logger.exception("Error during chat processing", error=str(e))
        thinking_time = round(time.time() - start_time, 2)
        emit(
            "error",
            {"message": f"An error occurred: {str(e)}", "thinking_time": thinking_time},
        )
2024-09-26 13:38:25 -04:00
2024-10-06 21:23:55 -04:00
def answer_question_tools(
    user_input: str, conversation_history: List[dict], max_retries: int = 100
):
    """Run the tool-calling chat loop for a Socket.IO client and return the answer.

    The model is queried up to ``max_retries`` times. Every assistant and tool
    message is appended to ``conversation_history`` (mutated in place unless it
    was empty), and progress is streamed to the client through ``thinking``,
    ``conversation_history`` and ``thought`` events.

    For each assistant message:
      * tool calls are executed and their results appended as ``tool`` messages;
      * a ``<reply>...</reply>`` section, matched case-insensitively, ends the
        loop and its stripped body is returned;
      * anything else, including a reply tag with no closing tag, is emitted as
        an intermediate thought. An empty thought that follows a non-empty one
        ends the loop and the previous thought is returned.

    Args:
        user_input: The new user message.
        conversation_history: Prior messages; a system prompt is used when empty.
        max_retries: Maximum number of model calls.

    Returns:
        The answer text, or a "Max iterations reached" message when no answer
        was produced within ``max_retries`` calls.
    """
    global tool_manager

    # If conversation_history is empty, initialize it with the system prompt
    if not conversation_history:
        conversation_history = [
            {"role": "system", "content": ANSWER_QUESTION_PROMPT},
        ]

    logger.info(
        "Starting chat",
        user_input=user_input,
        conversation_history=conversation_history,
    )

    # Add the new user input to the conversation history
    conversation_history.append({"role": "user", "content": user_input})
    emit("thinking", {"step": "Starting"})
    emit("conversation_history", {"history": conversation_history})

    last_thought_content = None
    # Pre-bound so the fallback return is defined even if the loop never runs.
    assistant_message = None
    for _ in range(max_retries):
        response = ollama.chat(
            model=PRIMARY_MODEL,
            messages=conversation_history,
            tools=tool_manager.get_tools_for_ollama_dict(),
            stream=False,
        )
        assistant_message = response["message"]
        conversation_history.append(assistant_message)
        emit("conversation_history", {"history": conversation_history})
        pprint.pp(assistant_message)

        if "tool_calls" in assistant_message:
            for tool_call in assistant_message["tool_calls"]:
                tool_name = tool_call["function"]["name"]
                tool_args = tool_call["function"]["arguments"]
                emit(
                    "thought",
                    {
                        "type": "tool_call",
                        "content": f"Tool: {tool_name}\nArguments: {tool_args}",
                    },
                )
                tool_response = tool_manager.get_tool(tool_name).execute(tool_args)
                conversation_history.append({"role": "tool", "content": tool_response})
                emit("conversation_history", {"history": conversation_history})
                emit("thought", {"type": "tool_result", "content": tool_response})
            continue

        content = assistant_message["content"] or ""

        # Case-insensitive so that <Reply>/<REPLY> are accepted as well; a tag
        # without a matching close falls through to the thought handling.
        reply_match = re.search(
            r"<reply>(.*?)</reply>", content, re.DOTALL | re.IGNORECASE
        )
        if reply_match:
            reply_answer = reply_match.group(1).strip()
            emit("thought", {"type": "answer", "content": reply_answer})
            return reply_answer

        current_thought_content = content.strip()
        emit("thought", {"type": "thoughts", "content": current_thought_content})

        # Check for two consecutive thoughts, with the second being empty
        if last_thought_content and not current_thought_content:
            emit("thought", {"type": "answer", "content": last_thought_content})
            return last_thought_content

        last_thought_content = current_thought_content

    last_content = assistant_message["content"] if assistant_message is not None else ""
    return f"Max iterations reached. Last response: {last_content}"
2024-09-26 13:38:25 -04:00
2024-10-06 21:23:55 -04:00
2024-10-02 19:33:14 -04:00
# Alternative system prompt. It is not referenced by any code visible in this
# file; the handlers use ANSWER_QUESTION_PROMPT instead.
# NOTE(review): this prompt asks for <answer></answer> tags, while the chat
# loops in this file search for <reply>...</reply> — confirm before reusing it.
# The f-string is evaluated once at import time, so the embedded date and tool
# list are fixed for the lifetime of the process.
ANSWER_QUESTION_PROMPT2 = f """
2024-09-29 12:18:44 -04:00
The current date is { datetime . now ( ) . strftime ( " % A, % B %d , % Y " ) } , your knowledge cutoff was December 2023.
You are Dewey , an AI assistant with access to external tools and the ability to think through complex problems . Your role is to assist users by leveraging tools when necessary , thinking deeply about problems , and providing accurate and helpful information , all with a cheerful , but witty personality . Here are the tools available to you :
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
{ tool_manager . get_tools_and_descriptions_for_prompt ( ) }
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
When addressing a query , follow these steps :
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
1. Analyze : Thoroughly analyze the query and consider multiple approaches to solving it .
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
2. Plan : Develop a plan of action , considering whether you need to use any tools or if you can answer directly .
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
3. Execute : If you need to use a tool , call it as you would a function . If not , proceed with your reasoning .
2024-10-01 19:31:57 -04:00
- Analyse the given prompt and decided whether or not it can be answered by a tool . If it can , use the following functions to respond with a JSON for a function call with its proper arguments that best answers the given prompt . Respond in the format \" name \" : function name, \" parameters \" : dictionary of argument name and its value. Do not use variables.
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
4. Reflect : After each step or tool use , reflect on the results :
- If successful , consider if the result fully answers the user ' s query or if additional steps are needed.
- If there were errors or the result is unsatisfactory , don ' t give up! Use Tree of Thoughts reasoning:
a ) Generate multiple alternative approaches or modifications to your previous approach .
b ) Briefly evaluate the potential of each alternative .
c ) Choose the most promising alternative and execute it .
d ) Repeat this process if needed , building upon your growing understanding of the problem .
e ) You cannot return a final answer after an error using a tool , you must try again .
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
5. Iterate : Continue this process of execution and reflection , exploring different branches of thought as needed .
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
6. Conclude : When you believe you have a comprehensive answer to the user ' s query, provide your final answer.
2024-09-26 13:38:25 -04:00
2024-10-01 19:31:57 -04:00
Always explain your thought process , including your reasoning for each decision and how you arrived at your conclusions . If you ' re providing a final answer, or need more input from the user, put your response in tags <answer></answer>.
2024-09-26 13:38:25 -04:00
2024-09-29 12:18:44 -04:00
Remember , complex problems often require multiple steps and iterations . Don ' t hesitate to break down the problem, use tools multiple times, or explore different approaches to arrive at the best solution.
2024-10-01 19:31:57 -04:00
Before approaching a problem , come up with a few ways you might solve it , and then choose the most promising approach . Repeat this on each iteration .
2024-09-26 13:38:25 -04:00
"""
2024-10-02 19:33:14 -04:00
# System prompt used as the first ("system") message by the chat handlers and
# the query worker in this file. It asks the model to wrap its final answer in
# reply tags, which is what the chat loops search for.
# The f-string is evaluated once at import time, so the embedded date and tool
# list are fixed for the lifetime of the process.
ANSWER_QUESTION_PROMPT = f """
You are Dewey , an AI assistant with a personality that combines the wit and sarcasm of Dr . Gregory House from House MD with the helpfulness and intelligence of Jarvis from Iron Man . Today ' s date is { datetime.now().strftime( " % A, % B %d , % Y " )}. Your knowledge cutoff date is December 2023.
When responding to user queries , follow these steps :
Analyze the user ' s request
Option 1 : [ First interpretation of the request ]
Option 2 : [ Second interpretation of the request ]
. . . ( up to 5 options )
Selected approach : [ Choose the most promising option or combine the two best ]
Break down the task into subtasks
Option 1 : [ First breakdown of subtasks ]
Option 2 : [ Second breakdown of subtasks ]
. . . ( up to 5 options )
Selected breakdown : [ Choose the most promising option or combine the two best ]
For each subtask , consider available tools :
{ tool_manager . get_tools_and_descriptions_for_prompt ( ) }
Option 1 : [ First approach using tools ]
Option 2 : [ Second approach using tools ]
. . . ( up to 5 options )
Selected tool usage : [ Choose the most promising option or combine the two best ]
Execute the plan
Option 1 : [ First execution plan ]
Option 2 : [ Second execution plan ]
. . . ( up to 5 options )
Selected execution : [ Choose the most promising option or combine the two best ]
Review and refine the response
Option 1 : [ First refined response ]
Option 2 : [ Second refined response ]
. . . ( up to 5 options )
Selected response : [ Choose the most promising option or combine the two best ]
Verify the results
Check 1 : [ First verification method ]
Check 2 : [ Second verification method ]
. . . ( up to 5 checks )
Verification outcome : [ Summarize the verification results ]
Generate the final response to the user within < reply > < / reply > tags :
< reply >
[ Final response goes here , incorporating the following guidelines : ]
- Be conversational and engaging
- Maintain a witty and slightly sarcastic tone , reminiscent of Dr . Gregory House
- Deliver factual information with the precision and helpfulness of Jarvis
- Use clever analogies or pop culture references when appropriate
- Don ' t be afraid to challenge the user ' s assumptions , but always in a constructive manner
- Ensure the response is tailored to the user ' s query while showcasing your unique personality
< / reply >
Remember to always be helpful , accurate , and respectful in your interactions , while maintaining your distinctive character blend of House and Jarvis .
"""
2024-09-26 17:01:00 -04:00
def get_system_resources():
    """Take one sample of CPU, memory, disk and GPU usage.

    Returns:
        dict with the keys ``cpu_load`` and ``memory_usage`` (values reported
        by psutil), ``disk_read`` and ``disk_write`` (psutil's ``read_bytes``
        and ``write_bytes`` counters, or 0 when psutil reports no disk
        statistics), and ``gpu_load`` and ``gpu_memory`` (first GPU's ``load``
        and ``memoryUtil`` multiplied by 100, or 0 when GPUtil lists no GPU).
    """
    cpu_load = psutil.cpu_percent()
    memory_usage = psutil.virtual_memory().percent

    # disk_io_counters() returns None when the system exposes no disks; report
    # zero counters instead of failing on the attribute access.
    disk_io = psutil.disk_io_counters()
    disk_read = disk_io.read_bytes if disk_io is not None else 0
    disk_write = disk_io.write_bytes if disk_io is not None else 0

    gpus = GPUtil.getGPUs()
    first_gpu = gpus[0] if gpus else None
    gpu_load = first_gpu.load * 100 if first_gpu is not None else 0
    gpu_memory = first_gpu.memoryUtil * 100 if first_gpu is not None else 0

    return {
        "cpu_load": cpu_load,
        "memory_usage": memory_usage,
        "disk_read": disk_read,
        "disk_write": disk_write,
        "gpu_load": gpu_load,
        "gpu_memory": gpu_memory,
    }
2024-10-06 21:23:55 -04:00
2024-09-26 17:01:00 -04:00
def send_system_resources():
    """Broadcast ``system_resources`` events over Socket.IO forever.

    Each event carries the current CPU, memory and GPU readings plus disk
    read/write rates in bytes per second. Rates are the counter difference
    between two consecutive samples divided by the time that actually elapsed
    between them. The counters are primed with an initial sample so the first
    event does not report the cumulative counters as a rate.

    This function never returns; it is meant to run in a background thread.
    """
    baseline = get_system_resources()
    last_disk_read = baseline["disk_read"]
    last_disk_write = baseline["disk_write"]
    last_sample_time = time.monotonic()

    while True:
        time.sleep(UPDATE_INTERVAL)
        resources = get_system_resources()
        now = time.monotonic()

        elapsed = now - last_sample_time
        if elapsed <= 0:
            # The clock did not advance (e.g. a zero interval); skip this
            # sample rather than divide by zero.
            continue

        # Calculate disk I/O rates
        disk_read_rate = (resources["disk_read"] - last_disk_read) / elapsed
        disk_write_rate = (resources["disk_write"] - last_disk_write) / elapsed

        socketio.emit(
            "system_resources",
            {
                "cpu_load": resources["cpu_load"],
                "memory_usage": resources["memory_usage"],
                "disk_read_rate": disk_read_rate,
                "disk_write_rate": disk_write_rate,
                "gpu_load": resources["gpu_load"],
                "gpu_memory": resources["gpu_memory"],
            },
        )

        last_disk_read = resources["disk_read"]
        last_disk_write = resources["disk_write"]
        last_sample_time = now
2024-10-06 21:23:55 -04:00
class QueryRequest(BaseModel):
    """Schema of the JSON body accepted when submitting a query."""

    # Text of the user's question.
    message: str
class QueryResponse(BaseModel):
    """Schema of the reply to a submitted query."""

    # Identifier to use when polling for the query's status.
    query_id: str
class QueryStatusResponse(BaseModel):
    """Schema of a query status reply."""

    # One of the QueryStatus values.
    status: str
    # Conversation messages; the status endpoint includes them once the query is done.
    conversation_history: Optional[List[dict]]
@app.post("/api/v1/query")
def api_query():
    """
    Submit a new query to the LLM Chat Server.

    This endpoint requires authentication via an API key. The JSON body must
    be an object whose "message" entry is a string.

    Sample cURL:
    curl -X POST http://localhost:5001/api/v1/query \
    -H "Content-Type: application/json" \
    -H "X-API-Key: your-api-key" \
    -d '{"message": "What is the capital of France?"}'

    Returns {"query_id": ...} on success, 404 when API endpoints are disabled,
    401 for a missing or unknown API key, 400 for an invalid body and 500
    when the query cannot be stored.
    """
    if not ENABLE_API_ENDPOINTS:
        return jsonify({"error": "API endpoints are disabled"}), 404

    api_key = request.headers.get("X-API-Key")
    if not api_key:
        return jsonify({"error": "API key is required"}), 401

    api_key_id = validate_api_key(api_key)
    if not api_key_id:
        return jsonify({"error": "Invalid API key"}), 401

    # silent=True turns a missing or unparsable JSON body into None, so every
    # malformed request gets the same JSON 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or "message" not in data:
        return jsonify({"error": "Invalid request body"}), 400

    user_input = data["message"]
    # The message is stored in a TEXT column and later sent to the model as a
    # chat message, so only strings are accepted.
    if not isinstance(user_input, str):
        return jsonify({"error": "Invalid request body"}), 400

    query_id = str(uuid.uuid4())

    try:
        db = get_db()
        cursor = db.cursor()
        cursor.execute(
            "INSERT INTO Queries (id, ip, query, api_key_id, status) VALUES (?, ?, ?, ?, ?)",
            (query_id, request.remote_addr, user_input, api_key_id, QueryStatus.QUEUED.value),
        )
        db.commit()
        logger.info(f"Added new query to database: {query_id}")
        return jsonify({"query_id": query_id})
    except Exception as e:
        logger.exception(f"Error during API query processing: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.get("/api/v1/query_status/<string:query_id>")
def get_query_status(query_id: str):
    """
    Get the status of a submitted query.

    This endpoint requires authentication via an API key.

    Sample cURL:
    curl -X GET http://localhost:5001/api/v1/query_status/query-id-here \
    -H "X-API-Key: your-api-key"

    Returns {"status": ...}; once the status is "done" the reply also carries
    "conversation_history". Responds with 404 when API endpoints are disabled
    or the query is unknown, 401 for a missing or unknown API key and 500 on
    a lookup failure.
    """
    # Same feature gate as the other /api/v1 endpoints.
    if not ENABLE_API_ENDPOINTS:
        return jsonify({"error": "API endpoints are disabled"}), 404

    api_key = request.headers.get("X-API-Key")
    if not api_key:
        return jsonify({"error": "API key is required"}), 401

    api_key_id = validate_api_key(api_key)
    if not api_key_id:
        return jsonify({"error": "Invalid API key"}), 401

    # NOTE(review): the lookup is not restricted to rows owned by this API
    # key, so any valid key can read any query by id — confirm this is intended.
    try:
        db = get_db()
        cursor = db.cursor()
        cursor.execute(
            "SELECT status, conversation_history FROM Queries WHERE id = ?",
            (query_id,),
        )
        result = cursor.fetchone()
        if result is None:
            return jsonify({"error": "Query not found"}), 404

        status, conversation_history = result

        response = {"status": status}
        if status == QueryStatus.DONE.value:
            # A NULL history is reported as null instead of failing in json.loads.
            response["conversation_history"] = (
                json.loads(conversation_history)
                if conversation_history is not None
                else None
            )
        return jsonify(response)
    except Exception as e:
        logger.exception("Error retrieving query status", error=str(e))
        return jsonify({"error": str(e)}), 500
def answer_question_tools_api(
    user_input: str, conversation_history: List[dict], max_retries: int = 100
):
    """Run the tool-calling chat loop for an API query and return the full history.

    The model is queried up to ``max_retries`` times. Every assistant and tool
    message is appended to ``conversation_history`` (mutated in place unless it
    was empty).

    For each assistant message:
      * tool calls with a name and arguments are executed and their results
        appended as ``tool`` messages; incomplete calls are logged and skipped;
      * a ``<reply>...</reply>`` section, matched case-insensitively, ends the
        loop; its stripped body is appended as a final assistant message;
      * anything else, including a reply tag with no closing tag, counts as an
        intermediate thought. An empty thought that follows a non-empty one
        ends the loop, with the previous thought appended as the final message.

    Args:
        user_input: The new user message.
        conversation_history: Prior messages; a system prompt is used when empty.
        max_retries: Maximum number of model calls.

    Returns:
        The conversation history, ending with the final assistant message or
        with a "Max iterations reached" message.
    """
    global tool_manager

    if not conversation_history:
        conversation_history = [
            {"role": "system", "content": ANSWER_QUESTION_PROMPT},
        ]

    logger.info(
        "Starting API chat",
        user_input=user_input,
        conversation_history=conversation_history,
    )
    conversation_history.append({"role": "user", "content": user_input})

    last_thought_content = None
    # Pre-bound so the fallback message is defined even if the loop never runs.
    assistant_message = None
    for _ in range(max_retries):
        response = ollama.chat(
            model=PRIMARY_MODEL,
            messages=conversation_history,
            tools=tool_manager.get_tools_for_ollama_dict(),
            stream=False,
        )
        logger.info(f"API Response: {response}")
        assistant_message = response["message"]
        conversation_history.append(assistant_message)

        if "tool_calls" in assistant_message:
            for tool_call in assistant_message["tool_calls"]:
                tool_name = tool_call["function"]["name"]
                tool_args = tool_call["function"]["arguments"]
                if tool_name is not None and tool_args is not None:
                    tool_response = tool_manager.get_tool(tool_name).execute(tool_args)
                    conversation_history.append({"role": "tool", "content": tool_response})
                    logger.info(f"API Tool response: {tool_response}")
                else:
                    logger.warning(
                        f"Skipping tool call due to missing tool name or arguments: {tool_call}"
                    )
            continue

        content = assistant_message["content"] or ""

        # Case-insensitive so that <Reply>/<REPLY> are accepted as well; a tag
        # without a matching close falls through to the thought handling.
        reply_match = re.search(
            r"<reply>(.*?)</reply>", content, re.DOTALL | re.IGNORECASE
        )
        if reply_match:
            conversation_history.append(
                {"role": "assistant", "content": reply_match.group(1).strip()}
            )
            return conversation_history

        current_thought_content = content.strip()
        if last_thought_content and not current_thought_content:
            conversation_history.append(
                {"role": "assistant", "content": last_thought_content}
            )
            return conversation_history

        last_thought_content = current_thought_content

    last_content = assistant_message["content"] if assistant_message is not None else ""
    conversation_history.append(
        {
            "role": "assistant",
            "content": f"Max iterations reached. Last response: {last_content}",
        }
    )
    return conversation_history
def process_queries():
    """Background worker loop that answers queued API queries one at a time.

    Runs forever inside an application context. On every pass it:
      1. waits 10 seconds if any row is already ``processing``;
      2. otherwise takes the oldest ``queued`` row and claims it by switching
         it to ``processing``; the claim only counts if the row was still
         ``queued``, so two workers cannot both take the same row;
      3. runs ``answer_question_tools_api`` on it, starting from the stored
         conversation history when there is one;
      4. stores the resulting history as JSON and marks the row ``done``.

    If answering or serialising fails, the row is still marked ``done`` with
    an error message as its history. Otherwise the row would stay
    ``processing`` and step 1 would block the queue for good.

    When nothing is queued the loop sleeps 5 seconds. Any other error is
    logged and the loop retries after 1 second.
    """
    logger.info("Query processing thread started")
    with app.app_context():
        while True:
            try:
                db = get_db()
                cursor = db.cursor()

                # First, check if there are any PROCESSING queries
                # NOTE(review): a row left in PROCESSING by a process that died
                # still blocks the queue here — confirm how such rows are reset.
                cursor.execute(
                    "SELECT id FROM Queries WHERE status = ? LIMIT 1",
                    (QueryStatus.PROCESSING.value,),
                )
                processing_query = cursor.fetchone()
                if processing_query:
                    logger.info(f"Found processing query: {processing_query[0]}. Waiting...")
                    db.commit()
                    time.sleep(10)
                    continue

                # If no PROCESSING queries, get the oldest QUEUED query
                cursor.execute(
                    "SELECT id, query FROM Queries WHERE status = ? ORDER BY timestamp ASC LIMIT 1",
                    (QueryStatus.QUEUED.value,),
                )
                result = cursor.fetchone()
                if not result:
                    logger.info("No queued queries found. Waiting...")
                    time.sleep(5)  # Wait for 5 seconds before checking again if no queries are found
                    continue

                query_id, user_input = result
                logger.info(f"Processing query: {query_id}")

                # Claim the row: the update only matches while it is still QUEUED.
                cursor.execute(
                    "UPDATE Queries SET status = ? WHERE id = ? AND status = ?",
                    (QueryStatus.PROCESSING.value, query_id, QueryStatus.QUEUED.value),
                )
                db.commit()
                if cursor.rowcount != 1:
                    # Another worker claimed it between the SELECT and the UPDATE.
                    continue
                logger.info(f"Updated query {query_id} status to PROCESSING")

                try:
                    # Fetch conversation history if it exists
                    cursor.execute(
                        "SELECT conversation_history FROM Queries WHERE id = ?",
                        (query_id,),
                    )
                    stored_history = cursor.fetchone()
                    if stored_history and stored_history[0]:
                        conversation_history = json.loads(stored_history[0])
                    else:
                        conversation_history = [
                            {"role": "system", "content": ANSWER_QUESTION_PROMPT}
                        ]

                    logger.info(f"Starting answer_question_tools_api for query {query_id}")
                    final_conversation_history = answer_question_tools_api(
                        user_input, conversation_history
                    )
                    logger.info(f"Finished answer_question_tools_api for query {query_id}")
                    history_json = json.dumps(final_conversation_history)
                except Exception as e:
                    # Record the failure as the result so the row can leave
                    # PROCESSING and the queue keeps moving.
                    logger.exception(f"Error processing query {query_id}: {str(e)}")
                    history_json = json.dumps(
                        [
                            {"role": "user", "content": user_input},
                            {"role": "assistant", "content": f"Error processing query: {str(e)}"},
                        ]
                    )

                # Update with final result and set status to DONE
                cursor.execute(
                    "UPDATE Queries SET conversation_history = ?, status = ? WHERE id = ?",
                    (history_json, QueryStatus.DONE.value, query_id),
                )
                db.commit()
                logger.info(f"Updated query {query_id} status to DONE")
            except Exception as e:
                logger.exception(f"Error processing query: {str(e)}")
                time.sleep(1)  # Wait for 1 second before retrying in case of an error
# Admin endpoint for generating API keys
class GenerateKeyRequest(BaseModel):
    """Schema of the JSON body accepted by the key-generation endpoint."""

    # Name of the user the new API key is issued to.
    username: str
class GenerateKeyResponse(BaseModel):
    """Schema of the reply returned after an API key has been generated."""

    # Name of the user the key belongs to.
    username: str
    # The newly generated API key.
    api_key: str
@app.post("/admin/generate_key")
def generate_api_key():
    """
    Generate a new API key for a user.

    This endpoint requires authentication via an admin key. The JSON body must
    be an object whose "username" entry is a non-empty string.

    Sample cURL:
    curl -X POST http://localhost:5001/admin/generate_key \
    -H "Content-Type: application/json" \
    -H "X-Admin-Key: your-admin-key" \
    -d '{"username": "new_user"}'

    Returns {"username": ..., "api_key": ...} on success, 401 for a missing or
    wrong admin key, 400 for an invalid body or an existing username and 500
    on any other storage failure.
    """
    admin_key = request.headers.get("X-Admin-Key")
    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Both sides are encoded because compare_digest rejects
    # non-ASCII str arguments.
    if not admin_key or not secrets.compare_digest(
        admin_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")
    ):
        return jsonify({"error": "Invalid admin key"}), 401

    # silent=True turns a missing or unparsable JSON body into None, so every
    # malformed request gets the same JSON 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or "username" not in data:
        return jsonify({"error": "Invalid request body"}), 400

    username = data["username"]
    if not isinstance(username, str) or not username:
        return jsonify({"error": "Invalid request body"}), 400

    api_key = secrets.token_urlsafe(32)

    try:
        db = get_db()
        cursor = db.cursor()
        cursor.execute(
            "INSERT INTO Keys (username, api_key) VALUES (?, ?)", (username, api_key)
        )
        db.commit()
        return jsonify({"username": username, "api_key": api_key})
    except sqlite3.IntegrityError:
        return jsonify({"error": "Username already exists"}), 400
    except Exception as e:
        logger.exception("Error generating API key", error=str(e))
        return jsonify({"error": str(e)}), 500
2024-10-06 23:19:57 -04:00
def start_processing_thread():
    """Start the daemon thread running ``process_queries``, once per process.

    Later calls are no-ops because ``processing_thread_started`` is already set.
    """
    global processing_thread, processing_thread_started

    if processing_thread_started:
        return

    worker = threading.Thread(target=process_queries, daemon=True)
    processing_thread = worker
    worker.start()
    processing_thread_started = True
    logger.info("Query processing thread started")
2024-10-06 23:51:01 -04:00
def allowed_file(filename):
    """Return True if ``filename`` has an extension listed in ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared case-insensitively.
    """
    if "." not in filename:
        return False
    _, extension = filename.rsplit(".", 1)
    return extension.lower() in ALLOWED_EXTENSIONS
@app.post("/api/v1/query_with_image")
def api_query_with_image():
    """
    Submit a new query to the LLM Chat Server with an optional image.

    This endpoint requires authentication via an API key. The request is a
    multipart form with a required "message" field and an optional "image"
    file. An image whose extension is not in ALLOWED_EXTENSIONS is ignored; an
    accepted image larger than MAX_IMAGE_SIZE bytes is rejected.

    Sample cURL:
    curl -X POST http://localhost:5001/api/v1/query_with_image \
    -H "X-API-Key: your-api-key" \
    -F "message=What's in this image?" \
    -F "image=@path/to/your/image.jpg"

    Returns {"query_id": ...} on success, 404 when API endpoints are disabled,
    401 for a missing or unknown API key, 400 for a missing message or an
    oversized image and 500 when the query cannot be stored.
    """
    if not ENABLE_API_ENDPOINTS:
        return jsonify({"error": "API endpoints are disabled"}), 404

    api_key = request.headers.get("X-API-Key")
    if not api_key:
        return jsonify({"error": "API key is required"}), 401

    api_key_id = validate_api_key(api_key)
    if not api_key_id:
        return jsonify({"error": "Invalid API key"}), 401

    if "message" not in request.form:
        return jsonify({"error": "Message is required"}), 400

    user_input = request.form["message"]
    query_id = str(uuid.uuid4())

    image_base64 = None
    file = request.files.get("image")
    if file and allowed_file(file.filename):
        # Measure the bytes actually received: the part's content_length is
        # normally 0 for multipart uploads, so checking it never rejects
        # anything. One byte past the limit is enough to detect an oversized file.
        image_data = file.read(MAX_IMAGE_SIZE + 1)
        if len(image_data) > MAX_IMAGE_SIZE:
            return jsonify({"error": "Image size exceeds 1MB limit"}), 400

        # Encode the image
        image_base64 = base64.b64encode(image_data).decode("utf-8")

    # If there's an image, seed the conversation history with it
    conversation_history_json = None
    if image_base64:
        conversation_history = [
            {"role": "system", "content": ANSWER_QUESTION_PROMPT},
            {"role": "user", "content": f"[An image was uploaded with this message] {user_input}"},
            {"role": "system", "content": f"An image was uploaded. You can analyze it using the analyze_image tool with the following base64 string: {image_base64}"},
        ]
        conversation_history_json = json.dumps(conversation_history)

    try:
        db = get_db()
        cursor = db.cursor()
        # The history is written in the same INSERT that queues the row, so the
        # worker can never pick the query up before the image has been stored.
        cursor.execute(
            "INSERT INTO Queries (id, ip, query, api_key_id, status, conversation_history) VALUES (?, ?, ?, ?, ?, ?)",
            (
                query_id,
                request.remote_addr,
                user_input,
                api_key_id,
                QueryStatus.QUEUED.value,
                conversation_history_json,
            ),
        )
        db.commit()
        logger.info(f"Added new query with image to database: {query_id}")
        return jsonify({"query_id": query_id})
    except Exception as e:
        logger.exception(f"Error during API query processing with image: {str(e)}")
        return jsonify({"error": str(e)}), 500
2024-10-06 23:19:57 -04:00
# Entry point when run as a script; the else branch covers being imported.
2024-09-26 13:38:25 -04:00
if __name__ != "__main__":
    # This will run when the module is imported, e.g., by the reloader
    if ENABLE_API_ENDPOINTS:
        start_processing_thread()
else:
    logger.info("Starting LLM Chat Server")
    init_db()  # Initialize the database

    # The resource monitor only matters when a Socket.IO client can be connected.
    if ENABLE_FRONTEND or ENABLE_CHAT_ENDPOINTS:
        monitor = threading.Thread(target=send_system_resources, daemon=True)
        monitor.start()
        logger.info("System resources thread started")

    if ENABLE_API_ENDPOINTS:
        start_processing_thread()

    logger.info("Starting Flask application")
    # NOTE(review): debug=True together with host="0.0.0.0" makes the debug
    # server reachable from other machines — confirm this is development-only.
    socketio.run(app, debug=True, host="0.0.0.0", port=5001)