Composite Pipes
Process multiple data streams simultaneously across different DEX programs.
const pipeline = source.pipeComposite({
  orcaSwaps: solanaInstructionDecoder({
    range: { from: 200000000 },
    programId: orcaWhirlpool.programId,
    instructions: { swap: orcaWhirlpool.instructions.swap },
  }),
  raydiumSwaps: solanaInstructionDecoder({
    range: { from: 200000000 },
    programId: raydiumAmm.programId,
    instructions: { swapBaseIn: raydiumAmm.instructions.swapBaseIn },
  }),
});
for await (const { data } of pipeline) {
  console.log(`Orca Swaps: ${data.orcaSwaps.swap.length}`);
  console.log(`Raydium Swaps: ${data.raydiumSwaps.swapBaseIn.length}`);
}
Portal Caching
Cache Portal responses locally for faster iteration.
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  cache: portalSqliteCache({
    path: "./portal-cache.sqlite",
  }),
});
When to Use
- Development iteration
- Testing pipelines
- Repeated processing of the same slots
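For development iteration, you might enable the cache only outside production so that live runs always hit the Portal directly. A minimal sketch, assuming a NODE_ENV check is how you distinguish environments (the toggle itself is an illustration, not part of the SDK):

import { solanaPortalSource } from "@subsquid/pipes/solana";
import { portalSqliteCache } from "@subsquid/pipes/portal-cache/node";

// Hypothetical toggle: only cache Portal responses outside production
const isDev = process.env.NODE_ENV !== "production";

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  // The `cache` option is omitted entirely in production runs
  ...(isDev && {
    cache: portalSqliteCache({ path: "./portal-cache.sqlite" }),
  }),
});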
Custom Logging
The Pipes SDK uses a Pino-compatible logger, allowing you to integrate custom log transports and send logs to external services such as GCP Cloud Logging, Sentry, or any other Pino-compatible destination.
Basic Custom Logger
Pass a custom logger to the source to configure logging for your entire pipeline.
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  target: "pino-pretty",
  options: {
    colorize: true,
    translateTime: "HH:MM:ss",
  },
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ count: data.length }, "Processed batch");
    }
  },
});

await source.pipeTo(target);
Integration with Cloud Services
You can use any Pino transport to send logs to cloud services. Pass the configured logger to the source. The examples below cover GCP Cloud Logging, Sentry, and multiple transports combined.

GCP Cloud Logging
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  target: "@google-cloud/logging-pino",
  options: {
    projectId: "your-project-id",
    logName: "solana-pipes-indexer",
  },
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info(
        {
          slotsProcessed: data.slots?.length,
          instructionsCount: data.swap?.length,
        },
        "Batch processed"
      );
    }
  },
});

await source.pipeTo(target);
Sentry
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  target: "pino-sentry-transport",
  options: {
    sentry: {
      dsn: process.env.SENTRY_DSN,
      environment: "production",
    },
    level: "error", // Only send errors to Sentry
  },
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      try {
        // processData is your own batch handler
        await processData(data);
        logger.info({ count: data.length }, "Batch processed");
      } catch (error) {
        logger.error({ error, data }, "Failed to process batch");
      }
    }
  },
});

await source.pipeTo(target);
Multiple Transports
import { createTarget } from "@subsquid/pipes";
import { solanaPortalSource } from "@subsquid/pipes/solana";
import pino from "pino";

const transport = pino.transport({
  targets: [
    {
      target: "pino-pretty",
      options: { colorize: true },
      level: "info",
    },
    {
      target: "@google-cloud/logging-pino",
      options: { projectId: "your-project-id" },
      level: "info",
    },
    {
      target: "pino-sentry-transport",
      options: { sentry: { dsn: process.env.SENTRY_DSN } },
      level: "error",
    },
  ],
});

const source = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
});

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      logger.info({ count: data.length }, "Processed batch");
    }
  },
});

await source.pipeTo(target);
The ctx.logger in transformers and targets is the same logger instance passed to the source. Configure logging at the source level, then use ctx.logger throughout your pipeline for consistent logging.
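For instance, a transformer can report progress through the same logger. A minimal sketch, reusing the decoder configuration from the composite pipes example and the transport from the basic logger example above; the log fields are illustrative:

const stream = solanaPortalSource({
  portal: "https://portal.sqd.dev/datasets/solana-mainnet",
  logger: pino(transport),
}).pipe(
  solanaInstructionDecoder({
    range: { from: 200000000 },
    programId: orcaWhirlpool.programId,
    instructions: { swap: orcaWhirlpool.instructions.swap },
  }).pipe({
    transform: (data, ctx) => {
      // Same logger instance that was passed to the source
      ctx.logger.debug({ swaps: data.swap?.length }, "Transforming batch");
      return data;
    },
  }),
);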
Custom Metrics
Track custom performance metrics.
const metrics = {
  slotsProcessed: 0,
  instructionsDecoded: 0,
  startTime: Date.now(),
};

const target = createTarget({
  write: async ({ read }) => {
    for await (const { data } of read()) {
      const span = profiler.start("processing");

      metrics.slotsProcessed += data.blocks?.length || 0;
      metrics.instructionsDecoded += data.swap?.length || 0;

      await processData(data);
      span.end();

      // Log metrics
      if (metrics.slotsProcessed % 100 === 0) {
        const elapsed = (Date.now() - metrics.startTime) / 1000;
        const sps = metrics.slotsProcessed / elapsed;
        console.log({
          slots: metrics.slotsProcessed,
          instructions: metrics.instructionsDecoded,
          slotsPerSec: sps.toFixed(2),
        });
      }
    }
  },
});
Profiler Usage
write: async ({ read }) => {
  for await (const { data } of read()) {
    const decodeSpan = profiler.start("decode");
    const decoded = decodeInstructions(data);
    decodeSpan.end();

    const saveSpan = profiler.start("save");
    await database.insert(decoded);
    saveSpan.end();
  }
},
Indexing Latency
Measure time from slot production to indexing.
const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      for (const swap of data.swap) {
        const slotTime = swap.timestamp.getTime();
        const now = Date.now();
        const latency = now - slotTime;

        logger.info({
          slot: swap.blockNumber,
          latencyMs: latency,
          latencySec: (latency / 1000).toFixed(2),
        });
      }
    }
  },
});
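Logging one line per swap can be noisy at high throughput. A minimal sketch of the same measurement aggregated per batch; the fields match the example above, while the aggregation strategy is an assumption:

const target = createTarget({
  write: async ({ logger, read }) => {
    for await (const { data } of read()) {
      if (!data.swap?.length) continue;

      // Track only the worst latency in the batch instead of logging every swap
      const now = Date.now();
      const maxLatency = Math.max(
        ...data.swap.map((swap) => now - swap.timestamp.getTime())
      );

      logger.info(
        { swaps: data.swap.length, maxLatencyMs: maxLatency },
        "Batch indexing latency"
      );
    }
  },
});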
RPC Latency Monitoring
Monitor latency between Portal and external RPC providers.
import { solanaRpcLatencyWatcher } from '@subsquid/pipes/solana'

const stream = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  query: { from: 'latest' },
}).pipe(
  solanaRpcLatencyWatcher({
    rpcUrl: ['https://api.mainnet-beta.solana.com'],
  }).pipe({
    profiler: { id: 'expose metrics' },
    transform: (data, { metrics }) => {
      if (!data) return

      // Expose the Portal delay for each configured RPC endpoint as a gauge
      for (const rpc of data.rpc) {
        metrics
          .gauge({
            name: 'rpc_latency_ms',
            help: 'RPC Latency in ms',
            labelNames: ['url'],
          })
          .set({ url: rpc.url }, rpc.portalDelayMs)
      }

      return data
    },
  }),
)

for await (const { data } of stream) {
  if (!data) continue

  console.log(`Slot: ${data.number} / ${data.timestamp.toString()}`)
  console.table(data.rpc)
}
Combining Patterns
// Portal caching + Composite pipes + Custom metrics
const source = solanaPortalSource({
  portal: 'https://portal.sqd.dev/datasets/solana-mainnet',
  cache: {
    adapter: await portalSqliteCache({ path: './cache.sqlite' })
  }
})

const metrics = {
  orcaSwaps: 0,
  raydiumSwaps: 0
}

const pipeline = source.pipeComposite({
  orcaSwaps: solanaInstructionDecoder({
    profiler: { id: 'Orca' },
    range: { from: 200000000 },
    programId: orcaWhirlpool.programId,
    instructions: { swap: orcaWhirlpool.instructions.swap }
  }),
  raydiumSwaps: solanaInstructionDecoder({
    profiler: { id: 'Raydium' },
    range: { from: 200000000 },
    programId: raydiumAmm.programId,
    instructions: { swapBaseIn: raydiumAmm.instructions.swapBaseIn }
  })
})

const target = clickhouseTarget({
  client,
  onData: async ({ store, data, ctx }) => {
    metrics.orcaSwaps += data.orcaSwaps.swap.length
    metrics.raydiumSwaps += data.raydiumSwaps.swapBaseIn.length
    ctx.logger.info(metrics)

    store.insert({
      table: 'orca_swaps',
      values: data.orcaSwaps.swap.map(s => ({...})), // map decoded fields to table columns
      format: 'JSONEachRow'
    })
    store.insert({
      table: 'raydium_swaps',
      values: data.raydiumSwaps.swapBaseIn.map(s => ({...})), // map decoded fields to table columns
      format: 'JSONEachRow'
    })
  },
  onRollback: async ({ store, safeCursor }) => {
    await store.removeAllRows({
      tables: ['orca_swaps', 'raydium_swaps'],
      where: `slot_number > ${safeCursor.number}`
    })
  }
})

await pipeline.pipeTo(target)

