Skip to main content

Trivial Pipe

Stream Orca Whirlpool swap instructions from Portal to logger. Use case: Basic data streaming with query builder.
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource, SolanaQueryBuilder } from "@subsquid/pipes/solana";

const queryBuilder = new SolanaQueryBuilder()
  .addFields({
    block: { slot: true, hash: true },
    instruction: { programId: true, data: true, accounts: true, transactionHash: true },
  })
  .addInstruction({
    request: {
      programId: ["whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc"], // Orca Whirlpool
      d8: ["0xf8c69e91e17587c8"], // Swap instruction discriminator
    },
    range: { from: 200000000, to: 200000000 },
  });

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  query: queryBuilder,
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info(data, "data");
    }
  },
});

await source.pipeTo(target);
For better type safety and developer experience, use solanaInstructionDecoder with ABIs generated by @subsquid/solana-typegen instead of manually specifying instruction discriminators. See the Instruction Decoding guide for a typed approach.

Data Flow

Adding Transformer

Extract transaction hashes from instructions. Use case: Transform raw Portal data before logging.
import { createTarget, createTransformer } from "@subsquid/pipes";
import {
  solanaPortalSource,
  type SolanaPortalData,
  SolanaQueryBuilder,
} from "@subsquid/pipes/solana";

const queryBuilder = new SolanaQueryBuilder()
  .addFields({
    block: { slot: true, hash: true },
    instruction: { programId: true, data: true, accounts: true, transactionHash: true },
  })
  .addInstruction({
    request: {
      programId: ["whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc"],
      d8: ["0xf8c69e91e17587c8"],
    },
    range: { from: 200000000, to: 200000000 },
  });

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  query: queryBuilder,
});

const transformer = createTransformer({
  transform: async (data: SolanaPortalData<any>) => {
    return data.blocks.map((b) => b.instructions.map((i) => i.transactionHash));
  },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ data }, "transaction hashes");
    }
  },
});

await source.pipe(transformer).pipeTo(target);

Query from Transformer

Build query dynamically in transformer. Use case: Self-contained transformers that specify their own data requirements.
import { createTarget, createTransformer } from "@subsquid/pipes";
import {
  solanaPortalSource,
  type SolanaPortalData,
  SolanaQueryBuilder,
} from "@subsquid/pipes/solana";

// Start with empty query
const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  query: new SolanaQueryBuilder(),
});

// Transformer adds to query
const transformer = createTransformer({
  query: ({ queryBuilder }) => {
    queryBuilder
      .addFields({
        block: { slot: true, hash: true },
        instruction: { programId: true, data: true, accounts: true, transactionHash: true },
      })
      .addInstruction({
        request: {
          programId: ["whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc"],
          d8: ["0xf8c69e91e17587c8"],
        },
        range: { from: 200000000, to: 200000000 },
      });
  },
  transform: async (data: SolanaPortalData<any>) => {
    return data.blocks.map((b) => b.instructions.map((i) => i.transactionHash));
  },
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ data }, "data");
    }
  },
});

await source.pipe(transformer).pipeTo(target);
Key Distinction: Transformers can operate in two ways:
  • Pure transformation (previous example): Only implements transform() - processes data after the Portal query is executed
  • Query-aware transformation (this example): Implements both query() and transform() - contributes to building the Portal query AND processes the resulting data
When a transformer has a query() method, it augments the Portal query before data is fetched, allowing transformers to be self-contained with their data requirements.