
grpc-expert skill

/stdlib/api/grpc-expert

This skill provides expert guidance on gRPC, protobuf, and streaming patterns to design robust microservices communication.

npx playbooks add skill personamanagmentlayer/pcl --skill grpc-expert

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
9.9 KB
---
name: grpc-expert
version: 1.0.0
description: Expert-level gRPC, Protocol Buffers, microservices communication, and streaming
category: api
tags: [grpc, protobuf, microservices, rpc, streaming]
allowed-tools:
  - Read
  - Write
  - Edit
---

# gRPC Expert

Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.

## Core Concepts

### gRPC Fundamentals
- Protocol Buffers (protobuf)
- Service definitions
- RPC patterns (unary, server streaming, client streaming, bidirectional)
- HTTP/2 transport
- Code generation
- Interceptors and middleware

### Communication Patterns
- Unary RPC (request-response)
- Server streaming RPC
- Client streaming RPC
- Bidirectional streaming RPC
- Deadline/timeout handling
- Error handling and status codes
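
These per-call concerns show up directly in generated client code. A minimal Python sketch, assuming stubs like the ones generated from the UserService definition later in this document:

```python
import grpc
import user_pb2
import user_pb2_grpc

def get_user_with_deadline(stub: user_pb2_grpc.UserServiceStub, user_id: str):
    """Unary call with a 2-second deadline and explicit status-code handling."""
    try:
        response = stub.GetUser(user_pb2.GetUserRequest(id=user_id), timeout=2.0)
        return response.user
    except grpc.RpcError as err:
        if err.code() == grpc.StatusCode.NOT_FOUND:
            return None  # a missing user is an expected application-level outcome
        if err.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError(f"GetUser({user_id}) exceeded its deadline") from err
        raise  # propagate anything else (UNAVAILABLE, PERMISSION_DENIED, ...)
```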

### Production Features
- Load balancing
- Service discovery
- Health checking
- Authentication (TLS, tokens)
- Monitoring and tracing
- Retry policies
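
Several of these features are configured at the channel level in Python. The sketch below is one way to combine them; the target address and CA certificate path are placeholders, and the retry policy is expressed through gRPC's standard service-config JSON:

```python
import json
import grpc

# Retry policy for all UserService methods, expressed as a service config.
service_config = json.dumps({
    "methodConfig": [{
        "name": [{"service": "user.v1.UserService"}],
        "retryPolicy": {
            "maxAttempts": 4,
            "initialBackoff": "0.1s",
            "maxBackoff": "2s",
            "backoffMultiplier": 2.0,
            "retryableStatusCodes": ["UNAVAILABLE"],
        },
    }]
})

with open("ca.pem", "rb") as f:  # placeholder CA certificate path
    credentials = grpc.ssl_channel_credentials(root_certificates=f.read())

channel = grpc.secure_channel(
    "user-service.internal:443",  # placeholder target; resolve via DNS/xDS in practice
    credentials,
    options=[
        ("grpc.service_config", service_config),
        ("grpc.enable_retries", 1),
        ("grpc.lb_policy_name", "round_robin"),  # client-side load balancing across resolved addresses
    ],
)
```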

## Protocol Buffer Definition

```protobuf
syntax = "proto3";

package user.v1;

import "google/protobuf/timestamp.proto";

service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (GetUserResponse);

  // Server streaming
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message User {
  string id = 1;
  string email = 2;
  string name = 3;
  google.protobuf.Timestamp created_at = 4;
  UserRole role = 5;
}

enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;
  USER_ROLE_USER = 1;
  USER_ROLE_ADMIN = 2;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message CreateUserRequest {
  string email = 1;
  string name = 2;
}

message CreateUsersResponse {
  repeated string user_ids = 1;
  int32 created_count = 2;
}

message ChatMessage {
  string user_id = 1;
  string message = 2;
  google.protobuf.Timestamp timestamp = 3;
}
```
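
The Python examples below import user_pb2 and user_pb2_grpc, which are generated from this file. One way to generate them, assuming the definition above is saved as user.proto and the grpcio-tools package is installed:

```python
# pip install grpcio-tools
from grpc_tools import protoc

protoc.main([
    "grpc_tools.protoc",
    "-I.",                   # directory containing user.proto
    "--python_out=.",        # generates user_pb2.py (messages)
    "--grpc_python_out=.",   # generates user_pb2_grpc.py (stubs and servicer base classes)
    "user.proto",
])
```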

## Python gRPC Server

```python
import grpc
from concurrent import futures
import logging
import uuid

import user_pb2
import user_pb2_grpc

class UserService(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        self.users = {}  # in-memory store for demonstration purposes

    def _generate_id(self) -> str:
        """Generate a unique ID for a newly created user."""
        return uuid.uuid4().hex

    def GetUser(self, request, context):
        """Unary RPC"""
        user_id = request.id

        if user_id not in self.users:
            context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")

        user = self.users[user_id]
        return user_pb2.GetUserResponse(user=user)

    def ListUsers(self, request, context):
        """Server streaming RPC"""
        page_size = request.page_size or 10

        for i, user in enumerate(self.users.values()):
            if i >= page_size:
                break
            yield user

    def CreateUsers(self, request_iterator, context):
        """Client streaming RPC"""
        created_ids = []

        for request in request_iterator:
            user_id = self._generate_id()
            user = user_pb2.User(
                id=user_id,
                email=request.email,
                name=request.name
            )
            self.users[user_id] = user
            created_ids.append(user_id)

        return user_pb2.CreateUsersResponse(
            user_ids=created_ids,
            created_count=len(created_ids)
        )

    def Chat(self, request_iterator, context):
        """Bidirectional streaming RPC"""
        for message in request_iterator:
            # Echo back with modification
            response = user_pb2.ChatMessage(
                user_id="server",
                message=f"Echo: {message.message}",
                timestamp=message.timestamp
            )
            yield response

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)

    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()
```
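
Health checking (listed under Production Features above) can be added to this server with the standard gRPC health service; a sketch assuming the grpcio-health-checking package is installed:

```python
# pip install grpcio-health-checking
from grpc_health.v1 import health, health_pb2, health_pb2_grpc

def add_health_service(server):
    """Register the standard health service and mark UserService as SERVING."""
    health_servicer = health.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    health_servicer.set("user.v1.UserService", health_pb2.HealthCheckResponse.SERVING)
    return health_servicer
```

Calling add_health_service(server) inside serve(), before server.start(), lets load balancers and orchestrators probe the grpc.health.v1.Health/Check endpoint.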

## Go gRPC Server

```go
package main

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "io"
    "log"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "example.com/user/v1"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    // In-memory store for demonstration; gRPC handlers run concurrently,
    // so a real implementation should guard this map with a mutex.
    users map[string]*pb.User
}

func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
    }

    return &pb.GetUserResponse{User: user}, nil
}

func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    pageSize := req.PageSize
    if pageSize == 0 {
        pageSize = 10
    }

    count := 0
    for _, user := range s.users {
        if count >= int(pageSize) {
            break
        }

        if err := stream.Send(user); err != nil {
            return err
        }
        count++
    }

    return nil
}

func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
    var userIds []string

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.CreateUsersResponse{
                UserIds:      userIds,
                CreatedCount: int32(len(userIds)),
            })
        }
        if err != nil {
            return err
        }

        userId := generateID()
        user := &pb.User{
            Id:    userId,
            Email: req.Email,
            Name:  req.Name,
        }

        s.users[userId] = user
        userIds = append(userIds, userId)
    }
}

func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        response := &pb.ChatMessage{
            UserId:    "server",
            Message:   "Echo: " + msg.Message,
            Timestamp: msg.Timestamp,
        }

        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

// generateID returns a random hex identifier for newly created users.
func generateID() string {
    buf := make([]byte, 8)
    if _, err := rand.Read(buf); err != nil {
        panic(err) // crypto/rand should not fail
    }
    return hex.EncodeToString(buf)
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterUserServiceServer(s, &userServer{
        users: make(map[string]*pb.User),
    })

    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
```

## gRPC Client with Interceptors

```python
import grpc
import user_pb2
import user_pb2_grpc

class LoggingInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Unary-unary client interceptor that logs each call."""

    def intercept_unary_unary(self, continuation, client_call_details, request):
        print(f"Calling method: {client_call_details.method}")
        print(f"Request: {request}")

        # continuation() performs the call and returns a Call/Future object
        call = continuation(client_call_details, request)

        print(f"Response status: {call.code()}")
        return call

class UserClient:
    def __init__(self, host: str = 'localhost:50051'):
        # Create a channel and wrap it with the logging interceptor
        self.channel = grpc.intercept_channel(
            grpc.insecure_channel(host),
            LoggingInterceptor()
        )
        self.stub = user_pb2_grpc.UserServiceStub(self.channel)

    def get_user(self, user_id: str) -> user_pb2.User:
        """Call unary RPC"""
        request = user_pb2.GetUserRequest(id=user_id)

        try:
            response = self.stub.GetUser(
                request,
                timeout=5.0  # 5 second timeout
            )
            return response.user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")
            raise

    def list_users(self, page_size: int = 10):
        """Call server streaming RPC"""
        request = user_pb2.ListUsersRequest(page_size=page_size)

        try:
            for user in self.stub.ListUsers(request):
                yield user
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def create_users_batch(self, users: list):
        """Call client streaming RPC"""
        def request_generator():
            for user in users:
                yield user_pb2.CreateUserRequest(
                    email=user['email'],
                    name=user['name']
                )

        try:
            response = self.stub.CreateUsers(request_generator())
            return response
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()} - {e.details()}")

    def close(self):
        self.channel.close()
```
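
The client above covers unary, server-streaming, and client-streaming calls. For completeness, a sketch of the bidirectional Chat RPC using the same generated stub (the grpc, user_pb2, and user_pb2_grpc imports are the same as above):

```python
from typing import Iterable, Iterator

def chat(stub: user_pb2_grpc.UserServiceStub, messages: Iterable[str]) -> Iterator[user_pb2.ChatMessage]:
    """Bidirectional streaming: send each message and yield the server's replies."""
    def request_generator():
        for text in messages:
            yield user_pb2.ChatMessage(user_id="client", message=text)

    try:
        yield from stub.Chat(request_generator())
    except grpc.RpcError as e:
        print(f"RPC failed: {e.code()} - {e.details()}")
```

For example: iterating over chat(client.stub, ["hello", "are you there?"]) prints or processes each server reply as it arrives.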

## Best Practices

### Design
- Use semantic versioning for protobuf packages
- Design backward-compatible changes
- Use proper field numbering (never reuse)
- Include metadata in messages
- Use enums with explicit UNSPECIFIED value
- Design for pagination in list operations

### Performance
- Enable HTTP/2 connection pooling
- Use streaming for large data transfers
- Implement proper timeouts
- Use compression for large payloads
- Batch operations when possible
- Monitor and tune thread pool sizes
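
In Python, the timeout, compression, and message-size items above map to per-call arguments and channel options; the values below are illustrative only:

```python
import grpc
import user_pb2
import user_pb2_grpc

# Raise message-size limits for large payloads (bytes); receive defaults are modest.
channel = grpc.insecure_channel(
    "localhost:50051",
    options=[
        ("grpc.max_send_message_length", 16 * 1024 * 1024),
        ("grpc.max_receive_message_length", 16 * 1024 * 1024),
    ],
)
stub = user_pb2_grpc.UserServiceStub(channel)

# Per-call deadline plus gzip compression for a large request/response.
response = stub.GetUser(
    user_pb2.GetUserRequest(id="123"),
    timeout=2.0,
    compression=grpc.Compression.Gzip,
)
```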

### Production
- Implement health checks
- Use TLS for secure communication
- Add authentication/authorization
- Implement retry policies with backoff
- Monitor gRPC metrics (latency, errors)
- Use service discovery for dynamic endpoints
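
On the server side, the TLS item above replaces add_insecure_port with credentialed binding; the key and certificate paths in this sketch are placeholders:

```python
import grpc

def add_tls_port(server: grpc.Server, address: str = "[::]:50051") -> None:
    """Bind the server to a TLS port instead of an insecure one."""
    with open("server.key", "rb") as f:
        private_key = f.read()
    with open("server.crt", "rb") as f:
        certificate_chain = f.read()

    credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
    server.add_secure_port(address, credentials)
```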

## Anti-Patterns

❌ Breaking backward compatibility
❌ No timeout configuration
❌ Ignoring error status codes
❌ Not handling stream cancellation
❌ Over-sized messages
❌ No health checking
❌ Missing authentication
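
Stream cancellation in particular is easy to overlook: a server-streaming handler should stop producing results once the client cancels or its deadline expires. A minimal sketch in the style of the Python servicer above:

```python
def ListUsers(self, request, context):
    """Server streaming RPC that stops early if the caller cancels or times out."""
    for user in self.users.values():
        if not context.is_active():
            # The client cancelled the call or the deadline expired.
            return
        yield user
```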

## Resources

- gRPC: https://grpc.io/
- Protocol Buffers: https://protobuf.dev/
- gRPC-Go: https://github.com/grpc/grpc-go
- gRPC-Python: https://grpc.io/docs/languages/python/

Overview

This skill provides expert guidance on gRPC, Protocol Buffers, microservices communication, and streaming patterns. It focuses on practical design, implementation, and production hardening for cross-language systems, with worked examples in Python and Go. The content emphasizes common RPC patterns, schema design, streaming, and operational concerns such as load balancing and observability.

How this skill works

The skill inspects RPC design and Protocol Buffer schemas, and explains service patterns: unary, server/client streaming, and bidirectional streaming. It details server and client behaviors, interceptor usage, deadline and error handling, and production features such as TLS, health checks, and retry policies. Examples show typical server and client implementations and best-practice settings for timeouts, batching, and monitoring.

When to use it

  • Designing or refactoring microservice APIs that need strong typing and low-latency comms
  • Implementing streaming workflows (large transfers, real-time chat, telemetry)
  • Adding observability, retries, and health checks to gRPC services
  • Creating cross-language gRPC clients and servers (TypeScript, Python, Go)
  • Defining protobuf schemas with backward-compatibility and versioning

Best practices

  • Use semantic versioning for protobuf packages and never reuse field numbers
  • Design for backward compatibility: optional fields, reserved tags, and explicit UNSPECIFIED enums
  • Choose the right RPC pattern: unary for simple requests, streaming for high-throughput or long-lived flows
  • Enforce deadlines, timeouts, and cancellation handling to avoid resource leaks
  • Secure production with TLS, authentication, health checks, and service discovery
  • Monitor latency, error codes, and stream metrics; implement retries with exponential backoff

Example use cases

  • User profile service with GetUser (unary), ListUsers (server streaming), and CreateUsers (client streaming)
  • Real-time chat using bidirectional streaming to exchange messages with low latency
  • Batch import pipeline using client streaming to send many records and receive a summary response
  • Observability setup: interceptors for logging, metrics, and distributed tracing
  • Production migration: evolving protobuf schemas while preserving backward compatibility

FAQ

How do I handle backward-incompatible protobuf changes?

Avoid breaking changes by adding new optional fields, reserving removed field numbers, and publishing new package versions. Use semantic versioning for protobuf packages.

When should I use streaming instead of unary RPC?

Use streaming for large datasets, continuous event flows, or low-latency bidirectional interactions. Prefer unary for simple, short-lived requests to reduce complexity.