Back to Repositories

Testing Backend Server Components and Block Execution in AutoGPT

This test suite implements comprehensive testing infrastructure for AutoGPT’s backend platform, focusing on server initialization, execution management, and block testing functionality. It provides essential testing utilities for validating the core components of the AutoGPT system.

Test Coverage Overview

The test suite provides extensive coverage of AutoGPT’s backend components:
  • Server initialization and dependency management
  • Database connections and user creation
  • Execution status monitoring and result validation
  • Block testing with input/output verification
  • Mock object handling and credentials management

Implementation Analysis

The testing approach uses async/await patterns for server operations and execution monitoring. The implementation relies on context managers for resource handling and overrides the agent server's dependencies (such as the user-ID lookup) for test isolation. The framework also provides custom test utilities for block execution validation and status checking.

Technical Details

Key technical components include:
  • AsyncIO for asynchronous testing
  • Custom context managers for resource lifecycle
  • Logging infrastructure for test tracking
  • Type hints and runtime type checking
  • Modular test execution framework

Best Practices Demonstrated

The test implementation showcases several testing best practices:
  • Proper resource cleanup using context managers
  • Comprehensive error handling and validation
  • Flexible mock object integration
  • Structured logging for debugging
  • Timeout handling for async operations

significant-gravitas/autogpt

autogpt_platform/backend/backend/util/test.py

            
import asyncio
import contextlib
import logging
import time
from typing import Sequence

from backend.data import db
from backend.data.block import Block, initialize_blocks
from backend.data.execution import ExecutionResult, ExecutionStatus
from backend.data.model import CREDENTIALS_FIELD_NAME
from backend.data.user import create_default_user
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.server.rest_api import AgentServer
from backend.server.utils import get_user_id

# Module-level logger shared by the test helpers defined in this file.
log = logging.getLogger(__name__)


