Skip to content

Commit f176d09

Browse files
authored
Merge pull request #2 from optimism-java/handle-reorg
handle reorg
2 parents 5ea2afd + 509f7d9 commit f176d09

9 files changed

+285
-78
lines changed

internal/handler/disputeGame.go

+17-13
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,12 @@ func (r *RetryDisputeGameClient) ProcessDisputeGameMove(ctx context.Context, evt
6363
return fmt.Errorf("[processDisputeGameMove] event data to disputeGameMove err: %s", err)
6464
}
6565
var storageClaimSize int64
66-
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=?", evt.ContractAddress).Count(&storageClaimSize)
66+
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=? and on_chain_status = ?",
67+
evt.ContractAddress, schema.GameClaimDataOnChainStatusValid).Count(&storageClaimSize)
6768
data, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, big.NewInt(storageClaimSize))
6869
if err != nil {
69-
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, storageClaimSize, errors.WithStack(err))
70+
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s",
71+
evt.ContractAddress, storageClaimSize, errors.WithStack(err))
7072
}
7173

7274
pos := types.NewPositionFromGIndex(data.Position)
@@ -172,17 +174,18 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
172174
}
173175

174176
gameClaim := &schema.GameClaimData{
175-
GameContract: strings.ToLower(disputeGame.DisputeProxy),
176-
DataIndex: 0,
177-
ParentIndex: claimData.ParentIndex,
178-
CounteredBy: claimData.CounteredBy.Hex(),
179-
Claimant: claimData.Claimant.Hex(),
180-
Bond: cast.ToString(claimData.Bond),
181-
Claim: hex.EncodeToString(claimData.Claim[:]),
182-
Position: cast.ToString(claimData.Position),
183-
Clock: claimData.Clock.Int64(),
184-
OutputBlock: l2Block.Uint64(),
185-
EventID: evt.ID,
177+
GameContract: strings.ToLower(disputeGame.DisputeProxy),
178+
DataIndex: 0,
179+
ParentIndex: claimData.ParentIndex,
180+
CounteredBy: claimData.CounteredBy.Hex(),
181+
Claimant: claimData.Claimant.Hex(),
182+
Bond: cast.ToString(claimData.Bond),
183+
Claim: hex.EncodeToString(claimData.Claim[:]),
184+
Position: cast.ToString(claimData.Position),
185+
Clock: claimData.Clock.Int64(),
186+
OutputBlock: l2Block.Uint64(),
187+
EventID: evt.ID,
188+
OnChainStatus: schema.GameClaimDataOnChainStatusValid,
186189
}
187190

188191
game := &schema.DisputeGame{
@@ -201,6 +204,7 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
201204
GameType: disputeGame.GameType,
202205
L2BlockNumber: l2Block.Int64(),
203206
Status: schema.DisputeGameStatusInProgress,
207+
OnChainStatus: schema.DisputeGameOnChainStatusValid,
204208
}
205209
err = r.DB.Transaction(func(tx *gorm.DB) error {
206210
err = tx.Save(gameClaim).Error

internal/handler/latestBlockNumber.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
11
package handler
22

33
import (
4+
"context"
45
"time"
56

6-
"github.com/optimism-java/dispute-explorer/pkg/rpc"
7-
"github.com/pkg/errors"
8-
97
"github.com/optimism-java/dispute-explorer/internal/svc"
108
"github.com/optimism-java/dispute-explorer/pkg/log"
9+
"github.com/pkg/errors"
10+
"github.com/spf13/cast"
1111
)
1212

1313
func LatestBlackNumber(ctx *svc.ServiceContext) {
1414
for {
15-
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"finalized\", false],\"id\":1}")
15+
latest, err := ctx.L1RPC.BlockNumber(context.Background())
1616
if err != nil {
1717
log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err))
1818
time.Sleep(3 * time.Second)
1919
continue
2020
}
21-
block := rpc.ParseJSONBlock(string(blockJSON))
2221

23-
ctx.LatestBlockNumber = block.Number()
24-
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", block.Number())
22+
ctx.LatestBlockNumber = cast.ToInt64(latest)
23+
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest)
2524
time.Sleep(3 * time.Second)
2625
}
2726
}

