// Example: tracking unfinalized Solana slots and handling forks with @subsquid/pipes
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalSource, solanaInstructionDecoder} from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
/**
 * Streams decoded Orca Whirlpool `swap` instructions from the Solana portal,
 * starting at the latest slot, and logs how many swaps each batch contains.
 *
 * While streaming it keeps a bounded, in-memory list of recently seen
 * unfinalized slots. That list is used both as the resume cursor for `read()`
 * and, in the `fork` handler, to find the point to roll back to.
 */
async function main() {
  const source = solanaPortalSource({
    portal: 'https://portal.sqd.dev/datasets/solana-mainnet'
  })

  const decoder = solanaInstructionDecoder({
    programId: orcaWhirlpool.programId,
    instructions: {
      swap: orcaWhirlpool.instructions.swap,
    },
    range: { from: 'latest' }
  })

  // Upper bound on how many unfinalized slots are remembered
  const maxTrackedSlots = 1000

  // Track recently processed unfinalized slots
  let recentUnfinalizedSlots: BlockCursor[] = []

  await source
    .pipe(decoder)
    .pipeTo(createTarget({
      write: async ({logger, read}) => {
        // Resume from the last known slot (undefined on the first run, when nothing is tracked yet)
        for await (const {data, ctx} of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length - 1])) {
          console.log(`Got ${data.swap.length} swaps`)

          // Track unfinalized slots from the batch
          for (const cursor of ctx.state.rollbackChain) {
            recentUnfinalizedSlots.push(cursor)
          }

          // Prune slots that are older than the finalized head
          const finalized = ctx.head.finalized
          if (finalized) {
            recentUnfinalizedSlots = recentUnfinalizedSlots.filter(
              b => b.number >= finalized.number
            )
          }

          // Cap the queue at the newest `maxTrackedSlots` entries.
          // A negative start makes slice() count from the end and clamps to 0 when the
          // list is shorter than the cap, so short lists are kept intact. Computing
          // `length - cap` by hand yields a negative start for short lists, which
          // slice() would interpret as an end-relative offset and drop live entries.
          recentUnfinalizedSlots = recentUnfinalizedSlots.slice(-maxTrackedSlots)
        }
      },
      // Handle fork events
      fork: async (previousBlocks) => {
        const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, previousBlocks)
        if (rollbackIndex >= 0) {
          // Forget everything after the common slot and resume from it
          recentUnfinalizedSlots.length = rollbackIndex + 1
          return recentUnfinalizedSlots[rollbackIndex]
        } else {
          // No tracked slot matches the new chain: drop all tracked slots
          recentUnfinalizedSlots.length = 0
          return null
        }
      }
    }))
}
/**
 * Walks two slot chains in parallel and reports the position in `chainA` of the
 * last slot that also appears in `chainB` with the same number and hash.
 *
 * The walk advances whichever side has the smaller slot number, so it assumes
 * both chains are ordered by ascending `number`. It stops at the first slot
 * number present in both chains whose hashes differ.
 *
 * @param chainA Locally tracked slots; the returned index refers to this array.
 * @param chainB Slots to compare against.
 * @returns Index into `chainA` of the last matching slot, or `-1` if none matched.
 */
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let matched = -1
  let i = 0
  let j = 0

  while (i < chainA.length && j < chainB.length) {
    const local = chainA[i]
    const remote = chainB[j]

    if (local.number < remote.number) {
      // Local slot has no counterpart at this position; move on locally
      i += 1
    } else if (local.number > remote.number) {
      // The other chain is behind; let it catch up
      j += 1
    } else if (local.number === remote.number && local.hash !== remote.hash) {
      // Same slot number but different hash: the chains diverge here
      break
    } else {
      // Same slot on both sides: remember it and advance both cursors
      matched = i
      i += 1
      j += 1
    }
  }

  return matched
}
// Start the pipeline; `void` marks the returned promise as deliberately not awaited.
void main()
When a fork occurs, it surfaces as a `ForkException` from `read()`. The target catches it and runs the `fork()` handler with slots from the new consensus. The handler finds the common ancestor and returns the cursor to resume from.
