Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions sidecar/rpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package rpc

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)

const defaultTimeout = 10 * time.Second

// HTTPDoer abstracts HTTP requests for testability.
type HTTPDoer interface {
Do(req *http.Request) (*http.Response, error)
}

// rpcError represents a JSON-RPC error returned by CometBFT.
type rpcError struct {
Code int `json:"code"`
Message string `json:"message"`
Data string `json:"data"`
}

// envelope is the JSON-RPC response wrapper returned by all CometBFT
// HTTP RPC endpoints (e.g. /status, /block, /block_results).
type envelope struct {
Result json.RawMessage `json:"result"`
Error *rpcError `json:"error,omitempty"`
}

// Client performs HTTP GET requests against a CometBFT RPC endpoint
// and handles the JSON-RPC envelope unwrapping.
type Client struct {
endpoint string
httpClient HTTPDoer
timeout time.Duration
}

// NewClient creates a CometBFT RPC client. Pass "" for endpoint to
// use the default localhost address. Pass nil for httpClient to use
// http.DefaultClient with no custom timeout.
func NewClient(endpoint string, httpClient HTTPDoer) *Client {
if endpoint == "" {
endpoint = DefaultEndpoint
}
if httpClient == nil {
httpClient = &http.Client{}
}
return &Client{
endpoint: endpoint,
httpClient: httpClient,
timeout: defaultTimeout,
}
}

// SetTimeout overrides the per-request context timeout.
func (c *Client) SetTimeout(d time.Duration) { c.timeout = d }

// Endpoint returns the configured RPC base URL.
func (c *Client) Endpoint() string { return c.endpoint }

// Get performs an HTTP GET to endpoint+path, unwraps the JSON-RPC
// envelope, and returns the inner "result" as raw JSON.
func (c *Client) Get(ctx context.Context, path string) (json.RawMessage, error) {
body, err := c.doGet(ctx, path)
if err != nil {
return nil, err
}

var env envelope
if err := json.Unmarshal(body, &env); err != nil {
return nil, fmt.Errorf("decoding JSON-RPC envelope from %s: %w", path, err)
}
if env.Error != nil {
return nil, fmt.Errorf("JSON-RPC error from %s: %s (code %d, data: %s)",
path, env.Error.Message, env.Error.Code, env.Error.Data)
}
if len(env.Result) == 0 {
return nil, fmt.Errorf("empty result in JSON-RPC response from %s", path)
}

return env.Result, nil
}

// GetRaw performs an HTTP GET and returns the entire response body
// without envelope unwrapping. Use for archival paths that store the
// verbatim JSON-RPC response (e.g., S3 export).
func (c *Client) GetRaw(ctx context.Context, path string) ([]byte, error) {
return c.doGet(ctx, path)
}

func (c *Client) doGet(ctx context.Context, path string) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+path, nil)
if err != nil {
return nil, err
}

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, body)
}

return io.ReadAll(resp.Body)
}
128 changes: 128 additions & 0 deletions sidecar/rpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package rpc

import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)

func TestClient_Get_UnwrapsEnvelope(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":-1,"result":{"sync_info":{"latest_block_height":"42"}}}`))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
raw, err := c.Get(context.Background(), "/status")
if err != nil {
t.Fatalf("Get: %v", err)
}

got := string(raw)
if !strings.Contains(got, "latest_block_height") {
t.Errorf("expected unwrapped result containing latest_block_height, got %s", got)
}
if strings.Contains(got, "jsonrpc") {
t.Error("result should not contain the JSON-RPC envelope")
}
}

func TestClient_Get_EmptyResult(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":-1}`))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
_, err := c.Get(context.Background(), "/status")
if err == nil {
t.Fatal("expected error for empty result")
}
if !strings.Contains(err.Error(), "empty result") {
t.Errorf("unexpected error: %v", err)
}
}

func TestClient_Get_HTTPError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
_, _ = w.Write([]byte("bad gateway"))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
_, err := c.Get(context.Background(), "/status")
if err == nil {
t.Fatal("expected error for non-200 response")
}
if !strings.Contains(err.Error(), "502") {
t.Errorf("expected HTTP 502 in error, got: %v", err)
}
}

func TestClient_Get_MalformedJSON(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`not json`))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
_, err := c.Get(context.Background(), "/status")
if err == nil {
t.Fatal("expected error for malformed JSON")
}
}

func TestClient_GetRaw_ReturnsFullBody(t *testing.T) {
body := `{"jsonrpc":"2.0","id":-1,"result":{"txs_results":[]}}`
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(body))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
raw, err := c.GetRaw(context.Background(), "/block_results?height=1")
if err != nil {
t.Fatalf("GetRaw: %v", err)
}
if string(raw) != body {
t.Errorf("expected full body %q, got %q", body, string(raw))
}
}

