ATOM SaaS - Integration Guide
**Version:** v13.0
**Last Updated:** 2026-04-05
This guide provides integration patterns, code examples, and deployment considerations for building applications on top of the ATOM SaaS platform.
Table of Contents
- Integration Patterns
- Python Integration
- TypeScript/JavaScript Integration
- Go Integration
- cURL Examples
- Complete Workflows
- Testing Integrations
- Deployment Considerations
- Performance Optimization
- Security Best Practices
- Canvas and Episode Integration
---
Integration Patterns
Pattern 1: Direct API Integration
**Best for:** Simple applications, prototypes, quick integrations
# Direct API calls with authentication
import requests
session = requests.Session()
session.headers.update({
    "X-Tenant-ID": "your-tenant-id",
    "Authorization": "Bearer your-api-token"
})
response = session.get("https://atom-saas.fly.dev/api/agents", timeout=10)
response.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error body
agents = response.json()
**Pros:**
- Simple to implement
- Full control over requests
- Language-agnostic
**Cons:**
- Manual error handling
- No built-in retry logic
- Must manage rate limiting manually
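The manual retry and rate-limit handling listed above can be sketched as a small helper. This is a minimal illustration, not part of any ATOM SDK: `RetryableError`, `backoff_delay`, and `call_with_retry` are hypothetical names, and the sketch assumes your own request function raises `RetryableError` (optionally carrying the server's `Retry-After` value) for 429 and 5xx responses.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical error for responses worth retrying (e.g. 429, 503)."""

    def __init__(self, message: str, retry_after: Optional[float] = None):
        super().__init__(message)
        self.retry_after = retry_after


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  retry_after: Optional[float] = None) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        # Prefer the delay requested by the server, bounded by the cap.
        return min(retry_after, cap)
    # Otherwise double the delay on every attempt, bounded by the cap.
    return min(cap, base * (2 ** attempt))


def call_with_retry(func: Callable[[], T], max_attempts: int = 4,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `func`, retrying on RetryableError up to `max_attempts` times."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return func()
        except RetryableError as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt, retry_after=exc.retry_after))
    raise AssertionError("unreachable")
```

In use, the function passed to `call_with_retry` would wrap a `session.get(...)` call and translate retryable status codes into `RetryableError`. Passing `sleep` as a parameter keeps the helper easy to test without real delays.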
Pattern 2: SDK Integration
**Best for:** Production applications, complex integrations
# Using ATOM SDK
from atom_marketplace import MarketplaceClient
client = MarketplaceClient(
    base_url="https://atom-saas.fly.dev/api",
    tenant_id="your-tenant-id",
    api_token="your-api-token"
)
agents = client.list_agents()
**Pros:**
- Built-in error handling
- Automatic retry logic
- Type safety (TypeScript)
- Rate limiting handled
**Cons:**
- Additional dependency
- SDK update maintenance
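One way to limit the maintenance cost of SDK updates is to keep SDK calls behind a thin adapter, so application code depends on your own interface rather than on the SDK directly. The sketch below is an assumption-laden illustration: `AgentLister` and `AgentCatalog` are hypothetical names, and it assumes only what the examples in this guide show, namely that `list_agents()` returns an object with an `items` list of dictionaries containing `name`.

```python
from typing import Any, List, Protocol


class AgentLister(Protocol):
    """The single SDK capability this application relies on."""

    def list_agents(self) -> Any: ...


class AgentCatalog:
    """Application-facing wrapper; only this class touches the SDK client."""

    def __init__(self, client: AgentLister):
        self._client = client

    def agent_names(self) -> List[str]:
        # Assumes the response exposes `.items`, as in the listing examples.
        response = self._client.list_agents()
        return [item["name"] for item in response.items]
```

If a later SDK version changes the response shape, only `AgentCatalog` needs to change, and tests can substitute any object that has a `list_agents` method.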
Pattern 3: Webhook Integration
**Best for:** Event-driven applications, real-time updates
# Webhook handler
# verify_webhook is defined under "Webhook Signature Verification";
# handle_event is your own event dispatcher
import os
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
@app.route('/webhooks/atom', methods=['POST'])
def handle_webhook():
    payload = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_webhook(payload, signature, WEBHOOK_SECRET):
        return jsonify({'error': 'Invalid signature'}), 401
    event = request.json
    handle_event(event)
    return jsonify({'status': 'processed'}), 200
**Pros:**
- Real-time updates
- Decoupled architecture
- Event-driven
**Cons:**
- Requires public endpoint
- Signature verification required
- Retry handling complexity
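Part of the retry complexity noted above is that a webhook sender may deliver the same event more than once, so handlers should be idempotent. The following is a minimal in-memory sketch under stated assumptions: it presumes each event carries a unique identifier (a hypothetical `event_id` field, which this guide does not document), and `EventDeduplicator` is an illustrative name. A deployment with several replicas would need a shared store instead of a local dictionary.

```python
import time
from typing import Dict, Optional


class EventDeduplicator:
    """Remembers recently seen event IDs for `ttl_seconds`."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self.ttl_seconds = ttl_seconds
        self._seen: Dict[str, float] = {}

    def is_new(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True the first time an ID is seen within the TTL window."""
        now = time.monotonic() if now is None else now
        # Drop entries older than the TTL so the dictionary stays bounded.
        expired = [key for key, seen_at in self._seen.items()
                   if now - seen_at >= self.ttl_seconds]
        for key in expired:
            del self._seen[key]
        if event_id in self._seen:
            return False  # duplicate delivery: skip processing
        self._seen[event_id] = now
        return True
```

A handler would call `is_new(event["event_id"])` after signature verification and return a 200 response without reprocessing when it yields `False`.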
Pattern 4: Federation Integration
**Best for:** Multi-instance deployments, cross-tenant sharing
# Federation client
from atom_federation import FederationClient
client = FederationClient(
    base_url="https://atom-saas.fly.dev/api/federation",
    federation_key="your-federation-key",
    source_instance="https://your-instance.fly.dev"
)
# Discover agents from other instances
agents = client.discover_agents()
# Download agent bundle
bundle = client.download_agent_bundle(agent_id="agent-123")
**Pros:**
- Cross-instance discovery
- Shared component ecosystem
- Standardized protocol
**Cons:**
- Federation key management
- Network latency
- Version compatibility
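The version-compatibility concern above can be handled with a check before installing a downloaded bundle. This sketch is purely illustrative: the guide does not specify how bundles are versioned, so the `MAJOR.MINOR.PATCH` string format, the compatibility rule, and the names `parse_version` and `is_compatible` are all assumptions.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' (with an optional leading 'v')."""
    parts = version.strip().lstrip("v").split(".")
    if len(parts) != 3:
        raise ValueError(f"Expected MAJOR.MINOR.PATCH, got {version!r}")
    major, minor, patch = (int(part) for part in parts)
    return major, minor, patch


def is_compatible(bundle_version: str, local_version: str) -> bool:
    """Assumed rule: same major version, and the bundle does not
    require a newer minor version than the local instance runs."""
    bundle = parse_version(bundle_version)
    local = parse_version(local_version)
    return bundle[0] == local[0] and bundle[1] <= local[1]
```

Rejecting incompatible bundles before installation keeps failures close to the federation boundary, where they are easier to diagnose.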
Pattern 5: OAuth Integration
**Best for:** Integrating with third-party services that require OAuth authentication
ATOM supports OAuth 2.0 authentication for secure integration with popular third-party services. This enables agents to access external APIs on behalf of users without storing credentials.
**Supported OAuth Providers:**
| Provider | OAuth Version | PKCE Support | Status |
|---|---|---|---|
| **Airtable** | 2.0 | ✅ Yes | ✅ Available |
| **Google** | 2.0 | ✅ Yes | ✅ Available |
| **Slack** | 2.0 | ✅ Yes | ✅ Available |
| **Salesforce** | 2.0 | ✅ Yes | ✅ Available |
| **GitHub** | 2.0 | ✅ Yes | ✅ Available |
| **Microsoft** | 2.0 | ✅ Yes | ✅ Available |
**OAuth 2.0 Flow with PKCE:**
# Step 1: Generate PKCE code verifier and challenge
import base64
import hashlib
import secrets
import urllib.parse
import requests
def generate_pkce_codes():
    # Generate code verifier (43-128 characters; 32 random bytes encode to 43)
    code_verifier = secrets.token_urlsafe(32)
    # Create code challenge (base64url-encoded SHA-256 hash, padding stripped)
    challenge_bytes = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode().rstrip('=')
    return code_verifier, code_challenge
code_verifier, code_challenge = generate_pkce_codes()
# Step 2: Redirect user to OAuth authorization URL
# urlencode escapes every value, including the space-separated scope list
auth_url = "https://airtable.com/oauth2/v1/authorize?" + urllib.parse.urlencode({
    "client_id": CLIENT_ID,
    "redirect_uri": REDIRECT_URI,
    "response_type": "code",
    "scope": "data.records:read data.records:write",
    "code_challenge": code_challenge,
    "code_challenge_method": "S256",
})
# Redirect user to auth_url
# User approves authorization
# Step 3: Exchange authorization code for access token
auth_code = request.args.get('code')  # From callback (Flask request object)
token_response = requests.post(
    "https://airtable.com/oauth2/v1/token",
    data={
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
        "grant_type": "authorization_code",
        "code_verifier": code_verifier
    }
)
token_response.raise_for_status()
tokens = token_response.json()
access_token = tokens['access_token']
refresh_token = tokens.get('refresh_token')
**Airtable Integration Example:**
import requests
from atom_marketplace import MarketplaceClient
class AirtableIntegration:
    """Airtable integration using OAuth 2.0 with PKCE"""
    BASE_URL = "https://api.airtable.com/v0"
    def __init__(self, access_token, base_id, table_name):
        self.access_token = access_token
        self.base_id = base_id
        self.table_name = table_name
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
    def list_records(self, max_records=100):
        """List records from Airtable table"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}"
        params = {"maxRecords": max_records}
        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()
        return response.json().get('records', [])
    def create_record(self, fields):
        """Create a new record in Airtable"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}"
        data = {"records": [{"fields": fields}]}
        response = requests.post(url, headers=self.headers, json=data)
        response.raise_for_status()
        return response.json().get('records', [])[0]
    def update_record(self, record_id, fields):
        """Update an existing record"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}/{record_id}"
        data = {"fields": fields}
        response = requests.patch(url, headers=self.headers, json=data)
        response.raise_for_status()
        return response.json()
    def delete_record(self, record_id):
        """Delete a record"""
        url = f"{self.BASE_URL}/{self.base_id}/{self.table_name}/{record_id}"
        response = requests.delete(url, headers=self.headers)
        response.raise_for_status()
        return True
# Usage in agent skill
def execute_airtable_skill(input_data, context):
    """Example: Airtable skill for ATOM agents"""
    # Get OAuth token from tenant settings
    tenant_id = context['tenant_id']
    marketplace = MarketplaceClient(tenant_id=tenant_id)
    # Retrieve stored OAuth token
    oauth_config = marketplace.get_integration_config(
        integration_id="airtable"
    )
    access_token = oauth_config['access_token']
    base_id = input_data['base_id']
    table_name = input_data['table_name']
    # Initialize Airtable client
    airtable = AirtableIntegration(
        access_token=access_token,
        base_id=base_id,
        table_name=table_name
    )
    # Perform operation
    action = input_data['action']
    if action == 'list':
        records = airtable.list_records(
            max_records=input_data.get('max_records', 100)
        )
        return {"records": records}
    elif action == 'create':
        record = airtable.create_record(input_data['fields'])
        return {"record": record}
    elif action == 'update':
        record = airtable.update_record(
            record_id=input_data['record_id'],
            fields=input_data['fields']
        )
        return {"record": record}
    elif action == 'delete':
        result = airtable.delete_record(input_data['record_id'])
        return {"deleted": result}
    else:
        raise ValueError(f"Unknown action: {action}")
**OAuth Configuration in ATOM:**
# Store OAuth tokens securely
# (refresh_airtable_token and calculate_expiration are application-defined helpers)
from datetime import datetime
marketplace.set_integration_config(
    integration_id="airtable",
    config={
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_expires_at": expiration_time,
        "base_id": "appXXXXXXXXXXXXXX",
        "scopes": ["data.records:read", "data.records:write"]
    }
)
# Retrieve OAuth configuration
config = marketplace.get_integration_config(integration_id="airtable")
# Refresh token if expired
if config['token_expires_at'] < datetime.now():
    new_tokens = refresh_airtable_token(config['refresh_token'])
    marketplace.set_integration_config(
        integration_id="airtable",
        config={
            **config,
            "access_token": new_tokens['access_token'],
            "refresh_token": new_tokens['refresh_token'],
            "token_expires_at": calculate_expiration(new_tokens['expires_in'])
        }
    )
**OAuth Security Best Practices:**
- **Use PKCE**: Always use PKCE for mobile and SPA applications
- **Store tokens securely**: Encrypt tokens at rest in database
- **Use HTTPS**: Never transmit tokens over HTTP
- **Limit scopes**: Request minimum required permissions
- **Implement refresh**: Use refresh tokens instead of long-lived access tokens
- **Handle revocation**: Detect and handle token revocation gracefully
- **Validate tokens**: Verify token integrity before use
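The refresh practice above depends on knowing when a token is about to expire. The sketch below shows one possible shape for the expiry helpers that the configuration example calls but does not define. It is an assumption, not ATOM's implementation: `calculate_expiration` here takes the provider's `expires_in` seconds, `needs_refresh` is a hypothetical name, and the 60-second safety margin is an arbitrary choice. Timestamps are timezone-aware UTC, so they must be stored and compared consistently.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def calculate_expiration(expires_in: int,
                         now: Optional[datetime] = None) -> datetime:
    """Absolute expiry time for a token valid for `expires_in` seconds."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=expires_in)


def needs_refresh(token_expires_at: datetime,
                  now: Optional[datetime] = None,
                  skew_seconds: int = 60) -> bool:
    """True once the token is expired or within `skew_seconds` of expiring.

    Refreshing slightly early avoids sending a token that expires in flight.
    """
    now = now or datetime.now(timezone.utc)
    return now >= token_expires_at - timedelta(seconds=skew_seconds)
```

Accepting `now` as a parameter keeps both helpers deterministic in tests.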
**Example: Complete OAuth Flow with ATOM**
import os
import secrets
import urllib.parse
from datetime import datetime, timedelta
import requests
from flask import Flask, request, redirect, session
from atom_marketplace import MarketplaceClient
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY")
# OAuth configuration
AIRTABLE_CLIENT_ID = os.getenv("AIRTABLE_CLIENT_ID")
AIRTABLE_CLIENT_SECRET = os.getenv("AIRTABLE_CLIENT_SECRET")
REDIRECT_URI = "https://your-app.com/oauth/callback"
@app.route('/oauth/airtable/start')
def start_oauth():
    """Initiate Airtable OAuth flow"""
    # Generate PKCE codes (generate_pkce_codes is defined in the PKCE flow above)
    code_verifier, code_challenge = generate_pkce_codes()
    # Generate the state value and store it, with the verifier, for the callback
    state = secrets.token_urlsafe(16)
    session['oauth_state'] = state
    session['code_verifier'] = code_verifier
    session['tenant_id'] = request.args.get('tenant_id')
    # Build authorization URL (urlencode escapes the space-separated scopes)
    auth_url = "https://airtable.com/oauth2/v1/authorize?" + urllib.parse.urlencode({
        "client_id": AIRTABLE_CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": "data.records:read data.records:write",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "state": state,
    })
    return redirect(auth_url)
@app.route('/oauth/callback')
def oauth_callback():
    """Handle OAuth callback"""
    # Verify state parameter (CSRF protection); reject when either value is missing
    state = request.args.get('state')
    expected_state = session.get('oauth_state')
    if not expected_state or state != expected_state:
        return "Invalid state parameter", 400
    # Get authorization code
    auth_code = request.args.get('code')
    code_verifier = session.get('code_verifier')
    # Exchange code for tokens
    token_response = requests.post(
        "https://airtable.com/oauth2/v1/token",
        data={
            "client_id": AIRTABLE_CLIENT_ID,
            "client_secret": AIRTABLE_CLIENT_SECRET,
            "code": auth_code,
            "redirect_uri": REDIRECT_URI,
            "grant_type": "authorization_code",
            "code_verifier": code_verifier
        }
    )
    if token_response.status_code != 200:
        return f"Token exchange failed: {token_response.text}", 400
    tokens = token_response.json()
    # Store tokens in ATOM
    tenant_id = session.get('tenant_id')
    marketplace = MarketplaceClient(tenant_id=tenant_id)
    marketplace.set_integration_config(
        integration_id="airtable",
        config={
            "access_token": tokens['access_token'],
            "refresh_token": tokens.get('refresh_token'),
            "token_expires_at": datetime.now() + timedelta(seconds=tokens.get('expires_in', 3600)),
            "scopes": ["data.records:read", "data.records:write"]
        }
    )
    # Clear session
    session.pop('code_verifier', None)
    session.pop('oauth_state', None)
    return redirect('/settings/integrations?success=airtable')
**Pros:**
- Secure authentication without storing credentials
- User-controlled permissions and revocation
- Industry-standard security (PKCE)
- Automatic token refresh handling
**Cons:**
- More complex setup than API keys
- Requires user interaction for authorization
- Token expiration handling required
- Provider-specific implementation details
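To make the PKCE guarantee listed above more concrete, the sketch below mirrors the check an authorization server performs for the `S256` method: it recomputes the challenge from the verifier sent with the token request and compares it to the challenge received at authorization time. The function names `make_challenge` and `verify_pkce` are illustrative, and real providers may apply additional validation.

```python
import base64
import hashlib
import hmac


def make_challenge(code_verifier: str) -> str:
    """S256 challenge: base64url(SHA-256(verifier)) without '=' padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")


def verify_pkce(code_verifier: str, code_challenge: str) -> bool:
    """Server-side check that the verifier matches the earlier challenge."""
    expected = make_challenge(code_verifier)
    # Compare as bytes in constant time.
    return hmac.compare_digest(expected.encode("ascii"),
                               code_challenge.encode("utf-8"))
```

Because only the hash travels in the browser redirect, an attacker who intercepts the authorization code still cannot redeem it without the original verifier.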
---
Python Integration
Installation
pip install atom-marketplace atom-federation
Basic Setup
import os
from atom_marketplace import MarketplaceClient
from atom_federation import FederationClient
# Initialize clients
marketplace = MarketplaceClient(
    base_url=os.getenv("ATOM_BASE_URL", "https://atom-saas.fly.dev/api"),
    tenant_id=os.getenv("ATOM_TENANT_ID"),
    api_token=os.getenv("ATOM_API_TOKEN")
)
federation = FederationClient(
    base_url=os.getenv("ATOM_FEDERATION_URL", "https://atom-saas.fly.dev/api/federation"),
    federation_key=os.getenv("ATOM_FEDERATION_KEY"),
    source_instance=os.getenv("ATOM_INSTANCE_URL")
)
Listing Agents
# List public agents, filtered by capability and rating
agents = marketplace.list_agents(
    marketplace_type="public",
    capability="sales",
    min_rating=4.0,
    page=1,
    page_size=50
)
for agent in agents.items:
    print(f"{agent['name']} - Rating: {agent['avg_rating']}")
Publishing Components
# Publish an agent
result = marketplace.publish_agent(
    agent_id="my_sales_agent",
    marketplace_type="public",
    name="My Sales Agent",
    description="Automated sales outreach and follow-up",
    capabilities=["sales", "crm", "outreach"],
    maturity_level="supervised",
    pricing_tier="team",
    tags=["sales", "automation"],
    documentation_url="https://docs.example.com"
)
print(f"Published: {result['component_id']}")
print(f"Status: {result['status']}")  # pending_approval, published
Installing Components
# Install an agent
result = marketplace.install_agent(
    agent_id="sales_automation_agent",
    source_tenant_id="publisher-tenant-id",
    target_tenant_id="your-tenant-id",
    install_config={
        "api_keys": {
            "salesforce": os.getenv("SALESFORCE_API_KEY")
        },
        "settings": {
            "auto_approve": False,
            "max_daily_tasks": 100
        }
    }
)
print(f"Installed: {result['installation_id']}")
Error Handling
import logging
import time
from atom_marketplace.errors import (
    RateLimitError,
    ComponentNotFoundError,
    InsufficientPermissionsError,
    MarketplaceError
)
logger = logging.getLogger(__name__)
try:
    result = marketplace.install_agent(...)
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
    time.sleep(e.retry_after)
except ComponentNotFoundError as e:
    print(f"Agent not found: {e.component_id}")
except InsufficientPermissionsError as e:
    print(f"Permission denied: {e.required_permission}")
except MarketplaceError as e:
    print(f"Marketplace error: {e.code} - {e.message}")
    # Log error for debugging
    logger.error(f"Marketplace error: {e.details}")
Async Integration
import asyncio
from atom_marketplace import AsyncMarketplaceClient
async def main():
    client = AsyncMarketplaceClient(...)
    # Concurrent requests
    agents, skills, canvases = await asyncio.gather(
        client.list_agents(),
        client.list_skills(),
        client.list_canvases()
    )
    print(f"Found {len(agents)} agents, {len(skills)} skills, {len(canvases)} canvases")
asyncio.run(main())
---
TypeScript/JavaScript Integration
Installation
npm install @atom-saas/marketplace-sdk @atom-saas/federation-sdk
Basic Setup
import { MarketplaceClient } from '@atom-saas/marketplace-sdk';
import { FederationClient } from '@atom-saas/federation-sdk';
// Initialize clients
const marketplace = new MarketplaceClient({
baseUrl: process.env.ATOM_BASE_URL || 'https://atom-saas.fly.dev/api',
tenantId: process.env.ATOM_TENANT_ID!,
apiToken: process.env.ATOM_API_TOKEN!
});
const federation = new FederationClient({
baseUrl: process.env.ATOM_FEDERATION_URL || 'https://atom-saas.fly.dev/api/federation',
federationKey: process.env.ATOM_FEDERATION_KEY!,
sourceInstance: process.env.ATOM_INSTANCE_URL!
});
Listing Agents
// List agents with filters
interface ListAgentsOptions {
marketplaceType: 'public' | 'private';
capability?: string;
minRating?: number;
page?: number;
pageSize?: number;
}
const agents = await marketplace.listAgents({
marketplaceType: 'public',
capability: 'sales',
minRating: 4.0,
page: 1,
pageSize: 50
});
agents.items.forEach(agent => {
console.log(`${agent.name} - Rating: ${agent.avgRating}`);
});
Publishing Components
// Publish an agent
interface PublishAgentOptions {
agentId: string;
marketplaceType: 'public' | 'private';
name: string;
description: string;
capabilities: string[];
maturityLevel: 'student' | 'intern' | 'supervised' | 'autonomous';
pricingTier: 'free' | 'solo' | 'team' | 'enterprise';
tags: string[];
documentationUrl?: string;
}
const result = await marketplace.publishAgent({
agentId: 'my-sales-agent',
marketplaceType: 'public',
name: 'My Sales Agent',
description: 'Automated sales outreach and follow-up',
capabilities: ['sales', 'crm', 'outreach'],
maturityLevel: 'supervised',
pricingTier: 'team',
tags: ['sales', 'automation'],
documentationUrl: 'https://docs.example.com'
});
console.log(`Published: ${result.componentId}`);
console.log(`Status: ${result.status}`);
Installing Components
// Install an agent
interface InstallAgentOptions {
agentId: string;
sourceTenantId: string;
targetTenantId: string;
installConfig?: {
apiKeys?: Record<string, string>;
settings?: Record<string, any>;
};
}
const result = await marketplace.installAgent({
agentId: 'sales-automation-agent',
sourceTenantId: 'publisher-tenant-id',
targetTenantId: 'your-tenant-id',
installConfig: {
apiKeys: {
salesforce: process.env.SALESFORCE_API_KEY!
},
settings: {
autoApprove: false,
maxDailyTasks: 100
}
}
});
console.log(`Installed: ${result.installationId}`);
Error Handling
import {
MarketplaceClient,
RateLimitError,
ComponentNotFoundError,
InsufficientPermissionsError,
MarketplaceError
} from '@atom-saas/marketplace-sdk';
try {
const result = await marketplace.installAgent(...);
} catch (error) {
if (error instanceof RateLimitError) {
console.log(`Rate limited. Retry after ${error.retryAfter}s`);
await new Promise(resolve => setTimeout(resolve, error.retryAfter * 1000));
} else if (error instanceof ComponentNotFoundError) {
console.log(`Agent not found: ${error.componentId}`);
} else if (error instanceof InsufficientPermissionsError) {
console.log(`Permission denied: ${error.requiredPermission}`);
} else if (error instanceof MarketplaceError) {
console.log(`Marketplace error: ${error.code} - ${error.message}`);
// Log error for debugging
console.error('Marketplace error details:', error.details);
}
}
React Integration
import React, { useState, useEffect } from 'react';
import { MarketplaceClient } from '@atom-saas/marketplace-sdk';
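// Note: this component runs in the browser, so any token given to the client is
// visible to users. For production, proxy requests through a server route
// (see Next.js Integration) instead of shipping ATOM_API_TOKEN to the client.
const client = new MarketplaceClient({
baseUrl: process.env.NEXT_PUBLIC_ATOM_BASE_URL!,
tenantId: process.env.NEXT_PUBLIC_ATOM_TENANT_ID!,
apiToken: process.env.ATOM_API_TOKEN!
});
function AgentMarketplace() {
// Typed state: a bare useState([]) would be inferred as never[]
const [agents, setAgents] = useState<any[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);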
useEffect(() => {
async function loadAgents() {
try {
const response = await client.listAgents({
marketplaceType: 'public',
capability: 'sales'
});
setAgents(response.items);
} catch (err) {
setError(err instanceof Error ? err.message : 'Unknown error');
} finally {
setLoading(false);
}
}
loadAgents();
}, []);
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error}</div>;
return (
<ul>
{agents.map(agent => (
<li key={agent.id}>
<h3>{agent.name}</h3>
<p>Rating: {agent.avgRating}</p>
<p>{agent.description}</p>
</li>
))}
</ul>
);
}
export default AgentMarketplace;
Next.js Integration
// pages/api/agents.ts
import type { NextApiRequest, NextApiResponse } from 'next';
import { MarketplaceClient } from '@atom-saas/marketplace-sdk';
const client = new MarketplaceClient({
baseUrl: process.env.ATOM_BASE_URL!,
tenantId: process.env.ATOM_TENANT_ID!,
apiToken: process.env.ATOM_API_TOKEN!
});
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
if (req.method !== 'GET') {
return res.status(405).json({ error: 'Method not allowed' });
}
try {
const agents = await client.listAgents({
marketplaceType: 'public',
capability: req.query.capability as string
});
res.status(200).json(agents);
} catch (error) {
res.status(500).json({
error: error instanceof Error ? error.message : 'Unknown error'
});
}
}
---
Go Integration
Installation
go get github.com/atom-saas/go-marketplace
go get github.com/atom-saas/go-federation
Basic Setup
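package main
import (
"os"
// Explicit alias: the import path ends in go-marketplace, but the code refers to marketplace
marketplace "github.com/atom-saas/go-marketplace"
)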
func main() {
client := marketplace.NewClient(marketplace.Config{
BaseURL: getEnv("ATOM_BASE_URL", "https://atom-saas.fly.dev/api"),
TenantID: os.Getenv("ATOM_TENANT_ID"),
APIToken: os.Getenv("ATOM_API_TOKEN"),
})
// Use client...
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
Listing Agents
agents, err := client.ListAgents(marketplace.ListAgentsOptions{
MarketplaceType: "public",
Capability: "sales",
MinRating: 4.0,
Page: 1,
PageSize: 50,
})
if err != nil {
log.Fatalf("Failed to list agents: %v", err)
}
for _, agent := range agents.Items {
fmt.Printf("%s - Rating: %.1f\n", agent.Name, agent.AvgRating)
}
Publishing Components
result, err := client.PublishAgent(marketplace.PublishAgentOptions{
AgentID: "my-sales-agent",
MarketplaceType: "public",
Name: "My Sales Agent",
Description: "Automated sales outreach and follow-up",
Capabilities: []string{"sales", "crm", "outreach"},
MaturityLevel: "supervised",
PricingTier: "team",
Tags: []string{"sales", "automation"},
DocumentationURL: "https://docs.example.com",
})
if err != nil {
log.Fatalf("Failed to publish agent: %v", err)
}
fmt.Printf("Published: %s\n", result.ComponentID)
fmt.Printf("Status: %s\n", result.Status)
Error Handling
result, err := client.InstallAgent(marketplace.InstallAgentOptions{
AgentID: "sales-automation-agent",
SourceTenantID: "publisher-tenant-id",
TargetTenantID: "your-tenant-id",
})
if err != nil {
switch e := err.(type) {
case *marketplace.RateLimitError:
log.Printf("Rate limited. Retry after %d seconds", e.RetryAfter)
time.Sleep(time.Duration(e.RetryAfter) * time.Second)
case *marketplace.ComponentNotFoundError:
log.Printf("Agent not found: %s", e.ComponentID)
case *marketplace.InsufficientPermissionsError:
log.Printf("Permission denied: %s", e.RequiredPermission)
default:
log.Printf("Marketplace error: %v", err)
}
}
---
cURL Examples
Authentication
# List agents with API token
curl -X GET "https://atom-saas.fly.dev/api/agent-marketplace/agents" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-G \
--data-urlencode "marketplace_type=public"
Listing Components
# List agents
curl -X GET "https://atom-saas.fly.dev/api/agent-marketplace/agents" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-G \
--data-urlencode "marketplace_type=public" \
--data-urlencode "capability=sales" \
--data-urlencode "min_rating=4.0"
# List skills
curl -X GET "https://atom-saas.fly.dev/api/skill-marketplace/skills" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-G \
--data-urlencode "marketplace_type=public"
# List canvases
curl -X GET "https://atom-saas.fly.dev/api/canvas-marketplace/components" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token"
Publishing Components
# Publish agent
curl -X POST "https://atom-saas.fly.dev/api/agent-marketplace/publish" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json" \
-d '{
"agent_id": "my-sales-agent",
"marketplace_type": "public",
"name": "My Sales Agent",
"description": "Automated sales outreach",
"capabilities": ["sales", "crm"],
"maturity_level": "supervised",
"pricing_tier": "team",
"tags": ["sales", "automation"]
}'
Installing Components
# Install agent
curl -X POST "https://atom-saas.fly.dev/api/agent-marketplace/install" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json" \
-d '{
"agent_id": "sales-automation-agent",
"source_tenant_id": "publisher-tenant-id",
"target_tenant_id": "your-tenant-id"
}'
Federation
# Discover agents via federation
curl -X GET "https://atom-saas.fly.dev/api/federation/v1/marketplace/agents" \
-H "X-Federation-Key: your-federation-key" \
-H "X-Federation-Source: https://your-instance.fly.dev"
# Download agent bundle
curl -X GET "https://atom-saas.fly.dev/api/federation/v1/marketplace/agents/{agent_id}/bundle" \
-H "X-Federation-Key: your-federation-key" \
-H "X-Federation-Source: https://your-instance.fly.dev" \
-o agent-bundle.json
Webhooks
# Create webhook
curl -X POST "https://atom-saas.fly.dev/api/webhooks" \
-H "X-Tenant-ID: your-tenant-id" \
-H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json" \
-d '{
"url": "https://your-app.com/webhooks/atom",
"events": ["component.installed", "component.updated"],
"secret": "your-webhook-secret"
}'
---
Complete Workflows
Workflow 1: Publish and Install Agent
import time
# Step 1: Publish agent
result = marketplace.publish_agent(
    agent_id="my_sales_agent",
    marketplace_type="public",
    name="My Sales Agent",
    description="Automated sales outreach",
    capabilities=["sales"],
    maturity_level="supervised",
    pricing_tier="team",
    tags=["sales"]
)
print(f"Published: {result['component_id']}")
print(f"Status: {result['status']}")  # pending_approval
# Step 2: Wait for approval (poll status, giving up after one hour)
deadline = time.monotonic() + 3600
while True:
    status = marketplace.get_publication_status(result['component_id'])
    if status['status'] == 'published':
        break
    if time.monotonic() > deadline:
        raise TimeoutError("Agent was not approved within one hour")
    time.sleep(60)
# Step 3: Another tenant installs agent
result = marketplace.install_agent(
    agent_id="my_sales_agent",
    source_tenant_id="publisher-tenant-id",
    target_tenant_id="your-tenant-id",
    install_config={
        "api_keys": {"salesforce": "your-key"},
        "settings": {"auto_approve": False}
    }
)
print(f"Installed: {result['installation_id']}")
Workflow 2: Multi-Instance Federation
# Instance A: Publish agent
marketplace_a = MarketplaceClient(tenant_id="tenant-a", ...)
marketplace_a.publish_agent(agent_id="shared_agent", ...)
# Instance B: Discover agents from Instance A
marketplace_b = MarketplaceClient(tenant_id="tenant-b", ...)
federation_b = FederationClient(...)
agents = federation_b.discover_agents()
# Instance B: Download and install agent
for agent in agents:
    if agent['agent_id'] == 'shared_agent':
        bundle = federation_b.download_agent_bundle(agent['agent_id'])
        # Install from bundle
        marketplace_b.install_from_bundle(
            bundle=bundle,
            source_instance=agent['source_instance']
        )
Workflow 3: Event-Driven Integration
from flask import Flask, request, jsonify
from atom_marketplace import MarketplaceClient
app = Flask(__name__)
client = MarketplaceClient(...)
@app.route('/webhooks/atom', methods=['POST'])
def handle_webhook():
    # Verify signature
    payload = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_webhook(payload, signature, WEBHOOK_SECRET):
        return jsonify({'error': 'Invalid signature'}), 401
    # Handle event
    event = request.json
    if event['type'] == 'component.installed':
        # Trigger custom workflow
        on_component_installed(event)
    return jsonify({'status': 'processed'}), 200
def on_component_installed(event):
    # Custom business logic
    agent_id = event['component_id']
    tenant_id = event['tenant_id']
    # Send notification
    send_notification(f"Agent {agent_id} installed by {tenant_id}")
    # Update internal systems
    update_inventory(agent_id, tenant_id)
if __name__ == '__main__':
    app.run(port=8080)
---
Testing Integrations
Unit Testing
import pytest
from unittest.mock import Mock, patch
from atom_marketplace import MarketplaceClient
@pytest.fixture
def client():
    return MarketplaceClient(
        base_url="https://atom-saas.fly.dev/api",
        tenant_id="test-tenant",
        api_token="test-token"
    )
@patch('requests.Session.get')
def test_list_agents(mock_get, client):
    # Mock response
    mock_get.return_value.json.return_value = {
        "items": [
            {"id": "agent-1", "name": "Agent 1"},
            {"id": "agent-2", "name": "Agent 2"}
        ],
        "total": 2,
        "page": 1
    }
    # Test
    agents = client.list_agents()
    assert len(agents.items) == 2
    assert agents.items[0]["id"] == "agent-1"
Integration Testing
import pytest
from atom_marketplace import MarketplaceClient
@pytest.fixture
def client():
    return MarketplaceClient(
        base_url="https://atom-saas.fly.dev/api/test",
        tenant_id="test-tenant-id",
        api_token="test-api-token"
    )
def test_list_agents_integration(client):
    """Test against test environment"""
    agents = client.list_agents(marketplace_type="public")
    assert len(agents.items) > 0
    assert agents.items[0]["id"] is not None
def test_install_agent_integration(client):
    """Test installation workflow"""
    result = client.install_agent(
        agent_id="test-agent",
        target_tenant_id="test-tenant-id"
    )
    assert result["status"] == "installed"
    assert "installation_id" in result
End-to-End Testing
def test_publish_to_install_workflow(client):
    """Test complete publish → install workflow"""
    # Step 1: Publish to private marketplace (no approval needed)
    result = client.publish_agent(
        agent_id="test-agent",
        marketplace_type="private",
        name="Test Agent",
        description="Test agent for E2E",
        capabilities=["test"],
        maturity_level="student",
        pricing_tier="free",
        tags=["test"]
    )
    assert result["status"] == "published"
    # Step 2: List agents (default of None lets the assertion fail cleanly)
    agents = client.list_agents(marketplace_type="private")
    test_agent = next((a for a in agents.items if a["id"] == "test-agent"), None)
    assert test_agent is not None
    # Step 3: Install agent
    result = client.install_agent(
        agent_id="test-agent",
        target_tenant_id="test-tenant-id"
    )
    assert result["status"] == "installed"
    # Step 4: Verify installation
    installations = client.list_installations()
    test_installation = next((i for i in installations if i["agent_id"] == "test-agent"), None)
    assert test_installation is not None
---
Deployment Considerations
Environment Variables
# Required
ATOM_BASE_URL=https://atom-saas.fly.dev/api
ATOM_TENANT_ID=your-tenant-id
ATOM_API_TOKEN=your-api-token
# Optional (federation)
ATOM_FEDERATION_URL=https://atom-saas.fly.dev/api/federation
ATOM_FEDERATION_KEY=your-federation-key
ATOM_INSTANCE_URL=https://your-instance.fly.dev
# Optional (webhooks)
WEBHOOK_SECRET=your-webhook-secret
WEBHOOK_URL=https://your-app.com/webhooks/atom
Docker Deployment
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Set environment variables
ENV ATOM_BASE_URL=https://atom-saas.fly.dev/api
# Run application
CMD ["python", "app.py"]
Kubernetes Deployment
apiVersion: v1
kind: ConfigMap
metadata:
  name: atom-config
data:
  ATOM_BASE_URL: "https://atom-saas.fly.dev/api"
  ATOM_TENANT_ID: "your-tenant-id"
---
apiVersion: v1
kind: Secret
metadata:
  name: atom-secrets
type: Opaque
stringData:
  ATOM_API_TOKEN: "your-api-token"
  ATOM_FEDERATION_KEY: "your-federation-key"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: atom-integration
spec:
  replicas: 3
  selector:
    matchLabels:
      app: atom-integration
  template:
    metadata:
      labels:
        app: atom-integration
    spec:
      containers:
        - name: app
          image: your-registry/atom-integration:latest
          envFrom:
            - configMapRef:
                name: atom-config
            - secretRef:
                name: atom-secrets
          ports:
            - containerPort: 8080
Monitoring
# Add metrics collection
from prometheus_client import Counter, Histogram
# Metrics
request_counter = Counter('atom_requests_total', 'Total ATOM requests')
request_duration = Histogram('atom_request_duration_seconds', 'Request duration')
# Use in client
request_counter.inc()
with request_duration.time():
    agents = client.list_agents()
---
Performance Optimization
Caching
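from functools import lru_cache
import threading
import time
@lru_cache(maxsize=1000)
def get_agent_with_cache(agent_id: str):
    """Cache agent details; lru_cache has no expiry, so entries live until cleared"""
    return client.get_agent(agent_id)
# Clear cache periodically so entries are at most about one hour old
def clear_cache_periodically():
    while True:
        time.sleep(3600)
        get_agent_with_cache.cache_clear()
# Run the clearing loop in a daemon thread so it does not block the application
threading.Thread(target=clear_cache_periodically, daemon=True).start()
Connection Pooling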
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Configure retry strategy
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)
# Mount adapter with connection pooling
adapter = HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100,
    max_retries=retry_strategy
)
session.mount("https://", adapter)
session.mount("http://", adapter)
Batch Operations
# Instead of multiple individual calls
for agent_id in agent_ids:
    client.install_agent(agent_id, ...)
# Use bulk operation
client.bulk_install_agents(agent_ids, ...)
---
Security Best Practices
Credential Management
# Use environment variables (never hardcode)
import os
api_token = os.getenv("ATOM_API_TOKEN")
if not api_token:
    raise ValueError("ATOM_API_TOKEN not set")
# Use secret management in production
# AWS Secrets Manager, Azure Key Vault, etc.
Webhook Signature Verification
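For local testing of a webhook handler, it helps to produce signatures the same way the sender would. The sketch below assumes the platform signs the raw request body with HMAC-SHA256 and sends the hex digest, which is what the verification function in this section checks. `sign_payload`, the secret, and the event body are made up for illustration.

```python
import hashlib
import hmac


def sign_payload(payload: bytes, secret: str) -> str:
    """Return the hex HMAC-SHA256 of payload (hypothetical test helper)."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()


body = b'{"event": "agent.installed"}'  # made-up event body for illustration
signature = sign_payload(body, "test-secret")

# A matching signature passes; a modified body does not
valid = hmac.compare_digest(signature, sign_payload(body, "test-secret"))
tampered = hmac.compare_digest(signature, sign_payload(body + b" ", "test-secret"))
```

Signing the exact bytes received matters: re-serializing parsed JSON before verification can change whitespace or key order and invalidate an otherwise correct signature.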
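import hmac
import hashlib
from typing import Optional, Union
def verify_webhook(payload: Union[str, bytes], signature: Optional[str], secret: str) -> bool:
    """Verify the HMAC-SHA256 webhook signature over the raw request body."""
    if not signature:
        # A missing header would otherwise raise TypeError in compare_digest
        return False
    # request.get_data() returns bytes; accept str as well
    body = payload.encode() if isinstance(payload, str) else payload
    expected_signature = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    # Use constant-time comparison
    return hmac.compare_digest(expected_signature, signature)
Rate Limiting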
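The decorator-based example in this section depends on the third-party `ratelimit` package. Where adding that dependency is not desirable, a sliding-window limiter can be sketched with the standard library alone. `SlidingWindowLimiter` is a hypothetical helper; the 100-calls-per-minute figure mirrors the example in this section and is not a documented platform limit. The sketch is not thread-safe.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `calls` acquisitions per `period` seconds (hypothetical helper)."""

    def __init__(self, calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self._calls = calls
        self._period = period
        self._clock = clock
        self._sleep = sleep
        self._stamps: Deque[float] = deque()

    def acquire(self) -> None:
        """Block until another call is allowed, then record it."""
        while True:
            now = self._clock()
            # Drop timestamps that have left the window
            while self._stamps and now - self._stamps[0] >= self._period:
                self._stamps.popleft()
            if len(self._stamps) < self._calls:
                self._stamps.append(now)
                return
            # Wait until the oldest recorded call leaves the window
            self._sleep(self._period - (now - self._stamps[0]))
```

A caller would create one limiter, for example `limiter = SlidingWindowLimiter(calls=100, period=60)`, and invoke `limiter.acquire()` before each API request.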
from ratelimit import limits, sleep_and_retry
@sleep_and_retry
@limits(calls=100, period=60) # 100 calls per minute
def rate_limited_api_call():
    return client.list_agents()
Input Validation
import re
def validate_agent_id(agent_id: str) -> bool:
    """Validate agent ID format; raises ValueError if invalid."""
    # fullmatch rejects a trailing newline, which re.match with '$' would accept
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', agent_id):
        raise ValueError("Invalid agent ID format")
    if len(agent_id) > 100:
        raise ValueError("Agent ID too long")
    return True
---
Canvas and Episode Integration
Recording Canvas Executions
From Plan 276-02, canvas executions are automatically recorded as episodes with full tenant context.
from datetime import datetime
from core.canvas_skill_integration import record_canvas_execution
# canvas_executor is assumed to be provided by the host application
async def execute_canvas_with_episode(
    tenant_id: str,
    canvas_id: str,
    execution_data: dict
):
    """Execute canvas and record episode with tenant isolation."""
    # Execute canvas
    result = await canvas_executor.execute(
        tenant_id=tenant_id,
        canvas_id=canvas_id,
        data=execution_data
    )
    # Record episode with tenant context (from Plan 276-02)
    episode = await record_canvas_execution(
        tenant_id=tenant_id,
        canvas_id=canvas_id,
        execution_data={
            "result": result,
            "timestamp": datetime.utcnow(),
            **execution_data
        }
    )
    return {
        "result": result,
        "episode_id": episode.id,
        "tenant_id": tenant_id
    }
Testing Canvas Integration
import pytest
from tests.fixtures.tenant_fixtures import test_tenant
from tests.fixtures.canvas_factory import CanvasFactory
from core.canvas_skill_integration import record_canvas_execution
# Assumes `db` (database session) and `Episode` (ORM model) come from the application's test setup
@pytest.mark.integration
@pytest.mark.asyncio  # awaiting inside a test requires an async test; needs the pytest-asyncio plugin
async def test_canvas_episode_recording(test_tenant):
    """Test canvas execution records episode with tenant isolation."""
    # Create canvas with tenant
    canvas = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Canvas"
    )
    # Execute and record episode
    episode = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas.id,
        execution_data={"action": "test"}
    )
    # Verify tenant isolation
    assert episode.canvas_id == canvas.id
    assert episode.tenant_id == test_tenant.id
    # Verify episode only visible to this tenant
    episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()
    assert episode.id in [e.id for e in episodes]
Episode Feedback Integration
from core.episode_service import EpisodeService
async def submit_episode_feedback(
    tenant_id: str,
    episode_id: str,
    feedback_score: float,
    feedback_text: str,
    category: str
):
    """Submit feedback for episode with tenant isolation."""
    # Verify episode belongs to tenant
    episode = db.query(Episode).filter(
        Episode.tenant_id == tenant_id,
        Episode.id == episode_id
    ).first()
    if not episode:
        raise ValueError(f"Episode {episode_id} not found for tenant {tenant_id}")
    # Submit feedback
    service = EpisodeService(db)
    feedback = await service.submit_feedback(
        episode_id=episode_id,
        score=feedback_score,
        feedback=feedback_text,
        category=category
    )
    return {
        "feedback_id": feedback.id,
        "episode_id": episode_id,
        "tenant_id": tenant_id
    }
Canvas-Aware World Model
From Plan 276-02, the World Model now integrates canvas context:
from core.agent_world_model import WorldModelService
async def recall_canvas_experiences(
    tenant_id: str,
    agent_id: str,
    canvas_id: str
):
    """Recall experiences from canvas executions with tenant isolation."""
    world_model = WorldModelService(db)
    # Recall canvas-aware experiences
    experiences = await world_model.recall_canvas_experiences(
        tenant_id=tenant_id,
        agent_id=agent_id,
        canvas_id=canvas_id,
        limit=5
    )
    return {
        "experiences": experiences,
        "tenant_id": tenant_id,
        "canvas_id": canvas_id
    }
Multi-Tenant Canvas Testing
@pytest.mark.integration
@pytest.mark.asyncio  # awaiting inside a test requires an async test; needs the pytest-asyncio plugin
async def test_canvas_tenant_isolation(test_tenant, pro_tenant):
    """Verify canvas executions are isolated between tenants."""
    # Create canvases in different tenants
    canvas1 = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Canvas 1"
    )
    canvas2 = CanvasFactory.create(
        _session=db,
        tenant_id=pro_tenant.id,
        name="Canvas 2"
    )
    # Execute both canvases
    episode1 = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas1.id,
        execution_data={"action": "test"}
    )
    episode2 = await record_canvas_execution(
        tenant_id=pro_tenant.id,
        canvas_id=canvas2.id,
        execution_data={"action": "test"}
    )
    # Verify isolation
    assert episode1.tenant_id == test_tenant.id
    assert episode2.tenant_id == pro_tenant.id
    # Query episodes for tenant 1
    tenant1_episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()
    # Verify episode2 not in tenant1's episodes
    assert episode2.id not in [e.id for e in tenant1_episodes]
---
Additional Resources
- **Multi-Tenant Patterns:** /docs/MULTI_TENANT_PATTERNS.md - Comprehensive tenant isolation patterns
- **Developer Guide:** /docs/DEVELOPER_GUIDE.md
- **API Documentation:** /docs/api/overview.md
- **Marketplace API:** /docs/api/marketplace-api.md
- **Federation Protocol:** /docs/FEDERATION_PROTOCOL.md
- **Canvas Integration:** /docs/CANVAS_SKILL_INTEGRATION.md
- **Support:** support@atom-saas.com
- **Community:** https://community.atom-saas.com
---
Changelog
v13.1 (2026-04-10)
- Added Canvas and Episode integration section
- Canvas execution recording with tenant isolation
- Episode feedback integration patterns
- Canvas-aware World Model integration
- Multi-tenant canvas testing examples
- Patterns from Plan 276-02 (integration service fixes)
v13.0 (2026-04-05)
- Initial integration guide release
- Python, TypeScript, Go integration examples
- Complete workflow examples
- Testing strategies
- Deployment considerations
- Security best practices