
DSPy Parallel Chunk Streaming


Streaming Named Chunks from Parallel DSPy Modules

Yesterday we covered status streaming, surfacing “working on it…” messages while your agent grinds through tools. This follow-up tackles the next piece: streaming actual response chunks and labeling them by the module that produced them.

The demo below shows two submodules running in parallel, each emitting its own named stream (ideas and evidence) before a final composed answer.

Why This Matters

Showing partial text feels faster than waiting for the final string. Parallel stages make it even better. Different regions of your UI update independently. “Brainstorm” on the left, “Evidence” on the right, both filling in at once.

DSPy makes this trivial. streamify + StreamListener + asyncio.gather give you named, typed chunks in a few lines of code.

What It Looks Like

View    | What you see
--------|---------------------------------------------------------
Console | Rows per submodule updating in place
SSE     | JSON payloads with status, tokens, and predictions

Here’s a quick walkthrough of the console + SSE demo:

Your console runner shows a live board with rows per submodule updating in place:

[ideas.brainstorm]    DSPy helps by abstracting the complexity of...
[evidence.probe]      According to the 2024 benchmarks, agents using...
[compose]             Combining these insights...

SSE clients receive JSON payloads:

{ "type": "status", "message": "Spinning ideas..." }
{ "type": "token", "field": "brainstorm", "predictor": "ideas.brainstorm", "chunk": "DSPy helps...", "is_last": false }
{ "type": "prediction", "data": { "answer": "..." } }

How It Works

First, define submodules that output named fields. IdeaGenerator outputs brainstorm, EvidenceCollector outputs evidence:

import asyncio

import dspy


class IdeaGenerator(dspy.Module):
    def __init__(self):
        super().__init__()
        self.brainstorm = dspy.ChainOfThought("question -> brainstorm")

    async def aforward(self, question: str):
        return await self.brainstorm.acall(question=question)


class EvidenceCollector(dspy.Module):
    def __init__(self):
        super().__init__()
        self.probe = dspy.ChainOfThought("question -> evidence")

    async def aforward(self, question: str):
        return await self.probe.acall(question=question)

The composer fans out to both branches in parallel, then merges their outputs:

class ParallelComposer(dspy.Module):
    def __init__(self):
        super().__init__()
        self.ideas = IdeaGenerator()
        self.evidence = EvidenceCollector()
        self.compose = dspy.ChainOfThought("question, brainstorm, evidence -> answer")

    async def aforward(self, question: str):
        idea_pred, evidence_pred = await asyncio.gather(
            self.ideas.acall(question=question),
            self.evidence.acall(question=question),
        )
        return await self.compose.acall(
            question=question,
            brainstorm=idea_pred.brainstorm,
            evidence=evidence_pred.evidence,
        )

Wiring Up StreamListeners

Bind a StreamListener to each predictor you want to stream. The predict_name becomes the label in your UI:

from dspy.streaming import StatusMessageProvider, StreamListener, streamify

program = ParallelComposer()

listeners = [
    StreamListener(
        signature_field_name="brainstorm",
        predict=program.ideas.brainstorm.predict,
        predict_name="ideas.brainstorm",
    ),
    StreamListener(
        signature_field_name="evidence",
        predict=program.evidence.probe.predict,
        predict_name="evidence.probe",
    ),
    StreamListener(
        signature_field_name="answer",
        predict=program.compose.predict,
        predict_name="compose",
    ),
]

streaming_program = streamify(
    program,
    status_message_provider=ParallelStatus(),
    stream_listeners=listeners,
    is_async_program=True,
)
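
Calling streaming_program now returns an async generator. Here's a minimal console consumer as a sketch; the full demo uses rich to update rows in place, and the model name and question below are just placeholders:

# Simplified console consumer for the stream above (plain prints instead of a rich board).
import asyncio

import dspy
from dspy.streaming import StatusMessage, StreamResponse

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # example model name


async def run(question: str):
    # Accumulate text per predictor so each labeled stream stays separate.
    buffers: dict[str, str] = {}
    async for item in streaming_program(question=question):
        if isinstance(item, StatusMessage):
            print(f"[status] {item.message}")
        elif isinstance(item, StreamResponse):
            # predict_name matches the labels set on the listeners:
            # "ideas.brainstorm", "evidence.probe", "compose".
            buffers[item.predict_name] = buffers.get(item.predict_name, "") + item.chunk
            print(f"[{item.predict_name}] {buffers[item.predict_name][-60:]}")
        elif isinstance(item, dspy.Prediction):
            print(f"\nFinal answer:\n{item.answer}")


asyncio.run(run("How do parallel DSPy modules stream?"))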

Status Messages

Same pattern as last time. Use instance to identify which module fired:

class ParallelStatus(StatusMessageProvider):
    def module_start_status_message(self, instance, inputs):
        if isinstance(instance, IdeaGenerator):
            return "Spinning ideas..."
        if isinstance(instance, EvidenceCollector):
            return "Pulling evidence..."
        return None

    def module_end_status_message(self, outputs):
        if hasattr(outputs, "brainstorm"):
            return "Ideas ready."
        if hasattr(outputs, "evidence"):
            return "Evidence ready."
        return None

Server-Sent Events

Wrap it in FastAPI and emit SSE, just like before:

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from dspy.streaming import StatusMessage, StreamResponse

app = FastAPI()


class Query(BaseModel):
    question: str


@app.post("/v1/stream")
async def stream(query: Query):
    async def event_source():
        async for item in streaming_program(question=query.question):
            if isinstance(item, StatusMessage):
                payload = {"type": "status", "message": item.message}
            elif isinstance(item, StreamResponse):
                payload = {
                    "type": "token",
                    "field": item.signature_field_name,
                    "predictor": item.predict_name,
                    "chunk": item.chunk,
                    "is_last": item.is_last_chunk,
                }
            elif isinstance(item, dspy.Prediction):
                payload = {"type": "prediction", "data": dict(item.items())}
            else:
                continue
            yield f"data: {json.dumps(payload)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")

Full Demo

I put together a self-contained script that runs both the console demo and the SSE server:

GitHub Gist: dspy_parallel_chunk_streaming.py

Setup:

export OPENAI_API_KEY="sk-your-key"
python3 -m venv .venv
source .venv/bin/activate
pip install rich dspy fastapi uvicorn  # fastapi/uvicorn only needed for --serve

Run:

# Console mode
python dspy_parallel_chunk_streaming.py "Your question here"

# SSE server
python dspy_parallel_chunk_streaming.py --serve
# POST to http://127.0.0.1:8000/v1/stream with {"question": "..."}
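
Once the server is up, one quick way to watch the stream is curl with response buffering disabled (-N); the question is just an example:

curl -N -X POST http://127.0.0.1:8000/v1/stream \
  -H "Content-Type: application/json" \
  -d '{"question": "How do parallel DSPy modules stream?"}'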

Extending This

Add more branches to your gather for research, math, or RAG retrieval. Add matching StreamListener entries to stream each branch. Push chunk metadata into your UI layout. Left column for ideas, right for evidence, bottom for the final answer. Swap FastAPI for WebSockets if that’s your stack.
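
As a rough sketch of what a third branch might look like (MathSolver, the derivation field, and the math.derive label are made up for illustration, not part of the demo):

# Hypothetical third branch producing a "derivation" field.
class MathSolver(dspy.Module):
    def __init__(self):
        super().__init__()
        self.derive = dspy.ChainOfThought("question -> derivation")

    async def aforward(self, question: str):
        return await self.derive.acall(question=question)


# In ParallelComposer: add self.math = MathSolver() in __init__, include
# self.math.acall(question=question) in the same asyncio.gather, and pass
# math_pred.derivation into the compose signature.

# Then give the branch its own named stream:
listeners.append(
    StreamListener(
        signature_field_name="derivation",
        predict=program.math.derive.predict,
        predict_name="math.derive",
    )
)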

The payloads are plain JSON. The pattern drops into any agent or RAG pipeline.

With streamify and a few listeners, DSPy gives you parallel, named chunk streams and status updates. No custom token plumbing. Your users see progress and partial answers as they happen.

