home / skills / jeremylongshore / claude-code-plugins-plus-skills / langchain-enterprise-rbac

This skill enables enterprise RBAC for LangChain apps with multi-tenant isolation, fine-grained permissions, and usage quotas to enhance security.

npx playbooks add skill jeremylongshore/claude-code-plugins-plus-skills --skill langchain-enterprise-rbac

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
12.6 KB
---
name: langchain-enterprise-rbac
description: |
  Implement enterprise role-based access control for LangChain applications.
  Use when implementing user permissions, multi-tenant access,
  or enterprise security controls for LLM applications.
  Trigger with phrases like "langchain RBAC", "langchain permissions",
  "langchain access control", "langchain multi-tenant", "langchain enterprise auth".
allowed-tools: Read, Write, Edit
version: 1.0.0
license: MIT
author: Jeremy Longshore <[email protected]>
---

# LangChain Enterprise RBAC

## Overview
Implement role-based access control (RBAC) for LangChain applications with multi-tenant support and fine-grained permissions.

## Prerequisites
- LangChain application with user authentication
- Identity provider (Auth0, Okta, Azure AD)
- Understanding of RBAC concepts

## Instructions

### Step 1: Define Permission Model
```python
from enum import Enum
from typing import Set, Optional
from pydantic import BaseModel
from datetime import datetime

class Permission(str, Enum):
    # Chain permissions
    CHAIN_READ = "chain:read"
    CHAIN_EXECUTE = "chain:execute"
    CHAIN_CREATE = "chain:create"
    CHAIN_DELETE = "chain:delete"

    # Model permissions
    MODEL_GPT4 = "model:gpt-4"
    MODEL_GPT4_MINI = "model:gpt-4o-mini"
    MODEL_CLAUDE = "model:claude"

    # Feature permissions
    FEATURE_STREAMING = "feature:streaming"
    FEATURE_TOOLS = "feature:tools"
    FEATURE_RAG = "feature:rag"

    # Admin permissions
    ADMIN_USERS = "admin:users"
    ADMIN_BILLING = "admin:billing"
    ADMIN_AUDIT = "admin:audit"

class Role(BaseModel):
    name: str
    permissions: Set[Permission]
    description: str = ""

# Predefined roles
ROLES = {
    "viewer": Role(
        name="viewer",
        permissions={Permission.CHAIN_READ},
        description="Read-only access to chains"
    ),
    "user": Role(
        name="user",
        permissions={
            Permission.CHAIN_READ,
            Permission.CHAIN_EXECUTE,
            Permission.MODEL_GPT4_MINI,
        },
        description="Standard user with execution rights"
    ),
    "power_user": Role(
        name="power_user",
        permissions={
            Permission.CHAIN_READ,
            Permission.CHAIN_EXECUTE,
            Permission.CHAIN_CREATE,
            Permission.MODEL_GPT4_MINI,
            Permission.MODEL_GPT4,
            Permission.FEATURE_STREAMING,
            Permission.FEATURE_TOOLS,
        },
        description="Power user with advanced features"
    ),
    "admin": Role(
        name="admin",
        permissions=set(Permission),  # All permissions
        description="Full administrative access"
    ),
}
```

### Step 2: User and Tenant Management
```python
from typing import Dict, List
import uuid

class Tenant(BaseModel):
    id: str
    name: str
    allowed_models: List[str] = []
    monthly_token_limit: int = 1_000_000
    features: Set[str] = set()
    created_at: datetime = None

class User(BaseModel):
    id: str
    email: str
    tenant_id: str
    roles: List[str]
    created_at: datetime = None

    def get_permissions(self) -> Set[Permission]:
        """Get all permissions for user based on roles."""
        permissions = set()
        for role_name in self.roles:
            if role_name in ROLES:
                permissions.update(ROLES[role_name].permissions)
        return permissions

    def has_permission(self, permission: Permission) -> bool:
        """Check if user has specific permission."""
        return permission in self.get_permissions()

class UserStore:
    """User and tenant management."""

    def __init__(self):
        self.tenants: Dict[str, Tenant] = {}
        self.users: Dict[str, User] = {}

    def create_tenant(self, name: str, **kwargs) -> Tenant:
        tenant = Tenant(
            id=str(uuid.uuid4()),
            name=name,
            created_at=datetime.now(),
            **kwargs
        )
        self.tenants[tenant.id] = tenant
        return tenant

    def create_user(
        self,
        email: str,
        tenant_id: str,
        roles: List[str] = None
    ) -> User:
        if tenant_id not in self.tenants:
            raise ValueError(f"Tenant {tenant_id} not found")

        user = User(
            id=str(uuid.uuid4()),
            email=email,
            tenant_id=tenant_id,
            roles=roles or ["user"],
            created_at=datetime.now()
        )
        self.users[user.id] = user
        return user

    def get_user_tenant(self, user_id: str) -> Optional[Tenant]:
        user = self.users.get(user_id)
        if user:
            return self.tenants.get(user.tenant_id)
        return None
```