internal/handler/syncBlock.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func SyncBlock(ctx *svc.ServiceContext) {
17-
// 防止服务启停切换时同时存在2个服务同步数据
17+
// Prevent data synchronization between two services during service start/stop switchover
1818
time.Sleep(10 * time.Second)
1919
var syncedBlock schema.SyncBlock
2020
err := ctx.DB.Where("status = ? or status = ? ", schema.BlockValid, schema.BlockPending).Order("block_number desc").First(&syncedBlock).Error
@@ -54,6 +54,7 @@ func SyncBlock(ctx *svc.ServiceContext) {
5454

5555
if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
5656
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
57+
rollbackBlock(ctx)
5758
continue
5859
}
5960

@@ -80,3 +81,46 @@ func SyncBlock(ctx *svc.ServiceContext) {
8081
ctx.SyncedBlockHash = common.HexToHash(block.Hash())
8182
}
8283
}
84+
85+
func rollbackBlock(ctx *svc.ServiceContext) {
86+
for {
87+
rollbackBlockNumber := ctx.SyncedBlockNumber
88+
89+
log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
90+
91+
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
92+
if err != nil {
93+
log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
94+
continue
95+
}
96+
97+
rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
98+
log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
99+
100+
if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
101+
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
102+
err = tx.Model(schema.SyncBlock{}).Where(" (status = ? or status = ?) AND block_number>?",
103+
schema.BlockValid, schema.BlockPending, ctx.SyncedBlockNumber).Update("status", schema.BlockRollback).Error
104+
if err != nil {
105+
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback Block err: %s\n", errors.WithStack(err))
106+
return err
107+
}
108+
return nil
109+
})
110+
if err != nil {
111+
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback db transaction err: %s\n", errors.WithStack(err))
112+
continue
113+
}
114+
log.Infof("[Handler.SyncBlock.RollRackBlock] Rollback blocks is Stop\n")
115+
return
116+
}
117+
var previousBlock schema.SyncBlock
118+
rest := ctx.DB.Where("block_number = ? AND (status = ? or status = ?) ", rollbackBlockNumber-1, schema.BlockValid, schema.BlockPending).First(&previousBlock)
119+
if rest.Error != nil {
120+
log.Errorf("[Handler.RollRackBlock] Previous block by number error: %s\n", errors.WithStack(rest.Error))
121+
continue
122+
}
123+
ctx.SyncedBlockNumber = previousBlock.BlockNumber
124+
ctx.SyncedBlockHash = common.HexToHash(previousBlock.BlockHash)
125+
}
126+
}

internal/handler/syncDispute.go

+161-44
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,187 @@
11
package handler
22

33
import (
4+
"fmt"
5+
"strings"
6+
"sync"
47
"time"
58

9+
"github.com/optimism-java/dispute-explorer/internal/blockchain"
10+
"github.com/pkg/errors"
11+
"gorm.io/gorm"
12+
613
"github.com/ethereum/go-ethereum/common"
714
"github.com/optimism-java/dispute-explorer/internal/schema"
815
"github.com/optimism-java/dispute-explorer/internal/svc"
9-
"github.com/optimism-java/dispute-explorer/pkg/event"
16+
evt "github.com/optimism-java/dispute-explorer/pkg/event"
1017
"github.com/optimism-java/dispute-explorer/pkg/log"
1118
"golang.org/x/time/rate"
1219
)
1320

