#!/usr/bin/env python3
"""Discover RAGFlow MCP tools - using urllib with proper SSE handling.

Opens the MCP SSE stream, performs the ``initialize`` handshake, lists the
server's tools, prints their input schemas, and then calls every tool whose
name does not look destructive (delete/update/remove) with placeholder
arguments.  Progress/diagnostics go to stderr; tool listings and results go
to stdout.
"""

import http.client
import itertools
import json
import os
import re
import sys
import threading
import time
import urllib.error
import urllib.request

# NOTE(review): a live API key is committed in this file.  Prefer setting
# RAGFLOW_API_KEY in the environment and rotate this key; the literal is kept
# only as a fallback so existing invocations keep working.
KEY = os.environ.get(
    "RAGFLOW_API_KEY", "ragflow-mvygWXqAN0G54pjwe45Ga5Gz1KMH2GY0yfSQAXU_Lr8"
)
BASE = os.environ.get("RAGFLOW_MCP_URL", "http://192.168.1.140:9382")

# Placeholder values used when exercising the tools.
TEST_DATASET_ID = "eec1104264c311f1aa50a7b0b2fe21dc"
TEST_QUERY = "Amazon新品广告投放策略"

# Tools whose (lower-cased) name contains any of these words are never called.
_SKIP_WORDS = ("delete", "update", "remove")

# The `endpoint` event carries a URL such as /messages/?session_id=<hex>.
SESSION_RE = re.compile(r"session_id=([a-f0-9]+)")

responses = {}  # JSON-RPC id -> response payload, filled by the SSE thread
session_id = None
running = True  # cleared by main() to ask the listener to stop
lock = threading.Lock()
responses_ready = threading.Condition(lock)  # notified when `responses` grows
session_ready = threading.Event()  # set once `session_id` is known


def parse_sse_event(lines):
    """Parse the field lines of one SSE event.

    Args:
        lines: the event's lines, already stripped of their line terminators.

    Returns:
        ``(event_type, data)``.  ``event_type`` defaults to ``"message"`` when
        the event has no (or an empty) ``event:`` field, as in the SSE spec.
        Multiple ``data:`` lines are joined with ``"\\n"``.  ``data`` is
        ``None`` when the event has no ``data:`` line.  Comment lines
        (starting with ``:``) are ignored.
    """
    event_type = None
    data_lines = []
    for line in lines:
        if line.startswith(":"):  # comment / keep-alive ping
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):  # the spec strips exactly one leading space
            value = value[1:]
        value = value.strip()
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
    data = "\n".join(data_lines) if data_lines else None
    return event_type or "message", data


def handle_sse_event(lines):
    """Act on one complete SSE event: record the session or store a response."""
    global session_id
    text = "\n".join(lines)
    print(f"[SSE] Event: {text[:200]}", file=sys.stderr)
    event_type, data = parse_sse_event(lines)
    if not data:
        return
    if event_type == "endpoint":
        m = SESSION_RE.search(data)
        if m:
            session_id = m.group(1)
            print(f"[SSE] ✓ Session: {session_id}", file=sys.stderr)
            session_ready.set()
    elif event_type == "message":
        try:
            payload = json.loads(data)
        except json.JSONDecodeError as e:
            print(f"[SSE] Bad JSON in message event: {e}", file=sys.stderr)
            return
        if not isinstance(payload, dict):
            return
        rid = payload.get("id")
        if rid is None:  # server-initiated notification; nothing waits on it
            return
        with responses_ready:
            responses[rid] = payload
            responses_ready.notify_all()
        err = payload.get("error")
        s = "✓" if err is None else f"✗ {str(err)[:60]}"
        print(f"[SSE] id={rid} {s}", file=sys.stderr)