### Step 3: Permission Enforcement
```python
from functools import wraps
from fastapi import HTTPException, Depends, Request
from typing import Callable

class PermissionChecker:
    """Check and enforce permissions."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    def require_permission(self, permission: Permission):
        """Decorator to require specific permission."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs):
                user_id = request.state.user_id  # Set by auth middleware
                user = self.user_store.users.get(user_id)

                if not user:
                    raise HTTPException(status_code=401, detail="User not found")

                if not user.has_permission(permission):
                    raise HTTPException(
                        status_code=403,
                        detail=f"Permission denied: {permission.value}"
                    )

                return await func(request, *args, **kwargs)
            return wrapper
        return decorator

    def require_any_permission(self, permissions: List[Permission]):
        """Require at least one of the specified permissions."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs):
                user_id = request.state.user_id
                user = self.user_store.users.get(user_id)

                if not user:
                    raise HTTPException(status_code=401)

                if not any(user.has_permission(p) for p in permissions):
                    raise HTTPException(status_code=403)

                return await func(request, *args, **kwargs)
            return wrapper
        return decorator

# Usage
user_store = UserStore()
checker = PermissionChecker(user_store)

@app.post("/chains/{chain_id}/execute")
@checker.require_permission(Permission.CHAIN_EXECUTE)
async def execute_chain(request: Request, chain_id: str):
    # User has CHAIN_EXECUTE permission
    pass
```

### Step 4: Model Access Control
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

class ModelAccessController:
    """Control access to LLM models based on permissions."""

    MODEL_PERMISSIONS = {
        "gpt-4o": Permission.MODEL_GPT4,
        "gpt-4o-mini": Permission.MODEL_GPT4_MINI,
        "claude-3-5-sonnet-20241022": Permission.MODEL_CLAUDE,
    }

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    def get_allowed_models(self, user_id: str) -> List[str]:
        """Get list of models user can access."""
        user = self.user_store.users.get(user_id)
        if not user:
            return []

        permissions = user.get_permissions()
        tenant = self.user_store.get_user_tenant(user_id)

        allowed = []
        for model, permission in self.MODEL_PERMISSIONS.items():
            if permission in permissions:
                # Also check tenant restrictions
                if tenant and tenant.allowed_models:
                    if model in tenant.allowed_models:
                        allowed.append(model)
                else:
                    allowed.append(model)

        return allowed

    def create_llm(self, user_id: str, model: str = None):
        """Create LLM instance with access control."""
        allowed = self.get_allowed_models(user_id)

        if not allowed:
            raise PermissionError("No model access")

        # Use requested model or default to first allowed
        model = model or allowed[0]

        if model not in allowed:
            raise PermissionError(f"Access denied to model: {model}")

        if model.startswith("gpt"):
            return ChatOpenAI(model=model)
        elif model.startswith("claude"):
            return ChatAnthropic(model=model)
        else:
            raise ValueError(f"Unknown model: {model}")
```

### Step 5: Tenant Isolation
```python
from langchain_core.callbacks import BaseCallbackHandler
from contextvars import ContextVar

# Context variable for current tenant
current_tenant: ContextVar[str] = ContextVar("current_tenant")

class TenantIsolationMiddleware:
    """Middleware to enforce tenant isolation."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store

    async def __call__(self, request: Request, call_next):
        user_id = request.state.user_id
        user = self.user_store.users.get(user_id)

        if user:
            # Set tenant context
            token = current_tenant.set(user.tenant_id)
            try:
                response = await call_next(request)
            finally:
                current_tenant.reset(token)
            return response

        return await call_next(request)

class TenantAwareCallback(BaseCallbackHandler):
    """Tag all LLM calls with tenant ID."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        tenant_id = current_tenant.get(None)
        if tenant_id:
            # Add tenant to metadata for billing/logging
            kwargs.setdefault("metadata", {})["tenant_id"] = tenant_id

# Tenant-scoped data access
class TenantScopedVectorStore:
    """Vector store with tenant isolation."""

    def __init__(self, base_store):
        self.base_store = base_store

    def similarity_search(self, query: str, **kwargs):
        tenant_id = current_tenant.get()
        if not tenant_id:
            raise ValueError("Tenant context required")

        # Add tenant filter
        kwargs["filter"] = kwargs.get("filter", {})
        kwargs["filter"]["tenant_id"] = tenant_id

        return self.base_store.similarity_search(query, **kwargs)
