Testing Stream Communication Handler in Gradio Client

This test suite validates the streaming functionality in the Gradio client, focusing on the open_stream method and SSE (Server-Sent Events) endpoint interactions. The tests ensure proper stream initialization, error handling, and message processing for real-time communication.

Test Coverage Overview

The suite contains two tests that exercise the Client’s streaming path: one for a missing config, and one for the SSE connection and message flow.

Key areas tested include:
  • Stream initialization and configuration validation
  • SSE endpoint connection verification
  • Message handling and event callbacks
  • Error scenarios and stream closure conditions
Integration points focus on the interaction between the Client class and the mock server implementation.
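
For readers unfamiliar with the wire format behind the SSE endpoint, the following is a minimal sketch of how `data:` fields in a Server-Sent Events body can be split into JSON messages. The function name `parseSseData` and the sample payload are illustrative assumptions; the Gradio client does not necessarily parse events this way, and the sketch ignores the `event:`, `id:`, and `retry:` fields.

```typescript
// Simplified SSE parsing: events are separated by a blank line, and each
// event's payload is carried on one or more `data:` lines.
// `parseSseData` is a hypothetical helper, not part of the Gradio client.
function parseSseData(raw: string): string[] {
	const events: string[] = [];
	for (const block of raw.split(/\r?\n\r?\n/)) {
		const dataLines = block
			.split(/\r?\n/)
			.filter((line) => line.startsWith("data:"))
			// Drop the field name and a single optional leading space.
			.map((line) => line.slice(5).replace(/^ /, ""));
		if (dataLines.length > 0) {
			// Multiple data lines in one event are joined with newlines.
			events.push(dataLines.join("\n"));
		}
	}
	return events;
}

// Sample body with two events (made-up payloads mirroring the test messages).
const sampleBody =
	'data: {"msg":"hello jerry"}\n\ndata: {"msg":"close_stream"}\n\n';

const parsedMessages = parseSseData(sampleBody).map(
	(data) => JSON.parse(data) as { msg: string }
);
```

Each parsed object has the same shape as the messages the test passes to the stream's onmessage callback.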

Implementation Analysis

The tests use Vitest’s vi.fn() to replace the client’s stream method with a mock that builds a stream through the readable_stream utility. They then assert on the recorded call and on the client’s state using Vitest’s Jest-compatible expect API, all written in TypeScript.

Key patterns include:
  • Mock function injection for stream creation
  • URL validation for endpoint connections
  • Event callback testing for message and error handling
  • Stream state management verification
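
The first two patterns above can be sketched without a test framework. In this rough illustration, `recordCalls` is a hypothetical stand-in for Vitest's `vi.fn()`, `FakeStream` is a made-up stream shape, and the session hash is invented; none of these are part of the Gradio client.

```typescript
// Hypothetical stand-in for vi.fn(): wraps an implementation and
// records the arguments of every call.
function recordCalls<A extends unknown[], R>(impl: (...args: A) => R) {
	const calls: A[] = [];
	const fn = (...args: A): R => {
		calls.push(args);
		return impl(...args);
	};
	return Object.assign(fn, { calls });
}

// Made-up minimal stream shape, used only for this sketch.
interface FakeStream {
	url: URL;
	onmessage: ((event: { data: string }) => void) | null;
}

// Injected factory: returns a fake stream instead of opening a connection.
const openFakeStream = recordCalls(
	(url: URL): FakeStream => ({ url, onmessage: null })
);

// Build an endpoint URL the way a client might (the hash is invented).
const endpoint = new URL("https://hmb-hello-world.hf.space/queue/data");
endpoint.searchParams.set("session_hash", "abc123");
const fakeStream = openFakeStream(endpoint);

// URL validation: inspect the first argument of the first recorded call.
const firstArg = openFakeStream.calls[0][0];
const endpointMatches =
	/https:\/\/hmb-hello-world\.hf\.space\/queue\/data\?session_hash/.test(
		firstArg.href
	);
```

Recording the call arguments is what lets the test check which URL the client tried to open without making a network request.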

Technical Details

Testing tools and setup:
  • Vitest for test runner and mocking
  • TypeScript for type safety
  • Mock server implementation for endpoint simulation
  • beforeAll/afterAll hooks for server lifecycle management, with a top-level afterEach that resets server handlers
  • beforeEach/afterEach inside the describe block for test isolation

Best Practices Demonstrated

The test suite sets up and tears down its state explicitly, so each test starts from a freshly connected client and a clean mock.

Notable practices include:
  • Proper test isolation using beforeEach and afterEach hooks
  • Error case coverage for a missing config and the stream’s onerror callback
  • Mock cleanup between tests
  • Type-safe testing with TypeScript
  • Clear test case organization and descriptive naming

gradio-app/gradio

client/js/src/test/stream.test.ts

            
import {
	vi,
	describe,
	it,
	expect,
	afterEach,
	beforeAll,
	afterAll,
	beforeEach,
	type Mock
} from "vitest";
import { Client } from "../client";
import { readable_stream } from "../utils/stream";
import { initialise_server } from "./server";
import { direct_space_url } from "./handlers.ts";

const server = initialise_server();

beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

describe("open_stream", () => {
	let app: Client;

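	beforeEach(async () => {
		app = await Client.connect("hmb/hello_world");
		// Replace the stream factory with a mock that records its calls
		// and stores the created stream on the client.
		app.stream = vi.fn().mockImplementation(() => {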
			app.stream_instance = readable_stream(
				new URL(`${direct_space_url}/queue/data`)
			);
			return app.stream_instance;
		});
	});

	afterEach(() => {
		vi.clearAllMocks();
	});

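	it("should throw an error if config is not defined", async () => {
		app.config = undefined;

		// Await the assertion so that a failed expectation fails the test
		// instead of resolving after the test has already finished.
		await expect(app.open_stream()).rejects.toThrow(
			"Could not resolve app config"
		);
	});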

	it("should connect to the SSE endpoint and handle messages", async () => {
		await app.open_stream();

		const eventsource_mock_call = (app.stream as Mock).mock.calls[0][0];

		expect(eventsource_mock_call.href).toMatch(
			/https:\/\/hmb-hello-world\.hf\.space\/queue\/data\?session_hash/
		);

		expect(app.stream).toHaveBeenCalledWith(eventsource_mock_call);

		if (!app.stream_instance?.onmessage || !app.stream_instance?.onerror) {
			throw new Error("stream instance is not defined");
		}

		const onMessageCallback = app.stream_instance.onmessage.bind(app);
		const onErrorCallback = app.stream_instance.onerror.bind(app);

		const message = { msg: "hello jerry" };

		onMessageCallback({ data: JSON.stringify(message) });
		expect(app.stream_status.open).toBe(true);

		expect(app.event_callbacks).toEqual({});
		expect(app.pending_stream_messages).toEqual({});

		const close_stream_message = { msg: "close_stream" };
		onMessageCallback({ data: JSON.stringify(close_stream_message) });
		expect(app.stream_status.open).toBe(false);

		onErrorCallback({ data: JSON.stringify("404") });
		expect(app.stream_status.open).toBe(false);
	});
});
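
The state transitions asserted in the second test can be modelled with a small, framework-free sketch. The handler logic below is an assumption inferred from the assertions alone (ordinary messages leave the stream open; a `close_stream` message or an error closes it). The real client handlers do more work, and `createHandlers` and `streamStatus` are hypothetical names.

```typescript
interface StreamStatus {
	open: boolean;
}

interface StreamEvent {
	data: string;
}

// Hypothetical handlers that mirror only the behavior the test asserts.
function createHandlers(status: StreamStatus) {
	return {
		onmessage(event: StreamEvent): void {
			const payload = JSON.parse(event.data) as { msg?: string };
			// A close_stream message marks the stream as closed;
			// any other message leaves the status unchanged.
			if (payload.msg === "close_stream") {
				status.open = false;
			}
		},
		onerror(_event: StreamEvent): void {
			// Any error also marks the stream as closed.
			status.open = false;
		}
	};
}

const streamStatus: StreamStatus = { open: true };
const handlers = createHandlers(streamStatus);

handlers.onmessage({ data: JSON.stringify({ msg: "hello jerry" }) });
const openAfterMessage = streamStatus.open;

handlers.onmessage({ data: JSON.stringify({ msg: "close_stream" }) });
const openAfterClose = streamStatus.open;

handlers.onerror({ data: JSON.stringify("404") });
const openAfterError = streamStatus.open;
```

Because the stream is already closed when the error arrives, the final check in the test confirms that the status stays closed rather than proving that onerror closes an open stream.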