Update module go-redis/redis to v7 #277

Merged
konrad merged 1 commits from renovate/github.com-go-redis-redis-7.x into master 2020-04-09 04:35:37 +00:00
25 changed files with 500 additions and 1099 deletions

4
go.mod
View File

@ -34,12 +34,12 @@ require (
github.com/garyburd/redigo v1.6.0 // indirect github.com/garyburd/redigo v1.6.0 // indirect
github.com/go-openapi/jsonreference v0.19.3 // indirect github.com/go-openapi/jsonreference v0.19.3 // indirect
github.com/go-openapi/spec v0.19.4 // indirect github.com/go-openapi/spec v0.19.4 // indirect
github.com/go-redis/redis v6.15.7+incompatible github.com/go-redis/redis v6.14.0+incompatible
github.com/go-redis/redis/v7 v7.2.0 // indirect
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0
github.com/go-testfixtures/testfixtures/v3 v3.1.1 github.com/go-testfixtures/testfixtures/v3 v3.1.1
github.com/go-xorm/core v0.6.2 // indirect github.com/go-xorm/core v0.6.2 // indirect
github.com/go-xorm/xorm v0.7.9 // indirect github.com/go-xorm/xorm v0.7.9 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf
github.com/imdario/mergo v0.3.9 github.com/imdario/mergo v0.3.9
github.com/jgautheron/goconst v0.0.0-20200227150835-cda7ea3bf591 github.com/jgautheron/goconst v0.0.0-20200227150835-cda7ea3bf591

7
go.sum
View File

@ -117,11 +117,13 @@ github.com/go-openapi/swag v0.17.0/go.mod h1:AByQ+nYG6gQg71GINrmuDXCPWdL640yX49/
github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY= github.com/go-openapi/swag v0.19.5 h1:lTz6Ys4CmqqCQmZPBlbQENR1/GucA2bzYTE12Pw4tFY=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis v6.14.0+incompatible h1:AMPZkM7PbsJbilelrJUAyC4xQbGROTOLSuDd7fnMXCI=
github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.14.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4= github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U= github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
@ -281,9 +283,11 @@ github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FW
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
@ -460,6 +464,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -492,6 +497,7 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -549,6 +555,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0= gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44= gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=

View File

@ -5,10 +5,10 @@ services:
- redis-server - redis-server
go: go:
- 1.7.x
- 1.8.x
- 1.9.x - 1.9.x
- 1.10.x - 1.10.x
- 1.11.x
- 1.12.x
- tip - tip
matrix: matrix:

View File

@ -1,9 +1,5 @@
# Changelog # Changelog
## Unreleased
- Cluster and Ring pipelines process commands for each node in its own goroutine.
## 6.14 ## 6.14
- Added Options.MinIdleConns. - Added Options.MinIdleConns.

View File

@ -3,8 +3,6 @@ all: testdeps
go test ./... -short -race go test ./... -short -race
env GOOS=linux GOARCH=386 go test ./... env GOOS=linux GOARCH=386 go test ./...
go vet go vet
go get github.com/gordonklaus/ineffassign
ineffassign .
testdeps: testdata/redis/src/redis-server testdeps: testdata/redis/src/redis-server
@ -15,7 +13,7 @@ bench: testdeps
testdata/redis: testdata/redis:
mkdir -p $@ mkdir -p $@
wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@ wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis testdata/redis/src/redis-server: testdata/redis
sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile

View File

@ -9,7 +9,7 @@ Supports:
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC. - Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
- Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support. - Automatic connection pooling with [circuit breaker](https://en.wikipedia.org/wiki/Circuit_breaker_design_pattern) support.
- [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub). - [Pub/Sub](https://godoc.org/github.com/go-redis/redis#PubSub).
- [Transactions](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). - [Transactions](https://godoc.org/github.com/go-redis/redis#Multi).
- [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline). - [Pipeline](https://godoc.org/github.com/go-redis/redis#example-Client-Pipeline) and [TxPipeline](https://godoc.org/github.com/go-redis/redis#example-Client-TxPipeline).
- [Scripting](https://godoc.org/github.com/go-redis/redis#Script). - [Scripting](https://godoc.org/github.com/go-redis/redis#Script).
- [Timeouts](https://godoc.org/github.com/go-redis/redis#Options). - [Timeouts](https://godoc.org/github.com/go-redis/redis#Options).
@ -143,4 +143,4 @@ BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op
- [Golang PostgreSQL ORM](https://github.com/go-pg/pg) - [Golang PostgreSQL ORM](https://github.com/go-pg/pg)
- [Golang msgpack](https://github.com/vmihailenco/msgpack) - [Golang msgpack](https://github.com/vmihailenco/msgpack)
- [Golang message task queue](https://github.com/vmihailenco/taskq) - [Golang message task queue](https://github.com/go-msgqueue/msgqueue)

View File

@ -3,6 +3,7 @@ package redis
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
@ -17,6 +18,7 @@ import (
"github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"
) )
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@ -48,9 +50,6 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading. // and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error) ClusterSlots func() ([]ClusterSlot, error)
// Optional hook that is called when a new node is created.
OnNewNode func(*Client)
// Following options are copied from Options struct. // Following options are copied from Options struct.
OnConnect func(*Conn) error OnConnect func(*Conn) error
@ -83,7 +82,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8 opt.MaxRedirects = 8
} }
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true opt.ReadOnly = true
} }
@ -167,10 +166,6 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency() go node.updateLatency()
} }
if clOpt.OnNewNode != nil {
clOpt.OnNewNode(node.Client)
}
return &node return &node
} }
@ -242,6 +237,8 @@ type clusterNodes struct {
clusterAddrs []string clusterAddrs []string
closed bool closed bool
nodeCreateGroup singleflight.Group
_generation uint32 // atomic _generation uint32 // atomic
} }
@ -344,6 +341,11 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil return node, nil
} }
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, nil
})
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -353,13 +355,15 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr] node, ok := c.allNodes[addr]
if ok { if ok {
_ = v.(*clusterNode).Close()
return node, err return node, err
} }
node = v.(*clusterNode)
node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr) c.allAddrs = appendIfNotExists(c.allAddrs, addr)
c.clusterAddrs = append(c.clusterAddrs, addr) if err == nil {
c.clusterAddrs = append(c.clusterAddrs, addr)
}
c.allNodes[addr] = node c.allNodes[addr] = node
return node, err return node, err
@ -529,12 +533,10 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1 n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n] slave = nodes[n]
if !slave.Loading() { if !slave.Loading() {
return slave, nil break
} }
} }
return slave, nil
// All slaves are loading - use master.
return nodes[0], nil
} }
} }
@ -578,12 +580,23 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil return nil
} }
func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
return len(c.Masters) <= len(c.Slaves)
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterStateHolder struct { type clusterStateHolder struct {
load func() (*clusterState, error) load func() (*clusterState, error)
state atomic.Value state atomic.Value
firstErrMu sync.RWMutex
firstErr error
reloading uint32 // atomic reloading uint32 // atomic
} }
@ -594,8 +607,24 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
} }
func (c *clusterStateHolder) Reload() (*clusterState, error) { func (c *clusterStateHolder) Reload() (*clusterState, error) {
state, err := c.reload()
if err != nil {
return nil, err
}
if !state.IsConsistent() {
time.AfterFunc(time.Second, c.LazyReload)
}
return state, nil
}
func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load() state, err := c.load()
if err != nil { if err != nil {
c.firstErrMu.Lock()
if c.firstErr == nil {
c.firstErr = err
}
c.firstErrMu.Unlock()
return nil, err return nil, err
} }
c.state.Store(state) c.state.Store(state)
@ -609,11 +638,16 @@ func (c *clusterStateHolder) LazyReload() {
go func() { go func() {
defer atomic.StoreUint32(&c.reloading, 0) defer atomic.StoreUint32(&c.reloading, 0)
_, err := c.Reload() for {
if err != nil { state, err := c.reload()
return if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
if state.IsConsistent() {
return
}
} }
time.Sleep(100 * time.Millisecond)
}() }()
} }
@ -626,7 +660,15 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
} }
return state, nil return state, nil
} }
return c.Reload()
c.firstErrMu.RLock()
err := c.firstErr
c.firstErrMu.RUnlock()
if err != nil {
return nil, err
}
return nil, errors.New("redis: cluster has no state")
} }
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@ -674,6 +716,10 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline c.processTxPipeline = c.defaultProcessTxPipeline
c.init() c.init()
_, _ = c.state.Reload()
_, _ = c.cmdsInfoCache.Get()
if opt.IdleCheckFrequency > 0 { if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency) go c.reaper(opt.IdleCheckFrequency)
} }
@ -681,17 +727,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c return c
} }
func (c *ClusterClient) init() { // ReloadState reloads cluster state. It calls ClusterSlots func
c.cmdable.setProcessor(c.Process)
}
// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information. // to get cluster slots information.
func (c *ClusterClient) ReloadState() error { func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload() _, err := c.state.Reload()
return err return err
} }
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
func (c *ClusterClient) Context() context.Context { func (c *ClusterClient) Context() context.Context {
if c.ctx != nil { if c.ctx != nil {
return c.ctx return c.ctx
@ -703,12 +749,12 @@ func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
if ctx == nil { if ctx == nil {
panic("nil context") panic("nil context")
} }
c2 := c.clone() c2 := c.copy()
c2.ctx = ctx c2.ctx = ctx
return c2 return c2
} }
func (c *ClusterClient) clone() *ClusterClient { func (c *ClusterClient) copy() *ClusterClient {
cp := *c cp := *c
cp.init() cp.init()
return &cp return &cp
@ -772,11 +818,6 @@ func cmdSlot(cmd Cmder, pos int) int {
} }
func (c *ClusterClient) cmdSlot(cmd Cmder) int { func (c *ClusterClient) cmdSlot(cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
return args[2].(int)
}
cmdInfo := c.cmdInfo(cmd.Name()) cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
} }
@ -788,9 +829,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
} }
cmdInfo := c.cmdInfo(cmd.Name()) cmdInfo := c.cmdInfo(cmd.Name())
slot := c.cmdSlot(cmd) slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
if c.opt.RouteByLatency { if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot) node, err := state.slotClosestNode(slot)
return slot, node, err return slot, node, err
@ -849,12 +890,15 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil { if err == nil {
break break
} }
if err != Nil {
if internal.IsRetryableError(err, true) {
c.state.LazyReload() c.state.LazyReload()
continue
} }
moved, ask, addr := internal.IsMovedError(err) moved, ask, addr := internal.IsMovedError(err)
if moved || ask { if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr) node, err = c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
return err return err
@ -862,7 +906,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue continue
} }
if err == pool.ErrClosed || internal.IsReadOnlyError(err) { if err == pool.ErrClosed {
node, err = c.slotMasterNode(slot) node, err = c.slotMasterNode(slot)
if err != nil { if err != nil {
return err return err
@ -870,10 +914,6 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue continue
} }
if internal.IsRetryableError(err, true) {
continue
}
return err return err
} }
@ -938,34 +978,16 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil { if err == nil {
break break
} }
if err != Nil {
c.state.LazyReload()
}
// If slave is loading - pick another node. // If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) { if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading() node.MarkAsLoading()
node = nil
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue continue
} }
if internal.IsRetryableError(err, true) { if internal.IsRetryableError(err, true) {
c.state.LazyReload()
// First retry the same node. // First retry the same node.
if attempt == 0 { if attempt == 0 {
continue continue
@ -979,6 +1001,24 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue continue
} }
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
}
continue
}
if err == pool.ErrClosed {
node = nil
continue
}
break break
} }
@ -1198,12 +1238,10 @@ func (c *ClusterClient) WrapProcessPipeline(
fn func(oldProcess func([]Cmder) error) func([]Cmder) error, fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) { ) {
c.processPipeline = fn(c.processPipeline) c.processPipeline = fn(c.processPipeline)
c.processTxPipeline = fn(c.processTxPipeline)
} }
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap := newCmdsMap() cmdsMap, err := c.mapCmdsByNode(cmds)
err := c.mapCmdsByNode(cmds, cmdsMap)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
@ -1214,31 +1252,28 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt)) time.Sleep(c.retryBackoff(attempt))
} }
failedCmds := newCmdsMap() failedCmds := make(map[*clusterNode][]Cmder)
var wg sync.WaitGroup
for node, cmds := range cmdsMap.m { for node, cmds := range cmdsMap {
wg.Add(1) cn, err := node.Client.getConn()
go func(node *clusterNode, cmds []Cmder) { if err != nil {
defer wg.Done() if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
cn, err := node.Client.getConn() } else {
if err != nil { setCmdsErr(cmds, err)
if err == pool.ErrClosed {
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
return
} }
continue
}
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err) if err == nil || internal.IsRedisError(err) {
}(node, cmds) node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
} }
wg.Wait() if len(failedCmds) == 0 {
if len(failedCmds.m) == 0 {
break break
} }
cmdsMap = failedCmds cmdsMap = failedCmds
@ -1247,24 +1282,14 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
return cmdsFirstErr(cmds) return cmdsFirstErr(cmds)
} }
type cmdsMap struct { func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get() state, err := c.state.Get()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return nil, err
} }
cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds { for _, cmd := range cmds {
var node *clusterNode var node *clusterNode
@ -1276,13 +1301,11 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
node, err = state.slotMasterNode(slot) node, err = state.slotMasterNode(slot)
} }
if err != nil { if err != nil {
return err return nil, err
} }
cmdsMap.mu.Lock() cmdsMap[node] = append(cmdsMap[node], cmd)
cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
cmdsMap.mu.Unlock()
} }
return nil return cmdsMap, nil
} }
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@ -1295,30 +1318,39 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true return true
} }
func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
remappedCmds, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
return
}
for node, cmds := range remappedCmds {
failedCmds[node] = cmds
}
}
func (c *ClusterClient) pipelineProcessCmds( func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error { ) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmds...) return writeCmd(wr, cmds...)
}) })
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
failedCmds.mu.Lock() failedCmds[node] = cmds
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
return err return err
} }
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
return c.pipelineReadCmds(node, rd, cmds, failedCmds) return c.pipelineReadCmds(rd, cmds, failedCmds)
}) })
return err return err
} }
func (c *ClusterClient) pipelineReadCmds( func (c *ClusterClient) pipelineReadCmds(
node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error { ) error {
var firstErr error
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.readReply(rd) err := cmd.readReply(rd)
if err == nil { if err == nil {
@ -1333,18 +1365,13 @@ func (c *ClusterClient) pipelineReadCmds(
continue continue
} }
failedCmds.mu.Lock() return err
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
if firstErr == nil {
firstErr = err
}
} }
return firstErr return nil
} }
func (c *ClusterClient) checkMovedErr( func (c *ClusterClient) checkMovedErr(
cmd Cmder, err error, failedCmds *cmdsMap, cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
) bool { ) bool {
moved, ask, addr := internal.IsMovedError(err) moved, ask, addr := internal.IsMovedError(err)
@ -1356,9 +1383,7 @@ func (c *ClusterClient) checkMovedErr(
return false return false
} }
failedCmds.mu.Lock() failedCmds[node] = append(failedCmds[node], cmd)
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
return true return true
} }
@ -1368,9 +1393,7 @@ func (c *ClusterClient) checkMovedErr(
return false return false
} }
failedCmds.mu.Lock() failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
failedCmds.mu.Unlock()
return true return true
} }
@ -1410,34 +1433,31 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt)) time.Sleep(c.retryBackoff(attempt))
} }
failedCmds := newCmdsMap() failedCmds := make(map[*clusterNode][]Cmder)
var wg sync.WaitGroup
for node, cmds := range cmdsMap { for node, cmds := range cmdsMap {
wg.Add(1) cn, err := node.Client.getConn()
go func(node *clusterNode, cmds []Cmder) { if err != nil {
defer wg.Done() if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
cn, err := node.Client.getConn() } else {
if err != nil { setCmdsErr(cmds, err)
if err == pool.ErrClosed {
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
return
} }
continue
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err) if err == nil || internal.IsRedisError(err) {
}(node, cmds) node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
} }
wg.Wait() if len(failedCmds) == 0 {
if len(failedCmds.m) == 0 {
break break
} }
cmdsMap = failedCmds.m cmdsMap = failedCmds
} }
} }
@ -1454,16 +1474,14 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
} }
func (c *ClusterClient) txPipelineProcessCmds( func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error { ) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return txPipelineWriteMulti(wr, cmds) return txPipelineWriteMulti(wr, cmds)
}) })
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
failedCmds.mu.Lock() failedCmds[node] = cmds
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
return err return err
} }
@ -1479,7 +1497,7 @@ func (c *ClusterClient) txPipelineProcessCmds(
} }
func (c *ClusterClient) txPipelineReadQueued( func (c *ClusterClient) txPipelineReadQueued(
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error { ) error {
// Parse queued replies. // Parse queued replies.
var statusCmd StatusCmd var statusCmd StatusCmd
@ -1528,51 +1546,40 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil return nil
} }
func (c *ClusterClient) pubSub() *PubSub { func (c *ClusterClient) pubSub(channels []string) *PubSub {
var node *clusterNode var node *clusterNode
pubsub := &PubSub{ pubsub := &PubSub{
opt: c.opt.clientOptions(), opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) { newConn: func(channels []string) (*pool.Conn, error) {
if node != nil { if node == nil {
panic("node != nil") var slot int
} if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
var err error masterNode, err := c.slotMasterNode(slot)
if len(channels) > 0 { if err != nil {
slot := hashtag.Slot(channels[0]) return nil, err
node, err = c.slotMasterNode(slot) }
} else { node = masterNode
node, err = c.nodes.Random()
} }
if err != nil { return node.Client.newConn()
return nil, err
}
cn, err := node.Client.newConn()
if err != nil {
node = nil
return nil, err
}
return cn, nil
}, },
closeConn: func(cn *pool.Conn) error { closeConn: func(cn *pool.Conn) error {
err := node.Client.connPool.CloseConn(cn) return node.Client.connPool.CloseConn(cn)
node = nil
return err
}, },
} }
pubsub.init() pubsub.init()
return pubsub return pubsub
} }
// Subscribe subscribes the client to the specified channels. // Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription. // Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub { func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub() pubsub := c.pubSub(channels)
if len(channels) > 0 { if len(channels) > 0 {
_ = pubsub.Subscribe(channels...) _ = pubsub.Subscribe(channels...)
} }
@ -1582,7 +1589,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns. // PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription. // Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub() pubsub := c.pubSub(channels)
if len(channels) > 0 { if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...) _ = pubsub.PSubscribe(channels...)
} }

