Agent Runtime Protocol (ARP)
Active Development
The gRPC protocol and this document are evolving. Pin your proto file to a specific commit when building against this API.
The Agent Runtime Protocol enables external agents — independent processes written in any language — to connect to Aether's Hub and process tasks alongside (or instead of) built-in agents.
Overview
Built-in agents (pm_sarah, ta_leo, etc.) run inside the Aether process. The ARP extends this model so agents can run anywhere — as Docker containers, standalone binaries, serverless functions, or cloud workloads — communicating via gRPC.
┌──────────────────────────────────────────────────────┐
│ Aether Hub │
│ ┌───────────┐ ┌─────────┐ ┌────────────────────┐ │
│ │ gRPC │ │ LLM │ │ Tool Registry │ │
│ │ Gateway │ │ Proxy │ │ (built-in + MCP) │ │
│ └─────┬─────┘ └─────────┘ └────────────────────┘ │
│ │ ┌─────────┐ ┌────────────────────┐ │
│ │ │ Memory │ │ NATS Messaging │ │
│ │ │ Service │ │ │ │
│ │ └─────────┘ └────────────────────┘ │
└────────┼─────────────────────────────────────────────┘
│ gRPC (:9090)
┌────┼────────────────────────┐
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────┐┌──────┐┌──────┐ ┌──────────┐
│Python││ Go ││ TS │ │ Any │
│Agent ││Agent ││Agent │ │ Language│
└──────┘└──────┘└──────┘ └──────────┘Agent Lifecycle
Agent Hub
│ │
│── RegisterAgent ────────────>│ (with agent_id + secret)
│<──── session_id ─────────────│
│ │
│── AcquireTask (stream) ─────>│ (blocks waiting for tasks)
│<──── TaskAssignment ─────────│ (pushed when task fires)
│ │
│── LLMChat ──────────────────>│ (call LLM via Hub)
│<──── response ───────────────│
│ │
│── ToolsExecute ─────────────>│ (run a tool action)
│<──── result ─────────────────│
│ │
│── MemoryStore ──────────────>│ (save memory for later)
│ │
│── CompleteTask ─────────────>│ (report result)
│<──── ack ────────────────────│
│ │
│── Heartbeat ────────────────>│ (every ~25 seconds)
│<──── alive ──────────────────│Prerequisites
- Enable the gRPC server:
AGENT_GATEWAY_ENABLED=true - Create the agent in the database (via API or seeding)
- Set
auth_enabled = trueandauth_secret = <your-secret>on the agent record
SDK Availability
SDK Status
SDK definitions, interfaces, and example implementations are available in the repository at goway/pkg/sdk/ and sdks/. They are not yet published to public package registries (PyPI, pkg.go.dev, npm).
To use the SDKs today: clone the repository and reference the code directly, or generate your own client from the proto file at goway/proto/agent/v1/agent.proto.
To generate a client from the proto file:
# Go
protoc --go_out=. --go-grpc_out=. goway/proto/agent/v1/agent.proto
# Python
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. goway/proto/agent/v1/agent.proto
# TypeScript / Node
npx protoc --ts_out=. goway/proto/agent/v1/agent.protoComplete Example: Go Agent
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "your-module/proto/agent/v1"
)
const (
hubAddr = "localhost:9090"
agentID = "ta_leo"
secret = "my-secret"
)
func main() {
conn, err := grpc.NewClient(hubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("connect: %v", err)
}
defer conn.Close()
client := pb.NewAgentGatewayServiceClient(conn)
ctx := context.Background()
// 1. Register
regResp, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
AgentId: agentID,
AgentSecret: secret,
AgentName: "Leo (Technical Analyst)",
Version: "1.0.0",
Capabilities: map[string]string{"code_review": "true", "security": "true"},
})
if err != nil || !regResp.Success {
log.Fatalf("registration failed: %v", err)
}
sessionID := regResp.SessionId
log.Printf("Registered. Session: %s", sessionID)
// 2. Heartbeat loop
go func() {
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
for range ticker.C {
hb, err := client.Heartbeat(ctx, &pb.HeartbeatRequest{
SessionId: sessionID,
AgentId: agentID,
Status: pb.AgentStatus_IDLE,
})
if err != nil || !hb.Alive {
log.Printf("heartbeat failed, session may have expired")
}
}
}()
// 3. Acquire and process tasks
stream, err := client.AcquireTask(ctx, &pb.AcquireTaskRequest{
SessionId: sessionID,
AgentId: agentID,
})
if err != nil {
log.Fatalf("acquire task stream: %v", err)
}
for {
task, err := stream.Recv()
if err != nil {
log.Printf("stream closed: %v", err)
break
}
log.Printf("Received task: %s", task.TaskId)
// 4. Search memory for relevant context
memResults, _ := client.MemoryList(ctx, &pb.MemoryListRequest{
SessionId: sessionID,
AgentId: agentID,
})
_ = memResults // inject into LLM prompt
// 5. Call LLM via Hub
llmResp, err := client.LLMChat(ctx, &pb.LLMChatRequest{
SessionId: sessionID,
Messages: []*pb.Message{
{Role: "user", Content: string(task.Payload)},
},
})
if err != nil {
client.FailTask(ctx, &pb.FailTaskRequest{
SessionId: sessionID,
TaskId: task.TaskId,
Error: err.Error(),
Retriable: true,
})
continue
}
// 6. Store result as memory
client.MemoryStore(ctx, &pb.MemoryStoreRequest{
SessionId: sessionID,
Key: "last-ta-analysis-" + task.TaskId,
Content: llmResp.Content,
Scope: pb.MemoryScope_AGENT,
})
// 7. Complete task
client.CompleteTask(ctx, &pb.CompleteTaskRequest{
SessionId: sessionID,
TaskId: task.TaskId,
Result: llmResp.Content,
TokensUsed: llmResp.TokensPrompt + llmResp.TokensCompletion,
})
}
}Complete Example: Python Agent
import grpc
import time
import threading
import logging
from proto.agent.v1 import agent_pb2, agent_pb2_grpc
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
HUB_ADDR = "localhost:9090"
AGENT_ID = "ta_leo"
SECRET = "my-secret"
channel = grpc.insecure_channel(HUB_ADDR)
stub = agent_pb2_grpc.AgentGatewayServiceStub(channel)
# 1. Register
response = stub.RegisterAgent(agent_pb2.RegisterAgentRequest(
agent_id=AGENT_ID,
agent_secret=SECRET,
agent_name="Leo (Technical Analyst)",
version="1.0.0",
capabilities={"code_review": "true", "security": "true"},
))
session_id = response.session_id
log.info(f"Registered. Session: {session_id}")
# 2. Heartbeat thread
def heartbeat_loop():
while True:
time.sleep(25)
try:
hb = stub.Heartbeat(agent_pb2.HeartbeatRequest(
session_id=session_id,
agent_id=AGENT_ID,
status=agent_pb2.AgentStatus.IDLE,
))
if not hb.alive:
log.warning("Session expired")
except Exception as e:
log.error(f"Heartbeat error: {e}")
threading.Thread(target=heartbeat_loop, daemon=True).start()
# 3. Acquire tasks
for task in stub.AcquireTask(agent_pb2.AcquireTaskRequest(
session_id=session_id,
agent_id=AGENT_ID,
)):
log.info(f"Processing task: {task.task_id}")
try:
# 4. Call LLM
llm_resp = stub.LLMChat(agent_pb2.LLMChatRequest(
session_id=session_id,
messages=[agent_pb2.Message(role="user", content=task.payload.decode())],
))
# 5. Complete
stub.CompleteTask(agent_pb2.CompleteTaskRequest(
session_id=session_id,
task_id=task.task_id,
result=llm_resp.content,
))
except Exception as e:
stub.FailTask(agent_pb2.FailTaskRequest(
session_id=session_id,
task_id=task.task_id,
error=str(e),
retriable=True,
))Hub Services Reference
LLM Hub
Access the configured LLM through Aether's rate-limited proxy:
resp, err := client.LLMChat(ctx, &pb.LLMChatRequest{
SessionId: sessionID,
SystemPrompt: "You are a technical analyst...",
Messages: messages,
Tools: toolDefinitions, // optional: enable tool calling
})Tool Hub
Discover and execute registered tools:
// List available tools
tools, _ := client.ToolsList(ctx, &pb.ToolsListRequest{SessionId: sessionID})
// Execute a tool
result, _ := client.ToolsExecute(ctx, &pb.ToolsExecuteRequest{
SessionId: sessionID,
ToolName: "github_add_comment",
Parameters: map[string]string{
"issue_number": "42",
"comment": "Technical analysis complete.",
},
})Memory Hub
Store and retrieve memories across task executions:
// Store
client.MemoryStore(ctx, &pb.MemoryStoreRequest{
SessionId: sessionID,
Key: "decision-jwt-2026",
Content: "Chose JWT over session cookies for stateless scalability",
Scope: pb.MemoryScope_PROJECT,
})
// Retrieve
mem, _ := client.MemoryRetrieve(ctx, &pb.MemoryRetrieveRequest{
SessionId: sessionID,
Key: "decision-jwt-2026",
})Messaging Hub
Publish messages or request-reply with other agents:
// Publish to a topic
client.MessagingPublish(ctx, &pb.MessagingPublishRequest{
SessionId: sessionID,
Subject: "analysis.complete",
Data: []byte(`{"task_id": "123", "summary": "..."}`),
})Authentication & Trust Levels
Agents authenticate with agent_id + agent_secret (stored hashed in the database). After registration, all RPCs use the session_id.
Trust levels control rate limiting:
| Level | Description | Default LLM calls/hour |
|---|---|---|
default | Standard agents | 20 |
trusted | Elevated permissions | 100 |
privileged | System agents | 500 |
Trust level is configured on the agent record in the database.
Database Tables
The ARP uses these tables:
| Table | Purpose |
|---|---|
agent_sessions | Active sessions with heartbeat tracking |
agent_capabilities | Declared capabilities per agent |
agent_audit_log | Immutable record of all agent actions |
task_executions | Execution history with timing and token metrics |
Configuration Reference
AGENT_GATEWAY_ENABLED=true # Enable the gRPC server
AGENT_GATEWAY_PORT=9090 # gRPC listen port
AGENT_HEARTBEAT_INTERVAL=30 # Expected heartbeat every N seconds
AGENT_SESSION_TIMEOUT=120 # Expire sessions after N seconds without heartbeat
RATE_LIMIT_ENABLED=true # Enable per-agent rate limiting
RATE_LIMIT_DEFAULT_LLM_CALLS_PER_HOUR=20
RATE_LIMIT_TRUSTED_LLM_CALLS_PER_HOUR=100
RATE_LIMIT_PRIVILEGED_LLM_CALLS_PER_HOUR=500See Also
- gRPC API Reference — Complete RPC reference
- Agent Memory System — Memory architecture details
- Tool Providers — Available tools
