ThinkPHP6定时任务同步千万级流水数据
多数据源配置
自定义指令
定时同步单次1000条
<?php
declare (strict_types = 1);namespace app\command\SyncDtaTask;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use think\facade\Db;/*** 分批同步数据*/
class DevLogSyncCmd extends Command
{protected function configure(){// 指令配置$this->setName('DevLogSyncCmd')->setDescription('分批次同步流水数据');}protected function execute(Input $input, Output $output){$tableList = ['dev_log',];//$citys = Db::query('SELECT id FROM city');//同步城市固定写死$citys = [37];foreach ($tableList as $item) {foreach ($citys as $city) {// 检查今年的流水表$todayTable = date('Y');$tblName = "{$item}_{$city}_{$todayTable}";$res = Db::execute("show tables like '{$tblName}'");if ($res === 0) {$output->writeln('补建流水表'.$item.'_'.$city['id'].'_'.$todayTable);Db::execute("create table {$tblName} like {$item}");}// 获取源数据库和目标数据库的连接$sourceDb = Db::connect("source1");$targetDb = Db::connect();//获取待同步最大值,现从目的数据库中获取$maxSyncedIdResult = $targetDb->query("SELECT MAX(id) as max_id FROM {$tblName}");//如果没获取到,则取源数据库的最大ID$maxSyncedId = $maxSyncedIdResult[0]['max_id'] ?? 0;if (!$maxSyncedId){$maxSyncedIdResult = $sourceDb->query("SELECT MAX(id) as max_id FROM {$tblName}");$maxSyncedId = ($maxSyncedIdResult[0]['max_id'] ?? 0) - 10000;}// 获取源库中的数据$sourceData = $sourceDb->query("SELECT * FROM {$tblName} WHERE id > {$maxSyncedId} ORDER BY id asc LIMIT 1000");//方案一: 直接使用插入方法//$targetDb->name($tblName)->insertAll($sourceData);//方案二: 需要处理字段if (!empty($sourceData)) {// 准备插入语句$insertQuery = "INSERT INTO {$tblName} (" . implode(", ", array_keys($sourceData[0])).",flag" . ") VALUES ";// 准备参数绑定的占位符和值$insertValues = [];$params = [];foreach ($sourceData as $index => $row) {$rowValues = [];foreach ($row as $key => $value) {//$paramKey = ":{$key}_{$index}";//$params[$paramKey] = $value;$rowValues[] = !empty($value)?"'".$value."'":"null";}$insertValues[] = "(" . implode(", ", $rowValues) .",1" . ")";}//每一行的value值用逗号分割$insertQuery .= implode(", ", $insertValues);// 执行插入操作$targetDb->execute($insertQuery);$output->writeln("已同步 " . count($sourceData) . " 条记录到 {$tblName}。");} else {$output->writeln("没有需要同步的数据。");}}}}
}
循环同步单次1000条
<?php
declare(strict_types=1);namespace app\command\sycnTask;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;class SyncAllDevLogCmd extends Command
{protected function configure(){// 指令配置$this->setName('SyncAllDevLogCmd')->setDescription('循环同步全部流水数据');}protected function execute(Input $input, Output $output){$tableList = ['dev_log',];// 同步城市固定写死$citys = [37];foreach ($tableList as $item) {foreach ($citys as $city) {// 检查今天的表$todayTable = date('Y');$tblName = "{$item}_{$city}_{$todayTable}";$res = Db::execute("SHOW TABLES LIKE '{$tblName}'");if ($res === 0) {$output->writeln("补建流水表 {$tblName}");Db::execute("CREATE TABLE {$tblName} LIKE {$item}");}// 获取源数据库和目标数据库的连接$sourceDb = Db::connect("source");$targetDb = Db::connect();$totalSynced = 0;while (true) {// 获取目标表中已同步的最大ID,没有默认为0$maxSyncedIdResult = $targetDb->query("SELECT MAX(id) AS max_id FROM {$tblName}");$maxSyncedId = $maxSyncedIdResult[0]['max_id'] ?? 0;// 从源数据库中获取数据$sourceData = $sourceDb->query("SELECT * FROM {$tblName} WHERE id > {$maxSyncedId} ORDER BY id ASC LIMIT 1000");if (empty($sourceData)) {// 如果没有数据则退出循环$output->writeln('没有需要同步的数据。');break;}// 插入数据到目标数据库$targetDb->name($tblName)->insertAll($sourceData);$count = count($sourceData);$totalSynced += $count;$output->writeln("已同步 {$totalSynced} 条记录到 {$tblName}。");}$output->writeln("同步完成,共同步 {$totalSynced} 条记录。");}}}
}
设置定时任务