View File

@ -183,7 +183,7 @@ func (cmd *Cmd) Int() (int, error) {
case string: case string:
return strconv.Atoi(val) return strconv.Atoi(val)
default: default:
err := fmt.Errorf("redis: unexpected type=%T for Int", val) err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
return 0, err return 0, err
} }
} }
@ -218,25 +218,6 @@ func (cmd *Cmd) Uint64() (uint64, error) {
} }
} }
func (cmd *Cmd) Float32() (float32, error) {
if cmd.err != nil {
return 0, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return float32(val), nil
case string:
f, err := strconv.ParseFloat(val, 32)
if err != nil {
return 0, err
}
return float32(f), nil
default:
err := fmt.Errorf("redis: unexpected type=%T for Float32", val)
return 0, err
}
}
func (cmd *Cmd) Float64() (float64, error) { func (cmd *Cmd) Float64() (float64, error) {
if cmd.err != nil { if cmd.err != nil {
return 0, cmd.err return 0, cmd.err
@ -604,17 +585,6 @@ func (cmd *StringCmd) Uint64() (uint64, error) {
return strconv.ParseUint(cmd.Val(), 10, 64) return strconv.ParseUint(cmd.Val(), 10, 64)
} }
func (cmd *StringCmd) Float32() (float32, error) {
if cmd.err != nil {
return 0, cmd.err
}
f, err := strconv.ParseFloat(cmd.Val(), 32)
if err != nil {
return 0, err
}
return float32(f), nil
}
func (cmd *StringCmd) Float64() (float64, error) { func (cmd *StringCmd) Float64() (float64, error) {
if cmd.err != nil { if cmd.err != nil {
return 0, cmd.err return 0, cmd.err
@ -717,12 +687,12 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ss := make([]string, 0, n) ss := make([]string, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
switch s, err := rd.ReadString(); { s, err := rd.ReadString()
case err == Nil: if err == Nil {
ss = append(ss, "") ss = append(ss, "")
case err != nil: } else if err != nil {
return nil, err return nil, err
default: } else {
ss = append(ss, s) ss = append(ss, s)
} }
} }
@ -999,20 +969,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
return nil, err return nil, err
} }
var values map[string]interface{}
v, err := rd.ReadArrayReply(stringInterfaceMapParser) v, err := rd.ReadArrayReply(stringInterfaceMapParser)
if err != nil { if err != nil {
if err != proto.Nil { return nil, err
return nil, err
}
} else {
values = v.(map[string]interface{})
} }
msgs = append(msgs, XMessage{ msgs = append(msgs, XMessage{
ID: id, ID: id,
Values: values, Values: v.(map[string]interface{}),
}) })
return nil, nil return nil, nil
}) })
@ -1373,68 +1337,6 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type ZWithKeyCmd struct {
baseCmd
val ZWithKey
}
var _ Cmder = (*ZWithKeyCmd)(nil)
func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(ZWithKey)
return nil
}
// Implements proto.MultiBulkParse
func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 3 {
return nil, fmt.Errorf("got %d elements, expected 3", n)
}
var z ZWithKey
var err error
z.Key, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
return z, nil
}
//------------------------------------------------------------------------------
type ScanCmd struct { type ScanCmd struct {
baseCmd baseCmd

View File

@ -8,6 +8,13 @@ import (
"github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal"
) )
func readTimeout(timeout time.Duration) time.Duration {
if timeout == 0 {
return 0
}
return timeout + 10*time.Second
}
func usePrecise(dur time.Duration) bool { func usePrecise(dur time.Duration) bool {
return dur < time.Second || dur%time.Second != 0 return dur < time.Second || dur%time.Second != 0
} }
@ -166,7 +173,6 @@ type Cmdable interface {
SUnion(keys ...string) *StringSliceCmd SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd SUnionStore(destination string, keys ...string) *IntCmd
XAdd(a *XAddArgs) *StringCmd XAdd(a *XAddArgs) *StringCmd
XDel(stream string, ids ...string) *IntCmd
XLen(stream string) *IntCmd XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
@ -175,7 +181,6 @@ type Cmdable interface {
XRead(a *XReadArgs) *XStreamSliceCmd XRead(a *XReadArgs) *XStreamSliceCmd
XReadStreams(streams ...string) *XStreamSliceCmd XReadStreams(streams ...string) *XStreamSliceCmd
XGroupCreate(stream, group, start string) *StatusCmd XGroupCreate(stream, group, start string) *StatusCmd
XGroupCreateMkStream(stream, group, start string) *StatusCmd
XGroupSetID(stream, group, start string) *StatusCmd XGroupSetID(stream, group, start string) *StatusCmd
XGroupDestroy(stream, group string) *IntCmd XGroupDestroy(stream, group string) *IntCmd
XGroupDelConsumer(stream, group, consumer string) *IntCmd XGroupDelConsumer(stream, group, consumer string) *IntCmd
@ -187,8 +192,6 @@ type Cmdable interface {
XClaimJustID(a *XClaimArgs) *StringSliceCmd XClaimJustID(a *XClaimArgs) *StringSliceCmd
XTrim(key string, maxLen int64) *IntCmd XTrim(key string, maxLen int64) *IntCmd
XTrimApprox(key string, maxLen int64) *IntCmd XTrimApprox(key string, maxLen int64) *IntCmd
BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd
@ -203,8 +206,6 @@ type Cmdable interface {
ZLexCount(key, min, max string) *IntCmd ZLexCount(key, min, max string) *IntCmd
ZIncrBy(key string, increment float64, member string) *FloatCmd ZIncrBy(key string, increment float64, member string) *FloatCmd
ZInterStore(destination string, store ZStore, keys ...string) *IntCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
ZPopMax(key string, count ...int64) *ZSliceCmd
ZPopMin(key string, count ...int64) *ZSliceCmd
ZRange(key string, start, stop int64) *StringSliceCmd ZRange(key string, start, stop int64) *StringSliceCmd
ZRangeWithScores(key string, start, stop int64) *ZSliceCmd ZRangeWithScores(key string, start, stop int64) *ZSliceCmd
ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd
@ -232,7 +233,6 @@ type Cmdable interface {
ClientKillByFilter(keys ...string) *IntCmd ClientKillByFilter(keys ...string) *IntCmd
ClientList() *StringCmd ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd ClientPause(dur time.Duration) *BoolCmd
ClientID() *IntCmd
ConfigGet(parameter string) *SliceCmd ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd ConfigSet(parameter, value string) *StatusCmd
@ -270,7 +270,6 @@ type Cmdable interface {
ClusterResetHard() *StatusCmd ClusterResetHard() *StatusCmd
ClusterInfo() *StringCmd ClusterInfo() *StringCmd
ClusterKeySlot(key string) *IntCmd ClusterKeySlot(key string) *IntCmd
ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
ClusterCountFailureReports(nodeID string) *IntCmd ClusterCountFailureReports(nodeID string) *IntCmd
ClusterCountKeysInSlot(slot int) *IntCmd ClusterCountKeysInSlot(slot int) *IntCmd
ClusterDelSlots(slots ...int) *StatusCmd ClusterDelSlots(slots ...int) *StatusCmd
@ -1343,16 +1342,6 @@ func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
return cmd return cmd
} }
func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
args := []interface{}{"xdel", stream}
for _, id := range ids {
args = append(args, id)
}
cmd := NewIntCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XLen(stream string) *IntCmd { func (c *cmdable) XLen(stream string) *IntCmd {
cmd := NewIntCmd("xlen", stream) cmd := NewIntCmd("xlen", stream)
c.process(cmd) c.process(cmd)
@ -1406,9 +1395,6 @@ func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
} }
cmd := NewXStreamSliceCmd(args...) cmd := NewXStreamSliceCmd(args...)
if a.Block >= 0 {
cmd.setReadTimeout(a.Block)
}
c.process(cmd) c.process(cmd)
return cmd return cmd
} }
@ -1426,12 +1412,6 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
return cmd return cmd
} }
func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "setid", stream, group, start) cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
c.process(cmd) c.process(cmd)
@ -1453,11 +1433,9 @@ func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
type XReadGroupArgs struct { type XReadGroupArgs struct {
Group string Group string
Consumer string Consumer string
// List of streams and ids. Streams []string
Streams []string Count int64
Count int64 Block time.Duration
Block time.Duration
NoAck bool
} }
func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
@ -1469,18 +1447,12 @@ func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
if a.Block >= 0 { if a.Block >= 0 {
args = append(args, "block", int64(a.Block/time.Millisecond)) args = append(args, "block", int64(a.Block/time.Millisecond))
} }
if a.NoAck {
args = append(args, "noack")
}
args = append(args, "streams") args = append(args, "streams")
for _, s := range a.Streams { for _, s := range a.Streams {
args = append(args, s) args = append(args, s)
} }
cmd := NewXStreamSliceCmd(args...) cmd := NewXStreamSliceCmd(args...)
if a.Block >= 0 {
cmd.setReadTimeout(a.Block)
}
c.process(cmd) c.process(cmd)
return cmd return cmd
} }
@ -1577,12 +1549,6 @@ type Z struct {
Member interface{} Member interface{}
} }
// ZWithKey represents sorted set member including the name of the key where it was popped.
type ZWithKey struct {
Z
Key string
}
// ZStore is used as an arg to ZInterStore and ZUnionStore. // ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct { type ZStore struct {
Weights []float64 Weights []float64
@ -1590,34 +1556,6 @@ type ZStore struct {
Aggregate string Aggregate string
} }
// Redis `BZPOPMAX key [key ...] timeout` command.
func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmax"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
// Redis `BZPOPMIN key [key ...] timeout` command.
func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmin"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members { for i, m := range members {
a[n+2*i] = m.Score a[n+2*i] = m.Score
@ -1756,46 +1694,6 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string)
return cmd return cmd
} }
func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd {
args := []interface{}{
"zpopmax",
key,
}
switch len(count) {
case 0:
break
case 1:
args = append(args, count[0])
default:
panic("too many arguments")
}
cmd := NewZSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd {
args := []interface{}{
"zpopmin",
key,
}
switch len(count) {
case 0:
break
case 1:
args = append(args, count[0])
default:
panic("too many arguments")
}
cmd := NewZSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd {
args := []interface{}{ args := []interface{}{
"zrange", "zrange",
@ -2071,24 +1969,6 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd return cmd
} }
func (c *cmdable) ClientID() *IntCmd {
cmd := NewIntCmd("client", "id")
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblock(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id)
c.process(cmd)
return cmd
}
func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
cmd := NewIntCmd("client", "unblock", id, "error")
c.process(cmd)
return cmd
}
// ClientSetName assigns a name to the connection. // ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name) cmd := NewBoolCmd("client", "setname", name)
@ -2404,12 +2284,6 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
return cmd return cmd
} }
func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
c.process(cmd)
return cmd
}
func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd { func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
c.process(cmd) c.process(cmd)

View File

@ -9,9 +9,6 @@ import (
) )
func IsRetryableError(err error, retryTimeout bool) bool { func IsRetryableError(err error, retryTimeout bool) bool {
if err == nil {
return false
}
if err == io.EOF { if err == io.EOF {
return true return true
} }
@ -47,8 +44,7 @@ func IsBadConn(err error, allowTimeout bool) bool {
return false return false
} }
if IsRedisError(err) { if IsRedisError(err) {
// #790 return strings.HasPrefix(err.Error(), "READONLY ")
return IsReadOnlyError(err)
} }
if allowTimeout { if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@ -83,7 +79,3 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool { func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ") return strings.HasPrefix(err.Error(), "LOADING ")
} }
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}