```

### Step 6: Usage Quotas
```python
from datetime import datetime, timedelta
from collections import defaultdict

class UsageQuotaManager:
    """Manage usage quotas per user and tenant."""

    def __init__(self, user_store: UserStore):
        self.user_store = user_store
        self.usage = defaultdict(lambda: {"tokens": 0, "requests": 0})
        self.reset_time = {}

    def check_quota(self, user_id: str, tokens: int = 0) -> bool:
        """Check if user has available quota."""
        user = self.user_store.users.get(user_id)
        tenant = self.user_store.get_user_tenant(user_id)

        if not tenant:
            return False

        # Reset monthly quota if needed
        self._maybe_reset(tenant.id)

        current = self.usage[tenant.id]["tokens"]
        return (current + tokens) <= tenant.monthly_token_limit

    def record_usage(self, user_id: str, tokens: int) -> None:
        """Record token usage."""
        tenant = self.user_store.get_user_tenant(user_id)
        if tenant:
            self.usage[tenant.id]["tokens"] += tokens
            self.usage[tenant.id]["requests"] += 1

    def _maybe_reset(self, tenant_id: str) -> None:
        """Reset quota at start of month."""
        now = datetime.now()
        last_reset = self.reset_time.get(tenant_id)

        if not last_reset or last_reset.month != now.month:
            self.usage[tenant_id] = {"tokens": 0, "requests": 0}
            self.reset_time[tenant_id] = now

    def get_usage_report(self, tenant_id: str) -> dict:
        """Get usage report for tenant."""
        tenant = self.user_store.tenants.get(tenant_id)
        usage = self.usage[tenant_id]

        return {
            "tenant_id": tenant_id,
            "tokens_used": usage["tokens"],
            "tokens_limit": tenant.monthly_token_limit if tenant else 0,
            "requests": usage["requests"],
            "percentage_used": (usage["tokens"] / tenant.monthly_token_limit * 100) if tenant else 0
        }
```

## Output
- Permission model with roles
- User and tenant management
- Model access control
- Tenant isolation
- Usage quotas

## Resources
- [RBAC Best Practices](https://auth0.com/docs/manage-users/access-control/rbac)
- [Multi-Tenant Architecture](https://docs.microsoft.com/en-us/azure/architecture/guide/multitenant/)
- [OAuth 2.0 Scopes](https://oauth.net/2/scope/)

## Next Steps
Use `langchain-data-handling` for data privacy controls.

Overview

This skill implements enterprise role-based access control (RBAC) for LangChain applications, including multi-tenant isolation, model-level permissions, and usage quotas. It provides a concrete permission model, tenant and user management, middleware for tenant context, and hooks to enforce model and feature access. The goal is to enable secure, auditable, and scalable access control for LLM-driven systems.

How this skill works

Define a Permission enum and Role objects that map human-friendly roles to fine-grained capabilities (chains, models, features, admin). Store tenants and users with role assignments, then derive effective permissions from roles. Use decorators and middleware to enforce permissions on API routes, control which LLM models a user can instantiate, and scope vector store queries and callbacks to the current tenant. Track and enforce monthly token quotas per tenant and surface usage reports.

When to use it

  • When you need multi-tenant isolation for a LangChain application
  • When model access must be restricted by role or tenant policy
  • When you require feature-level gating (streaming, tools, RAG)
  • When you must audit or bill usage per tenant
  • When integrating enterprise identity providers (Auth0, Okta, Azure AD)

Best practices

  • Map OAuth/OIDC scopes or IdP groups to roles rather than trusting client-supplied roles
  • Keep role permissions minimal and use power_user/admin only when necessary
  • Enforce tenant context at middleware level so every LLM call and store access is tagged
  • Combine permission checks with quota checks to avoid unexpected overages
  • Log permission denials and model access attempts for audit and alerting

Example use cases

  • Restrict gpt-4 access to approved tenants and users while allowing gpt-4o-mini to everyone
  • Expose a chains execution endpoint that only users with CHAIN_EXECUTE can call
  • Tag all LLM calls with tenant_id for billing and telemetry using a TenantAwareCallback
  • Filter vector store similarity searches by tenant_id to prevent cross-tenant data leakage
  • Automatically reset and report monthly token usage and deny requests that exceed quotas

FAQ

How do I integrate this with my identity provider?

Map IdP groups or claims to the role names used by the user store at authentication time and set request.state.user_id from the validated token.

Can I add custom permissions or roles?

Yes. Extend the Permission enum and add Role entries to the ROLES dictionary, then assign them to users or tenants as needed.