ThinkPHP6同步千万级流水数据

news/2024/11/18 15:25:12/文章来源:https://www.cnblogs.com/aeolian/p/18373757

ThinkPHP6定时任务同步千万级流水数据

多数据源配置

自定义指令

定时同步单次1000条

<?php
declare (strict_types = 1);namespace app\command\SyncDtaTask;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use think\facade\Db;/*** 分批同步数据*/
class DevLogSyncCmd extends Command
{protected function configure(){// 指令配置$this->setName('DevLogSyncCmd')->setDescription('分批次同步流水数据');}protected function execute(Input $input, Output $output){$tableList = ['dev_log',];//$citys = Db::query('SELECT id FROM city');//同步城市固定写死$citys = [37];foreach ($tableList as $item) {foreach ($citys as $city) {// 检查今年的流水表$todayTable = date('Y');$tblName = "{$item}_{$city}_{$todayTable}";$res = Db::execute("show tables like '{$tblName}'");if ($res === 0) {$output->writeln('补建流水表'.$item.'_'.$city['id'].'_'.$todayTable);Db::execute("create table {$tblName} like {$item}");}// 获取源数据库和目标数据库的连接$sourceDb = Db::connect("source1");$targetDb = Db::connect();//获取待同步最大值,现从目的数据库中获取$maxSyncedIdResult = $targetDb->query("SELECT MAX(id) as max_id FROM {$tblName}");//如果没获取到,则取源数据库的最大ID$maxSyncedId = $maxSyncedIdResult[0]['max_id'] ?? 0;if (!$maxSyncedId){$maxSyncedIdResult = $sourceDb->query("SELECT MAX(id) as max_id FROM {$tblName}");$maxSyncedId = ($maxSyncedIdResult[0]['max_id'] ?? 0) - 10000;}// 获取源库中的数据$sourceData = $sourceDb->query("SELECT * FROM {$tblName} WHERE id > {$maxSyncedId} ORDER BY id asc LIMIT 1000");//方案一: 直接使用插入方法//$targetDb->name($tblName)->insertAll($sourceData);//方案二: 需要处理字段if (!empty($sourceData)) {// 准备插入语句$insertQuery = "INSERT INTO {$tblName} (" . implode(", ", array_keys($sourceData[0])).",flag" . ") VALUES ";// 准备参数绑定的占位符和值$insertValues = [];$params = [];foreach ($sourceData as $index => $row) {$rowValues = [];foreach ($row as $key => $value) {//$paramKey = ":{$key}_{$index}";//$params[$paramKey] = $value;$rowValues[] = !empty($value)?"'".$value."'":"null";}$insertValues[] = "(" . implode(", ", $rowValues) .",1" . ")";}//每一行的value值用逗号分割$insertQuery .= implode(", ", $insertValues);// 执行插入操作$targetDb->execute($insertQuery);$output->writeln("已同步 " . count($sourceData) . " 条记录到 {$tblName}。");} else {$output->writeln("没有需要同步的数据。");}}}}
}

循环同步单次1000条

<?php
declare(strict_types=1);namespace app\command\sycnTask;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\facade\Db;class SyncAllDevLogCmd extends Command
{protected function configure(){// 指令配置$this->setName('SyncAllDevLogCmd')->setDescription('循环同步全部流水数据');}protected function execute(Input $input, Output $output){$tableList = ['dev_log',];// 同步城市固定写死$citys = [37];foreach ($tableList as $item) {foreach ($citys as $city) {// 检查今天的表$todayTable = date('Y');$tblName = "{$item}_{$city}_{$todayTable}";$res = Db::execute("SHOW TABLES LIKE '{$tblName}'");if ($res === 0) {$output->writeln("补建流水表 {$tblName}");Db::execute("CREATE TABLE {$tblName} LIKE {$item}");}// 获取源数据库和目标数据库的连接$sourceDb = Db::connect("source");$targetDb = Db::connect();$totalSynced = 0;while (true) {// 获取目标表中已同步的最大ID,没有默认为0$maxSyncedIdResult = $targetDb->query("SELECT MAX(id) AS max_id FROM {$tblName}");$maxSyncedId = $maxSyncedIdResult[0]['max_id'] ?? 0;// 从源数据库中获取数据$sourceData = $sourceDb->query("SELECT * FROM {$tblName} WHERE id > {$maxSyncedId} ORDER BY id ASC LIMIT 1000");if (empty($sourceData)) {// 如果没有数据则退出循环$output->writeln('没有需要同步的数据。');break;}// 插入数据到目标数据库$targetDb->name($tblName)->insertAll($sourceData);$count = count($sourceData);$totalSynced += $count;$output->writeln("已同步 {$totalSynced} 条记录到 {$tblName}。");}$output->writeln("同步完成,共同步 {$totalSynced} 条记录。");}}}
}

设置定时任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/785437.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AtCoder Beginner Contest 047

A - Fighting over Candies 简单排序。 #include <bits/stdc++.h>using namespace std; using i64 = long long;int main() {ios::sync_with_stdio(false), cin.tie(nullptr);vector<int> a(3);cin >> a[0] >> a[1] >> a[2];sort(a.begin(), a.e…

CRTP 和静态多态

c++古典面试问题之一:面向对象编程三大特性--封装,继承,多态 c++古典面试问题之二:如何实现多态-- 当基类指针指向派生类对象,并通过这个指针调用在派生类中被重写的虚函数基于上述知识点,今天我们讲下另一种多态实现方式:CRTP (curiously recurring template pattern)虚…

小程序直传oss

直传使用 const host = <host>; const signature = <signatureString>; const ossAccessKeyId = <accessKey>; const policy = <policyBase64Str>; const key = <object name>; const securityToken = <x-oss-security-token>; const fi…

呼死你 手机轰炸机 (29021243

基于 短信轰炸机 原理研究并实现之后 又研究起了电话轰炸机 实现原理其实大同小异,最终的区别的 用户在进行短信发送 并未收到短信的情况下 【产生的原因有 网络信号原因、用户手机自动屏蔽原因】 可以利用第三方平台提供的语音验证码进行发送 这种情况也是通过fiddler进行抓包…

pygame物体碰撞

代码:#coding=utf-8import os,sys,re,time import pygame import random import math from win32api import GetSystemMetrics from tkinter import messageboxpygame.init() pygame.display.set_caption("我的游戏")percent = 0.6 screen_width = GetSystemMetrics…

NSSCTF [HNCTF 2022 Week1]Interesting_include

NSSCTF [HNCTF 2022 Week1]Interesting_include<?php //WEB手要懂得搜索 //flag in ./flag.phpif(isset($_GET[filter])){$file = $_GET[filter];if(!preg_match("/flag/i", $file)){die("error");}include($file); }else{highlight_file(__FILE__); }…

02-HTMLJS相关练习

1、使用 html 写一个网页,要求满足以下条件: (1)网页中含有任意一张图片,图片路径使用绝对路径(这里绝对路径无法识别故使用相对路径),鼠标悬停在图片时出现“马哥教育”文本,且点击图片可跳转至马哥教育官方页面 (2)网页中包含账号、密码登录,且账号提前定义好是 …

织梦dedecms判断文章是否为推荐文章

{dede:arclist row=10 typeid=typeid titlelen=36 runphp=yes} {dede:loop value=$dls}{if preg_match(/c/, $fields[flag])}<strong>推荐文章: </strong>{/if}<a href="[field:arcurl]" title="[field:title]">[field:title]</a>…

织梦dedecms判断缩略图是否存在

{dede:field name=litpic runphp=yes} if(!empty(@me)) {<img src="@me" alt="缩略图" /> } else {<img src="/images/no-image.png" alt="默认图片" /> } [/dede:field]扫码添加技术【解决问题】专注中小企业网站建设、网…

织梦dedecms判断子菜单是否存在

{dede:channeltype typeid=typeid runphp=yes} if(@list != ) {<ul class="submenu">{dede:loop value=@list}<li><a href="[url]">[@typename]</a></li>{/dede:loop}</ul> } else {"没有子菜单" } {/dede:…

使用SiliconCloud快速体验SimpleRAG(手把手教程)

使用SiliconCloud快速体验SimpleRAG(手把手教程)SiliconCloud介绍 SiliconCloud 基于优秀的开源基础模型,提供高性价比的 GenAI 服务。 不同于多数大模型云服务平台只提供自家大模型 API,SiliconCloud上架了包括 Qwen、DeepSeek、GLM、Yi、Mistral、LLaMA 3、SDXL、Instant…

axis参数讲解

axis在python使用中非常常见,比如numpy、pandas等使用情景。但是笔者几乎每次都会忘记axis = 0和axis = 1各自对应的是行还是列,最终决定写下笔记以防再次忘记。axis的作用:指明以行为单位进行处理数据,还是以列为单位处理数据。下图就已经很清晰地展示了axis = 0和axis = …