all: use timer instead of time.After in loops, to avoid memleaks #91

Merged · 1 commit · Jun 12, 2024
all: use timer instead of time.After in loops, to avoid memleaks
ryanmorphl2 committed Jun 12, 2024
commit bc3f8350301c3b8ba0ee9722a4dc7d673c6dcb9c
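
For context on the leak being fixed: each call to time.After allocates a fresh runtime timer, and on the Go versions this codebase targets (pre-1.23) that timer is not garbage collected until it fires, even once nobody listens on its channel. A loop that iterates faster than its timeout therefore piles up pending timers. The fix applied uniformly below is to allocate one time.Timer outside the loop, Reset it before each wait, and Stop it on the way out. A minimal self-contained sketch of the before/after pattern (illustrative only; leaky and reused are hypothetical names, not code from this diff):

package main

import "time"

// leaky waits for work with a per-iteration timeout. Every iteration calls
// time.After, which (before Go 1.23) allocates a timer that stays reachable
// by the runtime until it fires, even after the select has moved on.
func leaky(work <-chan int, wait time.Duration) {
    for {
        select {
        case <-work:
            // handle work, then loop again
        case <-time.After(wait):
            return // timed out
        }
    }
}

// reused does the same with a single timer, re-armed each iteration: the
// pattern this PR applies. Reset without draining can deliver one stale
// expiry if the timer fired while another case won; see the note below.
func reused(work <-chan int, wait time.Duration) {
    timer := time.NewTimer(wait)
    defer timer.Stop()
    for {
        timer.Reset(wait)
        select {
        case <-work:
            // handle work, then loop again
        case <-timer.C:
            return // timed out
        }
    }
}

func main() {
    work := make(chan int, 1)
    work <- 1
    reused(work, 50*time.Millisecond) // drains one item, then times out
}
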
core/bloombits/matcher.go (6 additions, 2 deletions)

@@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
 // of the session, any request in-flight need to be responded to! Empty responses
 // are fine though in that case.
 func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
+    waitTimer := time.NewTimer(wait)
+    defer waitTimer.Stop()
+
     for {
         // Allocate a new bloom bit index to retrieve data for, stopping when done
         bit, ok := s.allocateRetrieval()
@@ -604,15 +607,16 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
         }
         // Bit allocated, throttle a bit if we're below our batch limit
         if s.pendingSections(bit) < batch {
+            waitTimer.Reset(wait)
             select {
             case <-s.quit:
                 // Session terminating, we can't meaningfully service, abort
                 s.allocateSections(bit, 0)
                 s.deliverSections(bit, []uint64{}, [][]byte{})
                 return

-            case <-time.After(wait):
-                // Throttling up, fetch whatever's available
+            case <-waitTimer.C:
+                // Throttling up, fetch whatever is available
             }
         }
         // Allocate as much as we can handle and request servicing
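
One general caveat with the re-armed timer pattern used above (a property of time.Timer, not something this diff introduces): Reset does not drain the timer's channel. If the timer fired while another select case won, the expiry stays buffered in C and the next receive returns immediately. For throttling loops like this one, a single spuriously short wait is harmless; where an exact timeout matters on pre-1.23 Go, the usual defensive idiom is to stop and drain before re-arming, sketched here as a hypothetical helper (not part of this diff):

// safeReset stops t and consumes any expiry already buffered in t.C before
// re-arming, so the next <-t.C cannot observe a stale value.
func safeReset(t *time.Timer, d time.Duration) {
    if !t.Stop() {
        select {
        case <-t.C: // drop the stale expiry, if any
        default: // channel already empty
        }
    }
    t.Reset(d)
}
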
eth/downloader/beaconsync.go (5 additions, 1 deletion)

@@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
         localHeaders = d.readHeaderRange(tail, int(count))
         log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
     }
+    fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
+    defer fsHeaderContCheckTimer.Stop()
+
     for {
         // Some beacon headers might have appeared since the last cycle, make
         // sure we're always syncing to all available ones
@@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
         }
         // State sync still going, wait a bit for new headers and retry
         log.Trace("Pivot not yet committed, waiting...")
+        fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
         select {
-        case <-time.After(fsHeaderContCheck):
+        case <-fsHeaderContCheckTimer.C:
         case <-d.cancelCh:
             return errCanceled
         }
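
A small design point worth noting here: the Reset sits on the retry path, immediately before the select, rather than at the top of the loop. Each retry therefore waits a full fsHeaderContCheck interval measured from that point, and iterations that never reach the wait do not pay for re-arming the timer.
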
eth/downloader/downloader.go (10 additions, 2 deletions)

@@ -1258,6 +1258,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
         rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
         rollbackErr error
         mode        = d.getMode()
+        timer       = time.NewTimer(time.Second)
     )
     defer func() {
         if rollback > 0 {
@@ -1284,6 +1285,8 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
     // Wait for batches of headers to process
     gotHeaders := false

+    defer timer.Stop()
+
     for {
         select {
         case <-d.cancelCh:
@@ -1439,11 +1442,12 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
             if mode == FullSync || mode == SnapSync {
                 // If we've reached the allowed number of pending headers, stall a bit
                 for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+                    timer.Reset(time.Second)
                     select {
                     case <-d.cancelCh:
                         rollbackErr = errCanceled
                         return errCanceled
-                    case <-time.After(time.Second):
+                    case <-timer.C:
                     }
                 }
                 // Otherwise insert the headers for content retrieval
@@ -1599,7 +1603,10 @@ func (d *Downloader) processSnapSyncContent() error {
     var (
         oldPivot *fetchResult   // Locked in pivot block, might change eventually
         oldTail  []*fetchResult // Downloaded content after the pivot
+        timer    = time.NewTimer(time.Second)
     )
+    defer timer.Stop()
+
     for {
         // Wait for the next batch of downloaded data to be available, and if the pivot
         // block became stale, move the goalpost
@@ -1673,6 +1680,7 @@ func (d *Downloader) processSnapSyncContent() error {
             oldPivot = P
         }
         // Wait for completion, occasionally checking for pivot staleness
+        timer.Reset(time.Second)
         select {
         case <-sync.done:
             if sync.err != nil {
@@ -1683,7 +1691,7 @@ func (d *Downloader) processSnapSyncContent() error {
             }
             oldPivot = nil

-        case <-time.After(time.Second):
+        case <-timer.C:
             oldTail = afterP
             continue
         }
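
In these two functions a single timer per goroutine serves every throttling point in its loop. The countdown technically starts at the NewTimer call in the var block, but the initial expiry is mostly harmless: at worst a stale value buffered in timer.C lets the first throttled wait return early, which these loops tolerate because they re-check their conditions on every pass (see the drain note earlier).
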
ethstats/ethstats.go (4 additions, 1 deletion)

@@ -541,10 +541,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
         return err
     }
     // Wait for the pong request to arrive back
+    timer := time.NewTimer(5 * time.Second)
+    defer timer.Stop()
+
     select {
     case <-s.pongCh:
         // Pong delivered, report the latency
-    case <-time.After(5 * time.Second):
+    case <-timer.C:
         // Ping timeout, abort
         return errors.New("ping timed out")
     }
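
One difference from the loop cases above: this select runs at most once per reportLatency call, so the old time.After did not accumulate timers. The gain here is promptness: with time.After the underlying timer stayed alive for the full five seconds even when the pong arrived immediately, whereas the deferred Stop now releases it as soon as the function returns.
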
p2p/simulations/adapters/exec.go (4 additions, 1 deletion)

@@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
     go func() {
         waitErr <- n.Cmd.Wait()
     }()
+    timer := time.NewTimer(5 * time.Second)
+    defer timer.Stop()
+
     select {
     case err := <-waitErr:
         return err
-    case <-time.After(5 * time.Second):
+    case <-timer.C:
         return n.Cmd.Process.Kill()
     }
 }
p2p/simulations/mocker.go (8 additions, 2 deletions)

@@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
     if err != nil {
         panic("Could not startup node network for mocker")
     }
-    tick := time.NewTicker(10 * time.Second)
+    var (
+        tick  = time.NewTicker(10 * time.Second)
+        timer = time.NewTimer(3 * time.Second)
+    )
     defer tick.Stop()
+    defer timer.Stop()
+
     for {
         select {
         case <-quit:
@@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
                 return
             }

+            timer.Reset(3 * time.Second)
             select {
             case <-quit:
                 log.Info("Terminating simulation loop")
                 return
-            case <-time.After(3 * time.Second):
+            case <-timer.C:
             }

             log.Debug("starting node", "id", id)
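
Note the division of labour in this file: the 10-second Ticker drives the recurring rounds and fires repeatedly until stopped, while the 3-second Timer is a one-shot wait that must be re-armed with Reset on every pass. Both now get a deferred Stop, so neither outlives the simulation loop.
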
p2p/simulations/network.go (4 additions, 1 deletion)

@@ -1032,11 +1032,14 @@ func (net *Network) Load(snap *Snapshot) error {
         }
     }

+    timeout := time.NewTimer(snapshotLoadTimeout)
+    defer timeout.Stop()
+
     select {
     // Wait until all connections from the snapshot are established.
     case <-allConnected:
     // Make sure that we do not wait forever.
-    case <-time.After(snapshotLoadTimeout):
+    case <-timeout.C:
         return errors.New("snapshot connections not established")
     }
     return nil
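
A closing remark on the Go runtime rather than on this diff: starting with Go 1.23, timers created by time.After become eligible for garbage collection as soon as they are unreachable, even before they fire, so the leak worked around here no longer bites on newer toolchains. The explicit Reset/Stop pattern remains the portable choice while older Go versions are in use.
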