Implement an MCP Client
In this guide, you’ll learn how to implement an MCP client.
Overview
The MCP client is the application that will communicate with one or more MCP servers, and use the tools and resources they provide. Our guide will focus on implementing a client that can call tools using the Streamable HTTP transport. The MCP Protocol includes several transports, and other primitives besides tools, such as prompts and resources. To learn more about those primitives and alternative transports, please refer to the MCP Protocol documentation.
Features of the client
The client will have the following features:
- Connect to an MCP server using the Streamable HTTP transport
- If the MCP server requires authentication, the client will handle the authorization flow
- List available tools
- Call tools
If you’re looking for a ready-made client, you can use the reference implementation available in the MCP reference implementation repository, which is the base for this guide.
Setup your environment
We’ll use uv to manage our Python environment. Let’s create a new project and install the necessary dependencies:
mkdir mcp-client
cd mcp-client
uv init
uv venv
source .venv/bin/activate
uv add mcpImplementation
Our client will have the following structure:
- A volatile (in memory) token storage
- A thin HTTP server to have a callback endpoint for the authorization flow
- The actual MCP client, that manages:
- Connecting to the MCP server
- Listing available tools
- Calling tools
- A simple CLI to interact with the client
Import all required libraries
To keep things organized, let’s import all required libraries at the top of the file.
import asyncio
import threading
import time
import webbrowser
from datetime import timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthTokenWe’re importing:
asyncioandthreadingto handle asynchronous operations for our small server and the MCP client.webbrowserto open the browser for the authorization flow.timedeltato handle the expiration time of the tokens.BaseHTTPRequestHandlerandHTTPServerto create a small HTTP server to handle the callback endpoint for the authorization flow.Anyto handle the type of the tokens.OAuthClientProviderandTokenStorageto handle the OAuth client provider and the token storage.ClientSessionto handle the client session.streamablehttp_clientto handle the Streamable HTTP client.OAuthClientInformationFull,OAuthClientMetadataandOAuthTokento handle the OAuth client information, metadata and token.
The token storage
The token storage is a simple in-memory class that will store the tokens for the MCP server. We’re using the TokenStorage class from the MCP client library to handle the token storage, as it implements a compatible interface.
class RAMTokenStorage(TokenStorage):
"""Volatile token storage"""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_information: OAuthClientInformationFull | None = NoneThe HTTP server
The HTTP server is a simple class that will handle the callback endpoint for the authorization flow. We’re using the BaseHTTPRequestHandler class from the http.server module to create a small HTTP server to handle the callback endpoint for the authorization flow. It’s important to not use this server in production.
Our server has two parts:
- The callback handler, that handles the callback endpoint for the authorization flow
- The callback server, that starts the HTTP server and waits for the callback handler to complete the authorization flow
The callback handler only cares about:
- successfully completing the authorization flow
- displaying errors from the authorization flow
- capturing the state from the authorization flow
class HTTPHandler(BaseHTTPRequestHandler):
"""HTTP handler to capture OAuth callback"""
def __init__(self, request, client_addr, server, callback_data):
self.callback_data = callback_data
super().__init__(request, client_addr, server)
def do_GET(self):
"""Handle GET requests from OAuth redirect"""
parsed = urlparse(self.path)
query_params = parse_qs(parsed.query)
if "code" in query_params: # we've successfully completed the authorization flow
self.callback_data["authorization_code"] = query_params["code"][0]
self.callback_data["state"] = query_params.get("state", [None])[0]
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"""
<html>
<body>
<h1>Authorization Successful</h1>
<p>You can close this window and return to your app</p>
<script>setTimeout(() => window.close(), 2000);</script>
</body>
</html>
""")
elif "error" in query_params: # we've encountered an error
self.callback_data["error"] = query_params["error"][0]
self.send_response(400)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(f"""
<html>
<body>
<h1>Authorization Failed</h1>
<p>Error: {query_params["error"][0]}</p>
<p>You can close this window and return to your app</p>
</body>
</html>
""".encode())
else: # for everything else, we return a not found response
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
# supress default logging
pass
The callback server is responsible for:
- Starting the HTTP server
- Waiting for the callback handler to complete the authorization flow
- Stopping the HTTP server
class CallbackServer:
def __init__(self, port=3000):
self.port = port
self.server = None
self.thread = None
self.callback_data = { "authorization_code": None, "state": None, "error": None, }
def _create_handler_with_data(self):
callback_data = self.callback_data
class DataCallbackHandler(HTTPHandler):
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server, callback_data)
return DataCallbackHandler
def start(self):
handler_class = self._create_handler_with_data()
self.server = HTTPServer(("localhost", self.port), handler_class)
self.thread = threading.Thread(target=self.server.serve_forever,
daemon=True)
self.thread.start()
print(f"Started callback server on http://localhost:{self.port}")
def stop(self):
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join(timeout=1)
def wait_for_callback(self, timeout=300):
start_time = time.time()
while time.time() - start_time < timeout:
if self.callback_data["authorization_code"]:
return self.callback_data["authorization_code"]
elif self.callback_data["error"]:
raise Exception(f"OAuth error: {self.callback_data["error"]}")
time.sleep(0.1)
raise Exception("Timeout waiting for OAuth callback")
def get_state(self):
return self.callback_data["state"]The MCP client
The central class is the actual MCP client, that will handle:
- Connecting to the MCP server
- Listing available tools
- Calling tools
- A simple CLI to interact with the client
As with many other protocols, the main component is the session. Each function that we add to the client will interact with the session to achieve its purpose.
class MCPOAuthClient:
def __init__(self, server_url: str):
self.server_url = server_url
self.session: ClientSession | None = NoneThe first step is to initialize the session, connecting to the provided MCP Server URL. The function may seem complex, but it simply initializes the client components:
- The callback server (on port 3030)
- The callback handler
- The OAuth client provider
- The Streamable HTTP client
The function also sets the server_url attribute, which will be used to connect to the MCP server.
async def connect(self):
print(f"Attempting to connect to {self.server_url}")
try:
callback_server = CallbackServer(port=3030)
callback_server.start()
async def callback_handler() -> tuple[str, str | None]:
print("Waiting for authorization callback...")
try:
auth_code = callback_server.wait_for_callback(timeout=300)
return auth_code, callback_server.get_state()
finally:
callback_server.stop()
client_metadata = {
"client_name": "MCP OAuth client",
"redirect_uris": ["http://localhost:3030/callback"],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_ednpoint_auth_method": "client_secret_post",
}
async def _default_redirect_handler(authorization_url: str) -> None:
print(f"Opening browser for authorization: {authorization_url}")
webbrowser.open(authorization_url)
oauth_auth = OAuthClientProvider(
server_url=self.server_url.replace("/mcp", ""),
client_metadata=OAuthClientMetadata.model_validate(client_metadata),
storage=RAMTokenStorage(),
redirect_handler=_default_redirect_handler,
callback_handler=callback_handler
)
print("Opening StreamableHTTP transport connection with auth...")
async with streamablehttp_client(
url=self.server_url,
auth=oauth_auth,
timeout=timedelta(seconds=60),
) as (read_stream, write_stream, get_session_id):
await self._run_session(read_stream,
write_stream,
get_session_id)
except Exception as e:
print(f"Failed to connect: {e}")
import traceback
traceback.print_exc()The streamablehttp_client function creates two streams (read and write) and a function to get the session ID. These 3 components are the actual transport we use to communicate with the MCP server. The _run_session function then uses these components to initialize the session (enabling our client to interact with the MCP server), and then starts the interactive loop, which runs indefinitely in our client.
async def _run_session(self, read_stream, write_stream, get_session_id):
print("Initializing MCP session")
async with ClientSession(read_stream, write_stream) as session:
self.session = session
print("Starting session initialization...")
await session.initialize()
print("Completed session initialization")
print(f"Conntected to MCP server at {self.server_url}")
session_id = get_session_id()
if session_id:
print(f"Session ID: {session_id}")
await self.interactive_loop()The interactive loop is the “app” portion of this client. In more sophisticated clients, you would normally interact with convenience functions (which we’ll implement in a second!). But this simple CLI is illustrative of how you would use the client to interact with the MCP server.
If you’ve written other CLI’s, you will be familiar with this pattern. An infinite loop that waits for user input, parses and validates it, and then calls the appropriate function. In this case, we have 3 commands:
list- List available toolscall <tool_name> [args]- Call a toolquit- Exit the client, which will also close the session and the transport as the objects are cleaned up
async def interactive_loop(self):
"""Run interactive command loop."""
print("\n🎯 Interactive MCP Client")
print("Commands:")
print(" list - List available tools")
print(" call <tool_name> [args] - Call a tool")
print(" quit - Exit the client")
print()
while True:
try:
command = input("mcp> ").strip()
if not command:
continue
if command == "quit":
break
elif command == "list":
await self.list_tools()
elif command.startswith("call "):
parts = command.split(maxsplit=2)
tool_name = parts[1] if len(parts) > 1 else ""
if not tool_name:
print("❌ Please specify a tool name")
continue
# Parse arguments (simple JSON-like format)
arguments = {}
if len(parts) > 2:
import json
try:
arguments = json.loads(parts[2])
except json.JSONDecodeError:
print("❌ Invalid arguments format (expected JSON)")
continue
await self.call_tool(tool_name, arguments)
else:
print("❌ Unknown command. Try 'list', 'call <tool_name>', or 'quit'")
except KeyboardInterrupt:
print("\n\n👋 Goodbye!")
break
except EOFError:
breakWhat remains to implement inside the MCPOAuthClient class are the functions to list available tools and call tools.
Both follow a very simple pattern, the session object already provides these functions, and we are simply adding a small amount of error handling, and printing the results to the terminal. On a real app, you would typically return these results to the caller, and perhaps inject it into an LLM for another turn.
async def list_tools(self):
"""List available tools from the server."""
if not self.session:
print("❌ Not connected to server")
return
try:
result = await self.session.list_tools()
if hasattr(result, "tools") and result.tools:
print("\n📋 Available tools:")
for i, tool in enumerate(result.tools, 1):
print(f"{i}. {tool.name}")
if tool.description:
print(f" Description: {tool.description}")
if tool.inputSchema:
print(f" Input Schema: {tool.inputSchema}")
print()
else:
print("No tools available")
except Exception as e:
print(f"❌ Failed to list tools: {e}")
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None = None):
"""Call a specific tool."""
if not self.session:
print("❌ Not connected to server")
return
try:
result = await self.session.call_tool(tool_name, arguments or {})
print(f"\n🔧 Tool '{tool_name}' result:")
if hasattr(result, "content"):
for content in result.content:
if content.type == "text":
print(content.text)
else:
print(content)
else:
print(result)
except Exception as e:
print(f"❌ Failed to call tool '{tool_name}': {e}")The main function
With the above, we have a working MCP client!. Let’s add a main function to run it. We encourage you to configure your own MCP server URL, and we’ve included the Arcade public MCP server as an example.
async def main():
print("Hello from mcp-client!")
server_url = "https://api.arcade.dev/v1/mcps/arcade-anon/mcp"
client = MCPOAuthClient(server_url)
await client.connect()
if __name__ == "__main__":
asyncio.run(main())