183 lines
5.4 KiB
Python
183 lines
5.4 KiB
Python
import subprocess
|
|
import tempfile
|
|
import time
|
|
|
|
import duckduckgo_search
|
|
import requests
|
|
from markdownify import markdownify as md
|
|
from readability.readability import Document
|
|
|
|
|
|
class Tool:
|
|
def __init__(self, name: str, description: str, arguments: dict, returns: str):
|
|
self.name = name
|
|
self.description = description
|
|
self.arguments = arguments
|
|
self.returns = returns
|
|
|
|
def execute(self, arguments: dict) -> str:
|
|
pass
|
|
|
|
|
|
class ToolManager:
|
|
def __init__(self):
|
|
self.tools = []
|
|
|
|
def add_tool(self, tool: Tool):
|
|
self.tools.append(tool)
|
|
|
|
def get_tool(self, name: str) -> Tool:
|
|
for tool in self.tools:
|
|
if tool.name == name:
|
|
return tool
|
|
return None
|
|
|
|
def get_tools_and_descriptions_for_prompt(self):
|
|
return "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
|
|
|
|
def get_tools_for_ollama_dict(self):
|
|
return [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": tool.name,
|
|
"description": tool.description,
|
|
"parameters": tool.arguments,
|
|
},
|
|
}
|
|
for tool in self.tools
|
|
]
|
|
|
|
|
|
class DefaultToolManager(ToolManager):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.add_tool(SearchTool())
|
|
self.add_tool(GetReadablePageContentsTool())
|
|
self.add_tool(CalculatorTool())
|
|
self.add_tool(PythonCodeTool())
|
|
|
|
|
|
class SearchTool(Tool):
|
|
def __init__(self):
|
|
super().__init__(
|
|
"search_web",
|
|
"Search the internet for information",
|
|
{
|
|
"type": "object",
|
|
"properties": {
|
|
"query": {"type": "string", "description": "The search query"}
|
|
},
|
|
},
|
|
"results:list[string]",
|
|
)
|
|
|
|
def execute(self, arg: dict) -> str:
|
|
res = duckduckgo_search.DDGS().text(arg["query"], max_results=5)
|
|
return "\n\n".join([f"{r['title']}\n{r['body']}\n{r['href']}" for r in res])
|
|
|
|
|
|
def get_readable_page_contents(url: str) -> str:
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
doc = Document(response.content)
|
|
content = doc.summary()
|
|
return md(content)
|
|
except Exception as e:
|
|
return f"Error fetching readable content: {str(e)}"
|
|
|
|
|
|
class GetReadablePageContentsTool(Tool):
|
|
def __init__(self):
|
|
super().__init__(
|
|
"get_readable_page_contents",
|
|
"Get the contents of a web page in a readable format",
|
|
{
|
|
"type": "object",
|
|
"properties": {
|
|
"url": {"type": "string", "description": "The url of the web page"}
|
|
},
|
|
},
|
|
"contents:string",
|
|
)
|
|
|
|
def execute(self, arg: dict) -> str:
|
|
return get_readable_page_contents(arg["url"])
|
|
|
|
|
|
class CalculatorTool(Tool):
|
|
def __init__(self):
|
|
super().__init__(
|
|
"calculator",
|
|
"Perform a calculation using python's eval function",
|
|
{
|
|
"type": "object",
|
|
"properties": {
|
|
"expression": {
|
|
"type": "string",
|
|
"description": "The mathematical expression to evaluate, should be a python mathematical expression",
|
|
}
|
|
},
|
|
},
|
|
"result:string",
|
|
)
|
|
|
|
def execute(self, arg: dict) -> str:
|
|
try:
|
|
return str(eval(arg["expression"]))
|
|
except Exception as e:
|
|
return f"Error executing code: {str(e)}"
|
|
|
|
|
|
class PythonCodeTool(Tool):
|
|
def __init__(self):
|
|
super().__init__(
|
|
"python_code",
|
|
"Execute python code using a temporary file and a subprocess. You must print results to stdout.",
|
|
{
|
|
"type": "object",
|
|
"properties": {
|
|
"code": {
|
|
"type": "string",
|
|
"description": "The python code to execute, can be multiline",
|
|
}
|
|
},
|
|
},
|
|
"result:string",
|
|
)
|
|
|
|
def execute(self, arg: dict) -> str:
|
|
try:
|
|
with tempfile.NamedTemporaryFile(
|
|
suffix=".py", mode="w", delete=False
|
|
) as temp_file:
|
|
temp_file.write(arg["code"])
|
|
temp_file.flush()
|
|
|
|
start_time = time.time()
|
|
process = subprocess.Popen(
|
|
["python", temp_file.name],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
)
|
|
stdout, stderr = process.communicate(timeout=10) # 10 second timeout
|
|
end_time = time.time()
|
|
execution_time = end_time - start_time
|
|
|
|
result = {
|
|
"stdout": stdout,
|
|
"stderr": stderr,
|
|
"return_value": process.returncode,
|
|
"execution_time": execution_time,
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
process.kill()
|
|
return "Error: Code execution timed out after 10 seconds"
|
|
except Exception as e:
|
|
return f"Error executing code: {str(e)}"
|
|
|
|
return "\n".join([f"{k}:\n{v}" for k, v in result.items()])
|