View File

@ -17,16 +17,14 @@ type Conn struct {
rdLocked bool rdLocked bool
wr *proto.Writer wr *proto.Writer
Inited bool InitedAt time.Time
pooled bool pooled bool
createdAt time.Time usedAt atomic.Value
usedAt atomic.Value
} }
func NewConn(netConn net.Conn) *Conn { func NewConn(netConn net.Conn) *Conn {
cn := &Conn{ cn := &Conn{
netConn: netConn, netConn: netConn,
createdAt: time.Now(),
} }
cn.rd = proto.NewReader(netConn) cn.rd = proto.NewReader(netConn)
cn.wr = proto.NewWriter(netConn) cn.wr = proto.NewWriter(netConn)

View File

@ -38,7 +38,7 @@ type Pooler interface {
Get() (*Conn, error) Get() (*Conn, error)
Put(*Conn) Put(*Conn)
Remove(*Conn, error) Remove(*Conn)
Len() int Len() int
IdleLen() int IdleLen() int
@ -289,7 +289,7 @@ func (p *ConnPool) popIdle() *Conn {
func (p *ConnPool) Put(cn *Conn) { func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled { if !cn.pooled {
p.Remove(cn, nil) p.Remove(cn)
return return
} }
@ -300,7 +300,7 @@ func (p *ConnPool) Put(cn *Conn) {
p.freeTurn() p.freeTurn()
} }
func (p *ConnPool) Remove(cn *Conn, reason error) { func (p *ConnPool) Remove(cn *Conn) {
p.removeConn(cn) p.removeConn(cn)
p.freeTurn() p.freeTurn()
_ = p.closeConn(cn) _ = p.closeConn(cn)
@ -468,7 +468,7 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true return true
} }
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
return true return true
} }

View File

@ -1,203 +1,53 @@
package pool package pool
import (
"fmt"
"sync/atomic"
)
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
type BadConnError struct {
wrapped error
}
var _ error = (*BadConnError)(nil)
func (e BadConnError) Error() string {
return "pg: Conn is in a bad state"
}
func (e BadConnError) Unwrap() error {
return e.wrapped
}
type SingleConnPool struct { type SingleConnPool struct {
pool Pooler cn *Conn
level int32 // atomic
state uint32 // atomic
ch chan *Conn
_badConnError atomic.Value
} }
var _ Pooler = (*SingleConnPool)(nil) var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(pool Pooler) *SingleConnPool { func NewSingleConnPool(cn *Conn) *SingleConnPool {
p, ok := pool.(*SingleConnPool) return &SingleConnPool{
if !ok { cn: cn,
p = &SingleConnPool{
pool: pool,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.level, 1)
return p
}
func (p *SingleConnPool) SetConn(cn *Conn) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
p.ch <- cn
} else {
panic("not reached")
} }
} }
func (p *SingleConnPool) NewConn() (*Conn, error) { func (p *SingleConnPool) NewConn() (*Conn, error) {
return p.pool.NewConn() panic("not implemented")
} }
func (p *SingleConnPool) CloseConn(cn *Conn) error { func (p *SingleConnPool) CloseConn(*Conn) error {
return p.pool.CloseConn(cn) panic("not implemented")
} }
func (p *SingleConnPool) Get() (*Conn, error) { func (p *SingleConnPool) Get() (*Conn, error) {
// In worst case this races with Close which is not a very common operation. return p.cn, nil
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
cn, err := p.pool.Get()
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(cn, ErrClosed)
case stateInited:
if err := p.badConnError(); err != nil {
return nil, err
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
} }
func (p *SingleConnPool) Put(cn *Conn) { func (p *SingleConnPool) Put(cn *Conn) {
defer func() { if p.cn != cn {
if recover() != nil { panic("p.cn != cn")
p.freeConn(cn)
}
}()
p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
if err := p.badConnError(); err != nil {
p.pool.Remove(cn, err)
} else {
p.pool.Put(cn)
} }
} }
func (p *SingleConnPool) Remove(cn *Conn, reason error) { func (p *SingleConnPool) Remove(cn *Conn) {
defer func() { if p.cn != cn {
if recover() != nil { panic("p.cn != cn")
p.pool.Remove(cn, ErrClosed) }
}
}()
p._badConnError.Store(BadConnError{wrapped: reason})
p.ch <- cn
} }
func (p *SingleConnPool) Len() int { func (p *SingleConnPool) Len() int {
switch atomic.LoadUint32(&p.state) { return 1
case stateDefault:
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
}
} }
func (p *SingleConnPool) IdleLen() int { func (p *SingleConnPool) IdleLen() int {
return len(p.ch) return 0
} }
func (p *SingleConnPool) Stats() *Stats { func (p *SingleConnPool) Stats() *Stats {
return &Stats{} return nil
} }
func (p *SingleConnPool) Close() error { func (p *SingleConnPool) Close() error {
level := atomic.AddInt32(&p.level, -1)
if level > 0 {
return nil
}
for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
p.freeConn(cn)
}
return nil
}
}
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
if p.badConnError() == nil {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
p.pool.Remove(cn, ErrClosed)
p._badConnError.Store(BadConnError{wrapped: nil})
default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
}
return nil
}
func (p *SingleConnPool) badConnError() error {
if v := p._badConnError.Load(); v != nil {
err := v.(BadConnError)
if err.wrapped != nil {
return err
}
}
return nil return nil
} }

