Skip to content

Commit

Permalink
Merge pull request mediocregopher#28 from mediocregopher/util-scan
Browse files Browse the repository at this point in the history
Util scan
  • Loading branch information
Brian Picciano committed May 30, 2016
2 parents c408b66 + 9eaceb7 commit ae04b3e
Show file tree
Hide file tree
Showing 3 changed files with 349 additions and 70 deletions.
30 changes: 15 additions & 15 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
---
language: go
go:
- 1.4
- 1.5
- tip
- 1.4
- 1.5
- 1.6

env:
- REDIS_VERSION=stable
- REDIS_VERSION=stable

install:
- wget http://download.redis.io/releases/redis-$REDIS_VERSION.tar.gz
- tar xf redis-$REDIS_VERSION.tar.gz
- (cd redis-$REDIS_VERSION && make)
- gem install redis
- export PATH=./redis-$REDIS_VERSION/src:$PATH
- make start
- go get -v -t ./...
- wget http://download.redis.io/releases/redis-$REDIS_VERSION.tar.gz
- tar xf redis-$REDIS_VERSION.tar.gz
- (cd redis-$REDIS_VERSION && make)
- gem install redis
- export PATH=./redis-$REDIS_VERSION/src:$PATH
- make start
- go get -v -t ./...

before_script:
# sleep a bit to allow things to get set up
- sleep 10
# sleep a bit to allow things to get set up
- sleep 10

script:
- go test -v ./...
- go test -v -race ./...

