Cursor Management
Resume indexing from a specific slot using cursors.
import { createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

const source = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
})

const decoder = solanaInstructionDecoder({
  programId: orcaWhirlpool.programId,
  instructions: {
    swap: orcaWhirlpool.instructions.swap,
  },
  range: { from: 200_000_000, to: 200_000_500 },
})

async function firstRun() {
  console.log(`Starting from the default slot 200_000_000...`)
  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({ logger, read }) => {
        for await (const { data } of read()) {
          console.log('data:', data)
        }
      },
    }))
}

async function secondRun() {
  console.log(`Starting from slots following 200_000_300...`)
  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({ logger, read }) => {
        // Resume from slot 200_000_300
        for await (const { data } of read({ number: 200_000_300 })) {
          console.log('data:', data)
        }
      },
    }))
}

firstRun().then(() => secondRun()).then(() => console.log('\n\ndone'))
Pass a cursor to read() to resume processing from a specific slot. The cursor format is { number: slotNumber }.
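In practice the cursor usually comes from your own storage rather than a hard-coded value. The sketch below is an illustration only, not a library feature: it reuses the source and decoder defined above, assumes decoded items expose blockNumber (as in the ClickHouse example further down), and the CURSOR_FILE path and loadCursor/resumableRun names are hypothetical. It persists the highest processed slot to a JSON file after each batch and resumes from it on the next run; note that this simple scheme ignores forks, which are covered next.

import { existsSync, readFileSync, writeFileSync } from 'node:fs'

const CURSOR_FILE = './cursor.json' // hypothetical path, pick your own

function loadCursor(): { number: number } | undefined {
  // Resume from the persisted cursor if a previous run saved one;
  // read(undefined) behaves like read(), starting from the range start
  return existsSync(CURSOR_FILE)
    ? JSON.parse(readFileSync(CURSOR_FILE, 'utf8'))
    : undefined
}

async function resumableRun() {
  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({ read }) => {
        for await (const { data } of read(loadCursor())) {
          if (data.swap.length === 0) continue
          // Persist the highest processed slot after each batch
          const last = Math.max(...data.swap.map((s) => s.blockNumber))
          writeFileSync(CURSOR_FILE, JSON.stringify({ number: last }))
        }
      },
    }))
}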
Fork Handling
Handle Solana forks by tracking unfinalized slots and rolling back when forks occur.
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

async function main() {
  const source = solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  })

  const decoder = solanaInstructionDecoder({
    programId: orcaWhirlpool.programId,
    instructions: {
      swap: orcaWhirlpool.instructions.swap,
    },
    range: { from: 'latest' },
  })

  // Track recently processed unfinalized slots
  let recentUnfinalizedSlots: BlockCursor[] = []

  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({ logger, read }) => {
        // Resume from the last known slot (undefined on the first pass,
        // so read() starts from the configured range)
        for await (const { data, ctx } of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length - 1])) {
          console.log(`Got ${data.swap.length} swaps`)

          // Track unfinalized slots from the batch
          ctx.state.rollbackChain.forEach((bc) => {
            recentUnfinalizedSlots.push(bc)
          })

          // Prune slots that are now finalized...
          if (ctx.head.finalized) {
            recentUnfinalizedSlots = recentUnfinalizedSlots.filter(
              (b) => b.number >= ctx.head.finalized!.number,
            )
          }
          // ...and cap the queue length at 1000
          recentUnfinalizedSlots = recentUnfinalizedSlots.slice(-1000)

          console.log(`Recent slots list length is ${recentUnfinalizedSlots.length} after processing the batch`)
        }
      },
      // Handle fork events
      fork: async (previousBlocks) => {
        console.log(`Got a fork!`)
        const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, previousBlocks)
        if (rollbackIndex >= 0) {
          console.log(`Rolling back: removing slots after slot ${recentUnfinalizedSlots[rollbackIndex].number}`)
          recentUnfinalizedSlots.length = rollbackIndex + 1
          return recentUnfinalizedSlots[rollbackIndex]
        } else {
          // Can't recover if the fork is deeper than our log
          console.log(`Failed to process the fork - no common ancestor found`)
          recentUnfinalizedSlots.length = 0
          return null
        }
      },
    }))
}

// Walk two slot-ordered cursor lists in lockstep and return the index
// (in chainA) of the last slot they agree on, or -1 if there is none
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let aIndex = 0
  let bIndex = 0
  let lastCommonIndex = -1
  while (aIndex < chainA.length && bIndex < chainB.length) {
    const slotA = chainA[aIndex]
    const slotB = chainB[bIndex]
    if (slotA.number < slotB.number) {
      aIndex++
      continue
    }
    if (slotA.number > slotB.number) {
      bIndex++
      continue
    }
    // Same slot number: if the hashes differ, the chains diverged here,
    // so the previous match is the common ancestor
    if (slotA.hash !== slotB.hash) {
      return lastCommonIndex
    }
    lastCommonIndex = aIndex
    aIndex++
    bIndex++
  }
  return lastCommonIndex
}

void main()
When a fork occurs, read() throws a ForkException. The target catches it and runs the fork() handler with slots from the new consensus. The handler finds the common ancestor and returns the cursor to resume from.
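To see what the handler returns, here is a small self-contained check of findRollbackIndex from the example above. The slot numbers and hashes are made up for illustration, and it assumes a BlockCursor is a { number, hash } pair, as the example code uses.

const local: BlockCursor[] = [
  { number: 100, hash: 'hashA' },
  { number: 101, hash: 'hashB' },
  { number: 102, hash: 'hashC' },
]

// The new consensus chain agrees on slot 101 but diverges at slot 102
const consensus: BlockCursor[] = [
  { number: 101, hash: 'hashB' },
  { number: 102, hash: 'hashD' },
]

// Slot 101 is the last slot both chains agree on,
// so its index in `local` is returned
console.log(findRollbackIndex(local, consensus)) // prints 1

The fork() handler would then truncate its log to that index and return { number: 101, hash: 'hashB' } as the cursor to resume from.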
ClickHouse Target
Use ClickHouse for automatic cursor management and rollback handling.
import { createClient } from '@clickhouse/client'
import { solanaInstructionDecoder, solanaPortalSource } from '@subsquid/pipes/solana'
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'

async function main() {
  const client = createClient({
    username: 'default',
    password: 'default',
    url: 'http://localhost:10123',
  })

  // Create table for Orca swaps
  await client.command({ query: `
    CREATE TABLE IF NOT EXISTS orca_swaps (
      slot_number         UInt64 CODEC (DoubleDelta, ZSTD),
      timestamp           DateTime CODEC (DoubleDelta, ZSTD),
      transaction_hash    String,
      instruction_address UInt16,
      program_id          String,
      sign                Int8 DEFAULT 1
    )
    ENGINE = CollapsingMergeTree(sign)
    ORDER BY (slot_number, transaction_hash, instruction_address);
  `})

  await solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  })
    .pipe(
      solanaInstructionDecoder({
        range: { from: 'latest' },
        programId: orcaWhirlpool.programId,
        instructions: {
          swaps: orcaWhirlpool.instructions.swap,
        },
      }),
    )
    .pipeTo(
      clickhouseTarget({
        client,
        onRollback: async ({ type, store, safeCursor }) => {
          // Automatically remove rows after the fork point
          await store.removeAllRows({
            tables: ['orca_swaps'],
            where: `slot_number > ${safeCursor.number}`,
          })
        },
        onData: async ({ store, data, ctx }) => {
          console.log(`inserting ${data.swaps.length} swaps`)
          await store.insert({
            table: 'orca_swaps',
            values: data.swaps.map((s) => ({
              slot_number: s.blockNumber,
              timestamp: s.timestamp.valueOf() / 1000,
              transaction_hash: s.transaction.signatures[0],
              instruction_address: s.rawInstruction.instructionAddress[0],
              program_id: s.programId,
            })),
            format: 'JSONEachRow',
          })
        },
      }),
    )
}

void main()
The target manages cursors automatically; you provide onRollback to remove data after a fork point, and onData to insert new data.
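Because the table uses a CollapsingMergeTree engine with a sign column, row removal is typically represented as cancelling rows with sign = -1 rather than an immediate delete, so reads should collapse them, for example with FINAL. A minimal query sketch, reusing the client and table from the example above:

// Read back the collapsed view of the table. FINAL forces
// CollapsingMergeTree to merge away cancelled (sign = -1) rows.
const rows = await client.query({
  query: `
    SELECT slot_number, transaction_hash, program_id
    FROM orca_swaps FINAL
    ORDER BY slot_number DESC
    LIMIT 10
  `,
  format: 'JSONEachRow',
})
console.log(await rows.json())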
