home / skills / personamanagmentlayer / pcl / grpc-expert
This skill provides expert guidance on gRPC, protobuf, and streaming patterns to design robust microservices communication.
npx playbooks add skill personamanagmentlayer/pcl --skill grpc-expertReview the files below or copy the command above to add this skill to your agents.
---
name: grpc-expert
version: 1.0.0
description: Expert-level gRPC, Protocol Buffers, microservices communication, and streaming
category: api
tags: [grpc, protobuf, microservices, rpc, streaming]
allowed-tools:
- Read
- Write
- Edit
---
# gRPC Expert
Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.
## Core Concepts
### gRPC Fundamentals
- Protocol Buffers (protobuf)
- Service definitions
- RPC patterns (unary, server streaming, client streaming, bidirectional)
- HTTP/2 transport
- Code generation
- Interceptors and middleware
### Communication Patterns
- Unary RPC (request-response)
- Server streaming RPC
- Client streaming RPC
- Bidirectional streaming RPC
- Deadline/timeout handling
- Error handling and status codes
### Production Features
- Load balancing
- Service discovery
- Health checking
- Authentication (TLS, tokens)
- Monitoring and tracing
- Retry policies
## Protocol Buffer Definition
```protobuf
syntax = "proto3";
package user.v1;
import "google/protobuf/timestamp.proto";
service UserService {
// Unary RPC
rpc GetUser(GetUserRequest) returns (GetUserResponse);
// Server streaming
rpc ListUsers(ListUsersRequest) returns (stream User);
// Client streaming
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse);
// Bidirectional streaming
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message User {
string id = 1;
string email = 2;
string name = 3;
google.protobuf.Timestamp created_at = 4;
UserRole role = 5;
}
enum UserRole {
USER_ROLE_UNSPECIFIED = 0;
USER_ROLE_USER = 1;
USER_ROLE_ADMIN = 2;
}
message GetUserRequest {
string id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
}
message CreateUserRequest {
string email = 1;
string name = 2;
}
message CreateUsersResponse {
repeated string user_ids = 1;
int32 created_count = 2;
}
message ChatMessage {
string user_id = 1;
string message = 2;
google.protobuf.Timestamp timestamp = 3;
}
```
## Python gRPC Server
```python
import grpc
from concurrent import futures
import logging
from typing import Iterator
import user_pb2
import user_pb2_grpc
class UserService(user_pb2_grpc.UserServiceServicer):
def __init__(self):
self.users = {}
def GetUser(self, request, context):
"""Unary RPC"""
user_id = request.id
if user_id not in self.users:
context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found")
user = self.users[user_id]
return user_pb2.GetUserResponse(user=user)
def ListUsers(self, request, context):
"""Server streaming RPC"""
page_size = request.page_size or 10
for i, user in enumerate(self.users.values()):
if i >= page_size:
break
yield user
def CreateUsers(self, request_iterator, context):
"""Client streaming RPC"""
created_ids = []
for request in request_iterator:
user_id = self._generate_id()
user = user_pb2.User(
id=user_id,
email=request.email,
name=request.name
)
self.users[user_id] = user
created_ids.append(user_id)
return user_pb2.CreateUsersResponse(
user_ids=created_ids,
created_count=len(created_ids)
)
def Chat(self, request_iterator, context):
"""Bidirectional streaming RPC"""
for message in request_iterator:
# Echo back with modification
response = user_pb2.ChatMessage(
user_id="server",
message=f"Echo: {message.message}",
timestamp=message.timestamp
)
yield response
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("Server started on port 50051")
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
```
## Go gRPC Server
```go
package main
import (
"context"
"io"
"log"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "example.com/user/v1"
)
type userServer struct {
pb.UnimplementedUserServiceServer
users map[string]*pb.User
}
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
user, exists := s.users[req.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
}
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
pageSize := req.PageSize
if pageSize == 0 {
pageSize = 10
}
count := 0
for _, user := range s.users {
if count >= int(pageSize) {
break
}
if err := stream.Send(user); err != nil {
return err
}
count++
}
return nil
}
func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
var userIds []string
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.CreateUsersResponse{
UserIds: userIds,
CreatedCount: int32(len(userIds)),
})
}
if err != nil {
return err
}
userId := generateID()
user := &pb.User{
Id: userId,
Email: req.Email,
Name: req.Name,
}
s.users[userId] = user
userIds = append(userIds, userId)
}
}
func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
response := &pb.ChatMessage{
UserId: "server",
Message: "Echo: " + msg.Message,
Timestamp: msg.Timestamp,
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &userServer{
users: make(map[string]*pb.User),
})
log.Println("Server listening on :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
```
## gRPC Client with Interceptors
```python
import grpc
import user_pb2
import user_pb2_grpc
from typing import Callable
def logging_interceptor(
method: Callable,
request,
call_details: grpc.ClientCallDetails
):
"""Unary-unary interceptor for logging"""
print(f"Calling method: {call_details.method}")
print(f"Request: {request}")
response = method(request, call_details)
print(f"Response: {response}")
return response
class UserClient:
def __init__(self, host: str = 'localhost:50051'):
# Create channel with interceptor
self.channel = grpc.insecure_channel(host)
self.channel = grpc.intercept_channel(
self.channel,
logging_interceptor
)
self.stub = user_pb2_grpc.UserServiceStub(self.channel)
def get_user(self, user_id: str) -> user_pb2.User:
"""Call unary RPC"""
request = user_pb2.GetUserRequest(id=user_id)
try:
response = self.stub.GetUser(
request,
timeout=5.0 # 5 second timeout
)
return response.user
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
raise
def list_users(self, page_size: int = 10):
"""Call server streaming RPC"""
request = user_pb2.ListUsersRequest(page_size=page_size)
try:
for user in self.stub.ListUsers(request):
yield user
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
def create_users_batch(self, users: list):
"""Call client streaming RPC"""
def request_generator():
for user in users:
yield user_pb2.CreateUserRequest(
email=user['email'],
name=user['name']
)
try:
response = self.stub.CreateUsers(request_generator())
return response
except grpc.RpcError as e:
print(f"RPC failed: {e.code()} - {e.details()}")
def close(self):
self.channel.close()
```
## Best Practices
### Design
- Use semantic versioning for protobuf packages
- Design backward-compatible changes
- Use proper field numbering (never reuse)
- Include metadata in messages
- Use enums with explicit UNSPECIFIED value
- Design for pagination in list operations
### Performance
- Enable HTTP/2 connection pooling
- Use streaming for large data transfers
- Implement proper timeouts
- Use compression for large payloads
- Batch operations when possible
- Monitor and tune thread pool sizes
### Production
- Implement health checks
- Use TLS for secure communication
- Add authentication/authorization
- Implement retry policies with backoff
- Monitor gRPC metrics (latency, errors)
- Use service discovery for dynamic endpoints
## Anti-Patterns
❌ Breaking backward compatibility
❌ No timeout configuration
❌ Ignoring error status codes
❌ Not handling stream cancellation
❌ Over-sized messages
❌ No health checking
❌ Missing authentication
## Resources
- gRPC: https://grpc.io/
- Protocol Buffers: https://protobuf.dev/
- gRPC-Go: https://github.com/grpc/grpc-go
- gRPC-Python: https://grpc.io/docs/languages/python/
This skill provides expert guidance on gRPC, Protocol Buffers, microservices communication, and streaming patterns. It focuses on practical design, implementation, and production hardening for TypeScript and cross-language systems. The content emphasizes common RPC patterns, schema design, streaming, and operational concerns like load balancing and observability.
The skill inspects RPC design and Protocol Buffer schemas, and explains service patterns: unary, server/client streaming, and bidirectional streaming. It details server and client behaviors, interceptor usage, deadline and error handling, and production features such as TLS, health checks, and retry policies. Examples show typical server and client implementations and best-practice settings for timeouts, batching, and monitoring.
How do I handle backward-incompatible protobuf changes?
Avoid breaking changes by adding new optional fields, reserving removed field numbers, and publishing new package versions. Use semantic versioning for protobuf packages.
When should I use streaming instead of unary RPC?
Use streaming for large datasets, continuous event flows, or low-latency bidirectional interactions. Prefer unary for simple, short-lived requests to reduce complexity.