Python SDK Guide
Active Development
The SDK is under active development. Pin to a specific release for production use.
The Python SDK lets you build Aether agents using async/await and Pydantic models. It communicates with the Hub over gRPC.
Installation
From PyPI
pip install aether-agent-sdk
From Source
git clone https://github.com/amansrivastava/hacky-automation.git
cd hacky-automation/sdks/python
# Install in development mode (includes dev dependencies)
make dev-install
# Generate gRPC stubs from proto files
make proto-gen
Requires: Python 3.8+
Configuration
Environment Variables
AETHER_HUB_URL=localhost:50051
AETHER_AGENT_ID=my-agent-001
AETHER_AGENT_NAME="My Agent"
AETHER_AGENT_SECRET=sk-secret
AETHER_VERSION=1.0.0
AETHER_HEARTBEAT_INTERVAL=30
AETHER_RECONNECT_DELAY=5
AETHER_MAX_RECONNECT_ATTEMPTS=-1
AgentConfig Reference
| Field | Type | Default | Description |
|---|---|---|---|
| agent_id | str | Required | Unique identifier (e.g., pm_sarah) |
| agent_name | str | Required | Human-readable name |
| version | str | "0.1.0" | Agent version |
| hub_url | str | "localhost:50051" | Hub gRPC endpoint |
| capabilities | dict | {} | Key-value capability flags |
| metadata | dict | {} | Arbitrary metadata |
| heartbeat_interval | int | 30 | Seconds between heartbeats |
| reconnect_delay | int | 5 | Seconds before a reconnect attempt |
| max_reconnect_attempts | int | -1 | Maximum reconnect attempts (-1 = unlimited) |
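This page does not say whether AgentConfig picks up the AETHER_* variables on its own. If you would rather wire them up explicitly, the sketch below maps them onto AgentConfig keyword arguments. It is an illustration under assumptions: config_kwargs_from_env is a hypothetical helper, not part of aether_sdk, and placing AETHER_AGENT_SECRET under metadata["secret"] mirrors the examples on this page rather than a documented field.

```python
from typing import Any, Dict, Mapping

# Hypothetical helper: translate AETHER_* variables into AgentConfig kwargs.
# Variable and field names come from the tables above; the mapping is an assumption.
_STR_FIELDS = {
    "AETHER_HUB_URL": "hub_url",
    "AETHER_AGENT_ID": "agent_id",
    "AETHER_AGENT_NAME": "agent_name",
    "AETHER_VERSION": "version",
}
_INT_FIELDS = {
    "AETHER_HEARTBEAT_INTERVAL": "heartbeat_interval",
    "AETHER_RECONNECT_DELAY": "reconnect_delay",
    "AETHER_MAX_RECONNECT_ATTEMPTS": "max_reconnect_attempts",
}


def config_kwargs_from_env(env: Mapping[str, str]) -> Dict[str, Any]:
    """Return AgentConfig keyword arguments for every AETHER_* variable that is set."""
    kwargs: Dict[str, Any] = {}
    for var, field in _STR_FIELDS.items():
        if var in env:
            kwargs[field] = env[var]
    for var, field in _INT_FIELDS.items():
        if var in env:
            kwargs[field] = int(env[var])  # raises ValueError on non-numeric input
    if env.get("AETHER_AGENT_SECRET"):
        # Assumption: mirrors metadata={"secret": ...} used in the examples below.
        kwargs["metadata"] = {"secret": env["AETHER_AGENT_SECRET"]}
    return kwargs
```

You would then call AgentConfig(**config_kwargs_from_env(os.environ)). Unset variables are simply omitted, so AgentConfig's own defaults still apply, and a non-numeric interval or attempt count fails fast with ValueError.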
Quickstart
Create Your Agent
Subclass AetherAgent and implement handle_task:
import asyncio

from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class MyAgent(AetherAgent):
    async def handle_task(self, task: TaskAssignment) -> TaskResult:
        print(f"Processing task {task.task_id}: {task.description}")
        try:
            result_data = f"Processed: {task.description}".encode()
            return TaskResult(
                task_id=task.task_id,
                success=True,
                result=result_data,
                metrics={"processing_time": "0.5s"},
            )
        except Exception as e:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=str(e),
            )


async def main():
    config = AgentConfig(
        agent_id="my-agent-001",
        agent_name="My Example Agent",
        version="1.0.0",
        hub_url="localhost:50051",
        capabilities={"task_types": "analysis,processing"},
    )
    agent = MyAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
Register, Start, Stop
# Start: connect + register (non-blocking)
await agent.start()
# Stop: unregister + disconnect
await agent.stop()
# Run: start + block until interrupted (handles SIGINT/SIGTERM)
await agent.run()
Hub Services
Access Hub-proxied services through self.hub inside handle_task.
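Each Hub call below is an ordinary awaitable, so standard asyncio tools apply to it. For example, if you want a hard upper bound on how long handle_task waits for one service, asyncio.wait_for can wrap the call. This is a sketch, not SDK behavior: with_deadline is a hypothetical helper, and whether the Hub or SDK already enforces its own deadlines is not covered on this page.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")


async def with_deadline(call: Awaitable[T], seconds: float) -> Optional[T]:
    """Await a Hub call, returning None instead of raising if it runs too long.

    asyncio.wait_for cancels the wrapped call once the deadline passes.
    """
    try:
        return await asyncio.wait_for(call, timeout=seconds)
    except asyncio.TimeoutError:
        return None
```

Inside handle_task you might write response = await with_deadline(self.hub.llm.chat(messages=...), 30.0) and return a failure TaskResult when the result is None. Because None doubles as the timeout signal, this shape only suits calls that never legitimately return None.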
LLM
response = await self.hub.llm.chat(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": task.description},
    ],
    model="gpt-4o-mini",  # Hub may override based on trust level
    temperature=0.3,
    max_tokens=1000,
)
print(response.content)
print(f"Model: {response.model}, Tokens: {response.usage.total_tokens}")
Tools
# List available tools
tools = await self.hub.tools.list()
# Execute a tool
result = await self.hub.tools.execute(
    tool_id="add_comment",
    input={"issue_id": "123", "comment": "Review complete"},
)
output = result.result_json  # JSON string
Memory
# Store a value
await self.hub.memory.store(
    key="my-config",
    value={"theme": "dark"},
    scope="agent",
    ttl=3600,  # seconds; 0 = no expiry
)
# Retrieve a value
response = await self.hub.memory.retrieve(key="my-config", scope="agent")
if response.success:
    config = response.value_json  # JSON string
Messaging
# Publish to a NATS subject
await self.hub.messaging.publish(
    subject="agent.status",
    payload={"agent_id": self.config.agent_id, "status": "processing"},
)
# Request-reply
response = await self.hub.messaging.request(
    subject="workflow.status",
    payload={"workflow_id": "123"},
    timeout=10,
)
Example: Code Reviewer Agent
import asyncio
import os

from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class CodeReviewerAgent(AetherAgent):
    """Reviews code diffs using an LLM."""

    async def handle_task(self, task: TaskAssignment) -> TaskResult:
        diff = task.payload.decode() if task.payload else task.description
        response = await self.hub.llm.chat(
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an expert code reviewer. "
                        "Provide concise, constructive feedback."
                    ),
                },
                {"role": "user", "content": f"Review this diff:\n\n{diff}"},
            ],
            temperature=0.2,
        )
        if not response.success:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=f"LLM call failed: {response.error_message}",
            )
        # Notify other agents
        await self.hub.messaging.publish(
            subject="review.completed",
            payload={"task_id": task.task_id, "agent": self.config.agent_id},
        )
        return TaskResult(
            task_id=task.task_id,
            success=True,
            result=response.content.encode(),
            metrics={"model": response.model, "tokens": str(response.usage.total_tokens)},
        )


async def main():
    config = AgentConfig(
        agent_id="dev_py_reviewer",
        agent_name="Python Code Reviewer",
        version="1.0.0",
        hub_url=os.getenv("HUB_URL", "localhost:50051"),
        capabilities={"code_review": "true", "python": "true"},
        metadata={"secret": os.getenv("AGENT_SECRET", "")},
    )
    agent = CodeReviewerAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
Example: Custom Data Processor
import asyncio
import json
import os

from aether_sdk import AetherAgent, AgentConfig, TaskAssignment, TaskResult


class DataProcessorAgent(AetherAgent):
    """Processes and classifies structured data."""

    async def handle_task(self, task: TaskAssignment) -> TaskResult:
        try:
            data = json.loads(task.payload.decode())
        except (ValueError, AttributeError):
            # No payload (AttributeError), or a payload that is not UTF-8 or not
            # valid JSON (both ValueError subclasses): fall back to the description.
            data = {"raw": task.description}
        response = await self.hub.llm.chat(
            messages=[
                {"role": "system", "content": "Classify and summarize this data as JSON."},
                {"role": "user", "content": json.dumps(data)},
            ],
            temperature=0.0,
        )
        if not response.success:
            return TaskResult(
                task_id=task.task_id,
                success=False,
                error_message=f"LLM call failed: {response.error_message}",
            )
        # Persist the analysis
        await self.hub.memory.store(
            key=f"analysis-{task.task_id}",
            value={"result": response.content},
            scope="task",
            ttl=3600,
        )
        return TaskResult(
            task_id=task.task_id,
            success=True,
            result=response.content.encode(),
        )


async def main():
    config = AgentConfig(
        agent_id="data_processor",
        agent_name="Data Processor",
        hub_url=os.getenv("HUB_URL", "localhost:50051"),
    )
    agent = DataProcessorAgent(config)
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
Error Handling
handle_task should never raise — catch exceptions and return a failure TaskResult:
async def handle_task(self, task: TaskAssignment) -> TaskResult:
    try:
        # ... your logic
        return TaskResult(task_id=task.task_id, success=True, result=b"done")
    except Exception as e:
        return TaskResult(
            task_id=task.task_id,
            success=False,
            error_message=str(e),
        )
The SDK handles reconnection automatically on connection loss. Configure limits with max_reconnect_attempts (default -1 = infinite).
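To make reconnect_delay and max_reconnect_attempts concrete, here is a minimal retry loop with the same two knobs: a fixed pause between attempts, and -1 meaning retry forever. It illustrates those settings only; it is not the SDK's internal reconnection code, which may differ (for instance by adding backoff, or by counting only retries toward the cap). The connect and sleep parameters are hypothetical stand-ins so the loop can run without a Hub, and ConnectionError stands in for whatever error a real gRPC client raises.

```python
import asyncio
from typing import Awaitable, Callable


async def reconnect_loop(
    connect: Callable[[], Awaitable[None]],
    reconnect_delay: float = 5,
    max_reconnect_attempts: int = -1,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Retry connect() until it succeeds; return the number of attempts used.

    max_reconnect_attempts=-1 retries forever. A positive value caps the total
    attempts (this sketch counts the first one) and re-raises the last error.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            await connect()
            return attempt
        except ConnectionError:  # stand-in for the client's real connection error
            if max_reconnect_attempts != -1 and attempt >= max_reconnect_attempts:
                raise
            await sleep(reconnect_delay)  # fixed delay, no backoff
```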
Deployment
Docker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY agent.py .
CMD ["python", "agent.py"]
docker build -t my-aether-agent .
docker run -e HUB_URL=hub:50051 -e AGENT_SECRET=sk-xxx my-aether-agent
Docker Compose (with Aether)
services:
  my-agent:
    build: ./my-agent
    environment:
      HUB_URL: goway:9090
      AGENT_SECRET: ${AGENT_SECRET}
    depends_on:
      - goway
    restart: unless-stopped
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-aether-agent
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-aether-agent
  template:
    metadata:
      labels:
        app: my-aether-agent
    spec:
      containers:
        - name: agent
          image: my-aether-agent:latest
          env:
            - name: HUB_URL
              value: "aether-hub:9090"
            - name: AGENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: aether-secrets
                  key: agent-secret
API Reference
AetherAgent
Base class for all agents. Subclass and override handle_task.
| Method | Description |
|---|---|
| async start() | Connect and register with the Hub |
| async stop() | Unregister and disconnect |
| async run() | Start and block until interrupted |
| async handle_task(task) | Override with your task logic |
| shutdown() | Synchronous graceful shutdown wrapper |
Properties:
- self.hub: access to Hub services (available after start())
- self.config: the AgentConfig instance
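start() connects and registers without blocking, and stop() should run even if the code in between fails. When you only need an agent alive for a bounded block, for example in a test or a one-off script, you can pair the two in an async context manager. A minimal sketch using contextlib.asynccontextmanager; running is a hypothetical helper that relies only on the async start() and stop() methods listed above.

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator, Protocol, TypeVar


class StartStop(Protocol):
    """Anything with async start()/stop(), such as an AetherAgent subclass."""

    async def start(self) -> None: ...

    async def stop(self) -> None: ...


A = TypeVar("A", bound=StartStop)


@asynccontextmanager
async def running(agent: A) -> AsyncIterator[A]:
    """Start the agent on entry and always stop it on exit, even after an error.

    If start() itself raises, stop() is not attempted.
    """
    await agent.start()
    try:
        yield agent
    finally:
        await agent.stop()
```

Usage would look like async with running(MyAgent(config)) as agent: followed by the work that needs the agent registered. Unlike run(), this does not install signal handling, so it fits code that manages its own lifetime.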
AgentClient
Low-level gRPC client (used internally, available for advanced use).
| Method | Description |
|---|---|
| async connect() | Establish gRPC connection |
| async disconnect() | Close connection |
| async register() | Register with Hub → returns AgentRegistration |
| async unregister() | Unregister from Hub |
| async send_heartbeat() | Send heartbeat ping |
| async stream_tasks() | Stream tasks from Hub |
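If you work with AgentClient directly rather than through AetherAgent, you may need to schedule heartbeats yourself; this page does not say which of these calls AetherAgent makes for you. The sketch below assumes only an awaitable send_heartbeat() as listed in the table and an interval such as AgentConfig.heartbeat_interval. heartbeat_forever is a hypothetical name, and the SDK's own heartbeat loop may behave differently.

```python
import asyncio
from typing import Awaitable, Callable


async def heartbeat_forever(
    send_heartbeat: Callable[[], Awaitable[object]],
    interval: float,
    stop: asyncio.Event,
) -> int:
    """Call send_heartbeat() every `interval` seconds until `stop` is set.

    Returns how many heartbeats were sent.
    """
    sent = 0
    while not stop.is_set():
        await send_heartbeat()
        sent += 1
        try:
            # Sleep for one interval, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return sent
```

You could run it alongside the task stream with asyncio.create_task(heartbeat_forever(client.send_heartbeat, config.heartbeat_interval, stop_event)) and set stop_event before calling unregister(). Waiting on the event with a timeout, rather than a bare asyncio.sleep, lets shutdown take effect without waiting out a full interval.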
Models
| Model | Description |
|---|---|
| AgentConfig | Configuration for connecting to the Hub |
| TaskAssignment | Task received from Hub |
| TaskResult | Result sent back to Hub |
| TaskStatus | Intermediate status update |
| AgentRegistration | Registration response |
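Every handle_task has to hand back a TaskResult, including on failure, so the try/except pattern under Error Handling tends to repeat across agents. One way to centralize it is a decorator. This is a sketch: never_raise is a hypothetical helper, and it takes an on_error factory so the block stays independent of aether_sdk.

```python
import functools
from typing import Any, Awaitable, Callable


def never_raise(on_error: Callable[[Any, Exception], Any]):
    """Decorate an async handle_task so exceptions become on_error(task, exc)."""

    def decorator(handler: Callable[[Any, Any], Awaitable[Any]]):
        @functools.wraps(handler)
        async def wrapper(self: Any, task: Any) -> Any:
            try:
                return await handler(self, task)
            except Exception as exc:  # CancelledError is a BaseException on 3.8+
                return on_error(task, exc)

        return wrapper

    return decorator
```

In an agent module you would pass a factory such as lambda task, exc: TaskResult(task_id=task.task_id, success=False, error_message=str(exc)) and decorate handle_task with @never_raise(...). Cancellation still propagates, because asyncio.CancelledError derives from BaseException rather than Exception on Python 3.8 and later.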
Development
cd sdks/python
make dev-install # Install with dev dependencies
make proto-gen # Generate gRPC stubs
make test # Run tests
make test-cov # Tests with coverage report
make lint # Run linting (ruff)
make fmt # Format code (black)
make build # Build distributable package
Troubleshooting
grpc._channel._InactiveRpcError: Connection refused
The Hub gRPC port is not reachable. Check:
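- The Hub is running (docker-compose ps goway)
- HUB_URL points to the correct host and gRPC port (the Docker Compose and Kubernetes examples above use 9090; AgentConfig's default hub_url is localhost:50051)
- Firewall/network rules allow the connection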
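To separate a network problem from an SDK problem, you can check plain TCP reachability of the HUB_URL value before starting the agent. A minimal sketch with asyncio.open_connection: split_hub_url and can_reach are hypothetical helpers, they assume the plain host:port form used on this page, and a True result only means the port accepts connections, not that the Hub's gRPC service is healthy.

```python
import asyncio
from typing import Tuple


def split_hub_url(hub_url: str) -> Tuple[str, int]:
    """Split "host:port" (the HUB_URL form used on this page) into its parts."""
    host, _, port = hub_url.rpartition(":")
    host = host.strip("[]")  # tolerate bracketed IPv6 such as [::1]:50051
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {hub_url!r}")
    return host, int(port)


async def can_reach(hub_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to hub_url opens within `timeout` seconds."""
    host, port = split_hub_url(hub_url)
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
    except (OSError, asyncio.TimeoutError):  # refused, unresolvable, or too slow
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the port was reachable; errors while closing don't change that
    return True
```

For example, a False from asyncio.run(can_reach(os.getenv("HUB_URL", "localhost:50051"))) points at the checks above rather than at agent code.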
Registration failed: UNAUTHENTICATED
The AGENT_SECRET does not match the secret stored in the Hub. Verify the agent record in the database and the metadata.secret in your AgentConfig.
Proto stubs are missing
Run make proto-gen from sdks/python/. Ensure protoc and grpcio-tools are installed.
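If stub generation still fails, a quick toolchain check can narrow it down. The sketch below reports whether the grpc_tools package (installed by grpcio-tools) is importable and whether a protoc binary is on PATH. proto_toolchain_status is a hypothetical helper, and whether the Makefile invokes protoc directly or through grpcio-tools is an assumption this page does not settle.

```python
import importlib.util
import shutil
from typing import Dict


def proto_toolchain_status() -> Dict[str, bool]:
    """Report whether the tools `make proto-gen` is said to need can be found."""
    return {
        # grpcio-tools provides the importable package grpc_tools
        "grpcio-tools": importlib.util.find_spec("grpc_tools") is not None,
        # a standalone protoc on PATH (only needed if the Makefile calls it directly)
        "protoc": shutil.which("protoc") is not None,
    }
```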
Reconnection loop not stopping
Set max_reconnect_attempts to a positive integer to limit reconnection attempts:
config = AgentConfig(
    ...
    max_reconnect_attempts=10,
)