Skip to main content
Handle Solana forks by tracking unfinalized slots and rolling back when forks occur.
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder} from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

async function main() {
  const source = solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet'
  })

  const decoder = solanaInstructionDecoder({
    programId: orcaWhirlpool.programId,
    instructions: {
      swap: orcaWhirlpool.instructions.swap,
    },
    range: { from: 'latest' }
  })

  // Track recently processed unfinalized slots
  let recentUnfinalizedSlots: BlockCursor[] = []

  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({logger, read}) => {
        // Resume from the last known slot
        for await (const {data, ctx} of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length-1])) {
          console.log(`Got ${data.swap.length} swaps`)
          
          // Track unfinalized slots from the batch
          ctx.state.rollbackChain.forEach((bc) => {
            recentUnfinalizedSlots.push(bc)
          })
          
          // Prune finalized slots and cap queue length at 1000
          if (ctx.head.finalized) {
            recentUnfinalizedSlots = recentUnfinalizedSlots.filter(
              b => b.number >= ctx.head.finalized!.number
            )
          }
          recentUnfinalizedSlots = recentUnfinalizedSlots.slice(
            recentUnfinalizedSlots.length - 1000,
            recentUnfinalizedSlots.length
          )
        }
      },
      // Handle fork events
      fork: async (previousBlocks) => {
        const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, previousBlocks)
        
        if (rollbackIndex >= 0) {
          recentUnfinalizedSlots.length = rollbackIndex + 1
          return recentUnfinalizedSlots[rollbackIndex]
        } else {
          recentUnfinalizedSlots.length = 0
          return null
        }
      }
    }))
}

function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let aIndex = 0
  let bIndex = 0
  let lastCommonIndex = -1

  while (aIndex < chainA.length && bIndex < chainB.length) {
    const slotA = chainA[aIndex]
    const slotB = chainB[bIndex]

    if (slotA.number < slotB.number) {
        aIndex++
        continue
    }

    if (slotA.number > slotB.number) {
        bIndex++
        continue
    }

    if (slotA.number === slotB.number && slotA.hash !== slotB.hash) {
        return lastCommonIndex
    }

    lastCommonIndex = aIndex
    aIndex++
    bIndex++
  }

  return lastCommonIndex
}

void main()
When the source detects a fork, it throws a ForkException from read(). The target catches it and runs the fork() handler with slots from the new consensus. The handler finds the common ancestor and returns the cursor to resume from.