diff --git a/go.mod b/go.mod index 428a1c1b42..0101094a29 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/garyburd/redigo v1.6.0 // indirect github.com/go-openapi/jsonreference v0.19.3 // indirect github.com/go-openapi/spec v0.19.4 // indirect - github.com/go-redis/redis v6.15.2+incompatible + github.com/go-redis/redis v6.15.7+incompatible github.com/go-sql-driver/mysql v1.5.0 github.com/go-testfixtures/testfixtures/v3 v3.1.1 github.com/go-xorm/core v0.6.2 // indirect diff --git a/go.sum b/go.sum index 258debf57f..0e2678ab1e 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U= +github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml index 6b110b4cbb..06d7897b4e 100644 --- a/vendor/github.com/go-redis/redis/.travis.yml +++ b/vendor/github.com/go-redis/redis/.travis.yml @@ -8,6 +8,7 @@ go: - 1.9.x - 1.10.x - 1.11.x + - 1.12.x - tip matrix: diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md index 7d05b44661..6c8d99af72 100644 --- a/vendor/github.com/go-redis/redis/README.md +++ b/vendor/github.com/go-redis/redis/README.md @@ -9,7 +9,7 @@ Supports: - Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC. - Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. - [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub). -- [Transactions](https://godoc.org/github.com/go-redis/redis#Multi). +- [Transactions](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). - [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). - [Scripting](https://godoc.org/github.com/go-redis/redis#Script). - [Timeouts](https://godoc.org/github.com/go-redis/redis#Options). @@ -143,4 +143,4 @@ BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op - [Golang PostgreSQL ORM](https://github.com/go-pg/pg) - [Golang msgpack](https://github.com/vmihailenco/msgpack) -- [Golang message task queue](https://github.com/go-msgqueue/msgqueue) +- [Golang message task queue](https://github.com/vmihailenco/taskq) diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index 0cecc62c4b..ab2c76f05e 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -703,12 +703,12 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *ClusterClient) copy() *ClusterClient { +func (c *ClusterClient) clone() *ClusterClient { cp := *c cp.init() return &cp @@ -1198,6 +1198,7 @@ func (c *ClusterClient) WrapProcessPipeline( fn func(oldProcess func([]Cmder) error) func([]Cmder) error, ) { c.processPipeline = fn(c.processPipeline) + c.processTxPipeline = fn(c.processTxPipeline) } func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { @@ -1537,16 +1538,21 @@ func (c *ClusterClient) pubSub() *PubSub { panic("node != nil") } - slot := hashtag.Slot(channels[0]) - var err error - node, err = c.slotMasterNode(slot) + if len(channels) > 0 { + slot := hashtag.Slot(channels[0]) + node, err = c.slotMasterNode(slot) + } else { + node, err = c.nodes.Random() + } if err != nil { return nil, err } cn, err := node.Client.newConn() if err != nil { + node = nil + return nil, err } diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index cb4f94b122..c70973d3bf 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -218,6 +218,25 @@ func (cmd *Cmd) Uint64() (uint64, error) { } } +func (cmd *Cmd) Float32() (float32, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float32(val), nil + case string: + f, err := strconv.ParseFloat(val, 32) + if err != nil { + return 0, err + } + return float32(f), nil + default: + err := fmt.Errorf("redis: unexpected type=%T for Float32", val) + return 0, err + } +} + func (cmd *Cmd) Float64() (float64, error) { if cmd.err != nil { return 0, cmd.err @@ -585,6 +604,17 @@ func (cmd *StringCmd) Uint64() (uint64, error) { return strconv.ParseUint(cmd.Val(), 10, 64) } +func (cmd *StringCmd) Float32() (float32, error) { + if cmd.err != nil { + return 0, cmd.err + } + f, err := strconv.ParseFloat(cmd.Val(), 32) + if err != nil { + return 0, err + } + return float32(f), nil +} + func (cmd *StringCmd) Float64() (float64, error) { if cmd.err != nil { return 0, cmd.err @@ -687,12 +717,12 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { ss := make([]string, 0, n) for i := int64(0); i < n; i++ { - s, err := rd.ReadString() - if err == Nil { + switch s, err := rd.ReadString(); { + case err == Nil: ss = append(ss, "") - } else if err != nil { + case err != nil: return nil, err - } else { + default: ss = append(ss, s) } } @@ -969,14 +999,20 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { return nil, err } + var values map[string]interface{} + v, err := rd.ReadArrayReply(stringInterfaceMapParser) if err != nil { - return nil, err + if err != proto.Nil { + return nil, err + } + } else { + values = v.(map[string]interface{}) } msgs = append(msgs, XMessage{ ID: id, - Values: v.(map[string]interface{}), + Values: values, }) return nil, nil }) diff --git a/vendor/github.com/go-redis/redis/internal/pool/conn.go b/vendor/github.com/go-redis/redis/internal/pool/conn.go index 1095bfe59b..ac48113b63 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -17,14 +17,16 @@ type Conn struct { rdLocked bool wr *proto.Writer - InitedAt time.Time - pooled bool - usedAt atomic.Value + Inited bool + pooled bool + createdAt time.Time + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ - netConn: netConn, + netConn: netConn, + createdAt: time.Now(), } cn.rd = proto.NewReader(netConn) cn.wr = proto.NewWriter(netConn) diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index 9cecee8ad4..cd4c8d69cc 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -38,7 +38,7 @@ type Pooler interface { Get() (*Conn, error) Put(*Conn) - Remove(*Conn) + Remove(*Conn, error) Len() int IdleLen() int @@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn { func (p *ConnPool) Put(cn *Conn) { if !cn.pooled { - p.Remove(cn) + p.Remove(cn, nil) return } @@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) { p.freeTurn() } -func (p *ConnPool) Remove(cn *Conn) { +func (p *ConnPool) Remove(cn *Conn, reason error) { p.removeConn(cn) p.freeTurn() _ = p.closeConn(cn) @@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool { if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { return true } - if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { return true } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go index b35b78afbd..cd0289b684 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_single.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_single.go @@ -1,53 +1,203 @@ package pool +import ( + "fmt" + "sync/atomic" +) + +const ( + stateDefault = 0 + stateInited = 1 + stateClosed = 2 +) + +type BadConnError struct { + wrapped error +} + +var _ error = (*BadConnError)(nil) + +func (e BadConnError) Error() string { + return "pg: Conn is in a bad state" +} + +func (e BadConnError) Unwrap() error { + return e.wrapped +} + type SingleConnPool struct { - cn *Conn + pool Pooler + level int32 // atomic + + state uint32 // atomic + ch chan *Conn + + _badConnError atomic.Value } var _ Pooler = (*SingleConnPool)(nil) -func NewSingleConnPool(cn *Conn) *SingleConnPool { - return &SingleConnPool{ - cn: cn, +func NewSingleConnPool(pool Pooler) *SingleConnPool { + p, ok := pool.(*SingleConnPool) + if !ok { + p = &SingleConnPool{ + pool: pool, + ch: make(chan *Conn, 1), + } + } + atomic.AddInt32(&p.level, 1) + return p +} + +func (p *SingleConnPool) SetConn(cn *Conn) { + if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { + p.ch <- cn + } else { + panic("not reached") } } func (p *SingleConnPool) NewConn() (*Conn, error) { - panic("not implemented") + return p.pool.NewConn() } -func (p *SingleConnPool) CloseConn(*Conn) error { - panic("not implemented") +func (p *SingleConnPool) CloseConn(cn *Conn) error { + return p.pool.CloseConn(cn) } func (p *SingleConnPool) Get() (*Conn, error) { - return p.cn, nil + // In worst case this races with Close which is not a very common operation. + for i := 0; i < 1000; i++ { + switch atomic.LoadUint32(&p.state) { + case stateDefault: + cn, err := p.pool.Get() + if err != nil { + return nil, err + } + if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { + return cn, nil + } + p.pool.Remove(cn, ErrClosed) + case stateInited: + if err := p.badConnError(); err != nil { + return nil, err + } + cn, ok := <-p.ch + if !ok { + return nil, ErrClosed + } + return cn, nil + case stateClosed: + return nil, ErrClosed + default: + panic("not reached") + } + } + return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop") } func (p *SingleConnPool) Put(cn *Conn) { - if p.cn != cn { - panic("p.cn != cn") + defer func() { + if recover() != nil { + p.freeConn(cn) + } + }() + p.ch <- cn +} + +func (p *SingleConnPool) freeConn(cn *Conn) { + if err := p.badConnError(); err != nil { + p.pool.Remove(cn, err) + } else { + p.pool.Put(cn) } } -func (p *SingleConnPool) Remove(cn *Conn) { - if p.cn != cn { - panic("p.cn != cn") - } +func (p *SingleConnPool) Remove(cn *Conn, reason error) { + defer func() { + if recover() != nil { + p.pool.Remove(cn, ErrClosed) + } + }() + p._badConnError.Store(BadConnError{wrapped: reason}) + p.ch <- cn } func (p *SingleConnPool) Len() int { - return 1 + switch atomic.LoadUint32(&p.state) { + case stateDefault: + return 0 + case stateInited: + return 1 + case stateClosed: + return 0 + default: + panic("not reached") + } } func (p *SingleConnPool) IdleLen() int { - return 0 + return len(p.ch) } func (p *SingleConnPool) Stats() *Stats { - return nil + return &Stats{} } func (p *SingleConnPool) Close() error { + level := atomic.AddInt32(&p.level, -1) + if level > 0 { + return nil + } + + for i := 0; i < 1000; i++ { + state := atomic.LoadUint32(&p.state) + if state == stateClosed { + return ErrClosed + } + if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) { + close(p.ch) + cn, ok := <-p.ch + if ok { + p.freeConn(cn) + } + return nil + } + } + + return fmt.Errorf("pg: SingleConnPool.Close: infinite loop") +} + +func (p *SingleConnPool) Reset() error { + if p.badConnError() == nil { + return nil + } + + select { + case cn, ok := <-p.ch: + if !ok { + return ErrClosed + } + p.pool.Remove(cn, ErrClosed) + p._badConnError.Store(BadConnError{wrapped: nil}) + default: + return fmt.Errorf("pg: SingleConnPool does not have a Conn") + } + + if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) { + state := atomic.LoadUint32(&p.state) + return fmt.Errorf("pg: invalid SingleConnPool state: %d", state) + } + + return nil +} + +func (p *SingleConnPool) badConnError() error { + if v := p._badConnError.Load(); v != nil { + err := v.(BadConnError) + if err.wrapped != nil { + return err + } + } return nil } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go index 91bd913330..3e8f503633 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go @@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() { func (p *StickyConnPool) Put(cn *Conn) {} -func (p *StickyConnPool) removeUpstream() { - p.pool.Remove(p.cn) +func (p *StickyConnPool) removeUpstream(reason error) { + p.pool.Remove(p.cn, reason) p.cn = nil } -func (p *StickyConnPool) Remove(cn *Conn) { - p.removeUpstream() +func (p *StickyConnPool) Remove(cn *Conn, reason error) { + p.removeUpstream(reason) } func (p *StickyConnPool) Len() int { @@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error { if p.reusable { p.putUpstream() } else { - p.removeUpstream() + p.removeUpstream(ErrClosed) } } diff --git a/vendor/github.com/go-redis/redis/internal/util.go b/vendor/github.com/go-redis/redis/internal/util.go index ffd2353e0e..80a600381d 100644 --- a/vendor/github.com/go-redis/redis/internal/util.go +++ b/vendor/github.com/go-redis/redis/internal/util.go @@ -27,3 +27,13 @@ func isLower(s string) bool { } return true } + +func Unwrap(err error) error { + u, ok := err.(interface { + Unwrap() error + }) + if !ok { + return nil + } + return u.Unwrap() +} diff --git a/vendor/github.com/go-redis/redis/pipeline.go b/vendor/github.com/go-redis/redis/pipeline.go index b3a8844af0..2714ceb81d 100644 --- a/vendor/github.com/go-redis/redis/pipeline.go +++ b/vendor/github.com/go-redis/redis/pipeline.go @@ -8,6 +8,19 @@ import ( type pipelineExecer func([]Cmder) error +// Pipeliner is an mechanism to realise Redis Pipeline technique. +// +// Pipelining is a technique to extremely speed up processing by packing +// operations to batches, send them at once to Redis and read a replies in a +// singe step. +// See https://redis.io/topics/pipelining +// +// Pay attention, that Pipeline is not a transaction, so you can get unexpected +// results in case of big pipelines and small read/write timeouts. +// Redis client has retransmission logic in case of timeouts, pipeline +// can be retransmitted and commands can be executed more then once. +// To avoid this: it is good idea to use reasonable bigger read/write timeouts +// depends of your batch size and/or use TxPipeline. type Pipeliner interface { StatefulCmdable Do(args ...interface{}) *Cmd diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 0afb47cda3..03b01566ba 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -3,6 +3,7 @@ package redis import ( "errors" "fmt" + "strings" "sync" "time" @@ -13,7 +14,7 @@ import ( var errPingTimeout = errors.New("redis: ping timeout") -// PubSub implements Pub/Sub commands bas described in +// PubSub implements Pub/Sub commands as described in // http://redis.io/topics/pubsub. Message receiving is NOT safe // for concurrent use by multiple goroutines. // @@ -29,8 +30,9 @@ type PubSub struct { cn *pool.Conn channels map[string]struct{} patterns map[string]struct{} - closed bool - exit chan struct{} + + closed bool + exit chan struct{} cmd *Cmd @@ -39,6 +41,12 @@ type PubSub struct { ping chan struct{} } +func (c *PubSub) String() string { + channels := mapKeys(c.channels) + channels = append(channels, mapKeys(c.patterns)...) + return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", ")) +} + func (c *PubSub) init() { c.exit = make(chan struct{}) } @@ -389,16 +397,39 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { // It periodically sends Ping messages to test connection health. // The channel is closed with PubSub. Receive* APIs can not be used // after channel is created. +// +// If the Go channel is full for 30 seconds the message is dropped. func (c *PubSub) Channel() <-chan *Message { - c.chOnce.Do(c.initChannel) + return c.channel(100) +} + +// ChannelSize is like Channel, but creates a Go channel +// with specified buffer size. +func (c *PubSub) ChannelSize(size int) <-chan *Message { + return c.channel(size) +} + +func (c *PubSub) channel(size int) <-chan *Message { + c.chOnce.Do(func() { + c.initChannel(size) + }) + if cap(c.ch) != size { + err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size") + panic(err) + } return c.ch } -func (c *PubSub) initChannel() { - c.ch = make(chan *Message, 100) - c.ping = make(chan struct{}, 10) +func (c *PubSub) initChannel(size int) { + const timeout = 30 * time.Second + + c.ch = make(chan *Message, size) + c.ping = make(chan struct{}, 1) go func() { + timer := time.NewTimer(timeout) + timer.Stop() + var errCount int for { msg, err := c.Receive() @@ -413,6 +444,7 @@ func (c *PubSub) initChannel() { errCount++ continue } + errCount = 0 // Any message is as good as a ping. @@ -427,16 +459,24 @@ func (c *PubSub) initChannel() { case *Pong: // Ignore. case *Message: - c.ch <- msg + timer.Reset(timeout) + select { + case c.ch <- msg: + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + internal.Logf( + "redis: %s channel is full for %s (message is dropped)", + c, timeout) + } default: - internal.Logf("redis: unknown message: %T", msg) + internal.Logf("redis: unknown message type: %T", msg) } } }() go func() { - const timeout = 5 * time.Second - timer := time.NewTimer(timeout) timer.Stop() diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index aca30648f5..2a6013c332 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -51,11 +51,10 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - if cn.InitedAt.IsZero() { - if err := c.initConn(cn); err != nil { - _ = c.connPool.CloseConn(cn) - return nil, err - } + err = c.initConn(cn) + if err != nil { + _ = c.connPool.CloseConn(cn) + return nil, err } return cn, nil @@ -85,12 +84,13 @@ func (c *baseClient) _getConn() (*pool.Conn, error) { return nil, err } - if cn.InitedAt.IsZero() { - err := c.initConn(cn) - if err != nil { - c.connPool.Remove(cn) + err = c.initConn(cn) + if err != nil { + c.connPool.Remove(cn, err) + if err := internal.Unwrap(err); err != nil { return nil, err } + return nil, err } return cn, nil @@ -102,7 +102,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) { } if internal.IsBadConn(err, false) { - c.connPool.Remove(cn) + c.connPool.Remove(cn, err) } else { c.connPool.Put(cn) } @@ -116,12 +116,15 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { if err == nil || internal.IsRedisError(err) { c.connPool.Put(cn) } else { - c.connPool.Remove(cn) + c.connPool.Remove(cn, err) } } func (c *baseClient) initConn(cn *pool.Conn) error { - cn.InitedAt = time.Now() + if cn.Inited { + return nil + } + cn.Inited = true if c.opt.Password == "" && c.opt.DB == 0 && @@ -201,9 +204,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { - return cmd.readReply(rd) - }) + err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -237,7 +238,7 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { func (c *baseClient) Close() error { var firstErr error if c.onClose != nil { - if err := c.onClose(); err != nil && firstErr == nil { + if err := c.onClose(); err != nil { firstErr = err } } @@ -543,10 +544,12 @@ type Conn struct { } func newConn(opt *Options, cn *pool.Conn) *Conn { + connPool := pool.NewSingleConnPool(nil) + connPool.SetConn(cn) c := Conn{ baseClient: baseClient{ opt: opt, - connPool: pool.NewSingleConnPool(cn), + connPool: connPool, }, } c.baseClient.init() diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index 250e5f640c..5956b71a5d 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -273,9 +273,13 @@ func (c *ringShards) Heartbeat(frequency time.Duration) { // rebalance removes dead shards from the Ring. func (c *ringShards) rebalance() { + c.mu.RLock() + shards := c.shards + c.mu.RUnlock() + hash := newConsistentHash(c.opt) var shardsNum int - for name, shard := range c.shards { + for name, shard := range shards { if shard.IsUp() { hash.Add(name) shardsNum++ @@ -357,7 +361,8 @@ func NewRing(opt *RingOptions) *Ring { ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline - ring.cmdable.setProcessor(ring.Process) + + ring.init() for name, addr := range opt.Addrs { clopt := opt.clientOptions() @@ -370,6 +375,10 @@ func NewRing(opt *RingOptions) *Ring { return ring } +func (c *Ring) init() { + c.cmdable.setProcessor(c.Process) +} + func (c *Ring) Context() context.Context { if c.ctx != nil { return c.ctx @@ -381,13 +390,15 @@ func (c *Ring) WithContext(ctx context.Context) *Ring { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *Ring) copy() *Ring { +func (c *Ring) clone() *Ring { cp := *c + cp.init() + return &cp } @@ -653,6 +664,39 @@ func (c *Ring) Close() error { return c.shards.Close() } +func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error { + if len(keys) == 0 { + return fmt.Errorf("redis: Watch requires at least one key") + } + + var shards []*ringShard + for _, key := range keys { + if key != "" { + shard, err := c.shards.GetByKey(hashtag.Key(key)) + if err != nil { + return err + } + + shards = append(shards, shard) + } + } + + if len(shards) == 0 { + return fmt.Errorf("redis: Watch requires at least one shard") + } + + if len(shards) > 1 { + for _, shard := range shards[1:] { + if shard.Client != shards[0].Client { + err := fmt.Errorf("redis: Watch requires all keys to be in the same shard") + return err + } + } + } + + return shards[0].Client.Watch(fn, keys...) +} + func newConsistentHash(opt *RingOptions) *consistenthash.Map { return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) } diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index 7cbb90bdb4..503bbe7b95 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -90,9 +90,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { opt: opt, connPool: failover.Pool(), - onClose: func() error { - return failover.Close() - }, + onClose: failover.Close, }, } c.baseClient.init() @@ -182,6 +180,21 @@ func (c *SentinelClient) Reset(pattern string) *IntCmd { return cmd } +// FlushConfig forces Sentinel to rewrite its configuration on disk, including +// the current Sentinel state. +func (c *SentinelClient) FlushConfig() *StatusCmd { + cmd := NewStatusCmd("sentinel", "flushconfig") + c.Process(cmd) + return cmd +} + +// Master shows the state and info of the specified master. +func (c *SentinelClient) Master(name string) *StringStringMapCmd { + cmd := NewStringStringMapCmd("sentinel", "master", name) + c.Process(cmd) + return cmd +} + type sentinelFailover struct { sentinelAddrs []string @@ -232,7 +245,9 @@ func (c *sentinelFailover) MasterAddr() (string, error) { } func (c *sentinelFailover) masterAddr() (string, error) { + c.mu.RLock() addr := c.getMasterAddr() + c.mu.RUnlock() if addr != "" { return addr, nil } @@ -240,6 +255,15 @@ func (c *sentinelFailover) masterAddr() (string, error) { c.mu.Lock() defer c.mu.Unlock() + addr = c.getMasterAddr() + if addr != "" { + return addr, nil + } + + if c.sentinel != nil { + c.closeSentinel() + } + for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, @@ -278,9 +302,7 @@ func (c *sentinelFailover) masterAddr() (string, error) { } func (c *sentinelFailover) getMasterAddr() string { - c.mu.RLock() sentinel := c.sentinel - c.mu.RUnlock() if sentinel == nil { return "" @@ -290,11 +312,6 @@ func (c *sentinelFailover) getMasterAddr() string { if err != nil { internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", c.masterName, err) - c.mu.Lock() - if c.sentinel == sentinel { - c.closeSentinel() - } - c.mu.Unlock() return "" } @@ -376,8 +393,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) { break } - switch msg.Channel { - case "+switch-master": + if msg.Channel == "+switch-master" { parts := strings.Split(msg.Payload, " ") if parts[0] != c.masterName { internal.Logf("sentinel: ignore addr for master=%q", parts[0]) diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go index a607562464..03bfa0fad3 100644 --- a/vendor/github.com/go-redis/redis/universal.go +++ b/vendor/github.com/go-redis/redis/universal.go @@ -155,6 +155,7 @@ type UniversalClient interface { Watch(fn func(*Tx) error, keys ...string) error Process(cmd Cmder) error WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) + WrapProcessPipeline(fn func(oldProcess func([]Cmder) error) func([]Cmder) error) Subscribe(channels ...string) *PubSub PSubscribe(channels ...string) *PubSub Close() error diff --git a/vendor/modules.txt b/vendor/modules.txt index d7c342b697..f4ee06f966 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -60,7 +60,7 @@ github.com/go-openapi/jsonreference github.com/go-openapi/spec # github.com/go-openapi/swag v0.19.5 github.com/go-openapi/swag -# github.com/go-redis/redis v6.15.2+incompatible +# github.com/go-redis/redis v6.15.7+incompatible github.com/go-redis/redis github.com/go-redis/redis/internal github.com/go-redis/redis/internal/consistenthash