Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/synix/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

__version__ = "0.22.2"

from synix.agents import Agent, AgentRequest, AgentResult # noqa: F401
from synix.agents import Agent, Group, SynixLLMAgent # noqa: F401
from synix.core.models import ( # noqa: F401
Artifact,
FlatFile,
Expand Down
202 changes: 167 additions & 35 deletions src/synix/agents.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,194 @@
"""Agent gateway interface for Synix transforms.
"""Agent interface for Synix pipeline operations.

Synix owns prompt rendering and transform semantics. The Agent protocol
is an execution gateway --- the transform renders the prompt, the agent
produces the output text.
Agents are named execution units with typed methods matching transform
shapes (map, reduce, group, fold). Each agent has stable identity
(agent_id) and a config snapshot fingerprint for cache invalidation.

The Agent does NOT own prompts, grouping, sorting, fold checkpointing,
search-surface access, or context_budget logic. Those stay in transforms.
The transform renders its prompt (task structure) and passes it as
task_prompt. The agent provides persona/semantic instructions and
executes the LLM call. Both compose: transform defines WHAT to do,
agent defines HOW to do it.

SynixLLMAgent is the built-in implementation backed by PromptStore
for versioned instructions and LLMClient for execution.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol, runtime_checkable

from synix.core.models import Artifact

@dataclass(frozen=True)
class AgentRequest:
"""A rendered synthesis request ready for execution.

The transform has already rendered the prompt. The agent just needs
to produce output text.
"""

prompt: str
max_tokens: int | None = None
metadata: dict[str, Any] = field(default_factory=dict)

@dataclass
class Group:
    """Result of a single group produced by an agent's ``group()`` call.

    An agent partitions the input artifacts into named groups and
    synthesizes one output per group; each ``Group`` carries the group
    key, the member artifacts, and the synthesized text.
    """

    # Group identifier chosen by the agent (used to build the output label).
    key: str
    # Input artifacts assigned to this group (become the output's input_ids).
    artifacts: list[Artifact]
    # Synthesized content for this group.
    content: str


@runtime_checkable
class Agent(Protocol):
    """Named execution agent for Synix pipeline operations.

    Each method receives typed transform inputs plus a ``task_prompt``
    rendered by the transform. The agent composes its own instructions
    (persona/semantics) with the task prompt and executes: the transform
    defines WHAT to do, the agent defines HOW to do it.

    ``agent_id`` is stable identity (who this agent is);
    ``fingerprint_value()`` is a config snapshot (how it behaves now).
    These have separate lifecycles.
    """

    @property
    def agent_id(self) -> str:
        """Stable identity. Changes only when the agent is replaced."""
        ...

    def fingerprint_value(self) -> str:
        """Config snapshot hash.

        Changes when instructions/model/config change; drives cache
        invalidation in transform fingerprinting.
        """
        ...

    def map(self, artifact: Artifact, task_prompt: str) -> str:
        """1:1 — process a single artifact.

        task_prompt: rendered by the transform with {artifact}, {label}, etc.
        """
        ...

    def reduce(self, artifacts: list[Artifact], task_prompt: str) -> str:
        """N:1 — combine artifacts into one output.

        task_prompt: rendered by the transform with {artifacts}, {count}, etc.
        """
        ...

    def group(self, artifacts: list[Artifact], task_prompt: str) -> list[Group]:
        """N:M — assign artifacts to groups and synthesize each.

        task_prompt: rendered by the transform (may be used per-group).
        """
        ...

    def fold(self, accumulated: str, artifact: Artifact, step: int, total: int, task_prompt: str) -> str:
        """Sequential — perform one fold step over the running accumulator.

        task_prompt: rendered by the transform with {accumulated},
        {artifact}, {step}, {total}, etc.
        """
        ...


@dataclass
class SynixLLMAgent:
    """Built-in agent backed by PromptStore + LLMClient.

    Instructions (persona/semantics) are loaded from the PromptStore by
    ``prompt_key`` at call time, so edits in the viewer are picked up
    automatically. The transform's ``task_prompt`` becomes the user
    message; the agent's instructions become the system message.

    Attributes:
        name: Stable agent identity (exposed as ``agent_id``).
        prompt_key: Key under which instructions live in the PromptStore.
        llm_config: Optional LLM configuration dict passed to LLMClient.
        description: Free-form human-readable description.
    """

    name: str
    prompt_key: str
    llm_config: dict | None = None
    description: str = ""
    # Bound via bind_prompt_store(); excluded from repr to keep logs clean.
    _prompt_store: Any = field(default=None, repr=False)

    def __post_init__(self):
        # Fail fast on construction: both identity and prompt lookup key
        # are required for the agent to function.
        if not self.name:
            raise ValueError("SynixLLMAgent must have a name")
        if not self.prompt_key:
            raise ValueError("SynixLLMAgent must have a prompt_key")

    @property
    def agent_id(self) -> str:
        """Stable identity. Changes only when the agent is replaced."""
        return self.name

    @property
    def instructions(self) -> str:
        """Load current instructions from the bound PromptStore.

        Raises:
            ValueError: If no store is bound or the key is missing.
        """
        if self._prompt_store is None:
            raise ValueError(
                f"Agent {self.name!r} has no prompt store — call bind_prompt_store() first"
            )
        content = self._prompt_store.get(self.prompt_key)
        if content is None:
            raise ValueError(f"Prompt key {self.prompt_key!r} not found in store")
        return content

    def fingerprint_value(self) -> str:
        """Hash of prompt content (from store) + llm_config.

        Raises ValueError if prompt store is not bound — fingerprinting
        requires a known prompt state for cache correctness.
        """
        # Local import avoids a module-level dependency cycle with the
        # build package.
        from synix.build.fingerprint import compute_digest, fingerprint_value

        if self._prompt_store is None:
            raise ValueError(
                f"Agent {self.name!r} has no prompt store — "
                "bind_prompt_store() before fingerprinting"
            )
        content_hash = self._prompt_store.content_hash(self.prompt_key) or ""

        components = {"prompt_content": content_hash}
        if self.llm_config:
            components["llm_config"] = fingerprint_value(self.llm_config)
        return compute_digest(components)

    def bind_prompt_store(self, store: Any) -> SynixLLMAgent:
        """Bind a PromptStore. Returns self for chaining."""
        self._prompt_store = store
        return self

    def map(self, artifact: Artifact, task_prompt: str) -> str:
        """1:1 — execute the rendered task prompt for a single artifact."""
        return self._call(task_prompt)

    def reduce(self, artifacts: list[Artifact], task_prompt: str) -> str:
        """N:1 — execute the rendered task prompt over combined artifacts."""
        return self._call(task_prompt)

    def group(self, artifacts: list[Artifact], task_prompt: str) -> list[Group]:
        """N:M grouping is not supported by this built-in agent."""
        raise NotImplementedError(
            f"SynixLLMAgent {self.name!r} does not implement group(). "
            "See issue #127 for agent-driven grouping."
        )

    def fold(self, accumulated: str, artifact: Artifact, step: int, total: int, task_prompt: str) -> str:
        """Sequential — execute one fold step via the rendered task prompt."""
        return self._call(task_prompt)

    def _call(self, task_prompt: str) -> str:
        """Execute LLM call: instructions as system, task_prompt as user."""
        # Local imports keep LLM client construction out of import time.
        from synix.build.llm_client import LLMClient
        from synix.core.config import LLMConfig

        config = LLMConfig.from_dict(self.llm_config or {})
        client = LLMClient(config)
        response = client.complete(
            messages=[
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": task_prompt},
            ],
            artifact_desc=f"agent:{self.name}",
        )
        return response.content

    @classmethod
    def from_file(
        cls,
        name: str,
        prompt_key: str,
        instructions_path: str | Path,
        prompt_store,
        **kwargs,
    ) -> SynixLLMAgent:
        """Create an agent and seed its instructions into the PromptStore.

        Reads the instructions file as UTF-8 (explicit encoding so the
        result does not depend on the platform default), stores the
        content under ``prompt_key``, and returns a bound agent.
        """
        content = Path(instructions_path).read_text(encoding="utf-8")
        prompt_store.put(prompt_key, content)
        agent = cls(name=name, prompt_key=prompt_key, **kwargs)
        agent.bind_prompt_store(prompt_store)
        return agent
1 change: 1 addition & 0 deletions src/synix/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Artifact:
artifact_id: str = ""
input_ids: list[str] = field(default_factory=list)
prompt_id: str | None = None
agent_id: str | None = None
agent_fingerprint: str | None = None
model_config: dict | None = None
created_at: datetime = field(default_factory=datetime.now)
Expand Down
21 changes: 8 additions & 13 deletions src/synix/ext/fold_synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifac
client = _get_llm_client(ctx)
model_config = ctx.llm_config
agent_fingerprint = None
agent_id_val = None
else:
client = None
model_config = None
agent_fingerprint = self.agent.fingerprint_value()
agent_id_val = self.agent.agent_id

sorted_inputs = self._sort_inputs(inputs)
transform_fp = self.compute_fingerprint(ctx.to_dict() if hasattr(ctx, "to_dict") else ctx)
Expand Down Expand Up @@ -185,19 +187,11 @@ def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifac
)

