ATOM Documentation

← Back to App

ATOM SaaS - Integration Guide

**Version:** v13.0

**Last Updated:** 2026-04-05

This guide provides integration patterns, code examples, and deployment considerations for building applications on top of the ATOM SaaS platform.

Table of Contents

  1. Integration Patterns
  2. Python Integration
  3. TypeScript/JavaScript Integration
  4. Go Integration
  5. cURL Examples
  6. Complete Workflows
  7. Testing Integrations
  8. Deployment Considerations
  9. Performance Optimization
  10. Security Best Practices

---

Integration Patterns

Pattern 1: Direct API Integration

**Best for:** Simple applications, prototypes, quick integrations

# Direct API calls with authentication
import requests

session = requests.Session()
session.headers.update({
    "X-Tenant-ID": "your-tenant-id",
    "Authorization": "Bearer your-api-token"
})

response = session.get("https://atom-saas.fly.dev/api/agents")
agents = response.json()

**Pros:**

  • Simple to implement
  • Full control over requests
  • Language-agnostic

**Cons:**

  • Manual error handling
  • No built-in retry logic
  • Must manage rate limiting manually

Pattern 2: SDK Integration

**Best for:** Production applications, complex integrations

# Using ATOM SDK
from atom_marketplace import MarketplaceClient

client = MarketplaceClient(
    base_url="https://atom-saas.fly.dev/api",
    tenant_id="your-tenant-id",
    api_token="your-api-token"
)

agents = client.list_agents()

**Pros:**

  • Built-in error handling
  • Automatic retry logic
  • Type safety (TypeScript)
  • Rate limiting handled

**Cons:**

  • Additional dependency
  • SDK update maintenance

Pattern 3: Webhook Integration

**Best for:** Event-driven applications, real-time updates

# Webhook handler
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/atom', methods=['POST'])
def handle_webhook():
    payload = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')

    if not verify_webhook(payload, signature, WEBHOOK_SECRET):
        return jsonify({'error': 'Invalid signature'}), 401

    event = request.json
    handle_event(event)

    return jsonify({'status': 'processed'}), 200

**Pros:**

  • Real-time updates
  • Decoupled architecture
  • Event-driven

**Cons:**

  • Requires public endpoint
  • Signature verification required
  • Retry handling complexity

Pattern 4: Federation Integration

**Best for:** Multi-instance deployments, cross-tenant sharing

# Federation client
from atom_federation import FederationClient

client = FederationClient(
    base_url="https://atom-saas.fly.dev/api/federation",
    federation_key="your-federation-key",
    source_instance="https://your-instance.fly.dev"
)

# Discover agents from other instances
agents = client.discover_agents()

# Download agent bundle
bundle = client.download_agent_bundle(agent_id="agent-123")

**Pros:**

  • Cross-instance discovery
  • Shared component ecosystem
  • Standardized protocol

**Cons:**

  • Federation key management
  • Network latency
  • Version compatibility

Pattern 5: OAuth Integration

**Best for:** Integrating with third-party services that require OAuth authentication

ATOM supports OAuth 2.0 authentication for secure integration with popular third-party services. This enables agents to access external APIs on behalf of users without storing credentials.

**Supported OAuth Providers:**

ProviderOAuth VersionPKCE SupportStatus
**Airtable**2.0✅ Yes✅ Available
**Google**2.0✅ Yes✅ Available
**Slack**2.0✅ Yes✅ Available
**Salesforce**2.0✅ Yes✅ Available
**GitHub**2.0✅ Yes✅ Available
**Microsoft**2.0✅ Yes✅ Available

**OAuth 2.0 Flow with PKCE:**

# Step 1: Generate PKCE code verifier and challenge
import base64
import hashlib
import secrets
import urllib.parse

def generate_pkce_codes():
    # Generate code verifier (43-128 characters)
    code_verifier = secrets.token_urlsafe(32)

    # Create code challenge (SHA256 hash)
    challenge_bytes = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode().rstrip('=')

    return code_verifier, code_challenge

code_verifier, code_challenge = generate_pkce_codes()

# Step 2: Redirect user to OAuth authorization URL
auth_url = (
    f"https://airtable.com/oauth2/v1/authorize?"
    f"client_id={CLIENT_ID}&"
    f"redirect_uri={urllib.parse.quote_plus(REDIRECT_URI)}&"
    f"response_type=code&"
    f"scope=data.records:read data.records:write&"
    f"code_challenge={code_challenge}&"
    f"code_challenge_method=S256"
)

# Redirect user to auth_url
# User approves authorization

# Step 3: Exchange authorization code for access token
auth_code = request.args.get('code')  # From callback

token_response = requests.post(
    "https://airtable.com/oauth2/v1/token",
    data={
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
        "grant_type": "authorization_code",
        "code_verifier": code_verifier
    }
)

tokens = token_response.json()
access_token = tokens['access_token']
refresh_token = tokens.get('refresh_token')

**Airtable Integration Example:**

import requests
from atom_marketplace import MarketplaceClient

