Go client library for the walstream server, which streams PostgreSQL database changes to Apache Kafka topics.
go get github.com/attentiontech/walstream-go

import (
"github.com/attentiontech/walstream-go/client"
)
// Uses WALSTREAM_URL and WALSTREAM_TOKEN env vars by default
c := client.New()
// Or configure explicitly
c := client.New(
client.WithServerURL("http://localhost:9795"),
client.WithBearerToken("your-token"),
)

Connection names reference server-side connection credentials that are managed separately from pipeline specs. The source connection refers to a named PostgreSQL connection, and the destination connection refers to a named Kafka connection.

result, created, err := c.Pipelines.Apply(ctx, types.PipelineSpec{
Name: "user_changes",
Source: types.SourceConfig{
Connection: "my-postgres",
Tables: []types.Table{
{Schema: "public", Name: "users"},
{Schema: "public", Name: "orders"},
},
},
Destination: types.DestinationConfig{
Connection: "my-kafka",
Kafka: types.KafkaDestinationConfig{
TopicPrefix: "myapp_",
Initial: types.KafkaTopicInitial{
Partitions: 3,
CleanupPolicy: types.CleanupPolicyDelete,
},
},
},
DesiredStatus: types.DesiredStatusRunning,
})

pipelines, err := c.Pipelines.List(ctx)
for _, p := range pipelines {
fmt.Printf("%s: %s\n", p.Name, p.Status)
}

state, err := c.Pipelines.Get(ctx, "user_changes")
fmt.Printf("status: %s\n", state.Status)
if state.LastError != nil {
fmt.Printf("last error: %s\n", *state.LastError)
}

status, err := c.Pipelines.Healthz(ctx, "user_changes")
fmt.Printf("health: %s\n", status)

state, err := c.Pipelines.Get(ctx, "user_changes")
state.DesiredStatus = types.DesiredStatusStopped
_, _, err = c.Pipelines.Apply(ctx, state.PipelineSpec)

result, err := c.Pipelines.Destroy(ctx, "user_changes")
fmt.Printf("status: %s\n", result.Status)

- PipelineSpec: The persistent definition of a streaming pipeline
- PipelineState: Runtime state including status, errors, and statistics
- SourceConfig: PostgreSQL connection and table specifications
- DestinationConfig: Kafka configuration and topic settings
- KafkaTopicOverride: Per-table Kafka topic customization

DesiredStatusRunning: Pipeline should be active
DesiredStatusStopped: Pipeline should be inactive

EffectiveStatusRunning: Pipeline is actively streaming
EffectiveStatusFailing: Pipeline encountered an error
EffectiveStatusRestarting: Pipeline is restarting after failure
EffectiveStatusStopped: Pipeline is stopped

CleanupPolicyDelete: Delete old log segments after retention period
CleanupPolicyCompact: Keep the latest value for each key

This project is licensed under the MIT License - see the LICENSE file for details.