after_failure:
- tail -n100 ./*.log
- tail -n100 ./*.log
256 changes: 227 additions & 29 deletions util/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,76 @@ import (
"strings"

"github.com/mediocregopher/radix.v2/cluster"
"github.com/mediocregopher/radix.v2/redis"
)

func scanSingle(r Cmder, ch chan string, cmd, key, pattern string) error {
defer close(ch)
cmd = strings.ToUpper(cmd)
// ScanOpts are various parameters which can be passed into ScanWithOpts. Some
// fields are required depending on which type of scan is being done.
type ScanOpts struct {
// The scan command to do, e.g. "SCAN", "HSCAN", etc...
Command string

var keys []string
cursor := "0"
for {
args := make([]interface{}, 0, 4)
if cmd != "SCAN" {
args = append(args, key)
}
args = append(args, cursor, "MATCH", pattern)
// The key to perform the scan on. Only necessary when Command isn't "SCAN"
Key string

parts, err := r.Cmd(cmd, args...).Array()
if err != nil {
return err
}
// An optional pattern to filter returned keys by
Pattern string

if len(parts) < 2 {
return errors.New("not enough parts returned")
}
// An optional count hint to send to redis to indicate number of keys to
// return per call. This does not affect the actual results of the scan
// command, but it may be useful for optimizing certain datasets
Count int
}

if cursor, err = parts[0].Str(); err != nil {
return err
}
func doScanPart(c Cmder, o ScanOpts, cursor string) (string, []string, error) {
cmd := strings.ToUpper(o.Command)
args := make([]interface{}, 0, 4)
if cmd != "SCAN" {
args = append(args, o.Key)
}
args = append(args, cursor)
if o.Pattern != "" {
args = append(args, "MATCH", o.Pattern)
}
if o.Count > 0 {
args = append(args, "COUNT", o.Count)
}
parts, err := c.Cmd(cmd, args...).Array()
if err != nil {
return "", nil, err
}

if len(parts) < 2 {
return "", nil, errors.New("not enough parts returned")
}

if cursor, err = parts[0].Str(); err != nil {
return "", nil, err
}

var keys []string
if keys, err = parts[1].List(); err != nil {
return "", nil, err
}

if keys, err = parts[1].List(); err != nil {
return cursor, keys, nil
}

func scanSingle(r Cmder, ch chan string, o ScanOpts) error {
defer close(ch)

var cursor string
var keys []string
var err error
for {
if cursor, keys, err = doScanPart(r, o, cursor); err != nil {
return err
}

for i := range keys {
ch <- keys[i]
if keys[i] != "" {
ch <- keys[i]
}
}

if cursor == "0" {
Expand All @@ -49,7 +85,7 @@ func scanSingle(r Cmder, ch chan string, cmd, key, pattern string) error {

// scanCluster is like Scan except it operates over a whole cluster. Unlike Scan
// it only works with SCAN and as such only takes in a pattern string.
func scanCluster(c *cluster.Cluster, ch chan string, pattern string) error {
func scanCluster(c *cluster.Cluster, ch chan string, o ScanOpts) error {
defer close(ch)
clients, err := c.GetEvery()
if err != nil {
Expand All @@ -63,7 +99,7 @@ func scanCluster(c *cluster.Cluster, ch chan string, pattern string) error {
cch := make(chan string)
var err error
go func() {
err = scanSingle(client, cch, "SCAN", "", pattern)
err = scanSingle(client, cch, o)
}()
for key := range cch {
ch <- key
Expand All @@ -76,6 +112,9 @@ func scanCluster(c *cluster.Cluster, ch chan string, pattern string) error {
return nil
}

// Scan is DEPRECATED. It contains an inherent race-condition and shouldn't be
// used. Use NewScanner instead.
//
// Scan is a helper function for performing any of the redis *SCAN functions. It
// takes in a channel which keys returned by redis will be written to, and
// returns an error should any occur. The input channel will always be closed
Expand Down Expand Up @@ -112,15 +151,174 @@ func scanCluster(c *cluster.Cluster, ch chan string, pattern string) error {
// }
//
func Scan(r Cmder, ch chan string, cmd, key, pattern string) error {
if rr, ok := r.(*cluster.Cluster); ok && strings.ToUpper(cmd) == "SCAN" {
return scanCluster(rr, ch, pattern)
// We're using ScanOpts here for esoteric reasons, this whole thing is
// deprecated anyway, so who cares.
o := ScanOpts{
Command: cmd,
Key: key,
Pattern: pattern,
}
if rr, ok := r.(*cluster.Cluster); ok && strings.ToUpper(o.Command) == "SCAN" {
return scanCluster(rr, ch, o)
}
var cmdErr error
err := withClientForKey(r, key, func(c Cmder) {
cmdErr = scanSingle(r, ch, cmd, key, pattern)
err := withClientForKey(r, o.Key, func(c Cmder) {
cmdErr = scanSingle(r, ch, o)
})
if err != nil {
return err
}
return cmdErr
}

////////////////////////////////////////////////////////////////////////////////

// Scanner is used to iterate through the results of a SCAN call (or HSCAN,
// SSCAN, etc...). The Cmder may be a Client, Pool, or Cluster.
//
// Once created, call HasNext() on it to determine if there's a waiting value,
// then Next() to retrieve that value, then repeat. Once HasNext() returns
// false, call Err() to potentially retrieve an error which stopped the
// iteration.
//
// Example SCAN command
//
// s := util.NewScanner(cmder, util.ScanOpts{Command: "SCAN"})
// for s.HasNext() {
// log.Printf("next: %q", s.Next())
// }
// if err := s.Err(); err != nil {
// log.Fatal(err)
// }
//
// Example HSCAN command
//
// s := util.NewScanner(cmder, util.ScanOpts{Command: "HSCAN", Key: "somekey"})
// for s.HasNext() {
// log.Printf("next: %q", s.Next())
// }
// if err := s.Err(); err != nil {
// log.Fatal(err)
// }
//
// HasNext MUST be called before every call to Next. Err MUST be called after
// HasNext returns false.
type Scanner interface {
HasNext() bool
Next() string
Err() error
}

type singleScanner struct {
c Cmder
o ScanOpts

err error
cursor string
buf []string
}

// NewScanner initializes a Scanner struct with the given options and returns
// it.
func NewScanner(c Cmder, o ScanOpts) Scanner {
if cc, ok := c.(*cluster.Cluster); ok && strings.ToUpper(o.Command) == "SCAN" {
return &clusterScanner{c: cc, o: o}
}
return &singleScanner{
c: c,
o: o,
}
}

func (s *singleScanner) HasNext() bool {
for {
if s.err != nil {
return false
}

if len(s.buf) > 0 {
if s.buf[0] == "" {
s.buf = s.buf[1:]
continue
}
return true
}

if s.cursor == "0" {
return false
}

s.cursor, s.buf, s.err = doScanPart(s.c, s.o, s.cursor)
}
}

func (s *singleScanner) Next() string {
// we assume they called HasNext first, which garauntees that this won't
// panic
ret := s.buf[0]
s.buf = s.buf[1:]
return ret
}

func (s *singleScanner) Err() error {
return s.err
}

type clusterScanner struct {
c *cluster.Cluster
o ScanOpts

err error
clients []*redis.Client
currScanner Scanner
}

func (cs *clusterScanner) HasNext() bool {
// if clients is nil then HasNext has never been called, and we need to get
// some clients
if cs.clients == nil {
clientsM, err := cs.c.GetEvery()
if err != nil {
cs.err = err
return false
}

cs.clients = make([]*redis.Client, 0, len(clientsM))
for _, client := range clientsM {
cs.clients = append(cs.clients, client)
}
}

for {
if len(cs.clients) == 0 || cs.err != nil {
return false
} else if cs.currScanner == nil {
cs.currScanner = NewScanner(cs.clients[0], cs.o)
}

if cs.currScanner.HasNext() {
return true
}

// if err isn't nil, cleanup will happen when Err is called on
// clusterScanner
cs.err = cs.currScanner.Err()
cs.currScanner = nil
cs.c.Put(cs.clients[0])
cs.clients = cs.clients[1:]
}
}

func (cs *clusterScanner) Next() string {
return cs.currScanner.Next()
}

func (cs *clusterScanner) Err() error {
// if Err is being called it means iteration is done and we should put back
// all of the clients that still haven't been put back
for _, client := range cs.clients {
cs.c.Put(client)
}
cs.clients = nil
return cs.err
}
Loading

0 comments on commit ae04b3e

Please sign in to comment.