Traylinx Subscription Service
Status: Production Ready (Sprint 2 Complete)
Version: 1.0.0
Port: 8001
Overview
The Traylinx Subscription Service manages event subscriptions for the agent ecosystem. It enables agents to subscribe to specific event types and allows the Router to query which agents should receive events when they're published.
Think of it as: YouTube's subscription system, but for agents. Agents "subscribe" to events they care about, and when those events occur, they get notified.
Key Features
Core Functionality
- Event Subscriptions - Agents subscribe to event types
- Scoped Subscriptions - Subscribe to specific job IDs
- Advanced JSONB Filters - Filter events by payload content
- Ownership Management - Agents manage their own subscriptions
- Subscriber Discovery - Router queries for event subscribers
Technical Features
- PostgreSQL Database with full-text search
- JSONB Filter Matching for complex event filtering
- A2A Authentication on all endpoints
- Health & Readiness Probes for orchestration
- Structured Logging with structlog
- Alembic Migrations for schema management
- Docker Compose for local development
Architecture
Subscription Types
- Wildcard Subscription: All events of a type
- Scoped Subscription: Specific job only
- Filtered Subscription: Advanced JSONB matching
Database Schema
CREATE TABLE subscriptions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    subscriber_agent_key TEXT NOT NULL,
    event_type TEXT NOT NULL,
    job_id TEXT NULL,
    filters JSONB NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (subscriber_agent_key, event_type, job_id)
);
-- Indexes for performance
CREATE INDEX idx_subscriptions_agent ON subscriptions (subscriber_agent_key);
CREATE INDEX idx_subscriptions_event_type ON subscriptions (event_type);
CREATE INDEX idx_subscriptions_event_job ON subscriptions (event_type, job_id);
CREATE INDEX idx_subscriptions_filters ON subscriptions USING GIN (filters);
Quick Start
Prerequisites
- Docker & Docker Compose
- Python 3.9+
- Poetry
1. Start PostgreSQL
PostgreSQL runs on port 5433 (to avoid conflicts with default port 5432).
2. Install Dependencies
poetry install
3. Run Migrations
poetry run alembic upgrade head
4. Start Service
poetry run uvicorn app.main:app --port 8001
5. Verify Health
6. View API Docs
Open http://localhost:8001/docs
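API Reference
Base URL
http://localhost:8001 (local development)
Content Type
application/json
Authentication Headers
Every endpoint requires A2A authentication: a valid access_token, your agent_secret_token, and a matching X-Agent-Key header.
1. Create Subscription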
Subscribe to events.
Endpoint: POST /a2a/subscriptions/create
Request:
{
  "envelope": {
    "message_id": "msg-001",
    "sender_agent_key": "my-agent",
    "timestamp": "2025-11-05T10:00:00Z"
  },
  "action": "subscriptions.create",
  "payload": {
    "event_type": "job.completed",
    "job_id": "job-123",  // Optional
    "filters": {          // Optional
      "status": "success"
    }
  }
}
Response (201 Created):
{
  "envelope": {
    "message_id": "response-001",
    "correlation_id": "msg-001",
    "in_reply_to": "msg-001",
    "sender_agent_key": "subscription-service",
    "timestamp": "2025-11-05T10:00:01Z"
  },
  "action": "subscriptions.create",
  "payload": {
    "subscription": {
      "subscription_id": "uuid-123",
      "subscriber_agent_key": "my-agent",
      "event_type": "job.completed",
      "job_id": "job-123",
      "filters": {"status": "success"},
      "created_at": "2025-11-05T10:00:01Z",
      "updated_at": "2025-11-05T10:00:01Z"
    }
  }
}
Behavior:
- Uses upsert logic: updates the existing subscription if a duplicate exists
- Validates ownership: the sender must match the authenticated agent
- Validates JSONB filters on creation
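The behavior above can be pictured with a small in-memory sketch. This is not the service's repository code: `InMemorySubscriptions` and its method names are hypothetical, the table is replaced by a dict keyed on `(subscriber_agent_key, event_type, job_id)` (the columns of the `UNIQUE` constraint), and filter validation is assumed to mean "must be a JSON object".

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

# Hypothetical key mirroring UNIQUE (subscriber_agent_key, event_type, job_id).
Key = Tuple[str, str, Optional[str]]


@dataclass
class Subscription:
    subscription_id: str
    subscriber_agent_key: str
    event_type: str
    job_id: Optional[str]
    filters: Optional[Dict[str, Any]]
    created_at: datetime
    updated_at: datetime


class InMemorySubscriptions:
    """Illustrative stand-in for the subscriptions table (not the real repository)."""

    def __init__(self) -> None:
        self._rows: Dict[Key, Subscription] = {}

    def __len__(self) -> int:
        return len(self._rows)

    def create(
        self,
        authenticated_agent: str,
        sender_agent_key: str,
        event_type: str,
        job_id: Optional[str] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> Subscription:
        # Ownership: the envelope sender must be the authenticated agent.
        if sender_agent_key != authenticated_agent:
            raise PermissionError("sender_agent_key does not match authenticated agent")
        # Assumed validation rule: filters, when given, must be a JSON object.
        if filters is not None and not isinstance(filters, dict):
            raise ValueError("filters must be a JSON object")

        now = datetime.now(timezone.utc)
        key = (sender_agent_key, event_type, job_id)
        existing = self._rows.get(key)
        if existing is not None:
            # Upsert: keep the id and created_at, refresh filters and updated_at.
            existing.filters = filters
            existing.updated_at = now
            return existing

        row = Subscription(
            str(uuid.uuid4()), sender_agent_key, event_type, job_id, filters, now, now
        )
        self._rows[key] = row
        return row
```

One difference worth keeping in mind: the dict treats `job_id=None` as an ordinary key, whereas PostgreSQL by default treats NULLs as distinct inside a `UNIQUE` constraint, so wildcard subscriptions may need extra handling in a real upsert.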
2. List Subscriptions
List your subscriptions with optional filters.
Endpoint: POST /a2a/subscriptions/list
Request:
{
  "envelope": {...},
  "action": "subscriptions.list",
  "payload": {
    "event_type": "job.completed",  // Optional filter
    "job_id": "job-123",            // Optional filter
    "limit": 100,
    "offset": 0
  }
}
Response (200 OK):
{
  "envelope": {...},
  "action": "subscriptions.list",
  "payload": {
    "subscriptions": [
      {
        "subscription_id": "uuid-123",
        "subscriber_agent_key": "my-agent",
        "event_type": "job.completed",
        "job_id": "job-123",
        "filters": {"status": "success"},
        "created_at": "2025-11-05T10:00:01Z",
        "updated_at": "2025-11-05T10:00:01Z"
      }
    ],
    "total": 1,
    "limit": 100,
    "offset": 0
  }
}
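A client that needs every subscription can walk the `limit`/`offset` window. The sketch below is one way to do that, under two assumptions: `total` is the count across all pages rather than the size of the current page, and `send` is a hypothetical callable that POSTs the body to `/a2a/subscriptions/list` with the required A2A headers and returns the parsed JSON response. `build_list_request` and `iter_subscriptions` are illustrative names, not part of the service.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Iterator, Optional

Json = Dict[str, Any]


def build_list_request(
    agent_key: str, limit: int, offset: int, event_type: Optional[str] = None
) -> Json:
    """Build a subscriptions.list body using the envelope fields shown above."""
    payload: Json = {"limit": limit, "offset": offset}
    if event_type is not None:
        payload["event_type"] = event_type
    return {
        "envelope": {
            "message_id": f"msg-{uuid.uuid4()}",
            "sender_agent_key": agent_key,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
        "action": "subscriptions.list",
        "payload": payload,
    }


def iter_subscriptions(
    send: Callable[[Json], Json],
    agent_key: str,
    page_size: int = 100,
    event_type: Optional[str] = None,
) -> Iterator[Json]:
    """Yield subscriptions page by page until `total` has been reached."""
    offset = 0
    while True:
        response = send(build_list_request(agent_key, page_size, offset, event_type))
        page = response["payload"]["subscriptions"]
        yield from page
        offset += len(page)
        # Stop on an empty page as well, in case `total` is unreliable.
        if not page or offset >= response["payload"]["total"]:
            break
```

Passing `send` in as a parameter keeps the paging logic independent of the HTTP client and makes it easy to exercise with a fake transport.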
3. Delete Subscription
Delete a subscription by ID.
Endpoint: DELETE /a2a/subscriptions/delete
Request:
{
  "envelope": {...},
  "action": "subscriptions.delete",
  "payload": {
    "subscription_id": "uuid-123"
  }
}
Response (200 OK):
{
  "envelope": {...},
  "action": "subscriptions.delete",
  "payload": {
    "success": true,
    "subscription_id": "uuid-123"
  }
}
Ownership Validation: You can only delete your own subscriptions.
4. Query Subscribers (Router Use)
Find subscribers for an event (used by the Router during event publishing).
Endpoint: POST /a2a/subscriptions/query
Request:
{
  "envelope": {...},
  "action": "subscriptions.query",
  "payload": {
    "event_type": "job.completed",
    "job_id": "job-123",
    "event_data": {
      "status": "success",
      "priority": "high"
    }
  }
}
Response (200 OK):
{
  "envelope": {...},
  "action": "subscriptions.query",
  "payload": {
    "subscribers": [
      {
        "agent_key": "agent-a",
        "subscription_id": "uuid-123"
      },
      {
        "agent_key": "agent-b",
        "subscription_id": "uuid-456"
      }
    ],
    "event_type": "job.completed",
    "job_id": "job-123",
    "total": 2
  }
}
Matching Logic:
- Event Type: Exact match required
- Job ID: Matches specific job OR wildcard (NULL)
- Filters: JSONB containment (filters <@ event_data)
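The three rules above can be sketched in plain Python. This only approximates what the database does: `contained_in` mimics PostgreSQL's `<@` for objects, arrays and scalars (it ignores edge cases such as the top-level array/primitive exception and JSON `true` versus `1`), and `find_subscribers` is a hypothetical name operating on dicts shaped like the subscription records shown earlier. A subscription with no filters is treated as matching any event data.

```python
from typing import Any, Dict, List, Optional


def contained_in(filters: Any, event_data: Any) -> bool:
    """Approximate `filters <@ event_data` for JSON-like Python values."""
    if isinstance(filters, dict):
        # Every filter key must exist in the event, with a contained value.
        return isinstance(event_data, dict) and all(
            key in event_data and contained_in(value, event_data[key])
            for key, value in filters.items()
        )
    if isinstance(filters, list):
        # Every filter element must be contained in some event element.
        return isinstance(event_data, list) and all(
            any(contained_in(item, candidate) for candidate in event_data)
            for item in filters
        )
    return filters == event_data


def find_subscribers(
    subscriptions: List[Dict[str, Any]],
    event_type: str,
    job_id: Optional[str],
    event_data: Dict[str, Any],
) -> List[Dict[str, str]]:
    matches = []
    for sub in subscriptions:
        # Event type: exact match required.
        if sub["event_type"] != event_type:
            continue
        # Job ID: a NULL job_id is a wildcard, otherwise it must match.
        if sub.get("job_id") is not None and sub["job_id"] != job_id:
            continue
        # Filters: NULL matches everything, otherwise containment is required.
        filters = sub.get("filters")
        if filters is not None and not contained_in(filters, event_data):
            continue
        matches.append(
            {
                "agent_key": sub["subscriber_agent_key"],
                "subscription_id": sub["subscription_id"],
            }
        )
    return matches
```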
Filter Matching Examples
Example 1: Simple Filter
Subscription:
Event Data:
Result: Matches (event contains all filter keys)
Example 2: Multiple Filters
Subscription:
Event Data:
Result: Matches (all filter keys present with matching values)
Example 3: No Match
Subscription:
Event Data:
Result: No match (status value differs)
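To make the three outcomes concrete, here is a small check using flat filters. The payload values are illustrative assumptions chosen to reproduce the results described above, not the original example data, and `filter_matches` is a hypothetical helper that covers only flat (non-nested) filters.

```python
from typing import Any, Dict


def filter_matches(filters: Dict[str, Any], event_data: Dict[str, Any]) -> bool:
    """Flat-filter containment: every filter key is present with an equal value."""
    return all(
        key in event_data and event_data[key] == value
        for key, value in filters.items()
    )


# Illustrative payloads (assumptions), one per example.
simple = filter_matches(
    {"status": "success"},
    {"status": "success", "priority": "high"},
)
multiple = filter_matches(
    {"status": "success", "priority": "high"},
    {"status": "success", "priority": "high", "region": "eu"},
)
no_match = filter_matches(
    {"status": "success"},
    {"status": "failed", "priority": "high"},
)

print(simple, multiple, no_match)  # True True False
```

Extra keys in the event data never prevent a match; only a missing key or a differing value does.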
Configuration
Environment Variables
Create a .env file:
# Database
DATABASE_URL=postgresql+asyncpg://traylinx:dev_password@localhost:5433/subscriptions
# Service
SUBSCRIPTION_SERVICE_PORT=8001
SERVICE_NAME=traylinx-subscription-service
# Logging
LOG_LEVEL=INFO
# Rate Limiting (future use)
RATE_LIMIT_CREATE_PER_MINUTE=60
RATE_LIMIT_QUERY_PER_MINUTE=300
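For illustration, the variables above could be read into a typed object as follows. The project's own `config.py` uses Pydantic v2 settings; this sketch deliberately swaps in the standard library so it runs without dependencies, and the `Settings` field names and `load_settings` function are assumptions. Defaults mirror the sample `.env` values, and `DATABASE_URL` is treated as required.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    database_url: str
    port: int = 8001
    service_name: str = "traylinx-subscription-service"
    log_level: str = "INFO"
    rate_limit_create_per_minute: int = 60
    rate_limit_query_per_minute: int = 300


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read settings from an environment mapping (defaults to os.environ)."""
    try:
        database_url = env["DATABASE_URL"]
    except KeyError:
        raise RuntimeError("DATABASE_URL is required") from None
    return Settings(
        database_url=database_url,
        port=int(env.get("SUBSCRIPTION_SERVICE_PORT", "8001")),
        service_name=env.get("SERVICE_NAME", "traylinx-subscription-service"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        rate_limit_create_per_minute=int(env.get("RATE_LIMIT_CREATE_PER_MINUTE", "60")),
        rate_limit_query_per_minute=int(env.get("RATE_LIMIT_QUERY_PER_MINUTE", "300")),
    )
```

Note that this reads the process environment only; loading the `.env` file itself is left to whatever tool starts the service.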
Database Management
Running Migrations
# Create a new migration
poetry run alembic revision --autogenerate -m "description"
# Apply migrations
poetry run alembic upgrade head
# Rollback last migration
poetry run alembic downgrade -1
Direct Database Access
# Connect to PostgreSQL
docker exec -it traylinx_subscriptions_db psql -U traylinx -d subscriptions
# Useful queries
SELECT * FROM subscriptions;
SELECT * FROM subscriptions WHERE subscriber_agent_key = 'my-agent';
SELECT * FROM subscriptions WHERE event_type = 'job.completed';
SELECT * FROM subscriptions WHERE filters @> '{"status": "success"}'::jsonb;
Health & Monitoring
Health Endpoints
Monitoring
The service currently relies on structured logging with structlog. Every operation logs:
- Agent identity
- Event types
- Subscription IDs
- Success/failure status
- Performance metrics
Future: Prometheus metrics integration planned.
Testing
Manual Testing
See QUICK_START.md for complete cURL examples.
Database Testing
-- Test JSONB filter matching
INSERT INTO subscriptions (subscriber_agent_key, event_type, filters)
VALUES ('test-agent', 'test.event', '{"key": "value"}'::jsonb);
-- Test query logic (simulates Router query): a subscription matches when
-- its filters are contained in the event data (filters <@ event_data)
SELECT * FROM subscriptions
WHERE event_type = 'test.event'
  AND (job_id IS NULL OR job_id = 'test-job')
  AND (filters IS NULL OR filters <@ '{"key": "value", "extra": "ignored"}'::jsonb);
Troubleshooting
Service Won't Start
# Check PostgreSQL
docker ps | grep traylinx_subscriptions_db
docker logs traylinx_subscriptions_db
# Check migrations
poetry run alembic current
poetry run alembic upgrade head
# Run with debug logging
LOG_LEVEL=DEBUG poetry run uvicorn app.main:app --port 8001
Connection Issues
# Test PostgreSQL connection
docker exec traylinx_subscriptions_db psql -U traylinx -d subscriptions -c "SELECT 1;"
# Verify DATABASE_URL in .env
echo $DATABASE_URL
A2A Authentication Failures
Ensure you have:
1. Valid access_token from Authentication Service
2. Valid agent_secret_token for your agent
3. Matching X-Agent-Key header
Project Structure
traylinx_subscription_service/
├── app/
│   ├── __init__.py
│   ├── main.py                        # FastAPI application
│   ├── config.py                      # Settings (Pydantic v2)
│   ├── models.py                      # Legacy (kept for compatibility)
│   ├── dependencies.py                # Dependency injection
│   ├── a2a/
│   │   ├── __init__.py
│   │   └── schemas.py                 # A2A protocol models
│   ├── db/
│   │   ├── __init__.py
│   │   ├── connection.py              # AsyncPG connection
│   │   └── models.py                  # SQLAlchemy ORM
│   ├── repositories/
│   │   ├── __init__.py
│   │   └── subscription_repository.py # CRUD + JSONB queries
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── health.py                  # Health checks
│   │   └── a2a_subscriptions.py       # 4 A2A endpoints
│   └── services/
│       ├── __init__.py
│       └── subscription_service.py    # Business logic
├── migrations/
│   ├── env.py                         # Alembic config
│   ├── script.py.mako                 # Migration template
│   └── versions/
│       └── 001_initial_schema.py      # Initial schema
├── tests/                             # (To be implemented)
├── docker-compose.yml                 # PostgreSQL setup
├── Dockerfile                         # Container image
├── alembic.ini                        # Alembic configuration
├── pyproject.toml                     # Dependencies
├── poetry.lock                        # Locked dependencies
├── README.md                          # This file
└── QUICK_START.md                     # Quick start guide
Integration with Router
The Router calls the query endpoint to find subscribers:
- Agent publishes event → Router (POST /a2a/event)
- Router queries → Subscription Service (POST /a2a/subscriptions/query)
- Subscription Service returns → List of subscribers
- Router fans out → Delivers event to all subscribers in parallel
See Router documentation for event publishing details.
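The query-then-fan-out flow can be sketched with `asyncio`. This is not the Router's implementation: `publish_event`, `query_subscribers` and `deliver` are hypothetical names, with the latter two standing in for the call to `POST /a2a/subscriptions/query` and for the per-agent delivery. Collecting failures with `return_exceptions=True` is a design choice of this sketch so that one unreachable subscriber does not abort delivery to the others.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Event = Dict[str, Any]
QuerySubscribers = Callable[
    [str, Optional[str], Dict[str, Any]], Awaitable[List[Dict[str, str]]]
]
Deliver = Callable[[str, Event], Awaitable[None]]


async def publish_event(
    event: Event, query_subscribers: QuerySubscribers, deliver: Deliver
) -> Dict[str, bool]:
    """Query subscribers for an event, then deliver to all of them concurrently."""
    subscribers = await query_subscribers(
        event["event_type"], event.get("job_id"), event.get("event_data", {})
    )
    # Fan out: one delivery coroutine per subscriber, run in parallel.
    results = await asyncio.gather(
        *(deliver(sub["agent_key"], event) for sub in subscribers),
        return_exceptions=True,
    )
    # Map each agent to whether its delivery succeeded.
    return {
        sub["agent_key"]: not isinstance(result, Exception)
        for sub, result in zip(subscribers, results)
    }
```

The returned mapping gives the caller a per-agent success flag, which could feed retries or logging.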
Production Checklist
Before deploying to production:
- [ ] Configure production DATABASE_URL
- [ ] Set up database backups
- [ ] Configure log aggregation (e.g., ELK, Datadog)
- [ ] Add Prometheus metrics
- [ ] Set up monitoring alerts
- [ ] Configure rate limiting
- [ ] Review JSONB index performance
- [ ] Load test with expected subscriber counts
- [ ] Set up CI/CD pipeline
- [ ] Configure container orchestration (K8s)
Related Documentation
- Quick Start Guide: QUICK_START.md
- Sprint 2 Summary: ../SPRINT_2_COMPLETE.md
- Ecosystem Architecture: ../SUBSCRIPTION_SERVICE_EXPLAINED.md
- Development Status: ../TRAYLINX_DEVELOPMENT_STATUS.md
- API Documentation: ../TRAYLINX_API_DOCUMENTATION.md
- Router Integration: ../traylinx_router_agen../index.md
Support
For issues or questions:
1. Check the troubleshooting section above
2. Review logs: docker logs traylinx_subscriptions_db
3. Test database connectivity
4. Verify A2A authentication setup
License
[Your License Here]
Built with: FastAPI, PostgreSQL, SQLAlchemy, Alembic, Pydantic v2, structlog