Explore three end-to-end pipes — from on-chain data to ClickHouse-ready insights
Pipes are flexible indexers that package Soldexer API data into JS streams, pump them through stateless transforms such as decoding, save the results into an OLAP database and process them via materialized views.
Pipes are currently an experimental technology, but they have already demonstrated several advantages over the alternatives:
- Simplicity: data can be accessed with the help of just a couple of libraries; no framework is needed.
- Code maintainability: since all stateful data transforms (aggregations) are written in SQL, the code is vectorized by default and requires no boilerplate batching code.
- Performance: combining stateless decoding with stateful transforms in a highly optimized OLAP engine such as ClickHouse ensures that both CPU and disk bandwidth are used efficiently.
Pipes’ architecture is best illustrated by the simple pipe you can find in this repo. It fetches all USDT balance updates, stores pre-transaction balances in a ClickHouse table, then computes hourly averages via a materialized view. Here’s its main function:
```ts
async function main() {
  const clickhouse = createClickhouseClient();

  // defining the pre-filtered raw data stream
  const ds = /* ...will be explained below... */

  // adding the necessary raw data table
  // and the materialized view that computes hourly averages
  await ensureTables(clickhouse, path.join(__dirname, 'transfers.sql'));

  for await (const transfers of await ds.stream()) {
    await clickhouse.insert({
      table: 'transfers_raw',
      values: transfers,
      format: 'JSONEachRow',
    });
    await ds.ack();
  }
}
```
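createClickhouseClient and ensureTables are small helpers defined elsewhere in the repo. Here’s a minimal sketch of what they could look like, assuming the @clickhouse/client package and a DDL file of semicolon-separated statements (the repo’s actual implementations may differ):

```ts
import fs from 'node:fs/promises';
import { createClient, type ClickHouseClient } from '@clickhouse/client';

// Hypothetical helper: connects to a local ClickHouse by default
function createClickhouseClient(): ClickHouseClient {
  return createClient({
    url: process.env.CLICKHOUSE_URL ?? 'http://localhost:8123',
  });
}

// Hypothetical helper: reads the DDL file and executes each
// semicolon-separated statement in order. The naive split is fine
// for DDL files that don't contain ';' inside string literals.
async function ensureTables(clickhouse: ClickHouseClient, sqlPath: string) {
  const ddl = await fs.readFile(sqlPath, 'utf-8');
  for (const statement of ddl.split(';')) {
    if (statement.trim().length > 0) {
      await clickhouse.command({ query: statement });
    }
  }
}
```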
The main function, then, does little more than save the data to the database after ensuring that the database is in the right state. The SQL code responsible for that state looks as follows:
src/transfers.sql
```sql
CREATE TABLE IF NOT EXISTS transfers_raw
(
    timestamp DateTime CODEC (DoubleDelta, ZSTD),
    pre_balance UInt256
)
ENGINE = MergeTree()
ORDER BY (timestamp);

CREATE MATERIALIZED VIEW IF NOT EXISTS active_balance_stats
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp)
AS
SELECT
    toStartOfHour(timestamp) as timestamp,
    avgState(toFloat64(pre_balance) / 1e6) AS avg_active_wallet_balance
FROM transfers_raw
GROUP BY timestamp;
```
Note how the aggregation code is almost devoid of boilerplate. Better yet, there’s no need to resync when the aggregation code changes: updating the materialized view is enough.
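One thing to keep in mind when reading such views: avgState stores a partial aggregation state rather than a plain number, so queries must finalize it with the matching -Merge combinator:

```sql
SELECT
    timestamp,
    avgMerge(avg_active_wallet_balance) AS avg_balance
FROM active_balance_stats
GROUP BY timestamp
ORDER BY timestamp;
```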
Suppose you want to replace the average wallet balance at the start of the transaction with the maximum one. You can do so by dropping the active_balance_stats view and re-creating it with the new column:
```sql
DROP VIEW active_balance_stats;
```
```sql
CREATE MATERIALIZED VIEW IF NOT EXISTS active_balance_stats
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp)
POPULATE
AS
SELECT
    toStartOfHour(timestamp) as timestamp,
    maxState(toFloat64(pre_balance) / 1e6) AS max_active_wallet_balance
FROM transfers_raw
GROUP BY timestamp;
```
Note the POPULATE clause in the view definition: it tells ClickHouse to fill the materialized view with the data that’s already available in the transfers_raw table, not just with newly arriving rows. Be aware that rows inserted into transfers_raw while the view is being created may be missed by POPULATE, so pause ingestion during the swap if completeness matters.
You can now query the materialized view with:
```sql
SELECT timestamp, maxMerge(max_active_wallet_balance)
FROM active_balance_stats
GROUP BY timestamp;
```
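The same query can be issued from application code with the ClickHouse client used by the pipe. A sketch, assuming @clickhouse/client:

```ts
// Read the finalized hourly maximums back from the materialized view
const result = await clickhouse.query({
  query: `
    SELECT timestamp, maxMerge(max_active_wallet_balance) AS max_balance
    FROM active_balance_stats
    GROUP BY timestamp
    ORDER BY timestamp
  `,
  format: 'JSONEachRow',
});
console.log(await result.json());
```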
After updating the view in the database, you should also update it in the initial database state definition file. No POPULATE clause is necessary there, since on a freshly initialized database the raw table is empty anyway:
src/transfers.sql
```sql
...

CREATE MATERIALIZED VIEW IF NOT EXISTS active_balance_stats
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp)
AS
SELECT
    toStartOfHour(timestamp) as timestamp,
    maxState(toFloat64(pre_balance) / 1e6) AS max_active_wallet_balance
FROM transfers_raw
GROUP BY timestamp;
```
This level of simplicity is enabled by the data stream object ds abstracting away the details of data retrieval and transformation. Its definition is somewhat long but straightforward.
```ts
type TransferPreBalance = {
  timestamp: Date;
  pre_balance: bigint;
};

// A stream with just the data we need
class TransferPreBalancesStream extends PortalAbstractStream<
  TransferPreBalance,
  { token: string }
> {
  async stream(): Promise<ReadableStream<TransferPreBalance[]>> {
    // First, we request a stream of raw data from SQD network.
    // Structure of the getStream argument mirrors that of
    // the raw Soldexer API JSON request, see
    // https://docs.soldexer.dev/api-reference/endpoint/post-stream
    const source = await this.getStream({
      type: 'solana',
      fields: {
        block: {
          // Although we're only using timestamp in the final data,
          // we also need slot number and block hash for progress tracking
          number: true,
          hash: true,
          timestamp: true,
        },
        tokenBalance: {
          preAmount: true,
        },
      },
      tokenBalances: [
        {
          preMint: [this.options.args.token],
        },
      ],
    });

    // Transforming the raw data stream into a stream
    // of convenient TransferPreBalance objects. If we needed
    // to do any decoding we would do it here
    const stream = source.pipeThrough(
      new TransformStream({
        transform: ({ blocks }, controller) => {
          const res = blocks.flatMap((block: any) => {
            if (!block.tokenBalances) return [];
            const blockTimestamp = new Date(block.header.timestamp * 1000);
            return block.tokenBalances.map((tb) => ({
              timestamp: blockTimestamp,
              pre_balance: tb.preAmount,
            }));
          });
          controller.enqueue(res);
        },
      }),
    );

    return stream;
  }
}

...

const ds = new TransferPreBalancesStream({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  blockRange: {
    from: 317617480, // Jan 31 2025
    // Check out
    // https://portal.sqd.dev/datasets/solana-mainnet/metadata
    // for up-to-date info on the earliest available block
  },
  args: {
    token: TRACKED_TOKEN,
  },
  // We use the indexer state to track the last block processed
  // and resume from there
  state: new ClickhouseState(clickhouse, {
    table: 'solana_sync_status',
    id: 'transfers',
  }),
  logger: createLogger('transfers'),
});

...
```
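Two details worth noting. First, TRACKED_TOKEN is a constant defined elsewhere in the repo; since the demo follows USDT, it would hold the USDT mint address. The value below is our assumption, not a line from the repo:

```ts
// Assumption: the repo's TRACKED_TOKEN constant holds the mint address
// of the tracked token. USDT's SPL mint on Solana mainnet:
const TRACKED_TOKEN = 'Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB';
```

Second, the state option is what gives meaning to the ds.ack() call in main(): ClickhouseState records the last processed block in the solana_sync_status table, so a restarted pipe resumes from where it left off.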
Eventually we will have a library of reusable components, such as the TransferPreBalancesStream class, that implement common raw data processing subtasks (for example, fetching and decoding all DEX swaps from major Solana exchanges).
And that’s it for the pipes’ architecture explanation! You can now explore these three end-to-end pipelines, which have more real-world utility than this little demo.