程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

150亿图床数据迁移:从单机到多集群的进化之路

balukai 2025-07-08 16:34:12 文章精选 3 ℃

在互联网内容爆炸式增长的时代,图片作为信息的主要载体之一,其存储和管理面临着前所未有的挑战。本文将分享我们如何将搜索业务包含150亿图片数据的图床从传统存储(Cassandra)迁移到S3对象存储的完整历程。从最初的单机单进程迁移,到最终的多集群并行处理,我们通过不断优化和迭代,将原本预计需要120天的迁移工作缩短到了40天完成。

第一阶段:单进程迁移的困境

项目初期,我们采用最简单的单进程迁移方案:

// 伪代码示例
func singleProcessMigration(filePath string) {
    file, _ := os.Open(filePath)
    scanner := bufio.NewScanner(file)
    
    for scanner.Scan() {
        key := scanner.Text()
        data := fetchFromCassandra(key)
        uploadToS3(key, data)
    }
}

这种方案存在明显问题:

  1. 性能瓶颈:单进程串行处理,迁移速度极慢
  2. 进度不可控:无法有效记录迁移进度,中断后难以恢复
  3. 资源浪费:单机CPU和网络利用率极低

实测表明,单进程迁移速度仅为每秒10-20个文件,按此速度完成300亿数据迁移需要约5年时间!

第二阶段:多文件多进程的改进

为解决单进程问题,我们改进为多进程方案:

  1. 将大文件拆分为多个小文件
  2. 每个进程处理一个独立文件
  3. 每个进程维护自己的进度记录
// 伪代码示例
func multiProcessMigration(fileList []string) {
    for _, file := range fileList {
        go func(f string) {
            // 读取上次进度
            lastPos := readProgress(f) 
            // 从断点继续迁移
            migrateFromPosition(f, lastPos)
        }(file)
    }
}

改进后效果:

  • 迁移速度提升至每秒1000-2000个文件
  • 支持断点续传

但新问题随之而来:

  1. 进度管理复杂:每个进程独立记录进度,难以统一管理
  2. 错误处理困难:失败重试机制不完善
  3. 资源分配不均:某些进程可能处理较慢的文件

第三阶段:引入消息队列的革命性突破

为解决上述问题,我们引入Redis作为消息队列:

// 队列操作核心代码 (app/task/queue/queue.go)
func Push(key string, verify string) {
    key = strings.TrimSpace(key)
    if key == "" {
        return
    }

    queue := queueName(verify)
    size, err := app.RedisW().LPush(queue, key).Result()
    if err != nil {
        app.Log("push_err").Errorf("Failed to push key: %s to queue: %s, error: %v", key, queue, err)
        return
    }
    app.Log().Infof("Added key: %s to queue: %s, new length: %d", key, queue, size)
}

func Pop(verify string) string {
    queue := queueName(verify)

    size, err := app.RedisW().LLen(queue).Result()
    if err != nil {
        log.Fatalf("failed to get queue length: %s, error: %v", queue, err)
    }

    if size > 0 {
        value, err := app.RedisW().RPop(queue).Result()
        if err != nil && err != redis.Nil {
            app.Log().Errorf("Failed to pop from queue: %s, error: %v", queue, err)
            log.Fatalf("queue pop error: %v", err)
        }
        return value
    }

    fmt.Printf("Queue %s is empty. Retrying...\n", queue)
    time.Sleep(time.Second)
    return ""
}

架构优化:

  1. 生产者:将文件内容导入Redis队列
  2. 消费者:多进程从队列消费任务
  3. 监控系统:实时监控队列长度和消费速度

优势体现:

  • 解耦生产消费:可以独立扩展生产者和消费者
  • 统一进度管理:通过队列长度可估算剩余工作量
  • 灵活重试:失败任务可重新放回队列

第四阶段:多集群部署的性能飞跃

为进一步提升性能,我们实现多集群部署:

  1. 集群配置

多台机器部署以下程序

