Skip to content

Email Auto Responder

IntermediateSignature


The Problem

Here is how most teams handle incoming email at first.

Incoming email
│           GeneralAgent                   │
│                                          │
│   read email  ←──→  write reply          │
         │ sends whatever comes out
      reply (unreviewed)
  • The agent replies without understanding intent. A complaint gets the same treatment as a quick question.
  • Tone is inconsistent. A formal cancellation request might get a casual response.
  • There is no quality gate. A poorly drafted reply goes out as-is.
  • When a reply is bad, you rewrite the prompt and hope. You have no record of why it failed.

You are shipping unreviewed text.


The Plan

We will build a flux based on the Reflection architecture that classifies the email before responding, drafts a calibrated reply, and runs it through a reviewer before sending.

A Classifier reads the email and extracts intent, urgency, and tone. A Drafter uses those signals to write a context-aware reply. A Reviewer scores the draft and decides whether it's ready. If not, a Reviser incorporates the feedback — and the Reviewer runs again. The revision cycle is expressed declaratively with Inline's @{while} construct, so the flux keeps iterating until the draft passes.


Architecture

Incoming email
  Classifier ──── Signature: email_body → intent, urgency, tone, sender_name
  Reviewer ─────── Signature: email_body, draft → approved, feedback, score
  @{ approved == False }
       │  ↺ revise with feedback
       └─ Reviser ── Signature: draft, feedback → draft
          Reviewer (again)
              │ approved == True
         msg.draft  (ready to send)

The flux is:

  • Adaptive — tone and depth are driven by the classified intent, not a fixed prompt
  • Self-correcting — the revision loop runs until quality passes, capped by max_iterations
  • Observable — every classification decision and reviewer score is structured data on msg

Setup

Setup your chat completion model (check dependencies)

Authenticate by setting the OPENAI_API_KEY env variable.

import msgflux as mf

mf.set_envs(OPENAI_API_KEY="...")
model = mf.Model.chat_completion("openai/gpt-4.1-mini")

Authenticate by setting the GROQ_API_KEY env variable.

import msgflux as mf

mf.set_envs(GROQ_API_KEY="...")
model = mf.Model.chat_completion("groq/openai/gpt-oss-120b")

Install Ollama and pull your model first:

ollama pull gpt-oss:120b
import msgflux as mf

model = mf.Model.chat_completion("ollama/gpt-oss:120b")

Authenticate by setting the OPENROUTER_API_KEY env variable.

import msgflux as mf

mf.set_envs(OPENROUTER_API_KEY="...")
model = mf.Model.chat_completion("openrouter/anthropic/claude-opus-4-6")

Authenticate by setting the SAMBANOVA_API_KEY env variable.

import msgflux as mf

mf.set_envs(SAMBANOVA_API_KEY="...")
model = mf.Model.chat_completion("sambanova/openai/gpt-oss-120b")

Self-hosted with an OpenAI-compatible API:

vllm serve openai/gpt-oss-120b
import msgflux as mf

model = mf.Model.chat_completion(
    "vllm/openai/gpt-oss-120b",
    base_url="http://localhost:8000/v1",
)

msgFlux supports 12+ providers. Any provider with an OpenAI-compatible API works:

import msgflux as mf

# Together AI
model = mf.Model.chat_completion("together/openai/gpt-oss-120b")

# Cerebras
model = mf.Model.chat_completion("cerebras/openai/gpt-oss-120b")

Step 1 — Classifying the Email

Before drafting anything, the flux needs to understand what kind of email arrived. The classifier extracts four signals that the rest of the flux depends on: the sender's primary intent, how urgently they need a response, what tone the reply should use, and the sender's name for personalization.

import msgflux as mf
import msgflux.nn as nn
from typing import Literal


class ClassifyEmail(mf.Signature):
    """Classify the incoming email to inform the reply strategy."""

    email_body: str = mf.InputField(desc="The full text of the incoming email")

    intent: Literal[
        "question", "complaint", "request", "follow_up", "cancellation", "praise"
    ] = mf.OutputField(desc="Primary intent of the email")
    urgency: Literal["low", "medium", "high"] = mf.OutputField(
        desc="How urgently this email needs a response"
    )
    tone: Literal["formal", "neutral", "informal"] = mf.OutputField(
        desc="Appropriate reply tone based on sender style"
    )
    sender_name: str = mf.OutputField(desc="Sender's first name extracted from the email")

Step 2 — Drafting and the Review Loop

