
rust-dpdk skill

/skills/rust-dpdk

This skill helps you design and optimize Rust-based DPDK user-space networking by applying zero-copy, multi-queue, RSS, and memory-pool techniques.

npx playbooks add skill huiali/rust-skills --skill rust-dpdk

Review the files below or copy the command above to add this skill to your agents.

Files (4)
SKILL.md
7.5 KB
---
name: rust-dpdk
description: User-space networking expert. Covers DPDK, user-space drivers, high-performance networking, packet processing, zero-copy, and RSS load balancing.
---

# User-Space Networking (DPDK)

## Core Problem

**How do you process packets at millions of PPS?**

The traditional kernel network stack incurs too much context-switching and memory-copy overhead.


## DPDK vs. the Kernel Network Stack

| Aspect | Kernel network stack | DPDK |
|-------|----------|------|
| Context switches | One per packet | Poll mode, no switches |
| Memory copies | Multiple copies | Zero-copy |
| Interrupts | Frequent interrupts | Polling (poll mode driver) |
| Latency | Relatively high | Microseconds |
| Throughput | Tens of thousands of PPS | Millions of PPS |
| CPU utilization | Lower, but with per-packet overhead | High, but doing useful work |


## Core Components

```rust
// Core DPDK structures
struct DpdkContext {
    memory_pool: Mempool,        // memory pool
    ports: Vec<Port>,            // NIC ports
    rx_queues: Vec<RxQueue>,     // receive queues
    tx_queues: Vec<TxQueue>,     // transmit queues
    cpu_cores: Vec<Core>,        // CPU core assignment
}

struct Port {
    port_id: u16,
    mac_addr: [u8; 6],
    link_speed: u32,
    max_queues: u16,
}

struct Mempool {
    inner: *mut rte_mempool,     // raw handle from rte_mempool_create
    name: String,
    buffer_size: usize,
    cache_size: usize,
    total_buffers: u32,
}
```
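
None of these structures exist until the Environment Abstraction Layer (EAL) has been initialized and the available ports enumerated. A minimal sketch, assuming bindgen-style FFI bindings for `rte_eal_init` and `rte_eth_dev_count_avail` (the `DpdkError::EalInitFailed` variant is illustrative):

```rust
use std::ffi::CString;
use std::os::raw::c_char;

// Initialize the EAL from command-line style arguments (e.g. "-l 0-3 -n 4"),
// then report how many DPDK-capable ports were probed.
fn init_eal(args: &[&str]) -> Result<u16, DpdkError> {
    // Keep the CStrings alive while rte_eal_init reads the argv array.
    let c_args: Vec<CString> = args
        .iter()
        .map(|a| CString::new(*a).expect("argument contains NUL byte"))
        .collect();
    let mut argv: Vec<*mut c_char> = c_args
        .iter()
        .map(|a| a.as_ptr() as *mut c_char)
        .collect();

    let ret = unsafe { rte_eal_init(argv.len() as i32, argv.as_mut_ptr()) };
    if ret < 0 {
        return Err(DpdkError::EalInitFailed); // illustrative error variant
    }

    // Number of ports bound to a DPDK-compatible driver.
    let num_ports = unsafe { rte_eth_dev_count_avail() };
    Ok(num_ports)
}
```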


## Memory Pool Management

```rust
// Pool sizing constants (tune for the expected peak packet rate)
const NUM_BUFFERS: usize = 8192;   // number of mbufs in the pool
const BUFFER_SIZE: usize = 2048;   // data room per mbuf, in bytes
const CACHE_SIZE: usize = 256;     // per-lcore cache size

// Create a DPDK memory pool
fn create_mempool() -> Result<Mempool, DpdkError> {
    let mempool = unsafe {
        rte_mempool_create(
            b"packet_pool\0".as_ptr() as *const c_char,
            NUM_BUFFERS as u32,           // number of buffers
            BUFFER_SIZE as u32,           // size of each buffer
            CACHE_SIZE as u32,            // per-core cache size
            0,                            // private data size
            Some(rte_pktmbuf_pool_init),  // pool init function
            std::ptr::null_mut(),         // pool init argument
            Some(rte_pktmbuf_init),       // per-object init function
            std::ptr::null_mut(),         // per-object argument
            rte_socket_id() as i32,       // NUMA socket to allocate from
            0,                            // flags
        )
    };

    if mempool.is_null() {
        Err(DpdkError::MempoolCreateFailed)
    } else {
        Ok(Mempool {
            inner: mempool,
            name: "packet_pool".to_string(),
            buffer_size: BUFFER_SIZE,
            cache_size: CACHE_SIZE,
            total_buffers: NUM_BUFFERS as u32,
        })
    }
}

// Allocate a buffer from the pool
fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> {
    unsafe {
        let mbuf = rte_pktmbuf_alloc(mempool.inner);
        if mbuf.is_null() {
            None
        } else {
            Some(mbuf)
        }
    }
}
```


## Zero-Copy Receive

```rust
const MAX_BURST_SIZE: usize = 32; // packets pulled per rte_eth_rx_burst call

// Receive packets without copying them
fn process_packets(
    port_id: u16,
    queue_id: u16,
    bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE],
) -> usize {
    let num_received = unsafe {
        rte_eth_rx_burst(
            port_id,
            queue_id,
            bufs.as_mut_ptr(),
            MAX_BURST_SIZE as u16,
        )
    } as usize;

    // Work on the mbufs directly; nothing is copied into an intermediate buffer
    for i in 0..num_received {
        let mbuf = bufs[i];

        // Access the packet data in place (zero-copy); this is the
        // equivalent of the rte_pktmbuf_mtod() / pkt_len C macros
        let data_ptr = unsafe {
            ((*mbuf).buf_addr as *const u8).add((*mbuf).data_off as usize)
        };
        let data_len = unsafe { (*mbuf).pkt_len as usize };

        // Process the packet
        process_packet(data_ptr, data_len);

        // Return the mbuf to the memory pool
        unsafe {
            rte_pktmbuf_free(mbuf);
        }
    }

    num_received
}
```


## Burst Transmit

```rust
// Transmit packets in a burst
fn transmit_packets(
    port_id: u16,
    queue_id: u16,
    packets: &[Packet],
) -> usize {
    let mut mbufs: Vec<*mut rte_mbuf> = packets
        .iter()
        .map(|p| p.to_mbuf())
        .collect();

    let sent = unsafe {
        rte_eth_tx_burst(
            port_id,
            queue_id,
            mbufs.as_mut_ptr(),
            mbufs.len() as u16,
        )
    } as usize;

    // Free any mbufs the NIC did not accept
    for &mbuf in &mbufs[sent..] {
        unsafe {
            rte_pktmbuf_free(mbuf);
        }
    }

    sent
}
```


## RSS Load Balancing

```rust
// Configure RSS (Receive Side Scaling)
fn configure_rss(port_id: u16) -> Result<(), DpdkError> {
    // Query device capabilities (supported RSS offloads, queue limits, ...)
    let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() };
    unsafe {
        rte_eth_dev_info_get(port_id, &mut port_info);
    }

    // Configure the RSS hash; rss_key stays NULL, so the driver keeps
    // its default 40-byte hash key
    let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() };
    rss_conf.rss_key_len = 40;
    rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4;

    unsafe {
        let ret = rte_eth_dev_rss_hash_conf_update(
            port_id,
            &mut rss_conf,
        );
        if ret < 0 {
            return Err(DpdkError::RssConfigFailed);
        }
    }

    Ok(())
}

// Map a flow hash to a queue
fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 {
    // Simple modulo distribution
    (hash % num_queues as u32) as u16
}
```


## Multi-Queue Configuration

```rust
// Configure multiple RX/TX queues on a port
fn configure_multi_queue(
    port_id: u16,
    num_queues: u16,
    mempool: &Mempool,
) -> Result<(), DpdkError> {
    let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() };
    port_conf.rxmode.split_hdr_size = 0;
    port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS;
    port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE;

    // Apply the port configuration before setting up queues
    unsafe {
        let ret = rte_eth_dev_configure(port_id, num_queues, num_queues, &port_conf);
        if ret < 0 {
            return Err(DpdkError::PortConfigFailed);
        }
    }

    // RX queue configuration
    let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() };
    rx_conf.rx_free_thresh = 32;
    rx_conf.rx_drop_en = 0;

    // TX queue configuration
    let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() };
    tx_conf.tx_free_thresh = 32;

    // Set up the RX queues
    for queue in 0..num_queues {
        unsafe {
            let ret = rte_eth_rx_queue_setup(
                port_id,
                queue,
                1024, // ring depth (descriptors)
                rte_socket_id() as u32,
                &rx_conf,
                mempool.inner,
            );
            if ret < 0 {
                return Err(DpdkError::QueueSetupFailed);
            }
        }
    }

    // Set up the TX queues
    for queue in 0..num_queues {
        unsafe {
            let ret = rte_eth_tx_queue_setup(
                port_id,
                queue,
                1024,
                rte_socket_id() as u32,
                &tx_conf,
            );
            if ret < 0 {
                return Err(DpdkError::QueueSetupFailed);
            }
        }
    }

    Ok(())
}
```


## CPU Affinity

```rust
use libc::{cpu_set_t, pthread_self, pthread_setaffinity_np, CPU_SET, CPU_ZERO};
use std::thread;

// Pin the current thread to a specific core
fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> {
    let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() };

    unsafe {
        CPU_ZERO(&mut cpuset);
        CPU_SET(core_id as usize, &mut cpuset);

        let ret = pthread_setaffinity_np(
            pthread_self(),
            std::mem::size_of::<cpu_set_t>(),
            &cpuset,
        );

        if ret != 0 {
            return Err(DpdkError::AffinitySetFailed);
        }
    }

    Ok(())
}

// Give each RX queue a dedicated core
fn allocate_cores_for_queues(num_queues: u16) {
    for queue in 0..num_queues {
        thread::spawn(move || {
            set_cpu_affinity(queue as u32).unwrap();
            process_queue(queue);
        });
    }
}
```
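
DPDK's EAL can do this pinning itself: worker lcores listed on the command line (`-l`/`--lcores`) are already pinned, and work is dispatched to them with `rte_eal_remote_launch`. A minimal sketch assuming the same bindgen-style bindings as above; the `queue_worker` callback and the queue-to-lcore mapping are illustrative:

```rust
use std::os::raw::{c_int, c_void};

// lcore entry point: the queue ID is smuggled through the void* argument
extern "C" fn queue_worker(arg: *mut c_void) -> c_int {
    let queue_id = arg as usize as u16;
    process_queue(queue_id); // same per-queue loop as in the thread example
    0
}

// Launch one worker per RX queue on worker lcores 1..=num_queues.
// Assumes the EAL was started with at least that many worker cores (e.g. -l 0-4).
fn launch_queue_workers(num_queues: u16) {
    for queue in 0..num_queues {
        let lcore_id = (queue + 1) as u32; // lcore 0 stays the main core
        let arg = queue as usize as *mut c_void;
        let ret = unsafe { rte_eal_remote_launch(Some(queue_worker), arg, lcore_id) };
        assert_eq!(ret, 0, "lcore {} is not available or already busy", lcore_id);
    }

    // Block until all workers return
    unsafe { rte_eal_mp_wait_lcore() };
}
```

Compared with `std::thread` plus `pthread_setaffinity_np`, this keeps core assignment in one place (the EAL lcore arguments) and avoids ad-hoc pinning mistakes.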


## Performance Optimization

| Optimization | Technique |
|-------|------|
| Memory alignment | Align to cache lines (64 bytes) |
| Lock-free queues | Use SPSC queues between stages |
| Batching | Burst RX/TX to amortize per-packet overhead |
| CPU affinity | Pin cores to avoid context switches |
| Hugepages | 2 MB/1 GB pages to reduce TLB misses |
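
As an illustration of the first two rows, per-core state can be padded to a cache line so adjacent workers never false-share counters; a minimal sketch in safe Rust (the `QueueStats` type is illustrative, not part of DPDK):

```rust
// One cache line (64 bytes) per RX queue/core; counters written by
// different cores never share a line, so there is no false sharing.
#[repr(align(64))]
#[derive(Default)]
struct QueueStats {
    rx_packets: u64,
    rx_bytes: u64,
    dropped: u64,
}

struct Stats {
    per_queue: Vec<QueueStats>, // indexed by queue/core ID
}

impl Stats {
    fn new(num_queues: usize) -> Self {
        Self {
            per_queue: (0..num_queues).map(|_| QueueStats::default()).collect(),
        }
    }
}
```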


## Related Skills

```
rust-dpdk
    │
    ├─► rust-performance → performance optimization
    ├─► rust-embedded → no_std environments
    └─► rust-concurrency → concurrency models
```

Overview

This skill is a Rust-focused DPDK (Data Plane Development Kit) expert for user-space high-performance networking. It helps diagnose, design, and optimize packet processing pipelines, memory pools, multi-queue/RSS setups, and CPU affinity strategies. The guidance targets zero-copy, low-latency, and high-throughput Rust deployments using DPDK bindings.

How this skill works

The skill inspects DPDK architecture choices and Rust integration patterns: mempool creation and mbuf lifecycle, zero-copy receive paths, burst transmit logic, RSS and multi-queue configuration, and CPU/core assignment. It analyzes performance hotspots and recommends concrete code-level or configuration changes (cache sizes, queue depths, hugepages, affinity). It outputs actionable steps, shell commands, and minimal Rust snippets to validate fixes and measure improvement.

When to use it

  • Building or optimizing a Rust packet-processing application with DPDK bindings
  • Diagnosing high CPU usage, poor throughput, or increased latency in user-space networking
  • Designing multi-queue and RSS configurations for NICs in multi-core systems
  • Tuning mempool, mbuf lifecycle, and zero-copy paths to reduce copies and overhead
  • Mapping RX/TX queues to cores and validating core affinity and hugepage setup

Best practices

  • Use pre-allocated mempools with appropriate buffer and cache sizes to avoid allocation stalls
  • Process packets in bursts (rx/tx burst) to amortize per-packet and per-call overhead (see the poll-loop sketch after this list)
  • Prioritize zero-copy access to packet data; free mbufs promptly after processing
  • Bind RX queues to dedicated cores and pin threads with pthread_setaffinity to reduce context switches
  • Enable RSS and distribute flows by hash to balance load across queues and cores
  • Reserve hugepages and align buffers to cache line size to reduce TLB and cache overhead
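
A minimal sketch tying several of these practices together: one run-to-completion loop per RX queue that pins its core and then polls with burst receive plus in-place processing. It reuses the illustrative helpers from the snippets above (`set_cpu_affinity`, `process_packets`, `MAX_BURST_SIZE`); the `RUNNING` flag is an assumption added here for clean shutdown:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

static RUNNING: AtomicBool = AtomicBool::new(true);

// Run-to-completion worker: one core, one RX queue.
fn queue_loop(port_id: u16, queue_id: u16, core_id: u32) {
    // Pin this worker before touching the NIC queues (practice: CPU affinity).
    set_cpu_affinity(core_id).expect("failed to pin worker core");

    let mut bufs: [*mut rte_mbuf; MAX_BURST_SIZE] = [std::ptr::null_mut(); MAX_BURST_SIZE];

    while RUNNING.load(Ordering::Relaxed) {
        // Practice: burst RX, zero-copy processing, prompt mbuf free,
        // all inside process_packets() from the zero-copy section.
        let received = process_packets(port_id, queue_id, &mut bufs);

        if received == 0 {
            // Optional: back off briefly when idle to trade latency for CPU.
            std::hint::spin_loop();
        }
    }
}
```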

Example use cases

  • Create a mempool and allocate mbufs for a 10/40/100G NIC test harness and validate no mbuf failures under load
  • Migrate a kernel-based packet pipeline to DPDK in Rust to achieve microsecond latency and million-PPS throughput
  • Configure RSS and multi-queue on a dual-socket server to evenly distribute TCP/UDP flows across worker cores
  • Diagnose dropped packets by evaluating rx/tx queue depths, rte_eth_rx_burst returns, and mempool exhaustion
  • Tune a latency-sensitive path: reduce burst sizes, adjust cache_size, and bind critical threads to isolated cores

FAQ

How do I avoid mempool exhaustion under high load?

Preallocate enough total_buffers for peak PPS and pipeline depth, increase per-core cache_size moderately, and monitor free-buffer levels with rte_mempool_avail_count. If exhaustion still occurs, grow the pool or add per-lcore pools.
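
A minimal monitoring sketch, assuming bindings for rte_mempool_avail_count and rte_mempool_in_use_count (both standard DPDK calls) and the Mempool wrapper from the SKILL.md snippets; the 10% threshold is an arbitrary example:

```rust
// Log a warning when the pool is close to exhaustion.
fn check_mempool_pressure(mempool: &Mempool, total_buffers: u32) {
    let available = unsafe { rte_mempool_avail_count(mempool.inner) };
    let in_use = unsafe { rte_mempool_in_use_count(mempool.inner) };

    // Arbitrary example threshold: warn below 10% free buffers.
    if available < total_buffers / 10 {
        eprintln!(
            "mempool pressure: {} free / {} in use of {} total",
            available, in_use, total_buffers
        );
    }
}
```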

When should I prefer more queues vs larger queue depth?

More queues help scale across cores and RSS distribution; larger queue depth helps absorb bursts. Prefer enough queues to match worker cores, and tune per-queue depth to balance latency and burst handling.