Back to Repositories

Testing gRPC-Web Proxy Server Communication in Alibaba Arthas

This test suite validates the gRPC Web Proxy server functionality in Alibaba Arthas, focusing on request/response handling and stream processing. It implements comprehensive testing of gRPC-Web protocol communication, including payload encoding/decoding and HTTP header management.

Test Coverage Overview

The test suite provides thorough coverage of gRPC-Web proxy server operations.

Key areas tested include:
  • Simple request-response communication
  • Server streaming functionality
  • Base64 encoding/decoding of payloads
  • HTTP header management
  • Error handling and connection management

Implementation Analysis

The testing approach utilizes JUnit for structured test execution with dynamic port allocation for both gRPC and proxy servers. The implementation employs HTTP client interactions to simulate gRPC-Web requests, with careful attention to protocol-specific encoding and message formatting.

Notable patterns include:
  • Thread-based server initialization
  • Byte array manipulation for message framing
  • Stream processing for server-side streaming

Technical Details

Testing infrastructure includes:
  • JUnit test framework
  • Apache HTTP Components for client requests
  • Custom byte array utilities for message framing
  • Base64 encoding/decoding utilities
  • Dynamic port allocation via SocketUtils
  • gRPC protocol buffers for message handling

Best Practices Demonstrated

The test suite exemplifies robust testing practices through proper test isolation and comprehensive validation.

Key practices include:
  • Proper resource cleanup and connection handling
  • Separation of concerns between setup and test execution
  • Thorough validation of both simple and streaming scenarios
  • Clear error handling and logging

alibaba/arthas

labs/arthas-grpc-web-proxy/src/test/java/com/taobao/arthas/grpcweb/proxy/server/GrpcWebProxyServerTest.java

            
package com.taobao.arthas.grpcweb.proxy.server;

import grpc.gateway.testing.Echo;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import com.taobao.arthas.common.SocketUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;


public class GrpcWebProxyServerTest {

