Agent Runtime Protocol (ARP)

Active Development

The gRPC protocol and this document are evolving. Pin your proto file to a specific commit when building against this API.

The Agent Runtime Protocol enables external agents — independent processes written in any language — to connect to Aether's Hub and process tasks alongside (or instead of) built-in agents.

Overview

Built-in agents (pm_sarah, ta_leo, etc.) run inside the Aether process. The ARP extends this model so agents can run anywhere — as Docker containers, standalone binaries, serverless functions, or cloud workloads — communicating via gRPC.

┌──────────────────────────────────────────────────────┐
│                     Aether Hub                        │
│  ┌───────────┐  ┌─────────┐  ┌────────────────────┐  │
│  │  gRPC     │  │  LLM    │  │  Tool Registry     │  │
│  │  Gateway  │  │  Proxy  │  │  (built-in + MCP)  │  │
│  └─────┬─────┘  └─────────┘  └────────────────────┘  │
│        │        ┌─────────┐  ┌────────────────────┐  │
│        │        │ Memory  │  │  NATS Messaging    │  │
│        │        │ Service │  │                    │  │
│        │        └─────────┘  └────────────────────┘  │
└────────┼─────────────────────────────────────────────┘
         │ gRPC (:9090)
    ┌────┼────────────────────────┐
    │    │    │    │              │
    ▼    ▼    ▼    ▼              ▼
┌──────┐┌──────┐┌──────┐   ┌──────────┐
│Python││ Go   ││  TS  │   │  Any     │
│Agent ││Agent ││Agent │   │  Language│
└──────┘└──────┘└──────┘   └──────────┘

Agent Lifecycle

Agent                          Hub
  │                              │
  │── RegisterAgent ────────────>│  (with agent_id + secret)
  │<──── session_id ─────────────│
  │                              │
  │── AcquireTask (stream) ─────>│  (blocks waiting for tasks)
  │<──── TaskAssignment ─────────│  (pushed when task fires)
  │                              │
  │── LLMChat ──────────────────>│  (call LLM via Hub)
  │<──── response ───────────────│
  │                              │
  │── ToolsExecute ─────────────>│  (run a tool action)
  │<──── result ─────────────────│
  │                              │
  │── MemoryStore ──────────────>│  (save memory for later)
  │                              │
  │── CompleteTask ─────────────>│  (report result)
  │<──── ack ────────────────────│
  │                              │
  │── Heartbeat ────────────────>│  (every ~25 seconds)
  │<──── alive ──────────────────│

Prerequisites

  1. Enable the gRPC server: AGENT_GATEWAY_ENABLED=true
  2. Create the agent in the database (via API or seeding)
  3. Set auth_enabled = true and auth_secret = <your-secret> on the agent record

SDK Availability

SDK Status

SDK definitions, interfaces, and example implementations are available in the repository at goway/pkg/sdk/ and sdks/. They are not yet published to public package registries (PyPI, pkg.go.dev, npm).

To use the SDKs today: clone the repository and reference the code directly, or generate your own client from the proto file at goway/proto/agent/v1/agent.proto.

To generate a client from the proto file:

bash
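# Go (requires the protoc-gen-go and protoc-gen-go-grpc plugins on your PATH)
protoc --go_out=. --go-grpc_out=. goway/proto/agent/v1/agent.proto

# Python (requires the grpcio-tools package)
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. goway/proto/agent/v1/agent.proto

# TypeScript / Node (assumes a protoc wrapper and a plugin that provides --ts_out are installed via npm)
npx protoc --ts_out=. goway/proto/agent/v1/agent.proto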

Complete Example: Go Agent

go
package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "your-module/proto/agent/v1"
)

const (
    hubAddr = "localhost:9090"
    agentID = "ta_leo"
    secret  = "my-secret"
)

func main() {
    conn, err := grpc.NewClient(hubAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewAgentGatewayServiceClient(conn)
    ctx := context.Background()

    // 1. Register
    regResp, err := client.RegisterAgent(ctx, &pb.RegisterAgentRequest{
        AgentId:      agentID,
        AgentSecret:  secret,
        AgentName:    "Leo (Technical Analyst)",
        Version:      "1.0.0",
        Capabilities: map[string]string{"code_review": "true", "security": "true"},
    })
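    if err != nil {
        log.Fatalf("registration failed: %v", err)
    }
    if !regResp.Success {
        // The RPC succeeded but the Hub refused the registration.
        log.Fatal("registration rejected by Hub")
    }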
    sessionID := regResp.SessionId
    log.Printf("Registered. Session: %s", sessionID)

    // 2. Heartbeat loop
    go func() {
        ticker := time.NewTicker(25 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            hb, err := client.Heartbeat(ctx, &pb.HeartbeatRequest{
                SessionId: sessionID,
                AgentId:   agentID,
                Status:    pb.AgentStatus_IDLE,
            })
            if err != nil {
                log.Printf("heartbeat failed: %v", err)
            } else if !hb.Alive {
                log.Printf("heartbeat rejected, session may have expired")
            }
        }
    }()

    // 3. Acquire and process tasks
    stream, err := client.AcquireTask(ctx, &pb.AcquireTaskRequest{
        SessionId: sessionID,
        AgentId:   agentID,
    })
    if err != nil {
        log.Fatalf("acquire task stream: %v", err)
    }

    for {
        task, err := stream.Recv()
        if err != nil {
            log.Printf("stream closed: %v", err)
            break
        }
        log.Printf("Received task: %s", task.TaskId)

        // 4. List stored memories for relevant context
        memResults, err := client.MemoryList(ctx, &pb.MemoryListRequest{
            SessionId: sessionID,
            AgentId:   agentID,
        })
        if err != nil {
            log.Printf("memory list failed, continuing without context: %v", err)
        }
        _ = memResults // inject into LLM prompt

        // 5. Call LLM via Hub
        llmResp, err := client.LLMChat(ctx, &pb.LLMChatRequest{
            SessionId: sessionID,
            Messages: []*pb.Message{
                {Role: "user", Content: string(task.Payload)},
            },
        })
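        if err != nil {
            // Report the failure as retriable; log if the report itself fails.
            if _, ferr := client.FailTask(ctx, &pb.FailTaskRequest{
                SessionId: sessionID,
                TaskId:    task.TaskId,
                Error:     err.Error(),
                Retriable: true,
            }); ferr != nil {
                log.Printf("fail task %s: %v", task.TaskId, ferr)
            }
            continue
        }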

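        // 6. Store result as memory (best effort: a failure here does not fail the task)
        if _, err := client.MemoryStore(ctx, &pb.MemoryStoreRequest{
            SessionId: sessionID,
            Key:       "last-ta-analysis-" + task.TaskId,
            Content:   llmResp.Content,
            Scope:     pb.MemoryScope_AGENT,
        }); err != nil {
            log.Printf("memory store failed: %v", err)
        }

        // 7. Complete task
        if _, err := client.CompleteTask(ctx, &pb.CompleteTaskRequest{
            SessionId:  sessionID,
            TaskId:     task.TaskId,
            Result:     llmResp.Content,
            TokensUsed: llmResp.TokensPrompt + llmResp.TokensCompletion,
        }); err != nil {
            log.Printf("complete task %s failed: %v", task.TaskId, err)
        }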
    }
}
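
The Go example above breaks out of its loop as soon as the task stream closes, so the agent exits. A long-running agent would more likely register again and reopen the stream after a pause. The protocol does not prescribe a retry policy; the sketch below shows one common choice, exponential backoff with a ceiling. The helper name `backoffDelay` and the 1 s / 30 s values are assumptions for illustration, not part of the ARP or its SDKs.

```go
package main

import (
    "fmt"
    "time"
)

// backoffDelay returns the wait before reconnect attempt n (0-based):
// base doubled n times, never exceeding ceiling.
// Hypothetical helper; the ARP does not define a reconnect policy.
func backoffDelay(attempt int, base, ceiling time.Duration) time.Duration {
    d := base
    for i := 0; i < attempt; i++ {
        d *= 2
        if d >= ceiling {
            // Return early so large attempt counts cannot overflow.
            return ceiling
        }
    }
    if d > ceiling {
        return ceiling
    }
    return d
}

func main() {
    // Show the schedule an agent might follow between RegisterAgent retries.
    for attempt := 0; attempt < 6; attempt++ {
        wait := backoffDelay(attempt, time.Second, 30*time.Second)
        fmt.Printf("attempt %d: wait %s\n", attempt, wait)
    }
}
```

In an agent, the register/acquire/process sequence would sit inside an outer loop that sleeps for `backoffDelay(attempt, ...)` after each stream failure and resets `attempt` to zero once a registration succeeds.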

Complete Example: Python Agent

python
import grpc
import time
import threading
import logging

from proto.agent.v1 import agent_pb2, agent_pb2_grpc

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

HUB_ADDR = "localhost:9090"
AGENT_ID = "ta_leo"
SECRET = "my-secret"

channel = grpc.insecure_channel(HUB_ADDR)
stub = agent_pb2_grpc.AgentGatewayServiceStub(channel)

# 1. Register
response = stub.RegisterAgent(agent_pb2.RegisterAgentRequest(
    agent_id=AGENT_ID,
    agent_secret=SECRET,
    agent_name="Leo (Technical Analyst)",
    version="1.0.0",
    capabilities={"code_review": "true", "security": "true"},
))
if not response.success:
    raise SystemExit("registration rejected by Hub")
session_id = response.session_id
log.info(f"Registered. Session: {session_id}")

# 2. Heartbeat thread
def heartbeat_loop():
    while True:
        time.sleep(25)
        try:
            hb = stub.Heartbeat(agent_pb2.HeartbeatRequest(
                session_id=session_id,
                agent_id=AGENT_ID,
                status=agent_pb2.AgentStatus.IDLE,
            ))
            if not hb.alive:
                log.warning("Session expired")
        except Exception as e:
            log.error(f"Heartbeat error: {e}")

threading.Thread(target=heartbeat_loop, daemon=True).start()

# 3. Acquire tasks
for task in stub.AcquireTask(agent_pb2.AcquireTaskRequest(
    session_id=session_id,
    agent_id=AGENT_ID,
)):
    log.info(f"Processing task: {task.task_id}")
    try:
        # 4. Call LLM
        llm_resp = stub.LLMChat(agent_pb2.LLMChatRequest(
            session_id=session_id,
            messages=[agent_pb2.Message(role="user", content=task.payload.decode())],
        ))

        # 5. Complete
        stub.CompleteTask(agent_pb2.CompleteTaskRequest(
            session_id=session_id,
            task_id=task.task_id,
            result=llm_resp.content,
        ))
    except Exception as e:
        stub.FailTask(agent_pb2.FailTaskRequest(
            session_id=session_id,
            task_id=task.task_id,
            error=str(e),
            retriable=True,
        ))

Hub Services Reference

LLM Hub

Access the configured LLM through Aether's rate-limited proxy:

go
resp, err := client.LLMChat(ctx, &pb.LLMChatRequest{
    SessionId:    sessionID,
    SystemPrompt: "You are a technical analyst...",
    Messages:     messages,
    Tools:        toolDefinitions, // optional: enable tool calling
})

Tool Hub

Discover and execute registered tools:

go
// List available tools
tools, _ := client.ToolsList(ctx, &pb.ToolsListRequest{SessionId: sessionID})

// Execute a tool
result, _ := client.ToolsExecute(ctx, &pb.ToolsExecuteRequest{
    SessionId:  sessionID,
    ToolName:   "github_add_comment",
    Parameters: map[string]string{
        "issue_number": "42",
        "comment":      "Technical analysis complete.",
    },
})
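
In the example above, `Parameters` is a `map[string]string`, so numeric or boolean arguments such as `issue_number` have to be passed as strings. The sketch below is one way to build that map from typed values. `toParams` is a hypothetical helper, and how the Hub parses each string back into a typed value is not specified here.

```go
package main

import (
    "fmt"
    "strconv"
)

// toParams converts typed argument values into the string-only map
// that the ToolsExecute example uses for Parameters.
// Hypothetical helper, not part of the ARP SDK.
func toParams(in map[string]any) map[string]string {
    out := make(map[string]string, len(in))
    for k, v := range in {
        switch t := v.(type) {
        case string:
            out[k] = t
        case int:
            out[k] = strconv.Itoa(t)
        case bool:
            out[k] = strconv.FormatBool(t)
        default:
            // Fall back to Go's default formatting for other types.
            out[k] = fmt.Sprint(t)
        }
    }
    return out
}

func main() {
    params := toParams(map[string]any{
        "issue_number": 42,
        "comment":      "Technical analysis complete.",
    })
    fmt.Println(params["issue_number"], params["comment"])
}
```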

Memory Hub

Store and retrieve memories across task executions:

go
// Store
client.MemoryStore(ctx, &pb.MemoryStoreRequest{
    SessionId:  sessionID,
    Key:        "decision-jwt-2026",
    Content:    "Chose JWT over session cookies for stateless scalability",
    Scope:      pb.MemoryScope_PROJECT,
})

// Retrieve
mem, _ := client.MemoryRetrieve(ctx, &pb.MemoryRetrieveRequest{
    SessionId: sessionID,
    Key:       "decision-jwt-2026",
})

Messaging Hub

Publish messages to a subject, or exchange request-reply messages with other agents. Only publishing is shown here:

go
// Publish to a subject
client.MessagingPublish(ctx, &pb.MessagingPublishRequest{
    SessionId: sessionID,
    Subject:   "analysis.complete",
    Data:      []byte(`{"task_id": "123", "summary": "..."}`),
})
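
The `Data` field above is a raw byte slice, and the example fills it with a hand-written JSON literal. Encoding a struct with `encoding/json` avoids quoting mistakes once the payload contains user or LLM text. The `analysisComplete` type and its fields are assumptions that mirror the literal above; the ARP does not define a message schema here.

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// analysisComplete mirrors the JSON literal in the publish example.
// Hypothetical payload type; subjects and schemas are up to the agents.
type analysisComplete struct {
    TaskID  string `json:"task_id"`
    Summary string `json:"summary"`
}

// encodeEvent returns bytes suitable for the Data field of a publish request.
func encodeEvent(taskID, summary string) ([]byte, error) {
    return json.Marshal(analysisComplete{TaskID: taskID, Summary: summary})
}

func main() {
    data, err := encodeEvent("123", `Found 2 issues in "auth.go"`)
    if err != nil {
        log.Fatalf("encode event: %v", err)
    }
    // Quotes inside the summary are escaped by the encoder.
    fmt.Println(string(data))
}
```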

Authentication & Trust Levels

Agents authenticate with agent_id and agent_secret; the secret is stored hashed in the database. After registration, every RPC is authorized by the session_id returned from RegisterAgent.

Trust levels control rate limiting:

| Level | Description | Default LLM calls/hour |
| --- | --- | --- |
| default | Standard agents | 20 |
| trusted | Elevated permissions | 100 |
| privileged | System agents | 500 |

Trust level is configured on the agent record in the database.
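
An agent can track its own LLM call budget so that it slows down before the Hub rejects requests. The sketch below is a client-side sliding-window counter seeded with the default limits from the table above. It is an illustration only: `hourlyBudget`, `simulate`, and the sliding-window choice are assumptions, and the Hub's actual rate-limiting algorithm is not described in this document.

```go
package main

import (
    "fmt"
    "time"
)

// defaultLimits copies the default LLM calls/hour per trust level.
var defaultLimits = map[string]int{"default": 20, "trusted": 100, "privileged": 500}

// hourlyBudget allows at most limit calls in any one-hour window.
// Hypothetical client-side guard; not safe for concurrent use.
type hourlyBudget struct {
    limit int
    calls []time.Time // timestamps of allowed calls, oldest first
}

// allow records a call at now and reports whether it fits the budget.
func (b *hourlyBudget) allow(now time.Time) bool {
    cutoff := now.Add(-time.Hour)
    i := 0
    // Drop calls that are one hour old or older.
    for i < len(b.calls) && !b.calls[i].After(cutoff) {
        i++
    }
    b.calls = b.calls[i:]
    if len(b.calls) >= b.limit {
        return false
    }
    b.calls = append(b.calls, now)
    return true
}

// simulate replays calls at the given second offsets and returns each decision.
func simulate(limit int, offsets []int) []bool {
    start := time.Unix(0, 0)
    b := &hourlyBudget{limit: limit}
    results := make([]bool, 0, len(offsets))
    for _, s := range offsets {
        results = append(results, b.allow(start.Add(time.Duration(s)*time.Second)))
    }
    return results
}

func main() {
    // 25 calls one second apart against the "default" limit of 20.
    offsets := make([]int, 25)
    for i := range offsets {
        offsets[i] = i
    }
    allowed := 0
    for _, ok := range simulate(defaultLimits["default"], offsets) {
        if ok {
            allowed++
        }
    }
    fmt.Printf("%d of %d calls allowed\n", allowed, len(offsets))
}
```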

Database Tables

The ARP uses these tables:

| Table | Purpose |
| --- | --- |
| agent_sessions | Active sessions with heartbeat tracking |
| agent_capabilities | Declared capabilities per agent |
| agent_audit_log | Immutable record of all agent actions |
| task_executions | Execution history with timing and token metrics |

Configuration Reference

bash
AGENT_GATEWAY_ENABLED=true     # Enable the gRPC server
AGENT_GATEWAY_PORT=9090        # gRPC listen port
AGENT_HEARTBEAT_INTERVAL=30    # Expected heartbeat every N seconds (the examples send every 25 s to stay inside this window)
AGENT_SESSION_TIMEOUT=120      # Expire sessions after N seconds without heartbeat
RATE_LIMIT_ENABLED=true        # Enable per-agent rate limiting
RATE_LIMIT_DEFAULT_LLM_CALLS_PER_HOUR=20
RATE_LIMIT_TRUSTED_LLM_CALLS_PER_HOUR=100
RATE_LIMIT_PRIVILEGED_LLM_CALLS_PER_HOUR=500
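
The heartbeat and timeout values above interact: with the values shown, a session times out after 120 s of silence, which is four expected 30 s intervals, and an agent sending every 25 s has a 5 s margin per beat. The sketch below does that arithmetic for any pair of values. The function names are hypothetical, and exactly when the Hub evaluates expiry is an assumption not covered by this document.

```go
package main

import (
    "fmt"
    "time"
)

// intervalsBeforeExpiry returns how many whole heartbeat intervals
// fit inside the session timeout. Hypothetical helper for planning.
func intervalsBeforeExpiry(interval, timeout time.Duration) int {
    if interval <= 0 {
        return 0
    }
    return int(timeout / interval)
}

// sendMargin returns how much earlier than expected the agent sends
// each heartbeat; a negative value means the agent is too slow.
func sendMargin(expected, sendEvery time.Duration) time.Duration {
    return expected - sendEvery
}

func main() {
    expected := 30 * time.Second // AGENT_HEARTBEAT_INTERVAL
    timeout := 120 * time.Second // AGENT_SESSION_TIMEOUT
    sendEvery := 25 * time.Second

    fmt.Println("intervals before expiry:", intervalsBeforeExpiry(expected, timeout))
    fmt.Println("margin per heartbeat:", sendMargin(expected, sendEvery))
}
```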

Released under the MIT License.