class AirtableIntegration:
    """Airtable integration using OAuth 2.0 with PKCE"""

    BASE_URL = "https://api.airtable.com/v0"

    def __init__(self, access_token, base_id, table_name):
        self.access_token = access_token
        self.base_id = base_id
        self.table_name = table_name
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

    def list_records(self, max_records=100):
        """List records from Airtable table"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}"
        params = {"maxRecords": max_records}

        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()

        return response.json().get('records', [])

    def create_record(self, fields):
        """Create a new record in Airtable"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}"
        data = {"records": [{"fields": fields}]}

        response = requests.post(url, headers=self.headers, json=data)
        response.raise_for_status()

        return response.json().get('records', [])[0]

    def update_record(self, record_id, fields):
        """Update an existing record"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}/{record_id}"
        data = {"fields": fields}

        response = requests.patch(url, headers=self.headers, json=data)
        response.raise_for_status()

        return response.json()

    def delete_record(self, record_id):
        """Delete a record"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}/{record_id}"

        response = requests.delete(url, headers=self.headers)
        response.raise_for_status()

        return True

# Usage in agent skill
def execute_airtable_skill(input_data, context):
    """Example: Airtable skill for ATOM agents"""

    # Get OAuth token from tenant settings
    tenant_id = context['tenant_id']
    marketplace = MarketplaceClient(tenant_id=tenant_id)

    # Retrieve stored OAuth token
    oauth_config = marketplace.get_integration_config(
        integration_id="airtable"
    )

    access_token = oauth_config['access_token']
    base_id = input_data['base_id']
    table_name = input_data['table_name']

    # Initialize Airtable client
    airtable = AirtableIntegration(
        access_token=access_token,
        base_id=base_id,
        table_name=table_name
    )

    # Perform operation
    action = input_data['action']

    if action == 'list':
        records = airtable.list_records(
            max_records=input_data.get('max_records', 100)
        )
        return {"records": records}

    elif action == 'create':
        record = airtable.create_record(input_data['fields'])
        return {"record": record}

    elif action == 'update':
        record = airtable.update_record(
            record_id=input_data['record_id'],
            fields=input_data['fields']
        )
        return {"record": record}

    elif action == 'delete':
        result = airtable.delete_record(input_data['record_id'])
        return {"deleted": result}

    else:
        raise ValueError(f"Unknown action: {action}")

**OAuth Configuration in ATOM:**

# Store OAuth tokens securely
marketplace.set_integration_config(
    integration_id="airtable",
    config={
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_expires_at": expiration_time,
        "base_id": "appXXXXXXXXXXXXXX",
        "scopes": ["data.records:read", "data.records:write"]
    }
)

# Retrieve OAuth configuration
config = marketplace.get_integration_config(integration_id="airtable")

# Refresh token if expired
if config['token_expires_at'] < datetime.now():
    new_tokens = refresh_airtable_token(config['refresh_token'])
    marketplace.set_integration_config(
        integration_id="airtable",
        config={
            **config,
            "access_token": new_tokens['access_token'],
            "refresh_token": new_tokens['refresh_token'],
            "token_expires_at": calculate_expiration(new_tokens['expires_in'])
        }
    )

**OAuth Security Best Practices:**

  1. **Use PKCE**: Always use PKCE for mobile and SPA applications
  2. **Store tokens securely**: Encrypt tokens at rest in database
  3. **Use HTTPS**: Never transmit tokens over HTTP
  4. **Limit scopes**: Request minimum required permissions
  5. **Implement refresh**: Use refresh tokens instead of long-lived access tokens
  6. **Handle revocation**: Detect and handle token revocation gracefully
  7. **Validate tokens**: Verify token integrity before use

**Example: Complete OAuth Flow with ATOM**

from flask import Flask, request, redirect, session
import secrets

app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY")

# OAuth configuration
AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID")
AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET")
REDIRECT_URI = "https://your-app.com/oauth/callback"

@app.route('/oauth/airtable/start')
def start_oauth():
    """Initiate Airtable OAuth flow"""
    # Generate PKCE codes
    code_verifier, code_challenge = generate_pkce_codes()

    # Store verifier in session for callback
    session['code_verifier'] = code_verifier
    session['tenant_id'] = request.args.get('tenant_id')

    # Build authorization URL
    auth_url = (
        f"https://airtable.com/oauth2/v1/authorize?"
        f"client_id={AIRTABLE_CLIENT_ID}&"
        f"redirect_uri={urllib.parse.quote_plus(REDIRECT_URI)}&"
        f"response_type=code&"
        f"scope=data.records:read data.records:write&"
        f"code_challenge={code_challenge}&"
        f"code_challenge_method=S256&"
        f"state={secrets.token_urlsafe(16)}"
    )

    return redirect(auth_url)

