ElementAstro/cobalt-forward

Cobalt Forward

A high-performance TCP-WebSocket forwarder with command dispatching capabilities, built on a clean modular architecture with dependency injection, event-driven messaging, and a rich plugin system.

中文文档 (Chinese Documentation)

Features

  • TCP-WebSocket Forwarding — Bidirectional data forwarding between TCP and WebSocket connections
  • Command Dispatching — Flexible command processing, routing, and priority-based queue system
  • Event-Driven Architecture — Publish-subscribe event bus and message bus with async processing
  • Plugin System — Dynamic plugin loading with sandboxing, dependency resolution, and lifecycle management
  • SSH Forwarding — Integrated SSH client and port-forwarding service with connection pooling
  • File Upload Management — Managed file upload service with progress tracking
  • Configuration Management — YAML/JSON config with hot-reload watching, encrypted secrets, and backup/restore
  • RESTful API — Full-featured FastAPI-based management interface with Swagger docs
  • Security — API key authentication, CORS, rate limiting, and trusted host middleware
  • Performance Monitoring — Request timing, metrics collection, and health checks
  • Dependency Injection — Lightweight DI container with singleton/transient lifetimes and auto-wiring
  • Comprehensive Testing — 25+ test modules covering all layers of the application

Quick Start

Prerequisites

  • Python 3.10+
  • uv (recommended) or pip

1. Install Dependencies

# Using uv (recommended)
uv sync

# Or using pip
pip install -e ".[dev]"

2. Generate Configuration

# Generate default config.yaml
uv run cobalt-forward init-config --output config.yaml

# Or specify JSON format
uv run cobalt-forward init-config --output config.json --format json

3. Validate Configuration

uv run cobalt-forward validate-config config.yaml

4. Start the Server

# Production mode
uv run cobalt-forward start --config config.yaml

# Development mode (auto-reload enabled)
uv run cobalt-forward dev --config config.yaml --port 8000

# With CLI overrides
uv run cobalt-forward start -c config.yaml --host 0.0.0.0 --port 9000 --log-level DEBUG --debug

5. Verify the Server

# Health check via CLI
uv run cobalt-forward health-check --host localhost --port 8000

# Or via HTTP
curl http://localhost:8000/health
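
The same check can be scripted from Python. This is a minimal sketch using only the standard library; it assumes the `/health` endpoint answers with a 2xx status when the server is healthy and does not inspect the response body, whose format is not documented here. The function name `check_health` is hypothetical.

```python
import urllib.request


def check_health(base_url: str = "http://localhost:8000", timeout: float = 3.0) -> bool:
    """Return True if GET <base_url>/health answers with a 2xx status."""
    url = base_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        # Covers connection refused, timeouts, and HTTP error statuses
        # (urllib's URLError and HTTPError both derive from OSError).
        return False
```

Calling `check_health()` with no arguments targets the default address used in the steps above.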

6. Explore the API

Open http://localhost:8000/docs in your browser for the interactive Swagger UI.

Configuration

The application supports YAML and JSON configuration files. Key sections:

| Section | Description |
| --- | --- |
| tcp | TCP client: host, port, buffer size, keepalive, retry |
| websocket | WebSocket server: host, port, SSL, ping interval |
| ssh | SSH client: host, credentials, pool size, key path |
| ftp | FTP server: host, port, passive ports, root directory |
| mqtt | MQTT client: broker, topics, QoS, credentials |
| logging | Log level, format, file rotation, console/file output |
| plugins | Plugin directory, auto-load, sandbox, allowed/blocked modules |
| performance | Metrics interval, history size, alert thresholds |
| security | API key, CORS origins, rate limiting |

Environment variables can be set via a .env file (see .env.example).
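
As a rough illustration of how a sectioned config file maps onto typed objects, the sketch below loads the `tcp` and `websocket` sections of a JSON document into dataclasses. The field names (`buffer_size`, `ping_interval`), the default values, and the `load_config` helper are assumptions made for this example; the real schema is whatever `init-config` generates.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class TCPConfig:
    # Field names and defaults are illustrative assumptions.
    host: str = "127.0.0.1"
    port: int = 8080
    buffer_size: int = 4096


@dataclass
class WebSocketConfig:
    host: str = "0.0.0.0"
    port: int = 8000
    ping_interval: float = 30.0


@dataclass
class AppConfig:
    tcp: TCPConfig
    websocket: WebSocketConfig


def _build(cls, raw: dict):
    """Instantiate a section dataclass, rejecting keys it does not define."""
    known = {f.name for f in fields(cls)}
    unknown = set(raw) - known
    if unknown:
        raise ValueError(f"unknown keys for {cls.__name__}: {sorted(unknown)}")
    return cls(**raw)


def load_config(text: str) -> AppConfig:
    """Parse JSON text; missing sections fall back to dataclass defaults."""
    raw = json.loads(text)
    return AppConfig(
        tcp=_build(TCPConfig, raw.get("tcp", {})),
        websocket=_build(WebSocketConfig, raw.get("websocket", {})),
    )
```

Rejecting unknown keys early is one way to surface typos in a config file before the server starts, which is the kind of problem a validation step is meant to catch.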

API Endpoints

| Route | Tag | Description |
| --- | --- | --- |
| GET / | root | Application info |
| GET /health | health | Health check and readiness probes |
| /ws | websocket | WebSocket connection management |
| /api/v1/system | system | System status, metrics, and management |
| /api/v1/commands | commands | Command dispatching and queue management |
| /api/v1/config | configuration | Runtime configuration CRUD, backup/restore, encryption |
| /api/v1/ssh | ssh | SSH connection and port-forwarding management |
| /api/v1/plugins | plugins | Plugin lifecycle management and execution |
| /api/v1/core | core | Core service inspection (event bus, message bus) |
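
The command queue behind the commands routes is described as priority-based. The sketch below shows one common way to build such a queue with `heapq`: higher priorities are served first, and commands of equal priority keep their insertion order. The class name `PriorityCommandQueue` and its methods are hypothetical and are not taken from `core/services/command_queue.py`.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class _Entry:
    priority: int                        # stored negated so the largest pops first
    sequence: int                        # tie-breaker that preserves FIFO order
    command: Any = field(compare=False)  # payload is never compared


class PriorityCommandQueue:
    """Toy priority queue (hypothetical API, for illustration only)."""

    def __init__(self) -> None:
        self._heap: list[_Entry] = []
        self._counter = itertools.count()

    def put(self, command: Any, priority: int = 0) -> None:
        heapq.heappush(self._heap, _Entry(-priority, next(self._counter), command))

    def get(self) -> Any:
        if not self._heap:
            raise IndexError("queue is empty")
        return heapq.heappop(self._heap).command

    def __len__(self) -> int:
        return len(self._heap)
```

The sequence counter matters: without it, two entries with the same priority would fall back to comparing the commands themselves, which may not be orderable.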

Project Structure

cobalt_forward/
├── application/                 # Application layer
│   ├── container.py             # DI container (singleton/transient/scoped)
│   └── startup.py               # Service registration & startup orchestration
├── core/                        # Domain layer
│   ├── domain/
│   │   ├── commands.py          # Command models & results
│   │   ├── events.py            # Event models & priorities
│   │   ├── messages.py          # Message models & routing
│   │   └── plugins.py           # Plugin domain models
│   ├── interfaces/
│   │   ├── clients.py           # IBaseClient, ISSHClient, ITCPClient, ...
│   │   ├── commands.py          # ICommandDispatcher, ICommandHandler, ICommandQueue
│   │   ├── lifecycle.py         # IStartable, IStoppable, IHealthCheckable
│   │   ├── messaging.py         # IEventBus, IMessageBus
│   │   ├── plugins.py           # IPlugin, IPluginManager
│   │   ├── ssh.py               # ISSHForwarder
│   │   └── upload.py            # IUploadManager
│   └── services/
│       ├── command_dispatcher.py # Command routing & handler registry
│       ├── command_queue.py      # Priority command queue
│       ├── event_bus.py          # Pub-sub event bus with async processing
│       └── message_bus.py        # Request-reply message bus
├── infrastructure/              # Infrastructure layer
│   ├── clients/
│   │   ├── base.py              # Base client with metrics & reconnection
│   │   └── ssh/                 # SSH client implementation
│   ├── config/
│   │   ├── loader.py            # YAML/JSON config loading
│   │   ├── models.py            # Typed config dataclasses
│   │   ├── manager.py           # Runtime config management
│   │   ├── watcher.py           # File-system config hot-reload
│   │   ├── backup.py            # Config backup & restore
│   │   └── crypto.py            # Config encryption (Fernet/PBKDF2)
│   ├── logging/
│   │   └── setup.py             # Logging setup & manager
│   └── services/
│       ├── ssh/                 # SSH forwarding service
│       └── upload/              # File upload service
├── plugins/                     # Plugin system
│   ├── base.py                  # BasePlugin & PluginContext
│   └── manager.py               # Plugin manager with sandboxing
├── presentation/                # Presentation layer
│   └── api/
│       ├── app.py               # FastAPI factory & lifespan
│       ├── dependencies.py      # FastAPI dependency injection helpers
│       ├── middleware.py        # Error, performance, security middleware
│       └── routers/             # API route modules
│           ├── health.py        # Health & readiness endpoints
│           ├── system.py        # System management endpoints
│           ├── commands.py      # Command dispatching endpoints
│           ├── config.py        # Configuration management endpoints
│           ├── ssh.py           # SSH management endpoints
│           ├── plugin.py        # Plugin management endpoints
│           ├── websocket.py     # WebSocket endpoints
│           └── core.py          # Core service endpoints
└── main.py                      # CLI entry point (Typer)

plugins/                         # User plugins directory
tests/                           # Test suite (25+ modules)
config.yaml                      # Default configuration
config.json                      # Alternative JSON configuration
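
For readers new to the pattern behind `core/services/event_bus.py`, a publish-subscribe bus with async handlers can be sketched in a few lines of `asyncio`. This is a simplified illustration under assumed names (`EventBus`, `subscribe`, `publish`), not the project's implementation, and it leaves out features such as event priorities.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class EventBus:
    """Toy async pub-sub bus (hypothetical API, for illustration only)."""

    def __init__(self) -> None:
        self._subscribers: defaultdict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, payload: Any) -> int:
        """Run all handlers for a topic concurrently; return how many succeeded."""
        handlers = list(self._subscribers.get(topic, ()))
        # return_exceptions=True keeps one failing handler from cancelling the rest.
        results = await asyncio.gather(
            *(handler(payload) for handler in handlers),
            return_exceptions=True,
        )
        return sum(1 for result in results if not isinstance(result, Exception))
```

Isolating handler failures this way is a design choice: a publisher usually should not fail because one subscriber raised, though a real bus would also log the exception.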

Development

Running Tests

# Run all tests (coverage enabled by default)
uv run pytest -q

# With HTML coverage report
uv run pytest --cov=cobalt_forward --cov-report=html

# Run a specific test module
uv run pytest tests/test_main.py -v

# Run without uv
pytest -q

Code Quality

# Lint
uv run ruff check .

# Format
uv run black .
uv run isort .

# Type check
uv run mypy cobalt_forward

Test Coverage

The project maintains comprehensive test coverage across all layers:

| Category | Test Modules |
| --- | --- |
| Entry Point | test_main.py (CLI commands and application bootstrap) |
| Application | test_application_startup.py, test_container.py (DI container and startup) |
| Domain | test_domain_commands.py, test_domain_events.py, test_domain_messages.py |
| Interfaces | test_interfaces_clients.py, test_interfaces_commands.py, test_interfaces_lifecycle.py, test_interfaces_messaging.py, test_interfaces_plugins.py, test_interfaces_ssh.py, test_interfaces_upload.py |
| Core Services | test_core_services_command_dispatcher.py, test_command_queue.py |
| Infrastructure | test_config_loader.py, test_config_models.py, test_config_manager.py, test_config_manager_backup.py, test_infrastructure_config_watcher.py, test_infrastructure_logging_setup.py |
| Services | test_ssh_forwarder.py, test_upload_manager.py |
| Presentation | test_presentation_api_app.py |
| Plugins | test_plugins_manager.py |

Tech Stack

| Component | Technology |
| --- | --- |
| Web Framework | FastAPI |
| ASGI Server | Uvicorn |
| CLI | Typer |
| Config Validation | Pydantic |
| WebSocket | websockets |
| Async HTTP | aiohttp |
| Caching | aiocache |
| Logging | Loguru |
| Encryption | cryptography |
| Process Monitoring | psutil |
| Interactive CLI | prompt-toolkit |

License

This project is authored by Max Qian (astro_air@126.com).
