Implement an MCP Client

In this guide, you’ll learn how to implement an MCP client.

Overview

The MCP client is the application that communicates with one or more MCP servers and uses the tools and resources they provide. This guide focuses on implementing a client that can call tools over the Streamable HTTP transport. MCP defines several transports, as well as other primitives besides tools, such as prompts and resources. To learn more about those primitives and the alternative transports, refer to the MCP protocol documentation.

Features of the client

The client will have the following features:

  • Connect to an MCP server using the Streamable HTTP transport
  • Handle the authorization flow if the MCP server requires authentication
  • List available tools
  • Call tools

If you’re looking for a ready-made client, you can use the reference implementation from the MCP reference implementation repository, which this guide is based on.

Set up your environment

We’ll use uv to manage our Python environment. Let’s create a new project and install the necessary dependencies:

mkdir mcp-client
cd mcp-client
uv init
uv venv
source .venv/bin/activate
uv add mcp

Implementation

Our client will have the following structure:

  • A volatile (in memory) token storage
  • A thin HTTP server to have a callback endpoint for the authorization flow
  • The actual MCP client, that manages:
    • Connecting to the MCP server
    • Listing available tools
    • Calling tools
    • A simple CLI to interact with the client

Import all required libraries

To keep things organized, let’s import all required libraries at the top of the file.

import asyncio
import threading
import time
import webbrowser
from datetime import timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
 
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken

We’re importing:

  • asyncio to run the MCP client’s coroutines, and threading to run our small callback server in the background.
  • time to poll for the authorization callback until it arrives or the wait times out.
  • webbrowser to open the browser for the authorization flow.
  • timedelta to set the timeout of the Streamable HTTP transport.
  • BaseHTTPRequestHandler and HTTPServer to create a small HTTP server that handles the callback endpoint for the authorization flow.
  • Any to annotate the arguments passed to tools.
  • parse_qs and urlparse to read the query parameters of the callback request.
  • OAuthClientProvider and TokenStorage to drive the OAuth flow and to define the token storage interface.
  • ClientSession to manage the MCP session.
  • streamablehttp_client to open the Streamable HTTP transport.
  • OAuthClientInformationFull, OAuthClientMetadata and OAuthToken to represent the OAuth client information, metadata and token.

The token storage

The token storage is a simple in-memory class that stores the OAuth tokens and the client registration for the MCP server. It subclasses TokenStorage from the MCP client library so that it matches the storage interface the OAuth provider expects.

class RAMTokenStorage(TokenStorage):
    """Volatile token storage"""
 
    def __init__(self):
        self.tokens: OAuthToken | None = None
        self.client_information: OAuthClientInformationFull | None = None
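
The class above only declares where the values live. As far as we understand the MCP Python SDK, the TokenStorage interface also expects four async accessors (get_tokens, set_tokens, get_client_info, set_client_info) that the OAuth provider calls to read and persist values. The sketch below shows that accessor pattern with stand-in dataclasses so it runs without the SDK installed: FakeToken and FakeClientInfo are hypothetical placeholders for OAuthToken and OAuthClientInformationFull, and the method names are an assumption to check against the SDK version you use.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeToken:
    """Hypothetical stand-in for OAuthToken."""
    access_token: str


@dataclass
class FakeClientInfo:
    """Hypothetical stand-in for OAuthClientInformationFull."""
    client_id: str


class RAMTokenStorageSketch:
    """Volatile storage with four async accessors (names assumed from the SDK)."""

    def __init__(self):
        self.tokens: Optional[FakeToken] = None
        self.client_information: Optional[FakeClientInfo] = None

    async def get_tokens(self) -> Optional[FakeToken]:
        return self.tokens

    async def set_tokens(self, tokens: FakeToken) -> None:
        self.tokens = tokens

    async def get_client_info(self) -> Optional[FakeClientInfo]:
        return self.client_information

    async def set_client_info(self, client_info: FakeClientInfo) -> None:
        self.client_information = client_info


async def demo() -> Optional[FakeToken]:
    """Store a token and read it back."""
    storage = RAMTokenStorageSketch()
    await storage.set_tokens(FakeToken(access_token="abc"))
    return await storage.get_tokens()
```

In the real client, the same four methods would live on RAMTokenStorage and use the SDK types directly. Because everything is kept in memory, the user has to authorize again each time the client restarts.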

The HTTP server

The HTTP server handles the callback endpoint for the authorization flow. It is built on the BaseHTTPRequestHandler class from the http.server module. Do not use this server in production.

Our server has two parts:

  • The callback handler, that handles the callback endpoint for the authorization flow
  • The callback server, that starts the HTTP server and waits for the callback handler to complete the authorization flow

The callback handler only cares about:

  • Successfully completing the authorization flow
  • Displaying errors from the authorization flow
  • Capturing the state from the authorization flow

class HTTPHandler(BaseHTTPRequestHandler):
    """HTTP handler to capture OAuth callback"""
 
    def __init__(self, request, client_addr, server, callback_data):
        self.callback_data = callback_data
        super().__init__(request, client_addr, server)
 
    def do_GET(self):
        """Handle GET requests from OAuth redirect"""
        parsed = urlparse(self.path)
        query_params = parse_qs(parsed.query)
 
        if "code" in query_params:  # we've successfully completed the authorization flow
            self.callback_data["authorization_code"] = query_params["code"][0]
            self.callback_data["state"] = query_params.get("state", [None])[0]
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(b"""
            <html>
                <body>
                    <h1>Authorization Successful</h1>
                    <p>You can close this window and return to your app</p>
                    <script>setTimeout(() => window.close(), 2000);</script>
                </body>
            </html>
            """)
        elif "error" in query_params:  # we've encountered an error
            self.callback_data["error"] = query_params["error"][0]
            self.send_response(400)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(f"""
            <html>
                <body>
                    <h1>Authorization Failed</h1>
                    <p>Error: {query_params["error"][0]}</p>
                    <p>You can close this window and return to your app</p>
                </body>
            </html>
            """.encode())
        else:  # for everything else, we return a not found response
            self.send_response(404)
            self.end_headers()
 
    def log_message(self, format, *args):
        # suppress default logging
        pass
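
To see the parsing step of the handler in isolation, here is a minimal sketch that extracts the same three fields from a callback path. The parse_callback helper is hypothetical and not part of the client; it only illustrates how urlparse and parse_qs behave on the redirect URL.

```python
from urllib.parse import parse_qs, urlparse


def parse_callback(path: str) -> dict:
    """Return the OAuth fields found in a callback request path."""
    params = parse_qs(urlparse(path).query)

    def first(key):
        # parse_qs maps each key to a list of values; take the first one
        return params.get(key, [None])[0]

    return {
        "authorization_code": first("code"),
        "state": first("state"),
        "error": first("error"),
    }


print(parse_callback("/callback?code=abc123&state=xyz"))
print(parse_callback("/callback?error=access_denied"))
```

A path with neither code nor error yields None for all three fields, which corresponds to the 404 branch of the handler.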
 

The callback server is responsible for:

  • Starting the HTTP server
  • Waiting for the callback handler to complete the authorization flow
  • Stopping the HTTP server

class CallbackServer:
    def __init__(self, port=3000):
        self.port = port
        self.server = None
        self.thread = None
        self.callback_data = {
            "authorization_code": None,
            "state": None,
            "error": None,
        }
 
    def _create_handler_with_data(self):
        callback_data = self.callback_data
 
        class DataCallbackHandler(HTTPHandler):
            def __init__(self, request, client_address, server):
                super().__init__(request, client_address, server, callback_data)
 
        return DataCallbackHandler
 
    def start(self):
        handler_class = self._create_handler_with_data()
        self.server = HTTPServer(("localhost", self.port), handler_class)
        self.thread = threading.Thread(target=self.server.serve_forever,
                                       daemon=True)
        self.thread.start()
        print(f"Started callback server on http://localhost:{self.port}")
 
    def stop(self):
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.thread:
            self.thread.join(timeout=1)
 
    def wait_for_callback(self, timeout=300):
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.callback_data["authorization_code"]:
                return self.callback_data["authorization_code"]
            elif self.callback_data["error"]:
                # single quotes inside the f-string keep this valid before Python 3.12
                raise Exception(f"OAuth error: {self.callback_data['error']}")
            time.sleep(0.1)
        raise Exception("Timeout waiting for OAuth callback")
 
    def get_state(self):
        return self.callback_data["state"]
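
As a possible variation on the callback server, the sketch below binds the shared data with functools.partial instead of a subclass factory, and waits on a threading.Event instead of polling with time.sleep. It is an alternative under stated assumptions, not the approach this guide uses: EventHandler and capture_code_once are hypothetical names, the server binds to an OS-chosen port, and the browser redirect is simulated with a local request.

```python
import threading
import urllib.request
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Optional
from urllib.parse import parse_qs, urlparse


class EventHandler(BaseHTTPRequestHandler):
    """Stores the authorization code and signals an event when it arrives."""

    def __init__(self, *args, callback_data: dict, done: threading.Event, **kwargs):
        # Attributes must be set before super().__init__, which handles the request
        self.callback_data = callback_data
        self.done = done
        super().__init__(*args, **kwargs)

    def do_GET(self):
        params = parse_qs(urlparse(self.path).query)
        if "code" in params:
            self.callback_data["authorization_code"] = params["code"][0]
            self.send_response(200)
            self.end_headers()
            self.done.set()
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the demo quiet


def capture_code_once() -> Optional[str]:
    data = {"authorization_code": None}
    done = threading.Event()
    # partial binds the extra keyword arguments, so no subclass factory is needed
    handler = partial(EventHandler, callback_data=data, done=done)
    server = HTTPServer(("127.0.0.1", 0), handler)  # port 0: the OS picks a free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        port = server.server_address[1]
        # Simulate the browser redirect; bypass any proxy configured in the environment
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        opener.open(f"http://127.0.0.1:{port}/callback?code=demo", timeout=5).close()
        done.wait(timeout=5)  # blocks until the handler signals, without polling
        return data["authorization_code"]
    finally:
        server.shutdown()
        server.server_close()
        thread.join(timeout=1)
```

The event-based wait returns as soon as the handler fires, while the polling version wakes up every 100 ms. Either approach works for a short-lived local callback server.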

The MCP client

The central class is the actual MCP client, that will handle:

  • Connecting to the MCP server
  • Listing available tools
  • Calling tools
  • A simple CLI to interact with the client

As with many other protocols, the main component is the session. Each function that we add to the client will interact with the session to achieve its purpose.

class MCPOAuthClient:
 
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session: ClientSession | None = None

The first step is to initialize the session, connecting to the provided MCP Server URL. The function may seem complex, but it simply initializes the client components:

  • The callback server (on port 3030)
  • The callback handler
  • The OAuth client provider
  • The Streamable HTTP client

The function uses the server_url attribute, which was set in the constructor, to connect to the MCP server.

async def connect(self):
    print(f"Attempting to connect to {self.server_url}")
 
    try:
        callback_server = CallbackServer(port=3030)
        callback_server.start()
 
        async def callback_handler() -> tuple[str, str | None]:
            print("Waiting for authorization callback...")
            try:
                auth_code = callback_server.wait_for_callback(timeout=300)
                return auth_code, callback_server.get_state()
            finally:
                callback_server.stop()
 
        client_metadata = {
            "client_name": "MCP OAuth client",
            "redirect_uris": ["http://localhost:3030/callback"],
            "grant_types": ["authorization_code", "refresh_token"],
            "response_types": ["code"],
            "token_endpoint_auth_method": "client_secret_post",
        }
 
        async def _default_redirect_handler(authorization_url: str) -> None:
            print(f"Opening browser for authorization: {authorization_url}")
            webbrowser.open(authorization_url)
 
        oauth_auth = OAuthClientProvider(
            # Strip only a trailing "/mcp"; str.replace would also alter "/mcps" inside the path
            server_url=self.server_url.removesuffix("/mcp"),
            client_metadata=OAuthClientMetadata.model_validate(client_metadata),
            storage=RAMTokenStorage(),
            redirect_handler=_default_redirect_handler,
            callback_handler=callback_handler
        )
 
        print("Opening StreamableHTTP transport connection with auth...")
        async with streamablehttp_client(
            url=self.server_url,
            auth=oauth_auth,
            timeout=timedelta(seconds=60),
        ) as (read_stream, write_stream, get_session_id):
            await self._run_session(read_stream,
                                    write_stream,
                                    get_session_id)
    except Exception as e:
        print(f"Failed to connect: {e}")
        import traceback
        traceback.print_exc()
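
The server_url passed to OAuthClientProvider is meant to be the MCP URL without its trailing /mcp segment. The short sketch below shows why the choice of string method matters for that step: str.replace removes every occurrence of the substring, while str.removesuffix (Python 3.9+) only trims the end. Both helper names are hypothetical.

```python
def base_url_replace(server_url: str) -> str:
    # Removes every "/mcp" occurrence, including the one inside "/mcps"
    return server_url.replace("/mcp", "")


def base_url_suffix(server_url: str) -> str:
    # Removes "/mcp" only when it ends the URL
    return server_url.removesuffix("/mcp")


url = "https://api.arcade.dev/v1/mcps/arcade-anon/mcp"
print(base_url_replace(url))  # https://api.arcade.dev/v1s/arcade-anon
print(base_url_suffix(url))   # https://api.arcade.dev/v1/mcps/arcade-anon
```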

The streamablehttp_client function yields two streams (read and write) and a function that returns the session ID. Together, these three components form the transport we use to communicate with the MCP server. The _run_session function uses them to initialize the session, which enables our client to interact with the MCP server, and then starts the interactive loop, which runs until the user quits.

async def _run_session(self, read_stream, write_stream, get_session_id):
    print("Initializing MCP session")
    async with ClientSession(read_stream, write_stream) as session:
        self.session = session
        print("Starting session initialization...")
        await session.initialize()
        print("Completed session initialization")
 
        print(f"Connected to MCP server at {self.server_url}")
        session_id = get_session_id()
        if session_id:
            print(f"Session ID: {session_id}")
 
        await self.interactive_loop()

The interactive loop is the “app” portion of this client. In more sophisticated clients, you would normally interact with convenience functions (which we’ll implement in a second!). But this simple CLI is illustrative of how you would use the client to interact with the MCP server.

If you’ve written other CLIs, you will be familiar with this pattern: an infinite loop that waits for user input, parses and validates it, and then calls the appropriate function. In this case, we have three commands:

  • list - List available tools
  • call <tool_name> [args] - Call a tool
  • quit - Exit the client, which will also close the session and the transport as the objects are cleaned up

async def interactive_loop(self):
    """Run interactive command loop."""
    print("\n🎯 Interactive MCP Client")
    print("Commands:")
    print("  list - List available tools")
    print("  call <tool_name> [args] - Call a tool")
    print("  quit - Exit the client")
    print()
 
    while True:
        try:
            command = input("mcp> ").strip()
            if not command:
                continue
            if command == "quit":
                break
            elif command == "list":
                await self.list_tools()
            elif command.startswith("call "):
                parts = command.split(maxsplit=2)
                tool_name = parts[1] if len(parts) > 1 else ""
                if not tool_name:
                    print("❌ Please specify a tool name")
                    continue
                # Parse arguments (everything after the tool name must be valid JSON)
                arguments = {}
                if len(parts) > 2:
                    import json
                    try:
                        arguments = json.loads(parts[2])
                    except json.JSONDecodeError:
                        print("❌ Invalid arguments format (expected JSON)")
                        continue
                await self.call_tool(tool_name, arguments)
            else:
                print("❌ Unknown command. Try 'list', 'call <tool_name>', or 'quit'")
        except KeyboardInterrupt:
            print("\n\n👋 Goodbye!")
            break
        except EOFError:
            break
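
The parsing of the call command can be tried on its own. The sketch below pulls that logic into a hypothetical parse_call helper (not part of the client above) to show that split(maxsplit=2) keeps the JSON payload in one piece even when it contains spaces. It also rejects JSON values that are not objects, a check the loop above does not make.

```python
import json
from typing import Any


def parse_call(command: str) -> tuple[str, dict[str, Any]]:
    """Split 'call <tool_name> [json]' into a tool name and an arguments dict."""
    parts = command.split(maxsplit=2)
    if len(parts) < 2 or parts[0] != "call":
        raise ValueError("expected: call <tool_name> [args]")
    arguments: dict[str, Any] = {}
    if len(parts) > 2:
        # json.JSONDecodeError is a subclass of ValueError
        arguments = json.loads(parts[2])
        if not isinstance(arguments, dict):
            raise ValueError("arguments must be a JSON object")
    return parts[1], arguments


print(parse_call('call add {"a": 1, "b": 2}'))  # ('add', {'a': 1, 'b': 2})
print(parse_call("call ping"))                  # ('ping', {})
```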

What remains to implement inside the MCPOAuthClient class are the functions to list available tools and call tools.

Both follow a very simple pattern: the session object already provides these functions, so we only add a small amount of error handling and print the results to the terminal. In a real app, you would typically return these results to the caller, and perhaps inject them into an LLM for another turn.

async def list_tools(self):
    """List available tools from the server."""
    if not self.session:
        print("❌ Not connected to server")
        return
 
    try:
        result = await self.session.list_tools()
        if hasattr(result, "tools") and result.tools:
            print("\n📋 Available tools:")
            for i, tool in enumerate(result.tools, 1):
                print(f"{i}. {tool.name}")
                if tool.description:
                    print(f"   Description: {tool.description}")
                if tool.inputSchema:
                    print(f"   Input Schema: {tool.inputSchema}")
                print()
        else:
            print("No tools available")
    except Exception as e:
        print(f"❌ Failed to list tools: {e}")
 
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None = None):
    """Call a specific tool."""
    if not self.session:
        print("❌ Not connected to server")
        return
 
    try:
        result = await self.session.call_tool(tool_name, arguments or {})
        print(f"\n🔧 Tool '{tool_name}' result:")
        if hasattr(result, "content"):
            for content in result.content:
                if content.type == "text":
                    print(content.text)
                else:
                    print(content)
        else:
            print(result)
    except Exception as e:
        print(f"❌ Failed to call tool '{tool_name}': {e}")
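
If you want to return results to a caller instead of printing them, one option is to collect the text parts of the result into a string. The sketch below assumes only what call_tool above already relies on: a result with a content list whose items carry a type and, for text items, a text attribute. The collect_text helper and the SimpleNamespace objects are hypothetical stand-ins so the example runs without a server.

```python
from types import SimpleNamespace


def collect_text(result) -> str:
    """Join the text parts of a tool result and skip non-text content."""
    parts = []
    for content in getattr(result, "content", []):
        if getattr(content, "type", None) == "text":
            parts.append(content.text)
    return "\n".join(parts)


# Stand-in for a tool result; real results come from session.call_tool
fake_result = SimpleNamespace(content=[
    SimpleNamespace(type="text", text="2 + 2 = 4"),
    SimpleNamespace(type="image", data="..."),
    SimpleNamespace(type="text", text="done"),
])
print(collect_text(fake_result))
```

The returned string could then be passed back to an LLM as the tool output for the next turn.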

The main function

With the above, we have a working MCP client! Let’s add a main function to run it. We encourage you to configure your own MCP server URL; we’ve included the Arcade public MCP server as an example.

async def main():
    print("Hello from mcp-client!")
 
    server_url = "https://api.arcade.dev/v1/mcps/arcade-anon/mcp"
    client = MCPOAuthClient(server_url)
    await client.connect()
 
if __name__ == "__main__":
    asyncio.run(main())
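
One way to make the server URL configurable without editing the file is to read it from the command line or the environment. This is a minimal sketch: resolve_server_url is a hypothetical helper and MCP_SERVER_URL is an assumed variable name, not something the MCP library defines.

```python
DEFAULT_SERVER_URL = "https://api.arcade.dev/v1/mcps/arcade-anon/mcp"


def resolve_server_url(argv: list[str], env: dict[str, str]) -> str:
    """Pick the URL from the first CLI argument, then the environment, then the default."""
    if len(argv) > 1:
        return argv[1]
    # MCP_SERVER_URL is a hypothetical variable name chosen for this sketch
    return env.get("MCP_SERVER_URL", DEFAULT_SERVER_URL)


print(resolve_server_url(["client.py"], {}))
print(resolve_server_url(["client.py", "http://localhost:8000/mcp"], {}))
```

In main, you could then call resolve_server_url(sys.argv, dict(os.environ)) in place of the hard-coded string, after importing os and sys.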