Three signatures drive the quality loop. DraftReply consumes the classifier's output to write a calibrated first reply. ReviewDraft scores it and decides whether it's ready to send. ReviseDraft takes the reviewer's feedback and rewrites the draft — its output goes to the same rsp namespace as DraftReply, so msg.rsp.draft is updated in place on each iteration.

Draft:

class DraftReply(mf.Signature):
    """Draft a professional reply to the email."""

    email_body: str = mf.InputField(desc="The original email")
    intent: str = mf.InputField(desc="Classified intent")
    urgency: str = mf.InputField(desc="Urgency level")
    tone: str = mf.InputField(desc="Reply tone to use")

    draft: str = mf.OutputField(
        desc="A complete, ready-to-send reply addressing all points raised"
    )

Review:

class ReviewDraft(mf.Signature):
    """Review a draft reply for quality, accuracy, and tone before sending."""

    email_body: str = mf.InputField(desc="The original email")
    draft: str = mf.InputField(desc="The draft reply to review")

    feedback: str = mf.OutputField(
        desc="Specific, actionable feedback if not approved; empty string if approved"
    )
    approved: bool = mf.OutputField(
        desc="True if the draft is ready to send, False if it needs revision"
    )    
    score: float = mf.OutputField(
        desc="Quality score from 0.0 to 1.0 (approved when >= 0.8)"
    )

Revise:

class ReviseDraft(mf.Signature):
    """Revise a draft based on reviewer feedback."""

    current_draft: str = mf.InputField(desc="The current draft that needs improvement")
    feedback: str = mf.InputField(desc="Specific feedback from the reviewer")

    draft: str = mf.OutputField(desc="Improved version of the draft")

Step 3 — Agents

Each agent declares message_fields to read its inputs from the shared msg object and response_mode to write its outputs back to a dedicated namespace. This keeps each agent's output isolated — cls for classification, rsp for the current draft, rev for the review — and makes every field addressable with dotted paths like cls.intent or rev.approved.

class Classifier(nn.Agent):
    model = model
    signature = ClassifyEmail
    message_fields = {"task": {"email_body": "email_body"}}
    response_mode = "cls"
    config = {"verbose": True}


class Drafter(nn.Agent):
    model = model
    signature = DraftReply
    message_fields = {
        "task": {
            # Map input_name to msg field
            "email_body": "email_body",
            "intent": "cls.intent",
            "urgency": "cls.urgency",
            "tone": "cls.tone"
        }
    }
    response_mode = "rsp"
    config = {"verbose": True}


class Reviewer(nn.Agent):
    model = model
    signature = ReviewDraft
    message_fields = {"task": {"email_body": "email_body", "draft": "rsp.draft"}}
    response_mode = "rev"
    config = {"verbose": True}


class Reviser(nn.Agent):
    model = model
    signature = ReviseDraft
    message_fields = {"task": {"current_draft": "rsp.draft", "feedback": "rev.feedback"}}
    response_mode = "rsp"
    config = {"verbose": True}

Step 4 — Wiring the Pipeline

Inline composes the agents into a single flux. The @{rev.approved == False}: reviser -> reviewer; node runs the revision cycle while the reviewer has not approved the draft — then exits when rev.approved is True. The dotted path rev.approved resolves to msg.rev.approved, which is written in-place by the Reviewer's response_mode.

flux = mf.Inline(
    "classifier -> drafter -> reviewer -> @{rev.approved == False}: reviser -> reviewer;",
    {
        "classifier": Classifier(),
        "drafter":    Drafter(),
        "reviewer":   Reviewer(),
        "reviser":    Reviser(),
    },
    max_iterations=5,
)

Tip

max_iterations caps the revision loop. Without it, a consistently failing draft would run indefinitely. Five iterations is a safe upper bound for most cases.


Step 5 — Running the Flux

Pass the email in and let the flux run. Each agent writes to its own namespace on msg; results are accessed via dotted paths after the flux returns.

Example
msg = mf.Message()
msg.email_body = """
Hi there,

I placed an order three weeks ago (order #ORD-9921) and it still hasn't arrived.
The tracking page just says "processing". This is really frustrating — I needed
this for a trip that already happened. I'd like a refund or an explanation.

Thanks,
Maria
"""

flux(msg)

print(f"Intent:      {msg.cls.intent}")
print(f"Urgency:     {msg.cls.urgency}")
print(f"Final score: {msg.rev.score:.2f}")
print(f"\nFinal reply:\n{msg.rsp.draft}")
[classifier][response] {'intent': 'complaint', 'urgency': 'high', 'tone': 'neutral', ...}
[drafter][response]    {'draft': 'Dear Maria, ...'}
[reviewer][response]   {'approved': False, 'score': 0.62, 'feedback': 'Add empathy ...'}
[reviser][response]    {'draft': 'Dear Maria, I sincerely apologize ...'}
[reviewer][response]   {'approved': True, 'score': 0.91, 'feedback': ''}

