网站首页 > 文章精选 正文
在互联网内容爆炸式增长的时代,图片作为信息的主要载体之一,其存储和管理面临着前所未有的挑战。本文将分享我们如何将搜索业务包含150亿图片数据的图床从传统存储(Cassandra)迁移到S3对象存储的完整历程。从最初的单机单进程迁移,到最终的多集群并行处理,我们通过不断优化和迭代,将原本预计需要120天的迁移工作缩短到了40天完成。
第一阶段:单进程迁移的困境
项目初期,我们采用最简单的单进程迁移方案:
// 伪代码示例
func singleProcessMigration(filePath string) {
file, _ := os.Open(filePath)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
key := scanner.Text()
data := fetchFromCassandra(key)
uploadToS3(key, data)
}
}
这种方案存在明显问题:
- 性能瓶颈:单进程串行处理,迁移速度极慢
- 进度不可控:无法有效记录迁移进度,中断后难以恢复
- 资源浪费:单机CPU和网络利用率极低
实测表明,单进程迁移速度仅为每秒10-20个文件,按此速度完成300亿数据迁移需要约5年时间!
第二阶段:多文件多进程的改进
为解决单进程问题,我们改进为多进程方案:
- 将大文件拆分为多个小文件
- 每个进程处理一个独立文件
- 每个进程维护自己的进度记录
// 伪代码示例
func multiProcessMigration(fileList []string) {
for _, file := range fileList {
go func(f string) {
// 读取上次进度
lastPos := readProgress(f)
// 从断点继续迁移
migrateFromPosition(f, lastPos)
}(file)
}
}
改进后效果:
- 迁移速度提升至每秒1000-2000个文件
- 支持断点续传
但新问题随之而来:
- 进度管理复杂:每个进程独立记录进度,难以统一管理
- 错误处理困难:失败重试机制不完善
- 资源分配不均:某些进程可能处理较慢的文件
第三阶段:引入消息队列的革命性突破
为解决上述问题,我们引入Redis作为消息队列:
// 队列操作核心代码 (app/task/queue/queue.go)
func Push(key string, verify string) {
key = strings.TrimSpace(key)
if key == "" {
return
}
queue := queueName(verify)
size, err := app.RedisW().LPush(queue, key).Result()
if err != nil {
app.Log("push_err").Errorf("Failed to push key: %s to queue: %s, error: %v", key, queue, err)
return
}
app.Log().Infof("Added key: %s to queue: %s, new length: %d", key, queue, size)
}
func Pop(verify string) string {
queue := queueName(verify)
size, err := app.RedisW().LLen(queue).Result()
if err != nil {
log.Fatalf("failed to get queue length: %s, error: %v", queue, err)
}
if size > 0 {
value, err := app.RedisW().RPop(queue).Result()
if err != nil && err != redis.Nil {
app.Log().Errorf("Failed to pop from queue: %s, error: %v", queue, err)
log.Fatalf("queue pop error: %v", err)
}
return value
}
fmt.Printf("Queue %s is empty. Retrying...\n", queue)
time.Sleep(time.Second)
return ""
}
架构优化:
- 生产者:将文件内容导入Redis队列
- 消费者:多进程从队列消费任务
- 监控系统:实时监控队列长度和消费速度
优势体现:
- 解耦生产消费:可以独立扩展生产者和消费者
- 统一进度管理:通过队列长度可估算剩余工作量
- 灵活重试:失败任务可重新放回队列
第四阶段:多集群部署的性能飞跃
为进一步提升性能,我们实现多集群部署:
- 集群配置:
多台机器部署以下程序
nohup ./migrateimgbed -cluster wenku -go 100 >> migrateimgbed.log 2>&1 &
- 并发控制:
// 迁移核心逻辑 (app/task/migrate.go)
func Run() {
data := make(chan string)
goNum := app.GoNum
verify := ""
fmt.Println("redis info dump:", app.RedisW())
// 使用 WaitGroup 来管理并发任务
var wg sync.WaitGroup
// 启动消费者 goroutines
for i := 0; i < goNum; i++ {
wg.Add(1)
go consume(data, &wg)
}
fmt.Println("Starting task run...")
// 生产者:从队列中提取数据
go func() {
defer close(data)
for {
key := queue.Pop(verify)
if key == "" {
fmt.Println("No data in queue, exiting producer.")
break
}
data <- key
}
}()
// 等待消费者完成所有任务
wg.Wait()
}
- 迁移核心逻辑:
// 迁移逻辑 (app/task/migrate.go)
func do(key string) error {
key = strings.TrimSpace(key)
//自定义key 上传到common bucket
if strings.HasPrefix(key, "/d/") {
key = ParseCustomKey(key)
}
//gif 第一帧 转回原图
if strings.HasSuffix(key, "_1") && len(key) == 21 {
key = strings.TrimRight(key, "_1")
}
// 验证 key 格式
arr := strings.SplitAfter(key, "t01")
if len(arr) < 2 || arr[0] != "t01" {
return fmt.Errorf("invalid key format: %s", key)
}
s3Key, valid := ParseKey(key)
if !valid {
return fmt.Errorf("invalid parsed key format: %s", key)
}
start := time.Now()
s3client := s3.NewClient(app.Cluster)
var notFound bool
// 从 Cassandra 获取数据
reader, err := cass.GetFromApi(key, "shyc2")
if err != nil || reader == nil {
// 如果在第一个 Cassandra 集群中未找到,尝试第二个
reader, err = cass.GetFromApi(key, "shbt")
if err != nil || reader == nil {
notFound = true
}
}
// 如果两个集群都没有找到数据
if notFound {
elapsed := time.Since(start)
app.Log("not_found_key").Errorf("upload_failed %s %s", key, elapsed)
return fmt.Errorf("data not found for key: %s", key)
}
// 上传到 S3 带重试机制
if !uploadToS3(s3client, s3Key, reader, 3) {
elapsed := time.Since(start)
app.Log("found_upload_failed").Errorf("upload_failed %s %s", key, elapsed)
return fmt.Errorf("failed to upload data to S3 for key: %s", key)
}
elapsed := time.Since(start)
app.Log("ok").Infof("upload_ok %s %s %s", key, s3Key, elapsed)
return nil
}
性能指标:
- 单机100 goroutine:3000-5000文件/秒(并发可以随时加)
- 6台机器并发:约10000qps(有部分丢失文件,重试会增加耗时)
- 日均迁移量:3-5亿文件(与文件批次大小有关)
第五阶段:校验与差集处理
迁移完成后,我们开发了完善的校验机制:
// 校验逻辑 (app/task/verify.go)
func Verify() {
data := make(chan string)
var wg sync.WaitGroup
fmt.Println(app.RedisW())
// 启动消费者 goroutines
for i := 0; i < app.GoNum; i++ {
wg.Add(1)
go consumeHead(data, &wg)
}
// 启动生产者
go func() {
defer close(data)
for {
key := queue.Pop("verify")
if key == "" {
fmt.Println("No data in queue, exiting producer.")
break
}
data <- key
}
}()
// 等待所有消费者完成
wg.Wait()
}
func checkS3ObjectExistence(client *s3.Client, s3Key, originalKey string) bool {
for retry := 0; retry < 3; retry++ {
exists, _, err := client.Exist(nil, s3Key)
if err != nil {
//time.Sleep(time.Second)
app.Log("s3").Errorf("exist_check_failed: %s, attempt=%d, error=%s", s3Key, retry+1, err)
continue
}
if !exists {
app.Log("s3_not").Errorf("object_not_found: %s", originalKey)
return false
}
return true
}
return false
}
对于百亿级数据差集对比,我们采用归并排序思想:
// 差集对比核心 (cmd/dedup_merge/dedup_merge.go)
func mergeSortedFiles(tempFiles []string, outputFile string) error {
// 使用最小堆多路归并
pq := &MinHeap{}
heap.Init(pq)
// 打开所有临时文件
readers := make([]*bufio.Scanner, len(tempFiles))
files := make([]*os.File, len(tempFiles))
for i, fileName := range tempFiles {
file, err := os.Open(fileName)
if err != nil {
return fmt.Errorf("error opening temp file %s: %w", fileName, err)
}
files[i] = file
readers[i] = bufio.NewScanner(file)
}
// 初始化堆
for i, reader := range readers {
if reader.Scan() {
heap.Push(pq, FileLine{line: reader.Text(), index: i})
}
}
// 打开输出文件
output, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("error creating output file: %w", err)
}
defer output.Close()
writer := bufio.NewWriter(output)
var prevLine string
for pq.Len() > 0 {
min := heap.Pop(pq).(FileLine)
if min.line != prevLine {
writer.WriteString(min.line + "\n")
prevLine = min.line
}
if readers[min.index].Scan() {
heap.Push(pq, FileLine{line: readers[min.index].Text(), index: min.index})
}
}
writer.Flush()
// 清理临时文件
for _, file := range files {
file.Close()
}
for _, fileName := range tempFiles {
os.Remove(fileName)
}
return nil
}
该工具特点:
- 分块处理:将大文件拆分为多个小文件
- 并行排序:多goroutine并发处理各分块
- 多路归并:使用最小堆高效合并有序文件
- 内存优化:动态调整chunk大小,避免内存溢出
第六阶段:性能优化关键点
- 连接池优化:
- 复用S3和Redis连接
- 合理设置超时和重试参数
- 错误处理机制:
// 带重试的上传逻辑 (app/task/migrate.go)
func uploadToS3(client *s3.Client, key string, reader io.Reader, retries int) bool {
for i := 0; i < retries; i++ {
_, err := client.UploadFileByte(context.TODO(), key, reader)
if err == nil {
return true
}
// 指数退避重试机制:每次重试增加延迟
backoff := time.Duration(1<<i) * time.Second
app.Log("s3").Errorf("up_s3_err key %s, attempt %d, err %s, retrying in %v", key, i+1, err.Error(), backoff)
time.Sleep(backoff)
}
return false
}
- 资源监控:
- 实时监控队列积压
- 动态调整goroutine数量
- 完善的日志系统记录关键指标
总结与展望
通过四个阶段的持续优化,我们最终实现了:
- 效率提升:从预估的4-5个月缩短到40天
- 可靠性保证:完善的错误处理和重试机制
- 资源优化:合理利用集群资源,平衡迁移速度与源站负载
技术收获
- 分治思想的应用:将大问题分解为小问题并行处理
- 内存与磁盘的平衡:合理利用临时文件减少内存压力
- 高效算法的选择:最小堆在多路归并中的优势
- 并发模式的选择:Worker池与生产-消费者模式的实践
- 大文件处理经验:开发专用工具处理超大规模数据
这次150亿图床数据的迁移实践,不仅解决了当下的业务需求,更为我们积累了处理超大规模数据迁移的宝贵经验。技术的演进永无止境,我们将继续探索更高效、更可靠的数据迁移方案。
猜你喜欢
- 2025-07-08 Rust编程语言之父都在用什么工具(rust编程语言怎么样)
- 2025-07-08 go语言中性能分析工具pprof使用心得
- 2025-07-08 爱上开源之golang入门至实战第三章-内存逃逸
- 2025-07-08 GO 编程:Golang的协程调度器原理及GMP设计思想
- 2025-07-08 Go语言核心36讲(新年彩蛋)--学习笔记
- 2025-07-08 c语言中堆和栈的区别(c语言堆和栈的概念和区别)
- 2025-07-08 Go 语言内存管理(一):系统内存管理
- 2025-07-08 深入解读Raft算法与etcd工程实现(etcd raft库原理)
- 2025-07-08 Go语言中的性能考虑和优化(go语言的效率)
- 2025-07-08 高性能 Go 的 6 个技巧 — Go 高级主题
- 最近发表
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 计算机网络的拓扑结构是指() (45)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)
- mysql数据库面试题 (57)