This document provides various examples and use cases to illustrate the features and capabilities of the sflow library.
https://sflow-examples.vercel.app/
npm i sflow
Transform and filter a stream of numbers.
import { sflow } from "sflow"; async function example1() { const result = await sflow([1, 2, 3, 4, 5, 6]) .map((n) => n * 2) .log() // Prints: 2, 4, 6, 8, 10, 12 .filter((n) => n > 6) .log() // Prints: 8, 10, 12 .toArray(); console.log(result); // Outputs: [8, 10, 12] } await example1(); console.log("๐", "Done");
Chunk the stream into groups of three elements.
import { sflow } from "sflow"; async function example2() { const result = await sflow([1, 2, 3, 4, 5, 6, 7, 8]).chunk(3).toArray(); console.log(result); // Outputs: [[1, 2, 3], [4, 5, 6], [7, 8]] } await example2(); console.log("๐", "Done");
Debounce and throttle a stream of events.
import { sflow } from "sflow"; const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); async function example3() { const result = await sflow([1, 2, 3, 4, 5]) .debounce(100) .log() // Prints debounced numbers .throttle(200, { drop: true }) .log() // Prints throttled numbers .toArray(); console.log(result); // Outputs: [2, 4] } await example3(); console.log("๐", "Done");
Merge multiple streams together.
import { sflow } from "sflow"; async function example4() { const stream1 = sflow([1, 3, 5]); const stream2 = sflow([2, 4, 6]); const result = await sflow([stream1, stream2]) .confluenceByConcat() .log() // Prints: 1, 2, 3, 4, 5, 6 .toArray(); console.log(result); // Outputs: [1, 2, 3, 4, 5, 6] } await example4(); console.log("๐", "Done");
Parse and format CSV data.
import { sflow } from "sflow"; import { csvParses, csvFormats } from "sflow/xsvStreams"; async function example5() { const csvData = "name,age\nJohn,30\nJane,25"; const result = await sflow(csvData) .through(csvParses("name,age")) .map((record) => ({ ...record, age: Number(record.age) })) .log() // Prints parsed records .through(csvFormats(["name", "age"])) .text(); console.log(result); // Outputs formatted CSV with modified records } await example5(); console.log("๐", "Done");
Unwind nested arrays within objects.
import { sflow } from "sflow"; async function example6() { const data = [ { id: 1, values: [10, 20, 30] }, { id: 2, values: [40, 50] }, ]; const result = await sflow(data) .unwind("values") .log() // Prints unwound records .toArray(); console.log(result); // Outputs: // [ // { id: 1, values: 10 }, // { id: 1, values: 20 }, // { id: 1, values: 30 }, // { id: 2, values: 40 }, // { id: 2, values: 50 } // ] } await example6(); console.log("๐", "Done");
Use pMap
for processing stream items concurrently.
import { sflow } from "sflow"; const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); async function example7() { const result = await sflow([1, 2, 3, 4]) .pMap( async (n) => { await sleep(n * 100); // Simulate async work return n * 2; }, { concurrency: 2 } ) .log() // Prints: 2, 4, 6, 8 .toArray(); console.log(result); // Outputs: [2, 4, 6, 8] } await example7(); console.log("๐", "Done");
Reduce a stream while emitting intermediate states.
import { sflow } from "sflow"; import { reduceEmits } from "sflow/reduceEmits"; async function example8() { const reducer = (sum, value) => ({ next: sum + value, emit: sum + value, }); const result = await sflow([1, 2, 3, 4]) .through(reduceEmits(reducer, 0)) .log() // Prints: 1, 3, 6, 10 .toArray(); console.log(result); // Outputs: [1, 3, 6, 10] } await example8(); console.log("๐", "Done");
Split a string stream into lines and process each line.
import { sflow } from "sflow"; import { lines } from "sflow/lines"; async function example9() { const text = "line1\nline2\nline3"; const result = await sflow(text) .through(lines()) .map((line) => line.toUpperCase()) .log() // Prints: LINE1, LINE2, LINE3 .join("\n") .text(); console.log(result); // Outputs: LINE1\nLINE2\nLINE3 } await example9(); console.log("๐", "Done");
Use Keyv for caching stream data.
import { sflow } from "sflow"; import Keyv from "keyv"; import { cacheTails } from "sflow/cacheTails"; async function example10() { const store = new Keyv({ ttl: 1000 * 60 * 60 }); // Cache for 1 hour const fetchAndProcessData = async () => { // Simulate data fetching const data = [1, 2, 3, 4, 5]; return sflow(data); }; const dataStream = await fetchAndProcessData(); const result = await dataStream .through(cacheTails(store, "my-cache-key")) .log() // Print cached items .toArray(); console.log(result); // Output: [1, 2, 3, 4, 5] } await example10(); console.log("๐", "Done");
Aggregate stream items by a specific key.
import { sflow } from "sflow"; import { reduceEmits } from "sflow/reduceEmits"; async function example11() { const data = [ { category: "A", value: 10 }, { category: "B", value: 20 }, { category: "A", value: 5 }, ]; const reducer = (acc, { category, value }) => { const next = { ...acc, [category]: (acc[category] || 0) + value }; return { next, emit: next }; }; const result = await sflow(data) .through(reduceEmits(reducer, {})) .log() // Prints: {"A": 10}, {"A": 10, "B": 20}, {"A": 15, "B": 20} .toArray(); console.log(result); // Outputs: // [ // { "A": 10 }, // { "A": 10, "B": 20 }, // { "A": 15, "B": 20 } // ] } await example11(); console.log("๐", "Done");
Create and process a stream vector.
import { svector } from "sflow"; async function example12() { const vector = svector(1, 2, 3, 4, 5); const result = await vector .uniq() .map((n) => n * 2) .log() // Prints: 2, 4, 6, 8, 10 .toArray(); console.log(result); // Outputs: [2, 4, 6, 8, 10] } await example12(); console.log("๐", "Done");
Process and respond with distinct streams.
import { sflow } from "sflow"; import { uniq } from "sflow/uniq"; async function example13() { const stream1 = sflow([1, 2, 3, 2, 1]); const stream2 = sflow([4, 5, 5, 6]); const distinctStream1 = stream1.uniq(); const distinctStream2 = stream2.uniq(); const result = await sflow([distinctStream1, distinctStream2]) .merge() .log() // Prints: 1, 4, 2, 5, 3, 6 .toArray(); console.log(result); // Outputs: [1, 4, 2, 5, 3, 6] } await example13(); console.log("๐", "Done");
Conditionally chunk a stream using a custom predicate.
import { sflow } from "sflow"; import { chunkIfs } from "sflow/chunkIfs"; async function example14() { const data = "a,b,c\n1,2,3\nd,s,f"; const result = await sflow(data.split("")) .through(chunkIfs((char) => char !== "\n")) .map((chunk) => chunk.join("")) .log() // Prints: "a,b,c", "\n", "1,2,3", "\n", "d,s,f" .toArray(); console.log(result); // Outputs: ["a,b,c", "\n", "1,2,3", "\n", "d,s,f"] } await example14(); console.log("๐", "Done");
Use a custom reducer to process and emit intermediate states.
import { sflow } from "sflow"; import { reduceEmits } from "sflow/reduceEmits"; async function example15() { const data = [1, 2, 3, 4, 5]; const customReducer = (sum, value) => ({ next: sum + value, emit: sum + value * 2, }); const result = await sflow(data) .through(reduceEmits(customReducer, 0)) .log() // Prints: 2, 6, 12, 20, 30 .toArray(); console.log(result); // Outputs: [2, 6, 12, 20, 30] } await example15(); console.log("๐", "Done");
Hold sequential waiting on async operation in stream.
import { sflow } from "sflow"; const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); async function example16() { const data = [1, 2, 3, 4]; const result = await sflow(data) .asyncMap(async (item) => { await sleep(item * 100); // Simulate async work return item * 2; }) .log() // Prints: 2, 4, 6, 8 .toArray(); console.log(result); // Outputs: [2, 4, 6, 8] } await example16(); console.log("๐", "Done");
Parse TSV data and format it back to TSV.
import { sflow } from "sflow"; import { tsvParses, tsvFormats } from "sflow/xsvStreams"; async function example17() { const tsvData = "name\tage\nJohn\t30\nJane\t25"; const result = await sflow(tsvData) .through(tsvParses("name\tage")) .map((record) => ({ ...record, age: Number(record.age) + 1 })) .log() // Prints: { name: "John", age: 31 }, { name: "Jane", age: 26 } .through(tsvFormats("name\tage")) .text(); console.log(result); // Outputs formatted TSV with updated ages } await example17(); console.log("๐", "Done");
Gracefully handle errors within streams.
import { sflow } from "sflow"; import { andIgnoreError } from "sflow/andIgnoreError"; async function example18() { const data = [1, 2, "three", 4]; const result = await sflow(data) .map((x) => { if (typeof x !== "number") { throw new Error("Not a number"); } return x * 2; }) .catch(andIgnoreError(/Not a number/)) .log() // Prints: 2, 4 .toArray(); console.log(result); // Outputs: [2, 4] } await example18(); console.log("๐", "Done");
Buffer stream items within a time interval.
import { sflow } from "sflow"; import { chunkIntervals } from "sflow/chunkIntervals"; const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); async function example19() { const result = await sflow([1, 2, 3, 4]) .forEach(() => sleep(50)) .chunkInterval(100) .log() // Prints: [1, 2], [3, 4] .toArray(); console.log(result); // Outputs: [[1, 2], [3, 4]] } await example19(); console.log("๐", "Done");
Implement a custom transformation using TransformStream
.
import { sflow } from "sflow"; async function example20() { const uppercaseTransform = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk.toUpperCase()); }, }); const result = await sflow(["hello", "world"]) .through(uppercaseTransform) .log() // Prints: "HELLO", "WORLD" .toArray(); console.log(result); // Outputs: ["HELLO", "WORLD"] } await example20(); console.log("๐", "Done");
Implement a custom transformation using TransformStream
.
import { sflow } from "sflow"; async function run() { let result = await sflow([1, 2, 3, 4]) .map((n) => n * 2) .log((e) => "stage 1 - " + e) // this stage prints 2, 4, 6, 8 .filter((n) => n > 4) .log((e) => "stage 2 - " + e) // this stage prints 6, 8 .reduce((a, b) => a + b, 0) // first emit 0+6=6, second emit 0+6+8=14 .log((e) => "stage 3 - " + e) // this stage prints 6, 14 .toArray(); console.log(result); // Outputs: [6, 14] } await run(); // "stage 1 - 2" // "stage 1 - 4" // "stage 1 - 6" // "stage 2 - 6" // 2, 4 is filtered out in stage 2 // "stage 1 - 8" // "stage 3 - 6" // 0+6 // "stage 2 - 8" // "stage 3 - 14" // 0+6+8 // [ 6, 14 ] // results
import { sflow } from "sflow"; async function BEFORE() { const url = "/music.md"; // need 4 awaits, 2 extra variables, and fetching is not start by parallel let result1 = (await (await fetch(url)).text()).replace(/#.*/gm, ""); let result2 = (await (await fetch(url)).text()).replace(/#.*/gm, ""); const result = result1 + "\n" + result2; // and it would be even more ugly if you try rewrite it with Promise.all(...) console.log(result); } async function AFTER() { // WIP const url = "/music.md"; // need only 1 awaits, reuseable replace logic let result = await sflow(fetch(url), fetch(url)) .map((e) => e.text()) .replace(/#.*/gm, "") .join("\n") .text(); console.log(result); } await BEFORE(); await AFTER();
.
in the example repo, and edit as you want, then createTry to create your first PR here! https://github.dev/snomiao/sflow-examples