nohup ./migrateimgbed -cluster wenku -go 100 >> migrateimgbed.log 2>&1 &
  1. 并发控制
// 迁移核心逻辑 (app/task/migrate.go)
func Run() {
    data := make(chan string)
    goNum := app.GoNum
    verify := ""

    fmt.Println("redis info dump:", app.RedisW())

    // 使用 WaitGroup 来管理并发任务
    var wg sync.WaitGroup

    // 启动消费者 goroutines
    for i := 0; i < goNum; i++ {
        wg.Add(1)
        go consume(data, &wg)
    }

    fmt.Println("Starting task run...")

    // 生产者:从队列中提取数据
    go func() {
        defer close(data)
        for {
            key := queue.Pop(verify)
            if key == "" {
                fmt.Println("No data in queue, exiting producer.")
                break
            }
            data <- key
        }
    }()

    // 等待消费者完成所有任务
    wg.Wait()
}
  1. 迁移核心逻辑
// 迁移逻辑 (app/task/migrate.go)
func do(key string) error {
    key = strings.TrimSpace(key)

    //自定义key 上传到common bucket
    if strings.HasPrefix(key, "/d/") {
        key = ParseCustomKey(key)
    }

    //gif 第一帧 转回原图
    if strings.HasSuffix(key, "_1") && len(key) == 21 {
        key = strings.TrimRight(key, "_1")
    }

    // 验证 key 格式
    arr := strings.SplitAfter(key, "t01")
    if len(arr) < 2 || arr[0] != "t01" {
        return fmt.Errorf("invalid key format: %s", key)
    }

    s3Key, valid := ParseKey(key)
    if !valid {
        return fmt.Errorf("invalid parsed key format: %s", key)
    }

    start := time.Now()
    s3client := s3.NewClient(app.Cluster)

    var notFound bool

    // 从 Cassandra 获取数据
    reader, err := cass.GetFromApi(key, "shyc2")
    if err != nil || reader == nil {
        // 如果在第一个 Cassandra 集群中未找到,尝试第二个
        reader, err = cass.GetFromApi(key, "shbt")
        if err != nil || reader == nil {
            notFound = true
        }
    }

    // 如果两个集群都没有找到数据
    if notFound {
        elapsed := time.Since(start)
        app.Log("not_found_key").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("data not found for key: %s", key)
    }

    // 上传到 S3 带重试机制
    if !uploadToS3(s3client, s3Key, reader, 3) {
        elapsed := time.Since(start)
        app.Log("found_upload_failed").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("failed to upload data to S3 for key: %s", key)
    }

    elapsed := time.Since(start)
    app.Log("ok").Infof("upload_ok %s %s %s", key, s3Key, elapsed)
    return nil
}

性能指标:

  • 单机100 goroutine:3000-5000文件/秒(并发可以随时加)
  • 6台机器并发:约10000qps(有部分丢失文件,重试会增加耗时)
  • 日均迁移量:3-5亿文件(与文件批次大小有关)

第五阶段:校验与差集处理

迁移完成后,我们开发了完善的校验机制:

// 校验逻辑 (app/task/verify.go)
func Verify() {
    data := make(chan string)
    var wg sync.WaitGroup

    fmt.Println(app.RedisW())

    // 启动消费者 goroutines
    for i := 0; i < app.GoNum; i++ {
        wg.Add(1)
        go consumeHead(data, &wg)
    }

    // 启动生产者
    go func() {
        defer close(data)
        for {
            key := queue.Pop("verify")
            if key == "" {
                fmt.Println("No data in queue, exiting producer.")
                break
            }
            data <- key
        }
    }()

    // 等待所有消费者完成
    wg.Wait()
}

func checkS3ObjectExistence(client *s3.Client, s3Key, originalKey string) bool {
    for retry := 0; retry < 3; retry++ {
        exists, _, err := client.Exist(nil, s3Key)
        if err != nil {
            //time.Sleep(time.Second)
            app.Log("s3").Errorf("exist_check_failed: %s, attempt=%d, error=%s", s3Key, retry+1, err)
            continue
        }
        if !exists {
            app.Log("s3_not").Errorf("object_not_found: %s", originalKey)
            return false
        }
        return true
    }
    return false
}

