
Python SDK Guide

Active Development

The SDK is under active development. Pin to a specific release for production use.

The Python SDK lets you build Aether agents using async/await and Pydantic models. It communicates with the Hub over gRPC.

Installation

From PyPI

bash
pip install aether-agent-sdk

From Source

bash
git clone https://github.com/amansrivastava/hacky-automation.git
cd hacky-automation/sdks/python

# Install in development mode (includes dev dependencies)
make dev-install

# Generate gRPC stubs from proto files
make proto-gen

Requires: Python 3.8+

Configuration

Environment Variables

bash
AETHER_HUB_URL=localhost:50051
AETHER_AGENT_ID=my-agent-001
AETHER_AGENT_NAME="My Agent"
AETHER_AGENT_SECRET=sk-secret
AETHER_VERSION=1.0.0
AETHER_HEARTBEAT_INTERVAL=30
AETHER_RECONNECT_DELAY=5
AETHER_MAX_RECONNECT_ATTEMPTS=-1
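
The examples further down read HUB_URL and AGENT_SECRET with os.getenv instead of these AETHER_* names. This page also doesn't say whether AgentConfig reads the variables on its own. If you want to drive configuration from the AETHER_* variables above, one option is to map them yourself. The sketch below does that. `config_kwargs_from_env` is a hypothetical helper. Its defaults mirror the AgentConfig reference. Placing the secret in metadata["secret"] follows the Code Reviewer example.

```python
import os
from typing import Any, Dict, Mapping, Optional


def config_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: build AgentConfig keyword arguments from AETHER_* variables.

    Defaults mirror the AgentConfig reference table. AETHER_AGENT_ID and
    AETHER_AGENT_NAME are required, so a missing value raises KeyError.
    """
    env = os.environ if env is None else env
    kwargs: Dict[str, Any] = {
        "agent_id": env["AETHER_AGENT_ID"],
        "agent_name": env["AETHER_AGENT_NAME"],
        "version": env.get("AETHER_VERSION", "0.1.0"),
        "hub_url": env.get("AETHER_HUB_URL", "localhost:50051"),
        "heartbeat_interval": int(env.get("AETHER_HEARTBEAT_INTERVAL", "30")),
        "reconnect_delay": int(env.get("AETHER_RECONNECT_DELAY", "5")),
        "max_reconnect_attempts": int(env.get("AETHER_MAX_RECONNECT_ATTEMPTS", "-1")),
    }
    # Assumption: the secret travels in metadata["secret"], as in the Code Reviewer example.
    secret = env.get("AETHER_AGENT_SECRET")
    if secret:
        kwargs["metadata"] = {"secret": secret}
    return kwargs
```

In an agent you would then write `config = AgentConfig(**config_kwargs_from_env())`.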

AgentConfig Reference

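| Field                  | Type | Default           | Description                                  |
|------------------------|------|-------------------|----------------------------------------------|
| agent_id               | str  | Required          | Unique identifier (e.g. pm_sarah)            |
| agent_name             | str  | Required          | Human-readable name                          |
| version                | str  | "0.1.0"           | Agent version                                |
| hub_url                | str  | "localhost:50051" | Hub gRPC endpoint                            |
| capabilities           | dict | {}                | Key-value capability flags                   |
| metadata               | dict | {}                | Arbitrary metadata                           |
| heartbeat_interval     | int  | 30                | Seconds between heartbeats                   |
| reconnect_delay        | int  | 5                 | Seconds to wait before a reconnect attempt   |
| max_reconnect_attempts | int  | -1                | Maximum reconnect attempts (-1 = unlimited)  |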

Quickstart

Create Your Agent

Subclass AetherAgent and implement handle_task:

python
import asyncio
from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class MyAgent(AetherAgent):
    async def handle_task(self, task: TaskAssignment) -> TaskResult:
        print(f"Processing task {task.task_id}: {task.description}")

        try:
            result_data = f"Processed: {task.description}".encode()
            return TaskResult(
                task_id=task.task_id,
                success=True,
                result=result_data,
                metrics={"processing_time": "0.5s"},  # illustrative value; report real measurements in your agent
            )
        except Exception as e:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=str(e),
            )


async def main():
    config = AgentConfig(
        agent_id="my-agent-001",
        agent_name="My Example Agent",
        version="1.0.0",
        hub_url="localhost:50051",
        capabilities={"task_types": "analysis,processing"},
    )
    agent = MyAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

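Register, Start, Stop

The lifecycle methods are coroutines, so await them from async code such as main(). Call start() and stop() yourself when you manage the agent's lifetime. Alternatively, call run(), which starts the agent and blocks until the process receives SIGINT or SIGTERM: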

python
# Start: connect + register (non-blocking)
await agent.start()

# Stop: unregister + disconnect
await agent.stop()

# Run: start + block until interrupted (handles SIGINT/SIGTERM)
await agent.run()
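
If you call start() and stop() yourself, wrap them in try/finally. That way the agent still unregisters when the surrounding code fails. The sketch below does this with an async context manager. `running` is a hypothetical helper. `FakeAgent` is a stand-in with the same two coroutines, so the block runs without a Hub.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


@asynccontextmanager
async def running(agent) -> AsyncIterator[None]:
    """Hypothetical helper: start() on entry, stop() on exit, even if the body raises."""
    await agent.start()
    try:
        yield
    finally:
        await agent.stop()


class FakeAgent:
    """Stand-in exposing start()/stop() coroutines; used only to exercise the helper."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def start(self) -> None:
        self.calls.append("start")

    async def stop(self) -> None:
        self.calls.append("stop")


async def demo() -> List[str]:
    agent = FakeAgent()
    async with running(agent):
        agent.calls.append("work")  # your own async code would go here
    return agent.calls
```

With a real agent you would write `async with running(MyAgent(config)):` around the rest of your async code. If start() itself raises, stop() is not called, because nothing was started.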

Hub Services

Access Hub-proxied services through self.hub inside handle_task.

LLM

python
response = await self.hub.llm.chat(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": task.description},
    ],
    model="gpt-4o-mini",   # Hub may override based on trust level
    temperature=0.3,
    max_tokens=1000,
)

if response.success:
    print(response.content)
    print(f"Model: {response.model}, Tokens: {response.usage.total_tokens}")
else:
    print(f"LLM call failed: {response.error_message}")

Tools

python
# List available tools
tools = await self.hub.tools.list()

# Execute a tool
result = await self.hub.tools.execute(
    tool_id="add_comment",
    input={"issue_id": "123", "comment": "Review complete"},
)
output = result.result_json  # JSON string
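
result_json is a JSON string, so decode it before use. A small helper keeps that decoding, and the error path, in one place. `parse_tool_output` is a hypothetical name. Treating an empty string as "no output" is an assumption.

```python
import json
from typing import Any


def parse_tool_output(result_json: str) -> Any:
    """Decode the JSON string carried in a tool result.

    Assumption: an empty string means the tool produced no output.
    Malformed JSON is re-raised as ValueError naming the offending text.
    """
    if not result_json:
        return None
    try:
        return json.loads(result_json)
    except json.JSONDecodeError as exc:
        raise ValueError(f"tool returned invalid JSON: {result_json[:80]!r}") from exc
```

Inside handle_task: `output = parse_tool_output(result.result_json)`.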

Memory

python
# Store a value
await self.hub.memory.store(
    key="my-config",
    value={"theme": "dark"},
    scope="agent",
    ttl=3600,  # seconds; 0 = no expiry
)

# Retrieve a value
response = await self.hub.memory.retrieve(key="my-config", scope="agent")
if response.success:
    config = response.value_json  # JSON string
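
store() takes a dict, but retrieve() hands the value back as a JSON string. To unit-test handlers without a running Hub, you can use a small in-memory double with the same store/retrieve shape, which makes that round trip explicit. Everything here is an assumption. `InMemoryStore` and `MemoryResponse` are hypothetical, and the real response object may carry more fields. The scope and TTL handling is inferred from the snippet above (ttl in seconds, 0 = no expiry).

```python
import json
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple


@dataclass
class MemoryResponse:
    """Mirrors the two fields the retrieve() example reads."""
    success: bool
    value_json: str = ""


class InMemoryStore:
    """Hypothetical test double for self.hub.memory, backed by a local dict.

    Entries are namespaced by scope; ttl is in seconds and 0 means no expiry.
    Values are stored as JSON, so retrieve() returns a JSON string.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so tests can advance time
        self._data: Dict[Tuple[str, str], Tuple[str, Optional[float]]] = {}

    async def store(self, key: str, value: Any, scope: str, ttl: int) -> None:
        expires_at = self._clock() + ttl if ttl > 0 else None
        self._data[(scope, key)] = (json.dumps(value), expires_at)

    async def retrieve(self, key: str, scope: str) -> MemoryResponse:
        entry = self._data.get((scope, key))
        if entry is None:
            return MemoryResponse(success=False)
        value_json, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[(scope, key)]  # lazily drop expired entries
            return MemoryResponse(success=False)
        return MemoryResponse(success=True, value_json=value_json)
```

Call `json.loads(response.value_json)` to get the dict back. This page doesn't cover how to substitute the double for self.hub.memory; that depends on how your tests construct the agent.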

Messaging

python
# Publish to a NATS subject
await self.hub.messaging.publish(
    subject="agent.status",
    payload={"agent_id": self.config.agent_id, "status": "processing"},
)

# Request-reply
response = await self.hub.messaging.request(
    subject="workflow.status",
    payload={"workflow_id": "123"},
    timeout=10,
)

Example: Code Reviewer Agent

python
import asyncio
import os
from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class CodeReviewerAgent(AetherAgent):
    """Reviews code diffs using an LLM."""

    async def handle_task(self, task: TaskAssignment) -> TaskResult:
        diff = task.payload.decode() if task.payload else task.description

        response = await self.hub.llm.chat(
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an expert code reviewer. "
                        "Provide concise, constructive feedback."
                    ),
                },
                {"role": "user", "content": f"Review this diff:\n\n{diff}"},
            ],
            temperature=0.2,
        )

        if not response.success:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=f"LLM call failed: {response.error_message}",
            )

        # Notify other agents
        await self.hub.messaging.publish(
            subject="review.completed",
            payload={"task_id": task.task_id, "agent": self.config.agent_id},
        )

        return TaskResult(
            task_id=task.task_id,
            success=True,
            result=response.content.encode(),
            metrics={"model": response.model, "tokens": str(response.usage.total_tokens)},
        )


async def main():
    config = AgentConfig(
        agent_id="dev_py_reviewer",
        agent_name="Python Code Reviewer",
        version="1.0.0",
        hub_url=os.getenv("HUB_URL", "localhost:50051"),
        capabilities={"code_review": "true", "python": "true"},
        metadata={"secret": os.getenv("AGENT_SECRET", "")},
    )
    agent = CodeReviewerAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

Example: Custom Data Processor

python
import asyncio
import json
import os
from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class DataProcessorAgent(AetherAgent):
    """Processes and classifies structured data."""

    async def handle_task(self, task: TaskAssignment) -> TaskResult:
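        try:
            data = json.loads(task.payload.decode())
        except (AttributeError, UnicodeDecodeError, json.JSONDecodeError):
            # No payload, non-UTF-8 bytes, or malformed JSON: fall back to the description
            data = {"raw": task.description}

        response = await self.hub.llm.chat(
            messages=[
                {"role": "system", "content": "Classify and summarize this data as JSON."},
                {"role": "user", "content": json.dumps(data)},
            ],
            temperature=0.0,
        )

        if not response.success:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=f"LLM call failed: {response.error_message}",
            )
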
        # Persist the analysis
        await self.hub.memory.store(
            key=f"analysis-{task.task_id}",
            value={"result": response.content},
            scope="task",
            ttl=3600,
        )

        return TaskResult(
            task_id=task.task_id,
            success=True,
            result=response.content.encode(),
        )


async def main():
    config = AgentConfig(
        agent_id="data_processor",
        agent_name="Data Processor",
        hub_url=os.getenv("HUB_URL", "localhost:50051"),
    )
    agent = DataProcessorAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
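
If several agents share the same payload-fallback logic, it can live in one helper. The sketch below uses `decode_payload`, a hypothetical name. It returns the parsed JSON, or {"raw": fallback} when the payload is missing, empty, not valid UTF-8, or not valid JSON.

```python
import json
from typing import Any, Optional


def decode_payload(payload: Optional[bytes], fallback: str) -> Any:
    """Parse a task payload as JSON, falling back to {"raw": fallback}."""
    if not payload:  # None or b""
        return {"raw": fallback}
    try:
        return json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return {"raw": fallback}
```

Inside handle_task: `data = decode_payload(task.payload, task.description)`.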

Error Handling

handle_task should never raise — catch exceptions and return a failure TaskResult:

python
async def handle_task(self, task: TaskAssignment) -> TaskResult:
    try:
        # ... your logic
        return TaskResult(task_id=task.task_id, success=True, result=b"done")
    except Exception as e:
        return TaskResult(
            task_id=task.task_id,
            success=False,
            error_message=str(e),
        )
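
Repeating the same try/except in every agent gets tedious. One option is a decorator that applies the pattern above to any handle_task. This is a sketch: `never_raise` is a hypothetical name. The `TaskResult` dataclass is a stand-in with only the fields used on this page, so the block runs on its own. In an agent, import TaskResult from aether_sdk and drop the stand-in.

```python
import functools
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class TaskResult:
    """Stand-in so this sketch runs on its own; import TaskResult from aether_sdk instead."""
    task_id: str
    success: bool
    result: bytes = b""
    error_message: str = ""


Handler = Callable[[Any, Any], Awaitable[TaskResult]]


def never_raise(handler: Handler) -> Handler:
    """Hypothetical decorator: turn any exception from handle_task into a failure TaskResult."""

    @functools.wraps(handler)
    async def wrapper(self: Any, task: Any) -> TaskResult:
        try:
            return await handler(self, task)
        except Exception as exc:
            # On Python 3.8+ asyncio.CancelledError derives from BaseException,
            # so task cancellation during shutdown is not swallowed here.
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=f"{type(exc).__name__}: {exc}",
            )

    return wrapper
```

To use it, put `@never_raise` on the line above `async def handle_task(self, task)`.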

The SDK reconnects automatically when the connection to the Hub is lost. Tune this with reconnect_delay (seconds to wait before each attempt, default 5) and max_reconnect_attempts (default -1, meaning retry indefinitely).
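
This page doesn't document the SDK's retry algorithm. The standalone loop below illustrates how reconnect_delay and max_reconnect_attempts plausibly interact. It is not the SDK's code, and `reconnect` is a hypothetical name. Treating ConnectionError as the retryable failure is an assumption. A real gRPC client raises its own error types, which you would pass via retry_on.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type


async def reconnect(
    connect: Callable[[], Awaitable[None]],
    reconnect_delay: float = 5,
    max_reconnect_attempts: int = -1,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> int:
    """Illustrative retry loop; returns the number of the attempt that succeeded.

    Waits reconnect_delay seconds before each attempt. A negative
    max_reconnect_attempts retries forever; otherwise the loop gives up
    after that many failed attempts (0 means no attempts at all).
    """
    attempt = 0
    while max_reconnect_attempts < 0 or attempt < max_reconnect_attempts:
        attempt += 1
        await asyncio.sleep(reconnect_delay)
        try:
            await connect()
            return attempt
        except retry_on:
            continue  # failed; loop back, wait, and try again
    raise ConnectionError(f"gave up after {attempt} reconnect attempts")
```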

Deployment

Docker

dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY agent.py .
CMD ["python", "agent.py"]

bash
docker build -t my-aether-agent .
docker run -e HUB_URL=hub:50051 -e AGENT_SECRET=sk-xxx my-aether-agent

Docker Compose (with Aether)

yaml
services:
  my-agent:
    build: ./my-agent
    environment:
      HUB_URL: goway:9090
      AGENT_SECRET: ${AGENT_SECRET}
    depends_on:
      - goway
    restart: unless-stopped

Kubernetes

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-aether-agent
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-aether-agent
  template:
    metadata:
      labels:
        app: my-aether-agent
    spec:
      containers:
        - name: agent
          image: my-aether-agent:latest
          env:
            - name: HUB_URL
              value: "aether-hub:9090"
            - name: AGENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: aether-secrets
                  key: agent-secret

API Reference

AetherAgent

Base class for all agents. Subclass and override handle_task.

| Method                  | Description                           |
|-------------------------|---------------------------------------|
| async start()           | Connect and register with the Hub     |
| async stop()            | Unregister and disconnect             |
| async run()             | Start and block until interrupted     |
| async handle_task(task) | Override with your task logic         |
| shutdown()              | Synchronous graceful-shutdown wrapper |

Properties:

  • self.hub — access to Hub services (available after start())
  • self.config — the AgentConfig instance

AgentClient

Low-level gRPC client (used internally, available for advanced use).

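| Method                 | Description                                       |
|------------------------|---------------------------------------------------|
| async connect()        | Establish the gRPC connection                     |
| async disconnect()     | Close the connection                              |
| async register()       | Register with the Hub; returns AgentRegistration |
| async unregister()     | Unregister from the Hub                           |
| async send_heartbeat() | Send a heartbeat ping                             |
| async stream_tasks()   | Stream tasks from the Hub                         |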
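
AetherAgent presumably sends heartbeats for you using heartbeat_interval, though this page doesn't say how. If you drive AgentClient directly, one way to keep heartbeats flowing is a background loop like the sketch below. It takes the coroutine function to call (for example client.send_heartbeat), so it can run here against a stub. `heartbeat_loop` is a hypothetical name. The sketch assumes send_heartbeat() takes no arguments, as the table lists it.

```python
import asyncio
from typing import Awaitable, Callable


async def heartbeat_loop(
    send_heartbeat: Callable[[], Awaitable[object]],
    interval: float,
    stop: asyncio.Event,
) -> int:
    """Call send_heartbeat() every `interval` seconds until `stop` is set.

    Returns the number of heartbeats sent.
    """
    sent = 0
    while not stop.is_set():
        await send_heartbeat()
        sent += 1
        try:
            # Sleep for `interval`, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return sent
```

With a connected client, you might start it with `asyncio.create_task(heartbeat_loop(client.send_heartbeat, config.heartbeat_interval, stop))`. Set `stop` before calling disconnect().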

Models

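| Model             | Description                             |
|-------------------|-----------------------------------------|
| AgentConfig       | Configuration for connecting to the Hub |
| TaskAssignment    | Task received from the Hub              |
| TaskResult        | Result sent back to the Hub             |
| TaskStatus        | Intermediate status update              |
| AgentRegistration | Registration response                   |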

Development

bash
cd sdks/python

make dev-install   # Install with dev dependencies
make proto-gen     # Generate gRPC stubs
make test          # Run tests
make test-cov      # Tests with coverage report
make lint          # Run linting (ruff)
make fmt           # Format code (black)
make build         # Build distributable package

Troubleshooting

grpc._channel._InactiveRpcError: Connection refused

The Hub gRPC port is not reachable. Check:

  • Hub is running (docker-compose ps goway)
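  • HUB_URL points to the correct host and port. The AgentConfig default is localhost:50051, while the Docker Compose and Kubernetes examples on this page use port 9090; use the port your Hub actually exposes for gRPC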
  • Firewall/network rules allow the connection
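
To separate a network problem from an SDK problem, check whether anything is listening at HUB_URL's host and port. The sketch below uses only the standard library. A successful TCP connection shows the port is reachable; it doesn't prove the port is the Hub's gRPC endpoint. `parse_hub_url` and `hub_port_open` are hypothetical helper names.

```python
import socket
from typing import Tuple


def parse_hub_url(hub_url: str) -> Tuple[str, int]:
    """Split "host:port" (or "[ipv6]:port") into a host and an integer port."""
    host, _, port = hub_url.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {hub_url!r}")
    return host.strip("[]"), int(port)


def hub_port_open(hub_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to HUB_URL's host and port succeeds."""
    host, port = parse_hub_url(hub_url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, timed out, or DNS failure
        return False
```

For example, `hub_port_open(os.getenv("HUB_URL", "localhost:50051"))` run from inside the agent's container checks the same address the agent will use.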

Registration failed: UNAUTHENTICATED

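The secret the agent presents does not match the one the Hub has stored for that agent. Check the agent's record in the Hub database. Also check the value your AgentConfig passes as metadata["secret"], which the examples above read from AGENT_SECRET.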
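
The examples read the secret with os.getenv("AGENT_SECRET", ""), so an unset variable silently becomes an empty secret. Failing fast at startup surfaces that before the agent tries to register. The sketch below does this. `require_env` is a hypothetical helper.

```python
import os
from typing import Mapping, Optional


def require_env(name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return a non-empty environment variable, or fail at startup with a clear message."""
    env = os.environ if env is None else env
    value = env.get(name, "")
    if not value:
        raise RuntimeError(
            f"{name} is not set or empty; registration would likely fail with UNAUTHENTICATED"
        )
    return value
```

In the Code Reviewer example, this would replace the lookup with `metadata={"secret": require_env("AGENT_SECRET")}`.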

Proto stubs are missing

Run make proto-gen from sdks/python/. Ensure protoc and grpcio-tools are installed.

Reconnection loop not stopping

Set max_reconnect_attempts to a positive integer to limit reconnection attempts:

python
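config = AgentConfig(
    agent_id="my-agent-001",
    agent_name="My Example Agent",
    max_reconnect_attempts=10,  # give up after 10 attempts instead of retrying forever
)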

Released under the MIT License.