Gearman jobs are passed to more than one worker (PHP) -


i have problem in php application gearman jobs passed more 1 worker. reduce code reproduce 1 file. not sure if bug in gearman or bug in pecl library or maybe in code.

here code reproduce error:

#!/usr/bin/php <?php  // try 'standard', 'exception' or 'exception-sleep'. $sworker = 'exception';    // detect run mode "client" or "worker". if (!isset($argv[1]))     $smode = 'client'; else     $smode = 'worker-' . $sworker;  $slogfilepath = __dir__ . '/log.txt';  switch ($smode) {      case 'client':          // remove queued test jobs , quit if there test workers running.         prepare();          // init greaman client.         $client= new gearmanclient;         $client->addserver();          // empty log file.         file_put_contents($slogfilepath, '');          // start worker processes.         $apids = array();                ($i = 0; $i < 100; $i++)             $apids[] = exec('php ' . __file__ . ' worker > /dev/null 2>&1 & echo $!');          // start jobs. try dohigh(), dobackground() ,         // dobackgroundhigh();         ($i = 0; $i < 50; $i++)             $client->donormal('test', $i);          // wait second (when running jobs in background).         // sleep(1);          // prepare log file entries.         $ajobs = array();         $alines = file($slogfilepath);         foreach ($alines $sline) {             list($stime, $spid, $shandle, $sworkload) = $aattributes = explode("\t", $sline);             $sworkload = trim($sworkload);             if (!isset($ajobs[$sworkload]))                 $ajobs[$sworkload] = array();             $ajobs[$sworkload][] = $aattributes;         }          // remove jobs have been passed 1 worker expected.         foreach ($ajobs $sworkload => $ajob) {             if (count($ajob) === 1)                 unset($ajobs[$sworkload]);         }          echo "\n\n";          if (empty($ajobs))             echo "no job has been passed more 1 worker.";         else {             echo "those jobs has been passed more 1 times worker:\n";             foreach ($ajobs $sworload => $ajob) {                  echo "\njob #" . $sworload . ":\n";                 foreach ($ajob $aattributes)                     echo "  $aattributes[2] (worker pid: $aattributes[1])\n";             }         }          echo "\n\n";          // kill started workers.         foreach ($apids $spid)             exec('kill ' . $spid . ' > /dev/null 2>&1');      break;      case 'worker-standard':         $worker = new gearmanworker;         $worker->addserver();         $worker->addfunction('test', 'logjob');                     $bshutdown = false;         while ($worker->work())             if ($bshutdown)                 continue;         break;      case 'worker-exception':         try {             $worker = new gearmanworker;             $worker->addserver();             $worker->addfunction('test', 'logjob');             $bshutdown = false;             while ($worker->work())                 if ($bshutdown)                     throw new \exception;          } catch (\exception $e) {         }     break;      case 'worker-exception-sleep':         try {             $worker = new gearmanworker;             $worker->addserver();             $worker->addfunction('test', 'logjob');             $bshutdown = false;             while ($worker->work())             {                 if ($bshutdown) {                     sleep(1);                     throw new \exception;                 }             }         } catch (\exception $e) {         }     break; }  function logjob(\gearmanjob $job) {     global $bshutdown, $slogfilepath;     $sline = microtime(true) . "\t" . getmypid() . "\t" . $job->handle() . "\t" . $job->workload() . "\n";     file_put_contents($slogfilepath, $sline, file_append);     $bshutdown = true; }   function prepare() {     $rgearman = fsockopen('127.0.0.1', '4730', $ierrno, $serrstr, 3);     $abuffer = array();     fputs ($rgearman, 'status' . php_eol);     stream_set_timeout($rgearman, 1);     while (!feof($rgearman))         if ('.' . php_eol !== $sline = fgets($rgearman, 128))             $abuffer[] = $sline;         else             break;     fclose($rgearman);      $bjobsinqueue = false;     $bworkersrunning = false;     foreach ($abuffer $sfunctionline) {         list($sfunctionname, $iqueuedjobs, $irunningjobs, $iworkers) = explode("\t", $sfunctionline);         if ('test' === $sfunctionname) {             if (0 != $iqueuedjobs)                 $bjobsinqueue = true;             if (0 != $iworkers)                 $bworkersrunning = true;;         }     }      // exit if there workers running.     if ($bworkersrunning)         die("there gearman workers running have registered 'test' function. please stop these workers , run again.\n\n");      // if there test jobs in queue start worker eat jobs.     if ($bjobsinqueue) {         $spid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!');         sleep(1);         exec ("kill $spid > /dev/null 2>&1");         // repeat method make sure jobs removed.         prepare();     } } 

when run code on command line should output "no job has been passed more 1 worker." insted alway outputs list of jobs have been passed more 1 worker. error doesn't appear if set $sworker = 'standard'; or 'exception-sleep'.

it me lot if run code , tell me if able reproduce error of if have bug in code.

had same issue gearman 0.24, pecl lib 1.0.2. able reproduce error script every time.

an older version of gearman (0.14 think) used work fine.

upgrading gearman 0.33 fixed issue.


Comments

Popular posts from this blog

delphi - How to convert bitmaps to video? -

jasper reports - Fixed header in Excel using JasperReports -

python - ('The SQL contains 0 parameter markers, but 50 parameters were supplied', 'HY000') or TypeError: 'tuple' object is not callable -