Skip to content

Commit 541200f

Browse files
authored
Merge pull request #335 from atesgoral/ag/use-sdk-streamable-http-client-example
Update Streamable HTTP client example to use MCP::Client
2 parents 0c46dcd + 933cf94 commit 541200f

1 file changed

Lines changed: 130 additions & 158 deletions

File tree

examples/streamable_http_client.rb

Lines changed: 130 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,25 @@
11
# frozen_string_literal: true
22

3+
$LOAD_PATH.unshift(File.expand_path("../lib", __dir__))
4+
require "mcp"
35
require "net/http"
46
require "uri"
57
require "json"
68
require "logger"
9+
require "event_stream_parser"
710

8-
# Logger for client operations
9-
logger = Logger.new($stdout)
10-
logger.formatter = proc do |severity, datetime, _progname, msg|
11-
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
12-
end
13-
14-
# Server configuration
15-
SERVER_URL = "http://localhost:9393/mcp"
16-
PROTOCOL_VERSION = "2024-11-05"
11+
SERVER_URL = "http://localhost:9393"
1712

18-
# Helper method to make JSON-RPC requests
19-
def make_request(session_id, method, params = {}, id = nil)
20-
uri = URI(SERVER_URL)
21-
http = Net::HTTP.new(uri.host, uri.port)
22-
23-
request = Net::HTTP::Post.new(uri)
24-
request["Content-Type"] = "application/json"
25-
request["Mcp-Session-Id"] = session_id if session_id
26-
27-
body = {
28-
jsonrpc: "2.0",
29-
method: method,
30-
params: params,
31-
id: id || SecureRandom.uuid,
32-
}
33-
34-
request.body = body.to_json
35-
response = http.request(request)
36-
37-
{
38-
status: response.code,
39-
headers: response.to_hash,
40-
body: JSON.parse(response.body),
41-
}
42-
rescue => e
43-
{ error: e.message }
13+
def create_logger
14+
logger = Logger.new($stdout)
15+
logger.formatter = proc do |severity, datetime, _progname, msg|
16+
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
17+
end
18+
logger
4419
end
4520

46-
# Connect to SSE stream
21+
# The SDK does not yet implement the optional GET SSE stream, so this example
22+
# uses MCP::Client for JSON-RPC requests and raw Net::HTTP for the event stream.
4723
def connect_sse(session_id, logger)
4824
uri = URI(SERVER_URL)
4925

@@ -59,17 +35,13 @@ def connect_sse(session_id, logger)
5935
if response.code == "200"
6036
logger.info("SSE stream connected successfully")
6137

38+
parser = EventStreamParser::Parser.new
6239
response.read_body do |chunk|
63-
chunk.split("\n").each do |line|
64-
if line.start_with?("data: ")
65-
data = line[6..-1]
66-
begin
67-
logger.info("SSE data: #{data}")
68-
rescue JSON::ParserError
69-
logger.debug("Non-JSON SSE data: #{data}")
70-
end
71-
elsif line.start_with?(": ")
72-
logger.debug("SSE keepalive received: #{line}")
40+
parser.feed(chunk) do |type, data, _id|
41+
if type.empty?
42+
logger.info("SSE event: #{data}")
43+
else
44+
logger.info("SSE event (#{type}): #{data}")
7345
end
7446
end
7547
end
@@ -79,129 +51,129 @@ def connect_sse(session_id, logger)
7951
end
8052
end
8153
rescue Interrupt
82-
logger.info("SSE connection interrupted by user")
54+
logger.info("SSE connection interrupted")
8355
rescue => e
8456
logger.error("SSE connection error: #{e.message}")
8557
end
8658

87-
# Main client flow
88-
def main
89-
logger = Logger.new($stdout)
90-
logger.formatter = proc do |severity, datetime, _progname, msg|
91-
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
92-
end
93-
94-
puts "=== MCP SSE Test Client ==="
95-
96-
# Step 1: Initialize session
97-
logger.info("Initializing session...")
98-
99-
init_response = make_request(
100-
nil,
101-
"initialize",
102-
{
103-
protocolVersion: PROTOCOL_VERSION,
104-
capabilities: {},
105-
clientInfo: {
106-
name: "sse-test-client",
107-
version: "1.0",
108-
},
109-
},
110-
"init-1",
111-
)
112-
113-
if init_response[:error]
114-
logger.error("Failed to initialize: #{init_response[:error]}")
115-
exit(1)
116-
end
117-
118-
session_id = init_response[:headers]["mcp-session-id"]&.first
119-
120-
if session_id.nil?
121-
logger.error("No session ID received")
122-
exit(1)
123-
end
124-
125-
if init_response[:body].dig("result", "capabilities", "logging")
126-
make_request(session_id, "logging/setLevel", { level: "info" })
59+
def print_response(response)
60+
if response.nil?
61+
puts "Response accepted; watch the SSE stream for the server response."
62+
else
63+
puts "Response: #{JSON.pretty_generate(response)}"
12764
end
65+
end
12866