if self.agent is not None:
from synix.agents import AgentRequest

result = self.agent.write(AgentRequest(
prompt=rendered,
metadata={
"transform_name": self.name,
"shape": "fold",
"step": step,
"total": total,
"input_label": inp.label,
},
))
accumulated = result.content
logger.info(
"FoldSynthesis %r: agent %r fold step %d/%d (input: %s)",
self.name, self.agent.agent_id, step, total, inp.label,
)
accumulated = self.agent.fold(accumulated, inp, step, total, rendered)
else:
response = _logged_complete(
client,
Expand Down Expand Up @@ -229,6 +223,7 @@ def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifac
content=accumulated,
input_ids=[a.artifact_id for a in inputs],
prompt_id=prompt_id,
agent_id=agent_id_val,
model_config=model_config,
agent_fingerprint=agent_fingerprint,
metadata=output_metadata,
Expand Down
71 changes: 39 additions & 32 deletions src/synix/ext/group_synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,42 @@ def estimate_output_count(self, input_count: int) -> int:

def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifact]:
ctx = self.get_context(ctx)
prompt_id = self._make_prompt_id()

# Agent path: render task prompt, agent owns grouping and execution
if self.agent is not None:
logger.info(
"GroupSynthesis %r: agent %r grouping %d artifacts",
self.name, self.agent.agent_id, len(inputs),
)
rendered = render_template(self.prompt, artifact_type=self.artifact_type) if self.prompt else ""
groups = self.agent.group(inputs, rendered)
results: list[Artifact] = []
for g in groups:
label = f"{self.label_prefix}-{g.key}" if self.label_prefix else g.key
meta = {"group_key": g.key, "input_count": len(g.artifacts)}
results.append(Artifact(
label=label,
artifact_type=self.artifact_type,
content=g.content,
input_ids=[a.artifact_id for a in g.artifacts],
prompt_id=prompt_id,
agent_id=self.agent.agent_id,
agent_fingerprint=self.agent.fingerprint_value(),
model_config=None,
metadata=meta,
))
return results