@app.route('/oauth/callback')
def oauth_callback():
    """Handle OAuth callback"""
    # Verify state parameter (CSRF protection)
    state = request.args.get('state')
    if not session.get('oauth_state') == state:
        return "Invalid state parameter", 400

    # Get authorization code
    auth_code = request.args.get('code')
    code_verifier = session.get('code_verifier')

    # Exchange code for tokens
    token_response = requests.post(
        "https://airtable.com/oauth2/v1/token",
        data={
            "client_id": AIRTABLE_CLIENT_ID,
            "client_secret": AIRTABLE_CLIENT_SECRET,
            "code": auth_code,
            "redirect_uri": REDIRECT_URI,
            "grant_type": "authorization_code",
            "code_verifier": code_verifier
        }
    )

    if token_response.status_code != 200:
        return f"Token exchange failed: {token_response.text}", 400

    tokens = token_response.json()

    # Store tokens in ATOM
    tenant_id = session.get('tenant_id')
    marketplace = MarketplaceClient(tenant_id=tenant_id)

    marketplace.set_integration_config(
        integration_id="airtable",
        config={
            "access_token": tokens['access_token'],
            "refresh_token": tokens.get('refresh_token'),
            "token_expires_at": datetime.now() + timedelta(seconds=tokens.get('expires_in', 3600)),
            "scopes": ["data.records:read", "data.records:write"]
        }
    )

    # Clear session
    session.pop('code_verifier', None)
    session.pop('oauth_state', None)

    return redirect('/settings/integrations?success=airtable')

**Pros:**

  • Secure authentication without storing credentials
  • User-controlled permissions and revocation
  • Industry-standard security (PKCE)
  • Automatic token refresh handling

**Cons:**

  • More complex setup than API keys
  • Requires user interaction for authorization
  • Token expiration handling required
  • Provider-specific implementation details

---

Python Integration

Installation

pip install atom-marketplace atom-federation

Basic Setup

import os
from atom_marketplace import MarketplaceClient
from atom_federation import FederationClient

# Initialize clients
marketplace = MarketplaceClient(
    base_url=os.getenv("ATOM_BASE_URL", "https://atom-saas.fly.dev/api"),
    tenant_id=os.getenv("ATOM_TENANT_ID"),
    api_token=os.getenv("ATOM_API_TOKEN")
)

federation = FederationClient(
    base_url=os.getenv("ATOM_FEDERATION_URL", "https://atom-saas.fly.dev/api/federation"),
    federation_key=os.getenv("ATOM_FEDERATION_KEY"),
    source_instance=os.getenv("ATOM_INSTANCE_URL")
)

Listing Agents

# List all public agents
agents = marketplace.list_agents(
    marketplace_type="public",
    capability="sales",
    min_rating=4.0,
    page=1,
    page_size=50
)

for agent in agents.items:
    print(f"{agent['name']} - Rating: {agent['avg_rating']}")

Publishing Components

# Publish an agent
result = marketplace.publish_agent(
    agent_id="my_sales_agent",
    marketplace_type="public",
    name="My Sales Agent",
    description="Automated sales outreach and follow-up",
    capabilities=["sales", "crm", "outreach"],
    maturity_level="supervised",
    pricing_tier="team",
    tags=["sales", "automation"],
    documentation_url="https://docs.example.com"
)

print(f"Published: {result['component_id']}")
print(f"Status: {result['status']}")  # pending_approval, published

Installing Components

# Install an agent
result = marketplace.install_agent(
    agent_id="sales_automation_agent",
    source_tenant_id="publisher-tenant-id",
    target_tenant_id="your-tenant-id",
    install_config={
        "api_keys": {
            "salesforce": os.getenv("SALESFORCE_API_KEY")
        },
        "settings": {
            "auto_approve": False,
            "max_daily_tasks": 100
        }
    }
)

print(f"Installed: {result['installation_id']}")

Error Handling

from atom_marketplace.errors import (
    RateLimitError,
    ComponentNotFoundError,
    InsufficientPermissionsError,
    MarketplaceError
)

try:
    result = marketplace.install_agent(...)
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
    time.sleep(e.retry_after)
except ComponentNotFoundError as e:
    print(f"Agent not found: {e.component_id}")
except InsufficientPermissionsError as e:
    print(f"Permission denied: {e.required_permission}")
except MarketplaceError as e:
    print(f"Marketplace error: {e.code} - {e.message}")
    # Log error for debugging
    logger.error(f"Marketplace error: {e.details}")

Async Integration

import asyncio
from atom_marketplace import AsyncMarketplaceClient

async def main():
    client = AsyncMarketplaceClient(...)

    # Concurrent requests
    agents, skills, canvases = await asyncio.gather(
        client.list_agents(),
        client.list_skills(),
        client.list_canvases()
    )

    print(f"Found {len(agents)} agents, {len(skills)} skills, {len(canvases)} canvases")

asyncio.run(main())

---

TypeScript/JavaScript Integration

Installation

npm install @atom-saas/marketplace-sdk @atom-saas/federation-sdk

Basic Setup

import { MarketplaceClient } from '@atom-saas/marketplace-sdk';
import { FederationClient } from '@atom-saas/federation-sdk';