View File

@ -55,13 +55,13 @@ func (p *StickyConnPool) putUpstream() {
func (p *StickyConnPool) Put(cn *Conn) {} func (p *StickyConnPool) Put(cn *Conn) {}
func (p *StickyConnPool) removeUpstream(reason error) { func (p *StickyConnPool) removeUpstream() {
p.pool.Remove(p.cn, reason) p.pool.Remove(p.cn)
p.cn = nil p.cn = nil
} }
func (p *StickyConnPool) Remove(cn *Conn, reason error) { func (p *StickyConnPool) Remove(cn *Conn) {
p.removeUpstream(reason) p.removeUpstream()
} }
func (p *StickyConnPool) Len() int { func (p *StickyConnPool) Len() int {
@ -101,7 +101,7 @@ func (p *StickyConnPool) Close() error {
if p.reusable { if p.reusable {
p.putUpstream() p.putUpstream()
} else { } else {
p.removeUpstream(ErrClosed) p.removeUpstream()
} }
} }

View File

@ -0,0 +1,64 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}

View File

@ -27,13 +27,3 @@ func isLower(s string) bool {
} }
return true return true
} }
func Unwrap(err error) error {
u, ok := err.(interface {
Unwrap() error
})
if !ok {
return nil
}
return u.Unwrap()
}

