A high-performance TCP-WebSocket forwarder with command dispatching capabilities, built on a clean modular architecture with dependency injection, event-driven messaging, and a rich plugin system.
- TCP-WebSocket Forwarding — Bidirectional data forwarding between TCP and WebSocket connections
- Command Dispatching — Flexible command processing, routing, and priority-based queue system
- Event-Driven Architecture — Publish-subscribe event bus and message bus with async processing
- Plugin System — Dynamic plugin loading with sandboxing, dependency resolution, and lifecycle management
- SSH Forwarding — Integrated SSH client and port-forwarding service with connection pooling
- File Upload Management — Managed file upload service with progress tracking
- Configuration Management — YAML/JSON config with hot-reload watching, encrypted secrets, and backup/restore
- RESTful API — Full-featured FastAPI-based management interface with Swagger docs
- Security — API key authentication, CORS, rate limiting, and trusted host middleware
- Performance Monitoring — Request timing, metrics collection, and health checks
- Dependency Injection — Lightweight DI container with singleton/transient lifetimes and auto-wiring
- Comprehensive Testing — 25+ test modules covering all layers of the application
- Python 3.10+
- uv (recommended) or pip
# Using uv (recommended)
uv sync
# Or using pip
pip install -e ".[dev]"# Generate default config.yaml
uv run cobalt-forward init-config --output config.yaml
# Or specify JSON format
uv run cobalt-forward init-config --output config.json --format jsonuv run cobalt-forward validate-config config.yaml# Production mode
uv run cobalt-forward start --config config.yaml
# Development mode (auto-reload enabled)
uv run cobalt-forward dev --config config.yaml --port 8000
# With CLI overrides
uv run cobalt-forward start -c config.yaml --host 0.0.0.0 --port 9000 --log-level DEBUG --debug# Health check via CLI
uv run cobalt-forward health-check --host localhost --port 8000
# Or via HTTP
curl http://localhost:8000/healthOpen http://localhost:8000/docs in your browser for the interactive Swagger UI.
The application supports YAML and JSON configuration files. Key sections:
| Section | Description |
|---|---|
tcp |
TCP client — host, port, buffer size, keepalive, retry |
websocket |
WebSocket server — host, port, SSL, ping interval |
ssh |
SSH client — host, credentials, pool size, key path |
ftp |
FTP server — host, port, passive ports, root directory |
mqtt |
MQTT client — broker, topics, QoS, credentials |
logging |
Log level, format, file rotation, console/file output |
plugins |
Plugin directory, auto-load, sandbox, allowed/blocked modules |
performance |
Metrics interval, history size, alert thresholds |
security |
API key, CORS origins, rate limiting |
Environment variables can be set via .env file (see .env.example).
| Route | Tag | Description |
|---|---|---|
GET / |
root | Application info |
GET /health |
health | Health check and readiness probes |
/ws |
websocket | WebSocket connection management |
/api/v1/system |
system | System status, metrics, and management |
/api/v1/commands |
commands | Command dispatching and queue management |
/api/v1/config |
configuration | Runtime configuration CRUD, backup/restore, encryption |
/api/v1/ssh |
ssh | SSH connection and port-forwarding management |
/api/v1/plugins |
plugins | Plugin lifecycle management and execution |
/api/v1/core |
core | Core service inspection (event bus, message bus) |
cobalt_forward/
├── application/ # Application layer
│ ├── container.py # DI container (singleton/transient/scoped)
│ └── startup.py # Service registration & startup orchestration
├── core/ # Domain layer
│ ├── domain/
│ │ ├── commands.py # Command models & results
│ │ ├── events.py # Event models & priorities
│ │ ├── messages.py # Message models & routing
│ │ └── plugins.py # Plugin domain models
│ ├── interfaces/
│ │ ├── clients.py # IBaseClient, ISSHClient, ITCPClient, ...
│ │ ├── commands.py # ICommandDispatcher, ICommandHandler, ICommandQueue
│ │ ├── lifecycle.py # IStartable, IStoppable, IHealthCheckable
│ │ ├── messaging.py # IEventBus, IMessageBus
│ │ ├── plugins.py # IPlugin, IPluginManager
│ │ ├── ssh.py # ISSHForwarder
│ │ └── upload.py # IUploadManager
│ └── services/
│ ├── command_dispatcher.py # Command routing & handler registry
│ ├── command_queue.py # Priority command queue
│ ├── event_bus.py # Pub-sub event bus with async processing
│ └── message_bus.py # Request-reply message bus
├── infrastructure/ # Infrastructure layer
│ ├── clients/
│ │ ├── base.py # Base client with metrics & reconnection
│ │ └── ssh/ # SSH client implementation
│ ├── config/
│ │ ├── loader.py # YAML/JSON config loading
│ │ ├── models.py # Typed config dataclasses
│ │ ├── manager.py # Runtime config management
│ │ ├── watcher.py # File-system config hot-reload
│ │ ├── backup.py # Config backup & restore
│ │ └── crypto.py # Config encryption (Fernet/PBKDF2)
│ ├── logging/
│ │ └── setup.py # Logging setup & manager
│ └── services/
│ ├── ssh/ # SSH forwarding service
│ └── upload/ # File upload service
├── plugins/ # Plugin system
│ ├── base.py # BasePlugin & PluginContext
│ └── manager.py # Plugin manager with sandboxing
├── presentation/ # Presentation layer
│ └── api/
│ ├── app.py # FastAPI factory & lifespan
│ ├── dependencies.py # FastAPI dependency injection helpers
│ ├── middleware.py # Error, performance, security middleware
│ └── routers/ # API route modules
│ ├── health.py # Health & readiness endpoints
│ ├── system.py # System management endpoints
│ ├── commands.py # Command dispatching endpoints
│ ├── config.py # Configuration management endpoints
│ ├── ssh.py # SSH management endpoints
│ ├── plugin.py # Plugin management endpoints
│ ├── websocket.py # WebSocket endpoints
│ └── core.py # Core service endpoints
└── main.py # CLI entry point (Typer)
plugins/ # User plugins directory
tests/ # Test suite (25+ modules)
config.yaml # Default configuration
config.json # Alternative JSON configuration
# Run all tests (coverage enabled by default)
uv run pytest -q
# With HTML coverage report
uv run pytest --cov=cobalt_forward --cov-report=html
# Run a specific test module
uv run pytest tests/test_main.py -v
# Run without uv
pytest -q# Lint
uv run ruff check .
# Format
uv run black .
uv run isort .
# Type check
uv run mypy cobalt_forwardThe project maintains comprehensive test coverage across all layers:
| Category | Test Modules |
|---|---|
| Entry Point | test_main.py — CLI commands and application bootstrap |
| Application | test_application_startup.py, test_container.py — DI container and startup |
| Domain | test_domain_commands.py, test_domain_events.py, test_domain_messages.py |
| Interfaces | test_interfaces_clients.py, test_interfaces_commands.py, test_interfaces_lifecycle.py, test_interfaces_messaging.py, test_interfaces_plugins.py, test_interfaces_ssh.py, test_interfaces_upload.py |
| Core Services | test_core_services_command_dispatcher.py, test_command_queue.py |
| Infrastructure | test_config_loader.py, test_config_models.py, test_config_manager.py, test_config_manager_backup.py, test_infrastructure_config_watcher.py, test_infrastructure_logging_setup.py |
| Services | test_ssh_forwarder.py, test_upload_manager.py |
| Presentation | test_presentation_api_app.py |
| Plugins | test_plugins_manager.py |
| Component | Technology |
|---|---|
| Web Framework | FastAPI |
| ASGI Server | Uvicorn |
| CLI | Typer |
| Config Validation | Pydantic |
| WebSocket | websockets |
| Async HTTP | aiohttp |
| Caching | aiocache |
| Logging | Loguru |
| Encryption | cryptography |
| Process Monitoring | psutil |
| Interactive CLI | prompt-toolkit |
This project is authored by Max Qian (astro_air@126.com).