// Initialize clients
const marketplace = new MarketplaceClient({
    baseUrl: process.env.ATOM_BASE_URL || 'https://atom-saas.fly.dev/api',
    tenantId: process.env.ATOM_TENANT_ID!,
    apiToken: process.env.ATOM_API_TOKEN!
});

const federation = new FederationClient({
    baseUrl: process.env.ATOM_FEDERATION_URL || 'https://atom-saas.fly.dev/api/federation',
    federationKey: process.env.ATOM_FEDERATION_KEY!,
    sourceInstance: process.env.ATOM_INSTANCE_URL!
});

Listing Agents

// List agents with filters
interface ListAgentsOptions {
    marketplaceType: 'public' | 'private';
    capability?: string;
    minRating?: number;
    page?: number;
    pageSize?: number;
}

const agents = await marketplace.listAgents({
    marketplaceType: 'public',
    capability: 'sales',
    minRating: 4.0,
    page: 1,
    pageSize: 50
});

agents.items.forEach(agent => {
    console.log(`${agent.name} - Rating: ${agent.avgRating}`);
});

Publishing Components

// Publish an agent
interface PublishAgentOptions {
    agentId: string;
    marketplaceType: 'public' | 'private';
    name: string;
    description: string;
    capabilities: string[];
    maturityLevel: 'student' | 'intern' | 'supervised' | 'autonomous';
    pricingTier: 'free' | 'solo' | 'team' | 'enterprise';
    tags: string[];
    documentationUrl?: string;
}

const result = await marketplace.publishAgent({
    agentId: 'my-sales-agent',
    marketplaceType: 'public',
    name: 'My Sales Agent',
    description: 'Automated sales outreach and follow-up',
    capabilities: ['sales', 'crm', 'outreach'],
    maturityLevel: 'supervised',
    pricingTier: 'team',
    tags: ['sales', 'automation'],
    documentationUrl: 'https://docs.example.com'
});

console.log(`Published: ${result.componentId}`);
console.log(`Status: ${result.status}`);

Installing Components

// Install an agent
interface InstallAgentOptions {
    agentId: string;
    sourceTenantId: string;
    targetTenantId: string;
    installConfig?: {
        apiKeys?: Record<string, string>;
        settings?: Record<string, any>;
    };
}

const result = await marketplace.installAgent({
    agentId: 'sales-automation-agent',
    sourceTenantId: 'publisher-tenant-id',
    targetTenantId: 'your-tenant-id',
    installConfig: {
        apiKeys: {
            salesforce: process.env.SALESFORCE_API_KEY!
        },
        settings: {
            autoApprove: false,
            maxDailyTasks: 100
        }
    }
});

console.log(`Installed: ${result.installationId}`);

Error Handling

import {
    MarketplaceClient,
    RateLimitError,
    ComponentNotFoundError,
    InsufficientPermissionsError,
    MarketplaceError
} from '@atom-saas/marketplace-sdk';

try {
    const result = await marketplace.installAgent(...);
} catch (error) {
    if (error instanceof RateLimitError) {
        console.log(`Rate limited. Retry after ${error.retryAfter}s`);
        await new Promise(resolve => setTimeout(resolve, error.retryAfter * 1000));
    } else if (error instanceof ComponentNotFoundError) {
        console.log(`Agent not found: ${error.componentId}`);
    } else if (error instanceof InsufficientPermissionsError) {
        console.log(`Permission denied: ${error.requiredPermission}`);
    } else if (error instanceof MarketplaceError) {
        console.log(`Marketplace error: ${error.code} - ${error.message}`);
        // Log error for debugging
        console.error('Marketplace error details:', error.details);
    }
}

React Integration

import React, { useState, useEffect } from 'react';
import { MarketplaceClient } from '@atom-saas/marketplace-sdk';

const client = new MarketplaceClient({
    baseUrl: process.env.NEXT_PUBLIC_ATOM_BASE_URL!,
    tenantId: process.env.NEXT_PUBLIC_ATOM_TENANT_ID!,
    apiToken: process.env.ATOM_API_TOKEN!
});

function AgentMarketplace() {
    const [agents, setAgents] = useState([]);
    const [loading, setLoading] = useState(true);
    const [error, setError] = useState<string | null>(null);

    useEffect(() => {
        async function loadAgents() {
            try {
                const response = await client.listAgents({
                    marketplaceType: 'public',
                    capability: 'sales'
                });
                setAgents(response.items);
            } catch (err) {
                setError(err instanceof Error ? err.message : 'Unknown error');
            } finally {
                setLoading(false);
            }
        }

        loadAgents();
    }, []);

    if (loading) return <div>Loading...</div>;
    if (error) return <div>Error: {error}</div>;

    return (
        <ul>
            {agents.map(agent => (
                <li key={agent.id}>
                    <h3>{agent.name}</h3>
                    <p>Rating: {agent.avgRating}</p>
                    <p>{agent.description}</p>
                </li>
            ))}
        </ul>
    );
}

export default AgentMarketplace;

Next.js Integration