View File

@ -14,17 +14,6 @@ import (
"github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/pool"
) )
// Limiter is the interface of a rate limiter or a circuit breaker.
type Limiter interface {
// Allow returns a nil if operation is allowed or an error otherwise.
// If operation is allowed client must report the result of operation
// whether is a success or a failure.
Allow() error
// ReportResult reports the result of previously allowed operation.
// nil indicates a success, non-nil error indicates a failure.
ReportResult(result error)
}
type Options struct { type Options struct {
// The network type, either tcp or unix. // The network type, either tcp or unix.
// Default is tcp. // Default is tcp.
@ -59,7 +48,7 @@ type Options struct {
// Default is 5 seconds. // Default is 5 seconds.
DialTimeout time.Duration DialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail // Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. // with a timeout instead of blocking.
// Default is 3 seconds. // Default is 3 seconds.
ReadTimeout time.Duration ReadTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail // Timeout for socket writes. If reached, commands will fail
@ -101,9 +90,6 @@ func (opt *Options) init() {
if opt.Network == "" { if opt.Network == "" {
opt.Network = "tcp" opt.Network = "tcp"
} }
if opt.Addr == "" {
opt.Addr = "localhost:6379"
}
if opt.Dialer == nil { if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) { opt.Dialer = func() (net.Conn, error) {
netDialer := &net.Dialer{ netDialer := &net.Dialer{

View File

@ -8,22 +8,8 @@ import (
type pipelineExecer func([]Cmder) error type pipelineExecer func([]Cmder) error
// Pipeliner is an mechanism to realise Redis Pipeline technique.
//
// Pipelining is a technique to extremely speed up processing by packing
// operations to batches, send them at once to Redis and read a replies in a
// singe step.
// See https://redis.io/topics/pipelining
//
// Pay attention, that Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// Redis client has retransmission logic in case of timeouts, pipeline
// can be retransmitted and commands can be executed more then once.
// To avoid this: it is good idea to use reasonable bigger read/write timeouts
// depends of your batch size and/or use TxPipeline.
type Pipeliner interface { type Pipeliner interface {
StatefulCmdable StatefulCmdable
Do(args ...interface{}) *Cmd
Process(cmd Cmder) error Process(cmd Cmder) error
Close() error Close() error
Discard() error Discard() error
@ -45,12 +31,6 @@ type Pipeline struct {
closed bool closed bool
} }
func (c *Pipeline) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
_ = c.Process(cmd)
return cmd
}
// Process queues the cmd for later execution. // Process queues the cmd for later execution.
func (c *Pipeline) Process(cmd Cmder) error { func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock() c.mu.Lock()

View File

@ -1,9 +1,7 @@
package redis package redis
import ( import (
"errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@ -12,9 +10,7 @@ import (
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
) )
var errPingTimeout = errors.New("redis: ping timeout") // PubSub implements Pub/Sub commands bas described in
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe // http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines. // for concurrent use by multiple goroutines.
// //
@ -30,9 +26,8 @@ type PubSub struct {
cn *pool.Conn cn *pool.Conn
channels map[string]struct{} channels map[string]struct{}
patterns map[string]struct{} patterns map[string]struct{}
closed bool
closed bool exit chan struct{}
exit chan struct{}
cmd *Cmd cmd *Cmd
@ -41,12 +36,6 @@ type PubSub struct {
ping chan struct{} ping chan struct{}
} }
func (c *PubSub) String() string {
channels := mapKeys(c.channels)
channels = append(channels, mapKeys(c.patterns)...)
return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", "))
}
func (c *PubSub) init() { func (c *PubSub) init() {
c.exit = make(chan struct{}) c.exit = make(chan struct{})
} }
@ -62,6 +51,7 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if c.closed { if c.closed {
return nil, pool.ErrClosed return nil, pool.ErrClosed
} }
if c.cn != nil { if c.cn != nil {
return c.cn, nil return c.cn, nil
} }
@ -397,39 +387,16 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
// It periodically sends Ping messages to test connection health. // It periodically sends Ping messages to test connection health.
// The channel is closed with PubSub. Receive* APIs can not be used // The channel is closed with PubSub. Receive* APIs can not be used
// after channel is created. // after channel is created.
//
// If the Go channel is full for 30 seconds the message is dropped.
func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) Channel() <-chan *Message {
return c.channel(100) c.chOnce.Do(c.initChannel)
}
// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size.
func (c *PubSub) ChannelSize(size int) <-chan *Message {
return c.channel(size)
}
func (c *PubSub) channel(size int) <-chan *Message {
c.chOnce.Do(func() {
c.initChannel(size)
})
if cap(c.ch) != size {
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
panic(err)
}
return c.ch return c.ch
} }
func (c *PubSub) initChannel(size int) { func (c *PubSub) initChannel() {
const timeout = 30 * time.Second c.ch = make(chan *Message, 100)
c.ping = make(chan struct{}, 10)
c.ch = make(chan *Message, size)
c.ping = make(chan struct{}, 1)
go func() { go func() {
timer := time.NewTimer(timeout)
timer.Stop()
var errCount int var errCount int
for { for {
msg, err := c.Receive() msg, err := c.Receive()
@ -444,7 +411,6 @@ func (c *PubSub) initChannel(size int) {
errCount++ errCount++
continue continue
} }
errCount = 0 errCount = 0
// Any message is as good as a ping. // Any message is as good as a ping.
@ -459,28 +425,21 @@ func (c *PubSub) initChannel(size int) {
case *Pong: case *Pong:
// Ignore. // Ignore.
case *Message: case *Message:
timer.Reset(timeout) c.ch <- msg
select {
case c.ch <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
internal.Logf(
"redis: %s channel is full for %s (message is dropped)",
c, timeout)
}
default: default:
internal.Logf("redis: unknown message type: %T", msg) internal.Logf("redis: unknown message: %T", msg)
} }
} }
}() }()
go func() { go func() {
const timeout = 5 * time.Second
timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
timer.Stop() timer.Stop()
healthy := true healthy := true
var pingErr error
for { for {
timer.Reset(timeout) timer.Reset(timeout)
select { select {
@ -490,13 +449,10 @@ func (c *PubSub) initChannel(size int) {
<-timer.C <-timer.C
} }
case <-timer.C: case <-timer.C:
pingErr := c.Ping() pingErr = c.Ping()
if healthy { if healthy {
healthy = false healthy = false
} else { } else {
if pingErr == nil {
pingErr = errPingTimeout
}
c.mu.Lock() c.mu.Lock()
c._reconnect(pingErr) c._reconnect(pingErr)
c.mu.Unlock() c.mu.Unlock()

View File

@ -26,7 +26,6 @@ func SetLogger(logger *log.Logger) {
type baseClient struct { type baseClient struct {
opt *Options opt *Options
connPool pool.Pooler connPool pool.Pooler
limiter Limiter
process func(Cmder) error process func(Cmder) error
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
@ -51,80 +50,45 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err return nil, err
} }
err = c.initConn(cn) if cn.InitedAt.IsZero() {
if err != nil { if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn) _ = c.connPool.CloseConn(cn)
return nil, err return nil, err
}
} }
return cn, nil return cn, nil
} }
func (c *baseClient) getConn() (*pool.Conn, error) { func (c *baseClient) getConn() (*pool.Conn, error) {
if c.limiter != nil {
err := c.limiter.Allow()
if err != nil {
return nil, err
}
}
cn, err := c._getConn()
if err != nil {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
return nil, err
}
return cn, nil
}
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get() cn, err := c.connPool.Get()
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = c.initConn(cn) if cn.InitedAt.IsZero() {
if err != nil { err := c.initConn(cn)
c.connPool.Remove(cn, err) if err != nil {
if err := internal.Unwrap(err); err != nil { c.connPool.Remove(cn)
return nil, err return nil, err
} }
return nil, err
} }
return cn, nil return cn, nil
} }
func (c *baseClient) releaseConn(cn *pool.Conn, err error) { func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if internal.IsBadConn(err, false) { if internal.IsBadConn(err, false) {
c.connPool.Remove(cn, err) c.connPool.Remove(cn)
} else { return false
c.connPool.Put(cn)
}
}
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
} }
if err == nil || internal.IsRedisError(err) { c.connPool.Put(cn)
c.connPool.Put(cn) return true
} else {
c.connPool.Remove(cn, err)
}
} }
func (c *baseClient) initConn(cn *pool.Conn) error { func (c *baseClient) initConn(cn *pool.Conn) error {
if cn.Inited { cn.InitedAt = time.Now()
return nil
}
cn.Inited = true
if c.opt.Password == "" && if c.opt.Password == "" &&
c.opt.DB == 0 && c.opt.DB == 0 &&
@ -162,7 +126,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
// Do creates a Cmd from the args and processes the cmd. // Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd { func (c *baseClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...) cmd := NewCmd(args...)
_ = c.Process(cmd) c.Process(cmd)
return cmd return cmd
} }
@ -204,7 +168,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err return err
} }
err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply) err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
return cmd.readReply(rd)
})
c.releaseConn(cn, err) c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue continue
@ -222,11 +188,7 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil { if timeout := cmd.readTimeout(); timeout != nil {
t := *timeout return readTimeout(*timeout)
if t == 0 {
return 0
}
return t + 10*time.Second
} }
return c.opt.ReadTimeout return c.opt.ReadTimeout
} }
@ -238,7 +200,7 @@ func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
func (c *baseClient) Close() error { func (c *baseClient) Close() error {
var firstErr error var firstErr error
if c.onClose != nil { if c.onClose != nil {
if err := c.onClose(); err != nil { if err := c.onClose(); err != nil && firstErr == nil {
firstErr = err firstErr = err
} }
} }
@ -282,7 +244,12 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
} }
canRetry, err := p(cn, cmds) canRetry, err := p(cn, cmds)
c.releaseConnStrict(cn, err)
if err == nil || internal.IsRedisError(err) {
c.connPool.Put(cn)
break
}
c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) { if !canRetry || !internal.IsRetryableError(err, true) {
break break
@ -352,7 +319,7 @@ func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
return err return err
} }
for range cmds { for _ = range cmds {
err = statusCmd.readReply(rd) err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) { if err != nil && !internal.IsRedisError(err) {
return err return err
@ -424,12 +391,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil { if ctx == nil {
panic("nil context") panic("nil context")
} }
c2 := c.clone() c2 := c.copy()
c2.ctx = ctx c2.ctx = ctx
return c2 return c2
} }
func (c *Client) clone() *Client { func (c *Client) copy() *Client {
cp := *c cp := *c
cp.init() cp.init()
return &cp return &cp
@ -440,11 +407,6 @@ func (c *Client) Options() *Options {
return c.opt return c.opt
} }
func (c *Client) SetLimiter(l Limiter) *Client {
c.limiter = l
return c
}
type PoolStats pool.Stats type PoolStats pool.Stats
// PoolStats returns connection pool stats. // PoolStats returns connection pool stats.
@ -493,30 +455,6 @@ func (c *Client) pubSub() *PubSub {
// Subscribe subscribes the client to the specified channels. // Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription. // Channels can be omitted to create empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
// sub := client.Subscribe(queryResp)
// iface, err := sub.Receive()
// if err != nil {
// // handle error
// }
//
// // Should be *Subscription, but others are possible if other actions have been
// // taken on sub since it was created.
// switch iface.(type) {
// case *Subscription:
// // subscribe succeeded
// case *Message:
// // received first message
// case *Pong:
// // pong received
// default:
// // handle error
// }
//
// ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub { func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub() pubsub := c.pubSub()
if len(channels) > 0 { if len(channels) > 0 {
@ -544,12 +482,10 @@ type Conn struct {
} }
func newConn(opt *Options, cn *pool.Conn) *Conn { func newConn(opt *Options, cn *pool.Conn) *Conn {
connPool := pool.NewSingleConnPool(nil)
connPool.SetConn(cn)
c := Conn{ c := Conn{
baseClient: baseClient{ baseClient: baseClient{
opt: opt, opt: opt,
connPool: connPool, connPool: pool.NewSingleConnPool(cn),
}, },
} }
c.baseClient.init() c.baseClient.init()

View File

@ -273,13 +273,9 @@ func (c *ringShards) Heartbeat(frequency time.Duration) {
// rebalance removes dead shards from the Ring. // rebalance removes dead shards from the Ring.
func (c *ringShards) rebalance() { func (c *ringShards) rebalance() {
c.mu.RLock()
shards := c.shards
c.mu.RUnlock()
hash := newConsistentHash(c.opt) hash := newConsistentHash(c.opt)
var shardsNum int var shardsNum int
for name, shard := range shards { for name, shard := range c.shards {
if shard.IsUp() { if shard.IsUp() {
hash.Add(name) hash.Add(name)
shardsNum++ shardsNum++
@ -323,12 +319,12 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Ring is a Redis client that uses consistent hashing to distribute // Ring is a Redis client that uses constistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for // keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines. // concurrent use by multiple goroutines.
// //
// Ring monitors the state of each shard and removes dead shards from // Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This // the ring. When shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no // gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client // consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any // uses shards that are available to the client and does not do any
@ -346,7 +342,6 @@ type Ring struct {
shards *ringShards shards *ringShards
cmdsInfoCache *cmdsInfoCache cmdsInfoCache *cmdsInfoCache
process func(Cmder) error
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
} }
@ -359,10 +354,8 @@ func NewRing(opt *RingOptions) *Ring {
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
ring.init()
for name, addr := range opt.Addrs { for name, addr := range opt.Addrs {
clopt := opt.clientOptions() clopt := opt.clientOptions()
@ -375,10 +368,6 @@ func NewRing(opt *RingOptions) *Ring {
return ring return ring
} }
func (c *Ring) init() {
c.cmdable.setProcessor(c.Process)
}
func (c *Ring) Context() context.Context { func (c *Ring) Context() context.Context {
if c.ctx != nil { if c.ctx != nil {
return c.ctx return c.ctx
@ -390,15 +379,13 @@ func (c *Ring) WithContext(ctx context.Context) *Ring {
if ctx == nil { if ctx == nil {
panic("nil context") panic("nil context")
} }
c2 := c.clone() c2 := c.copy()
c2.ctx = ctx c2.ctx = ctx
return c2 return c2
} }
func (c *Ring) clone() *Ring { func (c *Ring) copy() *Ring {
cp := *c cp := *c
cp.init()
return &cp return &cp
} }
@ -539,34 +526,19 @@ func (c *Ring) Do(args ...interface{}) *Cmd {
func (c *Ring) WrapProcess( func (c *Ring) WrapProcess(
fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) { ) {
c.process = fn(c.process) c.ForEachShard(func(c *Client) error {
c.WrapProcess(fn)
return nil
})
} }
func (c *Ring) Process(cmd Cmder) error { func (c *Ring) Process(cmd Cmder) error {
return c.process(cmd) shard, err := c.cmdShard(cmd)
} if err != nil {
cmd.setErr(err)
func (c *Ring) defaultProcess(cmd Cmder) error { return err
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
err = shard.Client.Process(cmd)
if err == nil {
return nil
}
if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
return err
}
} }
return cmd.Err() return shard.Client.Process(cmd)
} }
func (c *Ring) Pipeline() Pipeliner { func (c *Ring) Pipeline() Pipeliner {
@ -603,42 +575,36 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt)) time.Sleep(c.retryBackoff(attempt))
} }
var mu sync.Mutex
var failedCmdsMap map[string][]Cmder var failedCmdsMap map[string][]Cmder
var wg sync.WaitGroup
for hash, cmds := range cmdsMap { for hash, cmds := range cmdsMap {
wg.Add(1) shard, err := c.shards.GetByHash(hash)
go func(hash string, cmds []Cmder) { if err != nil {
defer wg.Done() setCmdsErr(cmds, err)
continue
}
shard, err := c.shards.GetByHash(hash) cn, err := shard.Client.getConn()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
if err == nil || internal.IsRedisError(err) {
shard.Client.connPool.Put(cn)
continue
}
shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
} }
failedCmdsMap[hash] = cmds
cn, err := shard.Client.getConn() }
if err != nil {
setCmdsErr(cmds, err)
return
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
shard.Client.releaseConnStrict(cn, err)
if canRetry && internal.IsRetryableError(err, true) {
mu.Lock()
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[hash] = cmds
mu.Unlock()
}
}(hash, cmds)
} }
wg.Wait()
if len(failedCmdsMap) == 0 { if len(failedCmdsMap) == 0 {
break break
} }
@ -664,39 +630,6 @@ func (c *Ring) Close() error {
return c.shards.Close() return c.shards.Close()
} }
func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
if len(keys) == 0 {
return fmt.Errorf("redis: Watch requires at least one key")
}
var shards []*ringShard
for _, key := range keys {
if key != "" {
shard, err := c.shards.GetByKey(hashtag.Key(key))
if err != nil {
return err
}
shards = append(shards, shard)
}
}
if len(shards) == 0 {
return fmt.Errorf("redis: Watch requires at least one shard")
}
if len(shards) > 1 {
for _, shard := range shards[1:] {
if shard.Client != shards[0].Client {
err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
return err
}
}
}
return shards[0].Client.Watch(fn, keys...)
}
func newConsistentHash(opt *RingOptions) *consistenthash.Map { func newConsistentHash(opt *RingOptions) *consistenthash.Map {
return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash)) return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
} }

View File

@ -90,7 +90,9 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt, opt: opt,
connPool: failover.Pool(), connPool: failover.Pool(),
onClose: failover.Close, onClose: func() error {
return failover.Close()
},
}, },
} }
c.baseClient.init() c.baseClient.init()
@ -117,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c return c
} }
func (c *SentinelClient) pubSub() *PubSub { func (c *SentinelClient) PubSub() *PubSub {
pubsub := &PubSub{ pubsub := &PubSub{
opt: c.opt, opt: c.opt,
@ -130,67 +132,14 @@ func (c *SentinelClient) pubSub() *PubSub {
return pubsub return pubsub
} }
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
c.Process(cmd) c.Process(cmd)
return cmd return cmd
} }
func (c *SentinelClient) Sentinels(name string) *SliceCmd { func (c *SentinelClient) Sentinels(name string) *SliceCmd {
cmd := NewSliceCmd("sentinel", "sentinels", name) cmd := NewSliceCmd("SENTINEL", "sentinels", name)
c.Process(cmd)
return cmd
}
// Failover forces a failover as if the master was not reachable, and without
// asking for agreement to other Sentinels.
func (c *SentinelClient) Failover(name string) *StatusCmd {
cmd := NewStatusCmd("sentinel", "failover", name)
c.Process(cmd)
return cmd
}
// Reset resets all the masters with matching name. The pattern argument is a
// glob-style pattern. The reset process clears any previous state in a master
// (including a failover in progress), and removes every slave and sentinel
// already discovered and associated with the master.
func (c *SentinelClient) Reset(pattern string) *IntCmd {
cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
// FlushConfig forces Sentinel to rewrite its configuration on disk, including
// the current Sentinel state.
func (c *SentinelClient) FlushConfig() *StatusCmd {
cmd := NewStatusCmd("sentinel", "flushconfig")
c.Process(cmd)
return cmd
}
// Master shows the state and info of the specified master.
func (c *SentinelClient) Master(name string) *StringStringMapCmd {
cmd := NewStringStringMapCmd("sentinel", "master", name)
c.Process(cmd) c.Process(cmd)
return cmd return cmd
} }
@ -207,92 +156,79 @@ type sentinelFailover struct {
masterName string masterName string
_masterAddr string _masterAddr string
sentinel *SentinelClient sentinel *SentinelClient
pubsub *PubSub
} }
func (c *sentinelFailover) Close() error { func (d *sentinelFailover) Close() error {
c.mu.Lock() return d.resetSentinel()
defer c.mu.Unlock()
if c.sentinel != nil {
return c.closeSentinel()
}
return nil
} }
func (c *sentinelFailover) Pool() *pool.ConnPool { func (d *sentinelFailover) Pool() *pool.ConnPool {
c.poolOnce.Do(func() { d.poolOnce.Do(func() {
c.opt.Dialer = c.dial d.opt.Dialer = d.dial
c.pool = newConnPool(c.opt) d.pool = newConnPool(d.opt)
}) })
return c.pool return d.pool
} }
func (c *sentinelFailover) dial() (net.Conn, error) { func (d *sentinelFailover) dial() (net.Conn, error) {
addr, err := c.MasterAddr() addr, err := d.MasterAddr()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return net.DialTimeout("tcp", addr, c.opt.DialTimeout) return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
} }
func (c *sentinelFailover) MasterAddr() (string, error) { func (d *sentinelFailover) MasterAddr() (string, error) {
addr, err := c.masterAddr() d.mu.Lock()
defer d.mu.Unlock()
addr, err := d.masterAddr()
if err != nil { if err != nil {
return "", err return "", err
} }
c.switchMaster(addr) d._switchMaster(addr)
return addr, nil return addr, nil
} }
func (c *sentinelFailover) masterAddr() (string, error) { func (d *sentinelFailover) masterAddr() (string, error) {
c.mu.RLock() // Try last working sentinel.
addr := c.getMasterAddr() if d.sentinel != nil {
c.mu.RUnlock() addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if addr != "" { if err == nil {
return addr, nil addr := net.JoinHostPort(addr[0], addr[1])
return addr, nil
}
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
d.masterName, err)
d._resetSentinel()
} }
c.mu.Lock() for i, sentinelAddr := range d.sentinelAddrs {
defer c.mu.Unlock()
addr = c.getMasterAddr()
if addr != "" {
return addr, nil
}
if c.sentinel != nil {
c.closeSentinel()
}
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{ sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr, Addr: sentinelAddr,
MaxRetries: c.opt.MaxRetries, DialTimeout: d.opt.DialTimeout,
ReadTimeout: d.opt.ReadTimeout,
WriteTimeout: d.opt.WriteTimeout,
DialTimeout: c.opt.DialTimeout, PoolSize: d.opt.PoolSize,
ReadTimeout: c.opt.ReadTimeout, PoolTimeout: d.opt.PoolTimeout,
WriteTimeout: c.opt.WriteTimeout, IdleTimeout: d.opt.IdleTimeout,
PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,
TLSConfig: c.opt.TLSConfig,
}) })
masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil { if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
c.masterName, err) d.masterName, err)
_ = sentinel.Close() sentinel.Close()
continue continue
} }
// Push working sentinel to the top. // Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
c.setSentinel(sentinel) d.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil return addr, nil
@ -301,34 +237,17 @@ func (c *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable") return "", errors.New("redis: all sentinels are unreachable")
} }
func (c *sentinelFailover) getMasterAddr() string { func (c *sentinelFailover) switchMaster(addr string) {
sentinel := c.sentinel c.mu.Lock()
c._switchMaster(addr)
if sentinel == nil { c.mu.Unlock()
return ""
}
addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
c.masterName, err)
return ""
}
return net.JoinHostPort(addr[0], addr[1])
} }
func (c *sentinelFailover) switchMaster(addr string) { func (c *sentinelFailover) _switchMaster(addr string) {
c.mu.RLock() if c._masterAddr == addr {
masterAddr := c._masterAddr
c.mu.RUnlock()
if masterAddr == addr {
return return
} }
c.mu.Lock()
defer c.mu.Unlock()
internal.Logf("sentinel: new master=%q addr=%q", internal.Logf("sentinel: new master=%q addr=%q",
c.masterName, addr) c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool { _ = c.Pool().Filter(func(cn *pool.Conn) bool {
@ -337,36 +256,32 @@ func (c *sentinelFailover) switchMaster(addr string) {
c._masterAddr = addr c._masterAddr = addr
} }
func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
c.discoverSentinels(sentinel) d.discoverSentinels(sentinel)
c.sentinel = sentinel d.sentinel = sentinel
go d.listen(sentinel)
c.pubsub = sentinel.Subscribe("+switch-master")
go c.listen(c.pubsub)
} }
func (c *sentinelFailover) closeSentinel() error { func (d *sentinelFailover) resetSentinel() error {
var firstErr error var err error
d.mu.Lock()
err := c.pubsub.Close() if d.sentinel != nil {
if err != nil && firstErr == err { err = d._resetSentinel()
firstErr = err
} }
c.pubsub = nil d.mu.Unlock()
return err
err = c.sentinel.Close()
if err != nil && firstErr == err {
firstErr = err
}
c.sentinel = nil
return firstErr
} }
func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { func (d *sentinelFailover) _resetSentinel() error {
sentinels, err := sentinel.Sentinels(c.masterName).Result() err := d.sentinel.Close()
d.sentinel = nil
return err
}
func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil { if err != nil {
internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
return return
} }
for _, sentinel := range sentinels { for _, sentinel := range sentinels {
@ -375,32 +290,49 @@ func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
key := vals[i].(string) key := vals[i].(string)
if key == "name" { if key == "name" {
sentinelAddr := vals[i+1].(string) sentinelAddr := vals[i+1].(string)
if !contains(c.sentinelAddrs, sentinelAddr) { if !contains(d.sentinelAddrs, sentinelAddr) {
internal.Logf("sentinel: discovered new sentinel=%q for master=%q", internal.Logf(
sentinelAddr, c.masterName) "sentinel: discovered new sentinel=%q for master=%q",
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) sentinelAddr, d.masterName,
)
d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
} }
} }
} }
} }
} }
func (c *sentinelFailover) listen(pubsub *PubSub) { func (d *sentinelFailover) listen(sentinel *SentinelClient) {
ch := pubsub.Channel() pubsub := sentinel.PubSub()
defer pubsub.Close()
err := pubsub.Subscribe("+switch-master")
if err != nil {
internal.Logf("sentinel: Subscribe failed: %s", err)
d.resetSentinel()
return
}
for { for {
msg, ok := <-ch msg, err := pubsub.ReceiveMessage()
if !ok { if err != nil {
break if err == pool.ErrClosed {
d.resetSentinel()
return
}
internal.Logf("sentinel: ReceiveMessage failed: %s", err)
continue
} }
if msg.Channel == "+switch-master" { switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ") parts := strings.Split(msg.Payload, " ")
if parts[0] != c.masterName { if parts[0] != d.masterName {
internal.Logf("sentinel: ignore addr for master=%q", parts[0]) internal.Logf("sentinel: ignore addr for master=%q", parts[0])
continue continue
} }
addr := net.JoinHostPort(parts[3], parts[4]) addr := net.JoinHostPort(parts[3], parts[4])
c.switchMaster(addr) d.switchMaster(addr)
} }
} }
} }

