今天这节开始,我们就从reverb启动这个过程进行源代码的学习分析。
广播驱动
但是在看reverb启动过程前,这节我们先看看laravel Broadcasting 的新驱动的这部分源码,当我们使用reverb后,广播事件的触发等操作就由新的驱动负责了。
追踪源码的技巧
我是根据reverb是一个新的driver,于是我通过ReverbDrvier
在laravel包目录进行了查找(搜索或者 ctrl+p),然后快速的就在vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.php
找到了我们新的驱动创建的方法:
protected function createReverbDriver(array $config){return $this->createPusherDriver($config);}
BroadcastManager 的 driver方法 通过 工厂模式来驱动实例的创建,通过策略模式来替换不同的驱动。
通过代码我们知道reverb是兼容适配的pusher,因此看到这里的朋友可以去看看我第二节写的pusher 的知识。
触发一个广播事件是如何到reverb服务的呢?
要找到这个具体的实现,我们还是从追源代码开始,一步步找。首先,我们知道触发事件的调用方法有好几个操作方式,我大致罗列如下:
// event(new \App\Events\DemoPushEvent('你好呀,欢迎你们使用我们的聊天软件'));
// \App\Events\DemoPushEvent::dispatch('你好呀,欢迎你们使用我们的聊天软件');
// broadcast(new \App\Events\DemoPushEvent(Auth::user()->getAuthIdentifier(), '你好呀,欢迎你们使用我们的聊天软件'))->toOthers();
我们就用\App\Events\DemoPushEvent::dispatch
这种方式来追代码。我们通过命令行生成的事件,都会使用一个特性:Dispatchable
,dispatch就是里面的方法,而dispatch实际调用的代码是:
app('events')->dispatch(...$args);
接下来,我们就追到了vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php
里的public function dispatch($event, $payload = [], $halt = false)
方法了,这个dispatch方法的作用就是 触发事件并调用侦听器。继续往下,我们看到了找到核心的方法:protected function invokeListeners($event, $payload, $halt = false)
,这个方法里面我们要找的代码是:
if ($this->shouldBroadcast($payload)) {$this->broadcastEvent($payload[0]);}
而shouldBroadcast
也对应了文档说明的,如何确定是:确定负载是否具有可广播事件,同时可以广播的事件。关键条件:实现ShouldBroadcast和broadcastWhen是否 为真
protected function shouldBroadcast(array $payload){return isset($payload[0]) &&$payload[0] instanceof ShouldBroadcast &&$this->broadcastWhen($payload[0]);}
在继续往下走,进入broadcastEvent后,我们回到了vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.php
BroadcastManager里面,然后我们就开始分析这个queue方法,它的作用实际就是:将给定事件排队以进行广播,不过如果事件实现的是ShouldBroadcastNow
接口,那么就会立即广播出去。
public function queue($event){if ($event instanceof ShouldBroadcastNow ||(is_object($event) &&method_exists($event, 'shouldBroadcastNow') &&$event->shouldBroadcastNow())) {return $this->app->make(BusDispatcherContract::class)->dispatchNow(new BroadcastEvent(clone $event));}$queue = null;if (method_exists($event, 'broadcastQueue')) {$queue = $event->broadcastQueue();} elseif (isset($event->broadcastQueue)) {$queue = $event->broadcastQueue;} elseif (isset($event->queue)) {$queue = $event->queue;}$broadcastEvent = new BroadcastEvent(clone $event);if ($event instanceof ShouldBeUnique) {$broadcastEvent = new UniqueBroadcastEvent(clone $event);if ($this->mustBeUniqueAndCannotAcquireLock($broadcastEvent)) {return;}}$this->app->make('queue')->connection($event->connection ?? null)->pushOn($queue, $broadcastEvent);}
因此,我们就走到了vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php
文件里来分析。
ShouldBroadcastNow 立即触发
public function dispatchNow($command, $handler = null){$uses = class_uses_recursive($command);if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {$command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));}if ($handler || $handler = $this->getCommandHandler($command)) {$callback = function ($command) use ($handler) {$method = method_exists($handler, 'handle') ? 'handle' : '__invoke';return $handler->{$method}($command);};} else {$callback = function ($command) {$method = method_exists($command, 'handle') ? 'handle' : '__invoke';return $this->container->call([$command, $method]);};}return $this->pipeline->send($command)->through($this->pipes)->then($callback);}
这个方法的作用是:将命令分派到当前进程中的相应处理程序,而不使用同步队列。
整个方法还是有些晦涩,我就替观众老爷用权威的AI给我们进行这个代码分析:
这段代码来自 Laravel 11 的 Illuminate\Bus\Dispatcher
,它的 dispatchNow()
方法用于同步地执行 Laravel 任务(Command)。在 Laravel 的 任务派发机制 中,这个方法用于立即执行给定的命令(Command),而不会通过队列异步执行。
1. 处理命令的队列属性
$uses = class_uses_recursive($command);if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {$command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
}
解析
class_uses_recursive($command)
:获取$command
类及其所有父类、trait 使用的 trait 列表。InteractsWithQueue::class
和Queueable::class
:- 如果
$command
使用了这两个 trait,说明它是一个可以被队列化的任务。 ! $command->job
:如果这个任务没有被分配到具体的 Job,说明它还没有真正进入队列。setJob(new SyncJob(...))
:如果满足条件,就创建一个同步任务 (SyncJob
),表示这个任务将在当前进程内立即执行,而不是放入队列异步执行。
- 如果
2. 确定任务执行的处理器
if ($handler || $handler = $this->getCommandHandler($command)) {$callback = function ($command) use ($handler) {$method = method_exists($handler, 'handle') ? 'handle' : '__invoke';return $handler->{$method}($command);};
} else {$callback = function ($command) {$method = method_exists($command, 'handle') ? 'handle' : '__invoke';return $this->container->call([$command, $method]);};
}
解析
getCommandHandler($command)
:获取与$command
关联的处理器(Handler)。- 如果
$command
具有一个特定的 Handler 类,那么它将使用handle()
或__invoke()
方法来执行该任务。 - 如果
$command
自身 具有handle()
方法,则直接调用$command->handle()
。 $this->container->call([...])
:使用 Laravel 服务容器 调用handle()
方法,支持 依赖注入。
3. 通过管道(Pipeline)执行命令
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
解析
- Laravel Pipeline 是一个中间件管道,类似 HTTP 中间件:
send($command)
: 将$command
任务送入处理流程。through($this->pipes)
: 通过 任务中间件 进行处理,比如日志、权限检查等。then($callback)
: 执行最终的任务逻辑(上面定义的$callback
)。
流程总结
-
检查
$command
是否支持队列:- 如果它实现了
Queueable
,但没有job
,就为它创建一个SyncJob
,让它立即执行,而不是进入队列。
- 如果它实现了
-
确定如何执行
$command
:- 如果
$command
有专门的 处理器 (Handler),就用handle()
或__invoke()
方法执行。 - 如果
$command
自身 有handle()
方法,就直接调用它。
- 如果
-
通过 Laravel Pipeline 处理任务:
- 任务可能会经过多个中间件(类似 HTTP 请求的中间件)。
- 最终,
then($callback)
负责真正执行任务。
dispatchNow()
强制同步执行 Laravel 任务,不会进入队列。- 如果命令使用了
Queueable
,但未进入队列,dispatchNow()
会创建SyncJob
,仍然同步执行。 - 任务可以有单独的 Handler,Laravel 会自动解析并执行它。
- 使用
Pipeline
允许任务经过多个中间件,例如日志、权限检查等。
好的,上面的,AI说的很清楚了,那么我通过dump给大家看下拿到的$command
到底是啥:
Illuminate\Broadcasting\BroadcastEvent Object (
[event] => App\Events\DemoPushEvent Object (
[userId] => 1 [message] => 你好呀,欢迎你们使用我们的聊天软件 [type] => system [socket] => ) [tries] => [timeout] => [backoff] => [maxExceptions] => [connection] => [queue] => [delay] => [afterCommit] => [middleware] => Array ( ) [chained] => Array ( ) [chainConnection] => [chainQueue] => [chainCatchCallbacks] => )
打印后,我们就又悟了,继续走到:Illuminate\Broadcasting\BroadcastEvent
, 到了这里,我们就看handle方法:处理排队的作业
public function handle(BroadcastingFactory $manager){$name = method_exists($this->event, 'broadcastAs')? $this->event->broadcastAs() : get_class($this->event);$channels = Arr::wrap($this->event->broadcastOn());if (empty($channels)) {return;}$connections = method_exists($this->event, 'broadcastConnections')? $this->event->broadcastConnections(): [null];$payload = $this->getPayloadFromEvent($this->event);foreach ($connections as $connection) {$manager->connection($connection)->broadcast($this->getConnectionChannels($channels, $connection),$name,$this->getConnectionPayload($payload, $connection));}}
整个方法里,$connections
中的broadcastConnections
也是文档上提到的,可以针对事件定义广播连接,获取应广播事件的广播连接。 事件如果没有连接,默认的值:[null]
,然后是$payload
拿到就是你事件定义的属性后解析的数组。再然后就是遍历连接执行广播了。
广播的操作又把我们带回到
meipian.cn/5bhmnnku?share_depth=1
meipian.cn/5bhmo6w4?share_depth=1
meipian.cn/5bhmnxbi?share_depth=1
meipian.cn/5bhmohm2?share_depth=1
meipian.cn/5bhmozm5?share_depth=1
meipian.cn/5bhmosn2?share_depth=1
meipian.cn/5bhmpb5a?share_depth=1
meipian.cn/5bhmp5ti?share_depth=1
meipian.cn/5bhmpinb?share_depth=1
meipian.cn/5bhmpn52?share_depth=1
meipian.cn/5bhmpxqu?share_depth=1
meipian.cn/5bhmqjn3?share_depth=1
meipian.cn/5bhmq7j3?share_depth=1
meipian.cn/5bhmqudo?share_depth=1
meipian.cn/5bhms7am?share_depth=1
meipian.cn/5bhmrvjm?share_depth=1
meipian.cn/5bhmt589?share_depth=1
meipian.cn/5bhmsqaq?share_depth=1
meipian.cn/5bhmtu9m?share_depth=1
meipian.cn/5bhmti5p?share_depth=1
meipian.cn/5bhmueuw?share_depth=1
meipian.cn/5bhmu5ok?share_depth=1
meipian.cn/5bhmuu21?share_depth=1
meipian.cn/5bhmum5t?share_depth=1
meipian.cn/5bhmvd4z?share_depth=1
meipian.cn/5bhmv2fk?share_depth=1
meipian.cn/5bhmvpob?share_depth=1
meipian.cn/5bhmwwky?share_depth=1
meipian.cn/5bhmwqxj?share_depth=1
meipian.cn/5bhmxbcj?share_depth=1
meipian.cn/5bhmx490?share_depth=1
meipian.cn/5bhnduzv?share_depth=1
meipian.cn/5bhnbyb6?share_depth=1
meipian.cn/5bhncrhc?share_depth=1
meipian.cn/5bhncfo6?share_depth=1
meipian.cn/5bhndiqy?share_depth=1
meipian.cn/5bhngg1o?share_depth=1
meipian.cn/5bhneg3c?share_depth=1
meipian.cn/5bhnety8?share_depth=1
meipian.cn/5bhnfhwx?share_depth=1
meipian.cn/5bhnfzfc?share_depth=1
meipian.cn/5bhnf4lu?share_depth=1
meipian.cn/5bhngwce?share_depth=1
meipian.cn/5bhnl8kn?share_depth=1
meipian.cn/5bhnk1v2?share_depth=1
meipian.cn/5bhnj1xy?share_depth=1
meipian.cn/5bhni6ze?share_depth=1
meipian.cn/5bhnikc8?share_depth=1
meipian.cn/5bhnkosx?share_depth=1
meipian.cn/5bhnhryw?share_depth=1
meipian.cn/5bhnheuq?share_depth=1
meipian.cn/5bhnlj68?share_depth=1
meipian.cn/5bhnmrha?share_depth=1
meipian.cn/5bhnmccz?share_depth=1
meipian.cn/5bhnlt2l?share_depth=1
meipian.cn/5bhnowve?share_depth=1
meipian.cn/5bhnn8f8?share_depth=1
meipian.cn/5bhnopx1?share_depth=1
meipian.cn/5bhno0zd?share_depth=1
meipian.cn/5bhnof4l?share_depth=1
meipian.cn/5bhnnoah?share_depth=1
meipian.cn/5bhnpasf?share_depth=1
meipian.cn/5bhnqrzo?share_depth=1
meipian.cn/5bhnpocz?share_depth=1
meipian.cn/5bhnqd0s?share_depth=1
meipian.cn/5bhnpzpo?share_depth=1
meipian.cn/5bhnrdk2?share_depth=1
meipian.cn/5bhnr3km?share_depth=1
meipian.cn/5bhnuxfb?share_depth=1
meipian.cn/5bhnv7kr?share_depth=1
meipian.cn/5bhnu7fl?share_depth=1
meipian.cn/5bhnunft?share_depth=1
meipian.cn/5bhnvgax?share_depth=1
meipian.cn/5bhns228?share_depth=1
meipian.cn/5bhnrpk7?share_depth=1
meipian.cn/5bhnxm20?share_depth=1
meipian.cn/5bhnwrn4?share_depth=1
meipian.cn/5bhnx7n4?share_depth=1
meipian.cn/5bhnz7wh?share_depth=1
meipian.cn/5bhnw2wl?share_depth=1
meipian.cn/5bhnwck9?share_depth=1
meipian.cn/5bhnzha0?share_depth=1
meipian.cn/5bhnyn70?share_depth=1
meipian.cn/5bhnyabn?share_depth=1
meipian.cn/5bhnxzl4?share_depth=1
meipian.cn/5bhnzwln?share_depth=1
meipian.cn/5bhnvt5m?share_depth=1
meipian.cn/5bho0krd?share_depth=1
meipian.cn/5bho0xfw?share_depth=1
meipian.cn/5bho06m6?share_depth=1
meipian.cn/5bhohz3m?share_depth=1
meipian.cn/5bho25nh?share_depth=1
meipian.cn/5bhohjx8?share_depth=1
meipian.cn/5bhojf0i?share_depth=1
meipian.cn/5bhoil9b?share_depth=1
meipian.cn/5bhoj1in?share_depth=1
meipian.cn/5bhoid65?share_depth=1
meipian.cn/5bhok1bk?share_depth=1
meipian.cn/5bhopne0?share_depth=1
meipian.cn/5bhokhda?share_depth=1
meipian.cn/5bhojph3?share_depth=1
meipian.cn/5bhoqvpl?share_depth=1
meipian.cn/5bhoqjka?share_depth=1
meipian.cn/5bhora4n?share_depth=1
meipian.cn/5bhoq34n?share_depth=1
meipian.cn/5bhot1qy?share_depth=1
meipian.cn/5bhoseu9?share_depth=1
meipian.cn/5bhos217?share_depth=1
meipian.cn/5bhosrnw?share_depth=1
meipian.cn/5bhorr4j?share_depth=1
meipian.cn/5bhou6s8?share_depth=1
meipian.cn/5bhoulku?share_depth=1
meipian.cn/5bhotq35?share_depth=1
meipian.cn/5bhotd4p?share_depth=1
meipian.cn/5bhp64ov?share_depth=1
meipian.cn/5bhovdyj?share_depth=1
meipian.cn/5bhouxj2?share_depth=1
meipian.cn/5bhp6y0y?share_depth=1
meipian.cn/5bhp6pca?share_depth=1
meipian.cn/5bhp6dyh?share_depth=1
meipian.cn/5bhp9x16?share_depth=1
meipian.cn/5bhpbnwg?share_depth=1
meipian.cn/5bhp7aw2?share_depth=1
meipian.cn/5bhpb19g?share_depth=1
meipian.cn/5bhpanjy?share_depth=1
meipian.cn/5bhpa89z?share_depth=1
meipian.cn/5bhpe6ja?share_depth=1
meipian.cn/5bhpc2yh?share_depth=1
meipian.cn/5bhpcs9a?share_depth=1
meipian.cn/5bhpw27x?share_depth=1
meipian.cn/5bhpdrhd?share_depth=1
meipian.cn/5bhpepqj?share_depth=1
meipian.cn/5bhpus3g?share_depth=1
meipian.cn/5bhpd71e?share_depth=1
meipian.cn/5bhpvczh?share_depth=1
meipian.cn/5bhpxlt4?share_depth=1
meipian.cn/5bhpvrsc?share_depth=1
meipian.cn/5bhpxcxp?share_depth=1
meipian.cn/5bhpx1bz?share_depth=1
meipian.cn/5bhpy07s?share_depth=1
meipian.cn/5bhpwk9u?share_depth=1
meipian.cn/5bhpxr7p?share_depth=1
meipian.cn/5bhq04uk?share_depth=1
meipian.cn/5bhq3j68?share_depth=1
meipian.cn/5bhpyxhd?share_depth=1
meipian.cn/5bhqd1e6?share_depth=1
meipian.cn/5bhpzst5?share_depth=1
meipian.cn/5bhq0dxr?share_depth=1
meipian.cn/5bhpyic8?share_depth=1
meipian.cn/5bhqfnd8?share_depth=1
meipian.cn/5bhqfa36?share_depth=1
meipian.cn/5bhqgi65?share_depth=1
meipian.cn/5bhqg7dp?share_depth=1
meipian.cn/5bhqfxme?share_depth=1
meipian.cn/5bhql963?share_depth=1
meipian.cn/5bhqh46e?share_depth=1
meipian.cn/5bhqhe99?share_depth=1
meipian.cn/5bhqgrci?share_depth=1
meipian.cn/5bhqljjs?share_depth=1
meipian.cn/5bhqo5gv?share_depth=1
meipian.cn/5bhqnkm6?share_depth=1
meipian.cn/5bhqnbxl?share_depth=1
meipian.cn/5bhqow5w?share_depth=1
meipian.cn/5bhqojrf?share_depth=1
meipian.cn/5bhqmy6u?share_depth=1
meipian.cn/5bhqqz1x?share_depth=1
meipian.cn/5bhqpvr8?share_depth=1
meipian.cn/5bhqqi8e?share_depth=1
meipian.cn/5bhqq1yd?share_depth=1
meipian.cn/5bhqpkt3?share_depth=1
meipian.cn/5bhqrqmf?share_depth=1
meipian.cn/5bhqrbx3?share_depth=1
meipian.cn/5bhqsd5o?share_depth=1
meipian.cn/5bhqy1jy?share_depth=1
meipian.cn/5bhqxau4?share_depth=1
meipian.cn/5bhqxn17?share_depth=1
meipian.cn/5bhqs2pg?share_depth=1
meipian.cn/5bhqzf0o?share_depth=1
meipian.cn/5bhqynnn?share_depth=1
meipian.cn/5bhqz4bs?share_depth=1
meipian.cn/5bhqybfj?share_depth=1
meipian.cn/5bhr22lm?share_depth=1
meipian.cn/5bhr1pps?share_depth=1
meipian.cn/5bhr80wa?share_depth=1
meipian.cn/5bhr1718?share_depth=1
meipian.cn/5bhr0vqx?share_depth=1
meipian.cn/5bhr0mgc?share_depth=1
meipian.cn/5bhr9ey8?share_depth=1
meipian.cn/5bhr8khy?share_depth=1
meipian.cn/5bhrc4aw?share_depth=1
meipian.cn/5bhra9qt?share_depth=1
meipian.cn/5bhrb0bi?share_depth=1
meipian.cn/5bhr9wyo?share_depth=1
meipian.cn/5bhrba5l?share_depth=1
meipian.cn/5bhrakwf?share_depth=1
meipian.cn/5bhrcmim?share_depth=1
meipian.cn/5bhrebcy?share_depth=1
meipian.cn/5bhrd4uc?share_depth=1
meipian.cn/5bhrdxxe?share_depth=1
meipian.cn/5bhrdkkx?share_depth=1
meipian.cn/5bhrf4yr?share_depth=1
meipian.cn/5bhreof2?share_depth=1
meipian.cn/5bhrghoi?share_depth=1
meipian.cn/5bhrg2g2?share_depth=1
meipian.cn/5bhrfbc6?share_depth=1
meipian.cn/5bhrifqg?share_depth=1
meipian.cn/5bhrh0uw?share_depth=1
meipian.cn/5bhrhpq3?share_depth=1
meipian.cn/5bhrhboh?share_depth=1
meipian.cn/5bhrgts3?share_depth=1
meipian.cn/5bhrjh5j?share_depth=1
meipian.cn/5bhrjtjz?share_depth=1
meipian.cn/5bhrj3tr?share_depth=1
meipian.cn/5bhrkm9b?share_depth=1
meipian.cn/5bhrk8lr?share_depth=1
meipian.cn/5bhrn7ps?share_depth=1
meipian.cn/5bhrnnrm?share_depth=1
meipian.cn/5bhrmoi6?share_depth=1
meipian.cn/5bhrlsl3?share_depth=1
meipian.cn/5bhrmfme?share_depth=1
meipian.cn/5bhrm2h4?share_depth=1
meipian.cn/5bhrl403?share_depth=1
meipian.cn/5bhrofde?share_depth=1
meipian.cn/5bhrshru?share_depth=1
meipian.cn/5bhrt7ls?share_depth=1
meipian.cn/5bhrovj7?share_depth=1
meipian.cn/5bhrsvsq?share_depth=1
meipian.cn/5bhromxu?share_depth=1
meipian.cn/5bhrvfmg?share_depth=1
meipian.cn/5bhrtqdj?share_depth=1
meipian.cn/5bhrv56c?share_depth=1
meipian.cn/5bhruopj?share_depth=1
meipian.cn/5bhru6kr?share_depth=1
meipian.cn/5bhrtg21?share_depth=1
meipian.cn/5bhrxkj4?share_depth=1
meipian.cn/5bhrx424?share_depth=1
meipian.cn/5bhrwm4x?share_depth=1
meipian.cn/5bhrvwm2?share_depth=1
meipian.cn/5bhry26c?share_depth=1
meipian.cn/5bhs2miv?share_depth=1
meipian.cn/5bhs3l05?share_depth=1
meipian.cn/5bhs113x?share_depth=1
meipian.cn/5bhs2yqe?share_depth=1
meipian.cn/5bhs0cxi?share_depth=1
meipian.cn/5bhrzwne?share_depth=1
meipian.cn/5bhrz5bl?share_depth=1
meipian.cn/5bhryji9?share_depth=1
meipian.cn/5bhs4509?share_depth=1
meipian.cn/5bhs5trg?share_depth=1
meipian.cn/5bhs4g0o?share_depth=1
meipian.cn/5bhs5bzs?share_depth=1
meipian.cn/5bhs4xts?share_depth=1
meipian.cn/5bhs6k4n?share_depth=1