CrewAI Guide

CrewAI lets you build multi-agent systems where specialized agents collaborate on tasks. This guide shows how to give CrewAI agents access to Catalogian tools for product catalog monitoring and analysis.

Prerequisites

  • • Python 3.10+
  • • Catalogian API key (cat_live_) — requires Brand plan or higher
  • pip install crewai openai

Creating Catalogian tool wrappers

First, create tool functions that call Catalogian's Responses API:

import os
import json
from openai import OpenAI
from crewai.tools import tool

catalogian = OpenAI(
    base_url="https://api.catalogian.com/v1",
    api_key=os.environ["CATALOGIAN_KEY"],
)

def _call_catalogian(tool_name: str, args: dict = {}) -> str:
    response = catalogian.responses.create(
        model="catalogian-1",
        input=json.dumps(args) if args else "{}",
        tools=[{"type": "function", "name": tool_name}],
        tool_choice={"type": "function", "name": tool_name},
    )
    for item in response.output:
        if hasattr(item, "content"):
            for part in item.content:
                if hasattr(part, "text"):
                    return part.text
    return "No result"

@tool("list_sources")
def list_sources() -> str:
    """List all product feed sources in Catalogian."""
    return _call_catalogian("list_sources")

@tool("get_delta")
def get_delta(source_slug: str, limit: int = 5) -> str:
    """Get recent change events for a product feed source."""
    return _call_catalogian("get_delta", {
        "sourceSlug": source_slug, "limit": limit
    })

@tool("profile_snapshot")
def profile_snapshot(source_slug: str) -> str:
    """Get field stats, null rates, and data quality metrics for a source."""
    return _call_catalogian("profile_snapshot", {
        "sourceSlug": source_slug
    })

@tool("search_snapshot")
def search_snapshot(source_slug: str, query: str) -> str:
    """Search across all fields in a product feed."""
    return _call_catalogian("search_snapshot", {
        "sourceSlug": source_slug, "query": query
    })

@tool("get_delta_rows")
def get_delta_rows(source_slug: str, delta_event_id: str, change_type: str = "changed") -> str:
    """Get row-level before/after data for a specific delta event."""
    return _call_catalogian("get_delta_rows", {
        "sourceSlug": source_slug,
        "deltaEventId": delta_event_id,
        "changeType": change_type,
    })

Defining agents

Create specialized agents for different catalog monitoring tasks:

from crewai import Agent

catalog_monitor = Agent(
    role="Product Catalog Monitor",
    goal="Monitor product feeds for significant changes and anomalies",
    backstory="""You are a product data specialist who monitors vendor feeds
    for changes. You detect price changes, new products, discontinued items,
    and data quality issues. Always call profile_snapshot before analyzing
    a new source to understand its structure.""",
    tools=[list_sources, get_delta, profile_snapshot, get_delta_rows],
    verbose=True,
)

data_analyst = Agent(
    role="Product Data Analyst",
    goal="Analyze product catalog data and provide actionable insights",
    backstory="""You analyze product feed data to find trends, anomalies,
    and opportunities. You search for specific products and compare data
    across time periods.""",
    tools=[search_snapshot, profile_snapshot, get_delta],
    verbose=True,
)

Creating the crew

from crewai import Crew, Task

# Define tasks
monitor_task = Task(
    description="""Check all sources for recent changes.
    For any source with changes in the last 24 hours:
    1. Get the delta events
    2. Summarize what changed (new products, price changes, deletions)
    3. Flag anything unusual (large deletions, many price changes)""",
    agent=catalog_monitor,
    expected_output="A summary of recent catalog changes across all sources",
)

analysis_task = Task(
    description="""Based on the monitoring report, analyze the most significant
    changes in detail. For sources with price changes, compare before/after
    values. Identify any patterns or concerns.""",
    agent=data_analyst,
    expected_output="Detailed analysis with specific examples and recommendations",
)

# Assemble and run
crew = Crew(
    agents=[catalog_monitor, data_analyst],
    tasks=[monitor_task, analysis_task],
    verbose=True,
)

result = crew.kickoff()
print(result)

Single-agent example

For simpler use cases, a single agent works well:

from crewai import Agent, Task, Crew

agent = Agent(
    role="Catalog Assistant",
    goal="Answer questions about product catalog data",
    backstory="You help users query and understand their product feeds.",
    tools=[list_sources, get_delta, profile_snapshot, search_snapshot],
)

task = Task(
    description="Find all wireless headphones in the acme-products feed "
                "and check if any had price changes this week.",
    agent=agent,
    expected_output="List of matching products with any recent price changes",
)

crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
print(result)

Rate limits: Each tool call makes one request to Catalogian. The MCP/Responses endpoint allows 30 requests per minute per key. For multi-agent crews with many tool calls, consider adding delays between tasks or using separate API keys per agent.

Tips for building effective AI agent integrations. Agent Best Practices →