View File

@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx {
return &tx return &tx
} }
// Watch prepares a transaction and marks the keys to be watched // Watch prepares a transcaction and marks the keys to be watched
// for conditional execution if there are any keys. // for conditional execution if there are any keys.
// //
// The transaction is automatically closed when fn exits. // The transaction is automatically closed when the fn exits.
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx() tx := c.newTx()
if len(keys) > 0 { if len(keys) > 0 {

View File

@ -155,7 +155,6 @@ type UniversalClient interface {
Watch(fn func(*Tx) error, keys ...string) error Watch(fn func(*Tx) error, keys ...string) error
Process(cmd Cmder) error Process(cmd Cmder) error
WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error)
WrapProcessPipeline(fn func(oldProcess func([]Cmder) error) func([]Cmder) error)
Subscribe(channels ...string) *PubSub Subscribe(channels ...string) *PubSub
PSubscribe(channels ...string) *PubSub PSubscribe(channels ...string) *PubSub
Close() error Close() error

3
vendor/modules.txt vendored
View File

@ -60,13 +60,14 @@ github.com/go-openapi/jsonreference
github.com/go-openapi/spec github.com/go-openapi/spec
# github.com/go-openapi/swag v0.19.5 # github.com/go-openapi/swag v0.19.5
github.com/go-openapi/swag github.com/go-openapi/swag
# github.com/go-redis/redis v6.15.7+incompatible # github.com/go-redis/redis v6.14.0+incompatible
github.com/go-redis/redis github.com/go-redis/redis
github.com/go-redis/redis/internal github.com/go-redis/redis/internal
github.com/go-redis/redis/internal/consistenthash github.com/go-redis/redis/internal/consistenthash
github.com/go-redis/redis/internal/hashtag github.com/go-redis/redis/internal/hashtag
github.com/go-redis/redis/internal/pool github.com/go-redis/redis/internal/pool
github.com/go-redis/redis/internal/proto github.com/go-redis/redis/internal/proto
github.com/go-redis/redis/internal/singleflight
github.com/go-redis/redis/internal/util github.com/go-redis/redis/internal/util
# github.com/go-sql-driver/mysql v1.5.0 # github.com/go-sql-driver/mysql v1.5.0
github.com/go-sql-driver/mysql github.com/go-sql-driver/mysql