Pagination

List operations in the Probo MCP Server use cursor-based pagination.

  1. Call a list tool (e.g., listRisks) without a cursor
  2. Get results plus a next_cursor if more data exists
  3. Use the next_cursor to fetch the next page
  4. When next_cursor is null, you’ve reached the end

Request parameters:

{
  "organization_id": "org_xxx",
  "size": 50,
  "cursor": "optional_cursor"
}
  • organization_id (required): Organization to query
  • size (optional): Items per page (default: 20, max: 100)
  • cursor (optional): Cursor from previous response

To sort results, pass an order_by object:

{
  "organization_id": "org_xxx",
  "order_by": {
    "field": "CREATED_AT",
    "direction": "DESC"
  }
}

Common sort fields: CREATED_AT, UPDATED_AT, NAME
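
For example, to page through risks alphabetically (a sketch; DESC appears above, and ASC is assumed as its ascending counterpart):

{
  "organization_id": "org_xxx",
  "size": 50,
  "order_by": {
    "field": "NAME",
    "direction": "ASC"
  }
}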

To filter results, pass a filter object:

{
  "organization_id": "org_xxx",
  "filter": {
    "query": "security",
    "status": "OPEN"
  }
}

Responses include the current page of items plus a next_cursor when more data exists:

{
  "risks": [
    {
      "id": "risk_abc123",
      "name": "Data breach risk",
      "residual_risk_score": 20
    }
  ],
  "next_cursor": "eyJpZCI6InJpc2tfNTAiLCJvcmRlciI6MTcwMDAwMDAwMH0="
}

When there are no more pages:

{
  "risks": [...],
  "next_cursor": null
}

A complete pagination sequence:

Request:

{
  "organization_id": "org_abc123",
  "size": 50
}

Response:

{
  "risks": [...],
  "next_cursor": "eyJpZCI6InJpc2tfMDUwIn0"
}

Next Request:

{
  "organization_id": "org_abc123",
  "size": 50,
  "cursor": "eyJpZCI6InJpc2tfMDUwIn0"
}

Final Response:

{
  "risks": [...],
  "next_cursor": null
}

A complete helper in Python (call_tool stands in for your MCP client's tool-invocation method):

def fetch_all_risks(organization_id):
    """Fetch all risks using pagination."""
    all_risks = []
    cursor = None
    while True:
        # Build the request; include the cursor only after the first page
        request = {
            "organization_id": organization_id,
            "size": 100
        }
        if cursor:
            request["cursor"] = cursor
        # Call the list tool
        response = call_tool("listRisks", request)
        # Collect this page's results
        all_risks.extend(response["risks"])
        # Stop when there is no next page
        cursor = response.get("next_cursor")
        if not cursor:
            break
    return all_risks

Choose appropriate page sizes based on your use case:

  • Small pages (20-50): Interactive UI, quick initial response
  • Medium pages (50-100): Balanced performance for most cases
  • Large pages (100+): Batch processing, data exports

Considerations:

  • Larger pages mean fewer requests but more memory usage
  • Smaller pages provide faster initial response times
  • Network latency affects optimal page size
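
Putting those guidelines into code, a minimal sketch (the use-case names and mapping are illustrative, not part of the API):

# Map use cases to page sizes per the guidance above
PAGE_SIZES = {
    "interactive": 20,   # quick initial response for UIs
    "default": 50,       # balanced for most workloads
    "export": 100,       # fewer round-trips for batch jobs
}

def page_size_for(use_case):
    # Fall back to the server default of 20 for unknown use cases
    return PAGE_SIZES.get(use_case, 20)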

Treat cursors as opaque tokens:

  • Do: Store cursors exactly as received
  • Do: Pass cursors without modification
  • Don’t: Try to decode or modify cursors
  • Don’t: Make assumptions about cursor format
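
Because cursors are opaque strings, they can be checkpointed verbatim and used to resume an interrupted run. A sketch, assuming the call_tool helper from above; the checkpoint file name and handle_page function are hypothetical:

import json

CHECKPOINT = "risks_cursor.json"

def resume_cursor():
    # Load the last stored cursor, if any
    try:
        with open(CHECKPOINT) as f:
            return json.load(f)["cursor"]
    except FileNotFoundError:
        return None

def save_cursor(cursor):
    # Store the cursor exactly as received, without modification
    with open(CHECKPOINT, "w") as f:
        json.dump({"cursor": cursor}, f)

def export_risks(org_id):
    cursor = resume_cursor()
    while True:
        request = {"organization_id": org_id, "size": 100}
        if cursor:
            request["cursor"] = cursor
        response = call_tool("listRisks", request)
        handle_page(response["risks"])  # hypothetical per-page handler
        cursor = response.get("next_cursor")
        save_cursor(cursor)
        if not cursor:
            break

Note that cursors can expire (see the troubleshooting notes below), so resume promptly rather than days later.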

Implement robust error handling:

import time

def fetch_with_retry(organization_id, max_retries=3):
    cursor = None
    all_results = []
    while True:
        # Include the cursor only after the first page
        request = {"organization_id": organization_id, "size": 100}
        if cursor:
            request["cursor"] = cursor
        # Retry transient failures before giving up
        for attempt in range(max_retries):
            try:
                response = call_tool("listRisks", request)
                break
            except Exception:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
        all_results.extend(response["risks"])
        cursor = response.get("next_cursor")
        if not cursor:
            break
    return all_results

Always specify order_by for predictable results:

{
  "organization_id": "org_xxx",
  "order_by": {
    "field": "CREATED_AT",
    "direction": "DESC"
  }
}

Without explicit ordering, results may appear in arbitrary order.

Use filters to reduce the dataset size:

# Instead of fetching everything and filtering in code...
all_risks = fetch_all_risks(org_id)
high_risks = [r for r in all_risks if r["residual_risk_score"] > 15]

# ...filter server-side during pagination
response = call_tool("listRisks", {
    "organization_id": org_id,
    "filter": {
        "min_residual_risk_score": 15
    }
})
high_risks = response["risks"]

Be aware of data changes during pagination:

  • New items: May or may not appear in subsequent pages
  • Updated items: May change position in sort order
  • Deleted items: Will not appear in subsequent pages

For consistent snapshots, complete pagination quickly.

For very large datasets:

# Bad: load everything into memory at once
all_risks = fetch_all_risks(org_id)
process_risks(all_risks)

# Good: process each page as you paginate
def process_risks_streaming(org_id):
    cursor = None
    while True:
        request = {"organization_id": org_id, "size": 100}
        if cursor:
            request["cursor"] = cursor
        response = call_tool("listRisks", request)
        # Process this page immediately instead of accumulating it
        for risk in response["risks"]:
            process_single_risk(risk)
        cursor = response.get("next_cursor")
        if not cursor:
            break
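
The same streaming pattern reads naturally as a Python generator, which holds at most one page in memory and lets the caller pull items at its own pace (a sketch using the same call_tool placeholder):

def iter_risks(org_id, size=100):
    # Fetch pages lazily and yield items one at a time
    cursor = None
    while True:
        request = {"organization_id": org_id, "size": size}
        if cursor:
            request["cursor"] = cursor
        response = call_tool("listRisks", request)
        yield from response["risks"]
        cursor = response.get("next_cursor")
        if not cursor:
            return

# Usage: only the current page is ever held in memory
for risk in iter_risks("org_abc123"):
    process_single_risk(risk)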

Avoid parallel requests with the same cursor:

# Bad: race conditions; both calls reuse the same cursor
cursor = get_current_cursor()
results1 = fetch_page(cursor)  # May invalidate the cursor
results2 = fetch_page(cursor)  # May fail

# Good: sequential pagination
cursor = get_current_cursor()
results1 = fetch_page(cursor)
cursor = results1["next_cursor"]
results2 = fetch_page(cursor)

Error: 400 Bad Request - Invalid cursor

Causes:

  • Cursor has expired (TTL exceeded)
  • Cursor was modified or corrupted
  • Cursor from different organization/query

Solutions:

  1. Start pagination over from the beginning
  2. Verify cursor is passed exactly as received
  3. Complete pagination within cursor TTL
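
A sketch of solution 1 in code. How the error surfaces depends on your MCP client, so the string check here is a hypothetical stand-in:

def fetch_all_with_restart(org_id, max_restarts=1):
    for _ in range(max_restarts + 1):
        results, cursor = [], None
        try:
            while True:
                request = {"organization_id": org_id, "size": 100}
                if cursor:
                    request["cursor"] = cursor
                response = call_tool("listRisks", request)
                results.extend(response["risks"])
                cursor = response.get("next_cursor")
                if not cursor:
                    return results
        except Exception as e:
            # Restart from the first page only for invalid-cursor errors
            if "Invalid cursor" not in str(e):
                raise
    raise RuntimeError("pagination repeatedly failed with invalid cursors")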

Issue: Same items appearing multiple times or missing items

Causes:

  • Data being modified during pagination
  • Inconsistent sorting order
  • Not using stable sort fields

Solutions:

  1. Use stable sort fields (e.g., id, created_at)
  2. Complete pagination quickly
  3. Use timestamps or version fields for deduplication
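
A sketch of ID-based deduplication while paginating (using the call_tool placeholder from above):

def fetch_risks_deduplicated(org_id):
    # Track IDs already seen so an item that shifts between pages
    # mid-pagination is only collected once
    seen, results = set(), []
    cursor = None
    while True:
        request = {"organization_id": org_id, "size": 100}
        if cursor:
            request["cursor"] = cursor
        response = call_tool("listRisks", request)
        for risk in response["risks"]:
            if risk["id"] not in seen:
                seen.add(risk["id"])
                results.append(risk)
        cursor = response.get("next_cursor")
        if not cursor:
            return results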

Issue: Out of memory errors with large datasets

Causes:

  • Loading all pages into memory
  • Page size too large
  • Processing too slow

Solutions:

  1. Process items as you paginate (streaming)
  2. Reduce page size
  3. Use filtering to reduce dataset size
  4. Implement backpressure mechanisms
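
A sketch of a simple backpressure mechanism: a bounded queue between a fetcher thread and the consumer. When the consumer falls behind, put() blocks and the fetcher stops requesting new pages, capping memory at a fixed number of pages (call_tool and process_single_risk as above):

import queue
import threading

def paginate_with_backpressure(org_id, max_buffered_pages=2):
    pages = queue.Queue(maxsize=max_buffered_pages)

    def fetcher():
        cursor = None
        while True:
            request = {"organization_id": org_id, "size": 100}
            if cursor:
                request["cursor"] = cursor
            response = call_tool("listRisks", request)
            pages.put(response["risks"])  # blocks while the buffer is full
            cursor = response.get("next_cursor")
            if not cursor:
                pages.put(None)  # sentinel: no more pages
                return

    threading.Thread(target=fetcher, daemon=True).start()
    while (page := pages.get()) is not None:
        for risk in page:
            process_single_risk(risk)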