    private int GRPC_WEB_PROXY_PORT;
    private int GRPC_PORT;
    private String hostName;
    private CloseableHttpClient httpClient;
    @Before
    public void startServer(){
        GRPC_WEB_PROXY_PORT = SocketUtils.findAvailableTcpPort();
        GRPC_PORT = SocketUtils.findAvailableTcpPort();
        // 启动grpc服务
        Thread grpcStart = new Thread(() -> {
            StartGrpcTest startGrpcTest = new StartGrpcTest(GRPC_PORT);
            startGrpcTest.startGrpcService();
        });
        grpcStart.start();
        // 启动grpc-web-proxy服务
        Thread grpcWebProxyStart = new Thread(() -> {
            StartGrpcWebProxyTest startGrpcWebProxyTest = new StartGrpcWebProxyTest(GRPC_WEB_PROXY_PORT,GRPC_PORT);
            startGrpcWebProxyTest.startGrpcWebProxy();
        });
        grpcWebProxyStart.start();
        try {
            // waiting for the server to start
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        hostName = "http://127.0.0.1:" + GRPC_WEB_PROXY_PORT;
        httpClient = HttpClients.createDefault();
    }

    @Test
    public void simpleReqTest()  {
        // 单个response
        String url = hostName +"/grpc.gateway.testing.EchoService/Echo";

        String requestStr = "hello world!!!";
        Echo.EchoRequest request = Echo.EchoRequest.newBuilder().setMessage(requestStr).build();
        System.out.println("request message--->" + requestStr);
        byte[] requestData = request.toByteArray();
        requestData = ByteArrayWithLengthExample(requestData);
        // 编码请求载荷为gRPC-Web格式
        String encodedPayload = Base64.getEncoder().encodeToString(requestData);
        try {
            String result = "";
            String encoding = "utf-8";
            HttpPost httpPost = getPost(url, encodedPayload, encoding);
            //发送请求,并拿到结果(同步阻塞)
            CloseableHttpResponse response = httpClient.execute(httpPost);
            //获取返回结果
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                //按指定编码转换结果实体为String类型
                result = EntityUtils.toString(entity, encoding);
            }
            EntityUtils.consume(entity);
            //释放Http请求链接
            response.close();

            System.out.println("result-->" + result);
            System.out.println("after decode...");
            // gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符
            int endStartIndex = result.indexOf("gAAAAA");
            String data = result.substring(0,endStartIndex);
            String end = result.substring(endStartIndex,result.length());
            byte[] decodedData = Base64.getDecoder().decode(data);
            byte[] decodedEnd = Base64.getDecoder().decode(end);
            // 去掉前5个byte
            decodedData = RemoveBytesExample(decodedData);
            decodedEnd = RemoveBytesExample(decodedEnd);
            Echo.EchoResponse echoResponse = Echo.EchoResponse.parseFrom(decodedData);
            System.out.println("response message--->" + echoResponse.getMessage());
            String endStr = new String(decodedEnd);
            System.out.println(endStr);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    @Test
    public void streamReqTest() {
        // stream response
        String url = hostName + "/grpc.gateway.testing.EchoService/ServerStreamingEcho";
        String requestStr = "hello world!!!";
        Echo.ServerStreamingEchoRequest request = Echo.ServerStreamingEchoRequest.newBuilder().setMessage(requestStr)
                .setMessageCount(5)
                .build();
        byte[] requestData = request.toByteArray();
        requestData = ByteArrayWithLengthExample(requestData);
        // 编码请求载荷为gRPC-Web格式
        String encodedPayload = Base64.getEncoder().encodeToString(requestData);
        try {
            String encoding = "utf-8";
            HttpPost httpPost = getPost(url, encodedPayload, encoding);
            //发送请求
            CloseableHttpResponse response = httpClient.execute(httpPost);
            //获取返回结果
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                try (InputStream inputStream = entity.getContent()) {
                    // 在这里使用 inputStream 流式处理响应内容
                    // 例如,逐行读取响应内容
                    byte[] buffer = new byte[1024];
                    int bytesRead;
                    while ((bytesRead = inputStream.read(buffer)) != -1) {
                        // 处理读取的数据
                        String result = new String(buffer, 0, bytesRead);
                        System.out.println("result-->" + result);
                        System.out.println("after decode...");
                        // gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符

                        byte[] decodedData = Base64.getDecoder().decode(result);
                        // 去掉前5个byte
                        decodedData = RemoveBytesExample(decodedData);
                        if(result.startsWith("gAAAAA")){
                            String end = new String(decodedData);
                            System.out.println(end);
                        }else {
                            Echo.ServerStreamingEchoResponse echoResponse = Echo.ServerStreamingEchoResponse.parseFrom(decodedData);
                            System.out.println("response message--->" + echoResponse.getMessage());
                        }
                    }
                }
            }
            EntityUtils.consume(entity);
            //释放Http请求链接
            response.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public HttpPost getPost(String url, String param, String encoding) throws IOException {
        System.out.println("request param(encode)--->" + param);
        //创建post方式请求对象
        HttpPost httpPost = new HttpPost (url);
        //设置请求参数实体
        StringEntity reqParam = new StringEntity(param,encoding);
        reqParam.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/grpc-web-text"));
//        将请求参数放到请求对象中
        httpPost.setEntity(reqParam);
        //设置请求报文头信息
        httpPost.setHeader("Connection","keep-alive");
        httpPost.setHeader("Accept", "application/grpc-web-text");
        httpPost.setHeader("Content-type", "application/grpc-web-text");//设置发送表单请求
        httpPost.setHeader("X-Grpc-Web","1");
        httpPost.setHeader("X-User-Agent", "grpc-web-javascript/0.1");
        httpPost.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36");
        return httpPost;
    }

    public byte[] ByteArrayWithLengthExample(byte[] data){
        // 添加长度信息,用于编码过程
        int length = data.length;
        byte[] newData = {0,0,0,0,(byte) length};
        byte[] combineArray = new byte[newData.length + data.length];
        System.arraycopy(newData, 0, combineArray, 0, newData.length);
        System.arraycopy(data, 0, combineArray, newData.length, data.length);
        return combineArray;
    }

    public byte[] RemoveBytesExample(byte[] data){
        // 去掉长度信息,用于解码过程
        byte[] result = Arrays.copyOfRange(data, 5, data.length);
        return result;
    }

}