# On truncation of logs
Source: https://docs.soldexer.dev/api-reference/data-notes/logs-truncation
Important information for users of Solana program logs data
Solana programs output logs as they run. You can obtain these logs from:
* the Soldexer API's `logs` [data request](/api-reference/endpoint/post-stream)
* other Soldexer API data requests, as related data (e.g. when [requesting transactions with `"logs": true`](/api-reference/endpoint/post-stream#body-transactions-logs))
* Solana RPCs, via the [`logsSubscribe` method](https://solana.com/docs/rpc/websocket/logssubscribe)
* other sources.
This data has a limitation that all its users must be aware of: **logs are truncated if their total size per transaction exceeds 10 kB**.
More precisely, the Solana runtime keeps count of the bytes it writes as it outputs the log line by line. As soon as the count exceeds 10,000, it outputs a final line reading `Log truncated` and stops writing the log. The only way around this limitation is to patch the runtime.
Here's the code that does this in common Solana node implementations: [modern](https://github.com/anza-xyz/agave/blob/31a17ed5ce73de2f1050392aeadeb1fd2b12424a/log-collector/src/lib.rs#L31-L40), [historical](https://github.com/solana-labs/solana/blob/7700cb3128c1f19820de67b81aa45d18f73d2ac0/program-runtime/src/log_collector.rs#L31-L40).
Here is an example of a transaction affected by truncation (scroll to the very bottom in each explorer):
* [SolanaBeach](https://solanabeach.io/transaction/22qXg6rNq4ePwYMi7WY6bN6bKtzvCrU3upkJFv34y17dFWMuDQTDyKqiLuuYU378krkoAu1kmvuersxe5ZcAtSK7) shows the raw program log and there's a "Log truncated" line at the end.
* [explorer.solana.com](https://explorer.solana.com/tx/22qXg6rNq4ePwYMi7WY6bN6bKtzvCrU3upkJFv34y17dFWMuDQTDyKqiLuuYU378krkoAu1kmvuersxe5ZcAtSK7) omits logs from instructions 15-18 in the parsed log.
* [solana.fm](https://solana.fm/tx/22qXg6rNq4ePwYMi7WY6bN6bKtzvCrU3upkJFv34y17dFWMuDQTDyKqiLuuYU378krkoAu1kmvuersxe5ZcAtSK7?cluster=mainnet-alpha) shows logs of all instructions, strongly suggesting that they use a patched node for their data.
Soldexer uses data from a regular non-patched Solana node, so some logs in its dataset are truncated.
The practical consequence of this is that data consumers cannot be fully certain they will receive a log message every time the relevant program code is executed. Whether this is acceptable depends on how the data is used.
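One way to make this uncertainty explicit is to flag transactions whose log contains the `Log truncated` marker and to treat a missing log message in such transactions as "unknown" rather than "did not happen". A minimal TypeScript sketch, assuming each transaction's logs are available as an array of raw log lines (like the `logMessages` field of Solana RPC transaction metadata); the `TxLogs` type and both function names are hypothetical:

```typescript
// Marker line written by the Solana runtime when the per-transaction log limit is hit.
const TRUNCATION_MARKER = 'Log truncated'

// Hypothetical shape: a transaction signature plus its raw log lines.
interface TxLogs {
  signature: string
  logMessages: string[]
}

// True if the runtime stopped recording logs for this transaction.
function isLogTruncated(logMessages: readonly string[]): boolean {
  return logMessages.includes(TRUNCATION_MARKER)
}

// Three-valued answer to "did this transaction log a line containing `needle`?".
// 'unknown' means the line was not found but the log was truncated,
// so its absence proves nothing.
function hasLogLine(tx: TxLogs, needle: string): 'yes' | 'no' | 'unknown' {
  const found = tx.logMessages.some(
    (line) => line !== TRUNCATION_MARKER && line.includes(needle),
  )
  if (found) return 'yes'
  return isLogTruncated(tx.logMessages) ? 'unknown' : 'no'
}
```

Downstream code can then route `'unknown'` results to a fallback, such as checking the transaction's instructions instead.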
To avoid having to deal with this issue, we recommend using [instructions](/api-reference/endpoint/post-stream#body-instructions) and [token balance updates](/api-reference/endpoint/post-stream#body-token-balances) in place of logs whenever possible. Anchor developed [a special type of no-op instruction](https://www.anchor-lang.com/docs/features/events#emit_cpi) that can be used to mark execution events reliably, similar to how event logs work on EVM. If your program emits these, use them instead of execution logs.
# GET /finalized-head
Source: https://docs.soldexer.dev/api-reference/endpoint/get-finalized-head
Returns the slot number and hash of the highest finalized block available in the dataset, or null if no blocks are available. Useful for diagnostics.
# GET /head
Source: https://docs.soldexer.dev/api-reference/endpoint/get-head
Returns the slot number and hash of the highest block available in the dataset, or null if no blocks are available. Takes real-time data into account. Useful for diagnostics.
# GET /metadata
Source: https://docs.soldexer.dev/api-reference/endpoint/get-metadata
Retrieves metadata describing the dataset, including its name, aliases, start block, and real-time status.
# POST /finalized-stream
Source: https://docs.soldexer.dev/api-reference/endpoint/post-finalized-stream
Streams only finalized blocks matching the provided data query. The query structure is identical to that of the `/stream` endpoint. Required request header: `Content-Type: application/json`; optional request headers: `Accept-Encoding: gzip` and `Content-Encoding: gzip`.
# POST /stream
Source: https://docs.soldexer.dev/api-reference/endpoint/post-stream
Streams a list of blocks matching the provided data query, potentially including real-time data. Required request header: `Content-Type: application/json`; optional request headers: `Accept-Encoding: gzip` and `Content-Encoding: gzip`.
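For reference, here is a minimal TypeScript sketch of a request that uses both optional headers. The JSON query is gzip-compressed with Node's built-in `zlib` (hence `Content-Encoding: gzip`), and `Accept-Encoding: gzip` asks for a compressed response. The helper name `buildGzipRequest` is hypothetical, and whether your HTTP client decompresses the response automatically depends on the client.

```typescript
import { gzipSync } from 'node:zlib'

// Hypothetical helper: headers plus a gzip-compressed body for POST /stream or /finalized-stream.
function buildGzipRequest(query: object): { headers: Record<string, string>; body: Buffer } {
  return {
    headers: {
      'Content-Type': 'application/json', // required
      'Content-Encoding': 'gzip', // the request body below is gzip-compressed
      'Accept-Encoding': 'gzip', // ask the server to compress the response
    },
    body: gzipSync(JSON.stringify(query)),
  }
}

// Usage (not executed here):
// const { headers, body } = buildGzipRequest(query)
// const res = await fetch('https://portal.sqd.dev/datasets/solana-mainnet/stream', { method: 'POST', headers, body })
```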
# Introduction
Source: https://docs.soldexer.dev/api-reference/introduction
Try our new Solana dataset, which starts from genesis! See [Base URL](#base-url).
Request the data you need by [POSTing to `/stream`](/api-reference/endpoint/post-stream) and you'll receive a stream of newline-separated JSON objects that package your data by block. For example:
```bash theme={null}
curl https://portal.sqd.dev/datasets/solana-mainnet/stream \
  -X POST \
  -H 'Content-Type: application/json' \
  -d '{
    "type": "solana",
    "fromBlock": 325000000,
    "toBlock": 325000001,
    "fields": {
      "block": {"number": true},
      "instruction": {"data": true}
    },
    "instructions": [
      {"programId": [
        "MoonCVVNZFSYkqNXP6bxHLPL6QQJiMagDL3qcqUQTrG"
      ]}
    ]
  }'
```
This outputs:
```jsonl theme={null}
{"header":{"number":325000000},"instructions":[{"data":"XJqfG9ATWCDLLmxbNnKxcQ6KRHveSXFyrM8JwmyievueP"}]}
{"header":{"number":325000001}}
```
Here, we requested records of all instructions executed against the `MoonCVV...TrG` program, with a bare minimum of fields. One such record was found in block 325000000. No such instructions were executed in block 325000001, but the block was included anyway to mark the range boundary.
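The same request can also be sent from TypeScript. A minimal sketch, assuming Node.js 18+ (global `fetch`) and that each response line is one complete JSON object, as in the output above. `parseBlockLines` and `fetchBlocks` are hypothetical names, and the `Block` type only covers the fields requested in this example. The sketch buffers the whole response for simplicity; a production client would parse the body incrementally.

```typescript
// Only the fields requested in the example above; other fields appear when requested.
interface Block {
  header: { number: number; hash?: string }
  instructions?: { data: string }[]
}

// Splits a newline-delimited JSON body into blocks, skipping blank lines.
function parseBlockLines(body: string): Block[] {
  return body
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line) as Block)
}

// Sends one POST /stream request and returns the blocks it contained.
async function fetchBlocks(datasetUrl: string, query: object): Promise<Block[]> {
  const res = await fetch(`${datasetUrl}/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(query),
  })
  if (res.status === 204) return [] // requested range is above the available blocks
  if (!res.ok) throw new Error(`HTTP ${res.status}: ${await res.text()}`)
  return parseBlockLines(await res.text())
}

// Usage (not executed here):
// const blocks = await fetchBlocks('https://portal.sqd.dev/datasets/solana-mainnet', {
//   type: 'solana', fromBlock: 325000000, toBlock: 325000001,
//   fields: { block: { number: true }, instruction: { data: true } },
//   instructions: [{ programId: ['MoonCVVNZFSYkqNXP6bxHLPL6QQJiMagDL3qcqUQTrG'] }],
// })
```

A single call like this may return only part of the requested range; the next section explains how to resume.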
By using this general approach, you can:
* Get data on instructions, transactions, logs, balance updates and rewards.
* Ensure that you only get the data you need by using a rich palette of filters.
* Request data related to the data that matches your filters. In the example above, we could request all transactions that contain matching instructions, or even all token balance updates caused by such transactions.
Solana data is updated in real time and includes unfinalized blocks, enabling latencies of 1-2 s. Just omit the `toBlock` field in the request and you'll get the latest data.
## Making your own client
Real-life protocols and infrastructure introduce a couple of complications that you need to know about to actually build a client:
1. The endpoint may terminate the stream of blocks at any point. To resume the stream the client must update the `fromBlock` and the optional `parentBlockHash` fields of its request and send the request to the API again.
Note that this means even our example could, in theory, have taken two requests instead of one.
If your request specifies a `toBlock`, you can tell that it's done when you get either a block with this slot number or an empty HTTP 200 response.
If the range of your request is entirely above the range of available blocks you'll get an HTTP 204 (No Content).
2. Ensuring data correctness when streaming unfinalized blocks is somewhat complicated. If you don't want to deal with that complexity and don't mind that the newest available data is up to several hours old, use the alternative [`/finalized-stream` endpoint](/api-reference/endpoint/post-finalized-stream); otherwise read on.
Since the API sends the blocks before it can ascertain that they are final, it will occasionally stream blocks that end up being [orphaned](https://academy.binance.com/en/glossary/orphan-block). The client must detect such situations and process them according to its business logic. For example:
* if the data is simply kept in memory, it may be necessary to overwrite it;
* if the data is transformed and the results are used to populate a database, it may be necessary to roll back the database state changes made due to orphan blocks and re-run the transform on final blocks, etc.
To be able to detect and correctly process orphan blocks the client must:
**A.** Maintain a backlog of hash values for recent blocks (available at the `.header.hash` field; enable it by setting the `.fields.block.hash` field of the request to `true`). A backlog depth of ten blocks should suffice on Solana.
**B.** Be ready to restart its business logic at any block in the backlog. This may involve state rollbacks.
**C.** Supply the `parentBlockHash` request field when resuming the stream after termination.
**D.** Expect HTTP 409 (Conflict) responses from the API. These occur when the server detects that the first block of the request is not part of the current consensus, i.e. it is an orphan. The response contains a list of blocks preceding the requested one:
```json theme={null}
{
"previousBlocks": [
{
"number": 21780872,
"hash": "0xf6a96a29423093e947960fcde3cf79730eadacd389fe2ed6cd1c97deb356a12e"
},
{
"number": 21780873,
"hash": "0xcc44e9d4723600bb3078c5e0ab5df0cf7513df2e12e85f8548c5c469083b19bb"
},
{
"number": 21780874,
"hash": "0x1dce783bdb93b72af818addd1e97473d64f6e25ab512ce790a89c7f0976f6a0a"
}
]
}
```
The client must use this response and its block hash backlog to find the latest consensus block it received. If no such block is found, it can send another POST request with the earliest block from the current response and a wrong hash (e.g. the one from the backlog): the API's HTTP 409 response will then contain even earlier blocks that might match those in the backlog. This process can be repeated until the backlog is exhausted.
Once the client finds a non-orphaned block it should restart the business logic and resume the stream at its height.
Some of the shared code for [pipes](/pipes/overview) and the [Squid SDK](/squid-sdk/intro) will deal with unfinalized blocks for you under the hood.
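If you are writing your own client, the bookkeeping from points 1 and A to D above can be sketched as a few pure functions. This is a minimal sketch under stated assumptions, not the SDK's implementation. `BlockRef`, `StreamQuery`, `HashBacklog`, `resumeQuery` and `findLatestCommonBlock` are hypothetical names. Block hashes are assumed to be requested via `fields.block.hash`, and rolling back business logic (point B) is left to the caller.

```typescript
// A slot number plus its hash, as found in `.header` and in the 409 `previousBlocks` list.
interface BlockRef {
  number: number
  hash: string
}

// Fields of the /stream request body that the client rewrites when resuming.
interface StreamQuery {
  fromBlock: number
  toBlock?: number
  parentBlockHash?: string
  [field: string]: unknown
}

// A. Bounded backlog of recently received block hashes, oldest first.
class HashBacklog {
  private blocks: BlockRef[] = []
  private readonly depth: number

  constructor(depth = 10) {
    this.depth = depth
  }

  push(block: BlockRef): void {
    this.blocks.push(block)
    if (this.blocks.length > this.depth) this.blocks.shift() // drop the oldest entry
  }

  get entries(): readonly BlockRef[] {
    return this.blocks
  }

  // Drops every block above `number`, e.g. after rolling back to it.
  truncateAfter(number: number): void {
    this.blocks = this.blocks.filter((b) => b.number <= number)
  }
}

// 1 and C. Request that continues the stream right after `last`.
function resumeQuery(query: StreamQuery, last: BlockRef): StreamQuery {
  return { ...query, fromBlock: last.number + 1, parentBlockHash: last.hash }
}

// D. Latest backlog block that also appears (same number and hash) in `previousBlocks`.
// Returns undefined if nothing matches; the client then probes for earlier blocks.
function findLatestCommonBlock(
  backlog: readonly BlockRef[],
  previousBlocks: readonly BlockRef[],
): BlockRef | undefined {
  const consensus = new Map(previousBlocks.map((b) => [b.number, b.hash] as const))
  for (let i = backlog.length - 1; i >= 0; i--) {
    if (consensus.get(backlog[i].number) === backlog[i].hash) return backlog[i]
  }
  return undefined
}
```

On a 409, a client built this way would call `findLatestCommonBlock(backlog.entries, previousBlocks)`, roll its state back to the returned block, call `backlog.truncateAfter(block.number)`, and resend `resumeQuery(query, block)`.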
## Base URL
Stable Solana dataset is available at
```
https://portal.sqd.dev/datasets/solana-mainnet
```
Its earliest block is in February 2025.
Feel free to try the new complete dataset (starting from genesis) available at
```
https://portal.sqd.dev/datasets/solana-beta
```
Other SVM networks are TBA.
## Other endpoints
* [GET /metadata](/api-reference/endpoint/get-metadata) - dataset metadata. Includes the number of the lowest available slot.
* [GET /head](/api-reference/endpoint/get-head) - slot number and hash of the highest block of the dataset.
* [GET /finalized-head](/api-reference/endpoint/get-finalized-head) - same, but only for finalized blocks. In the current implementation it lags behind `/head` by several thousand slots.
* [POST /finalized-stream](/api-reference/endpoint/post-finalized-stream) - same as the main endpoint [POST /stream](/api-reference/endpoint/post-stream), but only returns finalized blocks, up to the height reported by [GET /finalized-head](/api-reference/endpoint/get-finalized-head).
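The gap between the two heads is easy to monitor. A minimal sketch, assuming Node.js 18+ `fetch` and that both endpoints return a JSON object with `number` and `hash` fields (or `null`), as described on their reference pages. The `HeadRef` type and the helper names are hypothetical.

```typescript
// Shape assumed for GET /head and GET /finalized-head responses.
type HeadRef = { number: number; hash: string } | null

async function getHead(datasetUrl: string, path: '/head' | '/finalized-head'): Promise<HeadRef> {
  const res = await fetch(`${datasetUrl}${path}`)
  if (!res.ok) throw new Error(`GET ${path} failed: HTTP ${res.status}`)
  return (await res.json()) as HeadRef
}

// Number of slots by which the finalized head trails the head; null if either is unavailable.
function finalityLag(head: HeadRef, finalized: HeadRef): number | null {
  if (head === null || finalized === null) return null
  return head.number - finalized.number
}

// Usage (not executed here):
// const url = 'https://portal.sqd.dev/datasets/solana-mainnet'
// const [head, finalized] = await Promise.all([getHead(url, '/head'), getHead(url, '/finalized-head')])
// console.log(`finalized head trails by ${finalityLag(head, finalized)} slots`)
```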
# Cherry
Source: https://docs.soldexer.dev/external-tools/cherry
A Web3 data library by Steelcake
[Cherry](https://github.com/steelcake/cherry) is a Python library developed by [Steelcake](https://steelcake.com) for building performant blockchain data pipelines. It supports SVM (via Soldexer) and EVM, and provides built-in tools to decode, validate, and transform blockchain data. Transformations are easy to write using Polars, DataFusion, Pandas, and more.
Examples:
* [Jupiter swaps](https://github.com/steelcake/cherry/blob/main/examples/jup_swap.py)
* [...more framework examples](https://github.com/steelcake/cherry/tree/main/examples)
* [...SVM pipelines](https://github.com/steelcake/cherry-pipelines/tree/main/src/cherry_pipelines/svm)
Docs:
* [Main page](https://steelcake.github.io/cherry-docs)
* [Using with Soldexer/SQD](https://steelcake.github.io/cherry-docs/providers/sqd.html)
Links:
* [x.com/steelcakedev](https://x.com/steelcakedev)
# Choose your approach
Source: https://docs.soldexer.dev/get-started/choose-your-approach
The three ways to consume the Soldexer API
There are three ways to consume the Soldexer API:
* with [Pipes](/pipes/overview),
* with [Squid SDK](/squid-sdk/intro) or
* with your own custom client ([API Reference](/api-reference/introduction)).
Here's the comparison between the first two architectures:
| | Pipes | Squid SDK |
| ------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------- | --------------------------------- |
| Mental model | Box of LEGO bricks | A framework |
| Peripheral services (DB clients, GraphQL etc) | "Bring your own", occasionally borrowing from Squid SDK | Opinionated comprehensive support |
| Real-time data handling | Yes | Yes |
| Data transform update without resyncing | Yes | No |
| Storing aggregations without the raw data (example: keeping a total DEX volume without keeping any temporally distributed data) | Yes, with partial loss of ability to update the transform | Yes                               |
| Hosted service | Not yet | [Yes](https://docs.sqd.ai/cloud/) |
# FAQ
Source: https://docs.soldexer.dev/get-started/faq
Common questions and answers about Soldexer
## General Questions
Soldexer is a high-throughput data service for accessing Solana's historical
and real-time data. It processes 10MB+ per request with \~2-3 second latency,
making it significantly faster than running your own node.
Soldexer uses a decentralized network of 2,900+ worker nodes ([SQD
Network](https://network.subsquid.io/dashboard)) to index and process Solana
data. You make API calls to request specific data, and Soldexer returns it
filtered and transformed according to your needs.
[SQD Network](https://network.subsquid.io/dashboard) by [SQD](https://sqd.ai)
provides the decentralized infrastructure that powers Soldexer. The network's
2,900+ worker nodes ensure reliable data processing and transparent
architecture.
Soldexer stays free while in Open Beta. Learn more about pricing here:
[soldexer.dev/#pricing](https://www.soldexer.dev/#pricing).
Here's how Soldexer compares to other solutions:
| Capability | Soldexer | Geyser Node by Helius | Dune Analytics | Other Solutions (QuickNode, Alchemy) |
| -------------------------- | --------------------------------------------------- | -------------------------------------------------------------------------------- | ----------------------------------------------------- | -------------------------------------------------------------------------------------------- |
| Speed & Throughput | ✅ Pull 10MB+ per request with \~2s latency | ⚪ Real-time streaming, optimized for low latency but not bulk data requests | ⚪ SQL queries with variable execution times | ❌ Slower response times for high-volume queries; optimized for general-purpose RPC calls |
| Transformation & Filtering | ✅ Built-in composable pipelines at source level | ⚪ Requires custom plugin development for filtering | ⚪ SQL-based filtering, fixed to predefined SQL syntax | ❌ Limited filtering capabilities; requires external tools for advanced processing |
| Historical Access | ✅ Full archive (in progress) | ❌ Real-time only unless paired with external DB | ✅ Extensive historical access | ⚪ Some archive data available but often slower and less efficient for large-scale queries |
| Setup Complexity | ✅ Ready out-of-the-box via SQD portal | ⚪ Simplified with "Automatic Private Nodes" but needs DevOps for advanced setups | ✅ Easy to start, but limited flexibility | ⚪ Easy setup via managed services but lacks customization options |
| Scalability | ✅ Stateless, horizontally scalable | ⚪ Scalable with dedicated hardware but constrained by infrastructure costs | ⚪ Platform-dependent scalability | ❌ Limited scalability for high-frequency workloads; performance tied to pricing tiers |
| Architecture | ✅ Decentralized (via SQD Network) | ⚪ Centralized validator or custom hosting | ❌ Fully centralized platform | ❌ Fully centralized infrastructure reliant on proprietary APIs |
| Cost Efficiency | ✅ Open infra/transparent pricing | ❌ High infra/validator maintenance | ⚪ Free-to-query for basic use | ❌ Higher costs for enterprise-grade services; pay-as-you-go models can be expensive at scale |
| Developer Experience | ✅ Built for engineers: streaming, filtering, piping | ❌ DIY heavy, plugin extensibility requires expertise | ⚪ SQL-only, no programming extensibility | ⚪ Simplified APIs and dashboards but limited flexibility for custom solutions |
Join our [Telegram community](https://t.me/+Rx3p5SS0PMtmNzQy) for direct
support and discussions.
## Technical Questions
All examples use the stable but limited `solana-mainnet` dataset. To access the new dataset,
replace endpoint URLs with
```
https://portal.sqd.dev/datasets/solana-beta
```
Check our API reference section for endpoint documentation, request/response
formats, and example calls. Each endpoint includes usage examples and
parameter descriptions.
Soldexer provides a REST API that works with any programming language or
framework that can make HTTP requests. See our API section to learn more.
Use our granular filtering system to target specific accounts, instructions,
programs, or transaction types. You can combine multiple filters in a single
request for precise data retrieval.
The public endpoint serves up to 100 requests per unique IP address in any 10-second window.
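To stay under this limit from a single client, requests can be throttled locally. Below is a minimal sliding-window sketch in TypeScript. The `SlidingWindowLimiter` class is hypothetical, the 100-per-10-seconds defaults come from the sentence above, and the clock is injectable so the logic can be tested.

```typescript
// Allows at most `limit` acquisitions in any window of `windowMs` milliseconds.
class SlidingWindowLimiter {
  private readonly timestamps: number[] = [] // send times of recent requests, oldest first
  private readonly limit: number
  private readonly windowMs: number
  private readonly now: () => number

  constructor(limit = 100, windowMs = 10_000, now: () => number = Date.now) {
    this.limit = limit
    this.windowMs = windowMs
    this.now = now
  }

  // Milliseconds to wait before the next request may be sent (0 if it may be sent now).
  delayMs(): number {
    const t = this.now()
    // Forget requests that have left the window.
    while (this.timestamps.length > 0 && this.timestamps[0] <= t - this.windowMs) {
      this.timestamps.shift()
    }
    if (this.timestamps.length < this.limit) return 0
    return this.timestamps[0] + this.windowMs - t
  }

  // Records a request if one is allowed right now; returns whether it was recorded.
  tryAcquire(): boolean {
    if (this.delayMs() > 0) return false
    this.timestamps.push(this.now())
    return true
  }

  // Waits until a request is allowed, then records it.
  async acquire(): Promise<void> {
    for (;;) {
      const wait = this.delayMs()
      if (wait === 0) {
        this.timestamps.push(this.now())
        return
      }
      await new Promise((resolve) => setTimeout(resolve, wait))
    }
  }
}
```

Calling `await limiter.acquire()` before each request keeps a single process within the limit; several processes sharing one IP would need a shared limiter.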
Yes. Soldexer's REST API and standardized data formats make it easy to
integrate with existing ETL pipelines, databases, or analytics tools.
[Pipes](/pipes/overview) are especially well-suited for being added to
existing infrastructure. It's also not too difficult to build a custom
client specifically for your system; see e.g. [this example from the Cherry
library](https://github.com/steelcake/cherry-core/blob/main/python/examples/solana.py).
# Introduction
Source: https://docs.soldexer.dev/get-started/introduction
The Fastest Way to Index and Query Solana Data
Soldexer gives you high-throughput access to Solana's historical and real-time data—built for performance at scale.
## Solana Data, Simplified
We're builders, just like you. We understand the challenges of working with Solana data—so we built Soldexer to make it faster, cleaner, and easier to work with.
## What is Soldexer?
Soldexer is a high-throughput data service for accessing Solana's historical and real-time data, powered by SQD Network. It processes 10MB+ per request with \~2-3 second latency, making it significantly faster than running your own node. Soldexer is currently in Open Beta and free to use.
## Why Soldexer?
Unlike running your own dedicated node (\~\$2k+/mo) or paying for proprietary APIs, Soldexer gives you scalable access, full filtering, and built-in transformations—at a fraction of the effort and cost.
## Key Features
Move beyond block-by-block scans. Grab tens of MBs in a single request.
Receive your data with an average 2-3 second latency.
Target exactly what you need. Filter by accounts, instructions, programs,
and more—directly at the source.
Seamlessly transform raw Solana data into actionable formats with minimal
setup.
## Advanced Capabilities
2,900+ worker nodes power Soldexer via SQD Network to ensure resilient,
transparent architecture.
Plug in transformation steps—decode, reshape, and enrich data without extra
infrastructure.
Stream and query massive workloads with ease—up to 20 million blocks per
second.
Fast, affordable access to real-time and historical Solana data—built to
scale with your needs.
# Cursor management
Source: https://docs.soldexer.dev/pipes/guides/architecture-deep-dives/cursor-management
How pipelines track progress and resume after restarts
A cursor records the last successfully processed slot. Built-in targets (ClickHouse, Drizzle) handle persistence automatically. When using `createTarget` directly you own the full lifecycle.
## The cursor object
```typescript theme={null}
type BlockCursor = {
number: number // stream resumes from number + 1
hash?: string // slot hash — used as parentBlockHash for fork detection
timestamp?: number // slot timestamp in seconds
}
```
`hash` is the fork detection tripwire: the SDK sends `parentBlockHash = cursor.hash` in each portal request. An absent hash silently skips fork detection for that request. See [cursor semantics](./fork-handling#5-cursor-semantics) for the full picture.
## Startup: range.from and stored cursors
`range.from` in the decoder sets where the stream begins on a first run — before any cursor exists:
```typescript theme={null}
solanaInstructionDecoder({
range: { from: 'latest' }, // chain head
// range: { from: 200_000_000 }, // slot number
// range: { from: '2024-01-01' }, // ISO date string
// range: { from: new Date() }, // Date object
})
```
Once a cursor is stored, `range.from` is ignored — the stream resumes from `cursor.number + 1`.
## The stream id
The `id` on `solanaPortalStream` is the primary key for all stored state:
```typescript theme={null}
solanaPortalStream({ id: 'my-pipeline', ... })
```
Both built-in targets use it to isolate state records, so multiple streams can share one physical table. **Never rename an active stream's id** — the stored cursor is keyed on it, and renaming causes the pipeline to restart from `range.from`.
## ClickHouse target
`clickhouseTarget` saves the cursor after every successful `onData` call and resolves fork and crash-recovery callbacks automatically.
```typescript theme={null}
clickhouseTarget({
client,
settings: {
id: 'my-stream', // overrides solanaPortalStream id (default: 'stream')
table: 'sync', // state table name (default: 'sync')
database: 'default', // ClickHouse database
maxRows: 10_000, // cursor rows to keep per stream id (default: 10,000)
},
onData: ...,
onRollback: ...,
})
```
**`onRollback` is called in two situations:**
* `type: 'offset_check'` — on every startup when a cursor exists. ClickHouse is non-transactional: a crash between `onData` and the cursor save leaves rows newer than the saved cursor. Delete them here. See [non-transactional databases](./fork-handling#4-state-rollback-atomicity).
* `type: 'blockchain_fork'` — when the portal signals a reorg. The rollback cursor is resolved automatically from stored history; your callback only needs to delete rows after `safeCursor.number`.
The same implementation typically serves both:
```typescript theme={null}
onRollback: async ({ store, safeCursor }) => {
await store.removeAllRows({
tables: ['my_table'],
where: `slot_number > {n:UInt32}`,
params: { n: safeCursor.number },
})
},
```
**State table.** Each row stores the cursor, the last finalized slot, and the unfinalized slot history used for fork recovery. Rows beyond `maxRows` are pruned every 25 saves. Set `maxRows` to cover your network's worst-case reorg depth — see [rollback depth](./fork-handling#3-rollback-depth-and-history-limits).
## Drizzle target
`drizzleTarget` saves the cursor inside the same PostgreSQL transaction as the data write — fully atomic, no crash-recovery pass needed.
```typescript theme={null}
drizzleTarget({
db: drizzle(DB_URL),
tables: [swapsTable], // every table onData writes to — required
settings: {
state: {
id: 'my-stream',
schema: 'public',
table: 'sync',
unfinalizedBlocksRetention: 1000, // cursor rows to keep (default: 1,000)
},
transaction: { isolationLevel: 'serializable' }, // default
},
onData: async ({ tx, data }) => {
await tx.insert(swapsTable).values(...)
},
})
```
**`tables` is required** for every table written in `onData`. At startup the target installs a PostgreSQL trigger on each listed table; the trigger copies the pre-change row into a `__snapshots` table (keyed by slot number and primary key). On a fork the target replays these snapshots in reverse, restoring pre-fork state automatically. Writing to a table not in `tables` raises a runtime error.
Snapshotting only fires for slots at or above the current finalized head — historical slots can never be reorged.
**Advisory lock.** Every batch acquires `pg_try_advisory_xact_lock(hashtext(id))` inside the transaction, preventing concurrent writers on the same stream. Two `drizzleTarget` instances sharing the same `id` will serialize correctly; two with different `id`s run independently.
**Retention.** Snapshot rows below `min(current, finalizedHead) - unfinalizedBlocksRetention` are deleted every 25 batches. Set this to cover your network's worst-case reorg depth.
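As a worked example of the retention rule: with `current` at slot 300,000,500, a finalized head of 300,000,000 and the default `unfinalizedBlocksRetention` of 1,000, snapshots below slot 299,999,000 become eligible for deletion. The same arithmetic as a small TypeScript helper (hypothetical; it illustrates the formula above, not the SDK's internal code):

```typescript
// Snapshot rows for slots strictly below the returned value are eligible for pruning.
function snapshotPruneBelow(current: number, finalizedHead: number, retention = 1_000): number {
  return Math.min(current, finalizedHead) - retention
}

// Mirrors the example above; prints 299999000
console.log(snapshotPruneBelow(300_000_500, 300_000_000))
```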
**Rollback hooks.** `onBeforeRollback` and `onAfterRollback` receive `{ tx, cursor }` and run inside the fork transaction. Use them to perform additional cleanup that the snapshot mechanism cannot cover (e.g., rows in tables not tracked by `tables`).
## Async iterator
When consuming a pipeline with `for await...of` instead of `pipeTo`, the native `[Symbol.asyncIterator]()` always calls `read()` with no cursor — it has no way to accept one. The stream therefore starts from `range.from` on every run.
**Finalized streams.** If the stream only consumes already-finalized slots (no forks possible), rebuilding the stream with `range.from` set to the stored cursor is sufficient:
```typescript theme={null}
const cursor = loadCursor() // BlockCursor | undefined
const stream = solanaPortalStream({
id: 'my-pipeline',
portal: '...',
outputs: solanaInstructionDecoder({
// cursor.number is the last processed slot; resume from the next one
range: { from: cursor ? cursor.number + 1 : 0 },
}),
})
for await (const { data, ctx } of stream) {
await processData(data)
saveCursor(ctx.stream.state.current) // { number, hash, timestamp }
}
```
Save `ctx.stream.state.current` — the full `BlockCursor` of the batch's last slot — not just the number. The `hash` is needed if you later switch to real-time or need the cursor as a fork anchor.
**Real-time streams.** Setting `range.from` to a stored number loses the slot hash. On restart the first request carries no `parentBlockHash`, so fork detection is silently disabled for that request. For real-time streams, use the `pipeToIterator` helper from the [async iteration tab of the fork handling guide](./fork-handling), which accepts an `initialCursor` and passes it directly to `read()` inside `pipeTo`:
```typescript theme={null}
const stream = pipeToIterator(
solanaPortalStream({ id: 'my-pipeline', portal: '...', outputs: solanaInstructionDecoder({ range: { from: 'latest' } }) }),
loadCursor(), // full BlockCursor with hash — passed to read(), not range.from
onFork,
)
for await (const { data, ctx } of stream) {
await processData(data)
saveCursor(ctx.stream.state.current)
}
```
`pipeToIterator` preserves `parentBlockHash` across fork rounds because it uses `pipeTo` internally. On a fresh first run, pass `undefined` as `initialCursor` and the stream begins from `range.from` as normal.
## Custom cursor management
When using `createTarget` directly, you own the full cursor lifecycle.
At the start of `write`, fetch the stored cursor and pass it to `read`:
```typescript theme={null}
write: async ({ read }) => {
const cursor = await db.getLatestCursor()
for await (const { data, ctx } of read(cursor)) {
// ...
}
}
```
After processing each batch, persist the cursor together with the fork-recovery state:
```typescript theme={null}
await db.transaction(async (tx) => {
await writeData(tx, data)
await tx.saveCursor({
cursor: ctx.stream.state.current,
rollbackChain: ctx.stream.state.rollbackChain,
finalized: ctx.stream.head.finalized,
})
})
```
For **transactional stores** (Postgres): save all three fields in the same transaction as the data write. For **non-transactional stores** (ClickHouse): write data first, cursor last, and implement a startup check that detects and corrects any data written after the last cursor save. See [state rollback atomicity](./fork-handling#4-state-rollback-atomicity).
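The write-data-first, cursor-last ordering and its startup check can be illustrated with an in-memory stand-in for a non-transactional store. This is a hypothetical sketch: `Row`, `SavedCursor`, `MemoryStore`, `writeBatch` and `recoverOnStartup` are illustrative names, not part of `@subsquid/pipes`. A crash between the two steps leaves rows newer than the saved cursor, and the startup check removes exactly those rows.

```typescript
interface Row {
  slot: number
  payload: string
}

interface SavedCursor {
  number: number
  hash?: string
}

// Stand-in for a store without multi-statement transactions (e.g. ClickHouse).
class MemoryStore {
  rows: Row[] = []
  cursor: SavedCursor | undefined
}

// Step 1: write data. Step 2: save the cursor. A crash in between leaves uncommitted rows.
function writeBatch(store: MemoryStore, rows: Row[], cursor: SavedCursor, crashBeforeCursor = false): void {
  store.rows.push(...rows)
  if (crashBeforeCursor) return // simulate a crash between the two writes
  store.cursor = cursor
}

// Startup check: delete rows written after the last saved cursor.
// If no cursor was ever saved, every row is treated as uncommitted.
function recoverOnStartup(store: MemoryStore): void {
  const last = store.cursor?.number ?? -1
  store.rows = store.rows.filter((r) => r.slot <= last)
}
```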
The `fork` callback and the algorithm for resolving rollback cursors from stored history are covered in detail in the [fork handling guide](./fork-handling).
# Fork handling
Source: https://docs.soldexer.dev/pipes/guides/architecture-deep-dives/fork-handling
Handle Solana forks and rollbacks in real-time streams
When consuming a real-time stream near the chain head, the portal can detect that the client's view of the chain has diverged from the canonical chain — a situation known as a fork or reorg. The portal signals this with an HTTP 409 response containing a sample of slots from the new canonical chain. Your code must find the highest slot that both chains agree on, roll back any state written after that point, and replay from there.
Fork handling is only needed for real-time streams (`range.from: 'latest'`). Historical streams consume already-finalized data and never produce forks. See [Fork detection scope](#7-fork-detection-scope-real-time-streams-only) below.
The SDK provides two patterns for consuming a stream. Both use the same state-tracking logic; they differ in how the fork signal is delivered.
If your pipeline includes a [stateful transformer](../advanced-topics/stateful-transforms#fork-callbacks-in-stateful-transformers), it must also implement a `fork` callback to roll back its own state in lockstep with the target.
The `pipeTo(createTarget({write, fork}))` pattern keeps fork handling completely separate from batch processing. The SDK catches the 409 internally and calls `fork()` with the portal's consensus slot sample; `write()` never sees the interruption and continues iterating batches without restarting.
Two variables span the lifetime of the stream:
```typescript theme={null}
let recentUnfinalizedBlocks: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
```
`recentUnfinalizedBlocks` is the local history of unfinalized slots used to find the common ancestor during a fork. `finalizedHighWatermark` tracks the highest finalized slot ever seen — stored as a full `BlockCursor` (number **and** hash) so it can double as a rollback cursor when needed. Both must be declared outside `pipeTo` so `fork()` can access them.
Inside `write()`, append each batch's unfinalized slots to the local history:
```typescript theme={null}
ctx.stream.state.rollbackChain.forEach((bc) => {
recentUnfinalizedBlocks.push(bc)
})
```
`ctx.stream.state.rollbackChain` contains only the slots from **this batch** that are above the current finalized head — it is a per-batch delta, not a full snapshot. Always append to the end; never replace or reorder.
After collecting history, prune slots that are now finalized and cap the queue:
```typescript theme={null}
if (ctx.stream.head.finalized) {
if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) {
finalizedHighWatermark = ctx.stream.head.finalized
}
recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number >= finalizedHighWatermark!.number)
}
recentUnfinalizedBlocks = recentUnfinalizedBlocks.slice(recentUnfinalizedBlocks.length - 1000)
```
Portal instances behind a load balancer can report different finalized heads. Using the **maximum** seen so far (the high-water mark) prevents the pruning threshold from moving backwards when the stream reconnects to a lagging instance. See [consideration 6](#6-load-balanced-portals-and-a-non-monotonic-finalized-head) for details.
`fork()` receives `previousBlocks` — the portal's current-chain sample — and must return the last good slot cursor, or `null` if recovery is impossible:
```typescript theme={null}
fork: async (newConsensusBlocks) => {
const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks)
if (rollbackIndex >= 0) {
recentUnfinalizedBlocks.length = rollbackIndex + 1
return recentUnfinalizedBlocks[rollbackIndex]
}
if (finalizedHighWatermark &&
newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) {
recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number)
return finalizedHighWatermark
}
return null
}
```
Three cases: (1) a common ancestor is found in local history — truncate and return it; (2) all `previousBlocks` fall below the finalized high-water mark, meaning the portal's sample doesn't reach local history — return the high-water mark cursor; (3) no recovery possible — return `null`, which surfaces a `ForkCursorMissingError`.
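To check the three cases in isolation, the `fork()` logic can be pulled out into a pure function. A minimal sketch, assuming a locally declared `BlockCursor` with the same shape as the SDK's; `resolveFork` is a hypothetical helper that returns the truncated history instead of mutating it in place.

```typescript
// Assumption: mirrors the SDK's BlockCursor (slot number plus optional hash)
type BlockCursor = { number: number; hash?: string }

// Merge-sort scan: index in chainA of the last slot both chains agree on
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let aIndex = 0, bIndex = 0, lastCommonIndex = -1
  while (aIndex < chainA.length && bIndex < chainB.length) {
    const a = chainA[aIndex], b = chainB[bIndex]
    if (a.number < b.number) { aIndex++; continue }
    if (a.number > b.number) { bIndex++; continue }
    if (a.hash !== b.hash) return lastCommonIndex
    lastCommonIndex = aIndex; aIndex++; bIndex++
  }
  return lastCommonIndex
}

type ForkResolution = { cursor: BlockCursor | null; history: BlockCursor[] }

function resolveFork(
  history: BlockCursor[],
  previousBlocks: BlockCursor[],
  watermark: BlockCursor | undefined,
): ForkResolution {
  // Case 1: common ancestor found in local history, truncate after it
  const idx = findRollbackIndex(history, previousBlocks)
  if (idx >= 0) return { cursor: history[idx], history: history.slice(0, idx + 1) }
  // Case 2: the portal's sample lies entirely below the finalized high-water mark
  if (watermark && previousBlocks.every(b => b.number < watermark.number)) {
    return { cursor: watermark, history: history.filter(b => b.number <= watermark.number) }
  }
  // Case 3: unrecoverable, fork() would return null here
  return { cursor: null, history }
}
```

Inside the real callback you would assign `history` back to `recentUnfinalizedBlocks` and return `cursor`.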
```typescript theme={null}
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalStream, solanaInstructionDecoder } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
async function main() {
let recentUnfinalizedSlots: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
await solanaPortalStream({
id: 'forks',
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: solanaInstructionDecoder({
programId: orcaWhirlpool.programId,
instructions: { swap: orcaWhirlpool.instructions.swap },
range: { from: 'latest' }
}),
})
.pipeTo(createTarget({
write: async ({read}) => {
for await (const {data, ctx} of read(recentUnfinalizedSlots[recentUnfinalizedSlots.length-1])) {
console.log(`Got ${data.swap.length} swaps`)
ctx.stream.state.rollbackChain.forEach((bc) => { recentUnfinalizedSlots.push(bc) })
if (ctx.stream.head.finalized) {
if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) {
finalizedHighWatermark = ctx.stream.head.finalized
}
recentUnfinalizedSlots = recentUnfinalizedSlots.filter(b => b.number >= finalizedHighWatermark!.number)
}
recentUnfinalizedSlots = recentUnfinalizedSlots.slice(recentUnfinalizedSlots.length - 1000)
}
},
fork: async (newConsensusSlots) => {
const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, newConsensusSlots)
if (rollbackIndex >= 0) {
recentUnfinalizedSlots.length = rollbackIndex + 1
return recentUnfinalizedSlots[rollbackIndex]
}
if (finalizedHighWatermark &&
newConsensusSlots.every(b => b.number < finalizedHighWatermark!.number)) {
recentUnfinalizedSlots = recentUnfinalizedSlots.filter(b => b.number <= finalizedHighWatermark!.number)
return finalizedHighWatermark
}
return null
}
}))
}
main().then(() => { console.log('\ndone') })
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
let aIndex = 0, bIndex = 0, lastCommonIndex = -1
while (aIndex < chainA.length && bIndex < chainB.length) {
const a = chainA[aIndex], b = chainB[bIndex]
if (a.number < b.number) { aIndex++; continue }
if (a.number > b.number) { bIndex++; continue }
if (a.hash !== b.hash) return lastCommonIndex
lastCommonIndex = aIndex; aIndex++; bIndex++
}
return lastCommonIndex
}
```
The native `[Symbol.asyncIterator]()` on a `PortalSource` cannot handle forks that require multiple 409 rounds. After a fork, the only option with native iteration is to re-create the stream — but the re-created stream's first request carries no `parentBlockHash`, so the portal cannot detect whether the client is still on the wrong chain and will not send the second 409.
The root cause: `pipeTo`'s internal `read()` generator maintains a `cursor` variable across fork rounds. After `target.fork()` returns a rollback cursor it sets `cursor = forkedCursor` before re-entering `self.read(cursor)`, keeping `parentBlockHash` populated on every subsequent request. The native async iterator calls `this.read()` with no cursor and has no equivalent mechanism.
**Workaround:** wrap `pipeTo` in a helper called `pipeToIterator` that bridges its push-based `write()` into a pull-based iterator via a single-item queue with producer acknowledgement. This preserves the `for await...of` interface while using `pipeTo`'s cursor-tracking machinery internally.
Same two variables as the `pipeTo` approach — no extra `resumeCursor` needed, since `pipeTo` handles cursor updates internally:
```typescript theme={null}
let recentUnfinalizedBlocks: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
```
The fork callback passed to `pipeToIterator` is identical to `fork()` in the `pipeTo` example — the same three-case logic, the same state mutations:
```typescript theme={null}
async (newConsensusBlocks) => {
const rollbackIndex = findRollbackIndex(recentUnfinalizedBlocks, newConsensusBlocks)
if (rollbackIndex >= 0) {
recentUnfinalizedBlocks.length = rollbackIndex + 1
return recentUnfinalizedBlocks[rollbackIndex]
}
if (finalizedHighWatermark &&
newConsensusBlocks.every(b => b.number < finalizedHighWatermark!.number)) {
recentUnfinalizedBlocks = recentUnfinalizedBlocks.filter(b => b.number <= finalizedHighWatermark!.number)
return finalizedHighWatermark
}
return null
}
```
The SDK awaits this callback before resuming the stream, so `recentUnfinalizedBlocks` is safe to mutate here without additional locking.
Pass the source, the initial cursor, and the fork callback to `pipeToIterator`, then iterate normally:
```typescript theme={null}
const stream = pipeToIterator(source, recentUnfinalizedBlocks.at(-1), onFork)
for await (const {data, ctx} of stream) {
// batch processing — identical to the pipeTo example
}
```
```typescript theme={null}
// WORKAROUND: see the explanation above
// Requires: import { BlockCursor, createTarget } from '@subsquid/pipes'
function pipeToIterator<T>(
  source: { pipeTo(target: ReturnType<typeof createTarget>): Promise<unknown> },
  initialCursor: BlockCursor | undefined,
  onFork: (previousBlocks: BlockCursor[]) => Promise<BlockCursor | null>
): AsyncIterableIterator<{ data: T; ctx: any }> {
  type Slot =
    | { k: 'batch'; v: { data: T; ctx: any }; ack: () => void }
    | { k: 'end' }
    | { k: 'error'; err: unknown }
  const queue: Slot[] = []
  let consumerWake: (() => void) | null = null
  // Acknowledgement for the batch the consumer is currently processing
  let release: (() => void) | null = null
  const wake = () => { consumerWake?.(); consumerWake = null }
  ;(source.pipeTo as any)(createTarget({
    write: async ({ read }: any) => {
      for await (const batch of read(initialCursor)) {
        // Park the producer (and any fork() call) until the consumer has
        // finished with this batch and asks for the next one
        await new Promise<void>(resolve => {
          queue.push({ k: 'batch', v: batch, ack: () => resolve() })
          wake()
        })
      }
      queue.push({ k: 'end' })
      wake()
    },
    fork: onFork,
  })).catch((err: unknown) => { queue.push({ k: 'error', err }); wake() })
  return {
    async next(): Promise<IteratorResult<{ data: T; ctx: any }>> {
      // Requesting the next batch means the previous one is fully processed
      release?.(); release = null
      if (!queue.length) await new Promise<void>(r => { consumerWake = () => r() })
      const slot = queue.shift()!
      if (slot.k === 'end') return { done: true, value: undefined }
      if (slot.k === 'error') throw slot.err
      release = slot.ack
      return { done: false, value: slot.v }
    },
    [Symbol.asyncIterator]() { return this },
  }
}
```
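The push-to-pull bridging technique can be exercised without the SDK. Below is a minimal sketch: `pushToPull` is a hypothetical name, and a plain `produce(push)` callback stands in for `pipeTo`'s `write()`. Each pushed item carries its own release function, and the consumer calls it only when it requests the following item, so the producer never advances while the consumer is still processing.

```typescript
function pushToPull<T>(
  produce: (push: (item: T) => Promise<void>) => Promise<void>,
): AsyncIterableIterator<T> {
  type Slot =
    | { k: 'item'; v: T; ack: () => void }
    | { k: 'end' }
    | { k: 'error'; err: unknown }
  const queue: Slot[] = []
  let consumerWake: (() => void) | null = null
  let release: (() => void) | null = null
  const wake = () => { consumerWake?.(); consumerWake = null }

  produce(item => new Promise<void>(resolve => {
    // Park the producer until the consumer requests the item after this one
    queue.push({ k: 'item', v: item, ack: () => resolve() })
    wake()
  })).then(
    () => { queue.push({ k: 'end' }); wake() },
    (err: unknown) => { queue.push({ k: 'error', err }); wake() },
  )

  return {
    async next(): Promise<IteratorResult<T>> {
      // Asking for the next item releases the producer for the previous one
      release?.()
      release = null
      if (!queue.length) await new Promise<void>(r => { consumerWake = () => r() })
      const slot = queue.shift()!
      if (slot.k === 'end') return { done: true, value: undefined }
      if (slot.k === 'error') throw slot.err
      release = slot.ack
      return { done: false, value: slot.v }
    },
    [Symbol.asyncIterator]() { return this },
  }
}
```

With a producer that pushes 1, 2 and 3, a `for await` loop over the result sees a strictly alternating order: produce 1, consume 1, produce 2, consume 2, produce 3, consume 3.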
```typescript theme={null}
import { BlockCursor, createTarget } from '@subsquid/pipes'
import { solanaPortalStream, solanaInstructionDecoder } from '@subsquid/pipes/solana'
import * as orcaWhirlpool from './abi/orca_whirlpool/index.js'
// WORKAROUND: pipeToIterator is defined in the implementation block above
async function main() {
let recentUnfinalizedSlots: BlockCursor[] = []
let finalizedHighWatermark: BlockCursor | undefined
const stream = pipeToIterator(
solanaPortalStream({
id: 'forks-async',
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: solanaInstructionDecoder({
programId: orcaWhirlpool.programId,
instructions: { swap: orcaWhirlpool.instructions.swap },
range: { from: 'latest' }
}),
}),
recentUnfinalizedSlots.at(-1),
async (newConsensusSlots) => {
const rollbackIndex = findRollbackIndex(recentUnfinalizedSlots, newConsensusSlots)
if (rollbackIndex >= 0) {
recentUnfinalizedSlots.length = rollbackIndex + 1
return recentUnfinalizedSlots[rollbackIndex]
}
if (finalizedHighWatermark &&
newConsensusSlots.every(b => b.number < finalizedHighWatermark!.number)) {
recentUnfinalizedSlots = recentUnfinalizedSlots.filter(b => b.number <= finalizedHighWatermark!.number)
return finalizedHighWatermark
}
recentUnfinalizedSlots.length = 0
return null
}
)
for await (const {data, ctx} of stream) {
console.log(`Got ${data.swap.length} swaps`)
ctx.stream.state.rollbackChain.forEach((bc: BlockCursor) => { recentUnfinalizedSlots.push(bc) })
if (ctx.stream.head.finalized) {
if (!finalizedHighWatermark || ctx.stream.head.finalized.number > finalizedHighWatermark.number) {
finalizedHighWatermark = ctx.stream.head.finalized
}
recentUnfinalizedSlots = recentUnfinalizedSlots.filter(b => b.number >= finalizedHighWatermark!.number)
}
recentUnfinalizedSlots = recentUnfinalizedSlots.slice(recentUnfinalizedSlots.length - 1000)
}
}
main().then(() => { console.log('\ndone') })
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
let aIndex = 0, bIndex = 0, lastCommonIndex = -1
while (aIndex < chainA.length && bIndex < chainB.length) {
const a = chainA[aIndex], b = chainB[bIndex]
if (a.number < b.number) { aIndex++; continue }
if (a.number > b.number) { bIndex++; continue }
if (a.hash !== b.hash) return lastCommonIndex
lastCommonIndex = aIndex; aIndex++; bIndex++
}
return lastCommonIndex
}
```
## The common-ancestor search
Both approaches use the same merge-sort scan. Given two ascending-sorted arrays of `BlockCursor` — local history and the portal's `previousBlocks` — `findRollbackIndex` returns the index in local history of the last entry that both chains agree on (same slot number **and** hash):
```typescript theme={null}
function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
let aIndex = 0, bIndex = 0, lastCommonIndex = -1
while (aIndex < chainA.length && bIndex < chainB.length) {
const a = chainA[aIndex], b = chainB[bIndex]
if (a.number < b.number) { aIndex++; continue }
if (a.number > b.number) { bIndex++; continue }
if (a.hash !== b.hash) return lastCommonIndex // chains diverged here
lastCommonIndex = aIndex; aIndex++; bIndex++
}
return lastCommonIndex
}
```
The scan advances the pointer for the lower-numbered entry until both point to the same slot number. A hash mismatch means the chains diverged at this number; `lastCommonIndex` holds the last agreement point. Returning `-1` means no common ancestor was found in the sample.
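A few worked cases make the return values concrete. `BlockCursor` is redeclared locally (assumed shape: slot number plus optional hash) so the block runs standalone; `cur` is a hypothetical constructor helper.

```typescript
type BlockCursor = { number: number; hash?: string }
const cur = (number: number, hash: string): BlockCursor => ({ number, hash })

function findRollbackIndex(chainA: BlockCursor[], chainB: BlockCursor[]): number {
  let aIndex = 0, bIndex = 0, lastCommonIndex = -1
  while (aIndex < chainA.length && bIndex < chainB.length) {
    const a = chainA[aIndex], b = chainB[bIndex]
    if (a.number < b.number) { aIndex++; continue }
    if (a.number > b.number) { bIndex++; continue }
    if (a.hash !== b.hash) return lastCommonIndex
    lastCommonIndex = aIndex; aIndex++; bIndex++
  }
  return lastCommonIndex
}

// Local history with a gap: slot 102 never appeared in any rollbackChain
const local = [cur(100, 'a'), cur(101, 'b'), cur(103, 'd'), cur(104, 'e')]

// Portal agrees through 101 and disagrees from 102 on: roll back to slot 101
const divergent = findRollbackIndex(local, [cur(101, 'b'), cur(102, 'x'), cur(103, 'y')])
// Hash mismatch at the first shared slot: no agreement point
const early = findRollbackIndex(local, [cur(100, 'z'), cur(101, 'b')])
// Sample lies entirely below local history: no overlap
const disjoint = findRollbackIndex(local, [cur(90, 'p'), cur(95, 'q')])

console.log(divergent, early, disjoint) // 1 -1 -1
```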
## Edge cases and considerations
**Empty history at stream start.** The rollback chain is built batch-by-batch from `ctx.stream.state.rollbackChain`. Until the first batch arrives the history is empty. A fork arriving before any batch has been processed means `fork()` will find no common ancestor and must return `null`, which the SDK turns into a fatal error. For a long-running process this window is typically acceptable, but it matters for freshly started consumers.
**History gaps from fast-moving finalization.** `rollbackChain` in each batch contains only the slots from *that batch* that are strictly above the current finalized head. A slot that was already at or below the finalized head when its batch was fetched will never appear in any rollback chain and will therefore be absent from history. This can leave gaps in the number sequence. Algorithms that assume a contiguous history will fail; always match by both number *and* hash.
**No finalized-head info in a batch.** When `ctx.stream.head.finalized` is absent, no history is accumulated. On networks or portal deployments that do not yet surface finality data, the rollback chain therefore stays empty indefinitely, and fork recovery is impossible unless unfinalized slots are tracked through another mechanism.
**Ascending order, match by hash *and* number.** The API spec requires matching on both. Matching only by number is wrong — different chains can have the same slot number. The array is ordered ascending (lowest number first); the last entry is the most recent slot the portal knows about.
**`previousBlocks` may have no overlap with local history.** The portal sends a bounded sample. If `findRollbackIndex` finds no agreement point at all (returns -1) and no HWM fallback applies, fork recovery is impossible — return `null`. The SDK will surface a `ForkCursorMissingError`. Do not silently roll back to slot 0 or crash.
**Multiple consecutive 409s converge to the common ancestor.** This differs from the no-overlap case above: here `findRollbackIndex` *does* find an agreement point, so the stream rolls back there and resumes. If the true common ancestor is deeper still, because the `previousBlocks` sample only reached partway, the portal detects another mismatch and sends a fresh 409 with an older window, this time closer to the true ancestor. The stream converges over several rounds. `fork()` must be idempotent across these calls; truncating the history array in place handles this correctly, since each call receives a shorter local history. Database-backed approaches must also handle re-entrant rollback calls.
**Fork deeper than your history.** If you cap rollback history (e.g. to 1000 slots), a reorg deeper than the cap is unrecoverable. Choose the cap based on the worst-case reorg depth for your target network. Ethereum mainnet finalizes within \~64 slots (\~2 epochs), but PoW or pre-finality networks can reorg much deeper. Fail loudly rather than silently replaying from slot 0.
**The finalized slot as the last-resort anchor.** Keep the current finalized slot *in* your rollback history even though it is already finalized. It is the guaranteed safe floor: the portal will never ask you to roll back past it. Having it available means `fork()` can always return a valid cursor for the deepest possible reorg. Pruning with `number > finalized` instead of `number >= finalized` removes this anchor and makes very deep reorgs unrecoverable.
**History that never gets pruned.** If the portal never sends a finalized head, rollback history will grow without bound. Apply a slot-count cap as a secondary safeguard.
**Business state and rollback-chain history must be rolled back atomically.** For databases with transactions (Postgres), both must be updated in the same transaction — a crash between the two leaves `fork()` computing the wrong rollback point.
**For non-transactional databases (ClickHouse), atomicity is not achievable; use a crash-recovery callback instead.** Write application data first, write the rollback-chain checkpoint second. A crash after data but before the checkpoint save leaves the checkpoint pointing to the previous batch. On every restart, before the stream resumes, the checkpoint cursor should be read and used to purge any rows written after it — this closes the gap. This is how the Pipes SDK ClickHouse target works: `onRollback` is invoked with `type: 'offset_check'` on every startup so user code can delete the partial batch. Because ClickHouse `DELETE`s are asynchronous and unsafe under concurrent writes, the SDK inserts tombstone rows (`sign = -1`) via `CollapsingMergeTree` instead of issuing true deletes; queries that need to see only live rows must use the `FINAL` modifier.
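The SDK's ClickHouse target does this for you; the toy in-memory model below only illustrates why tombstones work. A rollback writes `sign = -1` copies of every live row after the rollback cursor, and reading with collapse semantics (what `FINAL` provides) hides both copies. `Row`, `collapse` and `tombstonesAfter` are hypothetical names, and the collapse rule is simplified to "net sign per key"; real CollapsingMergeTree merge rules cover more cases.

```typescript
// Hypothetical row shape: one row per sorting key, sign +1 (state) or -1 (cancel)
type Row = { key: string; slot: number; amount: number; sign: 1 | -1 }

// Simplified collapse: +1 and -1 rows for the same key cancel out; keys with a
// net positive count stay live, represented by their last +1 row.
function collapse(rows: Row[]): Row[] {
  const net = new Map<string, { sum: number; last?: Row }>()
  for (const r of rows) {
    const e = net.get(r.key) ?? { sum: 0 }
    e.sum += r.sign
    if (r.sign === 1) e.last = r
    net.set(r.key, e)
  }
  return Array.from(net.values())
    .filter(e => e.sum > 0 && e.last !== undefined)
    .map(e => e.last!)
}

// Tombstones (sign = -1 copies) for every live row written after the cursor slot
function tombstonesAfter(rows: Row[], cursorSlot: number): Row[] {
  return collapse(rows)
    .filter(r => r.slot > cursorSlot)
    .map(r => ({ ...r, sign: -1 as const }))
}
```

Inserting the tombstones is an append, so it avoids the asynchronous-`DELETE` problem; the cost is that readers must collapse (or use `FINAL`) to see only live rows.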
**Rolling back spans multiple batches.** A single reorg can invalidate data written across many batches. Your rollback mechanism must undo *all* rows/documents written after the rollback point, not just the last batch.
**Idempotency of re-processing.** After a rollback the stream replays slots from the rollback cursor forward. Write logic that is not idempotent (e.g. unconditional INSERT instead of UPSERT, incrementing a counter instead of setting it) will corrupt state on replay. Design writes so they are safe to run more than once for the same slot.
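A minimal sketch of the difference, using a hypothetical `Swap` row keyed by slot and transaction index (the real deduplication key depends on your data and may need more fields). Replaying the same batch twice duplicates rows with a blind append but leaves a keyed upsert unchanged.

```typescript
// Hypothetical row shape for illustration
type Swap = { slot: number; txIndex: number; amount: number }

// Not idempotent: a replay after rollback appends the same rows again
function appendAll(store: Swap[], batch: Swap[]): void {
  store.push(...batch)
}

// Idempotent: rows are keyed by (slot, txIndex), so a replay overwrites them
function upsertAll(store: Map<string, Swap>, batch: Swap[]): void {
  for (const s of batch) store.set(`${s.slot}:${s.txIndex}`, s)
}
```

The same reasoning applies to aggregates: store the value computed for a slot rather than incrementing a running total, so replaying the slot recomputes the same value.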
**Side effects that cannot be rolled back.** Database writes can be undone; emails, webhook calls, and Kafka publishes cannot. Either defer all external side effects until the slot is finalized, or build a separate reconciliation layer. Treating unfinalized state as permanent is the most common source of production incidents in real-time blockchain consumers.
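One way to defer side effects is a small outbox that holds them until their slot is at or below the finalized high-water mark, and discards them when `fork()` rolls back past their slot. `FinalityOutbox` is a hypothetical, in-memory sketch; a production version would persist pending effects alongside application state so they survive restarts.

```typescript
// Assumption: mirrors the SDK's BlockCursor (slot number plus optional hash)
type BlockCursor = { number: number; hash?: string }
type Effect = { slot: number; run: () => void }

class FinalityOutbox {
  private pending: Effect[] = []

  enqueue(effect: Effect): void {
    this.pending.push(effect)
  }

  // Call from fork(): effects from slots after the rollback cursor never happened
  rollback(cursor: BlockCursor): void {
    this.pending = this.pending.filter(e => e.slot <= cursor.number)
  }

  // Call whenever the finalized high-water mark advances
  flush(finalized: BlockCursor): void {
    const ready = this.pending.filter(e => e.slot <= finalized.number)
    this.pending = this.pending.filter(e => e.slot > finalized.number)
    for (const e of ready) e.run()
  }

  get size(): number {
    return this.pending.length
  }
}
```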
**The cursor returned from `fork()` is inclusive.** Return the last slot you consider good; the SDK resumes from `cursor.number + 1`. Off-by-one errors cause either duplicate re-processing or skipped slots.
**The cursor hash must be set.** The SDK sends `parentBlockHash = cursor.hash` in the next request so the portal can detect the next fork. A cursor with a missing hash silently disables fork detection for that request.
**The cursor in `write()`'s `read()` call is only the initial startup cursor.** `pipeTo()` handles post-fork cursor updates inside the `read()` generator; `write()` runs continuously through forks and is never restarted by the SDK. The cursor you pass to `read()` is only relevant if `write()` is re-invoked by an external retry mechanism. For in-memory implementations the cursor is effectively always `undefined`.
**Process restart loses in-memory rollback history.** An in-memory rollback chain survives forks but not process restarts. After a restart you have no history. For services that must survive restarts, persist the rollback chain alongside application state and restore it on startup. See [Cursor management](./cursor-management) for patterns.
**The `X-Sqd-Finalized-Head-Number` header can go backwards.** Portal instances behind a load balancer can be at different heights. When a reconnected stream lands on a lagging instance, the `finalized` value in `ctx.stream.head.finalized` may be lower than what was previously reported. Do not use the current batch's finalized number as a pruning threshold directly.
**Treat the finalized head as a high-water mark.** Maintain the highest finalized number seen across all batches and key all pruning on that value. For database-backed implementations this is critical: a DELETE keyed on the current (possibly lower) finalized number will over-retain rows on some batches, and under-retain them if the logic is structured the other way.
**A 409 from a lagging instance may have `previousBlocks` entirely below the high-water mark.** Two cases:
* *All* of `previousBlocks` are strictly below the high-water mark. The lagging instance's sample doesn't reach local history. Because the high-water mark is truly final, every correct instance agrees on it: the fork is somewhere *above* it. Return the high-water mark cursor. This requires storing the finalized head as a full `BlockCursor` (number **and** hash), not just a number — the hash is needed for the next request's `parentBlockHash`.
* Some of `previousBlocks` are at or above the high-water mark but no hash match is found. This is a genuine inconsistency at a height the client already considers final. Return `null` and surface the error.
**Forks only occur in the real-time (unfinalized) portion of the stream.** The `/finalized-stream` endpoint never returns a 409. Fork handling is only needed when consuming the `/stream` endpoint with `fromBlock` near or at the chain head. If your range is bounded and entirely in the past, you will never see a fork.
**`parentBlockHash` is the tripwire.** Every request to the portal includes the hash of the last slot the client has seen. A mismatch triggers a 409. Anything that disrupts this — starting from a cursor with a wrong or missing hash, replaying from a checkpoint that has drifted from the chain — will produce spurious fork events.
**`rollbackChain` is per-batch, not cumulative.** It contains only the slots in *this batch* that are above the current finalized head. Treat it as a delta to append to running history, not as a full snapshot of the current unfinalized chain.
**Slots near the finality boundary move between finalized and unfinalized.** A slot that appears in one batch's `rollbackChain` may be at or below the finalized head in the next batch. The pruning filter must remove these once they are finalized, or rollback history will slowly fill with slots that can never be the subject of a reorg.
**Empty `rollbackChain` is valid.** It means either (a) the batch contained no slots above the finalized head, or (b) the finalized head was unknown. Do not treat an empty rollback chain as an error.
**Both arrays must be in ascending order.** The merge-sort scan breaks silently if either array is unsorted. Local history is ascending if you always append to the end; `previousBlocks` from the portal is ascending by protocol convention. After a rollback, the truncated history remains ascending.
**Gaps in slot numbers do not break correctness, only efficiency.** A gap (e.g. slots 100, 101, 103 — 102 missing because it was already finalized) means a fork at 102 resolves by rolling back to 101. The extra re-processing of 102 is harmless because finalized slots are immutable.
**Duplicate entries break the scan.** If the same slot number appears more than once with different hashes in your history, the scan may report the wrong common ancestor. UPSERT rather than INSERT when persisting rollback chain entries to a store.
**Hash comparison requires both sides to be non-null.** `BlockCursor.hash` is optional in the type system. If either side is `undefined`, `undefined !== "0x..."` evaluates to `true`, which looks like a fork on a slot that may be fine. Always verify hashes are present before comparing.
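A sketch of defensive checks for the last few points (ascending order, no duplicate slot numbers, hashes present), with hypothetical helper names. Calling `assertValidChain` on both local history and `previousBlocks` at the top of `fork()` turns these silent failure modes into explicit errors.

```typescript
// Assumption: mirrors the SDK's BlockCursor, where hash is optional
type BlockCursor = { number: number; hash?: string }

// Compares two cursors; refuses to guess when either hash is missing
function sameBlock(a: BlockCursor, b: BlockCursor): boolean {
  if (a.hash === undefined || b.hash === undefined) {
    throw new Error(`missing hash at slot ${a.number}`)
  }
  return a.number === b.number && a.hash === b.hash
}

// Throws unless the chain is strictly ascending (sorted, no duplicate slot
// numbers) and every entry carries a hash
function assertValidChain(chain: BlockCursor[], label: string): void {
  chain.forEach((b, i) => {
    if (b.hash === undefined) {
      throw new Error(`${label}: slot ${b.number} has no hash`)
    }
    if (i > 0 && b.number <= chain[i - 1].number) {
      throw new Error(`${label}: not strictly ascending at index ${i}`)
    }
  })
}
```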
**`fork()` runs sequentially with the batch stream.** The SDK awaits `fork()` before resuming the stream. No new batches arrive while `fork()` is running. It is safe to mutate shared state inside `fork()` without additional locking.
**`write()` and `fork()` share mutable state without synchronization.** This is safe only because the SDK never calls them concurrently. If you introduce background workers or async tasks that also read or write rollback state, you must add explicit synchronization.
**The order in which you update rollback history and application state matters.** If you update application state first and crash before updating rollback history, the next restart will not know how far to roll back. Prefer database transactions that update both atomically, or update rollback history first so a crash leaves you conservative — you can always re-process a slot you have already seen.
# A prod-ready pipe
Source: https://docs.soldexer.dev/pipes/prod-ready-pipe
Walkthrough of a production DEX liquidity pipe built with Pipes SDK
Although this is an EVM walkthrough, most of its content also applies to the upcoming Solana version (TBA).
The [pipes-sqdgn-dex-example](https://github.com/subsquid-labs/pipes-sqdgn-dex-example) repo is a full-featured pipe that indexes DEX liquidity events (swaps, mints, burns, syncs, etc.) from Base mainnet for Uniswap V2/V3/V4, Aerodrome Basic/Slipstream, and other forks. It uses Pipes SDK to pull data from a Subsquid Portal, decode EVM logs, transform them into a unified schema, and write to ClickHouse. For what Pipes SDK is and when to use it, see [Why Pipes SDK](/en/sdk/pipes-sdk/solana/why-pipes-sdk).
The entrypoint is [`pipes/evm/liquidity/cli.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/cli.ts). The pipeline is: **Portal source → composite decoders → transform pipe → ClickHouse target**.
## 1. Config and ClickHouse
Config comes from env (e.g. `NETWORK`, `DB_PATH`, `PORTAL_CACHE_DB_PATH`). The CLI creates a ClickHouse client, ensures tables exist, then builds the pipe:
```typescript theme={null}
const config = getConfig();
const client = await createClickhouseClient({ ... });
await ensureTables(client, __dirname, config.network, databaseName);
```
See [`pipes/evm/config.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/config.ts) for network/portal URLs and [`liquidity.sql`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) for schema.
## 2. Portal source
[`portal_source.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/portal_source.ts) wraps the EVM Portal with optional metrics and a SQLite cache so responses can be reused:
```typescript theme={null}
evmPortalSource({
portal: config.portal.url,
metrics: metricsPort ? metricsServer({ port: metricsPort }) : undefined,
cache: portalSqliteCache({ path: portalCacheDbPath }),
});
```
## 3. Composite decoders
[`evm_decoder.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/evm_decoder.ts) defines one decoder per protocol (Uniswap V2/V3/V4, Aerodrome Basic/Slipstream). Each uses `evmDecoder` with `factory()` + a factory SQLite DB to discover pools from factory events, then subscribes to swap/mint/burn/sync/collect (and protocol-specific) events:
```typescript theme={null}
uniswapV2: evmDecoder({
profiler,
range: { from: blockFrom },
contracts: factory({
address: getFactoryAddressesByProtocol(network, 'uniswap_v2'),
event: UniswapV2FactoryEvents.PairCreated,
database,
parameter: 'pair',
}),
events: {
swaps: UniswapV2PairEvents.Swap,
burns: UniswapV2PairEvents.Burn,
mints: UniswapV2PairEvents.Mint,
syncs: UniswapV2PairEvents.Sync,
},
}),
```
The CLI merges all decoders with `pipeComposite({ ...decoders })` so the Portal stream is decoded into multiple typed streams (one per protocol).
## 4. Transform pipe
[`raw_liquidity_event_pipe.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/raw_liquidity_event_pipe.ts) takes the decoded composite and converts each protocol's events into a single `DbLiquidityEvent` shape (pool, tokens, amounts, tick, fee, block/transaction/log indices, etc.) via protocol-specific converters in `converters/`, then sorts by block/tx/log index:
```typescript theme={null}
return ({ uniswapV2, uniswapV3, uniswapV4, aerodromeBasic, aerodromeSlipstream }: InputType) => {
const v2_res = convertV2(network, { uniswapV2 });
// ... convertV3, convertV4, convertAerodromeBasic, convertAerodromeSlipstream
return [...v2_res, ...v3_res, ...v4_res, ...basic_res, ...slipstream_res]
.sort((a, b) => /* by block_number, transaction_index, log_index */);
};
```
## 5. ClickHouse target
[`clickhouse_target.ts`](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/clickhouse_target.ts) uses the Pipes `clickhouseTarget` and inserts batches into `liquidity_events_raw` with retry logic:
```typescript theme={null}
clickhouseTarget({
client,
onData: async ({ data, ctx }) => {
await chRetry(logger, 'liquidity_events_raw insert', () =>
client.insert({ table: 'liquidity_events_raw', values: data, format: 'JSONEachRow' })
);
},
});
```
## 6. Materialized view transforms in ClickHouse
After rows land in `liquidity_events_raw`, [liquidity.sql](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql) (from [line 87](https://github.com/subsquid-labs/pipes-sqdgn-dex-example/blob/master/pipes/evm/liquidity/liquidity.sql#L87) onward) defines materialized views that maintain derived tables in ClickHouse. For example, `current_balances_mv` aggregates `balances_history` into `current_balances`. The aggregation runs incrementally as new data is inserted into `liquidity_events_raw`.
This approach is commonly used for stateful data transforms / aggregations at SQD. It is elegant and convenient, but its scalability has its limits.
## Putting it together
The full pipeline in `cli.ts`:
```typescript theme={null}
await portalSource
.pipeComposite({ ...decoders })
.pipe(createPipeFunc(config.network, poolMetadataStorage))
.pipeTo(chTarget);
```
Data flows from the Portal (with cache and optional metrics) through the composite decoders, then through the liquidity transform, and finally into ClickHouse. For run instructions and env vars, see the repo [README](https://github.com/subsquid-labs/pipes-sqdgn-dex-example#readme).
# Quickstart
Source: https://docs.soldexer.dev/pipes/quickstart
Bootstrap a Pipes SDK project
# Using with AI
The fastest way to get an AI coding agent productive on a Pipes SDK project is to install the official [Pipes SDK Agent Skill](/en/ai/agent-skills#pipes-sdk-skill):
```bash theme={null}
npx skills add subsquid-labs/skills/pipes-sdk
```
The skill activates automatically on tasks like *"create an indexer for Uniswap V3 swaps"* or *"my indexer is syncing slowly, help me optimize it"*. It covers scaffolding, runtime error diagnosis, sync tuning, and data-quality checks.
Pair the skill with one or both MCP servers so the agent can read live data and look things up:
* [Portal MCP server](/en/ai/mcp-server) — 29 tools for querying blocks, transactions, logs, instructions, and analytics across 225+ datasets. No API key.
* [Documentation MCP server](/en/ai/mcp-server-docs) — search and retrieve these docs from inside the agent.
If you'd rather feed docs into a model directly, the static [`llms.txt`](/llms.txt) (index) and [`llms-full.txt`](/llms-full.txt) (full content) files are kept in sync with the site. See the [AI Development overview](/en/ai/ai-development) for the full menu.
# Scaffolding with Pipes CLI
`pipes-cli` is a work in progress.
In a few minutes, you'll have a running pipe that indexes Orca Whirlpool swap instructions on Solana mainnet into a local PostgreSQL database.
## Prerequisites
* Node.js 22.15+
* `pnpm`
* Docker (for the bundled PostgreSQL container)
## Initialize the project
Run the CLI in the directory where you want the project folder to land:
```bash theme={null}
pnpx @subsquid/pipes-cli@1.0.0-alpha.4 init
```
The CLI prompts for the project folder name, package manager (please stick to `pnpm` for now), sink (please use `ClickHouse` or `Postgres`), network type, network, and template; then installs dependencies and writes a runnable project.
You can supply a JSON config instead of filling the prompts manually. Here's the configuration for Orca Whirlpool swap instructions mentioned above:
```bash theme={null}
pnpx @subsquid/pipes-cli@1.0.0-alpha.4 init --config '{
"projectFolder": "orca-example",
"packageManager": "pnpm",
"sink": "postgresql",
"networkType": "svm",
"network": "solana-mainnet",
"templates": [
{
"templateId": "custom",
"params": {
"contracts": [
{
"contractAddress": "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc",
"contractName": "OrcaWhirlpool",
"contractEvents": [
{
"name": "swap",
"type": "instruction",
"inputs": [
{ "name": "amount", "type": "u64" },
{ "name": "sqrtPriceLimit", "type": "u128" }
]
}
],
"range": { "from": "latest" }
}
]
}
}
]
}'
```
`--config` also accepts a path to a JSON file.
To inspect the full config schema run
```bash theme={null}
pnpx @subsquid/pipes-cli@1.0.0-alpha.4 init --schema
```
## Run the pipeline
The generated project ships with a `docker-compose.yml` that brings up the sink database and the pipeline together:
```bash theme={null}
cd orca-example
docker compose --profile with-pipeline up
```
For an iterative dev loop, run the database in Docker and the pipeline locally:
```bash theme={null}
docker compose up -d # Postgres on :5432
pnpm run db:migrate # apply the generated migration
pnpm run dev # tsx src/index.ts
```
Either way, rows start landing in the `orca_whirlpool_swap` table within a minute.
## What was generated
The project layout:
```
orca-example/
├── src/
│ ├── index.ts # the pipe — source, decoder, target
│ ├── schemas.ts # Drizzle table definitions
│ ├── contracts/ # generated ABI bindings (per program ID)
│ └── utils/
├── migrations/ # SQL migrations generated by drizzle-kit
├── docker-compose.yml # Postgres + optional pipeline service
├── Dockerfile
├── drizzle.config.ts
├── package.json
├── .env # DB_CONNECTION_STR — points at local Postgres
└── README.md
```
The pipe lives in `src/index.ts`. The decoder block defines what to extract + a light transform:
```ts theme={null}
const custom = solanaInstructionDecoder({
range: { from: 'latest' },
programId: ['whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc'],
instructions: {
swap: orcaWhirlpoolInstructions.swap,
},
}).pipe(enrichEvents)
```
The decoder asks the Portal for `swap` instructions on the Orca Whirlpool program. `enrichEvents` (from `src/utils/`) reshapes each decoded instruction into a row matching the Drizzle table. See the [Pipe anatomy](./guides/basic-development/anatomy) and [Handling contract events](./guides/basic-development/handling-events) guides for more info on `solanaInstructionDecoder()`.
The `main()` function wires the decoder to a [drizzleTarget](./reference/basic-components/target/postgres-drizzle):
```ts theme={null}
export async function main() {
await solanaPortalSource({
id: 'solana-orca-pipe',
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: { custom },
}).pipeTo(
drizzleTarget({
db: drizzle(env.DB_CONNECTION_STR),
tables: [orcaWhirlpoolSwapTable],
onData: async ({ tx, data }) => {
for (const values of chunk(data.custom.swap)) {
await tx.insert(orcaWhirlpoolSwapTable).values(values)
}
},
}),
)
}
```
The `id` is a per-pipeline identifier — keep it stable so the [target's cursor](./guides/architecture-deep-dives/cursor-management) survives restarts. See [solanaPortalSource](./reference/basic-components/source) for the full source API and [Pipe anatomy](./guides/basic-development/anatomy) for how the pieces fit together.
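The `onData` callback above inserts rows through a `chunk()` helper from the generated project. That helper's source is not shown on this page. A plausible minimal version is sketched below. It assumes the helper exists to keep each `INSERT` within PostgreSQL's limit of 65,535 bind parameters per statement, and the default slice size of 1000 is an arbitrary illustrative value.

```ts
// Hypothetical helper: split `items` into consecutive slices of at most `size` elements.
function chunk<T>(items: readonly T[], size = 1000): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`)
  }
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    // slice() copies, so callers can't mutate the input through the result
    out.push(items.slice(i, i + size))
  }
  return out
}
```

With this helper, `chunk(rows, 500)` yields slices of at most 500 rows, and each slice becomes one `tx.insert(...).values(slice)` call.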
## Other examples
The `tokenBalances` template indexes pre/post token balances directly from blocks — no program ID needed. The generated pipe uses [solanaQuery()](./reference/basic-components/query-builder) instead of an instruction decoder.
```bash theme={null}
pnpx @subsquid/pipes-cli@1.0.0-alpha.4 init --config '{
"projectFolder": "solana-tokens",
"packageManager": "pnpm",
"sink": "postgresql",
"networkType": "svm",
"network": "solana-mainnet",
"templates": [
{
"templateId": "tokenBalances"
}
]
}'
```
The CLI ships one built-in SVM template — `tokenBalances` — plus the open-ended `custom` template shown at the top of the page.
# Source
Source: https://docs.soldexer.dev/pipes/reference/basic-components/source
API reference for Solana Portal source
The source component connects to SQD Portal and streams blockchain data to your pipeline. It's the starting point for all Pipes SDK data flows.
## solanaPortalSource
Create a Portal source for Solana chains.
```ts theme={null}
solanaPortalSource(config: SolanaPortalSourceConfig): Source
```
**Parameters:**
* `id`: (required) Pipeline ID. Must be unique within any infrastructure shared with other pipelines (DB, logging sinks, etc.).
* `portal`: (required) Portal API URL or config object.
* String: `"https://portal.sqd.dev/datasets/solana-mainnet"`
* Object: `{ url: string, finalized?: boolean }`. When `finalized: true` is set the stream will consist of finalized blocks only and none of the [fork handling machinery](../../guides/architecture-deep-dives/fork-handling) will be required.
* `outputs`: (required) A single query-transformer combo, or a record of named outputs.
* `cache`: (optional) Portal cache instance. If supplied, saves portal responses locally and reuses them when the pipeline re-runs.
* `logger`: (optional) A pino-compatible `Logger` instance or a log level string. Accepted level values: `'fatal'`, `'error'`, `'warn'`, `'info'`, `'debug'`, `'trace'`, `'silent'`, `false`, `null`. Passing `false` or `null` silences all log output. When omitted, a default console logger is used.
* `metrics`: (optional) `metricsServer()` instance for exposing Prometheus metrics.
* `progress`: (optional) Options for progress tracking.
* `profiler`: (optional) Enable the built-in per-batch profiler. See [Profiling](../../guides/advanced-topics/profiling).
**Example:**
```ts theme={null}
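import { solanaInstructionDecoder, solanaPortalSource } from "@subsquid/pipes/solana";
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";
// `orcaWhirlpool` stands for generated ABI bindings for the Whirlpool program;
// the import path depends on where your project keeps them.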
const source = solanaPortalSource({
id: "orca-swaps",
portal: "https://portal.sqd.dev/datasets/solana-mainnet",
outputs: solanaInstructionDecoder({
range: { from: 200000000 },
programId: orcaWhirlpool.programId,
instructions: { swap: orcaWhirlpool.instructions.swap },
}),
cache: portalSqliteCache({ path: "./cache.sqlite" }),
});
```
### Finalized Blocks
You can configure the source to only receive finalized blocks:
```ts theme={null}
const source = solanaPortalSource({
id: 'finalized-pipe',
portal: {
finalized: true,
url: 'https://portal.sqd.dev/datasets/solana-mainnet'
},
// outputs: ... (required, same as in the example above)
});
```
Using finalized blocks eliminates the need for rollback handlers in your targets, simplifying the logic of your pipeline.
## Pipe methods
### pipe()
Chain a single [whole-pipe transformer](./transformer) to the source.
```ts theme={null}
source.pipe(transformer)
```
The returned value behaves exactly like the source.
See also: [Stateful transformers](../../guides/advanced-topics/stateful-transforms).
### pipeTo()
Connect the pipeline to a [target](./target).
```ts theme={null}
source.pipeTo(target)
```
This is a terminal operation: you cannot continue piping after calling this method.
If you want your stream to resume on restarts and properly handle unfinalized data, make sure that the target [manages cursors](../../guides/architecture-deep-dives/cursor-management) and [handles forks](../../guides/architecture-deep-dives/fork-handling) correctly.
### \*[Symbol.asyncIterator]()
Use the pipeline as an async iterator:
```ts theme={null}
for await (const { data } of stream) {
// ... do something with data ...
}
```
On blockchain forks this will throw `ForkException`s - see [Fork handling](../../guides/architecture-deep-dives/fork-handling).
# Transformer
Source: https://docs.soldexer.dev/pipes/reference/basic-components/transformer
API reference for createTransformer and pipe transforms
## createTransformer
Construct a whole-pipe transformer.
```ts theme={null}
createTransformer(config: TransformerOptions): Transformer
```
**Config fields:**
* `transform`: (required) `(data: I, ctx: BatchContext) => O | Promise<O>`. Called once per batch.
* `start`: (optional) `(ctx: StartCtx) => void | Promise<void>`. Called once when the pipe starts. Use this to load state, warm up caches, or query the portal for historical data before the main stream begins.
* `stop`: (optional) `(ctx: StopCtx) => void | Promise<void>`. Called once when the pipe stops.
* `fork`: (optional) `(cursor: BlockCursor, ctx: Ctx) => void | Promise<void>`. Called before the next batch whenever the source detects a chain reorg. `cursor` identifies the last safe slot. See [Fork handling](../../guides/architecture-deep-dives/fork-handling).
* `profiler`: (optional) `{ name: string; hidden?: boolean }`. Overrides the transformer's node name in the [profiler](../../guides/advanced-topics/profiling) tree.
**Example:**
```ts theme={null}
const transformer = createTransformer({
transform: async (data, ctx) => {
ctx.logger.info({ slot: ctx.stream.state.current.number }, 'batch')
return data.map((b) => b.instructions)
},
})
```
## Context variables
Each callback receives a context object. The fields differ by callback.
### `transform(data, ctx: BatchContext)`
`ctx` is the full per-batch context. Fields:
| Field | Type | Description |
| ---------- | -------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `id`       | `string`             | Pipeline ID: the `id` passed to `solanaPortalSource()`. |
| `logger` | `Logger` | Pino-compatible logger scoped to this batch. Defaults to the source-level logger. |
| `metrics` | `Metrics` | Prometheus metrics registry. Use `ctx.metrics.counter()`, `.gauge()`, `.histogram()`, `.summary()` to register and update custom metrics. See [Metrics](../../guides/advanced-topics/metrics). |
| `profiler` | `Profiler` | Open a span with `ctx.profiler.start('label')`. See [Profiling](../../guides/advanced-topics/profiling). |
| `stream` | `BatchStreamContext` | Per-stream state (see below). |
| `batch` | `BatchMetadata` | Per-batch volume info (see below). |
#### `ctx.stream: BatchStreamContext`
| Field | Type | Description |
| --------------------- | --------------------------- | ----------------------------------------------------------------------------------------- |
| `dataset` | `ApiDataset` | Dataset metadata returned by the portal (chain name, genesis, tier). |
| `head.finalized` | `BlockCursor \| undefined` | Current finalized head known to the portal, if advertised. |
| `head.latest` | `BlockCursor \| undefined` | Current unfinalized head. |
| `state.initial` | `number` | First slot the stream was configured to read. |
| `state.last` | `number` | Last slot the stream intends to read (often `Infinity`). |
| `state.current` | `BlockCursor` | Latest slot in this batch. Cursor has `{ number, hash?, timestamp? }`. |
| `state.rollbackChain` | `BlockCursor[]` | Unfinalized-chain tail — cursors the stream will need to roll back if a fork is detected. |
| `progress` | `ProgressEvent['progress']` | Progress metrics when `progress` is configured on the source; otherwise undefined. |
| `query` | `{ url, hash, raw }` | Debug info for the portal query feeding this batch. |
#### `ctx.batch: BatchMetadata`
| Field | Type | Description |
| --------------------- | ------------------------ | ----------------------------------------------------------------------- |
| `blocksCount` | `number` | Number of slots in this batch. |
| `bytesSize` | `number` | Compressed payload size received from the portal. |
| `requests`            | `Record<number, number>` | Map of HTTP status code → number of responses that produced this batch. |
| `lastBlockReceivedAt` | `Date` | Wall-clock time the last block was received. |
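As an illustration of how the `ctx.batch` fields can be used, here is a sketch of a helper that a `transform` callback could call to summarize each batch. It uses a local stand-in type that copies only the documented `ctx.batch` fields, so it runs without the SDK. Treating non-2xx status codes as failed responses is an assumption, and `summarizeBatch` is a hypothetical name.

```ts
// Local stand-in for the documented `ctx.batch` fields (not the SDK's own type).
interface BatchMetadata {
  blocksCount: number
  bytesSize: number
  requests: Record<number, number> // HTTP status code -> response count
  lastBlockReceivedAt: Date
}

interface BatchSummary {
  blocks: number
  bytesPerBlock: number
  failedResponses: number // responses with a status outside 2xx
}

function summarizeBatch(batch: BatchMetadata): BatchSummary {
  let failed = 0
  for (const [status, count] of Object.entries(batch.requests)) {
    const code = Number(status) // object keys come back as strings
    if (code < 200 || code >= 300) failed += count
  }
  return {
    blocks: batch.blocksCount,
    // Guard against empty batches to avoid dividing by zero.
    bytesPerBlock: batch.blocksCount > 0 ? batch.bytesSize / batch.blocksCount : 0,
    failedResponses: failed,
  }
}
```

Inside a `transform`, you could for instance call `ctx.logger.info(summarizeBatch(ctx.batch), 'batch stats')` and return `data` unchanged.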
### `start(ctx: StartCtx)`
Fired once, before any batch. Use to warm up caches or run one-off queries.
| Field | Type | Description |
| --------------- | -------------------------- | --------------------------------------------------------------------------- |
| `id` | `string` | Pipeline ID. |
| `logger` | `Logger` | Same as in `BatchContext`. |
| `metrics` | `Metrics` | Same as in `BatchContext`. |
| `portal` | `PortalClient` | Live portal client. Use `portal.getStream(query)` for warm-up reads. |
| `state.initial` | `number` | First slot the stream was configured to read. |
| `state.current` | `BlockCursor \| undefined` | Cursor persisted by the previous run, if any. `undefined` on a fresh start. |
### `fork(cursor, ctx: Ctx)`
Fired before the next batch whenever a reorg is detected. `cursor` is the last slot to keep; drop state produced for anything after it.
| Field | Type | Description |
| ---------- | ---------- | -------------------------- |
| `logger` | `Logger` | Same as in `BatchContext`. |
| `profiler` | `Profiler` | Same as in `BatchContext`. |
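To make "drop state produced for anything after `cursor`" concrete, here is a sketch of per-slot in-memory state that a stateful transformer might keep, with a rollback method meant to be called from `fork`. The `SlotState` class and its methods are hypothetical. Only the `number` field of the cursor comes from the `BlockCursor` description above.

```ts
// Local stand-in for the SDK's cursor; only `number` is used here.
interface BlockCursor {
  number: number
  hash?: string
}

// Hypothetical per-slot state, e.g. counters accumulated by a transformer.
class SlotState<V> {
  private bySlot = new Map<number, V>()

  set(slot: number, value: V): void {
    this.bySlot.set(slot, value)
  }

  get(slot: number): V | undefined {
    return this.bySlot.get(slot)
  }

  // Keep `cursor.number` itself, drop every later slot; returns how many were dropped.
  rollbackTo(cursor: BlockCursor): number {
    let dropped = 0
    for (const slot of Array.from(this.bySlot.keys())) {
      if (slot > cursor.number) {
        this.bySlot.delete(slot)
        dropped++
      }
    }
    return dropped
  }
}
```

In a transformer this might be wired as `fork: (cursor, ctx) => { const n = state.rollbackTo(cursor); ctx.logger.warn({ dropped: n }, 'rolled back') }`.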
### `stop(ctx: StopCtx)`
Fired once when the pipe stops.
| Field | Type | Description |
| -------- | -------- | -------------------------- |
| `logger` | `Logger` | Same as in `BatchContext`. |
# solanaInstructionDecoder
Source: https://docs.soldexer.dev/pipes/reference/utility-components/instruction-decoder
API reference for Solana instruction decoder
See the [Handling program instructions](../../guides/basic-development/handling-instructions) guide for usage examples and ABI generation.
Returns a query-transformer combo that instructs the [source](../basic-components/source) to fetch and decode Solana program instructions.
```ts theme={null}
solanaInstructionDecoder(config: SolanaInstructionDecoderConfig): Transformer
```
**Parameters:**
* `range`: Slot range `{ from: number | string | 'latest', to?: number }` (required)
* `programId`: Program address(es) `string | string[]` (required)
* `instructions`: Map of instruction names to ABI instruction objects (required)
* `profiler`: Profiler config used to label the transformer in profiling data, `{ name: string }` (optional)
* `onError`: Error handler `(ctx: BatchCtx, error: any) => unknown | Promise<unknown>` (optional)
**Example:**
```ts theme={null}
const transformer = solanaInstructionDecoder({
range: { from: 200000000, to: 201000000 },
programId: orcaWhirlpool.programId,
instructions: {
swap: orcaWhirlpool.instructions.swap,
swapV2: orcaWhirlpool.instructions.swapV2,
},
});
```
## Decoded instruction structure
Each entry in the output arrays is a `DecodedInstruction` object:
| Field | Type | Description |
| ------------------- | ---------------------------------- | ------------------------------------------------------------------ |
| `instruction` | `{ accounts, data }` | Decoded accounts (by name) and instruction data fields |
| `block` | `{ number: number, hash: string }` | Slot number and block hash |
| `timestamp` | `Date` | Block timestamp |
| `transaction` | `Transaction` | Parent transaction (index, signatures) |
| `innerInstructions` | `Instruction[]` | CPI calls made by this instruction |
| `rawInstruction` | `Instruction` | Raw instruction as received from the portal, including `programId` |
| `tokenBalances` | `TokenBalance[]` | Token balance changes in the parent transaction |
| `blockNumber` | `number` | Deprecated alias for `block.number` |
## Instruction discriminators
Instructions are filtered by discriminators (prefix bytes of instruction data). The number in each discriminator name is the number of prefix bytes it matches. Values are written as `0x`-prefixed hex strings:
| Name | Hex characters (incl. `0x`) | Bytes | Common Use                    |
| ---- | --------------------------- | ----- | ----------------------------- |
| `d1` | 4                           | 1     | Simple programs               |
| `d2` | 6                           | 2     | Some native programs          |
| `d4` | 10                          | 4     | Custom programs               |
| `d8` | 18                          | 8     | Anchor programs (most common) |
The decoder automatically extracts the appropriate discriminator from the ABI instruction definition.
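Both ideas can be checked by hand. The sketch below uses only Node.js `crypto` (no SDK). It shows how a `dN` value relates to raw instruction data: it is the first N bytes as a `0x`-prefixed hex string. It also shows Anchor's 8-byte instruction discriminator, which is the first eight bytes of the SHA-256 hash of `global:<snake_case_instruction_name>`. The helper names are illustrative, not SDK functions.

```ts
import { createHash } from "node:crypto"

// First `n` bytes of instruction data as a 0x-prefixed hex string,
// i.e. the value a `d1`/`d2`/`d4`/`d8` filter compares against.
function prefixDiscriminator(data: Uint8Array, n: 1 | 2 | 4 | 8): string {
  if (data.length < n) throw new RangeError(`need at least ${n} bytes, got ${data.length}`)
  return "0x" + Buffer.from(data.subarray(0, n)).toString("hex")
}

// Anchor's instruction discriminator: sha256("global:<snake_case_name>")[0..8].
function anchorDiscriminator(instructionName: string): string {
  const digest = createHash("sha256").update(`global:${instructionName}`).digest()
  return "0x" + digest.subarray(0, 8).toString("hex")
}
```

For `swap` this yields `0xf8c69e91e17587c8`, the same `d8` value that typegen emits for the Whirlpool `swap` instruction.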
## Multiple programs
Decode instructions from multiple programs:
```ts theme={null}
const decoder = solanaInstructionDecoder({
range: { from: 200000000 },
programId: [
orcaWhirlpool.programId,
raydiumAmm.programId,
],
instructions: {
orcaSwap: orcaWhirlpool.instructions.swap,
raydiumSwap: raydiumAmm.instructions.swapBaseIn,
},
});
```
## Error handling
Handle decoding errors:
```ts theme={null}
const decoder = solanaInstructionDecoder({
range: { from: 200000000 },
programId: programId,
instructions: { swap: instructions.swap },
onError: (ctx, error) => {
ctx.logger.warn(`Failed to decode instruction: ${error.message}`);
// Return null to skip this instruction, or throw to stop processing
return null;
},
});
```
# metricsServer
Source: https://docs.soldexer.dev/pipes/reference/utility-components/metrics-server
Expose Prometheus metrics and the live-stats API from your pipe
Start a metrics server on the pipe process. Required by [Pipes UI](../../guides/basic-development/pipes-ui) and by anything that scrapes Prometheus (Grafana, Alertmanager, etc.).
```ts theme={null}
import { metricsServer } from "@subsquid/pipes/metrics/node";
import { solanaPortalSource } from "@subsquid/pipes/solana";
solanaPortalSource({
// ...
metrics: metricsServer({ port: 9090 }),
// ...
});
```
**Parameters:**
* `port`: HTTP port for the server (default: `9090`).
## Endpoints
`metricsServer()` serves four HTTP endpoints on the configured port. All of them are also handy for ad-hoc inspection with `curl`.
| Path | Content |
| ----------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `/stats` | JSON — per-pipe progress, speed, portal query, SDK version. This is what [Pipes UI](../../guides/basic-development/pipes-ui) polls. |
| `/metrics` | Prometheus text — built-in `sqd_*` series plus any custom metrics you registered. Scrape this from Prometheus. |
| `/profiler` | JSON — recent per-batch span trees. See [Profiling](../../guides/advanced-topics/profiling). Empty when profiling is disabled. |
| `/health` | Responds with `ok`. |
## Custom metrics
Register counters, gauges, histograms, and summaries via `ctx.metrics` in [whole-pipe transformers](../basic-components/transformer), [targets](../basic-components/target), or when consuming the pipe as an async iterator. See the [Metrics guide](../../guides/advanced-topics/metrics).
## See also
* [Pipes UI](../../guides/basic-development/pipes-ui) — visual dashboard that consumes `/stats` and `/profiler`.
* [Metrics](../../guides/advanced-topics/metrics) — walkthrough for exposing Prometheus metrics and adding custom series.
* [Profiling](../../guides/advanced-topics/profiling) — interpreting the `/profiler` span tree.
# solanaRpcLatencyWatcher
Source: https://docs.soldexer.dev/pipes/reference/utility-components/rpc-latency-watcher
Monitor RPC latency and compare with Portal performance
The `solanaRpcLatencyWatcher` function monitors the latency between Portal and external RPC providers, helping you track indexing performance.
## Import
```ts theme={null}
import { solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'
```
## Signature
```ts theme={null}
function solanaRpcLatencyWatcher(options: {
rpcUrl: string[]
}): RpcLatencyWatcher
```
## Parameters
| Parameter | Type | Description |
| --------- | ---------- | ------------------------------------- |
| `rpcUrl` | `string[]` | Array of RPC endpoint URLs to monitor |
## Return Value
Returns a transformer that can be piped to add latency data to the stream.
## Output Data Structure
Each output contains:
```ts theme={null}
{
number: number, // Slot number
timestamp: Date, // Block timestamp
rpc: Array<{
url: string, // RPC endpoint URL
receivedAt: Date, // When block was received from RPC
portalDelayMs: number, // Delay between RPC and Portal (ms)
}>
}
```
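Because every output carries one entry per RPC endpoint, you can aggregate over a window of slots. Here is a sketch that computes the average and maximum `portalDelayMs` per URL. It uses a local copy of the output shape above. The `LatencyStats` shape and `summarizeLatency` are illustrative names.

```ts
// Local copy of the documented output shape.
interface LatencyOutput {
  number: number
  timestamp: Date
  rpc: Array<{ url: string; receivedAt: Date; portalDelayMs: number }>
}

interface LatencyStats {
  samples: number
  avgMs: number
  maxMs: number
}

// Aggregate portalDelayMs per RPC URL across many slots.
function summarizeLatency(outputs: LatencyOutput[]): Map<string, LatencyStats> {
  const acc = new Map<string, { sum: number; max: number; n: number }>()
  for (const out of outputs) {
    for (const r of out.rpc) {
      const cur = acc.get(r.url) ?? { sum: 0, max: -Infinity, n: 0 }
      cur.sum += r.portalDelayMs
      cur.max = Math.max(cur.max, r.portalDelayMs)
      cur.n += 1
      acc.set(r.url, cur)
    }
  }
  const result = new Map<string, LatencyStats>()
  acc.forEach((v, url) => {
    result.set(url, { samples: v.n, avgMs: v.sum / v.n, maxMs: v.max })
  })
  return result
}
```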
## Basic Usage
```ts theme={null}
import { SolanaQueryBuilder, solanaPortalSource, solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'
const stream = solanaPortalSource({
id: 'rpc-latency',
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: new SolanaQueryBuilder().addFields({ block: { number: true, hash: true, timestamp: true } }).includeAllBlocks({ from: 'latest' }).build(),
}).pipe(
solanaRpcLatencyWatcher({
rpcUrl: ['https://api.mainnet-beta.solana.com'],
})
)
for await (const { data } of stream) {
if (!data) continue
console.log(`Slot: ${data.number}`)
console.table(data.rpc)
}
```
## Example Output
```
-------------------------------------
BLOCK DATA: 369,377,455 / Fri Sep 26 2025 15:31:36 GMT+0400
┌───┬─────────────────────────────────────┬──────────────────────────┬───────────────┐
│ │ url │ receivedAt │ portalDelayMs │
├───┼─────────────────────────────────────┼──────────────────────────┼───────────────┤
│ 0 │ https://api.mainnet-beta.solana.com │ 2025-09-26T11:31:37.075Z │ 358 │
└───┴─────────────────────────────────────┴──────────────────────────┴───────────────┘
```
## Multiple RPC Endpoints
Monitor multiple RPC providers simultaneously:
```ts theme={null}
const stream = solanaPortalSource({
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: new SolanaQueryBuilder().addFields({ block: { number: true, hash: true, timestamp: true } }).includeAllBlocks({ from: 'latest' }).build(),
}).pipe(
solanaRpcLatencyWatcher({
rpcUrl: [
'https://api.mainnet-beta.solana.com',
'https://solana-mainnet.rpc.extrnode.com',
],
})
)
```
## Metrics Integration
Export latency data to Prometheus or other monitoring systems:
```ts theme={null}
const stream = solanaPortalSource({
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: new SolanaQueryBuilder().addFields({ block: { number: true, hash: true, timestamp: true } }).includeAllBlocks({ from: 'latest' }).build(),
}).pipe(
solanaRpcLatencyWatcher({
rpcUrl: ['https://api.mainnet-beta.solana.com'],
}).pipe({
profiler: { name: 'expose metrics' },
transform: (data, { metrics }) => {
if (!data) return
for (const rpc of data.rpc) {
metrics
.gauge({
name: 'rpc_latency_ms',
help: 'RPC Latency in ms',
labelNames: ['url'],
})
.set({ url: rpc.url }, rpc.portalDelayMs)
}
return data
},
})
)
```
## Prometheus Export
Full example with HTTP metrics endpoint:
```ts theme={null}
import { SolanaQueryBuilder, solanaPortalSource, solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'
import { Registry, Counter } from 'prom-client'
import http from 'http'
const registry = new Registry()
const latencyCounter = new Counter({
name: 'portal_latency_ms_total',
help: 'Total Portal latency in milliseconds',
registers: [registry],
})
const stream = solanaPortalSource({
id: 'rpc-latency-prometheus',
portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
outputs: new SolanaQueryBuilder().addFields({ block: { number: true, hash: true, timestamp: true } }).includeAllBlocks({ from: 'latest' }).build(),
}).pipe(
solanaRpcLatencyWatcher({
rpcUrl: ['https://api.mainnet-beta.solana.com'],
}).pipe({
transform: (data) => {
if (!data) return
for (const rpc of data.rpc) {
latencyCounter.inc(rpc.portalDelayMs)
}
return data
},
})
)
// Expose metrics endpoint
const server = http.createServer(async (req, res) => {
if (req.url === '/metrics') {
res.setHeader('Content-Type', registry.contentType)
res.end(await registry.metrics())
} else {
res.end('OK')
}
})
server.listen(9090)
for await (const { data } of stream) {
// Process blocks
}
```
## How It Works
The RPC latency watcher:
1. Subscribes to slot updates via WebSocket (`slotsUpdatesSubscribe`)
2. Listens for `optimisticConfirmation` events from each RPC
3. Compares the arrival time of blocks between RPC and Portal
4. Reports the delay as `portalDelayMs`
The measured values include client-side network latency. For RPC, only the arrival time of the block is measured—this does not capture the node's internal processing latency.
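Steps 3 and 4 can be sketched as follows. This is a simplified model, not the watcher's actual code. It assumes that RPC arrival times are recorded per slot as notifications come in, and that the delay equals Portal arrival time minus RPC arrival time, so a positive value means the Portal delivered the slot later. The class and method names are hypothetical.

```ts
// Hypothetical model of the watcher's bookkeeping.
class LatencyTracker {
  // slot -> (RPC url -> arrival time in ms since epoch)
  private rpcArrivals = new Map<number, Map<string, number>>()

  // Record the first time an RPC reported the slot (e.g. on optimisticConfirmation).
  recordRpc(url: string, slot: number, receivedAtMs: number): void {
    let perUrl = this.rpcArrivals.get(slot)
    if (!perUrl) {
      perUrl = new Map<string, number>()
      this.rpcArrivals.set(slot, perUrl)
    }
    if (!perUrl.has(url)) perUrl.set(url, receivedAtMs) // later duplicates are ignored
  }

  // When the Portal delivers the slot, compute the delay per RPC and forget the slot.
  onPortalBlock(slot: number, portalReceivedAtMs: number): Array<{ url: string; portalDelayMs: number }> {
    const perUrl = this.rpcArrivals.get(slot)
    this.rpcArrivals.delete(slot)
    if (!perUrl) return []
    const out: Array<{ url: string; portalDelayMs: number }> = []
    perUrl.forEach((rpcAt, url) => out.push({ url, portalDelayMs: portalReceivedAtMs - rpcAt }))
    return out
  }
}
```

A real implementation would also need to evict slots that the Portal never delivers (for example, skipped slots) so that memory stays bounded.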
## Use Cases
* **Performance monitoring** - Track indexing latency in production
* **RPC comparison** - Compare performance across different RPC providers
* **Alerting** - Trigger alerts when latency exceeds thresholds
* **Optimization** - Identify bottlenecks in your indexing pipeline
# portalSqliteCache
Source: https://docs.soldexer.dev/pipes/reference/utility-components/sqlite-cache
SQLite cache for Portal responses
Create a SQLite cache for Portal responses. Use it with `solanaPortalSource` to cache Portal API responses locally.
```ts theme={null}
portalSqliteCache(config: { path: string }): PortalCache
```
**Example:**
```ts theme={null}
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";
import { solanaPortalSource } from "@subsquid/pipes/solana";
const source = solanaPortalSource({
id: "cached-pipe",
portal: "https://portal.sqd.dev/datasets/solana-mainnet",
// outputs: ... (required; omitted here for brevity)
cache: portalSqliteCache({
path: "./portal-cache.sqlite",
}),
});
```
Import from `@subsquid/pipes/portal-cache/node` instead of `@subsquid/pipes/portal-cache`.
### When to Use
* Development iteration
* Testing pipelines
* Repeated processing of the same slot ranges
# Why Pipes SDK?
Source: https://docs.soldexer.dev/pipes/why-pipes-sdk
And when you might want to use it
Pipes SDK is also available for EVM
Pipes SDK is a TypeScript library for retrieving blockchain data from SQD Portals and transforming it. It features:
* **All features of the SQD Portal API:**
* Data is downloaded in big chunks and at a high speed.
* It is filtered on the server side - you only download what you need.
* Real-time data is supported.
* Information on blockchain reorganizations and finality is available; in standard modules these are handled automatically.
* **Being a library**: although Pipes SDK can be used to build full-featured blockchain indexers, it is easy to embed it into larger applications, microservices, or data processing workflows.
* **Reusable modules:** data filters can be bundled with transformation logic, and the resulting modules can be mixed and matched. For example, you can create modules that fetch data on Raydium swaps and SPL token balance updates, then plug them into your pipe whenever you need either.
* **Simplicity of extension:** we've made adding new modules as simple as possible.
* This includes data sinks: adding support for your database, data lake or message queue is no longer a hassle.
## When to Use Pipes SDK
1. You want common protocols (SPL, Jupiter AMM, Raydium, etc.) handled for you. If all of your data is like that, you can start loading it into your database in minutes.
2. You need deep customization of any part of the data pipeline.
## When to Use Alternatives
1. If you want to use Portal data in a non-JS app, consider using
* [Raw Portal API](/en/portal/solana/api) for all languages.
2. Consider using [Squid SDK](/en/sdk/squid-sdk/evm) if:
* You're making a self-contained Web3 data service such as a GraphQL API.
* You're looking for an indexing framework similar to TheGraph, Ponder or Envio.
# Brand Kit
Source: https://docs.soldexer.dev/resources/brand-kit
Download Soldexer logos and brand assets
For the complete brand kit and all logo variations, please visit our [Google Drive folder](https://drive.google.com/drive/folders/1XqF3n6Jly5_VxqB-xRPxku4-3mCOjsSL?usp=sharing).
## Full Logos
Download light version of Soldexer logo
Download dark version of Soldexer logo
## Brand Colors
| Color | Hex Code |
| ------- | -------- |
| Primary | #7750EE |
| Black | #000000 |
| White | #FFFFFF |
# Example
Source: https://docs.soldexer.dev/squid-sdk/example
A simple squid processor indexing Orca
Here we look into a Squid SDK indexer (a *squid*) that gets data about token swaps on the Whirlpool program of the [Orca Exchange](https://www.orca.so/).
Pre-requisites: Node.js **v20 or newer**, Git, Docker.
## Download the project
Begin by retrieving the template and installing dependencies:
```bash theme={null}
git clone -b portal-api https://github.com/subsquid-labs/solana-example
cd solana-example
npm i
```
## Interfacing with the Whirlpool program
First, we inspect the data available for indexing.
SQD provides a tool (`@subsquid/solana-typegen`) for retrieving program [IDLs](https://www.quicknode.com/guides/solana-development/anchor/what-is-an-idl) off the chain and generating boilerplate ABI code for data decoding. It supports IDLs generated by the [Anchor framework](https://www.anchor-lang.com/) and [Shank](https://github.com/metaplex-foundation/shank).
Generating the ABI code for Whirlpool is done with
```bash theme={null}
npx squid-solana-typegen src/abi whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc#whirlpool
```
Here, `src/abi` is the destination folder and the `whirlpool` suffix sets the base name for the generated file.
In `src/abi/whirlpool/instructions.ts` we find exports of `instruction` instances for every instruction in the Whirlpool program. Here's the one for the `swap` instruction:
```typescript theme={null}
export const swap = instruction(
{
d8: '0xf8c69e91e17587c8',
},
{
tokenProgram: 0,
tokenAuthority: 1,
whirlpool: 2,
tokenOwnerAccountA: 3,
tokenVaultA: 4,
tokenOwnerAccountB: 5,
tokenVaultB: 6,
tickArray0: 7,
tickArray1: 8,
tickArray2: 9,
oracle: 10,
},
struct({
amount: u64,
otherAmountThreshold: u64,
sqrtPriceLimit: u128,
amountSpecifiedIsInput: bool,
aToB: bool,
}),
)
```
Here, `d8` holds the eight bytes that the relevant instruction data starts with. In the next section we'll use this discriminator to request pre-filtered `swap` data from the Soldexer API.
## Configuring the data source
"Data source" is a component that defines what data should be retrieved and where to get it. Here's how we configure it to retrieve the `swap` instruction data of the Whirlpool program:
```typescript src/main.ts theme={null}
// ...
import {DataSourceBuilder} from '@subsquid/solana-stream'
import * as whirlpool from './abi/whirlpool'
const dataSource = new DataSourceBuilder()
// An SQD Network Portal URL is required
.setPortal('https://portal.sqd.dev/datasets/solana-mainnet')
.setBlockRange({from: 300_000_000})
.addInstruction({
// Select instructions that
where: {
// were executed by the Whirlpool program, and
programId: [whirlpool.programId],
// have the first eight bytes of .data equal to the swap discriminator, and
d8: [whirlpool.instructions.swap.d8],
// come from the USDC-SOL pair, and
...whirlpool.instructions.swap.accountSelection({
whirlpool: ['7qbRF6YsyGuLUVs6Y1q64bdVrfe4ZcUUz1JRdoVNUJnm']
}),
// were successfully committed
isCommitted: true
},
// For each instruction data item selected above
// make sure to also include
include: {
// inner instructions,
innerInstructions: true,
// transaction that executed the instruction,
transaction: true,
// all token balance update records of that transaction
transactionTokenBalances: true,
}
})
// Include the following fields in the fetched data items:
.setFields({
block: {
// timestamps for the block headers
timestamp: true
},
transaction: {
// signatures for transactions:
// the first one is used as a tx hash/ID
signatures: true
},
instruction: {
programId: true,
accounts: true,
data: true
},
tokenBalance: {
preAmount: true,
postAmount: true,
preOwner: true,
postOwner: true
}
})
.build()
```
Here,
* `'https://portal.sqd.dev/datasets/solana-mainnet'` is the address of the public SQD Network portal for Solana mainnet.
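* `300_000_000` is the first Solana slot to be indexed by the processor.
* The argument of `addInstruction()` is a set of filters that tells the processor to retrieve data on Whirlpool program instructions whose `d8` discriminator matches that of the `swap` instruction (for Anchor programs, the first eight bytes of the SHA-256 hash of `global:swap`), that involve the USDC-SOL pool, and that were successfully committed.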
It also instructs the processor to fetch some related data: inner instructions, parent transactions and token balance updates due to these transactions.
Aside from instructions, it's also possible to request transactions, execution logs, token balance updates for SOL and other tokens and reward records, plus the related data for each of these data item types.
Next, we fetch the filtered data from the Soldexer API / SQD Portal, transform it and save the result to our destination of choice - a Postgres DB.
## Transforming and saving data
The `run` function from the `@subsquid/batch-processor` package fetches batches of data from the Soldexer API and runs a transformation function (called *batch handler*) on each batch. Here's how it's called:
```typescript theme={null}
import {run} from '@subsquid/batch-processor'
run(dataSource, database, async ctx => {
// data transformation and persistence code here
})
```
Here,
* `dataSource` is the data source object described in the previous section
* `database` is a compatible data sink implementation
* `async ctx => { ... }` is the batch handler
* `ctx` is a batch context object that exposes a batch of data (at `ctx.blocks`) and any data persistence facilities derived from `database` (at `ctx.store`)
And here is how this call looks in our Whirlpool data processor:
```typescript src/main.ts theme={null}
import {run} from '@subsquid/batch-processor'
import {augmentBlock} from '@subsquid/solana-objects'
import {DataSourceBuilder, SolanaRpcClient} from '@subsquid/solana-stream'
import {TypeormDatabase} from '@subsquid/typeorm-store'
import assert from 'assert'
import * as tokenProgram from './abi/token-program'
import * as whirlpool from './abi/whirlpool'
import {Exchange} from './model'
// The data source definition from prev section
const dataSource = ...
// Saving to Postgres via TypeORM
const database = new TypeormDatabase()
run(dataSource, database, async ctx => {
// Block items that we get from `ctx.blocks` are flat JS objects.
//
// We can use `augmentBlock()` function from `@subsquid/solana-objects`
// to enrich block items with references to related objects and
// with convenient getters for derived data (e.g. `Instruction.d8`).
let blocks = ctx.blocks.map(augmentBlock)
let exchanges: Exchange[] = []
for (let block of blocks) {
for (let ins of block.instructions) {
// See https://read.cryptodatabytes.com/p/starter-guide-to-solana-data-analysis
if (ins.programId === whirlpool.programId &&
ins.d8 === whirlpool.instructions.swap.d8) {
// A TypeORM object for an `exchange` table row
let exchange = new Exchange({
id: ins.id,
slot: block.header.slot,
tx: ins.getTransaction().signatures[0],
timestamp: new Date(block.header.timestamp * 1000)
})
// Decoding the inner instructions executed by the Token Program
assert(ins.inner.length == 2)
let srcTransfer = tokenProgram.instructions.transfer.decode(ins.inner[0])
let destTransfer = tokenProgram.instructions.transfer.decode(ins.inner[1])
// Figuring out the exchange params
let srcBalance = ins.getTransaction().tokenBalances.find(tb => tb.account === srcTransfer.accounts.source)
let destBalance = ins.getTransaction().tokenBalances.find(tb => tb.account === destTransfer.accounts.destination)
let srcMint = ins.getTransaction().tokenBalances.find(tb => tb.account === srcTransfer.accounts.destination)?.preMint
let destMint = ins.getTransaction().tokenBalances.find(tb => tb.account === destTransfer.accounts.source)?.preMint
assert(srcMint)
assert(destMint)
// Updating the `Exchange` object and storing it in a buffer
exchange.fromToken = srcMint
exchange.fromOwner = srcBalance?.preOwner || srcTransfer.accounts.source
exchange.fromAmount = srcTransfer.data.amount
exchange.toToken = destMint
exchange.toOwner = destBalance?.postOwner || destBalance?.preOwner || destTransfer.accounts.destination
exchange.toAmount = destTransfer.data.amount
exchanges.push(exchange)
}
}
}
// Batch inserting the `exchange` table rows
await ctx.store.insert(exchanges)
})
```
This goes through all the instructions in each block, checks that they are indeed `swap` instructions of the Whirlpool program, and decodes the two inner Token Program `transfer` instructions that each swap executes.
Then it derives the exchange parameters from the decoded transfers and the transaction's token balances: input and output tokens, source and destination owners, and amounts.
The decoding is done with the `tokenProgram.instructions.transfer.decode` function from the TypeScript ABI provided in the template.
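To make the matching and lookup steps more concrete, here is a minimal, SDK-free sketch of the same logic on plain objects. It assumes the usual Anchor convention (Whirlpool is an Anchor program) that instruction data starts with an 8-byte discriminator equal to the first 8 bytes of `sha256("global:<instruction name>")`. The types `SwapIx`, `TransferIx`, `TokenBalance`, `ExchangeParams` and the helpers `anchorDiscriminator`, `isSwap` and `deriveExchange` are hypothetical simplifications, not the Squid SDK's API; in the real squid these roles are played by `ins.d8`, `whirlpool.instructions.swap.d8`, `tokenProgram.instructions.transfer.decode` and the transaction's `tokenBalances`.

```typescript
import {createHash} from 'crypto'

// Hypothetical, simplified shapes; NOT the Squid SDK's types.
interface TokenBalance {
    account: string
    preMint?: string
    preOwner?: string
    postOwner?: string
}

// An already-decoded Token Program `transfer` (hypothetical shape)
interface TransferIx {
    source: string      // token account being debited
    destination: string // token account being credited
    amount: bigint
}

interface SwapIx {
    programId: string
    data: Buffer                  // raw instruction data
    inner: TransferIx[]           // inner transfers executed by the swap
    tokenBalances: TokenBalance[] // token balances of the enclosing transaction
}

interface ExchangeParams {
    fromToken: string
    fromOwner: string
    fromAmount: bigint
    toToken: string
    toOwner: string
    toAmount: bigint
}

// Anchor convention (assumed here): discriminator = first 8 bytes of sha256("global:<name>")
function anchorDiscriminator(name: string): Buffer {
    return createHash('sha256').update(`global:${name}`).digest().subarray(0, 8)
}

const SWAP_D8 = anchorDiscriminator('swap')

// Counterpart of the `programId` + `d8` check in the handler
function isSwap(ix: SwapIx, programId: string): boolean {
    return ix.programId === programId && ix.data.subarray(0, 8).equals(SWAP_D8)
}

// Counterpart of the token balance lookups in the handler
function deriveExchange(ix: SwapIx): ExchangeParams {
    if (ix.inner.length !== 2) throw new Error('expected exactly two inner transfers')
    const [src, dest] = ix.inner
    const balanceOf = (account: string) => ix.tokenBalances.find(tb => tb.account === account)
    // Mints are read from the counterparty account of each transfer, mirroring the handler
    const fromToken = balanceOf(src.destination)?.preMint
    const toToken = balanceOf(dest.source)?.preMint
    if (!fromToken || !toToken) throw new Error('mint not found among token balances')
    const srcBalance = balanceOf(src.source)
    const destBalance = balanceOf(dest.destination)
    return {
        fromToken,
        // Owners fall back to the raw token account address when no balance entry is found
        fromOwner: srcBalance?.preOwner || src.source,
        fromAmount: src.amount,
        toToken,
        toOwner: destBalance?.postOwner || destBalance?.preOwner || dest.destination,
        toAmount: dest.amount,
    }
}
```

Comparing the program ID and the 8 raw discriminator bytes is cheap, so filtering on them first means that only matching instructions ever get their inner transfers decoded.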
At this point the squid is ready for its first test run. Build and start it with:
```bash theme={null}
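npx tsc                            # compile the TypeScript sources into lib/
docker compose up -d               # start the Postgres container
npx squid-typeorm-migration apply  # apply the database migrations
node -r dotenv/config lib/main.js  # run the processor with variables from .env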
```
You can verify that the data is being stored in the database by running
```bash theme={null}
docker exec "$(basename "$(pwd)")-db-1" psql -U postgres -c "SELECT * FROM exchange"
```
The full code is available in the `portal-api` branch of the [SQD Solana template](https://github.com/subsquid-labs/solana-example/tree/portal-api).
# Guide
Source: https://docs.soldexer.dev/squid-sdk/guide
All you need to know to write squids
A Soldexer-specific Squid SDK master guide is TBA. For now, consult:
* [the common Squid SDK master guide](https://docs.sqd.ai/sdk/how-to-start/squid-development/)
* [Solana section of the SQD documentation](https://docs.sqd.ai/solana-indexing/)
* [the `portal-api` branch of the SQD Solana template](https://github.com/subsquid-labs/solana-example/tree/portal-api) - it is modified to work with the SQD portal serving the [Soldexer API](/api-reference)
# Intro
Source: https://docs.soldexer.dev/squid-sdk/intro
The all-in-one solution for indexing Solana
Aside from [pipes](/pipes/overview), you can also use the [Squid SDK](https://docs.sqd.ai/sdk/) to access the [Soldexer API](/api-reference/introduction). Here are some of the features bundled with the Squid SDK:
* interfacing the [Soldexer API](/api-reference/introduction)
* writing to a variety of data sinks (Postgres, BigQuery, Parquet/CSV/JSON/JSONL local or S3 files)
* proper handling of real-time data, including rollbacks due to orphan blocks (only with Postgres)
* fast instruction decoding based on JSON IDLs (made with Anchor or Shank)
* an option to serve a GraphQL API using PostGraphile, Hasura or a native SQD GraphQL server
* an option to deploy to [SQD Cloud](https://app.subsquid.io) trusted by 100+ partners
If that's something you'd like, then this section might be for you.
## Squid SDK indexers' architecture
Before you jump into action, it's best to know a few key facts:
0. In Squid SDK the indexers are called **squids**.
1. The main component of any squid is its **processor**: a NodeJS process that ingests data from Soldexer or another compatible API, transforms it, and stores it in a data sink.
2. The processor fetches and processes data **in batches** of variable size. Each batch contains data for a range of blocks, anywhere from one to several thousand blocks long. The size of this range is determined automatically by the SDK.
3. When it's done processing a batch, the processor sends the results to one of its supported data sinks. All such write operations are normally **atomic / transactional**: if you terminate the indexer in the middle of processing a batch, none of the code executed on that batch up to that point will have any effect on the data sink.
4. This allows squids to be stopped and restarted at any point without any fear of data corruption. Crashes won't corrupt data, either.
5. When consuming real-time data, the processor will occasionally encounter orphan blocks. When that happens, it uses a log of recent DB operations to roll back any changes made to the database due to orphan blocks, then restarts data processing from the latest known consensus block. All of this happens automatically, and the database ends up in the correct state. If you're simply sending the data to your app's frontend, this is usually nothing to worry about, but you have to plan for it if you're archiving the data with external tools or using it in decision making.
6. The consistency guarantees described in points 3-5 come with some fine print: you have to use Squid SDK's built-in tools to access your data sink, or your warranty is void. Since squid processors are regular NodeJS apps, you can call any external libraries and/or APIs, including writes to arbitrary data sinks. However, you'll have to manage the consistency of any such writes on your own.
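The batch atomicity and orphan-block rollbacks from points 3-5 can be illustrated with a toy in-memory store. This is a hypothetical sketch of the general technique (stage all writes of a batch and commit them in one step; keep a per-block undo log so that blocks above the consensus block can be reverted), not how the Squid SDK or its Postgres store is actually implemented. `ToyStore`, `Block`, `Row` and `UndoEntry` are made-up names.

```typescript
// Hypothetical illustration of atomic batches and orphan-block rollbacks.
// NOT the Squid SDK implementation; all names and shapes are made up.

interface Row {
    id: string
    value: number
}

interface Block {
    number: number // block height
    rows: Row[]    // rows this block upserts
}

interface UndoEntry {
    block: number
    // State of each touched row before this block; undefined means the row did not exist
    prev: Map<string, Row | undefined>
}

class ToyStore {
    private committed = new Map<string, Row>()
    private undoLog: UndoEntry[] = []

    get(id: string): Row | undefined {
        return this.committed.get(id)
    }

    // Processes a batch of blocks atomically: writes go to a staged copy that
    // replaces the committed state only after the whole batch has succeeded.
    processBatch(blocks: Block[], onBlock: (b: Block) => void = () => {}): void {
        const staged = new Map(this.committed)
        const stagedLog = [...this.undoLog]
        for (const block of blocks) {
            onBlock(block) // user code; if it throws, nothing from this batch is committed
            const prev = new Map<string, Row | undefined>()
            for (const row of block.rows) {
                if (!prev.has(row.id)) prev.set(row.id, staged.get(row.id))
                staged.set(row.id, row)
            }
            stagedLog.push({block: block.number, prev})
        }
        // Commit: the new state becomes visible in a single step
        this.committed = staged
        this.undoLog = stagedLog
    }

    // Reverts every block above the latest known consensus block, newest first
    rollbackTo(consensusBlock: number): void {
        while (this.undoLog.length > 0 && this.undoLog[this.undoLog.length - 1].block > consensusBlock) {
            const entry = this.undoLog.pop()!
            for (const [id, row] of entry.prev) {
                if (row === undefined) this.committed.delete(id)
                else this.committed.set(id, row)
            }
        }
    }
}
```

Undoing blocks newest first guarantees that each undo entry is applied to exactly the state its block produced. A real implementation would also prune entries for blocks that can no longer be orphaned, so that only a log of *recent* operations is kept.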
Now that you know the basics let's take a look at an actual squid processor!