agent-module-architecture skill

This skill documents agent startup, heartbeat, task execution, upgrades, and Worker interaction for BK-CI build machines.

npx playbooks add skill tencentblueking/bk-ci --skill agent-module-architecture

---
name: agent-module-architecture
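description: Architecture guide for the Agent build-machine module (Go), covering the Agent startup flow, heartbeat mechanism, task pickup and execution, upgrades, and interaction with Dispatch. Use when developing Agent features, modifying heartbeat logic, handling task execution, or implementing Agent upgrades.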
---

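# Agent Build-Machine Module Architecture Guide

> **Module role**: The Agent is the core build-machine component of BK-CI. Written in Go, it communicates with the backend services, receives build tasks, and launches Worker processes to run builds.

## 1. Module Overview

### 1.1 Core Responsibilities

| Responsibility | Description |
|------|------|
| **Process management** | The Daemon supervises the Agent process and keeps it running |
| **Task scheduling** | Pulls build tasks from the Dispatch service and runs them |
| **Worker management** | Launches the Worker (Kotlin JAR) that runs the actual build logic |
| **Heartbeat reporting** | Periodically reports Agent status and environment information to the backend |
| **Automatic upgrade** | Detects and automatically upgrades the Agent, Worker, and JDK |
| **Data collection** | Collects build-machine metrics through Telegraf |
| **Docker builds** | Supports containerized Docker builds (Linux) |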

### 1.2 Relationship with the Worker

```
┌─────────────────────────────────────────────────────────────┐
│                        Build Machine                        │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────┐   supervises   ┌─────────┐                     │
│  │ Daemon  │ ─────────────▶ │  Agent  │                     │
│  │  (Go)   │                │  (Go)   │                     │
│  └─────────┘                └────┬────┘                     │
│                                  │ launches                 │
│                                  ▼                          │
│                             ┌─────────┐                     │
│                             │ Worker  │                     │
│                             │(Kotlin) │                     │
│                             └────┬────┘                     │
│                                  │ runs                     │
│                                  ▼                          │
│                      ┌───────────────────────┐              │
│                      │ Plugin / script tasks │              │
│                      └───────────────────────┘              │
└─────────────────────────────────────────────────────────────┘
```

- **Agent (Go)**: handles process scheduling, backend communication, and environment management
- **Worker (Kotlin)**: handles build task execution, plugin runs, and log reporting

## 2. Directory Structure

```
src/agent/
├── agent/                          # Main Agent module
│   ├── src/
│   │   ├── cmd/                    # Entry points
│   │   │   ├── agent/main.go       # Agent main entry point
│   │   │   ├── daemon/main.go      # Daemon entry point
│   │   │   ├── installer/main.go   # Installer entry point
│   │   │   └── upgrader/main.go    # Upgrader entry point
│   │   ├── pkg/                    # Core packages
│   │   │   ├── agent/              # Agent core logic
│   │   │   ├── api/                # API client
│   │   │   ├── collector/          # Data collection
│   │   │   ├── config/             # Configuration management
│   │   │   ├── cron/               # Scheduled jobs
│   │   │   ├── i18n/               # Internationalization
│   │   │   ├── imagedebug/         # Docker image debugging
│   │   │   ├── job/                # Build job management
│   │   │   ├── job_docker/         # Docker builds
│   │   │   ├── pipeline/           # Pipeline tasks
│   │   │   ├── upgrade/            # Upgrade logic
│   │   │   ├── upgrader/           # Upgrader implementation
│   │   │   └── util/               # Utility functions
│   │   └── third_components/       # Third-party component management
│   ├── go.mod
│   ├── Makefile
│   └── README.md
├── agent-slim/                     # Lightweight Agent
│   └── cmd/slim.go
└── common/                         # Shared utility library
    └── utils/
        ├── fileutil/
        └── slice.go
```

## 3. Core Components

### 3.1 Daemon

**File**: `src/cmd/daemon/main.go`

The Daemon supervises the Agent process and keeps it running:

```go
// Unix implementation: uses file locks to detect whether the Agent is alive
func watch(isDebug bool) {
    totalLock := flock.New(fmt.Sprintf("%s/%s.lock", systemutil.GetRuntimeDir(), systemutil.TotalLock))
    
    // Check once immediately on startup
    totalLock.Lock()
    doCheckAndLaunchAgent(isDebug)
    totalLock.Unlock()
    
    // Then check on a timer (5-second interval)
    checkTimeTicker := time.NewTicker(agentCheckGap)
    for ; ; totalLock.Unlock() {
        select {
        case <-checkTimeTicker.C:
            if err := totalLock.Lock(); err != nil {
                continue
            }
            doCheckAndLaunchAgent(isDebug)
        }
    }
}

// Check whether the Agent is running and launch it if not
func doCheckAndLaunchAgent(isDebug bool) {
    agentLock := flock.New(fmt.Sprintf("%s/agent.lock", systemutil.GetRuntimeDir()))
    locked, err := agentLock.TryLock()
    if err == nil && locked {
        // Acquiring the lock means no Agent is holding it, so the Agent is not running.
        // Release the probe lock first so the newly launched Agent can take it.
        agentLock.Unlock()
        logs.Warn("agent is not available, will launch it")
        // workDir is not defined in this excerpt
        if _, err := launch(workDir+"/"+config.AgentFileClientLinux, isDebug); err != nil {
            logs.WithError(err).Error("launch agent failed")
        }
    }
}
```

**Windows implementation**: uses the `github.com/kardianos/service` library to run as a Windows Service

### 3.2 Agent Core Flow

**File**: `src/pkg/agent/agent.go`

```go
func Run(isDebug bool) {
    // 1. Initialize configuration
    config.Init(isDebug)
    third_components.Init()
    
    // 2. Initialize internationalization
    i18n.InitAgentI18n()
    
    // 3. Report startup (retry until it succeeds)
    _, err := job.AgentStartup()
    if err != nil {
        for {
            _, err = job.AgentStartup()
            if err == nil {
                break
            }
            time.Sleep(5 * time.Second)
        }
    }
    
    // 4. Start background tasks
    go collector.Collect()      // data collection
    go cron.CleanJob()          // periodic cleanup
    go cron.CleanDebugContainer() // clean up debug containers
    
    // 5. Main loop: Ask requests
    for {
        doAsk()
        config.LoadAgentIp()
        time.Sleep(5 * time.Second)
    }
}
```
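
The startup report in step 3 is a retry-until-success loop. A generic sketch of that pattern is shown below; `retryUntilSuccess` and `reportStartup` are hypothetical names introduced for illustration, and the short delay is only there to keep the demo fast.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// errUnavailable is a sample error used by the demo below.
var errUnavailable = errors.New("backend unavailable")

// retryUntilSuccess calls fn until it returns nil, sleeping for delay after
// each failed attempt, and returns the number of attempts made.
func retryUntilSuccess(fn func() error, delay time.Duration) int {
    attempts := 0
    for {
        attempts++
        if err := fn(); err == nil {
            return attempts
        }
        time.Sleep(delay)
    }
}

func main() {
    // reportStartup is a hypothetical stand-in for the startup call:
    // it fails twice, then succeeds.
    calls := 0
    reportStartup := func() error {
        calls++
        if calls < 3 {
            return errUnavailable
        }
        return nil
    }

    attempts := retryUntilSuccess(reportStartup, 10*time.Millisecond)
    fmt.Println("startup reported after attempts:", attempts)
}
```

Because the loop never gives up, it suits a long-lived process that cannot do useful work until the backend knows about it; a caller that needs a deadline would add a context or an attempt limit.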

### 3.3 Unified Ask Request Pattern

The Agent uses a single Ask request to handle several kinds of work:

```go
func doAsk() {
    // Build the Ask request
    enable := genAskEnable()
    heart, upgrad := genHeartInfoAndUpgrade(enable.Upgrade, exiterror)
    
    result, err := api.Ask(&api.AskInfo{
        Enable:  enable,      // enabled features
        Heart:   heart,       // heartbeat info
        Upgrade: upgrad,      // upgrade info
    })
    if err != nil {
        logs.WithError(err).Error("ask request failed")
        return
    }
    
    // Parse the response
    resp := new(api.AskResp)
    util.ParseJsonToData(result.Data, &resp)
    
    // Run the tasks carried by the response
    doAgentJob(enable, resp)
}

func doAgentJob(enable api.AskEnable, resp *api.AskResp) {
    // Handle the heartbeat response
    if resp.Heart != nil {
        go agentHeartbeat(resp.Heart)
    }
    
    // Build task
    hasBuild := (enable.Build != api.NoneBuildType) && (resp.Build != nil)
    if hasBuild {
        go job.DoBuild(resp.Build)
    }
    
    // Upgrade task
    if enable.Upgrade && resp.Upgrade != nil {
        go upgrade.AgentUpgrade(resp.Upgrade, hasBuild)
    }
    
    // Pipeline task
    if enable.Pipeline && resp.Pipeline != nil {
        go pipeline.RunPipeline(resp.Pipeline)
    }
    
    // Docker debugging
    if enable.DockerDebug && resp.Debug != nil {
        go imagedebug.DoImageDebug(resp.Debug)
    }
}
```
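
The dispatch rule in `doAgentJob` is that a handler starts only when the feature was enabled in the request and the response carries a payload for it (the heartbeat needs only the payload). The sketch below models just that decision with simplified, hypothetical types (`askEnable`, `askResp`, `plannedJobs`); it returns handler names instead of starting goroutines.

```go
package main

import "fmt"

// askEnable and askResp are hypothetical, simplified stand-ins for the
// Ask flags and response; the real types carry full payloads.
type askEnable struct {
    Build       string // "NONE" disables builds
    Upgrade     bool
    Pipeline    bool
    DockerDebug bool
}

type askResp struct {
    Heart, Build, Upgrade, Pipeline, Debug *struct{}
}

// plannedJobs lists the handlers that would be started for a response.
func plannedJobs(e askEnable, r *askResp) []string {
    var jobs []string
    if r.Heart != nil {
        jobs = append(jobs, "heartbeat")
    }
    if e.Build != "NONE" && r.Build != nil {
        jobs = append(jobs, "build")
    }
    if e.Upgrade && r.Upgrade != nil {
        jobs = append(jobs, "upgrade")
    }
    if e.Pipeline && r.Pipeline != nil {
        jobs = append(jobs, "pipeline")
    }
    if e.DockerDebug && r.Debug != nil {
        jobs = append(jobs, "docker-debug")
    }
    return jobs
}

func main() {
    payload := &struct{}{}
    enable := askEnable{Build: "ALL", Upgrade: true}
    resp := &askResp{Heart: payload, Build: payload, Pipeline: payload}

    // Pipeline is in the response but was not enabled, so it is not planned.
    fmt.Println(plannedJobs(enable, resp))
}
```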

### 3.4 Build Task Execution

**File**: `src/pkg/job/build.go`

```go
// DoBuild runs a build task
func DoBuild(buildInfo *api.ThirdPartyBuildInfo) {
    // Take the global task lock
    BuildTotalManager.Lock.Lock()
    
    // Check the concurrency limits
    dockerCanRun, normalCanRun := CheckParallelTaskCount()
    
    if buildInfo.DockerBuildInfo != nil && dockerCanRun {
        // Docker build
        GBuildDockerManager.AddBuild(buildInfo.BuildId, &api.ThirdPartyDockerTaskInfo{...})
        BuildTotalManager.Lock.Unlock()
        runDockerBuild(buildInfo)
        return
    }
    
    if normalCanRun {
        // Regular build
        GBuildManager.AddPreInstance(buildInfo.BuildId)
        BuildTotalManager.Lock.Unlock()
        runBuild(buildInfo)
        return
    }
    
    // Neither kind of build can run: release the lock before returning
    BuildTotalManager.Lock.Unlock()
}

// runBuild starts the Worker process
func runBuild(buildInfo *api.ThirdPartyBuildInfo) error {
    // Check that worker.jar exists
    agentJarPath := config.BuildAgentJarPath()
    if !fileutil.Exists(agentJarPath) {
        // Try to self-heal from the copy in the upgrade directory
        upgradeWorkerFile := systemutil.GetUpgradeDir() + "/" + config.WorkAgentFile
        if fileutil.Exists(upgradeWorkerFile) {
            fileutil.CopyFile(upgradeWorkerFile, agentJarPath, true)
        }
    }
    
    // Environment variables passed to the Worker
    goEnv := map[string]string{
        "DEVOPS_AGENT_VERSION":     config.AgentVersion,
        "DEVOPS_WORKER_VERSION":    third_components.Worker.GetVersion(),
        "DEVOPS_PROJECT_ID":        buildInfo.ProjectId,
        "DEVOPS_BUILD_ID":          buildInfo.BuildId,
        "DEVOPS_VM_SEQ_ID":         buildInfo.VmSeqId,
        "DEVOPS_FILE_GATEWAY":      config.GAgentConfig.FileGateway,
        "DEVOPS_GATEWAY":           config.GetGateWay(),
        "BK_CI_LOCALE_LANGUAGE":    config.GAgentConfig.Language,
        "DEVOPS_AGENT_JDK_8_PATH":  third_components.Jdk.Jdk8.GetJavaOrNull(),
        "DEVOPS_AGENT_JDK_17_PATH": third_components.Jdk.Jdk17.GetJavaOrNull(),
    }
    
    // Create a temporary directory and start the build
    // (workDir and runUser are not defined in this excerpt)
    tmpDir, err := systemutil.MkBuildTmpDir()
    if err != nil {
        return err
    }
    doBuild(buildInfo, tmpDir, workDir, goEnv, runUser)
    return nil
}
```
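
The excerpt does not show how the `goEnv` map reaches the Worker process. One common approach, sketched here as an assumption rather than as the Agent's actual code, is to overlay the map on the parent environment and hand the result to `exec.Cmd.Env` as `KEY=VALUE` strings. `mergeEnv` is a hypothetical helper and the values in `main` are made up.

```go
package main

import (
    "fmt"
    "os"
    "sort"
    "strings"
)

// mergeEnv returns base with every key in extra set to its value from extra.
// Entries use the KEY=VALUE form expected by exec.Cmd.Env.
func mergeEnv(base []string, extra map[string]string) []string {
    out := make([]string, 0, len(base)+len(extra))
    for _, kv := range base {
        key, _, _ := strings.Cut(kv, "=")
        if _, overridden := extra[key]; !overridden {
            out = append(out, kv)
        }
    }
    keys := make([]string, 0, len(extra))
    for k := range extra {
        keys = append(keys, k)
    }
    sort.Strings(keys) // deterministic order for the appended entries
    for _, k := range keys {
        out = append(out, k+"="+extra[k])
    }
    return out
}

func main() {
    // Sample values only; a real build would fill these from the build info.
    goEnv := map[string]string{
        "DEVOPS_PROJECT_ID": "demo",
        "DEVOPS_BUILD_ID":   "b-0001",
    }
    for _, kv := range mergeEnv(os.Environ(), goEnv) {
        if strings.HasPrefix(kv, "DEVOPS_") {
            fmt.Println(kv)
        }
    }
}
```

Dropping overridden keys from the base, rather than appending duplicates, keeps the resulting environment unambiguous for the child process.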

### 3.5 Configuration Management

**File**: `src/pkg/config/config.go`

The Agent loads its configuration from the `.agent.properties` file:

```go
// Configuration keys
const (
    KeyProjectId         = "devops.project.id"
    KeyAgentId           = "devops.agent.id"
    KeySecretKey         = "devops.agent.secret.key"
    KeyDevopsGateway     = "landun.gateway"
    KeyDevopsFileGateway = "landun.fileGateway"
    KeyTaskCount         = "devops.parallel.task.count"
    KeyEnvType           = "landun.env"
    KeySlaveUser         = "devops.slave.user"
    KeyDockerTaskCount   = "devops.docker.parallel.task.count"
    KeyLanguage          = "devops.language"
    // ...
)

// AgentConfig holds the Agent configuration
type AgentConfig struct {
    Gateway                 string
    FileGateway             string
    BuildType               string
    ProjectId               string
    AgentId                 string
    SecretKey               string
    ParallelTaskCount       int
    DockerParallelTaskCount int
    EnableDockerBuild       bool
    Language                string
    // ...
}

// AgentEnv holds environment information
type AgentEnv struct {
    OsName           string
    agentIp          string
    HostName         string
    AgentVersion     string
    AgentInstallPath string
    OsVersion        string
    CPUProductInfo   string
    GPUProductInfo   string
}
```

### 3.6 API Client

**File**: `src/pkg/api/api.go`

```go
// Build the full URL from the gateway address
func buildUrl(url string) string {
    return config.GetGateWay() + url
}

// Report Agent startup
func AgentStartup() (*httputil.DevopsResult, error) {
    url := buildUrl("/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup")
    startInfo := &ThirdPartyAgentStartInfo{
        HostName:      config.GAgentEnv.HostName,
        HostIp:        config.GAgentEnv.GetAgentIp(),
        DetectOs:      config.GAgentEnv.OsName,
        MasterVersion: config.AgentVersion,
        SlaveVersion:  third_components.Worker.GetVersion(),
    }
    return httputil.NewHttpClient().Post(url).Body(startInfo, false).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}

// Report build completion
func WorkerBuildFinish(buildInfo *ThirdPartyBuildWithStatus) (*httputil.DevopsResult, error) {
    url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish")
    return httputil.NewHttpClient().Post(url).Body(buildInfo, false).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(nil).IntoDevopsResult()
}

// Unified Ask request
func Ask(info *AskInfo) (*httputil.AgentResult, error) {
    url := buildUrl("/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask")
    return httputil.NewHttpClient().Post(url).Body(info, bodyEq).
        SetHeaders(config.GAgentConfig.GetAuthHeaderMap()).Execute(askRequest.Resp).IntoAgentResult()
}
```

### 3.7 Upgrade Mechanism

**File**: `src/pkg/upgrade/upgrade.go`

```go
// AgentUpgrade is the main upgrade routine
func AgentUpgrade(upgradeItem *api.UpgradeItem, hasBuild bool) {
    upItems := &upgradeItems{
        Agent:          upgradeItem.Agent,
        Worker:         upgradeItem.Worker,
        Jdk:            upgradeItem.Jdk,
        DockerInitFile: upgradeItem.DockerInitFile,
    }
    
    if upItems.NoChange() {
        return
    }
    
    // Skip the upgrade when the same response carries a build task
    if hasBuild {
        return
    }
    
    // Take the task lock to make sure no task is being started
    if !job.BuildTotalManager.Lock.TryLock() {
        return
    }
    defer job.BuildTotalManager.Lock.Unlock()
    
    if job.CheckRunningJob() {
        return
    }
    
    // Download the upgrade files
    downloadUpgradeFiles(upItems)
    
    // Apply the upgrade
    DoUpgradeOperation(upItems)
}
```
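
The three guards in `AgentUpgrade` (a build in the same response, a busy task lock, a running job) can be sketched in isolation as follows. `buildState` and `tryUpgrade` are hypothetical names, and a plain counter stands in for the Agent's job bookkeeping.

```go
package main

import (
    "fmt"
    "sync"
)

// buildState is a hypothetical stand-in for the Agent's build bookkeeping.
type buildState struct {
    lock    sync.Mutex // plays the role of the global task lock
    running int        // number of builds currently running
}

// tryUpgrade runs upgrade only when no build arrived with the response, the
// lock is free, and nothing is running. It reports whether the upgrade ran.
func (s *buildState) tryUpgrade(hasBuild bool, upgrade func()) bool {
    if hasBuild {
        return false // a build arrived in the same response
    }
    if !s.lock.TryLock() {
        return false // a build is being registered right now
    }
    defer s.lock.Unlock()
    if s.running > 0 {
        return false // builds are still in progress
    }
    upgrade()
    return true
}

func main() {
    s := &buildState{}
    fmt.Println("idle agent upgraded:", s.tryUpgrade(false, func() {}))

    s.running = 1
    fmt.Println("busy agent upgraded:", s.tryUpgrade(false, func() {}))
}
```

Using `TryLock` instead of `Lock` means the upgrade never waits behind a build that is being registered; it simply gives up for this round.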

### 3.8 Data Collection

**File**: `src/pkg/collector/collector.go`

Data collection is done with Telegraf:

```go
func Collect() {
    if !config.GAgentConfig.CollectorOn {
        logs.Info("agent collector off")
        return
    }
    
    for {
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            // Wait for an IP-change event, then cancel the current collection run
            <-ipChan.DChan
            cancel()
        }()
        doAgentCollect(ctx)
    }
}

func doAgentCollect(ctx context.Context) {
    // Generate the Telegraf configuration
    configContent, _ := genTelegrafConfig()
    
    // Initialize the Telegraf agent (logFile is not defined in this excerpt)
    tAgent, _ := getTelegrafAgent(configContent.Bytes(), logFile)
    
    // Run collection, relaunching until the context is cancelled
    for {
        tAgent.Run(ctx)
        if ctx.Err() != nil {
            return
        }
        time.Sleep(telegrafRelaunchTime)
    }
}
```

## 4. Data Types

### 4.1 Build Information

**File**: `src/pkg/api/type.go`

```go
// Build job type
type BuildJobType string

const (
    AllBuildType    BuildJobType = "ALL"
    DockerBuildType BuildJobType = "DOCKER"
    BinaryBuildType BuildJobType = "BINARY"
    NoneBuildType   BuildJobType = "NONE"
)

// Third-party build information
type ThirdPartyBuildInfo struct {
    ProjectId       string                     `json:"projectId"`
    BuildId         string                     `json:"buildId"`
    VmSeqId         string                     `json:"vmSeqId"`
    Workspace       string                     `json:"workspace"`
    PipelineId      string                     `json:"pipelineId"`
    DockerBuildInfo *ThirdPartyDockerBuildInfo `json:"dockerBuildInfo"`
    ExecuteCount    *int                       `json:"executeCount"`
    ContainerHashId string                     `json:"containerHashId"`
}

// Docker build information
type ThirdPartyDockerBuildInfo struct {
    AgentId         string        `json:"agentId"`
    SecretKey       string        `json:"secretKey"`
    Image           string        `json:"image"`
    Credential      Credential    `json:"credential"`
    Options         DockerOptions `json:"options"`
    ImagePullPolicy string        `json:"imagePullPolicy"`
}
```
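
`ThirdPartyBuildInfo` declares `ExecuteCount` as `*int`, which lets a decoder tell "field absent or null" apart from an explicit number. The sketch below shows that behavior with `encoding/json` on a reduced copy of the struct. The `buildInfo` type, both helpers, the fallback value, and the sample payloads are illustrative assumptions.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// buildInfo mirrors a subset of the ThirdPartyBuildInfo fields.
type buildInfo struct {
    ProjectId    string `json:"projectId"`
    BuildId      string `json:"buildId"`
    VmSeqId      string `json:"vmSeqId"`
    ExecuteCount *int   `json:"executeCount"`
}

// decodeBuildInfo parses a JSON payload into a buildInfo value.
func decodeBuildInfo(data []byte) (buildInfo, error) {
    var b buildInfo
    err := json.Unmarshal(data, &b)
    return b, err
}

// executeCountOr returns the decoded count, or def when the field was
// absent or null in the payload.
func executeCountOr(b buildInfo, def int) int {
    if b.ExecuteCount == nil {
        return def
    }
    return *b.ExecuteCount
}

func main() {
    // Made-up payloads for illustration.
    withCount := []byte(`{"projectId":"demo","buildId":"b-0001","vmSeqId":"1","executeCount":3}`)
    withoutCount := []byte(`{"projectId":"demo","buildId":"b-0002","vmSeqId":"1"}`)

    for _, payload := range [][]byte{withCount, withoutCount} {
        info, err := decodeBuildInfo(payload)
        if err != nil {
            panic(err)
        }
        fmt.Println(info.BuildId, executeCountOr(info, 1))
    }
}
```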

### 4.2 Heartbeat Information

```go
// Agent heartbeat information
type AgentHeartbeatInfo struct {
    MasterVersion           string                     `json:"masterVersion"`
    SlaveVersion            string                     `json:"slaveVersion"`
    HostName                string                     `json:"hostName"`
    AgentIp                 string                     `json:"agentIp"`
    ParallelTaskCount       int                        `json:"parallelTaskCount"`
    AgentInstallPath        string                     `json:"agentInstallPath"`
    StartedUser             string                     `json:"startedUser"`
    TaskList                []ThirdPartyTaskInfo       `json:"taskList"`
    DockerParallelTaskCount int                        `json:"dockerParallelTaskCount"`
    DockerTaskList          []ThirdPartyDockerTaskInfo `json:"dockerTaskList"`
}

// Heartbeat response
type AgentHeartbeatResponse struct {
    MasterVersion           string            `json:"masterVersion"`
    SlaveVersion            string            `json:"slaveVersion"`
    AgentStatus             string            `json:"agentStatus"`
    ParallelTaskCount       int               `json:"parallelTaskCount"`
    Envs                    map[string]string `json:"envs"`
    Gateway                 string            `json:"gateway"`
    FileGateway             string            `json:"fileGateway"`
    DockerParallelTaskCount int               `json:"dockerParallelTaskCount"`
    Language                string            `json:"language"`
}
```

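## 5. Cross-Platform Support

### 5.1 Platform-Specific Code

The Agent supports multiple platforms through Go build constraints, with platform-specific code kept in separate files:

```
src/pkg/config/
├── config.go           # Shared configuration
├── config_darwin.go    # macOS-specific
├── config_linux.go     # Linux-specific
└── config_win.go       # Windows-specific

src/pkg/upgrader/
├── upgrader_darwin.go  # macOS upgrader
├── upgrader_unix.go    # Unix upgrader
└── upgrader_win.go     # Windows upgrader
```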

### 5.2 Build Commands

```bash
# Linux
make clean build_linux

# macOS
make clean build_macos

# Windows
build_windows.bat
```

Generated binaries:
- `devopsDaemon_linux` / `devopsDaemon_macos` / `devopsDaemon.exe`
- `devopsAgent_linux` / `devopsAgent_macos` / `devopsAgent.exe`
- `upgrader_linux` / `upgrader_macos` / `upgrader.exe`
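
The naming scheme in this list can be expressed as a small lookup keyed by Go's `GOOS` value. This is only an illustration of the pattern: `binaryName` is a hypothetical helper, not a function from the Agent source.

```go
package main

import (
    "fmt"
    "runtime"
)

// binaryName maps a base name and a GOOS value to the artifact names listed
// above. The second result is false for platforms the list does not cover.
func binaryName(base, goos string) (string, bool) {
    switch goos {
    case "linux":
        return base + "_linux", true
    case "darwin": // Go's GOOS value for macOS
        return base + "_macos", true
    case "windows":
        return base + ".exe", true
    default:
        return "", false
    }
}

func main() {
    for _, base := range []string{"devopsDaemon", "devopsAgent", "upgrader"} {
        if name, ok := binaryName(base, runtime.GOOS); ok {
            fmt.Println(name)
        }
    }
}
```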

## 6. Backend Service Interaction

### 6.1 API Endpoints

| Service | Endpoint | Purpose |
|------|------|------|
| Environment | `/ms/environment/api/buildAgent/agent/thirdPartyAgent/startup` | Agent startup report |
| Dispatch | `/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask` | Unified Ask request |
| Dispatch | `/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/workerBuildFinish` | Build completion report |
| Environment | `/ms/environment/api/buildAgent/agent/thirdPartyAgent/agents/pipelines` | Pipeline tasks |
| Environment | `/ms/environment/api/buildAgent/agent/thirdPartyAgent/upgrade/files/download` | Upgrade file download |

### 6.2 Authentication Headers

```go
func (a *AgentConfig) GetAuthHeaderMap() map[string]string {
    return map[string]string{
        "X-DEVOPS-BUILD-TYPE": a.BuildType,
        "X-DEVOPS-PROJECT-ID": a.ProjectId,
        "X-DEVOPS-AGENT-ID":   a.AgentId,
        "X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey,
    }
}
```
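
The Agent attaches these headers through its own `httputil` client. As a rough equivalent using only the standard library, the sketch below builds a POST request with the same four headers. The `agentAuth` type, the `newAuthedPost` helper, the gateway host, and all credential values are assumptions made for illustration; only the header names and the Ask path come from this guide.

```go
package main

import (
    "fmt"
    "net/http"
    "strings"
)

// agentAuth holds the credential fields used for the auth headers.
type agentAuth struct {
    BuildType, ProjectId, AgentId, SecretKey string
}

// headers returns a header map in the same shape as GetAuthHeaderMap.
func (a agentAuth) headers() map[string]string {
    return map[string]string{
        "X-DEVOPS-BUILD-TYPE":       a.BuildType,
        "X-DEVOPS-PROJECT-ID":       a.ProjectId,
        "X-DEVOPS-AGENT-ID":         a.AgentId,
        "X-DEVOPS-AGENT-SECRET-KEY": a.SecretKey,
    }
}

// newAuthedPost builds a JSON POST request that carries the auth headers.
func newAuthedPost(url, body string, a agentAuth) (*http.Request, error) {
    req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/json")
    for k, v := range a.headers() {
        req.Header.Set(k, v)
    }
    return req, nil
}

func main() {
    // Invented credentials and gateway host.
    auth := agentAuth{BuildType: "AGENT", ProjectId: "demo", AgentId: "abc123", SecretKey: "s3cret"}
    url := "http://devops.example.com/ms/dispatch/api/buildAgent/agent/thirdPartyAgent/ask"

    req, err := newAuthedPost(url, "{}", auth)
    if err != nil {
        panic(err)
    }
    // Header.Get is case-insensitive, so the original spelling still matches.
    fmt.Println(req.Method, req.URL.Path, req.Header.Get("X-DEVOPS-AGENT-ID"))
}
```

The request is only constructed here, never sent, so the sketch runs without network access.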

## 7. Development Conventions

### 7.1 Error Handling

```go
// Standard error check
if err != nil {
    logs.WithError(err).Error("operation failed")
    return errors.Wrap(err, "context message")
}

// Panic recovery
defer func() {
    if err := recover(); err != nil {
        logs.Error("panic: ", err)
    }
}()
```

### 7.2 Logging

```go
// Log levels
logs.Debug("debug message")
logs.Info("info message")
logs.Infof("formatted: %s", value)
logs.Warn("warning message")
logs.Error("error message")
logs.WithError(err).Error("error with context")
```

### 7.3 Concurrency Patterns

```go
// Start goroutines
go collector.Collect()
go cron.CleanJob()

// Protect shared resources with a lock
BuildTotalManager.Lock.Lock()
defer BuildTotalManager.Lock.Unlock()

// Use a file lock for cross-process synchronization
agentLock := flock.New(fmt.Sprintf("%s/agent.lock", runtimeDir))
locked, err := agentLock.TryLock()
```

### 7.4 Adding New Features

1. **New API call**: add a function in `src/pkg/api/api.go`
2. **New data type**: define the struct in `src/pkg/api/type.go`
3. **New configuration item**: add the constant and field in `src/pkg/config/config.go`
4. **New background task**: add the handling logic in `doAgentJob()`

## 8. Control Scripts

```bash
# Linux example
scripts/linux/install.sh    # Install
scripts/linux/start.sh      # Start
scripts/linux/stop.sh       # Stop
scripts/linux/uninstall.sh  # Uninstall
```

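## 9. Related Modules

| Module | Relationship | Description |
|------|------|------|
| Worker | Downstream | The Agent launches the Worker to run builds |
| Environment | Upstream | Agent status management and heartbeat reporting |
| Dispatch | Upstream | Build task distribution |
| Log | Downstream | Build log reporting (through the Worker) |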

Overview

This skill documents the module architecture and runtime behavior of the Go-based Agent used as a build machine in the BlueKing CI platform. It focuses on Agent startup, heartbeat/Ask flow, task acquisition and execution, automatic upgrades, and interactions with Dispatch and Worker (Kotlin) components. Use it as a focused guide when developing or modifying Agent features or operational logic.

How this skill works

A Daemon supervises the Agent process and relaunches it when it is not running. The Agent itself initializes config and third-party components, reports startup, and enters a main Ask loop that polls the Dispatch API every 5 seconds for work and control commands. Ask requests bundle enabled features, heartbeat info and upgrade info; responses trigger concurrent handlers for heartbeats, builds, pipelines, Docker debug and upgrades. Builds launch Worker JVM processes (Kotlin JAR) with environment variables and temporary workdirs. Upgrades run only when the global task lock can be acquired and no job is running, so they do not interfere with active builds.

When to use it

  • Implementing or changing Agent startup, daemon or service behavior.
  • Modifying heartbeat/Ask request/response handling or timing.
  • Adding or debugging task scheduling and Worker process launch logic.
  • Designing or adjusting automatic upgrade and self-healing workflows.
  • Integrating Telegraf-based metrics collection or Docker build support.

Best practices

  • Keep Ask payloads minimal and idempotent; parse responses defensively to tolerate partial fields.
  • Guard upgrades with global task locks and check for running jobs to avoid disrupting active builds.
  • Use file locks or OS services consistently between Unix and Windows implementations for reliable daemon behavior.
  • Copy worker artifacts from a safe upgrade directory if worker.jar is missing to enable self-healing.
  • Set explicit environment variables for Worker JVM runs to ensure reproducible build contexts.

Example use cases

  • Extend Ask response handling to support a new remote command without blocking the main loop.
  • Implement custom heartbeat metrics or change heartbeat frequency to reflect network conditions.
  • Add new conditions to skip upgrades (e.g., pinned versions or maintenance windows).
  • Enable additional collector plugins in Telegraf and ensure graceful restart on IP changes.
  • Introduce platform-specific Docker build optimizations under job_docker module.

FAQ

How does the Agent avoid upgrading during a build?

The upgrade is deferred in three cases: the same Ask response carries a build, the global `BuildTotalManager` lock cannot be acquired with `TryLock`, or a job is still running.

What triggers a Worker process restart or re-install?

If worker.jar is missing, the Agent attempts to copy the file from the upgrade directory as a self-heal step before launching the Worker JVM.