vendor/okvpn/cron-bundle/src/Middleware/CronMiddlewareEngine.php line 120

Open in your IDE?
  1. <?php
  2. declare(strict_types=1);
  3. namespace Okvpn\Bundle\CronBundle\Middleware;
  4. use Okvpn\Bundle\CronBundle\Cron\CronChecker;
  5. use Okvpn\Bundle\CronBundle\Model\EnvelopeTools as ET;
  6. use Okvpn\Bundle\CronBundle\Model\EnvironmentStamp;
  7. use Okvpn\Bundle\CronBundle\Model\PeriodicalStampInterface;
  8. use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
  9. use Okvpn\Bundle\CronBundle\Model\ScheduleStamp;
  10. use Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface;
  11. use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
  12. use Okvpn\Bundle\CronBundle\Utils\CronUtils;
  13. use Psr\Clock\ClockInterface as PsrClockInterface;
  /**
   * Middleware that decides whether (and when) a scheduled task is executed.
   *
   * Envelopes without a periodical stamp are passed straight to the next
   * middleware. Envelopes with one are handled in one of two modes:
   *
   *  - "no demand" (default): the task runs only if it is due at the current time;
   *  - "demand": a timer is registered in the schedule loop and the task is
   *    re-armed after each run. Used only when the environment stamp sets
   *    `demand` to true, `no-loop` is false and a schedule loop was injected.
   */
  final class CronMiddlewareEngine implements MiddlewareEngineInterface
  {
      /** @var string|null Time zone name used for due checks and for the fallback clock. */
      private $timeZone;

      /** @var CronChecker */
      private $checker;

      /** @var PsrClockInterface|null */
      private $clock;

      /** @var ScheduleLoopInterface|null */
      private $scheduleLoop;

      /** @var array<string, int> Hashes of the envelopes seen by handleDemand() since the last onLoopEnd(). */
      private $lastLoopTasks = [];

      /** @var TimerStorage */
      private $timers;

      public function __construct(CronChecker $checker, ?string $timeZone = null, ?PsrClockInterface $clock = null, ?ScheduleLoopInterface $scheduleLoop = null, ?TimerStorage $timers = null)
      {
          $this->timeZone = $timeZone;
          $this->checker = $checker;
          $this->clock = $clock;
          $this->scheduleLoop = $scheduleLoop;
          $this->timers = $timers ?? new TimerStorage();
      }

      /**
       * {@inheritdoc}
       */
      public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope
      {
          $env = $envelope->has(EnvironmentStamp::class) ? $envelope->get(EnvironmentStamp::class)->toArray() : [];

          // For testing usage. drops next middlewares
          $dryRun = $env['dry-run'] ?? false;
          if ($dryRun) {
              $stack->end();
          }

          $stamp = $envelope->get(PeriodicalStampInterface::class);
          if (!$stamp) {
              // Not a periodical task: nothing to check, hand it over as is.
              if (!$dryRun) {
                  ET::info($envelope, "{{ task }} > Run schedule task.");
              }

              return $stack->next()->handle($envelope, $stack);
          }

          $useDemand = $env['demand'] ?? false;
          $noLoop = $env['no-loop'] ?? false;
          // An explicit "now" from the environment takes precedence over the clock.
          $now = ($env['now'] ?? null) instanceof \DateTimeInterface ? $env['now'] : $this->getNow();

          if (true === $useDemand && null !== $this->scheduleLoop && false === $noLoop) {
              return $this->handleDemand($now, $envelope, $stamp, $stack);
          }

          return $this->handleNoDemand($now, $envelope, $stamp, $stack);
      }

      /**
       * Registers (or refreshes) a loop timer for the envelope.
       *
       * If a timer with the same hash already exists and its stamp is unchanged,
       * only the stored envelope is refreshed. If the stamp changed, the old timer
       * is cancelled and a new one is registered.
       *
       * Note: the $now argument is superseded by the loop's own clock below.
       */
      private function handleDemand(\DateTimeInterface $now, ScheduleEnvelope $envelope, PeriodicalStampInterface $stamp, StackInterface $stack): ScheduleEnvelope
      {
          $hash = ET::calculateHash($envelope);
          // Mark the task as seen so that cancelOrphanTasks() keeps its timer.
          $this->lastLoopTasks[$hash] = 1;

          if ($this->timers->hasTimer($hash)) {
              [$timer, $prevEnvelope] = $this->timers->getTimer($hash);
              if ((string) $prevEnvelope->get(PeriodicalStampInterface::class) === (string) $stamp) {
                  $this->timers->refreshEnvelope($envelope);

                  return $stack->end()->handle($envelope, $stack);
              }

              $this->timers->remove($hash);
              $this->scheduleLoop->cancelTimer($timer);
              ET::notice($envelope, "{{ task }} > Cron expression has been changed.");
          }

          $prevEnvelope = $envelope;
          $timers = $this->timers;
          $loop = $this->scheduleLoop;
          $now = $loop->now();
          $nextTime = $stamp->getNextRunDate($now);

          // $runner is captured by reference so that the closure can re-arm itself.
          $runner = static function (/* $periodical = true*/) use ($timers, $prevEnvelope, $hash, $stack, $stamp, $loop, &$runner): ScheduleEnvelope {
              $envelope = $timers->findByHash($hash);
              if (null === $envelope) {
                  ET::notice($prevEnvelope, "{{ task }} > Task canceled. Someone detached an envelope from timers storage");
                  $clone = clone $stack;

                  // The storage entry is gone, so fall back to the envelope captured at
                  // registration time; calling without() on the null lookup result is fatal.
                  return $clone->end()->handle($prevEnvelope->without(PeriodicalStampInterface::class), $clone);
              }

              ET::info($envelope, "{{ task }} > Run schedule task.");
              try {
                  $clone = clone $stack;
                  $result = $clone->next()->handle($envelope->without(PeriodicalStampInterface::class), $clone);
              } catch (\Throwable $e) {
                  // A failing task must not stop the timer from being re-armed.
                  $result = $envelope;
                  ET::error($envelope, "{{ task }} > Task ERRORED. {$e->getMessage()}", ['e' => $e]);
              }

              // Re-arm unless the caller explicitly passed false as the first argument.
              if (false !== (\func_get_args()[0] ?? null)) {
                  $now = $loop->now();
                  $nextTime = $stamp->getNextRunDate($now);
                  $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u');
                  $loop->addTimer($delay, $runner);
                  ET::debug($envelope, \sprintf("{{ task }} > was scheduled with delay %.6F sec.", $delay));
              }

              return $result;
          };
          $timers->attach($envelope, $runner);

          $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u');
          ET::debug($envelope, \sprintf("{{ task }} > was scheduled with delay %.6F sec.", $delay));
          $loop->addTimer($delay, $runner);

          $clone = clone $stack;

          return $clone->end()->handle($envelope, $clone);
      }

      /**
       * Cancels the timers of tasks that were not seen since the previous call
       * and resets the list of seen tasks.
       */
      public function onLoopEnd(): void
      {
          $this->cancelOrphanTasks();
          $this->lastLoopTasks = [];
      }

      /**
       * Runs the task only when it is due at $now; otherwise ends the stack.
       *
       * ScheduleStamp instances are checked with the cron checker. Any other
       * periodical stamp is considered due when its next run date, computed from
       * one second before the current minute, falls into the current minute.
       */
      private function handleNoDemand(\DateTimeInterface $now, ScheduleEnvelope $envelope, PeriodicalStampInterface $stamp, StackInterface $stack): ScheduleEnvelope
      {
          if ($stamp instanceof ScheduleStamp) {
              // Initialised up front so the error message below never reads an
              // undefined variable when cronExpression() itself throws.
              $expr = null;
              try {
                  $expr = $stamp->cronExpression();
                  $isDue = $this->checker->isDue($expr, $this->timeZone, $now);
              } catch (\Throwable $e) {
                  ET::error($envelope, "{{ task }} > The cron expression $expr for task is invalid. {$e->getMessage()}", ['e' => $e]);

                  return $stack->end()->handle($envelope, $stack);
              }
          } else {
              // Truncate to the start of the current minute.
              $currentTime = (int) (60 * \floor($now->getTimestamp() / 60));
              $now = CronUtils::toDate($currentTime - 1, $now);
              $nextRun = $stamp->getNextRunDate($now);
              $nextRun = (int) (60 * \floor($nextRun->getTimestamp() / 60));
              $isDue = $nextRun === $currentTime;
          }

          if ($isDue) {
              ET::info($envelope, "{{ task }} > The schedule task is due now!");

              return $stack->next()->handle($envelope->without(PeriodicalStampInterface::class), $stack);
          }

          ET::debug($envelope, "{{ task }} > Skipped the schedule task by cron restriction");

          return $stack->end()->handle($envelope, $stack);
      }

      /**
       * Cancels and forgets every stored timer whose task hash was not marked
       * by handleDemand() since the last onLoopEnd().
       */
      private function cancelOrphanTasks(): void
      {
          // Without a loop this engine never registers timers (handle() only enters
          // demand mode when a loop is set), and there is nothing to cancel them on.
          if (null === $this->scheduleLoop) {
              return;
          }

          foreach ($this->timers->getTimers() as $hash => [$timer, $envelope]) {
              if (isset($this->lastLoopTasks[$hash])) {
                  continue;
              }

              ET::notice($envelope, "{{ task }} > task canceled - is not active anymore");
              $this->scheduleLoop->cancelTimer($timer);
              $this->timers->remove($hash);
          }
      }

      /**
       * Returns the current time from the injected clock, or a new
       * DateTimeImmutable in the configured time zone when no clock is set.
       */
      private function getNow(): \DateTimeImmutable
      {
          if ($this->clock) {
              return $this->clock->now();
          }

          return new \DateTimeImmutable('now', $this->timeZone ? new \DateTimeZone($this->timeZone) : null);
      }
  }