Testing Monitor Integrations in DeepSpeed
This test suite validates DeepSpeed's monitoring integrations: TensorBoard, Weights & Biases (WandB), CSV logging, and Comet ML. It checks that each monitor is configured and initialized correctly from a DeepSpeed config, and that sensible defaults apply when a monitor's config section is left empty.
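For context, the monitors under test all consume the same monitor section of a DeepSpeed config. The following is a minimal sketch of how a configured monitor is typically driven, assuming DeepSpeed's MonitorMaster aggregator and its write_events interface; the event label and values shown are illustrative, not taken from the test file:

from deepspeed.runtime.config import DeepSpeedConfig
from deepspeed.monitor.monitor import MonitorMaster

# Enable TensorBoard through the same config schema the tests below exercise.
config_dict = {
    "train_batch_size": 2,
    "tensorboard": {
        "enabled": True,
        "output_path": "test_output/ds_logs/",
        "job_name": "demo"
    }
}
ds_config = DeepSpeedConfig(config_dict)
monitor = MonitorMaster(ds_config.monitor_config)

# Each event is a (label, value, global_sample_count) tuple; every enabled
# backend receives the same event stream.
monitor.write_events([("Train/Samples/loss", 0.42, 100)])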
Source file: tests/unit/monitor/test_monitor.py (microsoft/deepspeed). The full listing follows.
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

from deepspeed.monitor.tensorboard import TensorBoardMonitor
from deepspeed.monitor.wandb import WandbMonitor
from deepspeed.monitor.csv_monitor import csvMonitor
from deepspeed.monitor.config import DeepSpeedMonitorConfig
from deepspeed.monitor.comet import CometMonitor

from unit.common import DistributedTest
from unittest.mock import Mock, patch

from deepspeed.runtime.config import DeepSpeedConfig

import deepspeed.comm as dist
class TestTensorBoard(DistributedTest):
    world_size = 2

    def test_tensorboard(self):
        config_dict = {
            "train_batch_size": 2,
            "tensorboard": {
                "enabled": True,
                "output_path": "test_output/ds_logs/",
                "job_name": "test"
            }
        }
        ds_config = DeepSpeedConfig(config_dict)
        tb_monitor = TensorBoardMonitor(ds_config.monitor_config.tensorboard)
        assert tb_monitor.enabled == True
        assert tb_monitor.output_path == "test_output/ds_logs/"
        assert tb_monitor.job_name == "test"

    def test_empty_tensorboard(self):
        config_dict = {"train_batch_size": 2, "tensorboard": {}}
        ds_config = DeepSpeedConfig(config_dict)
        tb_monitor = TensorBoardMonitor(ds_config.monitor_config.tensorboard)
        defaults = DeepSpeedMonitorConfig().tensorboard
        assert tb_monitor.enabled == defaults.enabled
        assert tb_monitor.output_path == defaults.output_path
        assert tb_monitor.job_name == defaults.job_name
class TestWandB(DistributedTest):
    world_size = 2

    def test_wandb(self):
        config_dict = {
            "train_batch_size": 2,
            "wandb": {
                "enabled": False,
                "group": "my_group",
                "team": "my_team",
                "project": "my_project"
            }
        }
        ds_config = DeepSpeedConfig(config_dict)
        wandb_monitor = WandbMonitor(ds_config.monitor_config.wandb)
        assert wandb_monitor.enabled == False
        assert wandb_monitor.group == "my_group"
        assert wandb_monitor.team == "my_team"
        assert wandb_monitor.project == "my_project"

    def test_empty_wandb(self):
        config_dict = {"train_batch_size": 2, "wandb": {}}
        ds_config = DeepSpeedConfig(config_dict)
        wandb_monitor = WandbMonitor(ds_config.monitor_config.wandb)
        defaults = DeepSpeedMonitorConfig().wandb
        assert wandb_monitor.enabled == defaults.enabled
        assert wandb_monitor.group == defaults.group
        assert wandb_monitor.team == defaults.team
        assert wandb_monitor.project == defaults.project
class TestCSVMonitor(DistributedTest):
    world_size = 2

    def test_csv_monitor(self):
        config_dict = {
            "train_batch_size": 2,
            "csv_monitor": {
                "enabled": True,
                "output_path": "test_output/ds_logs/",
                "job_name": "test"
            }
        }
        ds_config = DeepSpeedConfig(config_dict)
        csv_monitor = csvMonitor(ds_config.monitor_config.csv_monitor)
        assert csv_monitor.enabled == True
        assert csv_monitor.output_path == "test_output/ds_logs/"
        assert csv_monitor.job_name == "test"

    def test_empty_csv_monitor(self):
        config_dict = {"train_batch_size": 2, "csv_monitor": {}}
        ds_config = DeepSpeedConfig(config_dict)
        csv_monitor = csvMonitor(ds_config.monitor_config.csv_monitor)
        defaults = DeepSpeedMonitorConfig().csv_monitor
        assert csv_monitor.enabled == defaults.enabled
        assert csv_monitor.output_path == defaults.output_path
        assert csv_monitor.job_name == defaults.job_name
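# Note: the Comet tests below never contact the Comet backend; they patch
# comet_ml.start with a Mock. They also assert rank-dependent behavior:
# CometMonitor is expected to create an experiment only on global rank 0,
# so every other rank must observe no call to comet_ml.start at all.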
class TestCometMonitor(DistributedTest):
    world_size = 2

    def test_comet_monitor(self):
        import comet_ml
        mock_experiment = Mock()
        mock_start = Mock(return_value=mock_experiment)

        config_dict = {
            "train_batch_size": 2,
            "comet": {
                "enabled": True,
                "samples_log_interval": 42,
                "workspace": "some-workspace",
                "project": "some-project",
                "api_key": "some-api-key",
                "experiment_name": "some-experiment-name",
                "experiment_key": "some-experiment-key",
                "mode": "get_or_create",
                "online": True
            }
        }

        ds_config = DeepSpeedConfig(config_dict)
        with patch.object(comet_ml, "start", mock_start):
            comet_monitor = CometMonitor(ds_config.monitor_config.comet)

        assert comet_monitor.enabled is True
        assert comet_monitor.samples_log_interval == 42

        # experiment should be initialized via comet_ml.start only if rank == 0
        if dist.get_rank() == 0:
            mock_start.assert_called_once_with(
                api_key="some-api-key",
                project="some-project",
                workspace="some-workspace",
                experiment_key="some-experiment-key",
                mode="get_or_create",
                online=True,
            )
            mock_experiment.set_name.assert_called_once_with("some-experiment-name")
            assert comet_monitor.experiment is mock_experiment
        else:
            mock_start.assert_not_called()

    def test_empty_comet(self):
        import comet_ml
        mock_start = Mock()

        config_dict = {"train_batch_size": 2, "comet": {}}
        ds_config = DeepSpeedConfig(config_dict)

        with patch.object(comet_ml, "start", mock_start):
            comet_monitor = CometMonitor(ds_config.monitor_config.comet)

        defaults = DeepSpeedMonitorConfig().comet
        assert comet_monitor.enabled == defaults.enabled
        assert comet_monitor.samples_log_interval == defaults.samples_log_interval
        mock_start.assert_not_called()
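To run this suite locally, the standard pytest invocation applies; the DistributedTest base class handles spawning the two ranks per test. A typical command, assuming it is issued from DeepSpeed's tests/ directory so that the unit.common import resolves:

pytest unit/monitor/test_monitor.py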