# Non-agent path: use split/group logic with LLM
group_key = ctx.get("_group_key")
if group_key is None:
# Called directly without split — process all groups sequentially
results: list[Artifact] = []
results = []
for unit_inputs, config_extras in self.split(inputs, ctx):
results.extend(self.execute(unit_inputs, ctx.with_updates(config_extras)))
return results

prompt_id = self._make_prompt_id()

# Sort inputs by artifact_id for deterministic prompt -> stable cassette key
sorted_inputs = sorted(inputs, key=lambda a: a.artifact_id)
artifacts_text = "\n\n---\n\n".join(f"### {a.label}\n{a.content}" for a in sorted_inputs)
Expand All @@ -194,33 +220,14 @@ def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifac
artifact_type=self.artifact_type,
)

if self.agent is not None:
from synix.agents import AgentRequest

result = self.agent.write(AgentRequest(
prompt=rendered,
metadata={
"transform_name": self.name,
"shape": "group",
"group_key": group_key,
"input_labels": [a.label for a in inputs],
"count": len(inputs),
},
))
content = result.content
model_config = None
agent_fingerprint = self.agent.fingerprint_value()
else:
client = _get_llm_client(ctx)
response = _logged_complete(
client,
ctx,
messages=[{"role": "user", "content": rendered}],
artifact_desc=f"{self.name} group-{group_key}",
)
content = response.content
model_config = ctx.llm_config
agent_fingerprint = None
client = _get_llm_client(ctx)
response = _logged_complete(
client,
ctx,
messages=[{"role": "user", "content": rendered}],
artifact_desc=f"{self.name} group-{group_key}",
)
content = response.content

prefix = self.label_prefix or (self.group_by if isinstance(self.group_by, str) else self.name)
slug = group_key.lower().replace(" ", "-")
Expand All @@ -237,8 +244,8 @@ def execute(self, inputs: list[Artifact], ctx: TransformContext) -> list[Artifac
content=content,
input_ids=[a.artifact_id for a in inputs],
prompt_id=prompt_id,
model_config=model_config,
agent_fingerprint=agent_fingerprint,
model_config=ctx.llm_config,
agent_fingerprint=None,
metadata=output_metadata,
)
]
Expand Down
Loading
Loading