第4代多进程任务系统 采集模块单机 14 进程 采集效果图 作用实时抓取指定ASIN 价格 峰值11页/S
重构了一下 数据处理以及采集系统[SP4 项目] 使结构跟简单明了 以及更方便的添加 数据处理模块[过滤 数据处理]
由于我将资源的预先分配改为了按需求实时分配
由于是并发执行 如果没有锁对额支持在同时处理一些任务的时候 会出现并发性问题
比如 突然出现大量的并发查询修改同一条数据 以及 产生在严格模式下的资源关联问题
并且同时 的大量查询会导致突然的大量资源占用并导致数据库宕机
我们来做个并发读写实验
不加锁
$worker_num = 200; for ($i = 0; $i < $worker_num; $i++) { $process = new \swoole_process( function (\swoole_process $process) use ($i) { sleep(rand(1, 2)); echo"P".getmypid()."[".($i+1)."|->".increment()."]\r\n"; }); $pid = $process->start(); $workers[$pid] = $process; } function increment() { if (!file_exists('testlockfile')) { file_put_contents('testlockfile', 0); } $num = file_get_contents('testlockfile'); $num = $num + 1; file_put_contents('testlockfile', $num); return file_get_contents('testlockfile'); }
测试结果
P5808 进程ID[200任务号|->197 读取的值] P13388[188|->193] P9240[189|->194] P10188[194|->195] P11216[195|->196] P5808[200|->197] Administrator@CN-20160707PGJG /sp4 $
我们可以看到开200 个进程 去读写文件 按道理文件的值应该是200 但并不是
加锁
$worker_num = 200; for ($i = 0; $i < $worker_num; $i++) { $process = new \swoole_process( function (\swoole_process $process) use ($i) { sleep(rand(1, 2)); $lock = new \Weicot\Core\SpLock("exc"); $lock->startLock(); $status = $lock->Lock(); if (!$status) { exit ("lock error"); } echo"P".getmypid()."[".($i+1)."|->".increment()."]\r\n"; $lock->unlock(); $lock->endLock(); }); $pid = $process->start(); $workers[$pid] = $process; } function increment() { if (!file_exists('testlockfile')) { file_put_contents('testlockfile', 0); } $num = file_get_contents('testlockfile'); $num = $num + 1; file_put_contents('testlockfile', $num); return file_get_contents('testlockfile'); }
结果
//我测试了三次 数据完全正确 P5808 进程ID[200任务号|->600 3次累计] P17092[169|->589] P11180[178|->590] P15504[179|->591] P12604[182|->592] P17248[185|->593] P13616[186|->594] P14812[187|->595] P13080[190|->596] P8100[191|->597] P1084[197|->598] P13304[199|->599] P15652[200|->600] Administrator@CN-20160707PGJG /sp4
这里用的是文件锁 内存锁效率更高
本进程锁用来解决php在并发时候的锁控制
他根据文件锁来模拟多个进程之间的锁定,效率不是非常高。如果文件建立在内存中,可以大大提高效率。
在使用过程中,会在指定的目录产生$hashNum个文件用来产生对应粒度的锁。不同锁之间可以并行执行。
这有点类似mysql的innodb的行级锁,不同行的更新可以并发的执行。
<?php /** * Created by PhpStorm. * User: Administrator * Date: 2017/10/9 0009 * Time: 9:51 */ namespace Weicot\Core; /** * 进程锁 * 本进程锁用来解决php在并发时候的锁控制 * 他根据文件锁来模拟多个进程之间的锁定,效率不是非常高。如果文件建立在内存中,可以大大提高效率。 * PHPLOCK在使用过程中,会在指定的目录产生$hashNum个文件用来产生对应粒度的锁。不同锁之间可以并行执行。 * 这有点类似mysql的innodb的行级锁,不同行的更新可以并发的执行。 * @link http://code.google.com/p/phplock/ * 本带码与源代码不一致 有所改动( weicot.com ajing) * */ class SpLock { /** * 锁文件路径 * * @var String */ private $path = null; /** * 文件句柄 * * @var resource */ private $fp = null; /** * 锁的粒度控制,设置的越大粒度越小 * * @var int */ private $hashNum = 100; private $name; private $eAccelerator = false; private $basePath; /** * 构造函数 * * @param string $path 锁的存放目录,以"/"结尾 * @param string $name 锁名称,一般在对资源加锁的时候,会命名一个名字,这样不同的资源可以并发的进行。 */ public function __construct($name) { $path = $this->basePath = BASE_PATH . "/var/lock/"; $this->path = $path . ($this->mycrc32($name) % $this->hashNum) . '.txt'; $this->eAccelerator = function_exists("eaccelerator_lock"); $this->name = $name; } /** * crc32的封装 * * @param string $string * @return int */ private function mycrc32($string) { $crc = abs(crc32($string)); if ($crc & 0x80000000) { $crc ^= 0xffffffff; $crc += 1; } return $crc; } /** * 初始化锁,是加锁前的必须步骤 * 打开一个文件 * */ public function startLock() { if (!$this->eAccelerator) { $this->fp = fopen($this->path, "w+"); } } /** * 开始加锁 * * @return bool 加锁成功返回true,失败返回false */ public function lock() { if (!$this->eAccelerator) { if ($this->fp === false) { return false; } return flock($this->fp, LOCK_EX); } else { return eaccelerator_lock($this->name); } } /** * 释放锁 * */ public function unlock() { if (!$this->eAccelerator) { if ($this->fp !== false) { flock($this->fp, LOCK_UN); clearstatcache(); } } else { return eaccelerator_unlock($this->name); } } /** * 结束锁控制 * */ public function endLock() { if (!$this->eAccelerator) { return fclose($this->fp); } } /** * @return bool */ public function locklBeg() { $this->startLock(); return $this->lock(); } /** * @return bool */ public function lockOff() { $this->unlock(); return $this->endLock(); } }