Skip to main content

Cursor Management

Persist the last processed slot so the pipeline can resume from it. Use case: resume indexing after a restart without re-processing data.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";
import fs from "fs/promises";

const CURSOR_FILE = "cursor.json";

/**
 * Reads the persisted cursor from `CURSOR_FILE`.
 *
 * The file content is validated before it is returned: a cursor is only
 * accepted when it is a JSON object whose `slotNumber` is a non-negative safe
 * integer. Anything else (for example `{}` or `{"slotNumber":null}`) is treated
 * as "no saved progress" so callers never see a cursor without a usable slot.
 *
 * @returns The saved `{ slotNumber }`, or `null` when the file cannot be read,
 *   is not valid JSON, or does not hold a valid `slotNumber`.
 */
async function loadCursor(): Promise<{ slotNumber: number } | null> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(await fs.readFile(CURSOR_FILE, "utf-8"));
  } catch {
    // Best effort: an unreadable or unparsable file means there is nothing to resume from.
    return null;
  }

  if (typeof parsed !== "object" || parsed === null) {
    return null;
  }

  const slotNumber = (parsed as { slotNumber?: unknown }).slotNumber;
  if (
    typeof slotNumber !== "number" ||
    !Number.isSafeInteger(slotNumber) ||
    slotNumber < 0
  ) {
    return null;
  }

  return { slotNumber };
}

/**
 * Persists the last processed slot to `CURSOR_FILE` as `{ "slotNumber": … }`.
 *
 * The JSON is first written to a temporary sibling file and then renamed over
 * the target. Writing in place could leave a truncated, unparsable cursor file
 * if the process dies mid-write; a rename replaces the file in one step
 * (atomic on POSIX file systems).
 *
 * @param slotNumber - Slot number to store.
 */
async function saveCursor(slotNumber: number): Promise<void> {
  const tmpFile = `${CURSOR_FILE}.tmp`;
  await fs.writeFile(tmpFile, JSON.stringify({ slotNumber }));
  await fs.rename(tmpFile, CURSOR_FILE);
}

// Load the cursor saved by a previous run (null when there is nothing to resume from).
const cursor = await loadCursor();

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  cursor: cursor ? { number: cursor.slotNumber } : undefined,
});

const decoder = solanaInstructionDecoder({
  // `??` rather than `||`: fall back to the default start slot only when no
  // slot was saved, not when the saved slot is the falsy value 0.
  range: { from: cursor?.slotNumber ?? 200000000 },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      // Guard against a batch without swaps: the maximum of an empty list is
      // -Infinity, which JSON-serializes to null and would overwrite the
      // cursor file with an unusable value.
      if (data.swap.length === 0) {
        continue;
      }

      // Process data.
      // NOTE: `database` is not defined in this snippet; it is assumed to be
      // supplied by the surrounding application.
      await database.insert(data.swap);

      // Save cursor. A reduce is used instead of `Math.max(...array)` so a very
      // large batch cannot exceed the engine's argument-count limit.
      const lastSlot = data.swap.reduce(
        (max, s) => Math.max(max, s.blockNumber),
        -Infinity
      );
      await saveCursor(lastSlot);

      logger.info(`Processed up to slot ${lastSlot}`);
    }
  },
});

await source.pipe(decoder).pipeTo(target);

Fork Handling

Handle Solana chain reorganizations (forks). Use case: automatically delete orphaned data and roll back when the chain forks.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: "latest" },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

// NOTE: `database` is not defined in this snippet; it is assumed to be supplied
// by the surrounding application and to accept `query(sql, params)` with one
// flat parameter list per statement, as the DELETE below already uses it.
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      // Insert swaps one row per statement. The statement has exactly three
      // placeholders, so each call must receive exactly one row's three
      // values; passing an array of rows would bind whole rows to $1, $2, ….
      for (const s of data.swap) {
        await database.query(
          "INSERT INTO swaps (slot_number, transaction_hash, program_id) VALUES ($1, $2, $3)",
          [s.blockNumber, s.transaction.signatures[0], s.programId]
        );
      }

      logger.info(`Inserted ${data.swap.length} swaps`);
    }
  },
  onRollback: async ({ cursor }) => {
    // `logger` is only destructured inside `write` and is not in scope here,
    // so referencing it would throw a ReferenceError; log via `console`.
    console.warn(`Fork detected at slot ${cursor.number}`);

    // Delete orphaned data: everything stored after the rollback point.
    await database.query("DELETE FROM swaps WHERE slot_number > $1", [
      cursor.number,
    ]);

    console.info(`Rolled back to slot ${cursor.number}`);
  },
});

