多线程数据核对实战指南:文件 vs 数据库
任务背景
曾经手里有一份 超大的数据文件,我需要解析这个文件并提取每行特定的数据,文件是无法打开的。直接上实操,首先是设计出一个流程,推敲演练,最终总结出如下4大过程
- 解析文件内容,提取关键数据。
- 从数据库中读取对应数据。
- 将两者进行核对。
- 异常处理。
- 将核对结果批量入库。
但问题是:文件太大,内存有限!别担心,我们可以使用用 多线程 + 文件分块 的魔法来解决!
任务目标
- 高效解析文件:将文件切分成块,确保每块内容完整。
- 多线程读取:用多线程解析文件块和数据库数据。
- 数据核对:通过队列实现文件数据与数据库数据的核对。
- 批量入库:将核对结果高效写入数据库。
方案设计
流程图
流程图是GPS导航,清晰指引数据流向
方案设计图
核心步骤
给出关键代码片段
1.文件切分
- 目标:将大文件切分成小块,确保每块内容完整。
- 方法:
- 按行切分,避免切割到半行数据。
- 每块大小根据内存限制动态调整。
public List splitFile(String filePath, int blockSize) throws IOException {
List blocks = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String line;
StringBuilder buffer = new StringBuilder();
int currentSize = 0;
while ((line = reader.readLine()) != null) {
buffer.append(line).append("\n");
currentSize += line.length();
if (currentSize >= blockSize) {
blocks.add(new FileBlock(buffer.toString()));
buffer = new StringBuilder();
currentSize = 0;
}
}
if (buffer.length() > 0) {
blocks.add(new FileBlock(buffer.toString()));
}
}
return blocks;
}
2.多线程解析文件
- 目标:用多线程解析文件块,提取关键数据。
- 方法:
- 每个线程处理一个文件块。
- 将解析结果放入核对队列。
public class FileParser implements Runnable {
private String blockContent;
private BlockingQueue queue;
public FileParser(String blockContent, BlockingQueue queue) {
this.blockContent = blockContent;
this.queue = queue;
}
@Override
public void run() {
String[] lines = blockContent.split("\n");
for (String line : lines) {
Data data = parseLine(line); // 解析每行数据
queue.put(data); // 放入核对队列
}
}
private Data parseLine(String line) {
// 解析逻辑
return new Data();
}
}
3.多线程读取数据库
- 目标:用多线程从数据库读取数据。
- 方法:
- 每个线程读取一部分数据。
- 将读取结果放入核对队列。
public class DBReader implements Runnable {
private BlockingQueue queue;
private int startId;
private int endId;
public DBReader(BlockingQueue queue, int startId, int endId) {
this.queue = queue;
this.startId = startId;
this.endId = endId;
}
@Override
public void run() {
List dbData = fetchDataFromDB(startId, endId); // 从数据库读取数据
for (Data data : dbData) {
queue.put(data); // 放入核对队列
}
}
private List fetchDataFromDB(int startId, int endId) {
// 数据库查询逻辑
return new ArrayList<>();
}
}
4.数据核对
- 目标:核对文件数据和数据库数据。
- 方法:
- 从队列中取出数据,进行比对。
- 将核对结果放入批量入库队列。
public class DataChecker implements Runnable {
private BlockingQueue fileQueue;
private BlockingQueue dbQueue;
private BlockingQueue resultQueue;
public DataChecker(BlockingQueue fileQueue, BlockingQueue dbQueue, BlockingQueue resultQueue) {
this.fileQueue = fileQueue;
this.dbQueue = dbQueue;
this.resultQueue = resultQueue;
}
@Override
public void run() {
while (true) {
Data fileData = fileQueue.take();
Data dbData = dbQueue.take();
if (fileData.equals(dbData)) {
resultQueue.put(fileData); // 核对通过,放入结果队列
}
}
}
}
5.批量入库
- 目标:将核对结果批量写入数据库。
- 方法:
- 从结果队列中取出数据,批量插入。
public class BatchInserter implements Runnable {
private BlockingQueue resultQueue;
public BatchInserter(BlockingQueue resultQueue) {
this.resultQueue = resultQueue;
}
@Override
public void run() {
List batch = new ArrayList<>();
while (true) {
Data data = resultQueue.take();
batch.add(data);
if (batch.size() >= 1000) { // 每1000条批量插入一次
insertBatch(batch);
batch.clear();
}
}
}
private void insertBatch(List batch) {
// 批量插入逻辑
}
}
异常处理四重防御
异常防御机制是防弹装甲,抵御各种意外攻击。
1.文件解析异常捕获
public class FileParser implements Runnable {
@Override
public void run() {
try {
// 解析逻辑...
} catch (Exception e) {
ErrorTracker.log("文件解析异常", e);
ErrorQueue.put(new ErrorData(blockId, e)); // 记录错误块
} finally {
CompletionCounter.fileBlockDone(); // 完成计数器
}
}
}
2.数据库查询重试机制
public List fetchDataWithRetry(int page, int size) {
int retry = 0;
while (retry < 3) {
try {
return jdbcTemplate.query("SELECT ... LIMIT ?,?", page*size, size);
} catch (SQLException e) {
ErrorTracker.log("数据库查询异常", e);
retry++;
Thread.sleep(1000 * retry); // 指数退避
}
}
throw new RetryFailedException("数据库查询重试失败");
}
3.核对完整性保障
public class DataChecker {
private AtomicInteger fileCount = new AtomicInteger(0);
private AtomicInteger dbCount = new AtomicInteger(0);
public void run() {
while (!isDone()) { // 双重判断
Data fileData = fileQueue.poll(1, TimeUnit.SECONDS);
Data dbData = dbQueue.poll(1, TimeUnit.SECONDS);
if (fileData != null) fileCount.incrementAndGet();
if (dbData != null) dbCount.incrementAndGet();
// 核对逻辑...
}
// 最终校验
if (fileCount.get() != dbCount.get()) {
ErrorTracker.log("数据总量不匹配: 文件数据=" + fileCount + " 数据库数据=" + dbCount);
}
}
}
4.批量插入容错设计
public class BatchInserter {
public void insertWithFallback(List batch) {
try {
jdbcTemplate.batchUpdate("INSERT...", batch);
} catch (DataAccessException e) {
ErrorTracker.log("批量插入失败", e);
// 分片重试:将大分片拆成小分片
if (batch.size() > 1) {
insertWithFallback(batch.subList(0, batch.size()/2));
insertWithFallback(batch.subList(batch.size()/2, batch.size()));
} else {
ErrorQueue.put(batch.get(0)); // 单条进入错误队列
}
}
}
}
完整性保障策略
完整性校验 是质量检测仪,确保不遗漏任何数据
保障机制 | 实现方式 |
原子计数器 | 使用AtomicInteger统计文件/数据库数据量 |
双重完成检测 | 1. 生产者完成标记 |
最终一致性校验 | 核对结束后对比文件行数与数据库记录数 |
错误数据重试 | 错误队列数据定时重新投入核对流程 |
水位线监控 | 实时监控各队列数据积压情况,动态调整线程数 |
性能优化技巧
智能优化策略 是涡轮增压引擎,让处理速度持续飙升
- 动态分块策略
// 根据系统实时状态自动调整分块大小
int dynamicBlockSize = Runtime.getRuntime().freeMemory() > 512MB ? 64MB : 16MB;
- 双缓冲队列设计
// 主队列 + 溢出磁盘队列(防止内存溢出)
BlockingQueue overflowQueue = new DiskBackedBlockingQueue<>();
- 智能线程池管理
// 根据任务类型动态调整线程数
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new SmartThreadFactory() // 监控线程负载
);
最终效果
运行效果
- 文件切分:确保每块内容完整。
- 多线程解析:高效提取文件数据。
- 多线程读取数据库:快速获取数据库数据。
- 数据核对:通过队列实现高效比对。
- 批量入库:将核对结果高效写入数据库。
总结
通过 文件切分 + 多线程 + 队列 的方案,我们可以轻松解决了 大文件解析 和 数据核对 的难题!
这个方案像给数据处理流程装上了 防弹衣 + GPS追踪器,并且这套方案的思路对以下极端情况:
文件解析中途崩溃、数据库连接闪断、核对数据量级差异、网络波动导致插入失败,系统仍能保证:
- 零数据丢失
- 完整核对覆盖
- 自动恢复能力
- 实时状态可观测
欢迎讨论并提出建议!