// pages/api/agents.ts
import type { NextApiRequest, NextApiResponse } from 'next';
import { MarketplaceClient } from '@atom-saas/marketplace-sdk';

const client = new MarketplaceClient({
    baseUrl: process.env.ATOM_BASE_URL!,
    tenantId: process.env.ATOM_TENANT_ID!,
    apiToken: process.env.ATOM_API_TOKEN!
});

export default async function handler(
    req: NextApiRequest,
    res: NextApiResponse
) {
    if (req.method !== 'GET') {
        return res.status(405).json({ error: 'Method not allowed' });
    }

    try {
        const agents = await client.listAgents({
            marketplaceType: 'public',
            capability: req.query.capability as string
        });

        res.status(200).json(agents);
    } catch (error) {
        res.status(500).json({
            error: error instanceof Error ? error.message : 'Unknown error'
        });
    }
}

---

Go Integration

Installation

go get github.com/atom-saas/go-marketplace
go get github.com/atom-saas/go-federation

Basic Setup

package main

import (
    "os"
    "github.com/atom-saas/go-marketplace"
)

func main() {
    client := marketplace.NewClient(marketplace.Config{
        BaseURL:  getEnv("ATOM_BASE_URL", "https://atom-saas.fly.dev/api"),
        TenantID: os.Getenv("ATOM_TENANT_ID"),
        APIToken: os.Getenv("ATOM_API_TOKEN"),
    })

    // Use client...
}

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

Listing Agents

agents, err := client.ListAgents(marketplace.ListAgentsOptions{
    MarketplaceType: "public",
    Capability:      "sales",
    MinRating:       4.0,
    Page:            1,
    PageSize:        50,
})

if err != nil {
    log.Fatalf("Failed to list agents: %v", err)
}

for _, agent := range agents.Items {
    fmt.Printf("%s - Rating: %.1f\n", agent.Name, agent.AvgRating)
}

Publishing Components

result, err := client.PublishAgent(marketplace.PublishAgentOptions{
    AgentID:         "my-sales-agent",
    MarketplaceType: "public",
    Name:            "My Sales Agent",
    Description:     "Automated sales outreach and follow-up",
    Capabilities:    []string{"sales", "crm", "outreach"},
    MaturityLevel:   "supervised",
    PricingTier:     "team",
    Tags:            []string{"sales", "automation"},
    DocumentationURL: "https://docs.example.com",
})

if err != nil {
    log.Fatalf("Failed to publish agent: %v", err)
}

fmt.Printf("Published: %s\n", result.ComponentID)
fmt.Printf("Status: %s\n", result.Status)

Error Handling

result, err := client.InstallAgent(marketplace.InstallAgentOptions{
    AgentID:        "sales-automation-agent",
    SourceTenantID: "publisher-tenant-id",
    TargetTenantID: "your-tenant-id",
})

if err != nil {
    switch e := err.(type) {
    case *marketplace.RateLimitError:
        log.Printf("Rate limited. Retry after %d seconds", e.RetryAfter)
        time.Sleep(time.Duration(e.RetryAfter) * time.Second)
    case *marketplace.ComponentNotFoundError:
        log.Printf("Agent not found: %s", e.ComponentID)
    case *marketplace.InsufficientPermissionsError:
        log.Printf("Permission denied: %s", e.RequiredPermission)
    default:
        log.Printf("Marketplace error: %v", err)
    }
}

---

cURL Examples

Authentication

# List agents with API token
curl -X GET "https://atom-saas.fly.dev/api/agent-marketplace/agents" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -G \
  --data-urlencode "marketplace_type=public"

Listing Components

# List agents
curl -X GET "https://atom-saas.fly.dev/api/agent-marketplace/agents" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -G \
  --data-urlencode "marketplace_type=public" \
  --data-urlencode "capability=sales" \
  --data-urlencode "min_rating=4.0"

# List skills
curl -X GET "https://atom-saas.fly.dev/api/skill-marketplace/skills" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -G \
  --data-urlencode "marketplace_type=public"

# List canvases
curl -X GET "https://atom-saas.fly.dev/api/canvas-marketplace/components" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token"

Publishing Components

# Publish agent
curl -X POST "https://atom-saas.fly.dev/api/agent-marketplace/publish" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "my-sales-agent",
    "marketplace_type": "public",
    "name": "My Sales Agent",
    "description": "Automated sales outreach",
    "capabilities": ["sales", "crm"],
    "maturity_level": "supervised",
    "pricing_tier": "team",
    "tags": ["sales", "automation"]
  }'

Installing Components

# Install agent
curl -X POST "https://atom-saas.fly.dev/api/agent-marketplace/install" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "sales-automation-agent",
    "source_tenant_id": "publisher-tenant-id",
    "target_tenant_id": "your-tenant-id"
  }'

Federation

# Discover agents via federation
curl -X GET "https://atom-saas.fly.dev/api/federation/v1/marketplace/agents" \
  -H "X-Federation-Key: your-federation-key" \
  -H "X-Federation-Source: https://your-instance.fly.dev"

