Skip to main content
Learn how to manage indexing state, handle blockchain forks, and persist data for Solana.

Cursor Management

Resume indexing from a specific slot using cursors.
import { createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

// Portal-backed data source shared by both runs below; the URL points at the
// `solana-mainnet` dataset.
const source = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet'
})

// Decoder for the Orca Whirlpool program's `swap` instruction, registered
// under the key `swap`. The range limits the run to slots from 200_000_000
// to 200_000_500.
const decoder = solanaInstructionDecoder({
  programId: orcaWhirlpool.programId,
  instructions: {
    swap: orcaWhirlpool.instructions.swap,
  },
  range: { from: 200_000_000, to: 200_000_500 }
})

/**
 * First pass: runs the pipeline without a cursor, so `read()` starts from the
 * beginning of the decoder's configured range, and prints every batch.
 */
async function firstRun() {
  console.log(`Starting from the default slot 200_000_000...`)

  const stream = source.pipe(decoder)
  const sink = createTarget({
    write: async ({read}) => {
      // No cursor is passed, so nothing is skipped.
      const batches = read()
      for await (const batch of batches) {
        console.log('data:', batch.data)
      }
    },
  })

  await stream.pipeTo(sink)
}

/**
 * Second pass: runs the same pipeline but hands `read()` a cursor, so
 * processing picks up with the slots that follow 200_000_300.
 */
async function secondRun() {
  console.log(`Starting from slots following 200_000_300...`)

  const stream = source.pipe(decoder)
  const sink = createTarget({
    write: async ({read}) => {
      // Resume from slot 200_000_300
      const batches = read({ number: 200_000_300 })
      for await (const batch of batches) {
        console.log('data:', batch.data)
      }
    },
  })

  await stream.pipeTo(sink)
}

// Run the two passes back to back, then report completion.
firstRun()
  .then(() => secondRun())
  .then(() => { console.log('\n\ndone') })
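Pass a cursor to read() to resume processing from a specific slot. The cursor has the form { number: slotNumber }; as the second run above shows, processing continues with the slots that follow the given slot number.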

Fork Handling

Handle Solana forks by tracking unfinalized slots and rolling back when forks occur.
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder} from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

/**
 * Streams Orca Whirlpool swaps starting from the latest slot while keeping a
 * local log of recently seen unfinalized slots. When the source reports a
 * fork, the log is compared against the slots of the new consensus to find
 * the last common slot, and the pipeline resumes from there.
 */
async function main() {
  // Upper bound on how many unfinalized slots are remembered for fork recovery.
  const MAX_TRACKED_SLOTS = 1000

  const source = solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet'
  })

  const decoder = solanaInstructionDecoder({
    programId: orcaWhirlpool.programId,
    instructions: {
      swap: orcaWhirlpool.instructions.swap,
    },
    range: { from: 'latest' }
  })

  // Track recently processed unfinalized slots
  let recentUnfinalizedSlots: BlockCursor[] = []

  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({read}) => {
        // Resume from the last tracked slot; on a fresh start the list is
        // empty, so the cursor is `undefined`.
        for await (const {data, ctx} of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length-1])) {
          console.log(`Got ${data.swap.length} swaps`)

          // Track unfinalized slots from the batch
          ctx.state.rollbackChain.forEach((bc) => {
            recentUnfinalizedSlots.push(bc)
          })

          // Prune slots below the finalized head: they are not kept for fork recovery.
          const finalized = ctx.head.finalized
          if (finalized) {
            recentUnfinalizedSlots = recentUnfinalizedSlots.filter(
              b => b.number >= finalized.number
            )
          }

          // Cap the log at the newest MAX_TRACKED_SLOTS entries. A negative
          // start counts from the end and clamps at 0, so shorter lists are
          // kept whole. (Computing `length - 1000` by hand goes negative for
          // short lists and makes `slice` drop valid entries.)
          recentUnfinalizedSlots = recentUnfinalizedSlots.slice(-MAX_TRACKED_SLOTS)

          console.log(`Recent slots list length is ${recentUnfinalizedSlots.length} after processing the batch`)
        }
      },
      // Handle fork events: `previousBlocks` is compared against the local log
      // to locate the last slot both chains agree on.
      fork: async (previousBlocks) => {
        console.log(`Got a fork!`)
        const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, previousBlocks)

        if (rollbackIndex >= 0) {
          console.log(`Rolling back: removing slots after slot ${recentUnfinalizedSlots[rollbackIndex].number}`)
          // Truncate the log so the common ancestor becomes its last entry.
          recentUnfinalizedSlots.length = rollbackIndex + 1
          return recentUnfinalizedSlots[rollbackIndex]
        } else {
          // Can't recover if fork is deeper than our log
          console.log(`Failed to process the fork - no common ancestor found`)
          recentUnfinalizedSlots.length = 0
          return null
        }
      }
    }))
}

/**
 * Walks two chains of block cursors in lockstep and reports where they stop
 * agreeing. Both chains are expected to be ordered by ascending slot number.
 *
 * @param chainA locally tracked cursors; the result is an index into this array
 * @param chainB cursors to compare against
 * @returns index in `chainA` of the last cursor whose number and hash both
 *          match an entry of `chainB`, or -1 when there is no such cursor
 */
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let lastCommonIndex = -1
  let posA = 0
  let posB = 0

  while (posA < chainA.length && posB < chainB.length) {
    const ours = chainA[posA]
    const theirs = chainB[posB]

    if (ours.number < theirs.number) {
      // Slot only present on our side: skip it.
      posA += 1
    } else if (ours.number > theirs.number) {
      // Slot only present on the other side: skip it.
      posB += 1
    } else if (ours.number === theirs.number && ours.hash !== theirs.hash) {
      // Same slot, different block: the chains diverge here.
      return lastCommonIndex
    } else {
      // Same slot and same hash: remember it and advance both sides.
      lastCommonIndex = posA
      posA += 1
      posB += 1
    }
  }

  return lastCommonIndex
}

// Start the pipeline; `void` marks the returned promise as intentionally not awaited.
void main()
When the source detects a fork, it throws a ForkException from read(). The target catches it and runs the fork() handler with slots from the new consensus. The handler finds the common ancestor and returns the cursor to resume from, or null when the fork is deeper than the tracked slots and no common ancestor can be found.

ClickHouse Target

Use ClickHouse for automatic cursor management and rollback handling.
import { createClient } from '@clickhouse/client'
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

/**
 * Streams Orca Whirlpool swaps from the latest slot into the ClickHouse table
 * `orca_swaps`, creating the table first if it does not exist. On rollback,
 * rows above the safe cursor are removed from the table.
 */
async function main() {
  const client = createClient({
    username: 'default',
    password: 'default',
    url: 'http://localhost:10123',
  })

  // Create table for Orca swaps
  await client.command({ query: `
    CREATE TABLE IF NOT EXISTS orca_swaps (
      slot_number          UInt64 CODEC (DoubleDelta, ZSTD),
      timestamp             DateTime CODEC (DoubleDelta, ZSTD),
      transaction_hash      String,
      instruction_address   UInt16,
      program_id            String,
      sign                  Int8 DEFAULT 1
    )
    ENGINE = CollapsingMergeTree(sign)
    ORDER BY (slot_number, transaction_hash, instruction_address);
  `})

  await solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  })
  .pipe(
    solanaInstructionDecoder({
      range: { from: 'latest' },
      programId: orcaWhirlpool.programId,
      instructions: {
        swaps: orcaWhirlpool.instructions.swap,
      },
    }),
  )
  .pipeTo(
    clickhouseTarget({
      client,
      onRollback: async ({store, safeCursor}) => {
        // Remove every row written for slots above the safe cursor
        await store.removeAllRows({
          tables: ['orca_swaps'],
          where: `slot_number > ${safeCursor.number}`,
        })
      },
      onData: async ({ store, data }) => {
        console.log(`inserting ${data.swaps.length} swaps`)
        // Await the insert so this handler only resolves once the write has
        // been issued and a failed insert rejects here instead of surfacing
        // as an unhandled rejection.
        await store.insert({
          table: 'orca_swaps',
          values: data.swaps.map(s => ({
            slot_number: s.blockNumber,
            // Convert to seconds for the DateTime column
            // (assumes `timestamp` is a Date, so valueOf() is milliseconds — TODO confirm)
            timestamp: s.timestamp.valueOf() / 1000,
            transaction_hash: s.transaction.signatures[0],
            // NOTE(review): only the first component of the instruction address
            // is stored; confirm this keeps the ORDER BY key unique when several
            // swaps share a top-level instruction.
            instruction_address: s.rawInstruction.instructionAddress[0],
            program_id: s.programId,
          })),
          format: 'JSONEachRow'
        })
      },
    }),
  )
}

// Start the pipeline; `void` marks the returned promise as intentionally not awaited.
void main()
The ClickHouse target takes care of cursor management and rollback detection for you. You supply onRollback to remove the rows written after the fork point, and onData to insert new rows.