> ## Documentation Index
> Fetch the complete documentation index at: https://docs.soldexer.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# A prod-ready pipe

> Walkthrough of a production DEX liquidity pipe built with Pipes SDK

<Info>Although this is an EVM walkthrough, it shares most of the content with the future Solana version (TBA)</Info>

The [pipes-sqdgn-dex-example](https://github.com/subsquid-labs/pipes-sqdgn-dex-example) repo is a full-featured pipe that indexes DEX liquidity events (swaps, mints, burns, syncs, etc.) from Base mainnet for Uniswap V2/V3/V4, Aerodrome Basic/Slipstream, and other forks. It uses Pipes SDK to pull data from a Subsquid Portal, decode EVM logs, transform them into a unified schema, and write to ClickHouse. For what Pipes SDK is and when to use it, see [Why Pipes SDK](/en/sdk/pipes-sdk/solana/why-pipes-sdk).

The entrypoint is [`pipes/evm/liquidity/cli.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/cli.ts). The pipeline is: **Portal source → composite decoders → transform pipe → ClickHouse target**.

## 1. Config and ClickHouse

Config comes from env (e.g. `NETWORK`, `DB_PATH`, `PORTAL_CACHE_DB_PATH`). The CLI creates a ClickHouse client, ensures tables exist, then builds the pipe:

```typescript theme={null}
const config = getConfig();
const client = await createClickhouseClient({ ... });
await ensureTables(client, __dirname, config.network, databaseName);
```

See [`pipes/evm/config.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/config.ts) for network/portal URLs and [`liquidity.sql`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) for schema.

## 2. Portal source

[`portal_source.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/portal_source.ts) wraps the EVM Portal with optional metrics and a SQLite cache so responses can be reused:

```typescript theme={null}
evmPortalSource({
  portal: config.portal.url,
  metrics: metricsPort ? metricsServer({ port: metricsPort }) : undefined,
  cache: portalSqliteCache({ path: portalCacheDbPath }),
});
```

## 3. Composite decoders

[`evm_decoder.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/evm_decoder.ts) defines one decoder per protocol (Uniswap V2/V3/V4, Aerodrome Basic/Slipstream). Each uses `evmDecoder` with `factory()` + a factory SQLite DB to discover pools from factory events, then subscribes to swap/mint/burn/sync/collect (and protocol-specific) events:

```typescript theme={null}
uniswapV2: evmDecoder({
  profiler,
  range: { from: blockFrom },
  contracts: factory({
    address: getFactoryAddressesByProtocol(network, 'uniswap_v2'),
    event: UniswapV2FactoryEvents.PairCreated,
    database,
    parameter: 'pair',
  }),
  events: {
    swaps: UniswapV2PairEvents.Swap,
    burns: UniswapV2PairEvents.Burn,
    mints: UniswapV2PairEvents.Mint,
    syncs: UniswapV2PairEvents.Sync,
  },
}),
```

The CLI merges all decoders with `pipeComposite({ ...decoders })` so the Portal stream is decoded into multiple typed streams (one per protocol).

## 4. Transform pipe

[`raw_liquidity_event_pipe.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/raw_liquidity_event_pipe.ts) takes the decoded composite and converts each protocol's events into a single `DbLiquidityEvent` shape (pool, tokens, amounts, tick, fee, block/transaction/log indices, etc.) via protocol-specific converters in `converters/`, then sorts by block/tx/log index:

```typescript theme={null}
return ({ uniswapV2, uniswapV3, uniswapV4, aerodromeBasic, aerodromeSlipstream }: InputType) => {
  const v2_res = convertV2(network, { uniswapV2 });
  // ... convertV3, convertV4, convertAerodromeBasic, convertAerodromeSlipstream
  return [...v2_res, ...v3_res, ...v4_res, ...basic_res, ...slipstream_res]
    .sort((a, b) => /* by block_number, transaction_index, log_index */);
};
```

## 5. ClickHouse target

[`clickhouse_target.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/clickhouse_target.ts) uses the Pipes `clickhouseTarget` and inserts batches into `liquidity_events_raw` with retry logic:

```typescript theme={null}
clickhouseTarget({
  client,
  onData: async ({ data, ctx }) => {
    await chRetry(logger, 'liquidity_events_raw insert', () =>
      client.insert({ table: 'liquidity_events_raw', values: data, format: 'JSONEachRow' })
    );
  },
});
```

## 6. Materialized view transforms in ClickHouse

After rows land in `liquidity_events_raw`, [liquidity.sql](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) (from [line 87](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql#L87) onward) defines materialized views that maintain derived tables in ClickHouse. For example, `current_balances_mv` aggregates `balances_history` into `current_balances` The aggregation runs incrementally as new data is inserted into `liquidity_events_raw`.

This approach is commonly used for stateful data transforms / aggregations at SQD. It is elegant and convenient, but its scalability has its limits.

## Putting it together

The full pipeline in `cli.ts`:

```typescript theme={null}
await portalSource
  .pipeComposite({ ...decoders })
  .pipe(createPipeFunc(config.network, poolMetadataStorage))
  .pipeTo(chTarget);
```

Data flows from the Portal (with cache and optional metrics) through the composite decoders, then through the liquidity transform, and finally into ClickHouse. For run instructions and env vars, see the repo [README](https://github.com/subsquid-labs/pipes-sqdgn-dex-example#readme).
