# mcp_jsonrpc_client.py
# Full value of the HTTP Authorization header (scheme plus credential); it is
# copied into AUTH and then into HEADERS below. The literal here is a
# placeholder, not a working credential.
AUTH_TOKEN = "Bearer xxxx" # replace with your token
import json
import uuid
import requests
# Endpoint that every JSON-RPC request in this module is POSTed to.
MCP_URL = "https://mcp.publish.pylar.ai/mcp"

# Alias kept so the header table below reads independently of the token name.
AUTH = AUTH_TOKEN

# Requests and responses are both exchanged as JSON.
# NOTE(review): rpc_call() only parses JSON bodies; if the server requires
# clients to also accept "text/event-stream", both places need to change.
_JSON_MEDIA_TYPE = "application/json"

# Headers attached to every request sent by rpc_call().
HEADERS = {
    "Authorization": AUTH,
    "Content-Type": _JSON_MEDIA_TYPE,
    "Accept": _JSON_MEDIA_TYPE,
}
def rpc_call(method: str, params=None):
    """Send one JSON-RPC 2.0 request to MCP_URL and return its ``result``.

    Args:
        method: JSON-RPC method name, e.g. ``"tools/list"``.
        params: Optional parameters object. The ``"params"`` member is left
            out of the request entirely when this is None.

    Returns:
        The value of the ``"result"`` member of the response.

    Raises:
        RuntimeError: If the response body is not a JSON object, carries a
            non-null ``"error"`` member, or has no ``"result"`` member.

    Exceptions raised by ``requests.post``, ``raise_for_status`` and
    ``Response.json`` propagate unchanged.
    """
    payload = {
        "jsonrpc": "2.0",
        # A fresh random id per request; only one request is in flight at a time.
        "id": str(uuid.uuid4()),
        "method": method,
    }
    if params is not None:
        payload["params"] = params

    r = requests.post(MCP_URL, headers=HEADERS, data=json.dumps(payload), timeout=30)
    r.raise_for_status()
    resp = r.json()
    if not isinstance(resp, dict):
        raise RuntimeError(f"Unexpected RPC response (not a JSON object): {resp!r}")

    # Some servers send `"error": null` on success; treat that as "no error"
    # instead of crashing on None.get(...).
    error = resp.get("error")
    if error is not None:
        if isinstance(error, dict):
            raise RuntimeError(f"RPC error {error.get('code')}: {error.get('message')}")
        raise RuntimeError(f"RPC error: {error!r}")

    if "result" not in resp:
        raise RuntimeError(f"RPC response for '{method}' has neither 'result' nor 'error'")
    return resp["result"]
def initialize(protocol_version: str = "2024-11-05"):
    """Send the MCP ``initialize`` request on a best-effort basis.

    Args:
        protocol_version: MCP protocol revision announced to the server. The
            MCP lifecycle makes this field mandatory in ``initialize``; the
            server answers with the revision it will actually use.

    Returns:
        The server's ``initialize`` result, or None when the call failed.
        A failure is reported on stdout and otherwise ignored so that the
        caller can still try the remaining requests.
    """
    # Many MCP servers accept initialize but some don't strictly require it.
    params = {
        "protocolVersion": protocol_version,
        "clientInfo": {"name": "mcp-jsonrpc-python", "version": "0.1.0"},
        "capabilities": {},  # add if you advertise anything specific
    }
    try:
        return rpc_call("initialize", params)
    except Exception as e:
        # Deliberately broad: if initialize isn't supported, it's fine to continue.
        print(f"(initialize skipped: {e})")
        return None
def list_tools():
    """Request ``tools/list``, print a numbered menu, and return the tools.

    Each menu line is "<n>. <name> " followed by "- <description>" when the
    tool has a non-empty description.

    Returns:
        The list under the result's "tools" key (empty list when absent).
    """
    # Expected shape: {"tools":[{"name":"...","description":"...","inputSchema":{...}}, ...]}
    response = rpc_call("tools/list", {})
    available = response.get("tools", [])
    for position, entry in enumerate(available, start=1):
        description = entry.get("description") or ""
        if description:
            suffix = "- " + description
        else:
            suffix = ""
        print(f"{position}. {entry.get('name')} {suffix}")
    return available
def get_tool_input_schema(tool):
    """Return the parameter definitions and required names of a tool.

    Args:
        tool: Tool description dict; its optional "inputSchema" member is read.

    Returns:
        A ``(properties, required)`` tuple: the schema's "properties" mapping
        and its "required" list. A member that is missing or explicitly null
        is returned as an empty dict / empty list.
    """
    # `or` rather than a .get() default: a default only covers a missing key,
    # not a key that is present with a null value.
    schema = tool.get("inputSchema") or {}
    properties = schema.get("properties") or {}
    required = schema.get("required") or []
    return properties, required
# Spellings accepted (case-insensitively) for boolean parameters.
_TRUE_WORDS = ("true", "1", "yes", "y")
_FALSE_WORDS = ("false", "0", "no", "n")


