Skip to content

Commit

Permalink
[-] fix Patroni source resolving (#546)
Browse files Browse the repository at this point in the history
* Fixed broken patron support
* Patroni works with empty conn_str

---------

Co-authored-by: Sebastian Saletnik <[email protected]>
  • Loading branch information
saletnik and Sebastian Saletnik authored Sep 27, 2024
1 parent 9a338eb commit b4806e5
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions internal/sources/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import (
"os"
"path"
"regexp"
"strconv"
"strings"
"time"

"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
pgx "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
client "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -161,6 +160,7 @@ func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error) {
if err != nil {
return ret, err
}
defer c.Close()

ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, errors.New("etcd client timeout"))
defer cancel()
Expand All @@ -173,14 +173,24 @@ func getEtcdClusterMembers(s Source) ([]PatroniClusterMember, error) {
if s.HostConfig.Namespace == "" {
return ret, fmt.Errorf("Skipping Patroni entry %s - search 'namespace' not specified", s.Name)
}
resp, err := kapi.Get(ctx, s.HostConfig.Namespace)
resp, err := kapi.Get(ctx, s.HostConfig.Namespace, client.WithPrefix(), client.WithKeysOnly())
if err != nil {

return ret, cmp.Or(context.Cause(ctx), err)
}

// etcd3 does not have a dir node.
// Key="/service/batman/leader"
// Key="/service/batman/members/node"
//
// create unique map of first level items without Namespace
scopes := make(map[string]bool, len(resp.Kvs))
for _, node := range resp.Kvs {
scope := path.Base(string(node.Key)) // Key="/service/batman"
pathSuffix := strings.TrimPrefix(string(node.Key), s.HostConfig.Namespace)
scope := strings.SplitN(pathSuffix, "/", 2)[0]
scopes[scope] = true
}

for scope := range scopes {
scopeMembers, err := extractEtcdScopeMembers(ctx, s, scope, kapi, true)
if err != nil {
continue
Expand All @@ -202,7 +212,7 @@ func extractEtcdScopeMembers(ctx context.Context, s Source, scope string, kapi c
var name string
membersPath := path.Join(s.HostConfig.Namespace, scope, "members")

resp, err := kapi.Get(ctx, membersPath)
resp, err := kapi.Get(ctx, membersPath, client.WithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,15 +298,15 @@ func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) {
mds = append(mds, c)
continue
}
c, err := db.New(context.TODO(), ce.ConnStr,
func(c *pgxpool.Config) error {
c.ConnConfig.Host = host
c.ConnConfig.Database = "template1"
i, err := strconv.ParseUint(port, 10, 16)
c.ConnConfig.Port = uint16(i)
mds[len(mds)].ConnStr = c.ConnString()
return err
})
connURL, err := url.Parse(ce.ConnStr)
if err != nil {
logger.Errorf("Could not contact Patroni member [%s:%s]: %v", ce.Name, m.Scope, err)
continue
}
connURL.Scheme = "postgresql"
connURL.Host = host + ":" + port
connURL.Path = "template1"
c, err := db.New(context.TODO(), connURL.String())
if err != nil {
logger.Errorf("Could not contact Patroni member [%s:%s]: %v", ce.Name, m.Scope, err)
continue
Expand All @@ -322,11 +332,6 @@ func ResolveDatabasesFromPatroni(ce Source) ([]*MonitoredDatabase, error) {
}

for _, d := range data {
connURL, err := url.Parse(ce.ConnStr)
if err != nil {
continue
}
connURL.Host = host + ":" + port
connURL.Path = d["datname"].(string)
c := ce.Clone()
c.Name = dbUnique + "_" + d["datname_escaped"].(string)
Expand Down

0 comments on commit b4806e5

Please sign in to comment.