# Streaming Named Chunks from Parallel DSPy Modules
Yesterday we covered status streaming, surfacing “working on it…” messages while your agent grinds through tools. This follow-up tackles the next piece: streaming actual response chunks and labeling them by the module that produced them.
The demo below shows two submodules running in parallel, each emitting its own named stream (ideas and evidence) before a final composed answer.
## Why This Matters
Streaming partial text feels faster than waiting for the final string, and parallel stages make it better still: different regions of your UI can update independently, with "Brainstorm" filling in on the left while "Evidence" fills in on the right.

DSPy makes this easy: `streamify` + `StreamListener` + `asyncio.gather` give you named, typed chunks in a few lines of code.
## What It Looks Like
| View | What you see |
|---|---|
| Console | Rows per submodule updating in place |
| SSE | JSON payloads with status, tokens, prediction |
Here’s a quick walkthrough of the console + SSE demo. The console runner shows a live board with one row per submodule, each updating in place:
```
[ideas.brainstorm] DSPy helps by abstracting the complexity of...
[evidence.probe] According to the 2024 benchmarks, agents using...
[compose] Combining these insights...
```
SSE clients receive JSON payloads:
{ "type": "status", "message": "Spinning ideas..." }
{ "type": "token", "field": "brainstorm", "predictor": "ideas.brainstorm", "chunk": "DSPy helps...", "is_last": false }
{ "type": "prediction", "data": { "answer": "..." } }
## How It Works

First, define submodules that output named fields. `IdeaGenerator` outputs `brainstorm`, `EvidenceCollector` outputs `evidence`:
```python
import dspy


class IdeaGenerator(dspy.Module):
    def __init__(self):
        super().__init__()
        self.brainstorm = dspy.ChainOfThought("question -> brainstorm")

    async def aforward(self, question: str):
        return await self.brainstorm.acall(question=question)


class EvidenceCollector(dspy.Module):
    def __init__(self):
        super().__init__()
        self.probe = dspy.ChainOfThought("question -> evidence")

    async def aforward(self, question: str):
        return await self.probe.acall(question=question)
```
The composer fans out to both branches in parallel, then merges their outputs:
```python
import asyncio


class ParallelComposer(dspy.Module):
    def __init__(self):
        super().__init__()
        self.ideas = IdeaGenerator()
        self.evidence = EvidenceCollector()
        self.compose = dspy.ChainOfThought("question, brainstorm, evidence -> answer")

    async def aforward(self, question: str):
        # Fan out to both branches concurrently, then merge their outputs.
        idea_pred, evidence_pred = await asyncio.gather(
            self.ideas.acall(question=question),
            self.evidence.acall(question=question),
        )
        return await self.compose.acall(
            question=question,
            brainstorm=idea_pred.brainstorm,
            evidence=evidence_pred.evidence,
        )
```
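Before adding streaming, it's worth sanity-checking the composer on its own. A minimal sketch, assuming an OpenAI-backed `dspy.LM` (the model name and question are just examples):

```python
import asyncio

import dspy

# Any LM supported by dspy.LM works; gpt-4o-mini is only an example.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))


async def main():
    composer = ParallelComposer()
    prediction = await composer.acall(question="How do parallel DSPy modules help streaming UIs?")
    print(prediction.answer)


asyncio.run(main())
```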
## Wiring Up StreamListeners

Bind a `StreamListener` to each predictor you want to stream. The `predict_name` becomes the label in your UI:
```python
from dspy.streaming import StreamListener, streamify

program = ParallelComposer()

listeners = [
    StreamListener(
        signature_field_name="brainstorm",
        predict=program.ideas.brainstorm.predict,
        predict_name="ideas.brainstorm",
    ),
    StreamListener(
        signature_field_name="evidence",
        predict=program.evidence.probe.predict,
        predict_name="evidence.probe",
    ),
    StreamListener(
        signature_field_name="answer",
        predict=program.compose.predict,
        predict_name="compose",
    ),
]

streaming_program = streamify(
    program,
    status_message_provider=ParallelStatus(),  # defined in the next section
    stream_listeners=listeners,
    is_async_program=True,
)
```
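For the console view, a bare consumer loop over the stream is enough; the live board in the gist renders the same iteration with rich. A rough sketch that prints one line per chunk instead of updating rows in place:

```python
import asyncio

import dspy
from dspy.streaming import StatusMessage, StreamResponse


async def run_console(question: str):
    # Accumulate text per predictor label so the parallel branches stay separate.
    buffers: dict[str, str] = {}
    async for item in streaming_program(question=question):
        if isinstance(item, StatusMessage):
            print(f"[status] {item.message}")
        elif isinstance(item, StreamResponse):
            buffers[item.predict_name] = buffers.get(item.predict_name, "") + item.chunk
            print(f"[{item.predict_name}] {item.chunk}")
        elif isinstance(item, dspy.Prediction):
            print(f"\nFinal answer: {item.answer}")


asyncio.run(run_console("How do parallel DSPy modules help streaming UIs?"))
```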
## Status Messages

Same pattern as last time. Use `instance` to identify which module fired:
```python
from dspy.streaming import StatusMessageProvider


class ParallelStatus(StatusMessageProvider):
    def module_start_status_message(self, instance, inputs):
        if isinstance(instance, IdeaGenerator):
            return "Spinning ideas..."
        if isinstance(instance, EvidenceCollector):
            return "Pulling evidence..."
        return None

    def module_end_status_message(self, outputs):
        if hasattr(outputs, "brainstorm"):
            return "Ideas ready."
        if hasattr(outputs, "evidence"):
            return "Evidence ready."
        return None
```
## Server-Sent Events

Wrap it in FastAPI and emit SSE, just like before:
```python
import json

import dspy
from dspy.streaming import StatusMessage, StreamResponse
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()


class Query(BaseModel):
    question: str  # request body: {"question": "..."}


@app.post("/v1/stream")
async def stream(query: Query):
    async def event_source():
        async for item in streaming_program(question=query.question):
            if isinstance(item, StatusMessage):
                payload = {"type": "status", "message": item.message}
            elif isinstance(item, StreamResponse):
                payload = {
                    "type": "token",
                    "field": item.signature_field_name,
                    "predictor": item.predict_name,
                    "chunk": item.chunk,
                    "is_last": item.is_last_chunk,
                }
            elif isinstance(item, dspy.Prediction):
                payload = {"type": "prediction", "data": dict(item.items())}
            else:
                continue
            yield f"data: {json.dumps(payload)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")
```
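To smoke-test the endpoint, `curl -N` with a JSON body works, as does any HTTP client that can read a chunked response. Here's a rough Python sketch using `httpx` (not part of the demo's requirements; install it separately):

```python
import json

import httpx  # pip install httpx

payload = {"question": "How do parallel DSPy modules help streaming UIs?"}
with httpx.stream("POST", "http://127.0.0.1:8000/v1/stream", json=payload, timeout=None) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        event = json.loads(data)
        if event["type"] == "token":
            # Route chunks by predictor label, exactly as a UI would.
            print(f"[{event['predictor']}] {event['chunk']}")
        elif event["type"] == "status":
            print(f"[status] {event['message']}")
        elif event["type"] == "prediction":
            print("Final:", event["data"].get("answer", ""))
```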
## Full Demo
I put together a self-contained script that runs both the console demo and the SSE server:
GitHub Gist: dspy_parallel_chunk_streaming.py
Setup:
```bash
export OPENAI_API_KEY="sk-your-key"

python3 -m venv .venv
source .venv/bin/activate
pip install rich dspy
pip install fastapi uvicorn  # only needed for the --serve SSE mode
```
Run:
```bash
# Console mode
python dspy_parallel_chunk_streaming.py "Your question here"

# SSE server
python dspy_parallel_chunk_streaming.py --serve
# POST to http://127.0.0.1:8000/v1/stream with {"question": "..."}
```
## Extending This

Add more branches to the `asyncio.gather` call for research, math, or RAG retrieval, and add a matching `StreamListener` entry for each so every branch streams under its own label, as in the sketch below. Push the chunk metadata into your UI layout: left column for ideas, right for evidence, bottom for the final answer. Swap FastAPI for WebSockets if that's your stack.
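For example, a hypothetical third branch and its listener might look like this (the `MathSolver` class, `math_notes` field, and `math.solve` label are illustrative, not part of the demo script):

```python
# Illustrative only: names and signature are assumptions, not from the gist.
class MathSolver(dspy.Module):
    def __init__(self):
        super().__init__()
        self.solve = dspy.ChainOfThought("question -> math_notes")

    async def aforward(self, question: str):
        return await self.solve.acall(question=question)


# In ParallelComposer you would add `self.math = MathSolver()`, include it in the
# asyncio.gather call, and extend the compose signature to
# "question, brainstorm, evidence, math_notes -> answer". Then register one more
# listener so the new branch streams under its own label:
listeners.append(
    StreamListener(
        signature_field_name="math_notes",
        predict=program.math.solve.predict,
        predict_name="math.solve",
    )
)
```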
The payloads are plain JSON. The pattern drops into any agent or RAG pipeline.
With `streamify` and a few listeners, DSPy gives you parallel, named chunk streams and status updates. No custom token plumbing. Your users see progress and partial answers as they happen.