# Download agent bundle
curl -X GET "https://atom-saas.fly.dev/api/federation/v1/marketplace/agents/{agent_id}/bundle" \
  -H "X-Federation-Key: your-federation-key" \
  -H "X-Federation-Source: https://your-instance.fly.dev" \
  -o agent-bundle.json

Webhooks

# Create webhook
curl -X POST "https://atom-saas.fly.dev/api/webhooks" \
  -H "X-Tenant-ID: your-tenant-id" \
  -H "Authorization: Bearer your-api-token" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-app.com/webhooks/atom",
    "events": ["component.installed", "component.updated"],
    "secret": "your-webhook-secret"
  }'

---

Complete Workflows

Workflow 1: Publish and Install Agent

# Step 1: Publish agent
result = marketplace.publish_agent(
    agent_id="my_sales_agent",
    marketplace_type="public",
    name="My Sales Agent",
    description="Automated sales outreach",
    capabilities=["sales"],
    maturity_level="supervised",
    pricing_tier="team",
    tags=["sales"]
)

print(f"Published: {result['component_id']}")
print(f"Status: {result['status']}")  # pending_approval

# Step 2: Wait for approval (poll status)
while True:
    status = marketplace.get_publication_status(result['component_id'])
    if status['status'] == 'published':
        break
    time.sleep(60)

# Step 3: Another tenant installs agent
result = marketplace.install_agent(
    agent_id="my_sales_agent",
    source_tenant_id="publisher-tenant-id",
    target_tenant_id="your-tenant-id",
    install_config={
        "api_keys": {"salesforce": "your-key"},
        "settings": {"auto_approve": False}
    }
)

print(f"Installed: {result['installation_id']}")

Workflow 2: Multi-Instance Federation

# Instance A: Publish agent
marketplace_a = MarketplaceClient(tenant_id="tenant-a", ...)
marketplace_a.publish_agent(agent_id="shared_agent", ...)

# Instance B: Discover agents from Instance A
federation_b = FederationClient(...)
agents = federation_b.discover_agents()

# Instance B: Download and install agent
for agent in agents:
    if agent['agent_id'] == 'shared_agent':
        bundle = federation_b.download_agent_bundle(agent['agent_id'])

        # Install from bundle
        marketplace_b.install_from_bundle(
            bundle=bundle,
            source_instance=agent['source_instance']
        )

Workflow 3: Event-Driven Integration

from flask import Flask, request, jsonify
from atom_marketplace import MarketplaceClient

app = Flask(__name__)
client = MarketplaceClient(...)

@app.route('/webhooks/atom', methods=['POST'])
def handle_webhook():
    # Verify signature
    payload = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')

    if not verify_webhook(payload, signature, WEBHOOK_SECRET):
        return jsonify({'error': 'Invalid signature'}), 401

    # Handle event
    event = request.json

    if event['type'] == 'component.installed':
        # Trigger custom workflow
        on_component_installed(event)

    return jsonify({'status': 'processed'}), 200

def on_component_installed(event):
    # Custom business logic
    agent_id = event['component_id']
    tenant_id = event['tenant_id']

    # Send notification
    send_notification(f"Agent {agent_id} installed by {tenant_id}")

    # Update internal systems
    update_inventory(agent_id, tenant_id)

if __name__ == '__main__':
    app.run(port=8080)

---

Testing Integrations

Unit Testing

import pytest
from unittest.mock import Mock, patch
from atom_marketplace import MarketplaceClient

@pytest.fixture
def client():
    return MarketplaceClient(
        base_url="https://atom-saas.fly.dev/api",
        tenant_id="test-tenant",
        api_token="test-token"
    )

@patch('requests.Session.get')
def test_list_agents(mock_get, client):
    # Mock response
    mock_get.return_value.json.return_value = {
        "items": [
            {"id": "agent-1", "name": "Agent 1"},
            {"id": "agent-2", "name": "Agent 2"}
        ],
        "total": 2,
        "page": 1
    }

    # Test
    agents = client.list_agents()

    assert len(agents.items) == 2
    assert agents.items[0]["id"] == "agent-1"

Integration Testing

import pytest
from atom_marketplace import MarketplaceClient

@pytest.fixture
def client():
    return MarketplaceClient(
        base_url="https://atom-saas.fly.dev/api/test",
        tenant_id="test-tenant-id",
        api_token="test-api-token"
    )

def test_list_agents_integration(client):
    """Test against test environment"""
    agents = client.list_agents(marketplace_type="public")

    assert len(agents.items) > 0
    assert agents.items[0]["id"] is not None

def test_install_agent_integration(client):
    """Test installation workflow"""
    result = client.install_agent(
        agent_id="test-agent",
        target_tenant_id="test-tenant-id"
    )

    assert result["status"] == "installed"
    assert "installation_id" in result

End-to-End Testing

