Traylinx Subscription Service

Status: ✅ Production Ready (Sprint 2 Complete)
Version: 1.0.0
Port: 8001


Overview

The Traylinx Subscription Service manages event subscriptions for the agent ecosystem. It enables agents to subscribe to specific event types and allows the Router to query which agents should receive events when they're published.

Think of it as: YouTube's subscription system, but for agents. Agents "subscribe" to events they care about, and when those events occur, they get notified.


🎯 Key Features

Core Functionality

  • ✅ Event Subscriptions - Agents subscribe to event types
  • ✅ Scoped Subscriptions - Subscribe to specific job IDs
  • ✅ Advanced JSONB Filters - Filter events by payload content
  • ✅ Ownership Management - Agents manage their own subscriptions
  • ✅ Subscriber Discovery - Router queries for event subscribers

Technical Features

  • ✅ PostgreSQL Database with full-text search
  • ✅ JSONB Filter Matching for complex event filtering
  • ✅ A2A Authentication on all endpoints
  • ✅ Health & Readiness Probes for orchestration
  • ✅ Structured Logging with structlog
  • ✅ Alembic Migrations for schema management
  • ✅ Docker Compose for local development

Architecture

Subscription Types

  1. Wildcard Subscription: All events of a type

    {"event_type": "job.completed", "job_id": null}
    

  2. Scoped Subscription: Specific job only

    {"event_type": "job.completed", "job_id": "job-123"}
    

  3. Filtered Subscription: Advanced JSONB matching

    {
      "event_type": "job.completed",
      "filters": {"status": "success", "priority": "high"}
    }
    

Database Schema

CREATE TABLE subscriptions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  subscriber_agent_key TEXT NOT NULL,
  event_type TEXT NOT NULL,
  job_id TEXT NULL,
  filters JSONB NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
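  -- Note: PostgreSQL treats NULLs as distinct in UNIQUE constraints by default,
  -- so this constraint alone does not deduplicate wildcard rows (job_id IS NULL).
  UNIQUE (subscriber_agent_key, event_type, job_id)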
);

-- Indexes for performance
CREATE INDEX idx_subscriptions_agent ON subscriptions (subscriber_agent_key);
CREATE INDEX idx_subscriptions_event_type ON subscriptions (event_type);
CREATE INDEX idx_subscriptions_event_job ON subscriptions (event_type, job_id);
CREATE INDEX idx_subscriptions_filters ON subscriptions USING GIN (filters);

Quick Start

Prerequisites

  • Docker & Docker Compose
  • Python 3.9+
  • Poetry

1. Start PostgreSQL

docker-compose up -d

PostgreSQL runs on port 5433 (to avoid conflicts with default port 5432).

2. Install Dependencies

poetry lock && poetry install

3. Run Migrations

poetry run alembic upgrade head

4. Start Service

poetry run uvicorn app.main:app --port 8001 --reload

5. Verify Health

curl http://localhost:8001/health
curl http://localhost:8001/ready

6. View API Docs

Open http://localhost:8001/docs


API Reference

Base URL

http://localhost:8001

Content Type

Content-Type: application/vnd.traylinx.a2a+json; version=1

Authentication Headers

Authorization: Bearer {access_token}
X-Agent-Key: {agent_key}
X-Agent-Secret-Token: {agent_secret}
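
The content type and authentication headers above can be assembled in one place so that every request sends the same set. The following is a minimal sketch; the helper name `build_a2a_headers` is hypothetical and not part of the service.

```python
from typing import Dict

# Content type documented above for A2A requests.
A2A_CONTENT_TYPE = "application/vnd.traylinx.a2a+json; version=1"


def build_a2a_headers(access_token: str, agent_key: str, agent_secret: str) -> Dict[str, str]:
    """Return the documented header set for one A2A request.

    Hypothetical helper: only the header names and the content type
    come from this README.
    """
    if not (access_token and agent_key and agent_secret):
        raise ValueError("access_token, agent_key and agent_secret are all required")
    return {
        "Content-Type": A2A_CONTENT_TYPE,
        "Authorization": f"Bearer {access_token}",
        "X-Agent-Key": agent_key,
        "X-Agent-Secret-Token": agent_secret,
    }
```

The returned dictionary can be passed as the headers argument of whichever HTTP client you use.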

1. Create Subscription

Subscribe to events.

Endpoint: POST /a2a/subscriptions/create

Request:

{
  "envelope": {
    "message_id": "msg-001",
    "sender_agent_key": "my-agent",
    "timestamp": "2025-11-05T10:00:00Z"
  },
  "action": "subscriptions.create",
  "payload": {
    "event_type": "job.completed",
    "job_id": "job-123",  // Optional
    "filters": {          // Optional
      "status": "success"
    }
  }
}

Response (201 Created):

{
  "envelope": {
    "message_id": "response-001",
    "correlation_id": "msg-001",
    "in_reply_to": "msg-001",
    "sender_agent_key": "subscription-service",
    "timestamp": "2025-11-05T10:00:01Z"
  },
  "action": "subscriptions.create",
  "payload": {
    "subscription": {
      "subscription_id": "uuid-123",
      "subscriber_agent_key": "my-agent",
      "event_type": "job.completed",
      "job_id": "job-123",
      "filters": {"status": "success"},
      "created_at": "2025-11-05T10:00:01Z",
      "updated_at": "2025-11-05T10:00:01Z"
    }
  }
}

Behavior:

  • Uses upsert logic: updates the existing subscription if it is a duplicate
  • Validates ownership: sender must match the authenticated agent
  • JSONB filters validated on creation
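
A client might build the create request body as follows. This is a sketch under assumptions: the function name `build_create_request` is hypothetical, the `msg-<uuid>` message ID format is invented for illustration, and optional fields are simply omitted when unset (the service may equally accept explicit nulls).

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_create_request(
    sender_agent_key: str,
    event_type: str,
    job_id: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST /a2a/subscriptions/create."""
    payload: Dict[str, Any] = {"event_type": event_type}
    if job_id is not None:
        payload["job_id"] = job_id        # scoped subscription
    if filters is not None:
        payload["filters"] = filters      # filtered subscription
    return {
        "envelope": {
            # Assumed ID format; the README only shows "msg-001" as an example.
            "message_id": f"msg-{uuid.uuid4()}",
            # Must match the authenticated agent (ownership validation).
            "sender_agent_key": sender_agent_key,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
        "action": "subscriptions.create",
        "payload": payload,
    }


body = build_create_request(
    "my-agent", "job.completed", job_id="job-123", filters={"status": "success"}
)
print(json.dumps(body, indent=2))
```

Because creation uses upsert logic, sending the same body twice should update the existing subscription rather than create a second one.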


2. List Subscriptions

List your subscriptions with optional filters.

Endpoint: POST /a2a/subscriptions/list

Request:

{
  "envelope": {...},
  "action": "subscriptions.list",
  "payload": {
    "event_type": "job.completed",  // Optional filter
    "job_id": "job-123",            // Optional filter
    "limit": 100,
    "offset": 0
  }
}

Response (200 OK):

{
  "envelope": {...},
  "action": "subscriptions.list",
  "payload": {
    "subscriptions": [
      {
        "subscription_id": "uuid-123",
        "subscriber_agent_key": "my-agent",
        "event_type": "job.completed",
        "job_id": "job-123",
        "filters": {"status": "success"},
        "created_at": "2025-11-05T10:00:01Z",
        "updated_at": "2025-11-05T10:00:01Z"
      }
    ],
    "total": 1,
    "limit": 100,
    "offset": 0
  }
}
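
The `limit`, `offset`, and `total` fields allow a client to page through all of its subscriptions. Below is a sketch of that loop; `post` is a hypothetical callable that sends a list payload to the service and returns the response payload, so the HTTP details stay out of the example.

```python
from typing import Any, Callable, Dict, Iterator

# Hypothetical transport: takes the request payload, returns the response payload.
PostList = Callable[[Dict[str, Any]], Dict[str, Any]]


def iter_subscriptions(post: PostList, page_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield every subscription by walking pages until `total` is reached."""
    offset = 0
    while True:
        response_payload = post({"limit": page_size, "offset": offset})
        page = response_payload["subscriptions"]
        yield from page
        offset += len(page)
        # Stop on an empty page or once all reported rows have been seen.
        if not page or offset >= response_payload["total"]:
            break
```

Stopping on an empty page as well as on `total` guards against looping forever if subscriptions are deleted while paging.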


3. Delete Subscription

Delete a subscription by ID.

Endpoint: DELETE /a2a/subscriptions/delete

Request:

{
  "envelope": {...},
  "action": "subscriptions.delete",
  "payload": {
    "subscription_id": "uuid-123"
  }
}

Response (200 OK):

{
  "envelope": {...},
  "action": "subscriptions.delete",
  "payload": {
    "success": true,
    "subscription_id": "uuid-123"
  }
}

Ownership Validation: You can only delete your own subscriptions.


4. Query Subscribers (Router Use)

Find subscribers for an event (used by Router during event publishing).

Endpoint: POST /a2a/subscriptions/query

Request:

{
  "envelope": {...},
  "action": "subscriptions.query",
  "payload": {
    "event_type": "job.completed",
    "job_id": "job-123",
    "event_data": {
      "status": "success",
      "priority": "high"
    }
  }
}

Response (200 OK):

{
  "envelope": {...},
  "action": "subscriptions.query",
  "payload": {
    "subscribers": [
      {
        "agent_key": "agent-a",
        "subscription_id": "uuid-123"
      },
      {
        "agent_key": "agent-b",
        "subscription_id": "uuid-456"
      }
    ],
    "event_type": "job.completed",
    "job_id": "job-123",
    "total": 2
  }
}

Matching Logic:

  • Event Type: Exact match required
  • Job ID: Matches specific job OR wildcard (NULL)
  • Filters: JSONB containment (filters <@ event_data)
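
The three matching rules can be approximated in plain Python to reason about which subscriptions an event will reach. This is an illustrative sketch, not the service's implementation (which runs the match in PostgreSQL); the function names are hypothetical, and arrays are compared by equality here, which is stricter than real JSONB array containment.

```python
from typing import Any, Dict, Optional


def contains(event_data: Any, filters: Any) -> bool:
    """Approximate `filters <@ event_data` for nested objects and scalars."""
    if isinstance(filters, dict):
        return isinstance(event_data, dict) and all(
            key in event_data and contains(event_data[key], value)
            for key, value in filters.items()
        )
    # Scalars (and, as a simplification, arrays) must be equal.
    return event_data == filters


def subscription_matches(
    sub: Dict[str, Any],
    event_type: str,
    job_id: Optional[str],
    event_data: Dict[str, Any],
) -> bool:
    """Apply the event type, job ID, and filter rules to one subscription."""
    if sub["event_type"] != event_type:
        return False                      # event type must match exactly
    if sub.get("job_id") is not None and sub["job_id"] != job_id:
        return False                      # scoped to a different job
    filters = sub.get("filters")
    return filters is None or contains(event_data, filters)


sub = {"event_type": "job.completed", "job_id": None, "filters": {"status": "success"}}
print(subscription_matches(sub, "job.completed", "job-123", {"status": "success", "priority": "high"}))
```

A subscription with `job_id` set to null behaves as a wildcard, so it matches the event for any job as long as the type and filters agree.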


🔍 Filter Matching Examples

Example 1: Simple Filter

Subscription:

{"event_type": "job.completed", "filters": {"status": "success"}}

Event Data:

{"status": "success", "duration": 120}

Result: ✅ Matches (the event contains every filter key with the same value; extra event keys are ignored)


Example 2: Multiple Filters

Subscription:

{
  "event_type": "job.completed",
  "filters": {"status": "success", "priority": "high"}
}

Event Data:

{"status": "success", "priority": "high", "owner": "alice"}

Result: ✅ Matches (all filter keys present with matching values)


Example 3: No Match

Subscription:

{"event_type": "job.completed", "filters": {"status": "success"}}

Event Data:

{"status": "failed", "error": "timeout"}

Result: ❌ No match (status value differs)


Configuration

Environment Variables

Create a .env file:

# Database
DATABASE_URL=postgresql+asyncpg://traylinx:dev_password@localhost:5433/subscriptions

# Service
SUBSCRIPTION_SERVICE_PORT=8001
SERVICE_NAME=traylinx-subscription-service

# Logging
LOG_LEVEL=INFO

# Rate Limiting (future use)
RATE_LIMIT_CREATE_PER_MINUTE=60
RATE_LIMIT_QUERY_PER_MINUTE=300
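
For scripts that need the same settings outside the service, the variables above can be read with the standard library. This is a sketch only: the service itself loads settings in app/config.py with Pydantic v2, and the `Settings` dataclass, `load_settings` function, and the defaults (copied from the sample .env) are assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical mirror of the variables in the sample .env file."""
    database_url: str
    port: int = 8001
    service_name: str = "traylinx-subscription-service"
    log_level: str = "INFO"
    rate_limit_create_per_minute: int = 60
    rate_limit_query_per_minute: int = 300


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read settings from `env` (defaults to the process environment)."""
    env = os.environ if env is None else env
    if "DATABASE_URL" not in env:
        raise KeyError("DATABASE_URL must be set")
    return Settings(
        database_url=env["DATABASE_URL"],
        port=int(env.get("SUBSCRIPTION_SERVICE_PORT", "8001")),
        service_name=env.get("SERVICE_NAME", "traylinx-subscription-service"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        rate_limit_create_per_minute=int(env.get("RATE_LIMIT_CREATE_PER_MINUTE", "60")),
        rate_limit_query_per_minute=int(env.get("RATE_LIMIT_QUERY_PER_MINUTE", "300")),
    )
```

Passing a plain dictionary instead of the process environment keeps the loader easy to test.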

🗄 Database Management

Running Migrations

# Create a new migration
poetry run alembic revision --autogenerate -m "description"

# Apply migrations
poetry run alembic upgrade head

# Rollback last migration
poetry run alembic downgrade -1

Direct Database Access

# Connect to PostgreSQL
docker exec -it traylinx_subscriptions_db psql -U traylinx -d subscriptions

-- Useful queries (run these at the psql prompt)
SELECT * FROM subscriptions;
SELECT * FROM subscriptions WHERE subscriber_agent_key = 'my-agent';
SELECT * FROM subscriptions WHERE event_type = 'job.completed';
SELECT * FROM subscriptions WHERE filters @> '{"status": "success"}'::jsonb;

🏥 Health & Monitoring

Health Endpoints

# Liveness probe
GET /health

# Readiness probe (checks PostgreSQL)
GET /ready

# Service info
GET /
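
A startup script or test harness can poll the readiness probe before sending traffic. The sketch below assumes that /ready answers with HTTP 200 once PostgreSQL is reachable; this README does not state the exact status codes. Both function names are hypothetical.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_probe(url: str, timeout: float = 2.0) -> bool:
    """Return True if `url` answers with HTTP 200 (assumed to mean ready)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or a non-2xx response.
        return False


def wait_until_ready(probe: Callable[[], bool], attempts: int = 10, delay: float = 1.0) -> bool:
    """Call `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

For a local instance, this could be called as `wait_until_ready(lambda: http_probe("http://localhost:8001/ready"))`.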

Monitoring

Currently uses structured logging with structlog. All operations log:

  • Agent identity
  • Event types
  • Subscription IDs
  • Success/failure status
  • Performance metrics

Future: Prometheus metrics integration planned.


🧪 Testing

Manual Testing

See QUICK_START.md for complete cURL examples.

Database Testing

-- Test JSONB filter matching
INSERT INTO subscriptions (subscriber_agent_key, event_type, filters)
VALUES ('test-agent', 'test.event', '{"key": "value"}'::jsonb);

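-- Test query logic (simulates Router query).
-- The JSON literal stands in for the event data; <@ checks that the stored
-- filters are contained in it, as described under Matching Logic.
SELECT * FROM subscriptions
WHERE event_type = 'test.event'
  AND (job_id IS NULL OR job_id = 'test-job')
  AND (filters IS NULL OR filters <@ '{"key": "value"}'::jsonb);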

🐛 Troubleshooting

Service Won't Start

# Check PostgreSQL
docker ps | grep traylinx_subscriptions_db
docker logs traylinx_subscriptions_db

# Check migrations
poetry run alembic current
poetry run alembic upgrade head

# Run with debug logging
LOG_LEVEL=DEBUG poetry run uvicorn app.main:app --port 8001

Connection Issues

# Test PostgreSQL connection
docker exec traylinx_subscriptions_db psql -U traylinx -d subscriptions -c "SELECT 1;"

# Verify DATABASE_URL in .env
echo $DATABASE_URL

A2A Authentication Failures

Ensure you have:

  1. Valid access_token from Authentication Service
  2. Valid agent_secret_token for your agent
  3. Matching X-Agent-Key header


📁 Project Structure

traylinx_subscription_service/
├── app/
│   ├── __init__.py
│   ├── main.py                    # FastAPI application
│   ├── config.py                  # Settings (Pydantic v2)
│   ├── models.py                  # Legacy (kept for compatibility)
│   ├── dependencies.py            # Dependency injection
│   ├── a2a/
│   │   ├── __init__.py
│   │   └── schemas.py             # A2A protocol models
│   ├── db/
│   │   ├── __init__.py
│   │   ├── connection.py          # AsyncPG connection
│   │   └── models.py              # SQLAlchemy ORM
│   ├── repositories/
│   │   ├── __init__.py
│   │   └── subscription_repository.py  # CRUD + JSONB queries
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── health.py              # Health checks
│   │   └── a2a_subscriptions.py   # 4 A2A endpoints
│   └── services/
│       ├── __init__.py
│       └── subscription_service.py  # Business logic
├── migrations/
│   ├── env.py                     # Alembic config
│   ├── script.py.mako             # Migration template
│   └── versions/
│       └── 001_initial_schema.py  # Initial schema
├── tests/                         # (To be implemented)
├── docker-compose.yml             # PostgreSQL setup
├── Dockerfile                     # Container image
├── alembic.ini                    # Alembic configuration
├── pyproject.toml                 # Dependencies
├── poetry.lock                    # Locked dependencies
├── README.md                      # This file
└── QUICK_START.md                 # Quick start guide

🔗 Integration with Router

The Router calls the query endpoint to find subscribers:

  1. Agent publishes event → Router (POST /a2a/event)
  2. Router queries → Subscription Service (POST /a2a/subscriptions/query)
  3. Subscription Service returns → List of subscribers
  4. Router fans out → Delivers event to all subscribers in parallel

See Router documentation for event publishing details.
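
The parallel fan-out in step 4 can be sketched with asyncio. This is an illustration of the pattern, not the Router's code: `fan_out` and the injected `deliver` coroutine are hypothetical, and the subscriber dictionaries follow the shape of the query response.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Subscriber = Dict[str, str]
# Hypothetical delivery coroutine supplied by the caller.
Deliver = Callable[[Subscriber, Dict[str, Any]], Awaitable[None]]


async def fan_out(
    subscribers: List[Subscriber],
    event: Dict[str, Any],
    deliver: Deliver,
) -> Dict[str, bool]:
    """Deliver `event` to all subscribers concurrently.

    Returns a map of agent_key to True (delivered) or False (delivery raised).
    """
    results = await asyncio.gather(
        *(deliver(sub, event) for sub in subscribers),
        return_exceptions=True,  # one failing subscriber must not cancel the rest
    )
    return {
        sub["agent_key"]: not isinstance(result, BaseException)
        for sub, result in zip(subscribers, results)
    }
```

Collecting exceptions instead of raising them lets the caller decide separately how to retry or log each failed delivery.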


🚦 Production Checklist

Before deploying to production:

  • [ ] Configure production DATABASE_URL
  • [ ] Set up database backups
  • [ ] Configure log aggregation (e.g., ELK, Datadog)
  • [ ] Add Prometheus metrics
  • [ ] Set up monitoring alerts
  • [ ] Configure rate limiting
  • [ ] Review JSONB index performance
  • [ ] Load test with expected subscriber counts
  • [ ] Set up CI/CD pipeline
  • [ ] Configure container orchestration (K8s)

Related Documentation

  • Quick Start Guide: QUICK_START.md
  • Sprint 2 Summary: ../SPRINT_2_COMPLETE.md
  • Ecosystem Architecture: ../SUBSCRIPTION_SERVICE_EXPLAINED.md
  • Development Status: ../TRAYLINX_DEVELOPMENT_STATUS.md
  • API Documentation: ../TRAYLINX_API_DOCUMENTATION.md
  • Router Integration: ../traylinx_router_agen../index.md

🤝 Support

For issues or questions:

  1. Check the troubleshooting section above
  2. Review logs: docker logs traylinx_subscriptions_db
  3. Test database connectivity
  4. Verify A2A authentication setup


📄 License

[Your License Here]


Built with: FastAPI, PostgreSQL, SQLAlchemy, Alembic, Pydantic v2, structlog