func TestClient_Get_RPCError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":-1,"error":{"code":-32603,"message":"Internal error","data":"height 999999 is not available"}}`))
}))
defer srv.Close()

c := NewClient(srv.URL, nil)
_, err := c.Get(context.Background(), "/block?height=999999")
if err == nil {
t.Fatal("expected error for JSON-RPC error response")
}
if !strings.Contains(err.Error(), "Internal error") {
t.Errorf("expected error to contain CometBFT message, got: %v", err)
}
if !strings.Contains(err.Error(), "height 999999 is not available") {
t.Errorf("expected error to contain data field, got: %v", err)
}
}

func TestClient_SetTimeout(t *testing.T) {
c := NewClient("", nil)
c.SetTimeout(500 * time.Millisecond)
if c.timeout != 500*time.Millisecond {
t.Errorf("timeout = %v, want 500ms", c.timeout)
}
}

func TestClient_DefaultEndpoint(t *testing.T) {
c := NewClient("", nil)
if c.Endpoint() != DefaultEndpoint {
t.Errorf("endpoint = %q, want %q", c.Endpoint(), DefaultEndpoint)
}
}
70 changes: 22 additions & 48 deletions sidecar/rpc/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,61 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"

seiconfig "github.com/sei-protocol/sei-config"
)

const statusTimeout = 500 * time.Millisecond

// DefaultEndpoint is the local CometBFT RPC address.
var DefaultEndpoint = fmt.Sprintf("http://localhost:%d", seiconfig.PortRPC)

const defaultRequestTimeout = 500 * time.Millisecond

// StatusClient queries a CometBFT node's /status endpoint.
type StatusClient struct {
endpoint string
httpClient *http.Client
}

// NodeStatus holds the fields we care about from CometBFT /status.
type NodeStatus struct {
LatestBlockHeight int64
CatchingUp bool
}

// Endpoint returns the configured RPC endpoint.
func (c *StatusClient) Endpoint() string { return c.endpoint }

type statusResponse struct {
SyncInfo struct {
LatestBlockHeight string `json:"latest_block_height"`
CatchingUp bool `json:"catching_up"`
} `json:"sync_info"`
// StatusClient queries a CometBFT node's /status endpoint.
type StatusClient struct {
client *Client
}

// NewStatusClient creates a client targeting the given RPC endpoint.
// Pass "" for the default localhost endpoint.
func NewStatusClient(endpoint string, httpClient *http.Client) *StatusClient {
if endpoint == "" {
endpoint = DefaultEndpoint
}
if httpClient == nil {
httpClient = &http.Client{}
}
return &StatusClient{endpoint: endpoint, httpClient: httpClient}
// Pass "" for the default localhost endpoint. Pass nil for the default
// HTTP client.
func NewStatusClient(endpoint string, httpClient HTTPDoer) *StatusClient {
c := NewClient(endpoint, httpClient)
c.SetTimeout(statusTimeout)
return &StatusClient{client: c}
}

// Endpoint returns the configured RPC endpoint.
func (c *StatusClient) Endpoint() string { return c.client.Endpoint() }

// Status queries the node and returns the parsed status.
func (c *StatusClient) Status(ctx context.Context) (*NodeStatus, error) {
ctx, cancel := context.WithTimeout(ctx, defaultRequestTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.endpoint+"/status", nil)
if err != nil {
return nil, err
}

resp, err := c.httpClient.Do(req)
raw, err := c.client.Get(ctx, "/status")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, body)
}

var rpcResp statusResponse
if err := json.NewDecoder(resp.Body).Decode(&rpcResp); err != nil {
return nil, fmt.Errorf("decoding /status: %w", err)
var result StatusResult
if err := json.Unmarshal(raw, &result); err != nil {
return nil, fmt.Errorf("decoding /status result: %w", err)
}

h, err := strconv.ParseInt(rpcResp.SyncInfo.LatestBlockHeight, 10, 64)
h, err := strconv.ParseInt(result.SyncInfo.LatestBlockHeight, 10, 64)
if err != nil {
return nil, fmt.Errorf("parsing latest_block_height %q: %w", rpcResp.SyncInfo.LatestBlockHeight, err)
return nil, fmt.Errorf("parsing latest_block_height %q: %w",
result.SyncInfo.LatestBlockHeight, err)
}

return &NodeStatus{
LatestBlockHeight: h,
CatchingUp: rpcResp.SyncInfo.CatchingUp,
CatchingUp: result.SyncInfo.CatchingUp,
}, nil
}

Expand Down
Loading
Loading