def _coerce_argument(value, param_type):
    """Convert non-empty user text to the JSON Schema type ``param_type``.

    Returns the converted value; unknown types are returned as the text itself.
    Raises ValueError whose message is the hint to show the user.
    """
    if param_type == "integer":
        try:
            return int(value)
        except ValueError:
            raise ValueError("Please enter a valid integer.") from None
    if param_type == "number":
        try:
            return float(value)
        except ValueError:
            raise ValueError("Please enter a valid number.") from None
    if param_type == "boolean":
        lowered = value.lower()
        if lowered in _TRUE_WORDS:
            return True
        if lowered in _FALSE_WORDS:
            return False
        raise ValueError("Please enter true/false, yes/no, or 1/0.")
    if param_type == "array" or param_type == "object":
        # Structured parameters are typed in as JSON text.
        expected = list if param_type == "array" else dict
        try:
            parsed = json.loads(value)
        except ValueError:
            parsed = None
        if not isinstance(parsed, expected):
            raise ValueError(f"Please enter a valid JSON {param_type}.")
        return parsed
    return value


def prompt_for_tool_arguments(tool):
    """Prompt user for tool arguments based on the tool's input schema.

    Args:
        tool: Tool description dict. ``tool["name"]`` is read when the schema
            declares properties.

    Returns:
        A dict of argument name to value. When the tool declares no
        properties, a single free-text answer is returned as
        ``{"input": <text>}``. Required parameters are asked for repeatedly
        until a non-empty answer converts to the declared type. Optional
        parameters are left out when the answer is empty, and keep the raw
        text when it does not convert.
    """
    properties, required = get_tool_input_schema(tool)
    if not properties:
        # Fallback to generic input if no schema
        user_text = input("Enter input for the tool: ").strip()
        return {"input": user_text}

    arguments = {}
    print(f"\nTool '{tool['name']}' requires the following parameters:")
    for param_name, param_info in properties.items():
        param_type = param_info.get("type", "string")
        param_desc = param_info.get("description", "")
        is_required = param_name in required

        prompt = f" {param_name} ({param_type})"
        if is_required:
            prompt += " [REQUIRED]"
        if param_desc:
            prompt += f" - {param_desc}"
        print(prompt)

        if is_required:
            while True:
                value = input(f"Enter {param_name}: ").strip()
                if not value:
                    print("This parameter is required.")
                    continue
                try:
                    arguments[param_name] = _coerce_argument(value, param_type)
                except ValueError as exc:
                    print(exc)
                    continue
                break
            continue

        value = input(f"Enter {param_name} (optional, press Enter to skip): ").strip()
        if not value:
            continue
        try:
            arguments[param_name] = _coerce_argument(value, param_type)
        except ValueError:
            # Optional answers are never re-asked; pass the text through as typed.
            arguments[param_name] = value
    return arguments
def call_tool(name: str, arguments: dict):
    """Invoke the tool ``name`` through the ``tools/call`` RPC method.

    Args:
        name: Name of the tool to run.
        arguments: Arguments for the tool; they must match the tool's
            inputSchema. If you don't know the schema, try a single-field
            convention like {"input": "<text>"}.

    Returns:
        The RPC result unchanged.
        Expected shape: {"content":[{"type":"text","text":"..."}], "isError": false}
    """
    return rpc_call("tools/call", {"name": name, "arguments": arguments})
def main():
    """Run one interactive session: list tools, pick one, call it, print the result.

    The tool is chosen by its 1-based menu number or by its name
    (case-insensitive). Any exception raised while calling the tool or
    printing its result is reported with a traceback instead of propagating.
    """
    initialize()
    tools = list_tools()
    if not tools:
        print("No tools available.")
        return

    choice = input("\nSelect a tool by number or name: ").strip()
    # isdecimal(), not isdigit(): isdigit() is also true for characters such
    # as superscript digits, which int() rejects with ValueError.
    if choice.isdecimal():
        idx = int(choice) - 1
        if idx < 0 or idx >= len(tools):
            print("Invalid index.")
            return
        tool = tools[idx]
    else:
        wanted = choice.lower()
        tool = next((t for t in tools if t.get("name", "").lower() == wanted), None)
    if not tool:
        print("Tool not found.")
        return

    tool_name = tool["name"]
    print(f"Selected: {tool_name}")

    # Show the tool's input schema
    properties, _ = get_tool_input_schema(tool)
    if properties:
        print(f"\nInput Schema for '{tool_name}':")
        print(json.dumps(tool.get("inputSchema", {}), indent=2))

    # Get proper arguments based on schema
    arguments = prompt_for_tool_arguments(tool)
    print(f"\nCalling tool with arguments: {json.dumps(arguments, indent=2)}")

    try:
        result = call_tool(tool_name, arguments)
        print("\n=== Tool Result ===")
        if result.get("isError"):
            print("ERROR:", result)
        else:
            # Text items are printed as-is; any other content type is dumped as JSON.
            for item in result.get("content", []):
                if item.get("type") == "text":
                    print(item.get("text", ""))
                else:
                    print(json.dumps(item, indent=2))
    except Exception as e:
        # Top-level boundary of the script: report everything, never re-raise.
        print(f"Error calling tool: {e}")
        print("Full error details:")
        import traceback
        traceback.print_exc()
# Start the interactive session only when this file is executed as a script;
# importing the module must not trigger network calls or prompts.
if __name__ == "__main__":
    main()