await source.pipe(decoder).pipeTo(target);

ClickHouse Target

Persist to ClickHouse with automatic fork handling. Use case: Production-ready persistence with ClickHouse.
import { createClient } from "@clickhouse/client";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import { clickhouseTarget } from "@subsquid/pipes/targets/clickhouse";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";

// Create client.
// Connection settings can be overridden through environment variables; the
// fallbacks are the original local-development values, so behavior is
// unchanged when the variables are not set.
const client = createClient({
  username: process.env.CLICKHOUSE_USER ?? "default",
  password: process.env.CLICKHOUSE_PASSWORD ?? "default",
  url: process.env.CLICKHOUSE_URL ?? "http://localhost:8123",
});

// Create the destination table if it does not exist yet.
// `sign` defaults to 1, so rows inserted without an explicit sign are stored
// with sign = 1 for the CollapsingMergeTree engine.
await client.command({
  query: `
    CREATE TABLE IF NOT EXISTS orca_swaps (
      slot_number UInt64 CODEC (DoubleDelta, ZSTD),
      timestamp DateTime CODEC (DoubleDelta, ZSTD),
      transaction_hash String,
      instruction_address UInt16,
      program_id String,
      sign Int8 DEFAULT 1
    )
    ENGINE = CollapsingMergeTree(sign)
    ORDER BY (slot_number, transaction_hash, instruction_address)
  `,
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: "latest" },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = clickhouseTarget({
  client,
  // Called for each batch of decoded data: maps every swap to one table row.
  onData: async ({ store, data, ctx }) => {
    ctx.logger.info(`Inserting ${data.swap.length} swaps`);

    // Awaited so that a failed insert rejects `onData` instead of surfacing as
    // an unhandled rejection (awaiting is a no-op if `insert` returns no promise).
    await store.insert({
      table: "orca_swaps",
      values: data.swap.map((s) => ({
        slot_number: s.blockNumber,
        // Milliseconds -> whole seconds. Floored so a sub-second component can
        // never produce a fractional value for the `DateTime` column.
        timestamp: Math.floor(s.timestamp.valueOf() / 1000),
        transaction_hash: s.transaction.signatures[0],
        // NOTE(review): only the first element of `instructionAddress` is
        // stored. If several decoded swaps of one transaction share that first
        // element they get identical ORDER BY keys in `orca_swaps` — confirm
        // this is intended.
        instruction_address: s.rawInstruction.instructionAddress[0],
        program_id: s.programId,
      })),
      format: "JSONEachRow",
    });
  },
  // Called on a fork: removes every row stored after the safe cursor.
  onRollback: async ({ store, safeCursor }) => {
    await store.removeAllRows({
      tables: ["orca_swaps"],
      where: `slot_number > ${safeCursor.number}`,
    });
  },
});

await source.pipe(decoder).pipeTo(target);

File Output

Save data to JSON files. Use case: Simple file-based persistence for development.
import { createTarget } from "@subsquid/pipes";
import {
  solanaPortalSource,
  solanaInstructionDecoder,
} from "@subsquid/pipes/solana";
import * as orcaWhirlpool from "./abi/orca_whirlpool/index.js";
import fs from "fs/promises";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
});

const decoder = solanaInstructionDecoder({
  range: { from: 200000000, to: 200001000 },
  programId: orcaWhirlpool.programId,
  instructions: { swap: orcaWhirlpool.instructions.swap },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    // Per-run sequence number appended to the file name. `Date.now()` alone
    // has millisecond resolution, so two batches handled within the same
    // millisecond would get the same name and the second would overwrite
    // the first.
    let batchIndex = 0;

    for await (const { data } of read()) {
      const filename = `swaps-${Date.now()}-${batchIndex}.json`;
      batchIndex += 1;

      // Keep only the fields needed in the output file.
      const simplified = data.swap.map((s) => ({
        slot: s.blockNumber,
        txHash: s.transaction.signatures[0],
        programId: s.programId,
      }));

      await fs.writeFile(filename, JSON.stringify(simplified, null, 2));

      logger.info(`Saved ${data.swap.length} swaps to ${filename}`);
    }
  },
});

await source.pipe(decoder).pipeTo(target);