当前位置:首页 > 技术分析 > 正文内容

多线程数据核对实战指南:文件 vs 数据库

ruisui882个月前 (03-16)技术分析17

任务背景

曾经手里有一份 超大的数据文件,我需要解析这个文件并提取每行特定的数据,文件是无法打开的。直接上实操,首先是设计出一个流程,推敲演练,最终总结出如下4大过程

  1. 解析文件内容,提取关键数据。
  2. 从数据库中读取对应数据。
  3. 将两者进行核对。
  4. 异常处理。
  5. 将核对结果批量入库。

但问题是:文件太大,内存有限!别担心,我们可以使用用 多线程 + 文件分块 的魔法来解决!


任务目标

  1. 高效解析文件:将文件切分成块,确保每块内容完整。
  2. 多线程读取:用多线程解析文件块和数据库数据。
  3. 数据核对:通过队列实现文件数据与数据库数据的核对。
  4. 批量入库:将核对结果高效写入数据库。

方案设计

流程图

流程图是GPS导航,清晰指引数据流向

方案设计图

核心步骤

给出关键代码片段

1.文件切分

  • 目标:将大文件切分成小块,确保每块内容完整。
  • 方法
    • 按行切分,避免切割到半行数据。
    • 每块大小根据内存限制动态调整。
public List splitFile(String filePath, int blockSize) throws IOException {
    List blocks = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
        String line;
        StringBuilder buffer = new StringBuilder();
        int currentSize = 0;
        while ((line = reader.readLine()) != null) {
            buffer.append(line).append("\n");
            currentSize += line.length();
            if (currentSize >= blockSize) {
                blocks.add(new FileBlock(buffer.toString()));
                buffer = new StringBuilder();
                currentSize = 0;
            }
        }
        if (buffer.length() > 0) {
            blocks.add(new FileBlock(buffer.toString()));
        }
    }
    return blocks;
}

2.多线程解析文件

  • 目标:用多线程解析文件块,提取关键数据。
  • 方法
    • 每个线程处理一个文件块。
    • 将解析结果放入核对队列。
public class FileParser implements Runnable {
    private String blockContent;
    private BlockingQueue queue;

    public FileParser(String blockContent, BlockingQueue queue) {
        this.blockContent = blockContent;
        this.queue = queue;
    }

    @Override
    public void run() {
        String[] lines = blockContent.split("\n");
        for (String line : lines) {
            Data data = parseLine(line); // 解析每行数据
            queue.put(data); // 放入核对队列
        }
    }

    private Data parseLine(String line) {
        // 解析逻辑
        return new Data();
    }
}

3.多线程读取数据库

  • 目标:用多线程从数据库读取数据。
  • 方法
    • 每个线程读取一部分数据。
    • 将读取结果放入核对队列。
public class DBReader implements Runnable {
    private BlockingQueue queue;
    private int startId;
    private int endId;

    public DBReader(BlockingQueue queue, int startId, int endId) {
        this.queue = queue;
        this.startId = startId;
        this.endId = endId;
    }

    @Override
    public void run() {
        List dbData = fetchDataFromDB(startId, endId); // 从数据库读取数据
        for (Data data : dbData) {
            queue.put(data); // 放入核对队列
        }
    }

    private List fetchDataFromDB(int startId, int endId) {
        // 数据库查询逻辑
        return new ArrayList<>();
    }
}

4.数据核对

  • 目标:核对文件数据和数据库数据。
  • 方法
    • 从队列中取出数据,进行比对。
    • 将核对结果放入批量入库队列。
public class DataChecker implements Runnable {
    private BlockingQueue fileQueue;
    private BlockingQueue dbQueue;
    private BlockingQueue resultQueue;

    public DataChecker(BlockingQueue fileQueue, BlockingQueue dbQueue, BlockingQueue resultQueue) {
        this.fileQueue = fileQueue;
        this.dbQueue = dbQueue;
        this.resultQueue = resultQueue;
    }