Intent:      complaint
Urgency:     high
Final score: 0.91

Final reply:
Dear Maria, I sincerely apologize for the inconvenience...
import asyncio

async def main():
    msg = mf.Message()
    msg.email_body = """
    Hi there,

    I placed an order three weeks ago (order #ORD-9921) and it still hasn't arrived.
    The tracking page just says "processing". This is really frustrating — I needed
    this for a trip that already happened. I'd like a refund or an explanation.

    Thanks,
    Maria
    """

    await flux.acall(msg)

    print(f"Intent:      {msg.cls.intent}")
    print(f"Urgency:     {msg.cls.urgency}")
    print(f"Final score: {msg.rev.score:.2f}")
    print(f"\nFinal reply:\n{msg.rsp.draft}")

asyncio.run(main())

Complete Script

Expand full script
import msgflux as mf
import msgflux.nn as nn
from typing import Literal

mf.load_dotenv()
model = mf.Model.chat_completion("openai/gpt-4.1-mini")


class ClassifyEmail(mf.Signature):
    """Classify the incoming email to inform the reply strategy."""

    email_body: str = mf.InputField(desc="The full text of the incoming email")
    intent: Literal[
        "question", "complaint", "request", "follow_up", "cancellation", "praise"
    ] = mf.OutputField(desc="Primary intent of the email")
    urgency: Literal["low", "medium", "high"] = mf.OutputField(
        desc="How urgently this email needs a response"
    )
    tone: Literal["formal", "neutral", "informal"] = mf.OutputField(
        desc="Appropriate reply tone based on sender style"
    )
    sender_name: str = mf.OutputField(desc="Sender's first name extracted from the email")


class DraftReply(mf.Signature):
    """Draft a professional reply to the email."""

    email_body: str = mf.InputField(desc="The original email")
    intent: str = mf.InputField(desc="Classified intent")
    urgency: str = mf.InputField(desc="Urgency level")
    tone: str = mf.InputField(desc="Reply tone to use")
    draft: str = mf.OutputField(desc="A complete, ready-to-send reply addressing all points raised")


class ReviewDraft(mf.Signature):
    """Review a draft reply for quality, accuracy, and tone before sending."""

    email_body: str = mf.InputField(desc="The original email")
    draft: str = mf.InputField(desc="The draft reply to review")
    feedback: str = mf.OutputField(desc="Specific, actionable feedback if not approved")
    approved: bool = mf.OutputField(desc="True if the draft is ready to send")    
    score: float = mf.OutputField(desc="Quality score from 0.0 to 1.0")


class ReviseDraft(mf.Signature):
    """Revise a draft based on reviewer feedback."""

    current_draft: str = mf.InputField(desc="The current draft that needs improvement")
    feedback: str = mf.InputField(desc="Specific feedback from the reviewer")
    draft: str = mf.OutputField(desc="Improved version of the draft")


class Classifier(nn.Agent):
    model = model
    signature = ClassifyEmail
    message_fields = {"task": {"email_body": "email_body"}}
    response_mode = "cls"
    config = {"verbose": True}


class Drafter(nn.Agent):
    model = model
    signature = DraftReply
    message_fields = {"task": {"email_body": "email_body", "intent": "cls.intent", "urgency": "cls.urgency", "tone": "cls.tone"}}
    response_mode = "rsp"
    config = {"verbose": True}


class Reviewer(nn.Agent):
    model = model
    signature = ReviewDraft
    message_fields = {"task": {"email_body": "email_body", "draft": "rsp.draft"}}
    response_mode = "rev"
    config = {"verbose": True}


class Reviser(nn.Agent):
    model = model
    signature = ReviseDraft
    message_fields = {"task": {"current_draft": "rsp.draft", "feedback": "rev.feedback"}}
    response_mode = "rsp"
    config = {"verbose": True}


flux = mf.Inline(
    "classifier -> drafter -> reviewer -> @{rev.approved == False}: reviser -> reviewer;",
    {
        "classifier": Classifier(),
        "drafter":    Drafter(),
        "reviewer":   Reviewer(),
        "reviser":    Reviser(),
    },
    max_iterations=5,
)

Further Reading

  • Inline DSL — flux syntax, branching, and while loops
  • Signatures — declarative input/output contracts for agents
  • Async — running fluxs asynchronously with acall