class SpinTestServer:
    """Async context manager that brings up the backend services for a test.

    Entering the context installs the test dependency overrides, enters the
    database manager, agent server, execution manager and scheduler (in that
    order), then connects to the database, initializes the blocks and creates
    the default user. Leaving the context disconnects from the database and
    exits every service in the reverse order of entry.
    """

    def __init__(self):
        self.db_api = DatabaseManager()
        self.exec_manager = ExecutionManager()
        self.agent_server = AgentServer()
        self.scheduler = ExecutionScheduler()

    @staticmethod
    def test_get_user_id():
        """Return the fixed user ID that replaces ``get_user_id`` in tests."""
        return "3e53486c-cf57-477e-ba2a-cb02dc828e1a"

    async def __aenter__(self):
        """Start the services and prepare the database; return ``self``."""
        self.setup_dependency_overrides()
        self.db_api.__enter__()
        self.agent_server.__enter__()
        self.exec_manager.__enter__()
        self.scheduler.__enter__()

        await db.connect()
        await initialize_blocks()
        await create_default_user()

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect from the database and exit every service.

        Each service's ``__exit__`` is always called, even when the database
        disconnect or an earlier service's ``__exit__`` raises; any such error
        still propagates to the caller once the remaining services have been
        exited. The return values of the services' ``__exit__`` methods are
        ignored, so an exception raised inside the ``async with`` body is never
        suppressed.
        """
        with contextlib.ExitStack() as stack:
            # Register in entry order: ExitStack unwinds last-in-first-out, so
            # the services are exited as scheduler, exec_manager, agent_server,
            # db_api. Each callback receives the exception details of the
            # surrounding ``async with`` body, as the services expect.
            for service in (
                self.db_api,
                self.agent_server,
                self.exec_manager,
                self.scheduler,
            ):
                stack.callback(service.__exit__, exc_type, exc_val, exc_tb)

            await db.disconnect()

    def setup_dependency_overrides(self):
        """Make the agent server resolve ``get_user_id`` to the test user ID."""
        # Override get_user_id for testing
        self.agent_server.set_test_dependency_overrides(
            {get_user_id: self.test_get_user_id}
        )


async def wait_execution(
    user_id: str,
    graph_id: str,
    graph_exec_id: str,
    timeout: int = 20,
) -> Sequence[ExecutionResult]:
    """Poll a graph execution until it completes and return its results.

    The execution status is checked up to ``timeout`` times, pausing one
    second after each unsuccessful check.

    Args:
        user_id: ID of the user the execution is queried for.
        graph_id: ID of the graph being executed.
        graph_exec_id: ID of the graph execution to wait for.
        timeout: Maximum number of status checks (roughly seconds) to perform.

    Returns:
        The node execution results reported by the agent server once the
        execution status is ``ExecutionStatus.COMPLETED``.

    Raises:
        Exception: If the execution status becomes ``ExecutionStatus.FAILED``.
        AssertionError: If the execution has not completed after ``timeout``
            status checks.
    """

    async def is_execution_completed() -> bool:
        status = await AgentServer().test_get_graph_run_status(
            graph_id, graph_exec_id, user_id
        )
        log.info(f"Execution status: {status}")
        if status == ExecutionStatus.FAILED:
            log.info("Execution failed")
            raise Exception("Execution failed")
        return status == ExecutionStatus.COMPLETED

    # Wait for the executions to complete
    for _ in range(timeout):
        if await is_execution_completed():
            return await AgentServer().test_get_graph_run_node_execution_results(
                graph_id, graph_exec_id, user_id
            )
        # Suspend only this coroutine; a blocking sleep would stall every
        # other task scheduled on the same event loop.
        await asyncio.sleep(1)

    # Raised explicitly (rather than `assert False`) so the timeout is still
    # reported when Python runs with assertions disabled (-O).
    raise AssertionError("Execution did not complete in time.")


def execute_block_test(block: Block):
    """Run a block against its own test fixtures and verify every output.

    The block's ``test_input`` and ``test_output`` are wrapped in lists (in
    place) when they are not lists already. Entries of ``test_mock`` replace
    the attributes of the same name on the block. Each test input is then
    executed, and the produced ``(name, data)`` pairs are matched, in order,
    against ``test_output``. An expected value matches when it equals the
    actual value, when it is a type the actual value is an instance of, or
    when it is a callable that returns a truthy result for the actual value.

    Nothing is executed when the block has no test input or no test output.

    Raises:
        ValueError: If the input schema has a credentials field but the block
            has no ``test_credentials``, if an output does not match its
            expectation, or if the block produces more or fewer outputs than
            ``test_output`` lists.
    """
    label = f"[Test-{block.name}]"

    if not (block.test_input and block.test_output):
        log.info(f"{label} No test data provided")
        return

    # Normalize single fixtures to one-element lists.
    for fixture_attr in ("test_input", "test_output"):
        fixture = getattr(block, fixture_attr)
        if not isinstance(fixture, list):
            setattr(block, fixture_attr, [fixture])

    log.info(f"{label} Executing {len(block.test_input)} tests...")
    # Every later message is indented under the "Executing" line.
    label = f"    {label}"

    mocks = block.test_mock or {}
    for attr_name, replacement in mocks.items():
        log.info(f"{label} mocking {attr_name}...")
        if not hasattr(block, attr_name):
            log.info(f"{label} mock {attr_name} not found in block")
            continue
        setattr(block, attr_name, replacement)

    exec_kwargs = {}
    if CREDENTIALS_FIELD_NAME in block.input_schema.model_fields:
        credentials = block.test_credentials
        if not credentials:
            raise ValueError(
                f"{label} requires credentials but has no test_credentials"
            )
        exec_kwargs[CREDENTIALS_FIELD_NAME] = credentials

    def is_match(actual, expected):
        # Equality first, then "expected is a type", then "expected is a
        # predicate"; types are callable too, so the order matters.
        if actual == expected:
            return True
        if isinstance(expected, type):
            return isinstance(actual, expected)
        if callable(expected):
            return expected(actual)
        return False

    def check(actual, expected):
        matched = is_match(actual, expected)
        mark = "✅" if matched else "❌"
        log.info(f"{label} {mark} comparing `{actual}` vs `{expected}`")
        if not matched:
            raise ValueError(f"{label}: wrong output {actual} vs {expected}")

    # Number of outputs verified so far, across all inputs.
    verified = 0
    for input_data in block.test_input:
        log.info(f"{label} in: {input_data}")

        for output_name, output_data in block.execute(input_data, **exec_kwargs):
            if verified >= len(block.test_output):
                raise ValueError(f"{label} produced output more than expected")
            expected_name, expected_data = block.test_output[verified]

            check(output_data, expected_data)
            check(output_name, expected_name)
            verified += 1

    expected_total = len(block.test_output)
    if verified < expected_total:
        raise ValueError(
            f"{label} produced output less than expected. "
            f"output_index={verified}, len(block.test_output)={expected_total}"
        )