diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 0d1591b0..5099099b 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -147,45 +147,41 @@ func (w *PollingClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.H } func (w *PollingClient) pollHeads() { - // To prevent polls from stacking up in case HTTP requests - // are slow, use a similar model to the driver in which - // polls are requested manually after each header is fetched. - reqPollAfter := func() { - if w.pollRate == 0 { - return - } - time.AfterFunc(w.pollRate, w.reqPoll) - } - reqPollAfter() + w.reqPoll() defer close(w.closedCh) for { select { case <-w.pollReqCh: - // We don't need backoff here because we'll just try again - // after the pollRate elapses. head, err := w.queryHeader() + if err != nil { w.lg.Info("Error getting latest header", "err", err) - reqPollAfter() + w.scheduleNextPoll(nil) continue } if w.currHead != nil && w.currHead.Hash() == head.Hash() { w.lg.Trace("No change in head, skipping notifications") - reqPollAfter() + w.scheduleNextPoll(head) continue } - w.lg.Trace("Notifying subscribers of new head", "head", head.Hash()) + headTime := time.Unix(int64(head.Time), 0) + w.lg.Trace( + "Notifying subscribers of new head", + "height", head.Number, + "headTime", headTime.Format("15:04:05"), + "head", head.Hash(), + ) w.currHead = head w.mtx.RLock() for _, sub := range w.subs { sub <- head } w.mtx.RUnlock() - reqPollAfter() + w.scheduleNextPoll(head) case <-w.ctx.Done(): w.Client.Close() return @@ -199,8 +195,35 @@ func (w *PollingClient) getLatestHeader() (*types.Header, error) { return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) } +// scheduleNextPoll decides the next poll time based on next head.Time: +func (w *PollingClient) scheduleNextPoll(head *types.Header) { + if w.pollRate == 0 { + return + } + // A heuristic estimation of p2p network delay to balance timely polling and request frequency + const minDelay = 1000 * time.Millisecond + + // Retry on failure + if head == nil { + time.AfterFunc(minDelay, w.reqPoll) + return + } + // Align next poll to headTime + pollRate + slack. + target := time.Unix(int64(head.Time), 0).Add(w.pollRate).Add(minDelay) + // bound the delay between minDelay and pollRate + delay := min(max(time.Until(target), minDelay), w.pollRate) + + w.lg.Trace("Scheduled next poll", "delay", delay) + + time.AfterFunc(delay, w.reqPoll) +} + func (w *PollingClient) reqPoll() { - w.pollReqCh <- struct{}{} + // non-blocking send + select { + case w.pollReqCh <- struct{}{}: + default: + } } func (w *PollingClient) FilterLogsByBlockRange(start *big.Int, end *big.Int, eventSig string) ([]types.Log, error) { diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index 180ba628..e7aa2dd3 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -191,26 +191,26 @@ func (n *EsNode) startL1(cfg *Config) { } n.lg.Error("L1 heads subscription error", "err", err) }() - - // Keep subscribed to the randao heads, which helps miner to get proper random seeds - n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { - if err != nil { - n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) - } - if n.randaoSource != nil { - return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) - } else { - return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) - } - }) - go func() { - err, ok := <-n.randaoHeadsSub.Err() - if !ok { - return - } - n.lg.Error("Randao heads subscription error", "err", err) - }() - + if n.miner != nil { + // Keep subscribed to the randao heads, which helps miner to get proper random seeds + n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) + } + if n.randaoSource != nil { + return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) + } else { + return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) + } + }) + go func() { + err, ok := <-n.randaoHeadsSub.Err() + if !ok { + return + } + n.lg.Error("Randao heads subscription error", "err", err) + }() + } // Poll for the safe L1 block and finalized block, // which only change once per epoch at most and may be delayed. n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.lg, n.l1Source, n.OnNewL1Safe, ethRPC.SafeBlockNumber,