forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
iterator.go
165 lines (139 loc) · 4.71 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
)
// Done is returned when an iteration is complete.
var Done = iterator.Done
type Iterator struct {
// kaTicker controls how often we send an ack deadline extension request.
kaTicker *time.Ticker
// ackTicker controls how often we acknowledge a batch of messages.
ackTicker *time.Ticker
ka *keepAlive
acker *acker
puller *puller
// mu ensures that cleanup only happens once, and concurrent Stop
// invocations block until cleanup completes.
mu sync.Mutex
// closed is used to signal that Stop has been called.
closed chan struct{}
}
// newIterator starts a new Iterator. Stop must be called on the Iterator
// when it is no longer needed.
// subName is the full name of the subscription to pull messages from.
// ctx is the context to use for acking messages and extending message deadlines.
func newIterator(ctx context.Context, s service, subName string, po *pullOptions) *Iterator {
// TODO: make kaTicker frequency more configurable.
// (ackDeadline - 5s) is a reasonable default for now, because the minimum ack period is 10s. This gives us 5s grace.
keepAlivePeriod := po.ackDeadline - 5*time.Second
kaTicker := time.NewTicker(keepAlivePeriod) // Stopped in it.Stop
// TODO: make ackTicker more configurable. Something less than
// kaTicker is a reasonable default (there's no point extending
// messages when they could be acked instead).
ackTicker := time.NewTicker(keepAlivePeriod / 2) // Stopped in it.Stop
ka := &keepAlive{
s: s,
Ctx: ctx,
Sub: subName,
ExtensionTick: kaTicker.C,
Deadline: po.ackDeadline,
MaxExtension: po.maxExtension,
}
ack := &acker{
s: s,
Ctx: ctx,
Sub: subName,
AckTick: ackTicker.C,
Notify: ka.Remove,
}
pull := newPuller(s, subName, ctx, int64(po.maxPrefetch), ka.Add, ka.Remove)
ka.Start()
ack.Start()
return &Iterator{
kaTicker: kaTicker,
ackTicker: ackTicker,
ka: ka,
acker: ack,
puller: pull,
closed: make(chan struct{}),
}
}
// Next returns the next Message to be processed. The caller must call
// Message.Done when finished with it.
// Once Stop has been called, calls to Next will return Done.
func (it *Iterator) Next() (*Message, error) {
m, err := it.puller.Next()
if err == nil {
m.it = it
return m, nil
}
select {
// If Stop has been called, we return Done regardless the value of err.
case <-it.closed:
return nil, Done
default:
return nil, err
}
}
// Client code must call Stop on an Iterator when finished with it.
// Stop will block until Done has been called on all Messages that have been
// returned by Next, or until the context with which the Iterator was created
// is cancelled or exceeds its deadline.
// Stop need only be called once, but may be called multiple times from
// multiple goroutines.
func (it *Iterator) Stop() {
it.mu.Lock()
defer it.mu.Unlock()
select {
case <-it.closed:
// Cleanup has already been performed.
return
default:
}
// We close this channel before calling it.puller.Stop to ensure that we
// reliably return Done from Next.
close(it.closed)
// Stop the puller. Once this completes, no more messages will be added
// to it.ka.
it.puller.Stop()
// Start acking messages as they arrive, ignoring ackTicker. This will
// result in it.ka.Stop, below, returning as soon as possible.
it.acker.FastMode()
// This will block until
// (a) it.ka.Ctx is done, or
// (b) all messages have been removed from keepAlive.
// (b) will happen once all outstanding messages have been either ACKed or NACKed.
it.ka.Stop()
// There are no more live messages, so kill off the acker.
it.acker.Stop()
it.kaTicker.Stop()
it.ackTicker.Stop()
}
func (it *Iterator) done(ackID string, ack bool) {
if ack {
it.acker.Ack(ackID)
// There's no need to call it.ka.Remove here, as acker will
// call it via its Notify function.
} else {
// TODO: explicitly NACK the message by sending an
// ModifyAckDeadline request with 0s deadline, to make the
// message immediately available for redelivery.
it.ka.Remove(ackID)
}
}