def test_publish_to_install_workflow():
    """Test complete publish → install workflow"""

    # Step 1: Publish to private marketplace (no approval needed)
    result = client.publish_agent(
        agent_id="test-agent",
        marketplace_type="private",
        name="Test Agent",
        description="Test agent for E2E",
        capabilities=["test"],
        maturity_level="student",
        pricing_tier="free",
        tags=["test"]
    )

    assert result["status"] == "published"

    # Step 2: List agents
    agents = client.list_agents(marketplace_type="private")
    test_agent = next(a for a in agents.items if a["id"] == "test-agent")

    assert test_agent is not None

    # Step 3: Install agent
    result = client.install_agent(
        agent_id="test-agent",
        target_tenant_id="test-tenant-id"
    )

    assert result["status"] == "installed"

    # Step 4: Verify installation
    installations = client.list_installations()
    test_installation = next(i for i in installations if i["agent_id"] == "test-agent")

    assert test_installation is not None

---

Deployment Considerations

Environment Variables

# Required
ATOM_BASE_URL=https://atom-saas.fly.dev/api
ATOM_TENANT_ID=your-tenant-id
ATOM_API_TOKEN=your-api-token

# Optional (federation)
ATOM_FEDERATION_URL=https://atom-saas.fly.dev/api/federation
ATOM_FEDERATION_KEY=your-federation-key
ATOM_INSTANCE_URL=https://your-instance.fly.dev

# Optional (webhooks)
WEBHOOK_SECRET=your-webhook-secret
WEBHOOK_URL=https://your-app.com/webhooks/atom

Docker Deployment

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Set environment variables
ENV ATOM_BASE_URL=https://atom-saas.fly.dev/api

# Run application
CMD ["python", "app.py"]

Kubernetes Deployment

apiVersion: v1
kind: ConfigMap
metadata:
  name: atom-config
data:
  ATOM_BASE_URL: "https://atom-saas.fly.dev/api"
  ATOM_TENANT_ID: "your-tenant-id"
---
apiVersion: v1
kind: Secret
metadata:
  name: atom-secrets
type: Opaque
stringData:
  ATOM_API_TOKEN: "your-api-token"
  ATOM_FEDERATION_KEY: "your-federation-key"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: atom-integration
spec:
  replicas: 3
  selector:
    matchLabels:
      app: atom-integration
  template:
    metadata:
      labels:
        app: atom-integration
    spec:
      containers:
      - name: app
        image: your-registry/atom-integration:latest
        envFrom:
        - configMapRef:
            name: atom-config
        - secretRef:
            name: atom-secrets
        ports:
        - containerPort: 8080

Monitoring

# Add metrics collection
from prometheus_client import Counter, Histogram

# Metrics
request_counter = Counter('atom_requests_total', 'Total ATOM requests')
request_duration = Histogram('atom_request_duration_seconds', 'Request duration')

# Use in client
request_counter.inc()
with request_duration.time():
    agents = client.list_agents()

---

Performance Optimization

Caching

from functools import lru_cache
import time

@lru_cache(maxsize=1000)
def get_agent_with_cache(agent_id: str, cache_age: int = 3600):
    """Cache agent details for 1 hour"""
    return client.get_agent(agent_id)

# Clear cache periodically
def clear_cache_periodically():
    while True:
        time.sleep(3600)
        get_agent_with_cache.cache_clear()

Connection Pooling

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()

# Configure retry strategy
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)

# Mount adapter with connection pooling
adapter = HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100,
    max_retries=retry_strategy
)

session.mount("https://", adapter)
session.mount("http://", adapter)

Batch Operations

# Instead of multiple individual calls
for agent_id in agent_ids:
    client.install_agent(agent_id, ...)

# Use bulk operation
client.bulk_install_agents(agent_ids, ...)

---

Security Best Practices

Credential Management

# Use environment variables (never hardcode)
import os

api_token = os.getenv("ATOM_API_TOKEN")
if not api_token:
    raise ValueError("ATOM_API_TOKEN not set")

# Use secret management in production
# AWS Secrets Manager, Azure Key Vault, etc.

Webhook Signature Verification

import hmac
import hashlib

def verify_webhook(payload: str, signature: str, secret: str) -> bool:
    """Verify webhook signature"""
    expected_signature = hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()

    # Use constant-time comparison
    return hmac.compare_digest(expected_signature, signature)

Rate Limiting

from ratelimit import limits, sleep_and_retry

@sleep_and_retry
@limits(calls=100, period=60)  # 100 calls per minute
def rate_limited_api_call():
    return client.list_agents()

Input Validation

def validate_agent_id(agent_id: str) -> bool:
    """Validate agent ID format"""
    import re

    if not re.match(r'^[a-zA-Z0-9_-]+$', agent_id):
        raise ValueError("Invalid agent ID format")

    if len(agent_id) > 100:
        raise ValueError("Agent ID too long")

    return True

---

Canvas and Episode Integration

Recording Canvas Executions

From Plan 276-02, canvas executions are automatically recorded as episodes with full tenant context.

from core.canvas_skill_integration import record_canvas_execution