    @Override
    public void run() {
        while (true) {
            Data fileData = fileQueue.take();
            Data dbData = dbQueue.take();
            if (fileData.equals(dbData)) {
                resultQueue.put(fileData); // 核对通过,放入结果队列
            }
        }
    }
}

5.批量入库

  • 目标:将核对结果批量写入数据库。
  • 方法
    • 从结果队列中取出数据,批量插入。
public class BatchInserter implements Runnable {
    private BlockingQueue resultQueue;

    public BatchInserter(BlockingQueue resultQueue) {
        this.resultQueue = resultQueue;
    }

    @Override
    public void run() {
        List batch = new ArrayList<>();
        while (true) {
            Data data = resultQueue.take();
            batch.add(data);
            if (batch.size() >= 1000) { // 每1000条批量插入一次
                insertBatch(batch);
                batch.clear();
            }
        }
    }

    private void insertBatch(List batch) {
        // 批量插入逻辑
    }
}

异常处理四重防御

异常防御机制是防弹装甲,抵御各种意外攻击。

1.文件解析异常捕获

public class FileParser implements Runnable {
    @Override
    public void run() {
        try {
            // 解析逻辑...
        } catch (Exception e) {
            ErrorTracker.log("文件解析异常", e);
            ErrorQueue.put(new ErrorData(blockId, e)); // 记录错误块
        } finally {
            CompletionCounter.fileBlockDone(); // 完成计数器
        }
    }
}

2.数据库查询重试机制

public List fetchDataWithRetry(int page, int size) {
    int retry = 0;
    while (retry < 3) {
        try {
            return jdbcTemplate.query("SELECT ... LIMIT ?,?", page*size, size);
        } catch (SQLException e) {
            ErrorTracker.log("数据库查询异常", e);
            retry++;
            Thread.sleep(1000 * retry); // 指数退避
        }
    }
    throw new RetryFailedException("数据库查询重试失败");
}

3.核对完整性保障

public class DataChecker {
    private AtomicInteger fileCount = new AtomicInteger(0);
    private AtomicInteger dbCount = new AtomicInteger(0);

    public void run() {
        while (!isDone()) { // 双重判断
            Data fileData = fileQueue.poll(1, TimeUnit.SECONDS);
            Data dbData = dbQueue.poll(1, TimeUnit.SECONDS);
            
            if (fileData != null) fileCount.incrementAndGet();
            if (dbData != null) dbCount.incrementAndGet();
            
            // 核对逻辑...
        }
        
        // 最终校验
        if (fileCount.get() != dbCount.get()) {
            ErrorTracker.log("数据总量不匹配: 文件数据=" + fileCount + " 数据库数据=" + dbCount);
        }
    }
}

4.批量插入容错设计

public class BatchInserter {
    public void insertWithFallback(List batch) {
        try {
            jdbcTemplate.batchUpdate("INSERT...", batch);
        } catch (DataAccessException e) {
            ErrorTracker.log("批量插入失败", e);
            // 分片重试:将大分片拆成小分片
            if (batch.size() > 1) {
                insertWithFallback(batch.subList(0, batch.size()/2));
                insertWithFallback(batch.subList(batch.size()/2, batch.size()));
            } else {
                ErrorQueue.put(batch.get(0)); // 单条进入错误队列
            }
        }
    }
}

完整性保障策略

完整性校验 是质量检测仪,确保不遗漏任何数据

保障机制

实现方式

原子计数器

使用AtomicInteger统计文件/数据库数据量

双重完成检测

1. 生产者完成标记
2. 队列空状态检测

最终一致性校验

核对结束后对比文件行数与数据库记录数

错误数据重试

错误队列数据定时重新投入核对流程

水位线监控

实时监控各队列数据积压情况,动态调整线程数


性能优化技巧

智能优化策略 是涡轮增压引擎,让处理速度持续飙升

  1. 动态分块策略