129-
logger.info("Session initialized: #{session_id}")
130-
logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}")
131-
132-
# Step 2: Start SSE connection in a separate thread
133-
sse_thread = Thread.new { connect_sse(session_id, logger) }
134-
135-
# Give SSE time to connect
136-
sleep(1)
137-
138-
# Step 3: Interactive menu
139-
loop do
140-
puts <<~MESSAGE.chomp
141-
142-
=== Available Actions ===
143-
1. Send custom notification
144-
2. Test echo
145-
3. List tools
146-
0. Exit
147-
148-
Choose an action:#{" "}
67+
def main
68+
logger = create_logger
69+
70+
puts <<~MESSAGE
71+
MCP Streamable HTTP Client
72+
Make sure the server is running (ruby examples/streamable_http_server.rb)
73+
#{"=" * 60}
74+
MESSAGE
75+
76+
http_transport = MCP::Client::HTTP.new(url: SERVER_URL)
77+
client = MCP::Client.new(transport: http_transport)
78+
sse_thread = nil
79+
80+
begin
81+
puts "=== Initializing session ==="
82+
server_info = client.connect(
83+
client_info: { name: "streamable-http-client", version: "1.0" },
84+
)
85+
86+
puts <<~MESSAGE
87+
ID: #{http_transport.session_id}
88+
Version: #{http_transport.protocol_version}
89+
Server: #{server_info["serverInfo"]}
14990
MESSAGE
15091

151-
choice = gets.chomp
152-
153-
case choice
154-
when "1"
155-
print("Enter notification message: ")
156-
message = gets.chomp
157-
print("Enter delay in seconds (0 for immediate): ")
158-
delay = gets.chomp.to_f
159-
160-
response = make_request(
161-
session_id,
162-
"tools/call",
163-
{
164-
name: "notification_tool",
165-
arguments: {
166-
message: message,
167-
delay: delay,
168-
},
169-
},
170-
)
171-
if response[:body]["accepted"]
172-
logger.info("Notification sent successfully")
92+
unless http_transport.session_id
93+
logger.error("No session ID received; this example requires a stateful Streamable HTTP session.")
94+
return
95+
end
96+
97+
puts "=== Listing tools ==="
98+
tools = client.tools
99+
tools.each { |tool| puts " - #{tool.name}: #{tool.description}" }
100+
101+
echo_tool = tools.find { |tool| tool.name == "echo" }
102+
notification_tool = tools.find { |tool| tool.name == "notification_tool" }
103+
104+
sse_thread = Thread.new { connect_sse(http_transport.session_id, logger) }
105+
sleep(1)
106+
107+
# Once the optional SSE stream is active, POST requests may receive only a
108+
# 202 ACK while the actual JSON-RPC response is delivered over SSE.
109+
loop do
110+
puts <<~MENU.chomp
111+
112+
=== Available Actions ===
113+
1. Send notification (triggers SSE event)
114+
2. Echo message
115+
3. Show cached tools
116+
0. Exit
117+
118+
Choose an action:#{" "}
119+
MENU
120+
121+
case gets.chomp
122+
when "1"
123+
if notification_tool
124+
print("Enter notification message: ")
125+
message = gets.chomp
126+
print("Enter delay in seconds (0 for immediate): ")
127+
delay = gets.chomp.to_f
128+
129+
puts "=== Calling tool: notification_tool ==="
130+
response = client.call_tool(
131+
tool: notification_tool,
132+
arguments: { message: message, delay: delay },
133+
)
134+
print_response(response)
135+
else
136+
puts "notification_tool not available"
137+
end
138+
when "2"
139+
if echo_tool
140+
print("Enter message to echo: ")
141+
message = gets.chomp
142+
143+
puts "=== Calling tool: echo ==="
144+
response = client.call_tool(tool: echo_tool, arguments: { message: message })
145+
print_response(response)
146+
else
147+
puts "echo tool not available"
148+
end
149+
when "3"
150+
puts "=== Cached tools ==="
151+
tools.each { |tool| puts " - #{tool.name}: #{tool.description}" }
152+
when "0"
153+
logger.info("Exiting...")
154+
break
173155
else
174-
logger.error("Error: #{response[:body]["error"]}")
156+
puts "Invalid choice"
175157
end
176-
when "2"
177-
print("Enter message to echo: ")
178-
message = gets.chomp
179-
make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } })
180-
when "3"
181-
make_request(session_id, "tools/list")
182-
when "0"
183-
logger.info("Exiting...")
184-
break
185-
else
186-
puts "Invalid choice"
158+
end
159+
rescue MCP::Client::SessionExpiredError => e
160+
logger.error("Session expired: #{e.message}")
161+
rescue MCP::Client::RequestHandlerError => e
162+
logger.error("Request error: #{e.message}")
163+
rescue Interrupt
164+
logger.info("Client interrupted")
165+
rescue => e
166+
logger.error("Error: #{e.message}")
167+
logger.error(e.backtrace.first(5).join("\n"))
168+
ensure
169+
sse_thread.kill if sse_thread&.alive?
170+
171+
if http_transport.connected?
172+
puts "=== Closing session ==="
173+
http_transport.close
174+
puts "Session closed"
187175
end
188176
end
189-
190-
# Clean up
191-
sse_thread.kill if sse_thread.alive?
192-
193-
# Close session
194-
logger.info("Closing session...")
195-
make_request(session_id, "close")
196-
logger.info("Session closed")
197-
rescue Interrupt
198-
logger.info("Client interrupted by user")
199-
rescue => e
200-
logger.error("Client error: #{e.message}")
201-
logger.error(e.backtrace.join("\n"))
202177
end
203178

204-
# Run the client
205-
if __FILE__ == $PROGRAM_NAME
206-
main
207-
end
179+
main if __FILE__ == $PROGRAM_NAME

0 commit comments

Comments
 (0)