对于百亿级数据差集对比,我们采用归并排序思想:

// 差集对比核心 (cmd/dedup_merge/dedup_merge.go)
func mergeSortedFiles(tempFiles []string, outputFile string) error {
    // 使用最小堆多路归并
    pq := &MinHeap{}
    heap.Init(pq)
    
    // 打开所有临时文件
    readers := make([]*bufio.Scanner, len(tempFiles))
    files := make([]*os.File, len(tempFiles))
    for i, fileName := range tempFiles {
        file, err := os.Open(fileName)
        if err != nil {
            return fmt.Errorf("error opening temp file %s: %w", fileName, err)
        }
        files[i] = file
        readers[i] = bufio.NewScanner(file)
    }
    
    // 初始化堆
    for i, reader := range readers {
        if reader.Scan() {
            heap.Push(pq, FileLine{line: reader.Text(), index: i})
        }
    }
    
    // 打开输出文件
    output, err := os.Create(outputFile)
    if err != nil {
        return fmt.Errorf("error creating output file: %w", err)
    }
    defer output.Close()
    writer := bufio.NewWriter(output)

    var prevLine string
    for pq.Len() > 0 {
        min := heap.Pop(pq).(FileLine)
        if min.line != prevLine {
            writer.WriteString(min.line + "\n")
            prevLine = min.line
        }
        if readers[min.index].Scan() {
            heap.Push(pq, FileLine{line: readers[min.index].Text(), index: min.index})
        }
    }
    writer.Flush()
    
    // 清理临时文件
    for _, file := range files {
        file.Close()
    }
    for _, fileName := range tempFiles {
        os.Remove(fileName)
    }
    
    return nil
}

该工具特点:

  1. 分块处理:将大文件拆分为多个小文件
  2. 并行排序:多goroutine并发处理各分块
  3. 多路归并:使用最小堆高效合并有序文件
  4. 内存优化:动态调整chunk大小,避免内存溢出

第六阶段:性能优化关键点

  1. 连接池优化
  2. 复用S3和Redis连接
  3. 合理设置超时和重试参数
  4. 错误处理机制
// 带重试的上传逻辑 (app/task/migrate.go)
func uploadToS3(client *s3.Client, key string, reader io.Reader, retries int) bool {
    for i := 0; i < retries; i++ {
        _, err := client.UploadFileByte(context.TODO(), key, reader)
        if err == nil {
            return true
        }
        // 指数退避重试机制:每次重试增加延迟
        backoff := time.Duration(1<<i) * time.Second
        app.Log("s3").Errorf("up_s3_err key %s, attempt %d, err %s, retrying in %v", key, i+1, err.Error(), backoff)
        time.Sleep(backoff)
    }
    return false
}
  1. 资源监控
  2. 实时监控队列积压
  3. 动态调整goroutine数量
  4. 完善的日志系统记录关键指标

总结与展望

通过四个阶段的持续优化,我们最终实现了:

  1. 效率提升:从预估的4-5个月缩短到40天
  2. 可靠性保证:完善的错误处理和重试机制
  3. 资源优化:合理利用集群资源,平衡迁移速度与源站负载

技术收获

  1. 分治思想的应用:将大问题分解为小问题并行处理
  2. 内存与磁盘的平衡:合理利用临时文件减少内存压力
  3. 高效算法的选择:最小堆在多路归并中的优势
  4. 并发模式的选择:Worker池与生产-消费者模式的实践
  5. 大文件处理经验:开发专用工具处理超大规模数据

这次150亿图床数据的迁移实践,不仅解决了当下的业务需求,更为我们积累了处理超大规模数据迁移的宝贵经验。技术的演进永无止境,我们将继续探索更高效、更可靠的数据迁移方案。

Tags:

最近发表
标签列表