mongodb-aggregation skill

This skill helps you master MongoDB aggregation pipelines to transform, analyze, and report data with clear, optimized pipelines.

npx playbooks add skill pluginagentmarketplace/custom-plugin-mongodb --skill mongodb-aggregation

---
name: mongodb-aggregation-pipeline
version: "2.1.0"
description: Master the MongoDB aggregation pipeline for complex data transformations. Learn pipeline stages, grouping, filtering, and data transformation. Use when analyzing data, creating reports, or transforming documents.
sasmp_version: "1.3.0"
bonded_agent: 02-mongodb-queries-aggregation
bond_type: PRIMARY_BOND

# Production-Grade Skill Configuration
capabilities:
  - pipeline-construction
  - stage-optimization
  - data-transformation
  - analytics-queries
  - report-generation

input_validation:
  required_context:
    - collection_name
    - desired_output
  optional_context:
    - sample_documents
    - performance_requirements
    - index_availability

output_format:
  pipeline: array
  explanation: string
  performance_notes: array
  alternatives: array

error_handling:
  common_errors:
    - code: AGG001
      condition: "Invalid stage syntax"
      recovery: "Validate stage operators and field references"
    - code: AGG002
      condition: "Memory limit exceeded"
      recovery: "Pass allowDiskUse: true as an aggregate() option, or filter earlier with $match"
    - code: AGG003
      condition: "Documents disappear after $unwind (array field is null, missing, or empty)"
      recovery: "Use preserveNullAndEmptyArrays: true"

prerequisites:
  mongodb_version: "4.4+"
  required_knowledge:
    - basic-crud
    - query-operators
  recommended_indexes:
    - "Fields used in $match stages"

testing:
  unit_test_template: |
    // Test aggregation pipeline
    const result = await collection.aggregate(pipeline).toArray()
    expect(result).toHaveLength(expectedCount)
    expect(result[0]).toMatchObject(expectedShape)
---

# MongoDB Aggregation Pipeline

Master powerful data transformation with the aggregation pipeline.

## Quick Start

### Basic Pipeline Structure
```javascript
const result = await collection.aggregate([
  { $match: { status: 'active' } },
  { $group: { _id: '$category', count: { $sum: 1 } } },
  { $sort: { count: -1 } },
  { $limit: 10 }
]).toArray();
```

### Common Pipeline Stages

```javascript
// $match: Filter documents (like WHERE in SQL)
{ $match: { age: { $gte: 18 }, status: 'active' } }

// $group: Group documents and aggregate
{ $group: {
  _id: '$city',
  total: { $sum: '$amount' },
  average: { $avg: '$price' },
  count: { $sum: 1 }
}}

// $project: Transform fields
{ $project: {
  name: 1,
  email: 1,
  fullName: { $concat: ['$firstName', ' ', '$lastName'] },
  _id: 0
}}

// $sort: Sort results
{ $sort: { createdAt: -1 } }  // -1 for descending, 1 for ascending

// $skip and $limit: Pagination (skip first, then limit)
{ $skip: 20 }
{ $limit: 10 }

// $unwind: Deconstruct arrays
{ $unwind: '$tags' }  // One document per tag

// $lookup: Join collections
{ $lookup: {
  from: 'categories',
  localField: 'categoryId',
  foreignField: '_id',
  as: 'category'
}}

// $facet: Multi-faceted search
{ $facet: {
  byCategory: [{ $group: { _id: '$category', count: { $sum: 1 } } }],
  byPrice: [{ $group: { _id: null, avg: { $avg: '$price' } } }]
}}
```
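
Stages run in the order they are listed, so pagination needs `$skip` before `$limit`. The following is a minimal sketch of a helper that builds both stages. `paginationStages`, `page`, and `pageSize` are hypothetical names used for illustration and are not part of any driver API; the sort fields are assumptions as well.

```javascript
// Hypothetical helper: builds the two pagination stages for a 1-based page number.
function paginationStages(page, pageSize) {
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError('page must be an integer >= 1');
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be an integer >= 1');
  }
  return [
    { $skip: (page - 1) * pageSize }, // drop the documents on earlier pages
    { $limit: pageSize }              // then keep a single page
  ];
}

// Page 3 with 10 documents per page skips the first 20 documents.
const pipeline = [
  { $match: { status: 'active' } },
  { $sort: { createdAt: -1, _id: 1 } }, // assumed fields; a unique tiebreaker keeps pages stable
  ...paginationStages(3, 10)
];
```

Sorting before paginating matters: without a deterministic order, consecutive pages can repeat or miss documents.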

### Aggregation Functions

```javascript
// Numeric accumulators (used inside $group)
{ $sum: 1 }                    // Count
{ $sum: '$amount' }            // Sum field
{ $avg: '$price' }             // Average
{ $min: '$quantity' }          // Minimum
{ $max: '$quantity' }          // Maximum

// Accumulators that collect or pick values (used inside $group)
{ $push: '$tags' }             // Collect all values, duplicates included
{ $addToSet: '$category' }     // Collect unique values (order not guaranteed)
{ $first: '$name' }            // Value from the first document in the group
{ $last: '$name' }             // Value from the last document in the group

// String functions
{ $concat: ['$firstName', ' ', '$lastName'] }
{ $substrCP: ['$email', 0, 5] }  // First 5 characters ($substr is deprecated)
{ $toLower: '$name' }
{ $toUpper: '$name' }

// Conditional
{ $cond: [
  { $gte: ['$age', 18] },
  'Adult',
  'Minor'
]}
```
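
Accumulators accept expressions, so `$cond` can be nested inside `$sum` to count only the documents that meet a condition. A sketch, assuming documents with `city` and `age` fields (names borrowed from the examples above); `ageBreakdownStage` is a hypothetical helper, not a driver function.

```javascript
// Hypothetical helper: one $group stage that counts adults and minors per city.
function ageBreakdownStage(adultAge = 18) {
  const isAdult = { $gte: ['$age', adultAge] };
  return {
    $group: {
      _id: '$city',
      adults: { $sum: { $cond: [isAdult, 1, 0] } }, // add 1 only when the condition holds
      minors: { $sum: { $cond: [isAdult, 0, 1] } }, // add 1 only when it does not
      total: { $sum: 1 }                            // plain document count
    }
  };
}

const stage = ageBreakdownStage();
```

This keeps the breakdown in a single pass over the data instead of one `$match` plus `$group` query per condition.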

## Real-World Examples

### Sales Report by Category
```javascript
await orders.aggregate([
  { $match: { status: 'completed' } },
  { $group: {
    _id: '$category',
    totalSales: { $sum: '$amount' },
    ordersCount: { $sum: 1 },
    avgOrderValue: { $avg: '$amount' }
  }},
  { $sort: { totalSales: -1 } }
]).toArray();
```

### User Activity Summary
```javascript
await users.aggregate([
  { $match: { lastActive: { $gte: new Date(Date.now() - 30*24*60*60*1000) } } },
  { $project: {
    name: 1,
    email: 1,
    lastActive: 1,
    daysInactive: {
      $floor: {
        $divide: [
          { $subtract: [new Date(), '$lastActive'] },
          1000 * 60 * 60 * 24
        ]
      }
    }
  }},
  { $sort: { daysInactive: 1 } }
]).toArray();
```
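
The pipeline above evaluates the current time twice on the client, once per stage. One possible variant wraps it in a function that takes a single timestamp, which keeps both stages consistent and makes the pipeline easy to test. `activitySummaryPipeline`, `now`, and `windowDays` are hypothetical names for this sketch.

```javascript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical helper: builds the activity summary from one shared timestamp.
function activitySummaryPipeline(now = new Date(), windowDays = 30) {
  const cutoff = new Date(now.getTime() - windowDays * MS_PER_DAY);
  return [
    { $match: { lastActive: { $gte: cutoff } } },
    { $project: {
      name: 1,
      email: 1,
      lastActive: 1,
      // Subtracting two dates yields milliseconds; convert to whole days.
      daysInactive: {
        $floor: { $divide: [{ $subtract: [now, '$lastActive'] }, MS_PER_DAY] }
      }
    }},
    { $sort: { daysInactive: 1 } }
  ];
}

// Usage with a fixed clock, for example in a unit test.
const summary = activitySummaryPipeline(new Date('2024-01-31T00:00:00Z'));
```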

### Join with Lookup
```javascript
await orders.aggregate([
  { $match: { status: 'pending' } },
  { $lookup: {
    from: 'customers',
    localField: 'customerId',
    foreignField: '_id',
    as: 'customer'
  }},
  { $unwind: '$customer' },  // Drops orders that have no matching customer
  { $project: {
    orderId: '$_id',
    customerName: '$customer.name',
    total: '$amount',
    _id: 0
  }}
]).toArray();
```
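
A plain `$unwind` removes orders whose `customer` array is empty, which turns the join into an inner join. To keep unmatched orders, the document form of `$unwind` with `preserveNullAndEmptyArrays` can be used. This is a sketch: `pendingOrdersWithCustomer` and the `'Unknown customer'` placeholder are assumptions made for illustration.

```javascript
// Hypothetical helper: same join as above, but unmatched orders are kept by default.
function pendingOrdersWithCustomer(keepUnmatched = true) {
  return [
    { $match: { status: 'pending' } },
    { $lookup: {
      from: 'customers',
      localField: 'customerId',
      foreignField: '_id',
      as: 'customer'
    }},
    // Document form of $unwind: keep orders whose lookup returned an empty array.
    { $unwind: { path: '$customer', preserveNullAndEmptyArrays: keepUnmatched } },
    { $project: {
      orderId: '$_id',
      // $ifNull supplies a fallback when the customer field is missing.
      customerName: { $ifNull: ['$customer.name', 'Unknown customer'] },
      total: '$amount',
      _id: 0
    }}
  ];
}

const leftJoin = pendingOrdersWithCustomer();
```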

## Python Example

```python
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
collection = client['db']['collection']

pipeline = [
    {'$match': {'status': 'active'}},
    {'$group': {'_id': '$category', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]

results = list(collection.aggregate(pipeline))
for doc in results:
    print(doc)
```

## Performance Tips

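✅ Use $match as early as possible so later stages process fewer documents
✅ Skip $project stages that only trim fields early; reshape documents near the end of the pipeline
✅ Index the fields used by a leading $match or $sort
✅ Place $limit before expensive stages such as $lookup or $unwind when it does not change the result
✅ Inspect slow pipelines with explain()
✅ Use $facet to compute several summaries over the same input in one query
✅ Avoid $unwind on large or unbounded arrays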
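
Two of these tips map onto driver calls. The sketch below assumes `collection` is a `Collection` from the official Node.js driver; `aggregateWithDiskUse` and `explainAggregation` are hypothetical wrapper names.

```javascript
// Lets memory-hungry stages ($group, $sort) spill to temporary files on disk.
// allowDiskUse is an option passed to aggregate(), not a pipeline stage.
function aggregateWithDiskUse(collection, pipeline) {
  return collection.aggregate(pipeline, { allowDiskUse: true }).toArray();
}

// Returns the query plan and execution statistics instead of the results,
// which shows whether a leading $match used an index.
function explainAggregation(collection, pipeline) {
  return collection.aggregate(pipeline).explain('executionStats');
}
```

Both functions return promises, so callers would typically `await` them.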

Overview

This skill teaches the MongoDB aggregation pipeline for complex data transformations, reporting, and analytics. It covers core stages, aggregation operators, and real-world patterns so you can build efficient, readable pipelines in Python or JavaScript. The focus is practical: filtering, grouping, joining, transforming, and performance tuning.

How this skill works

The skill explains how to combine pipeline stages ($match, $group, $project, $sort, $limit, $unwind, $lookup, $facet) to transform document streams step by step. It demonstrates aggregation operators for numeric, array, string, and conditional logic, and shows how to express pipelines in both JavaScript and Python (PyMongo). It also provides performance guidance like placing $match early, leveraging indexes, and minimizing expensive $unwind operations.
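
To make the step-by-step document flow concrete, here is a plain JavaScript imitation of a $match, $group, $sort, $limit pipeline running over an in-memory array. It only illustrates how each stage consumes the previous stage's output; it is not how the server executes pipelines, and the sample documents are invented.

```javascript
// Invented sample data.
const docs = [
  { category: 'books', status: 'active' },
  { category: 'games', status: 'active' },
  { category: 'books', status: 'active' },
  { category: 'music', status: 'archived' },
  { category: 'books', status: 'archived' }
];

// { $match: { status: 'active' } }
const matched = docs.filter((d) => d.status === 'active');

// { $group: { _id: '$category', count: { $sum: 1 } } }
const counts = new Map();
for (const d of matched) {
  counts.set(d.category, (counts.get(d.category) || 0) + 1);
}
const grouped = [...counts].map(([_id, count]) => ({ _id, count }));

// { $sort: { count: -1 } }
const sorted = [...grouped].sort((a, b) => b.count - a.count);

// { $limit: 1 }
const result = sorted.slice(0, 1);

console.log(result); // [ { _id: 'books', count: 2 } ]
```

Filtering first means the grouping step sees three documents instead of five, which is the same reason $match belongs early in a real pipeline.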

When to use it

  • Creating reports and dashboards from MongoDB collections
  • Aggregating metrics such as totals, averages, and counts by category
  • Joining related collections without application-side joins using $lookup
  • Transforming documents (renaming, concatenating, computing fields) before output
  • Paginating, sorting, and limiting result sets efficiently

Best practices

  • Place $match as early as possible to reduce working set size
  • Index fields used by $match and $sort at the start of the pipeline to avoid collection scans
  • Avoid unnecessary $project stages before grouping or joins
  • Apply $limit prior to expensive operations when appropriate
  • Use $facet to run multiple aggregations in a single pipeline when you need several summaries of the same input
  • Be cautious with large $unwind or unbounded arrays; consider pre-aggregation or array limits

Example use cases

  • Sales report by category: filter completed orders, group by category, compute totalSales and avgOrderValue
  • User activity summary: compute days inactive using date arithmetic and sort by recency
  • Order enrichment: $lookup customers into orders, $unwind the result, and project a flattened view
  • Top-N queries: $match recent data, $group counts, $sort by count, $limit 10
  • Multi-metrics facet: return category breakdown and overall price statistics in one query

FAQ

Can I write pipelines in Python?

Yes. Use PyMongo's collection.aggregate with a list of dict stages; it returns a cursor that yields each result document as a Python dict.

How do I optimize long-running aggregations?

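Filter early with $match, make sure a leading $match or $sort can use an index, move $limit ahead of expensive stages, combine related summaries into one $facet query, and consider incremental pre-aggregation into a summary collection, optionally expired with a TTL index.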
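
One way to pre-aggregate incrementally is to end a pipeline with `$merge`, which writes the grouped results into a summary collection that reports can read cheaply. This is a sketch under assumptions: the `createdAt` field, the `daily_sales` collection name, and the `dailySalesRollup` helper are hypothetical.

```javascript
// Hypothetical helper: recomputes per-day, per-category totals since a given date
// and upserts them into an assumed summary collection named daily_sales.
function dailySalesRollup(since) {
  return [
    { $match: { status: 'completed', createdAt: { $gte: since } } },
    { $group: {
      _id: {
        day: { $dateToString: { format: '%Y-%m-%d', date: '$createdAt' } },
        category: '$category'
      },
      totalSales: { $sum: '$amount' },
      ordersCount: { $sum: 1 }
    }},
    // $merge must be the last stage; matching _id values are replaced, new ones inserted.
    { $merge: {
      into: 'daily_sales',
      on: '_id',
      whenMatched: 'replace',
      whenNotMatched: 'insert'
    }}
  ];
}

const rollup = dailySalesRollup(new Date('2024-01-01T00:00:00Z'));
```

Because matched rows are replaced wholesale, `since` should fall on a day boundary so that each rerun recomputes complete days.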