Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/config/config_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ func parsePostgresListenerConfig() (*stream.PostgresListenerConfig, error) {
PostgresURL: pgURL,
ReplicationSlotName: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME"),
PluginArguments: pgreplication.PluginArguments{
IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"),
IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"),
AddTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES"),
FilterTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES"),
},
},
RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_LISTENER"),
Expand Down
6 changes: 5 additions & 1 deletion cmd/config/config_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ type ReplicationConfig struct {
}

type PluginConfig struct {
IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"`
IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"`
AddTables string `mapstructure:"add_tables" yaml:"add_tables"`
FilterTables string `mapstructure:"filter_tables" yaml:"filter_tables"`
}

type KafkaConfig struct {
Expand Down Expand Up @@ -429,6 +431,8 @@ func (c *YAMLConfig) parsePostgresListenerConfig() (*stream.PostgresListenerConf
replicationSlotName = c.Source.Postgres.Replication.ReplicationSlot
if c.Source.Postgres.Replication.Plugin != nil {
pluginArgs.IncludeXIDs = c.Source.Postgres.Replication.Plugin.IncludeXIDs
pluginArgs.AddTables = c.Source.Postgres.Replication.Plugin.AddTables
pluginArgs.FilterTables = c.Source.Postgres.Replication.Plugin.FilterTables
}
}
streamCfg.Replication = pgreplication.Config{
Expand Down
10 changes: 9 additions & 1 deletion pkg/wal/replication/postgres/pg_replication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ type Config struct {
}

type PluginArguments struct {
IncludeXIDs bool
IncludeXIDs bool
AddTables string // wal2json add-tables option (e.g., "public.*")
FilterTables string // wal2json filter-tables option (e.g., "pipelines.*,private.*")
}

type Option func(h *Handler)
Expand Down Expand Up @@ -110,6 +112,12 @@ func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, erro
if cfg.PluginArguments.IncludeXIDs {
h.pluginArguments = append(h.pluginArguments, `"include-xids" '1'`)
}
if cfg.PluginArguments.AddTables != "" {
h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"add-tables" '%s'`, cfg.PluginArguments.AddTables))
}
if cfg.PluginArguments.FilterTables != "" {
h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"filter-tables" '%s'`, cfg.PluginArguments.FilterTables))
}

if len(cfg.IncludeTables) > 0 {
h.includedTables, err = pglib.NewSchemaTableMap(cfg.IncludeTables)
Expand Down
Loading