Pipes are flexible indexers that package Soldexer API data into JS streams, pump them through stateless transforms such as decoding, save the results into an OLAP database and process them via materialized views.
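The "stateless transforms" stage can be pictured as a plain function mapped over every batch of a stream. Below is a minimal sketch using async generators. RawEvent, DecodedEvent, decode, fakeSource and collect are hypothetical names for illustration only, not part of the Soldexer API.

```typescript
// Hypothetical record shapes for illustration only; not the Soldexer API.
interface RawEvent {
  blockTime: number; // unix seconds
  data: string;      // encoded payload, here just a decimal amount
}

interface DecodedEvent {
  timestamp: number;
  amount: bigint;
}

// A stateless transform stage: every batch is mapped independently,
// so the stage carries no state from one batch to the next.
async function* mapBatches<A, B>(
  source: AsyncIterable<A[]>,
  fn: (item: A) => B,
): AsyncGenerator<B[]> {
  for await (const batch of source) {
    yield batch.map(fn);
  }
}

// Hypothetical decoder standing in for real instruction/log decoding.
function decode(e: RawEvent): DecodedEvent {
  return { timestamp: e.blockTime, amount: BigInt(e.data) };
}

// Toy source standing in for a stream of batches fetched from the API.
async function* fakeSource(): AsyncGenerator<RawEvent[]> {
  yield [{ blockTime: 1700000000, data: '1500000' }];
  yield [
    { blockTime: 1700000060, data: '2500000' },
    { blockTime: 1700000120, data: '0' },
  ];
}

// Drains a batch stream into memory (for demonstration only).
async function collect<T>(it: AsyncIterable<T[]>): Promise<T[][]> {
  const out: T[][] = [];
  for await (const batch of it) out.push(batch);
  return out;
}
```

Further stateless stages can be chained by wrapping more mapBatches calls; anything stateful (aggregations) is left to the database.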

Pipes are currently an experimental technology, but they have already demonstrated several advantages over the alternatives:

  • Simplicity: data can be accessed with the help of just a couple of libraries; no framework is needed.
  • Code maintainability: since all stateful data transforms (aggregations) are written in SQL, they are executed by the database's vectorized engine by default, so no boilerplate batching code is needed.
  • Performance: combining stateless decoding with stateful transforms run by highly optimized OLAP analytic engines such as ClickHouse ensures that both CPU and disk bandwidth are used efficiently.

Pipes’ architecture is best illustrated by the simple pipe you can find in this repo. It fetches all USDT balance updates, stores pre-transaction balances in a ClickHouse table, then computes hourly averages via a materialized view. Here’s its main function:

```typescript
import * as path from 'node:path';

// createClickhouseClient() and ensureTables() are project helpers not shown in this excerpt.

async function main() {
  const clickhouse = createClickhouseClient();

  // defining the pre-filtered raw data stream
  const ds = /* ...will be explained below... */

  // adding the necessary raw data table
  // and the materialized view that computes hourly averages
  await ensureTables(clickhouse, path.join(__dirname, 'transfers.sql'));

  for await (const transfers of await ds.stream()) {
    // write each batch of rows to the raw table
    await clickhouse.insert({
      table: 'transfers_raw',
      values: transfers,
      format: 'JSONEachRow',
    });

    // acknowledge the batch only after it has been stored
    await ds.ack();
  }
}
```

As you can see, it does little more than save the data to the database after ensuring that the database is in the right state. The SQL code responsible for that looks as follows:

src/transfers.sql
```sql
CREATE TABLE IF NOT EXISTS transfers_raw
(
  timestamp           DateTime CODEC (DoubleDelta, ZSTD),
  pre_balance         UInt256
) ENGINE = MergeTree()
ORDER BY (timestamp);

CREATE MATERIALIZED VIEW IF NOT EXISTS active_balance_stats
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp)
AS
SELECT
  toStartOfHour(timestamp) as timestamp,
  avgState(toFloat64(pre_balance) / 1e6) AS avg_active_wallet_balance
FROM transfers_raw
GROUP BY timestamp;
```
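The view does not store finished averages. Each block inserted into transfers_raw produces partial avgState values per hour, which are later combined, so an hour's average must be computed from merged (sum, count) pairs rather than by averaging averages. The sketch below is a simplified model of that arithmetic only (ClickHouse's real state encoding differs), assuming a UTC server timezone; partialStates, mergeStates, finalizeAvg and HOURLY_AVG_QUERY are illustrative names. The query string shows how the view would be read with avgMerge.

```typescript
// Models what avgState keeps per group: a running sum and count.
// ClickHouse's binary state format differs; only the arithmetic is modeled.
interface AvgState {
  sum: number;
  count: number;
}

interface RawRow {
  timestamp: number;  // unix seconds
  preBalance: number; // raw USDT units (6 decimals)
}

// Roughly what the view computes for each block inserted into transfers_raw:
// one partial state per hour bucket.
function partialStates(rows: RawRow[]): Map<number, AvgState> {
  const out = new Map<number, AvgState>();
  for (const r of rows) {
    // Same bucket as toStartOfHour when the server timezone is UTC.
    const hour = Math.floor(r.timestamp / 3600) * 3600;
    const s = out.get(hour) ?? { sum: 0, count: 0 };
    out.set(hour, { sum: s.sum + r.preBalance / 1e6, count: s.count + 1 });
  }
  return out;
}

// Combines partial states, as background merges and avgMerge do.
function mergeStates(parts: Map<number, AvgState>[]): Map<number, AvgState> {
  const out = new Map<number, AvgState>();
  for (const part of parts) {
    for (const [hour, s] of part) {
      const acc = out.get(hour) ?? { sum: 0, count: 0 };
      out.set(hour, { sum: acc.sum + s.sum, count: acc.count + s.count });
    }
  }
  return out;
}

function finalizeAvg(s: AvgState): number {
  return s.sum / s.count;
}

// The view stores states rather than numbers, so reads finalize them with
// avgMerge and GROUP BY (rows for one hour may not be merged yet).
const HOURLY_AVG_QUERY = `
SELECT timestamp, avgMerge(avg_active_wallet_balance) AS avg_balance
FROM active_balance_stats
GROUP BY timestamp
ORDER BY timestamp`;
```

For example, one block with balances of 1 and 3 USDT and a later block with 8 USDT in the same hour merge to an average of 4, whereas averaging the two block averages would wrongly give 5.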

Note how the aggregation code is almost devoid of boilerplate. Moreover, there’s no need to resync when the aggregation code changes: since the raw data is already stored in transfers_raw, updating the materialized view is enough.

This level of simplicity is possible because the data stream object ds abstracts away the details of data retrieval and transformation. Its definition is somewhat long but straightforward.
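The loop in main only relies on two methods of ds: stream(), which yields batches, and ack(), called after a batch has been inserted. Below is a minimal in-memory sketch of that contract, assuming ack() commits the stream position so that a restart resumes after the last acknowledged batch. ReplayableBatchStream and run are hypothetical names; the real object's internals are not shown in this document.

```typescript
// Hypothetical in-memory stand-in for ds. Assumes ack() commits the
// stream position; the real object's internals are not shown here.
class ReplayableBatchStream<T> {
  private readonly batches: T[][];
  private committed = 0; // first batch not yet acknowledged
  private delivered = 0; // position just past the last batch handed out

  constructor(batches: T[][]) {
    this.batches = batches;
  }

  // Restarts from the last acknowledged position, so a batch that was
  // delivered but never acknowledged is delivered again.
  async stream(): Promise<AsyncIterable<T[]>> {
    this.delivered = this.committed;
    return this.iterate();
  }

  // Marks every batch delivered so far as durably processed.
  async ack(): Promise<void> {
    this.committed = this.delivered;
  }

  private async *iterate(): AsyncGenerator<T[]> {
    while (this.delivered < this.batches.length) {
      yield this.batches[this.delivered++];
    }
  }
}

// Same loop shape as main(): insert first, acknowledge second.
async function run<T>(
  ds: ReplayableBatchStream<T>,
  insert: (rows: T[]) => Promise<void>,
): Promise<void> {
  for await (const rows of await ds.stream()) {
    await insert(rows);
    await ds.ack();
  }
}
```

Under this assumption, a failure before ack() replays the batch on the next run, which gives at-least-once delivery into transfers_raw: if the insert succeeds but the acknowledgement is lost, the same rows may be written twice.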

Eventually we will have a library of reusable components, such as the TransferPreBalancesStream class, that implement common raw data processing subtasks (for example, fetching and decoding all DEX swaps from major Solana exchanges).
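Whatever component produces the rows, the values passed to clickhouse.insert with format 'JSONEachRow' end up as JSON text, and the standard JSON.stringify throws a TypeError on bigint values, which is the natural JavaScript type for UInt256 balances. Below is a minimal sketch of one stateless decoding step that shapes rows for transfers_raw. DecodedBalanceUpdate, TransfersRawRow and toTransfersRawRow are hypothetical names, and the sketch assumes ClickHouse accepts unix-second integers for DateTime and quoted integers for UInt256 in JSON input.

```typescript
// Hypothetical decoded record; the stream's real output type is not shown
// in this document.
interface DecodedBalanceUpdate {
  blockTime: Date;
  preBalance: bigint; // raw token units; UInt256 values can exceed 2^53
}

// Row shape matching the transfers_raw columns.
interface TransfersRawRow {
  timestamp: number;   // unix seconds for the DateTime column
  pre_balance: string; // decimal string, since JSON.stringify cannot encode bigint
}

// Stateless, per-record conversion into a JSONEachRow-friendly object.
function toTransfersRawRow(u: DecodedBalanceUpdate): TransfersRawRow {
  return {
    timestamp: Math.floor(u.blockTime.getTime() / 1000),
    pre_balance: u.preBalance.toString(),
  };
}
```

Keeping the balance as a decimal string until it reaches the database preserves every digit; the lossy toFloat64 conversion then happens only inside the materialized view.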

And that’s it for the pipes’ architecture explanation! You can now explore these three end-to-end pipelines, which have more real-world utility than this little demo.

Examples

More examples coming soon! We’re working on additional examples to showcase more use cases and features of Soldexer.