async def execute_canvas_with_episode(
    tenant_id: str,
    canvas_id: str,
    execution_data: dict
):
    """Execute canvas and record episode with tenant isolation."""

    # Execute canvas
    result = await canvas_executor.execute(
        tenant_id=tenant_id,
        canvas_id=canvas_id,
        data=execution_data
    )

    # Record episode with tenant context (from Plan 276-02)
    episode = await record_canvas_execution(
        tenant_id=tenant_id,
        canvas_id=canvas_id,
        execution_data={
            "result": result,
            "timestamp": datetime.utcnow(),
            **execution_data
        }
    )

    return {
        "result": result,
        "episode_id": episode.id,
        "tenant_id": tenant_id
    }

Testing Canvas Integration

import pytest
from tests.fixtures.tenant_fixtures import test_tenant
from tests.fixtures.canvas_factory import CanvasFactory
from core.canvas_skill_integration import record_canvas_execution

@pytest.mark.integration
def test_canvas_episode_recording(test_tenant):
    """Test canvas execution records episode with tenant isolation."""

    # Create canvas with tenant
    canvas = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Canvas"
    )

    # Execute and record episode
    episode = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas.id,
        execution_data={"action": "test"}
    )

    # Verify tenant isolation
    assert episode.canvas_id == canvas.id
    assert episode.tenant_id == test_tenant.id

    # Verify episode only visible to this tenant
    episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()

    assert episode.id in [e.id for e in episodes]

Episode Feedback Integration

from core.episode_service import EpisodeService

async def submit_episode_feedback(
    tenant_id: str,
    episode_id: str,
    feedback_score: float,
    feedback_text: str,
    category: str
):
    """Submit feedback for episode with tenant isolation."""

    # Verify episode belongs to tenant
    episode = db.query(Episode).filter(
        Episode.tenant_id == tenant_id,
        Episode.id == episode_id
    ).first()

    if not episode:
        raise ValueError(f"Episode {episode_id} not found for tenant {tenant_id}")

    # Submit feedback
    service = EpisodeService(db)
    feedback = await service.submit_feedback(
        episode_id=episode_id,
        score=feedback_score,
        feedback=feedback_text,
        category=category
    )

    return {
        "feedback_id": feedback.id,
        "episode_id": episode_id,
        "tenant_id": tenant_id
    }

Canvas-Aware World Model

From Plan 276-02, the World Model now integrates canvas context:

from core.agent_world_model import WorldModelService

async def recall_canvas_experiences(
    tenant_id: str,
    agent_id: str,
    canvas_id: str
):
    """Recall experiences from canvas executions with tenant isolation."""

    world_model = WorldModelService(db)

    # Recall canvas-aware experiences
    experiences = await world_model.recall_canvas_experiences(
        tenant_id=tenant_id,
        agent_id=agent_id,
        canvas_id=canvas_id,
        limit=5
    )

    return {
        "experiences": experiences,
        "tenant_id": tenant_id,
        "canvas_id": canvas_id
    }

Multi-Tenant Canvas Testing

@pytest.mark.integration
def test_canvas_tenant_isolation(test_tenant, pro_tenant):
    """Verify canvas executions are isolated between tenants."""

    # Create canvases in different tenants
    canvas1 = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Canvas 1"
    )

    canvas2 = CanvasFactory.create(
        _session=db,
        tenant_id=pro_tenant.id,
        name="Canvas 2"
    )

    # Execute both canvases
    episode1 = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas1.id,
        execution_data={"action": "test"}
    )

    episode2 = await record_canvas_execution(
        tenant_id=pro_tenant.id,
        canvas_id=canvas2.id,
        execution_data={"action": "test"}
    )

    # Verify isolation
    assert episode1.tenant_id == test_tenant.id
    assert episode2.tenant_id == pro_tenant.id

    # Query episodes for tenant 1
    tenant1_episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()

    # Verify episode2 not in tenant1's episodes
    assert episode2.id not in [e.id for e in tenant1_episodes]

---

Additional Resources

  • **Multi-Tenant Patterns:** /docs/MULTI_TENANT_PATTERNS.md - Comprehensive tenant isolation patterns
  • **Developer Guide:** /docs/DEVELOPER_GUIDE.md
  • **API Documentation:** /docs/api/overview.md
  • **Marketplace API:** /docs/api/marketplace-api.md
  • **Federation Protocol:** /docs/FEDERATION_PROTOCOL.md
  • **Canvas Integration:** /docs/CANVAS_SKILL_INTEGRATION.md
  • **Support:** support@atom-saas.com
  • **Community:** https://community.atom-saas.com

---

Changelog

v13.1 (2026-04-10)

  • Added Canvas and Episode integration section
  • Canvas execution recording with tenant isolation
  • Episode feedback integration patterns
  • Canvas-aware World Model integration
  • Multi-tenant canvas testing examples
  • Patterns from Plan 276-02 (integration service fixes)

v13.0 (2026-04-05)

  • Initial integration guide release
  • Python, TypeScript, Go integration examples
  • Complete workflow examples
  • Testing strategies
  • Deployment considerations
  • Security best practices