1421
func SyncDispute(ctx *svc.ServiceContext) {
1522
for {
1623
var events []schema.SyncEvent
17-
err := ctx.DB.Where("status=?", schema.EventPending).Limit(20).Find(&events).Error
24+
err := ctx.DB.Where("status=? OR status=?", schema.EventPending, schema.EventRollback).Order("block_number").Limit(50).Find(&events).Error
1825
if err != nil {
1926
time.Sleep(3 * time.Second)
2027
continue
2128
}
22-
for _, evt := range events {
23-
disputeCreated := event.DisputeGameCreated{}
24-
disputeMove := event.DisputeGameMove{}
25-
disputeResolved := event.DisputeGameResolved{}
26-
switch {
27-
case evt.EventName == disputeCreated.Name() && evt.EventHash == disputeCreated.EventHash().String():
28-
err = disputeCreated.ToObj(evt.Data)
29-
if err != nil {
30-
log.Errorf("[handle.SyncDispute] event data to DisputeGameCreated err: %s", err)
31-
}
32-
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
33-
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
34-
if err != nil {
35-
log.Errorf("[handle.SyncDispute] init client for created err: %s", err)
36-
}
37-
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, evt)
38-
if err != nil {
39-
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
40-
}
41-
case evt.EventName == disputeMove.Name() && evt.EventHash == disputeMove.EventHash().String():
42-
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
43-
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
44-
if err != nil {
45-
log.Errorf("[handle.SyncDispute] init client for move err: %s", err)
46-
}
47-
err = disputeClient.ProcessDisputeGameMove(ctx.Context, evt)
48-
if err != nil {
49-
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
50-
}
51-
case evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String():
52-
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
53-
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
54-
if err != nil {
55-
log.Errorf("[handle.SyncDispute] init client for resolved err: %s", err)
56-
}
57-
err = disputeClient.ProcessDisputeGameResolve(evt)
58-
if err != nil {
59-
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
29+
if len(events) == 0 {
30+
log.Infof("[Handler.SyncDispute] Pending events count is 0\n")
31+
time.Sleep(2 * time.Second)
32+
continue
33+
}
34+
35+
var wg sync.WaitGroup
36+
for _, event := range events {
37+
wg.Add(1)
38+
go func(_wg *sync.WaitGroup, ctx *svc.ServiceContext, event schema.SyncEvent) {
39+
defer _wg.Done()
40+
if event.Status == schema.EventPending {
41+
// add events & block.status= valid
42+
err = HandlePendingEvent(ctx, event)
43+
if err != nil {
44+
log.Errorf("[Handler.SyncEvent] HandlePendingBlock err: %s\n", errors.WithStack(err))
45+
time.Sleep(500 * time.Millisecond)
46+
}
47+
} else if event.Status == schema.EventRollback {
48+
// event.status=rollback & block.status=invalid
49+
err = HandleRollbackEvent(ctx, event)
50+
if err != nil {
51+
log.Errorf("[Handler.SyncEvent] HandleRollbackBlock err: %s\n", errors.WithStack(err))
52+
time.Sleep(500 * time.Millisecond)
53+
}
6054
}
61-
default:
62-
log.Infof("this event does not be monitored %s, hash %s", evt.EventName, evt.EventHash)
55+
}(&wg, ctx, event)
56+
}
57+
wg.Wait()
58+
time.Sleep(3 * time.Second)
59+
}
60+
}
61+
62+
func HandleRollbackEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
63+
disputeCreated := evt.DisputeGameCreated{}
64+
disputeMove := evt.DisputeGameMove{}
65+
disputeResolved := evt.DisputeGameResolved{}
66+
switch {
67+
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
68+
// rollback created event include: dispute_game, game_data_claim
69+
err := disputeCreated.ToObj(event.Data)
70+
if err != nil {
71+
log.Errorf("[handle.SyncDispute.RollbackEvent] event data to DisputeGameCreated err: %s", err)
72+
return errors.WithStack(err)
73+
}
74+
// rollback dispute_game
75+
var disputeGame schema.DisputeGame
76+
err = ctx.DB.Where("game_contract=?", strings.ToLower(disputeCreated.DisputeProxy)).First(&disputeGame).Error
77+
if err != nil && err != gorm.ErrRecordNotFound {
78+
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created event err: %s", err)
79+
}
80+
disputeGame.OnChainStatus = schema.DisputeGameOnChainStatusRollBack
81+
82+
// rollback game_claim_data
83+
var gameDataClaim schema.GameClaimData
84+
err = ctx.DB.Where("game_contract=? and data_index=0", strings.ToLower(disputeCreated.DisputeProxy)).First(&gameDataClaim).Error
85+
if err != nil && err != gorm.ErrRecordNotFound {
86+
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created the first claim data err: %s", err)
87+
}
88+
gameDataClaim.OnChainStatus = schema.GameClaimDataOnChainStatusRollBack
89+
90+
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
91+
err = tx.Save(disputeGame).Error
92+
if err != nil {
93+
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update dispute game status err: %s\n ", err)
6394
}
95+
err = tx.Save(gameDataClaim).Error
6496
if err != nil {
65-
panic(err)
97+
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update game data claim status err: %s\n ", err)
6698
}
99+
100+
event.Status = schema.EventValid
101+
err = tx.Save(event).Error
102+
if err != nil {
103+
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update event err: %s\n ", err)
104+
}
105+
return nil
106+
})
107+
// remove contract
108+
blockchain.RemoveContract(event.ContractAddress)
109+
log.Infof("remove contract: %s", event.ContractAddress)
110+
111+
case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
112+
// rollback move: rollback move depend on event_id
113+
now := time.Now()
114+
err := ctx.DB.Model(schema.GameClaimData{}).Where("event_id=?", event.ID).
115+
Updates(map[string]interface{}{"on_chain_status": schema.GameClaimDataOnChainStatusRollBack, "updated_at": now}).Error
116+
if err != nil {
117+
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback move event err: %s ,id : %d \n", err, event.ID)
67118
}
68-
time.Sleep(3 * time.Second)
119+
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
120+
// rollback resolved
121+
now := time.Now()
122+
err := ctx.DB.Model(schema.DisputeGame{}).Where("game_contract=?", event.ContractAddress).
123+
Updates(map[string]interface{}{"status": schema.DisputeGameStatusInProgress, "updated_at": now}).Error
124+
if err != nil {
125+
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback resolved event err: %s ,id : %d \n", err, event.ID)
126+
}
127+
blockchain.AddContract(event.ContractAddress)
128+
log.Infof("[Handler.SyncDispute.RollbackBlock] rollback resolved event id : %d, contract: %s", event.ID, event.ContractAddress)
129+
default:
130+
log.Infof("[Handler.SyncDispute.RollbackBlock] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
131+
return nil
132+
}
133+
return nil
134+
}
135+
136+
func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
137+
disputeCreated := evt.DisputeGameCreated{}
138+
disputeMove := evt.DisputeGameMove{}
139+
disputeResolved := evt.DisputeGameResolved{}
140+
switch {
141+
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
142+
err := disputeCreated.ToObj(event.Data)
143+
if err != nil {
144+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] event data to DisputeGameCreated err: %s", err)
145+
return errors.WithStack(err)
146+
}
147+
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
148+
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
149+
if err != nil {
150+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for created err: %s", err)
151+
return errors.WithStack(err)
152+
}
153+
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, event)
154+
if err != nil {
155+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
156+
return errors.WithStack(err)
157+
}
158+
case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
159+
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
160+
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
161+
if err != nil {
162+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for move err: %s", err)
163+
return errors.WithStack(err)
164+
}
165+
err = disputeClient.ProcessDisputeGameMove(ctx.Context, event)
166+
if err != nil {
167+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
168+
return errors.WithStack(err)
169+
}
170+
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
171+
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
172+
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
173+
if err != nil {
174+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for resolved err: %s", err)
175+
return errors.WithStack(err)
176+
}
177+
err = disputeClient.ProcessDisputeGameResolve(event)
178+
if err != nil {
179+
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
180+
return errors.WithStack(err)
181+
}
182+
default:
183+
log.Infof("[handle.SyncDispute.HandlePendingEvent] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
184+
return nil
69185
}
186+
return nil
70187
}

0 commit comments

Comments
 (0)