## example workflow
{
  "workflow_id": "simple-chatbot-v1",
  "workflow_name": "Simple Chatbot",
  "nodes": [
    {
      "id": "ChatInput-1",
      "type": "ChatInput",
      "data": {
        "display_name": "User's Question",
        "template": {
          "input_value": {
            "display_name": "Input",
            "type": "string",
            "value": "What is the capital of France?",
            "is_handle": true
          }
        }
      },
      "resources": {
        "cpu": 0.1,
        "memory": "128Mi",
        "gpu": "none"
      }
    },
    {
      "id": "Prompt-1",
      "type": "Prompt",
      "data": {
        "display_name": "System Prompt",
        "template": {
          "prompt_template": {
            "display_name": "Template",
            "type": "string",
            "value": "You are a helpful geography expert. The user asked: {input_value}",
            "is_handle": true
          }
        }
      },
      "resources": {
        "cpu": 0.1,
        "memory": "128Mi",
        "gpu": "none"
      }
    },
    {
      "id": "OpenAI-1",
      "type": "OpenAIModel",
      "data": {
        "display_name": "OpenAI gpt-4o-mini",
        "template": {
          "model": {
            "display_name": "Model",
            "type": "options",
            "options": ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"],
            "value": "gpt-4o-mini"
          },
          "api_key": {
            "display_name": "API Key",
            "type": "SecretStr",
            "required": true,
            "env_var": "OPENAI_API_KEY"
          },
          "prompt": {
            "display_name": "Prompt",
            "type": "string",
            "is_handle": true
          }
        }
      },
      "resources": {
        "cpu": 0.5,
        "memory": "256Mi",
        "gpu": "none"
      }
    },
    {
      "id": "ChatOutput-1",
      "type": "ChatOutput",
      "data": {
        "display_name": "Final Answer",
        "template": {
          "response": {
            "display_name": "Response",
            "type": "string",
            "is_handle": true
          }
        }
      },
      "resources": {
        "cpu": 0.1,
        "memory": "128Mi",
        "gpu": "none"
      }
    }
  ],
  "edges": [
    {
      "source": "ChatInput-1",
      "source_handle": "input_value",
      "target": "Prompt-1",
      "target_handle": "prompt_template"
    },
    {
      "source": "Prompt-1",
      "source_handle": "prompt_template",
      "target": "OpenAI-1",
      "target_handle": "prompt"
    },
    {
      "source": "OpenAI-1",
      "source_handle": "response",
      "target": "ChatOutput-1",
      "target_handle": "response"
    }
  ]
}
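The edges above form a simple dataflow graph: each edge routes the value a node produced on its source_handle into the target node's target_handle. The following is only a minimal, illustrative sketch of how an executor could walk such a graph in topological order; the node_registry mapping and the adapter signature it assumes (node, gathered inputs) are hypothetical, not the actual runtime.

from collections import defaultdict, deque
from typing import Any, Dict

def run_workflow(workflow: Dict[str, Any], node_registry: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Illustrative executor: walks nodes in topological order and routes edge values."""
    nodes = {n["id"]: n for n in workflow["nodes"]}
    indegree = {nid: 0 for nid in nodes}
    downstream = defaultdict(list)
    for e in workflow["edges"]:
        indegree[e["target"]] += 1
        downstream[e["source"]].append(e)

    inputs: Dict[str, Dict[str, Any]] = defaultdict(dict)   # node_id -> kwargs gathered from incoming edges
    outputs: Dict[str, Dict[str, Any]] = {}                 # node_id -> dict keyed by source_handle

    ready = deque(nid for nid, deg in indegree.items() if deg == 0)
    while ready:
        nid = ready.popleft()
        node = nodes[nid]
        # node_registry is a hypothetical mapping, e.g. "ChatInput" -> an adapter around process_input
        process_fn = node_registry[node["type"]]
        outputs[nid] = process_fn(node, inputs[nid])
        for e in downstream[nid]:
            inputs[e["target"]][e["target_handle"]] = outputs[nid].get(e["source_handle"])
            indegree[e["target"]] -= 1
            if indegree[e["target"]] == 0:
                ready.append(e["target"])
    return outputs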
## input node
{
  "id": "Input-1",
  "type": "Input",
  "data": {
    "display_name": "Source Data",
    "template": {
      "data_type": {
        "display_name": "Data Type",
        "type": "options",
        "options": ["string", "image", "video", "audio", "file"],
        "value": "string"
      },
      "value": {
        "display_name": "Value or Path",
        "type": "string",
        "value": "This is the initial text."
      },
      "data": {
        "display_name": "Output Data",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.1,
    "memory": "128Mi",
    "gpu": "none"
  }
}
from typing import Any, Dict

def process_input(data_type: str, value: Any) -> Dict[str, Any]:
    """
    Packages the source data and its type for downstream nodes.
    """
    # The output is a dictionary containing both the type and the data/path.
    # This gives the next node context on how to handle the value.
    output_package = {
        "type": data_type,
        "value": value
    }
    return {"data": output_package}

process_input("string", "hi")
## output node
"""{
  "id": "Output-1",
  "type": "Output",
  "data": {
    "display_name": "Final Result",
    "template": {
      "input_data": {
        "display_name": "Input Data",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.1,
    "memory": "128Mi",
    "gpu": "none"
  }
}"""
from typing import Any, Dict

def process_output(input_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Receives the final data package and returns its contents.
    """
    # Unpack the dictionary received from the upstream node.
    data_type = input_data.get("type", "unknown")
    value = input_data.get("value", "No value provided")
    # Don't print the output here; just return it so the caller decides what to do with it.
    return {"data_type": data_type, "value": value}

process_output({'type': 'string', 'value': 'hi'})
## api request node
"""{
  "id": "APIRequest-1",
  "type": "APIRequest",
  "data": {
    "display_name": "Get User Data",
    "template": {
      "url": {
        "display_name": "URL",
        "type": "string",
        "value": "https://api.example.com/users/1"
      },
      "method": {
        "display_name": "Method",
        "type": "options",
        "options": ["GET", "POST", "PUT", "DELETE"],
        "value": "GET"
      },
      "headers": {
        "display_name": "Headers (JSON)",
        "type": "string",
        "value": "{\"Authorization\": \"Bearer YOUR_TOKEN\"}"
      },
      "body": {
        "display_name": "Request Body",
        "type": "object",
        "is_handle": true
      },
      "response": {
        "display_name": "Response Data",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.2,
    "memory": "256Mi",
    "gpu": "none"
  }
}"""
import requests
import json
from typing import Any, Dict

def process_api_request(url: str, method: str, headers: str, body: Dict = None) -> Dict[str, Any]:
    """
    Performs an HTTP request and returns the JSON response.
    """
    try:
        parsed_headers = json.loads(headers)
    except json.JSONDecodeError:
        print("Warning: Headers are not valid JSON. Using empty headers.")
        parsed_headers = {}
    try:
        response = requests.request(
            method=method,
            url=url,
            headers=parsed_headers,
            json=body,
            timeout=10  # 10-second timeout
        )
        # Raise an exception for bad status codes (4xx or 5xx)
        response.raise_for_status()
        # The output is a dictionary containing the JSON response.
        return {"response": response.json()}
    except requests.exceptions.RequestException as e:
        print(f"Error during API request: {e}")
        # Return an error structure on failure
        return {"response": {"error": str(e), "status_code": getattr(e.response, 'status_code', 500)}}

url = "https://jsonplaceholder.typicode.com/posts"
method = "GET"
headers = "{}"  # empty JSON headers
body = None  # GET requests typically don't send a JSON body
result = process_api_request(url, method, headers, body)
print(result)

url = "https://jsonplaceholder.typicode.com/posts"
method = "POST"
headers = '{"Content-Type": "application/json"}'
body = {
    "title": "foo",
    "body": "bar",
    "userId": 1
}
result = process_api_request(url, method, headers, body)
print(result)
## react agent tool
import os
import asyncio
from typing import List, Dict, Any
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from duckduckgo_search import DDGS

# Set your API key
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"

class WorkflowReActAgent:
    """Complete working ReAct Agent with your workflow tools"""

    def __init__(self, llm_model: str = "gpt-4o-mini"):
        self.llm = OpenAI(model=llm_model, temperature=0.1)
        self.tools = self._create_tools()
        self.agent = ReActAgent.from_tools(
            tools=self.tools,
            llm=self.llm,
            verbose=True,
            max_iterations=8  # Reasonable limit
        )

    def _create_tools(self) -> List[FunctionTool]:
        """Create tools that actually work and get used"""

        # Web Search Tool (using your exact implementation)
        def web_search(query: str) -> str:
            """Search the web for current information"""
            try:
                with DDGS() as ddgs:
                    results = []
                    gen = ddgs.text(query, safesearch="Off")
                    for i, result in enumerate(gen):
                        if i >= 3:  # Limit results
                            break
                        results.append(f"- {result.get('title', '')}: {result.get('body', '')[:150]}...")
                if results:
                    return f"Search results: {'; '.join(results)}"
                else:
                    return f"No results found for '{query}'"
            except Exception as e:
                return f"Search error: {str(e)}"

        # Calculator Tool
        def calculate(expression: str) -> str:
            """Calculate mathematical expressions safely"""
            try:
                # Simple and safe evaluation
                allowed_chars = "0123456789+-*/().,_ "
                if all(c in allowed_chars for c in expression):
                    result = eval(expression)
                    return f"Result: {result}"
                else:
                    return f"Invalid expression: {expression}"
            except Exception as e:
                return f"Math error: {str(e)}"

        # Python Executor Tool
        def execute_python(code: str) -> str:
            """Execute Python code and return results"""
            import sys
            from io import StringIO
            import traceback
            old_stdout = sys.stdout
            sys.stdout = StringIO()
            try:
                local_scope = {}
                exec(code, {"__builtins__": __builtins__}, local_scope)
                output = sys.stdout.getvalue()
                # Get result from the last line if it's an expression
                lines = code.strip().split('\n')
                if lines:
                    try:
                        result = eval(lines[-1], {}, local_scope)
                        return f"Result: {result}\nOutput: {output}".strip()
                    except Exception:
                        pass
                return f"Output: {output}".strip() if output else "Code executed successfully"
            except Exception as e:
                return f"Error: {str(e)}"
            finally:
                sys.stdout = old_stdout

        # API Request Tool
        def api_request(url: str, method: str = "GET") -> str:
            """Make HTTP API requests"""
            import requests
            try:
                response = requests.request(method, url, timeout=10)
                return f"Status: {response.status_code}\nResponse: {response.text[:300]}..."
            except Exception as e:
                return f"API error: {str(e)}"

        # Convert to FunctionTool objects
        return [
            FunctionTool.from_defaults(
                fn=web_search,
                name="web_search",
                description="Search the web for current information on any topic"
            ),
            FunctionTool.from_defaults(
                fn=calculate,
                name="calculate",
                description="Calculate mathematical expressions and equations"
            ),
            FunctionTool.from_defaults(
                fn=execute_python,
                name="execute_python",
                description="Execute Python code for data processing and calculations"
            ),
            FunctionTool.from_defaults(
                fn=api_request,
                name="api_request",
                description="Make HTTP requests to APIs and web services"
            )
        ]

    def chat(self, message: str) -> str:
        """Chat with the ReAct agent"""
        try:
            response = self.agent.chat(message)
            return str(response.response)
        except Exception as e:
            return f"Agent error: {str(e)}"

# Usage Examples
def main():
    """Test the working ReAct agent"""
    agent = WorkflowReActAgent()
    test_queries = [
        "What's the current Bitcoin price and calculate 10% of it?",
        "Search for news about SpaceX and tell me the latest",
        "Calculate the compound interest: 1000 * (1.05)^10",
        "Search for Python programming tips",
        "What's 15 factorial divided by 12 factorial?",
        "Find information about the latest iPhone and calculate its price in EUR if 1 USD = 0.92 EUR"
    ]
    print("WorkflowReActAgent Ready!")
    print("=" * 60)
    for i, query in enumerate(test_queries, 1):
        print(f"\nQuery {i}: {query}")
        print("-" * 50)
        response = agent.chat(query)
        print(f"Response: {response}")
        print("\n" + "=" * 60)

if __name__ == "__main__":
    main()
## web search tool
"""{
  "id": "WebSearch-1",
  "type": "WebSearch",
  "data": {
    "display_name": "Search for News",
    "template": {
      "query": {
        "display_name": "Search Query",
        "type": "string",
        "is_handle": true
      },
      "results": {
        "display_name": "Search Results",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.2,
    "memory": "256Mi",
    "gpu": "none"
  }
}"""
# First, install duckduckgo_search:
# pip install duckduckgo_search
import json
from typing import Any, Dict, List
from duckduckgo_search import DDGS

def process_web_search(query: str, max_results: int = 10) -> Dict[str, Any]:
    if not query:
        return {"results": []}
    try:
        # Use the DDGS client and its text() method
        with DDGS() as ddgs:
            gen = ddgs.text(query, safesearch="Off")
            # Collect up to max_results items
            results: List[Dict[str, str]] = [
                {"title": r.get("title", ""), "link": r.get("href", ""), "snippet": r.get("body", "")}
                for _, r in zip(range(max_results), gen)
            ]
        return {"results": results}
    except Exception as e:
        return {"results": {"error": str(e)}}

# import json
# from typing import Any
# from llama_index.tools import BaseTool, ToolMetadata
# class DuckDuckGoSearchTool(BaseTool):
#     """A LlamaIndex tool that proxies to process_web_search."""
#     metadata = ToolMetadata(
#         name="duckduckgo_search",
#         description="Performs a web search via DuckDuckGo and returns JSON results."
#     )
#     def __init__(self, max_results: int = 10):
#         self.max_results = max_results
#     def _run(self, query: str) -> str:
#         # Call our search function and return a JSON string
#         results = process_web_search(query, max_results=self.max_results)
#         return json.dumps(results)
#     async def _arun(self, query: str) -> str:
#         # Async agents can await this
#         results = process_web_search(query, max_results=self.max_results)
#         return json.dumps(results)
# from llama_index import GPTVectorStoreIndex, ServiceContext
# from llama_index.agent.react import ReactAgent
# from llama_index.tools import ToolConfig
# # 1. Instantiate the tool
# search_tool = DuckDuckGoSearchTool(max_results=5)
# # 2. Create an agent and register tools
# agent = ReactAgent(
#     tools=[search_tool],
#     service_context=ServiceContext.from_defaults()
# )
# # 3. Run the agent with a natural-language prompt
# response = agent.run("What are the top news about renewable energy?")
# print(response)

process_web_search(query="devil may cry")
## execute python node
"""{
  "id": "ExecutePython-1",
  "type": "ExecutePython",
  "data": {
    "display_name": "Custom Data Processing",
    "template": {
      "code": {
        "display_name": "Python Code",
        "type": "string",
        "value": "def process(data):\n # Example: Extract titles from search results\n titles = [item['title'] for item in data]\n # The 'result' variable will be the output\n result = ', '.join(titles)\n return result"
      },
      "input_vars": {
        "display_name": "Input Variables",
        "type": "object",
        "is_handle": true
      },
      "output_vars": {
        "display_name": "Output Variables",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.5,
    "memory": "512Mi",
    "gpu": "none"
  }
}"""
import sys
import traceback
from typing import Any, Dict

def process_execute_python(code: str, input_vars: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Executes a string of Python code within an isolated scope.
    - If the code defines `process(data)`, calls it with `input_vars`.
    - Otherwise, executes the code top-level and returns any printed output.
    """
    if input_vars is None:
        input_vars = {}
    # Capture stdout
    from io import StringIO
    old_stdout = sys.stdout
    sys.stdout = StringIO()
    local_scope: Dict[str, Any] = {}
    try:
        # Execute user code
        exec(code, {}, local_scope)
        if "process" in local_scope and callable(local_scope["process"]):
            result = local_scope["process"](input_vars)
        else:
            # No process(): run as script
            # (re-exec under a fresh namespace to capture prints)
            exec(code, {}, {})
            result = None
        output = sys.stdout.getvalue()
        return {"output_vars": result, "stdout": output}
    except Exception:
        err = traceback.format_exc()
        return {"output_vars": None, "error": err}
    finally:
        sys.stdout = old_stdout

# 1. Code with process():
code1 = """
def process(data):
    return {"sum": data.get("x", 0) + data.get("y", 0)}
"""
print(process_execute_python(code1, {"x": 5, "y": 7}))
# -> {'output_vars': {'sum': 12}, 'stdout': ''}

# 2. Standalone code:
code2 = 'print("Hello, world!")'
print(process_execute_python(code2))
# -> {'output_vars': None, 'stdout': 'Hello, world!\n'}

# import json
# from typing import Any
# from llama_index.tools import BaseTool, ToolMetadata
# class ExecutePythonTool(BaseTool):
#     """Executes arbitrary Python code strings in an isolated scope."""
#     metadata = ToolMetadata(
#         name="execute_python",
#         description="Runs user-supplied Python code. Requires optional `process(data)` or runs script."
#     )
#     def _run(self, code: str) -> str:
#         # Call the executor and serialize the dict result
#         result = process_execute_python(code)
#         return json.dumps(result)
#     async def _arun(self, code: str) -> str:
#         result = process_execute_python(code)
#         return json.dumps(result)
# from llama_index.agent.react import ReactAgent
# from llama_index import ServiceContext
# tool = ExecutePythonTool()
# agent = ReactAgent(tools=[tool], service_context=ServiceContext.from_defaults())
# # Agent will call `execute_python` when needed.
# response = agent.run("Please run the Python code: print('Test')")
# print(response)
## conditional logic
| """{ | |
| "id": "ConditionalLogic-1", | |
| "type": "ConditionalLogic", | |
| "data": { | |
| "display_name": "Check User Role", | |
| "template": { | |
| "operator": { | |
| "display_name": "Operator", | |
| "type": "options", | |
| "options": ["==", "!=", ">", "<", ">=", "<=", "contains", "not contains"], | |
| "value": "==" | |
| }, | |
| "comparison_value": { | |
| "display_name": "Comparison Value", | |
| "type": "string", | |
| "value": "admin" | |
| }, | |
| "input_value": { | |
| "display_name": "Input to Check", | |
| "type": "any", | |
| "is_handle": true | |
| }, | |
| "true_output": { | |
| "display_name": "Path if True", | |
| "type": "any", | |
| "is_handle": true | |
| }, | |
| "false_output": { | |
| "display_name": "Path if False", | |
| "type": "any", | |
| "is_handle": true | |
| } | |
| } | |
| }, | |
| "resources": { | |
| "cpu": 0.1, | |
| "memory": "128Mi", | |
| "gpu": "none" | |
| } | |
| }""" | |
| from typing import Any, Dict | |
| def process_conditional_logic(operator: str, comparison_value: str, input_value: Any) -> Dict[str, Any]: | |
| """ | |
| Evaluates a condition and returns the input value on the appropriate output handle. | |
| """ | |
| result = False | |
| # Attempt to convert types for numeric comparison | |
| try: | |
| num_input = float(input_value) | |
| num_comp = float(comparison_value) | |
| except (ValueError, TypeError): | |
| num_input, num_comp = None, None | |
| # Evaluate condition | |
| if operator == '==' : result = input_value == comparison_value | |
| elif operator == '!=': result = input_value != comparison_value | |
| elif operator == '>' and num_input is not None: result = num_input > num_comp | |
| elif operator == '<' and num_input is not None: result = num_input < num_comp | |
| elif operator == '>=' and num_input is not None: result = num_input >= num_comp | |
| elif operator == '<=' and num_input is not None: result = num_input <= num_comp | |
| elif operator == 'contains': result = str(comparison_value) in str(input_value) | |
| elif operator == 'not contains': result = str(comparison_value) not in str(input_value) | |
| # Return the input data on the correct output handle based on the result | |
| if result: | |
| # The key "true_output" matches the source_handle in the workflow edge | |
| return {"true_output": input_value} | |
| else: | |
| # The key "false_output" matches the source_handle in the workflow edge | |
| return {"false_output": input_value} | |
## wait node
"""{
  "id": "Wait-1",
  "type": "Wait",
  "data": {
    "display_name": "Wait for 5 Seconds",
    "template": {
      "duration": {
        "display_name": "Duration (seconds)",
        "type": "number",
        "value": 5
      },
      "passthrough_input": {
        "display_name": "Passthrough Data In",
        "type": "any",
        "is_handle": true
      },
      "passthrough_output": {
        "display_name": "Passthrough Data Out",
        "type": "any",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.1,
    "memory": "128Mi",
    "gpu": "none"
  }
}"""
import time
from typing import Any, Dict

def process_wait(duration: int, passthrough_input: Any = None) -> Dict[str, Any]:
    """
    Pauses execution for a given duration and then passes data through.
    """
    time.sleep(duration)
    # The output key "passthrough_output" matches the source_handle
    return {"passthrough_output": passthrough_input}
## chat node
"""{
  "id": "ChatModel-1",
  "type": "ChatModel",
  "data": {
    "display_name": "AI Assistant",
    "template": {
      "provider": {
        "display_name": "Provider",
        "type": "options",
        "options": ["OpenAI", "Anthropic"],
        "value": "OpenAI"
      },
      "model": {
        "display_name": "Model Name",
        "type": "string",
        "value": "gpt-4o-mini"
      },
      "api_key": {
        "display_name": "API Key",
        "type": "SecretStr",
        "required": true,
        "env_var": "OPENAI_API_KEY"
      },
      "system_prompt": {
        "display_name": "System Prompt (Optional)",
        "type": "string",
        "value": "You are a helpful assistant."
      },
      "prompt": {
        "display_name": "Prompt",
        "type": "string",
        "is_handle": true
      },
      "response": {
        "display_name": "Response",
        "type": "string",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.5,
    "memory": "256Mi",
    "gpu": "none"
  }
}"""
import os
from typing import Any, Dict
from openai import OpenAI
from anthropic import Anthropic

def process_chat_model(provider: str, model: str, api_key: str, prompt: str, system_prompt: str = "") -> Dict[str, Any]:
    """
    Calls the specified chat model provider with a given prompt.
    """
    response_text = ""
    if provider == "OpenAI":
        client = OpenAI(api_key=api_key)
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        completion = client.chat.completions.create(model=model, messages=messages)
        response_text = completion.choices[0].message.content
    elif provider == "Anthropic":
        client = Anthropic(api_key=api_key)
        message = client.messages.create(
            model=model,
            max_tokens=2048,
            system=system_prompt,
            messages=[{"role": "user", "content": prompt}]
        )
        response_text = message.content[0].text
    return {"response": response_text}

def test_openai():
    openai_key = os.getenv("OPENAI_API_KEY")
    if not openai_key:
        raise RuntimeError("Set the OPENAI_API_KEY environment variable.")
    result = process_chat_model(
        provider="OpenAI",
        model="gpt-3.5-turbo",
        api_key=openai_key,
        system_prompt="You are a helpful assistant.",
        prompt="What's the capital of France?"
    )
    print("OpenAI response:", result["response"])

def test_anthropic():
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if not anthropic_key:
        raise RuntimeError("Set the ANTHROPIC_API_KEY environment variable.")
    result = process_chat_model(
        provider="Anthropic",
        model="claude-sonnet-4-20250514",
        api_key=anthropic_key,
        system_prompt="You are a concise assistant.",
        prompt="List three benefits of renewable energy."
    )
    print("Anthropic response:", result["response"])

if __name__ == "__main__":
    test_openai()
    test_anthropic()
## rag node 1 knowledge base
"""{
  "id": "KnowledgeBase-1",
  "type": "KnowledgeBase",
  "data": {
    "display_name": "Create Product Docs KB",
    "template": {
      "kb_name": {
        "display_name": "Knowledge Base Name",
        "type": "string",
        "value": "product-docs-v1"
      },
      "source_type": {
        "display_name": "Source Type",
        "type": "options",
        "options": ["Directory", "URL"],
        "value": "URL"
      },
      "path_or_url": {
        "display_name": "Path or URL",
        "type": "string",
        "value": "https://docs.modal.com/get-started"
      },
      "knowledge_base": {
        "display_name": "Knowledge Base Out",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 2.0,
    "memory": "1Gi",
    "gpu": "none"
  }
}"""
import os
from typing import Any, Dict
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings
from llama_index.readers.web import SimpleWebPageReader
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

def process_knowledge_base(kb_name: str, source_type: str, path_or_url: str) -> Dict[str, Any]:
    """
    Creates and persists a LlamaIndex VectorStoreIndex.
    """
    # Use a high-quality, local model for embeddings
    Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
    if source_type == "URL":
        documents = SimpleWebPageReader(html_to_text=True).load_data([path_or_url])
    else:
        documents = SimpleDirectoryReader(input_dir=path_or_url).load_data()
    index = VectorStoreIndex.from_documents(documents)
    storage_path = os.path.join("./storage", kb_name)
    index.storage_context.persist(persist_dir=storage_path)
    # Return a reference object to the persisted index
    return {"knowledge_base": {"name": kb_name, "path": storage_path}}
## rag node 2 query
"""{
  "id": "RAGQuery-1",
  "type": "RAGQuery",
  "data": {
    "display_name": "Retrieve & Augment Prompt",
    "template": {
      "query": {
        "display_name": "Original Query",
        "type": "string",
        "is_handle": true
      },
      "knowledge_base": {
        "display_name": "Knowledge Base",
        "type": "object",
        "is_handle": true
      },
      "rag_prompt": {
        "display_name": "Augmented Prompt Out",
        "type": "string",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 1.0,
    "memory": "512Mi",
    "gpu": "none"
  }
}"""
from typing import Any, Dict
from llama_index.core import StorageContext, load_index_from_storage, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

def process_rag_query(query: str, knowledge_base: Dict) -> Dict[str, Any]:
    """
    Retrieves context from a knowledge base and creates an augmented prompt.
    """
    Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
    # Load the index from the path provided by the KnowledgeBase node
    storage_context = StorageContext.from_defaults(persist_dir=knowledge_base['path'])
    index = load_index_from_storage(storage_context)
    retriever = index.as_retriever(similarity_top_k=3)
    retrieved_nodes = retriever.retrieve(query)
    # Combine the retrieved text into a single context block
    context_str = "\n\n".join([node.get_content() for node in retrieved_nodes])
    # Construct the final prompt for the ChatModel
    rag_prompt_template = (
        "Use the following context to answer the question. "
        "If the answer is not in the context, say you don't know.\n\n"
        "Context:\n{context}\n\n"
        "Question: {question}"
    )
    final_prompt = rag_prompt_template.format(context=context_str, question=query)
    return {"rag_prompt": final_prompt}

# --- Demo Execution ---
if __name__ == "__main__":
    # 1. Build the KB from Modal docs
    kb_result = process_knowledge_base(
        kb_name="product-docs-v1",
        source_type="URL",
        path_or_url="https://modal.com/docs/guide"
    )
    print("Knowledge Base Created:", kb_result)
    # 2. Run a RAG query
    user_query = "How do I get started with Modal?"
    rag_result = process_rag_query(user_query, kb_result["knowledge_base"])
    print("\nAugmented RAG Prompt:\n", rag_result["rag_prompt"])
## speech to text
"""{
  "id": "HFSpeechToText-1",
  "type": "HFSpeechToText",
  "data": {
    "display_name": "Transcribe Audio (Whisper)",
    "template": {
      "model_id": {
        "display_name": "Model ID",
        "type": "string",
        "value": "openai/whisper-large-v3"
      },
      "audio_input": {
        "display_name": "Audio Input",
        "type": "object",
        "is_handle": true
      },
      "transcribed_text": {
        "display_name": "Transcribed Text",
        "type": "string",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 1.0,
    "memory": "4Gi",
    "gpu": "T4"
  }
}"""
import torch
from transformers import pipeline
from typing import Any, Dict

# --- In a real Modal app, this would be structured like this: ---
#
# import modal
# image = modal.Image.debian_slim().pip_install("transformers", "torch", "librosa")
# stub = modal.Stub("speech-to-text-model")
#
# @stub.cls(gpu="T4", image=image)
# class WhisperModel:
#     def __init__(self):
#         device = "cuda" if torch.cuda.is_available() else "cpu"
#         self.pipe = pipeline(
#             "automatic-speech-recognition",
#             model="openai/whisper-large-v3",
#             torch_dtype=torch.float16,
#             device=device,
#         )
#
#     @modal.method()
#     def run_inference(self, audio_path):
#         # The function logic from below would be here.
#         ...
# -------------------------------------------------------------------

def process_hf_speech_to_text(model_id: str, audio_input: Dict[str, Any]) -> Dict[str, Any]:
    """
    Transcribes an audio file using a Hugging Face ASR pipeline.
    NOTE: This function simulates the inference part of a stateful Modal class.
    The model pipeline should be loaded only once.
    """
    if audio_input.get("type") != "audio":
        raise ValueError("Input must be of type 'audio'.")
    audio_path = audio_input["value"]
    # --- This part would be inside the Modal class method ---
    # In a real implementation, 'pipe' would be a class attribute (self.pipe)
    # loaded in the __init__ or @enter method.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    pipe = pipeline(
        "automatic-speech-recognition",
        model=model_id,
        torch_dtype=torch.float16,
        device=device,
    )
    outputs = pipe(
        audio_path,
        chunk_length_s=30,
        batch_size=24,
        return_timestamps=True,
    )
    return {"transcribed_text": outputs["text"]}
## text to speech
"""{
  "id": "HFTextToSpeech-1",
  "type": "HFTextToSpeech",
  "data": {
    "display_name": "Generate Speech",
    "template": {
      "model_id": {
        "display_name": "Model ID",
        "type": "string",
        "value": "microsoft/speecht5_tts"
      },
      "text_input": {
        "display_name": "Text Input",
        "type": "string",
        "is_handle": true
      },
      "audio_output": {
        "display_name": "Audio Output",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 1.0,
    "memory": "4Gi",
    "gpu": "T4"
  }
}"""
import numpy as np
import torch
from transformers import pipeline
import soundfile as sf
from typing import Any, Dict

def process_hf_text_to_speech(model_id: str, text_input: str) -> Dict[str, Any]:
    """
    Synthesizes speech from text using a Hugging Face TTS pipeline.
    NOTE: Simulates the inference part of a stateful Modal class.
    """
    # --- This part would be inside the Modal class method ---
    # The pipeline and speaker embeddings would be loaded once in the class.
    # The text-to-speech pipeline loads the default HiFi-GAN vocoder
    # ("microsoft/speecht5_hifigan") for SpeechT5 itself.
    pipe = pipeline("text-to-speech", model=model_id, device="cuda")
    # SpeechT5 requires speaker embeddings for voice characteristics;
    # a random tensor stands in for a real speaker embedding here.
    speaker_embedding = torch.tensor(np.random.rand(1, 512), dtype=torch.float32)
    speech = pipe(text_input, forward_params={"speaker_embeddings": speaker_embedding})
    # Save the output to a file and return the path
    output_path = "/tmp/output.wav"
    sf.write(output_path, speech["audio"], samplerate=speech["sampling_rate"])
    return {"audio_output": {"type": "audio", "value": output_path}}
## text generation
"""{
  "id": "HFTextGeneration-1",
  "type": "HFTextGeneration",
  "data": {
    "display_name": "Generate with Mistral",
    "template": {
      "model_id": {
        "display_name": "Model ID",
        "type": "string",
        "value": "mistralai/Mistral-7B-Instruct-v0.2"
      },
      "max_new_tokens": {
        "display_name": "Max New Tokens",
        "type": "number",
        "value": 256
      },
      "prompt": {
        "display_name": "Prompt",
        "type": "string",
        "is_handle": true
      },
      "generated_text": {
        "display_name": "Generated Text",
        "type": "string",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 2.0,
    "memory": "24Gi",
    "gpu": "A10G"
  }
}"""
import torch
from transformers import pipeline
from typing import Any, Dict

def process_hf_text_generation(model_id: str, prompt: str, max_new_tokens: int) -> Dict[str, Any]:
    """
    Generates text from a prompt using a Hugging Face LLM.
    NOTE: Simulates the inference part of a stateful Modal class.
    """
    # --- This part would be inside the Modal class method ---
    # The pipeline is loaded once on container start.
    pipe = pipeline(
        "text-generation",
        model=model_id,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    messages = [{"role": "user", "content": prompt}]
    # The pipeline needs the prompt to be formatted correctly for instruct models
    formatted_prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    outputs = pipe(
        formatted_prompt,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=0.7,
        top_k=50,
        top_p=0.95,
    )
    # Extract only the generated part of the text
    generated_text = outputs[0]["generated_text"]
    # The output includes the prompt, so we remove it.
    response_text = generated_text[len(formatted_prompt):]
    return {"generated_text": response_text}
## image generation
"""{
  "id": "HFImageGeneration-1",
  "type": "HFImageGeneration",
  "data": {
    "display_name": "Generate Image (SDXL)",
    "template": {
      "model_id": {
        "display_name": "Base Model ID",
        "type": "string",
        "value": "stabilityai/stable-diffusion-xl-base-1.0"
      },
      "lora_id": {
        "display_name": "LoRA Model ID (Optional)",
        "type": "string",
        "value": "nerijs/pixel-art-xl"
      },
      "prompt": {
        "display_name": "Prompt",
        "type": "string",
        "is_handle": true
      },
      "image_output": {
        "display_name": "Image Output",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 2.0,
    "memory": "24Gi",
    "gpu": "A10G"
  }
}"""
import torch
from diffusers import StableDiffusionXLPipeline
from typing import Any, Dict

def process_hf_image_generation(model_id: str, prompt: str, lora_id: str = None) -> Dict[str, Any]:
    """
    Generates an image using a Stable Diffusion pipeline, with optional LoRA.
    NOTE: Simulates the inference part of a stateful Modal class.
    """
    # --- This part would be inside the Modal class method ---
    # The base pipeline is loaded once.
    pipe = StableDiffusionXLPipeline.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        variant="fp16",
        use_safetensors=True
    ).to("cuda")
    # If a LoRA is specified, load and fuse it.
    # In a real app, this logic would be more complex to handle multiple LoRAs.
    if lora_id:
        pipe.load_lora_weights(lora_id)
        pipe.fuse_lora()
    # Generate the image
    image = pipe(prompt=prompt).images[0]
    output_path = "/tmp/generated_image.png"
    image.save(output_path)
    return {"image_output": {"type": "image", "value": output_path}}
## captioning image to text
"""{
  "id": "HFVisionModel-1",
  "type": "HFVisionModel",
  "data": {
    "display_name": "Describe Image",
    "template": {
      "task": {
        "display_name": "Task",
        "type": "options",
        "options": ["image-to-text"],
        "value": "image-to-text"
      },
      "model_id": {
        "display_name": "Model ID",
        "type": "string",
        "value": "Salesforce/blip-image-captioning-large"
      },
      "image_input": {
        "display_name": "Image Input",
        "type": "object",
        "is_handle": true
      },
      "result": {
        "display_name": "Result",
        "type": "string",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 1.0,
    "memory": "8Gi",
    "gpu": "T4"
  }
}"""
from transformers import pipeline
from PIL import Image
from typing import Any, Dict

def process_hf_vision_model(task: str, model_id: str, image_input: Dict[str, Any]) -> Dict[str, Any]:
    """
    Performs a vision-based task, like image captioning.
    NOTE: Simulates the inference part of a stateful Modal class.
    """
    if image_input.get("type") != "image":
        raise ValueError("Input must be of type 'image'.")
    image_path = image_input["value"]
    # --- This part would be inside the Modal class method ---
    # The pipeline is loaded once.
    pipe = pipeline(task, model=model_id, device="cuda")
    # Open the image file
    image = Image.open(image_path)
    result = pipe(image)
    # The output format for this pipeline is a list of dicts
    # e.g., [{'generated_text': 'a cat sitting on a couch'}]
    output_text = result[0]['generated_text']
    return {"result": output_text}
import os
from openai import OpenAI

client = OpenAI(
    base_url="https://api.studio.nebius.com/v1/",
    api_key=os.environ.get("NEBIUS_API_KEY")
)
response = client.images.generate(
    model="black-forest-labs/flux-dev",
    response_format="b64_json",
    extra_body={
        "response_extension": "png",
        "width": 1024,
        "height": 1024,
        "num_inference_steps": 28,
        "negative_prompt": "",
        "seed": -1
    },
    prompt="pokemon"
)
print(response.to_json())
## nebius image generation
"""{
  "id": "NebiusImage-1",
  "type": "NebiusImage",
  "data": {
    "display_name": "Nebius Image Generation",
    "template": {
      "model": {
        "display_name": "Model",
        "type": "options",
        "options": [
          "black-forest-labs/flux-dev",
          "black-forest-labs/flux-schnell",
          "stability-ai/sdxl"
        ],
        "value": "black-forest-labs/flux-dev"
      },
      "api_key": {
        "display_name": "Nebius API Key",
        "type": "SecretStr",
        "required": true,
        "env_var": "NEBIUS_API_KEY"
      },
      "prompt": {
        "display_name": "Prompt",
        "type": "string",
        "is_handle": true
      },
      "negative_prompt": {
        "display_name": "Negative Prompt (Optional)",
        "type": "string",
        "value": ""
      },
      "width": {
        "display_name": "Width",
        "type": "number",
        "value": 1024
      },
      "height": {
        "display_name": "Height",
        "type": "number",
        "value": 1024
      },
      "num_inference_steps": {
        "display_name": "Inference Steps",
        "type": "number",
        "value": 28
      },
      "seed": {
        "display_name": "Seed",
        "type": "number",
        "value": -1
      },
      "image_output": {
        "display_name": "Image Output",
        "type": "object",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.2,
    "memory": "256Mi",
    "gpu": "none"
  }
}"""
import os
import base64
from typing import Any, Dict
from openai import OpenAI

def process_nebius_image(
    model: str,
    api_key: str,
    prompt: str,
    negative_prompt: str = "",
    width: int = 1024,
    height: int = 1024,
    num_inference_steps: int = 28,
    seed: int = -1
) -> Dict[str, Any]:
    """
    Generates an image using the Nebius AI Studio API.
    """
    if not api_key:
        raise ValueError("Nebius API key is missing.")
    client = OpenAI(
        base_url="https://api.studio.nebius.com/v1/",
        api_key=api_key
    )
    try:
        response = client.images.generate(
            model=model,
            response_format="b64_json",
            prompt=prompt,
            extra_body={
                "response_extension": "png",
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "negative_prompt": negative_prompt,
                "seed": seed
            }
        )
        # Extract the base64 encoded string
        b64_data = response.data[0].b64_json
        # Decode the string and save the image to a file
        image_bytes = base64.b64decode(b64_data)
        output_path = "/tmp/nebius_image.png"
        with open(output_path, "wb") as f:
            f.write(image_bytes)
        # Return a data package with the path to the generated image
        return {"image_output": {"type": "image", "value": output_path}}
    except Exception as e:
        print(f"Error calling Nebius API: {e}")
        return {"image_output": {"error": str(e)}}
## mcp new
"""{
  "id": "MCPConnection-1",
  "type": "MCPConnection",
  "data": {
    "display_name": "MCP Server Connection",
    "template": {
      "server_url": {
        "display_name": "MCP Server URL",
        "type": "string",
        "value": "http://localhost:8000/sse",
        "info": "URL to MCP server (HTTP/SSE or stdio command)"
      },
      "connection_type": {
        "display_name": "Connection Type",
        "type": "dropdown",
        "options": ["http", "stdio"],
        "value": "http"
      },
      "allowed_tools": {
        "display_name": "Allowed Tools (Optional)",
        "type": "list",
        "info": "Filter specific tools. Leave empty for all tools"
      },
      "api_key": {
        "display_name": "API Key (Optional)",
        "type": "SecretStr",
        "env_var": "MCP_API_KEY"
      },
      "mcp_tools_output": {
        "display_name": "MCP Tools Output",
        "type": "list",
        "is_handle": true
      }
    }
  },
  "resources": {
    "cpu": 0.1,
    "memory": "128Mi",
    "gpu": "none"
  }
}
"""
| """{ | |
| "id": "MCPAgent-1", | |
| "type": "MCPAgent", | |
| "data": { | |
| "display_name": "MCP-Powered AI Agent", | |
| "template": { | |
| "mcp_tools_input": { | |
| "display_name": "MCP Tools Input", | |
| "type": "list", | |
| "is_handle": true | |
| }, | |
| "llm_model": { | |
| "display_name": "LLM Model", | |
| "type": "dropdown", | |
| "options": ["gpt-4", "gpt-3.5-turbo", "gpt-4o", "gpt-4o-mini"], | |
| "value": "gpt-4o-mini" | |
| }, | |
| "system_prompt": { | |
| "display_name": "System Prompt", | |
| "type": "text", | |
| "value": "You are a helpful AI assistant with access to various tools. Use the available tools to help answer user questions accurately.", | |
| "multiline": true | |
| }, | |
| "user_query": { | |
| "display_name": "User Query", | |
| "type": "string", | |
| "is_handle": true | |
| }, | |
| "max_iterations": { | |
| "display_name": "Max Iterations", | |
| "type": "int", | |
| "value": 10 | |
| }, | |
| "agent_response": { | |
| "display_name": "Agent Response", | |
| "type": "string", | |
| "is_handle": true | |
| } | |
| } | |
| }, | |
| "resources": { | |
| "cpu": 0.5, | |
| "memory": "512Mi", | |
| "gpu": "none" | |
| } | |
| } | |
| """ | |
import asyncio
import os
from typing import List, Optional, Dict, Any
from llama_index.tools.mcp import BasicMCPClient, McpToolSpec, get_tools_from_mcp_url, aget_tools_from_mcp_url
from llama_index.core.tools import FunctionTool

class MCPConnectionNode:
    """Node to connect to MCP servers and retrieve tools"""

    def __init__(self):
        self.client = None
        self.tools = []

    async def execute(self,
                      server_url: str,
                      connection_type: str = "http",
                      allowed_tools: Optional[List[str]] = None,
                      api_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Connect to MCP server and retrieve available tools
        """
        try:
            # Set API key if provided
            if api_key:
                os.environ["MCP_API_KEY"] = api_key
            print(f"Connecting to MCP server: {server_url}")
            if connection_type == "http":
                # Use LlamaIndex's built-in function to get tools
                tools = await aget_tools_from_mcp_url(
                    server_url,
                    allowed_tools=allowed_tools
                )
            else:
                # For stdio connections
                self.client = BasicMCPClient(server_url)
                mcp_tool_spec = McpToolSpec(
                    client=self.client,
                    allowed_tools=allowed_tools
                )
                tools = await mcp_tool_spec.to_tool_list_async()
            self.tools = tools
            print(f"Successfully connected! Retrieved {len(tools)} tools:")
            for tool in tools:
                print(f"  - {tool.metadata.name}: {tool.metadata.description}")
            return {
                "success": True,
                "tools_count": len(tools),
                "tool_names": [tool.metadata.name for tool in tools],
                "mcp_tools_output": tools
            }
        except Exception as e:
            print(f"Connection failed: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "mcp_tools_output": []
            }

# Example usage
async def mcp_connection_demo():
    node = MCPConnectionNode()
    # Using a public MCP server (you'll need to replace with actual public servers)
    result = await node.execute(
        server_url="http://localhost:8000/sse",  # Replace with public MCP server
        connection_type="http",
        allowed_tools=None  # Get all tools
    )
    return result
from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool
from typing import List, Dict, Any
import os

class MCPAgentNode:
    """Node to create and run MCP-powered AI agents"""

    def __init__(self):
        self.agent = None
        self.tools = []

    async def execute(self,
                      mcp_tools_input: List[FunctionTool],
                      user_query: str,
                      llm_model: str = "gpt-4o-mini",
                      system_prompt: str = "You are a helpful AI assistant.",
                      max_iterations: int = 10) -> Dict[str, Any]:
        """
        Create and run MCP-powered agent using FunctionCallingAgent
        """
        try:
            if not mcp_tools_input:
                return {
                    "success": False,
                    "error": "No MCP tools provided",
                    "agent_response": "No tools available to process the query."
                }
            print(f"Creating agent with {len(mcp_tools_input)} tools...")
            # Initialize LLM
            llm = OpenAI(
                model=llm_model,
                api_key=os.getenv("OPENAI_API_KEY"),
                temperature=0.1
            )
            # Create function calling agent (more reliable than ReAct)
            agent_worker = FunctionCallingAgentWorker.from_tools(
                tools=mcp_tools_input,
                llm=llm,
                verbose=True,
                system_prompt=system_prompt
            )
            self.agent = AgentRunner(agent_worker)
            print(f"Processing query: {user_query}")
            # Execute the query
            response = self.agent.chat(user_query)
            return {
                "success": True,
                "agent_response": str(response.response),
                "user_query": user_query,
                "tools_used": len(mcp_tools_input)
            }
        except Exception as e:
            print(f"Agent execution failed: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "agent_response": f"Sorry, I encountered an error while processing your query: {str(e)}"
            }

# Example usage
async def mcp_agent_demo(tools: List[FunctionTool]):
    node = MCPAgentNode()
    result = await node.execute(
        mcp_tools_input=tools,
        user_query="What tools do you have available and what can you help me with?",
        llm_model="gpt-4o-mini",
        system_prompt="You are a helpful AI assistant. Use your available tools to provide accurate and useful responses."
    )
    return result
## example
import asyncio
import os
from typing import List, Dict, Any
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import FunctionCallingAgentWorker, AgentRunner
from llama_index.llms.openai import OpenAI

class CompleteMCPWorkflowDemo:
    """Complete demo of MCP workflow with connection and agent nodes"""

    def __init__(self):
        self.connection_node = MCPConnectionNode()
        self.agent_node = MCPAgentNode()
        # Set your OpenAI API key
        # os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here"

    async def create_mock_mcp_tools(self) -> List[FunctionTool]:
        """
        Create mock MCP tools that simulate a real MCP server
        Replace this with actual MCP server connection when available
        """
        def get_weather(city: str, country: str = "US") -> str:
            """Get current weather information for a city"""
            weather_data = {
                "london": "Cloudy, 15°C, humidity 80%",
                "paris": "Sunny, 22°C, humidity 45%",
                "tokyo": "Rainy, 18°C, humidity 90%",
                "new york": "Partly cloudy, 20°C, humidity 55%"
            }
            result = weather_data.get(city.lower(), f"Weather data not available for {city}")
            return f"Weather in {city}, {country}: {result}"

        def search_news(topic: str, limit: int = 5) -> str:
            """Search for latest news on a given topic"""
            news_items = [
                f"Breaking: New developments in {topic}",
                f"Analysis: {topic} trends for 2025",
                f"Expert opinion on {topic} industry changes",
                f"Research shows {topic} impact on society",
                f"Global {topic} market outlook"
            ]
            return f"Top {limit} news articles about {topic}:\n" + "\n".join(news_items[:limit])

        def calculate_math(expression: str) -> str:
            """Calculate mathematical expressions safely"""
            try:
                # Simple and safe evaluation
                allowed_chars = "0123456789+-*/().,_ "
                if all(c in allowed_chars for c in expression):
                    result = eval(expression)
                    return f"Result: {expression} = {result}"
                else:
                    return f"Invalid expression: {expression}"
            except Exception as e:
                return f"Error calculating {expression}: {str(e)}"

        def get_company_info(company: str) -> str:
            """Get basic company information"""
            companies = {
                "openai": "OpenAI - AI research company, creator of GPT models",
                "microsoft": "Microsoft - Technology corporation, cloud computing and software",
                "google": "Google - Search engine and technology company",
                "amazon": "Amazon - E-commerce and cloud computing platform"
            }
            return companies.get(company.lower(), f"Company information not found for {company}")

        # Convert to FunctionTool objects
        tools = [
            FunctionTool.from_defaults(fn=get_weather),
            FunctionTool.from_defaults(fn=search_news),
            FunctionTool.from_defaults(fn=calculate_math),
            FunctionTool.from_defaults(fn=get_company_info)
        ]
        return tools

    async def run_complete_workflow(self):
        """
        Run the complete MCP workflow demonstration
        """
        print("Starting Complete MCP Workflow Demo")
        print("=" * 60)
        # Step 1: Setup MCP Connection (simulated)
        print("\nStep 1: Setting up MCP Connection...")
        # In a real implementation, this would connect to an actual MCP server
        mock_tools = await self.create_mock_mcp_tools()
        connection_result = {
            "success": True,
            "tools_count": len(mock_tools),
            "tool_names": [tool.metadata.name for tool in mock_tools],
            "mcp_tools_output": mock_tools
        }
        if connection_result["success"]:
            print("MCP Connection successful!")
            print(f"Retrieved {connection_result['tools_count']} tools:")
            for tool_name in connection_result['tool_names']:
                print(f"  - {tool_name}")
        else:
            print(f"MCP Connection failed: {connection_result.get('error')}")
            return
        # Step 2: Create and test MCP Agent
        print("\nStep 2: Creating MCP-Powered Agent...")
        test_queries = [
            "What's the weather like in London?",
            "Search for news about artificial intelligence",
            "Calculate 15 * 8 + 32",
            "Tell me about OpenAI company",
            "What tools do you have and what can you help me with?"
        ]
        for i, query in enumerate(test_queries, 1):
            print(f"\nQuery {i}: {query}")
            print("-" * 40)
            agent_result = await self.agent_node.execute(
                mcp_tools_input=connection_result["mcp_tools_output"],
                user_query=query,
                llm_model="gpt-4o-mini",
                system_prompt="""You are a helpful AI assistant with access to weather, news, calculation, and company information tools.
When a user asks a question:
1. Determine which tool(s) can help answer their question
2. Use the appropriate tool(s) to gather information
3. Provide a clear, helpful response based on the tool results
Always be informative and explain what tools you used.""",
                max_iterations=5
            )
            if agent_result["success"]:
                print("Agent Response:")
                print(f"{agent_result['agent_response']}")
            else:
                print(f"Agent Error: {agent_result['error']}")
            print("\n" + "=" * 50)

# Function to connect to real MCP servers when available
async def connect_to_real_mcp_server(server_url: str):
    """
    Example of connecting to a real MCP server
    Replace server_url with actual public MCP servers
    """
    try:
        from llama_index.tools.mcp import aget_tools_from_mcp_url
        print(f"Attempting to connect to: {server_url}")
        tools = await aget_tools_from_mcp_url(server_url)
        print(f"Connected successfully! Found {len(tools)} tools:")
        for tool in tools:
            print(f"  - {tool.metadata.name}: {tool.metadata.description}")
        return tools
    except Exception as e:
        print(f"Failed to connect to {server_url}: {e}")
        return []

# Main execution
async def main():
    """Run the complete demo"""
    # Option 1: Run with mock tools (works immediately)
    print("Running MCP Workflow Demo with Mock Tools")
    demo = CompleteMCPWorkflowDemo()
    await demo.run_complete_workflow()
    # Option 2: Try connecting to real MCP servers (uncomment when available)
    # real_servers = [
    #     "http://your-mcp-server.com:8000/sse",
    #     "https://api.example.com/mcp"
    # ]
    #
    # for server_url in real_servers:
    #     tools = await connect_to_real_mcp_server(server_url)
    #     if tools:
    #         # Use real tools with agent
    #         agent_node = MCPAgentNode()
    #         result = await agent_node.execute(
    #             mcp_tools_input=tools,
    #             user_query="What can you help me with?",
    #             llm_model="gpt-4o-mini"
    #         )
    #         print(f"Real MCP Agent Response: {result}")

if __name__ == "__main__":
    asyncio.run(main())