Although this is an EVM walkthrough, most of its content will carry over to the upcoming Solana version (TBA)
The pipes-sqdgn-dex-example repo is a full-featured pipe that indexes DEX liquidity events (swaps, mints, burns, syncs, etc.) from Base mainnet for Uniswap V2/V3/V4, Aerodrome Basic/Slipstream, and other forks. It uses Pipes SDK to pull data from a Subsquid Portal, decode EVM logs, transform them into a unified schema, and write to ClickHouse. For what Pipes SDK is and when to use it, see Why Pipes SDK. The entrypoint is pipes/evm/liquidity/cli.ts. The pipeline is: Portal source β†’ composite decoders β†’ transform pipe β†’ ClickHouse target.

1. Config and ClickHouse

Configuration comes from environment variables (e.g. NETWORK, DB_PATH, PORTAL_CACHE_DB_PATH). The CLI creates a ClickHouse client, ensures the tables exist, and then builds the pipe:
const config = getConfig();
const client = await createClickhouseClient({ ... });
await ensureTables(client, __dirname, config.network, databaseName);
See pipes/evm/config.ts for network/portal URLs and liquidity.sql for schema.
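To make the snippet above concrete, here is a minimal sketch of what such an env-driven config loader could look like. Only the env var names and the field names used elsewhere in this walkthrough come from the repo; the defaults, the network key, the URL mapping, and the example Portal URL are illustrative — the real loader lives in pipes/evm/config.ts:
// Hypothetical sketch; the real network -> Portal URL mapping is in pipes/evm/config.ts
const PORTAL_URLS: Record<string, string> = {
  base: 'https://portal.sqd.dev/datasets/base-mainnet', // illustrative URL
};

export function getConfig() {
  const network = process.env.NETWORK ?? 'base';
  return {
    network,
    dbPath: process.env.DB_PATH ?? './data/pools.sqlite',
    portalCacheDbPath: process.env.PORTAL_CACHE_DB_PATH ?? './data/portal-cache.sqlite',
    portal: { url: PORTAL_URLS[network] },
  };
}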

2. Portal source

portal_source.ts wraps the EVM Portal with optional metrics and a SQLite cache so responses can be reused:
evmPortalSource({
  portal: config.portal.url,
  metrics: metricsPort ? metricsServer({ port: metricsPort }) : undefined,
  cache: portalSqliteCache({ path: portalCacheDbPath }),
});

3. Composite decoders

evm_decoder.ts defines one decoder per protocol (Uniswap V2/V3/V4, Aerodrome Basic/Slipstream). Each uses evmDecoder with factory() backed by a SQLite database to discover pools from factory events, then subscribes to the swap/mint/burn/sync/collect (and other protocol-specific) events:
uniswapV2: evmDecoder({
  profiler,
  range: { from: blockFrom },
  contracts: factory({
    address: getFactoryAddressesByProtocol(network, 'uniswap_v2'),
    event: UniswapV2FactoryEvents.PairCreated,
    database,          // SQLite DB that persists the discovered pool (pair) addresses
    parameter: 'pair', // the PairCreated argument carrying the new pool address
  }),
  events: {
    swaps: UniswapV2PairEvents.Swap,
    burns: UniswapV2PairEvents.Burn,
    mints: UniswapV2PairEvents.Mint,
    syncs: UniswapV2PairEvents.Sync,
  },
}),
The CLI merges all decoders with pipeComposite({ ...decoders }) so the Portal stream is decoded into multiple typed streams (one per protocol).
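As a sketch of that merge step, the decoder map passed to pipeComposite looks roughly like this; the key names match the streams consumed by the transform pipe below, while the variable assembly itself is illustrative:
// Hypothetical assembly of the per-protocol decoders defined in evm_decoder.ts
const decoders = {
  uniswapV2,
  uniswapV3,
  uniswapV4,
  aerodromeBasic,
  aerodromeSlipstream,
};

// Each key becomes its own typed stream in the composite output, so downstream
// pipes receive e.g. { uniswapV2: { swaps, mints, burns, syncs }, uniswapV3: { ... }, ... }
const decoded = portalSource.pipeComposite({ ...decoders });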

4. Transform pipe

raw_liquidity_event_pipe.ts takes the decoded composite and converts each protocol’s events into a single DbLiquidityEvent shape (pool, tokens, amounts, tick, fee, block/transaction/log indices, etc.) via protocol-specific converters in converters/, then sorts by block/tx/log index:
return ({ uniswapV2, uniswapV3, uniswapV4, aerodromeBasic, aerodromeSlipstream }: InputType) => {
  const v2_res = convertV2(network, { uniswapV2 });
  // ... convertV3, convertV4, convertAerodromeBasic, convertAerodromeSlipstream
  return [...v2_res, ...v3_res, ...v4_res, ...basic_res, ...slipstream_res]
    .sort((a, b) => /* by block_number, transaction_index, log_index */);
};
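The comparator elided above orders events by their on-chain position. Spelled out, it amounts to something like the following; the field names follow the comment in the snippet, and the numeric types are an assumption:
// Illustrative comparator, assuming numeric index fields; adjust if they are bigints or strings
const byChainOrder = (a: DbLiquidityEvent, b: DbLiquidityEvent) =>
  a.block_number - b.block_number ||
  a.transaction_index - b.transaction_index ||
  a.log_index - b.log_index;

// usage: events.sort(byChainOrder)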

5. ClickHouse target

clickhouse_target.ts uses the Pipes clickhouseTarget and inserts batches into liquidity_events_raw with retry logic:
clickhouseTarget({
  client,
  onData: async ({ data, ctx }) => {
    await chRetry(logger, 'liquidity_events_raw insert', () =>
      client.insert({ table: 'liquidity_events_raw', values: data, format: 'JSONEachRow' })
    );
  },
});
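chRetry is the repo's retry helper; its implementation is not shown here, but a generic retry-with-backoff wrapper of the same shape could look like this (the signature mirrors the call above, everything else is an assumption):
// Hypothetical retry helper with exponential backoff; the real chRetry may differ
async function chRetry<T>(
  logger: { warn: (msg: string) => void },
  label: string,
  fn: () => Promise<T>,
  attempts = 5,
): Promise<T> {
  for (let i = 0; ; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i + 1 >= attempts) throw err;
      const delayMs = Math.min(30_000, 500 * 2 ** i); // 0.5s, 1s, 2s, ... capped at 30s
      logger.warn(`${label} failed (attempt ${i + 1}/${attempts}), retrying in ${delayMs}ms`);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}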

6. Materialized view transforms in ClickHouse

After rows land in liquidity_events_raw, liquidity.sql (from line 87 onward) defines materialized views that maintain derived tables in ClickHouse. For example, current_balances_mv aggregates balances_history into current_balances. The aggregation runs incrementally as new data is inserted into liquidity_events_raw. This approach is commonly used for stateful transforms and aggregations at SQD: it is elegant and convenient, but its scalability has limits.
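Because the derived tables are ordinary ClickHouse tables, downstream consumers can read them with the same client. A sketch, assuming the client created earlier is a standard @clickhouse/client instance:
// Illustrative read of an MV-maintained table; table name taken from the example above
const result = await client.query({
  query: 'SELECT * FROM current_balances LIMIT 10',
  format: 'JSONEachRow',
});
console.log(await result.json());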

Putting it together

The full pipeline in cli.ts:
await portalSource
  .pipeComposite({ ...decoders })
  .pipe(createPipeFunc(config.network, poolMetadataStorage))
  .pipeTo(chTarget);
Data flows from the Portal (with cache and optional metrics) through the composite decoders, then through the liquidity transform, and finally into ClickHouse. For run instructions and env vars, see the repo README.