def sse_listener():
    """Read the SSE stream line by line and dispatch each complete event.

    Runs until the stream ends, a read fails, or ``running`` is cleared.
    Events are delimited by a blank line; both ``\\n`` and ``\\r\\n`` line
    endings are accepted.
    """
    print("[SSE] Connecting...", file=sys.stderr)
    req = urllib.request.Request(
        f"{BASE}/sse", headers={"api_key": KEY, "Accept": "text/event-stream"}
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            print("[SSE] Connected, reading events...", file=sys.stderr)
            event_lines = []
            while running:
                try:
                    raw = resp.readline()
                except (OSError, http.client.HTTPException) as e:
                    # A timed-out socket cannot be read again, so retrying
                    # would spin forever; stop and let the waits time out.
                    print(f"[SSE] Read error: {e}", file=sys.stderr)
                    break
                if not raw:
                    print("[SSE] Stream ended", file=sys.stderr)
                    break
                line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
                if line:
                    event_lines.append(line)
                elif event_lines:  # blank line terminates the event
                    handle_sse_event(event_lines)
                    event_lines = []
    except Exception as e:  # thread boundary: report instead of dying silently
        print(f"[SSE] Fatal: {e}", file=sys.stderr)


def build_rpc_payload(method, params=None, msg_id=1):
    """Build a JSON-RPC 2.0 message; ``msg_id=None`` yields a notification (no ``id``)."""
    payload = {"jsonrpc": "2.0", "method": method, "params": params or {}}
    if msg_id is not None:
        payload["id"] = msg_id
    return payload


def post_rpc(method, params=None, msg_id=1):
    """POST a JSON-RPC message to the session's message endpoint.

    The actual reply arrives asynchronously on the SSE stream (see ``wait``).

    Returns:
        ``None`` if there is no session yet, the response body (or
        ``"Accepted"`` when it is empty) on success, or ``"Error: ..."`` on
        transport failure.
    """
    if not session_id:
        return None
    url = f"{BASE}/messages/?session_id={session_id}"
    data = json.dumps(build_rpc_payload(method, params, msg_id)).encode()
    req = urllib.request.Request(
        url,
        data=data,
        headers={"api_key": KEY, "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            body = resp.read().decode("utf-8", errors="replace").strip()
    except (OSError, http.client.HTTPException) as e:
        return f"Error: {e}"
    return body or "Accepted"


def wait(id_val, timeout=15):
    """Block until the response with JSON-RPC id ``id_val`` arrives.

    Returns and removes that response, or returns ``None`` after ``timeout``
    seconds.
    """
    deadline = time.monotonic() + timeout
    with responses_ready:
        while id_val not in responses:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            responses_ready.wait(remaining)
        return responses.pop(id_val)


def _send(method, params, msg_id):
    """Call ``post_rpc`` and report on stderr when the POST was not delivered."""
    result = post_rpc(method, params, msg_id)
    if result is None or result.startswith("Error:"):
        print(f"[RPC] {method} not delivered: {result}", file=sys.stderr)
    return result


def build_tool_args(props):
    """Build placeholder arguments for a tool from its ``properties`` schema.

    Well-known parameter names get meaningful test values; other parameters
    get a neutral value by JSON type.  Parameters of any other type (array,
    object, untyped) are left out.
    """
    args = {}
    for pn, ps in props.items():
        ptype = ps.get("type")
        if pn in ("dataset_id", "knowledge_base_id", "kb_id"):
            args[pn] = TEST_DATASET_ID
        elif pn in ("query", "question", "search_query"):
            args[pn] = TEST_QUERY
        elif pn in ("top_k", "limit", "page_size", "max_results"):
            args[pn] = 3
        elif pn == "page":
            args[pn] = 1
        elif ptype in ("number", "integer"):
            args[pn] = 3
        elif ptype == "string":
            args[pn] = ""
        elif ptype == "boolean":
            args[pn] = False
    return args


def print_tool_schemas(tools):
    """Print each tool's name, description and parameters to stdout."""
    print(f"\n{'='*70}")
    print(f"📦 RAGFlow MCP Tools: {len(tools)}")
    print(f"{'='*70}")
    for t in tools:
        name = t.get("name", "?")
        desc = t.get("description", "-")
        schema = t.get("inputSchema") or {}
        props = schema.get("properties") or {}
        required = schema.get("required") or []
        print(f"\n🔧 {name}")
        print(f" {desc}")
        if props:
            print(" Params:")
            for pn, ps in props.items():
                req = " [REQUIRED]" if pn in required else ""
                enum = f" enum={ps['enum']}" if "enum" in ps else ""
                print(f" • {pn}: {ps.get('type','?')}{enum}{req}")
                if ps.get("description"):
                    print(f" {ps['description'][:100]}")


def _print_tool_result(resp):
    """Print a ``tools/call`` response (or a timeout marker when ``resp`` is None)."""
    if not resp:
        print(" ⏱ Timeout")
        return
    if "error" in resp:  # JSON-RPC-level failure, not a tool-reported error
        print(f" ❌ {str(resp['error'])[:200]}")
        return
    result = resp.get("result") or {}
    content = result.get("content") or []
    if result.get("isError", False):
        print(f" ❌ {str(content)[:200]}")
    elif content:
        for c in content[:2]:
            ctype = c.get("type")
            if ctype == "text":
                text = c.get("text", "")
                print(text[:3000])
                if len(text) > 3000:
                    print(" ... [truncated]")
            elif ctype == "resource":
                res = c.get("resource", {})
                print(f" 📎 Resource: {json.dumps(res, ensure_ascii=False)[:200]}")
    else:
        print(f" Raw: {json.dumps(result, ensure_ascii=False)[:500]}")


def call_read_only_tools(tools):
    """Call every non-destructive tool whose required params can be filled."""
    print(f"\n{'='*70}")
    print("📖 Testing Knowledge Base Tools")
    print(f"{'='*70}")
    # Unique ids so a late reply to one call is never taken for the next one.
    call_ids = itertools.count(100)
    for t in tools:
        name = t.get("name")
        if not name or any(w in name.lower() for w in _SKIP_WORDS):
            continue
        schema = t.get("inputSchema") or {}
        args = build_tool_args(schema.get("properties") or {})
        required = schema.get("required") or []
        if any(r not in args for r in required):
            continue
        call_id = next(call_ids)
        print(f"\n▶ {name}({json.dumps(args, ensure_ascii=False)[:100]})", file=sys.stderr)
        _send("tools/call", {"name": name, "arguments": args}, call_id)
        _print_tool_result(wait(call_id, 10))


def _discover():
    """Handshake, list tools, print and exercise them; return a process exit code."""
    if not session_ready.wait(10):
        print("❌ Failed to get session ID from SSE", file=sys.stderr)
        return 1

    print("\n▶ Initialize...", file=sys.stderr)
    _send(
        "initialize",
        {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "openclaw", "version": "1.0"},
        },
        1,
    )
    init_resp = wait(1, 10)
    if not init_resp:
        print("✗ No init response from SSE", file=sys.stderr)
        return 1
    if "error" in init_resp:
        print(f"✗ Initialize failed: {str(init_resp['error'])[:200]}", file=sys.stderr)
        return 1
    si = (init_resp.get("result") or {}).get("serverInfo", {})
    print(f"✓ Server: {si.get('name','?')} v{si.get('version','?')}", file=sys.stderr)

    _send("notifications/initialized", {}, None)
    time.sleep(0.5)

    print("\n▶ tools/list...", file=sys.stderr)
    _send("tools/list", {}, 2)
    tools_resp = wait(2, 15)
    if not tools_resp:
        print("❌ No tools response", file=sys.stderr)
        return 1
    tools = (tools_resp.get("result") or {}).get("tools", [])

    print_tool_schemas(tools)
    call_read_only_tools(tools)
    print("\n✓ Done.", file=sys.stderr)
    return 0


def main():
    """Start the SSE listener, run discovery, and return the exit code."""
    global running
    print("Starting SSE listener thread...", file=sys.stderr)
    threading.Thread(target=sse_listener, daemon=True).start()
    try:
        return _discover()
    finally:
        # Ask the listener to stop; it is a daemon, so it cannot block exit.
        running = False


if __name__ == "__main__":
    sys.exit(main())