// 根据系统实时状态自动调整分块大小
int dynamicBlockSize = Runtime.getRuntime().freeMemory() > 512MB ? 64MB : 16MB;
  1. 双缓冲队列设计
// 主队列 + 溢出磁盘队列(防止内存溢出)
BlockingQueue overflowQueue = new DiskBackedBlockingQueue<>(); 
  1. 智能线程池管理
// 根据任务类型动态调整线程数
ExecutorService executor = new ThreadPoolExecutor(
    corePoolSize, 
    maxPoolSize,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new SmartThreadFactory() // 监控线程负载
);

最终效果

运行效果

  1. 文件切分:确保每块内容完整。
  2. 多线程解析:高效提取文件数据。
  3. 多线程读取数据库:快速获取数据库数据。
  4. 数据核对:通过队列实现高效比对。
  5. 批量入库:将核对结果高效写入数据库。

总结

通过 文件切分 + 多线程 + 队列 的方案,我们可以轻松解决了 大文件解析 和 数据核对 的难题!

这个方案像给数据处理流程装上了 防弹衣 + GPS追踪器,并且这套方案的思路对以下极端情况:

文件解析中途崩溃、数据库连接闪断、核对数据量级差异、网络波动导致插入失败,系统仍能保证:

  1. 零数据丢失
  2. 完整核对覆盖
  3. 自动恢复能力
  4. 实时状态可观测

欢迎讨论并提出建议!

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/2782.html

分享给朋友:

“多线程数据核对实战指南:文件 vs 数据库” 的相关文章

Gitlab之间进行同步备份

目前,我们公司有两个研发团队,分别在北京和武汉,考虑到访问速度的问题,原有武汉的研发环境在近端部署。也就是北京和武汉分别有两套独立的研发管理环境,虽然这解决了近端访问速度的问题,但是管理上较为分散,比如研发环境备份和恢复就是最重要的问题之一。最近,处于对安全性和合规性的考虑,希望将北京和武汉的源代码...

一文让你彻底搞懂 vue-Router

路由是网络工程里面的专业术语,就是通过互联把信息从源地址传输到目的地址的活动。本质上就是一种对应关系。分为前端路由和后端路由。后端路由:URL 的请求地址与服务器上的资源对应,根据不同的请求地址返回不同的资源。前端路由:在单页面应用中,根据用户触发的事件,改变URL在不刷新页面的前提下,改变显示内容...

Vue2的16种传参通信方式

前言先直入主题列出有哪些传参方式,下面再通过事例一一讲解。props(父传子)$emit与v-on (子传父)EventBus (兄弟传参).sync与update: (父子双向)v-model (父子双向)ref$children与$parent$attrs与$listeners (爷孙双向)pr...

关于Vue页面跳转传参,参数不同, 但页面只获取参数一次的问题

#头条创作挑战赛#1.问题描述问题描述: element 展示表格(页面A),点击表格的每一行的查看详情按钮,可以携带此行的信息参数跳转到另一个页面(页面B),但是从A页面到B页面,只有第一次跳转的时候B页面可以获取到A页面的参数,返回再次A->B ,B页面无法获取到参数。2.解决办法:方法一...

Vue真是太好了 壹万多字的Vue知识点 超详细!

1??、Vue和其他两大框架的区别Angular 学习成本太高React 代码可读性差Vue 学习成本较低 很容易上手VUE官方: https://cn.vuejs.org/v2/guide/comparison.html?2??、Vue是什么Vue是一套用于构建用户界面的渐进式框架 "前端...

基于Spring Cloud+VUE的多租户小程序商城源码「快速二开可商用」

一、系统介绍JooLun平台是一个专注微信快速二开系统研发的平台,采用Java语言开发,使用的是最新微服务前后端分离技术,目前有公众号和小程序商城两个版本,有公众号后台管理、小程序商城。基于Spring Cloud微服务+VUE实现的核心框架多租户小程序商城源码,核心框架采用SpringBoot2+...