vendor/shopware/core/Content/Product/DataAbstractionLayer/StockUpdater.php line 140

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Content\Product\DataAbstractionLayer;
  3. use Doctrine\DBAL\Connection;
  4. use Shopware\Core\Checkout\Cart\Event\CheckoutOrderPlacedEvent;
  5. use Shopware\Core\Checkout\Cart\LineItem\LineItem;
  6. use Shopware\Core\Checkout\Order\Aggregate\OrderLineItem\OrderLineItemDefinition;
  7. use Shopware\Core\Checkout\Order\OrderEvents;
  8. use Shopware\Core\Checkout\Order\OrderStates;
  9. use Shopware\Core\Content\Product\DataAbstractionLayer\StockUpdate\StockUpdateFilterProvider;
  10. use Shopware\Core\Content\Product\Events\ProductNoLongerAvailableEvent;
  11. use Shopware\Core\Defaults;
  12. use Shopware\Core\Framework\Context;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
  14. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\ChangeSetAware;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\DeleteCommand;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\WriteCommand;
  19. use Shopware\Core\Framework\DataAbstractionLayer\Write\Validation\PreWriteValidationEvent;
  20. use Shopware\Core\Framework\Log\Package;
  21. use Shopware\Core\Framework\Uuid\Uuid;
  22. use Shopware\Core\Profiling\Profiler;
  23. use Shopware\Core\System\StateMachine\Event\StateMachineTransitionEvent;
  24. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  25. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  26. /**
  27.  * @deprecated tag:v6.5.0 - reason:becomes-internal - EventSubscribers will become internal in v6.5.0
  28.  */
  29. #[Package('core')]
  30. class StockUpdater implements EventSubscriberInterface
  31. {
  32.     private Connection $connection;
  33.     private EventDispatcherInterface $dispatcher;
  34.     private StockUpdateFilterProvider $stockUpdateFilter;
  35.     /**
  36.      * @internal
  37.      */
  38.     public function __construct(
  39.         Connection $connection,
  40.         EventDispatcherInterface $dispatcher,
  41.         StockUpdateFilterProvider $stockUpdateFilter
  42.     ) {
  43.         $this->connection $connection;
  44.         $this->dispatcher $dispatcher;
  45.         $this->stockUpdateFilter $stockUpdateFilter;
  46.     }
  47.     /**
  48.      * Returns a list of custom business events to listen where the product maybe changed
  49.      *
  50.      * @return array<string, string|array{0: string, 1: int}|list<array{0: string, 1?: int}>>
  51.      */
  52.     public static function getSubscribedEvents()
  53.     {
  54.         return [
  55.             CheckoutOrderPlacedEvent::class => 'orderPlaced',
  56.             StateMachineTransitionEvent::class => 'stateChanged',
  57.             PreWriteValidationEvent::class => 'triggerChangeSet',
  58.             OrderEvents::ORDER_LINE_ITEM_WRITTEN_EVENT => 'lineItemWritten',
  59.             OrderEvents::ORDER_LINE_ITEM_DELETED_EVENT => 'lineItemWritten',
  60.         ];
  61.     }
  62.     public function triggerChangeSet(PreWriteValidationEvent $event): void
  63.     {
  64.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  65.             return;
  66.         }
  67.         foreach ($event->getCommands() as $command) {
  68.             if (!$command instanceof ChangeSetAware) {
  69.                 continue;
  70.             }
  71.             /** @var ChangeSetAware&WriteCommand $command */
  72.             if ($command->getDefinition()->getEntityName() !== OrderLineItemDefinition::ENTITY_NAME) {
  73.                 continue;
  74.             }
  75.             if ($command instanceof DeleteCommand) {
  76.                 $command->requestChangeSet();
  77.                 continue;
  78.             }
  79.             /** @var WriteCommand&ChangeSetAware $command */
  80.             if ($command->hasField('referenced_id') || $command->hasField('product_id') || $command->hasField('quantity')) {
  81.                 $command->requestChangeSet();
  82.             }
  83.         }
  84.     }
  85.     public function orderPlaced(CheckoutOrderPlacedEvent $event): void
  86.     {
  87.         $ids = [];
  88.         foreach ($event->getOrder()->getLineItems() ?? [] as $lineItem) {
  89.             if ($lineItem->getType() !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  90.                 continue;
  91.             }
  92.             $referencedId $lineItem->getReferencedId();
  93.             if (!$referencedId) {
  94.                 continue;
  95.             }
  96.             if (!\array_key_exists($referencedId$ids)) {
  97.                 $ids[$referencedId] = 0;
  98.             }
  99.             $ids[$referencedId] += $lineItem->getQuantity();
  100.         }
  101.         $filteredIds $this->stockUpdateFilter->filterProductIdsForStockUpdates(\array_keys($ids), $event->getContext());
  102.         $ids \array_filter($ids, static fn (string $id) => \in_array($id$filteredIdstrue), \ARRAY_FILTER_USE_KEY);
  103.         // order placed event is a high load event. Because of the high load, we simply reduce the quantity here instead of executing the high costs `update` function
  104.         $query = new RetryableQuery(
  105.             $this->connection,
  106.             $this->connection->prepare('UPDATE product SET available_stock = available_stock - :quantity WHERE id = :id')
  107.         );
  108.         Profiler::trace('order::update-stock', static function () use ($query$ids): void {
  109.             foreach ($ids as $id => $quantity) {
  110.                 $query->execute(['id' => Uuid::fromHexToBytes((string) $id), 'quantity' => $quantity]);
  111.             }
  112.         });
  113.         Profiler::trace('order::update-flag', function () use ($ids$event): void {
  114.             $this->updateAvailableFlag(\array_keys($ids), $event->getContext());
  115.         });
  116.     }
  117.     /**
  118.      * If the product of an order item changed, the stocks of the old product and the new product must be updated.
  119.      */
  120.     public function lineItemWritten(EntityWrittenEvent $event): void
  121.     {
  122.         $ids = [];
  123.         // we don't want to trigger to `update` method when we are inside the order process
  124.         if ($event->getContext()->hasState('checkout-order-route')) {
  125.             return;
  126.         }
  127.         foreach ($event->getWriteResults() as $result) {
  128.             if ($result->hasPayload('referencedId') && $result->getProperty('type') === LineItem::PRODUCT_LINE_ITEM_TYPE) {
  129.                 $ids[] = $result->getProperty('referencedId');
  130.             }
  131.             if ($result->getOperation() === EntityWriteResult::OPERATION_INSERT) {
  132.                 continue;
  133.             }
  134.             $changeSet $result->getChangeSet();
  135.             if (!$changeSet) {
  136.                 continue;
  137.             }
  138.             $type $changeSet->getBefore('type');
  139.             if ($type !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  140.                 continue;
  141.             }
  142.             if (!$changeSet->hasChanged('referenced_id') && !$changeSet->hasChanged('quantity')) {
  143.                 continue;
  144.             }
  145.             $ids[] = $changeSet->getBefore('referenced_id');
  146.             $ids[] = $changeSet->getAfter('referenced_id');
  147.         }
  148.         $ids array_filter(array_unique($ids));
  149.         if (empty($ids)) {
  150.             return;
  151.         }
  152.         $this->update($ids$event->getContext());
  153.     }
  154.     public function stateChanged(StateMachineTransitionEvent $event): void
  155.     {
  156.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  157.             return;
  158.         }
  159.         if ($event->getEntityName() !== 'order') {
  160.             return;
  161.         }
  162.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  163.             $this->decreaseStock($event);
  164.             return;
  165.         }
  166.         if ($event->getFromPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  167.             $this->increaseStock($event);
  168.             return;
  169.         }
  170.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED || $event->getFromPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED) {
  171.             $products $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  172.             $ids array_column($products'referenced_id');
  173.             $this->updateAvailableStockAndSales($ids$event->getContext());
  174.             $this->updateAvailableFlag($ids$event->getContext());
  175.         }
  176.     }
  177.     /**
  178.      * @param list<string> $ids
  179.      */
  180.     public function update(array $idsContext $context): void
  181.     {
  182.         if ($context->getVersionId() !== Defaults::LIVE_VERSION) {
  183.             return;
  184.         }
  185.         $ids $this->stockUpdateFilter->filterProductIdsForStockUpdates($ids$context);
  186.         $this->updateAvailableStockAndSales($ids$context);
  187.         $this->updateAvailableFlag($ids$context);
  188.     }
  189.     private function increaseStock(StateMachineTransitionEvent $event): void
  190.     {
  191.         $products $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  192.         $ids array_column($products'referenced_id');
  193.         $this->updateStock($products, +1);
  194.         $this->updateAvailableStockAndSales($ids$event->getContext());
  195.         $this->updateAvailableFlag($ids$event->getContext());
  196.     }
  197.     private function decreaseStock(StateMachineTransitionEvent $event): void
  198.     {
  199.         $products $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  200.         $ids array_column($products'referenced_id');
  201.         $this->updateStock($products, -1);
  202.         $this->updateAvailableStockAndSales($ids$event->getContext());
  203.         $this->updateAvailableFlag($ids$event->getContext());
  204.     }
  205.     /**
  206.      * @param list<string> $ids
  207.      */
  208.     private function updateAvailableStockAndSales(array $idsContext $context): void
  209.     {
  210.         $ids array_filter(array_keys(array_flip($ids)));
  211.         if (empty($ids)) {
  212.             return;
  213.         }
  214.         $sql '
  215. SELECT LOWER(HEX(order_line_item.product_id)) as product_id,
  216.     IFNULL(
  217.         SUM(IF(state_machine_state.technical_name = :completed_state, 0, order_line_item.quantity)),
  218.         0
  219.     ) as open_quantity,
  220.     IFNULL(
  221.         SUM(IF(state_machine_state.technical_name = :completed_state, order_line_item.quantity, 0)),
  222.         0
  223.     ) as sales_quantity
  224. FROM order_line_item
  225.     INNER JOIN `order`
  226.         ON `order`.id = order_line_item.order_id
  227.         AND `order`.version_id = order_line_item.order_version_id
  228.     INNER JOIN state_machine_state
  229.         ON state_machine_state.id = `order`.state_id
  230.         AND state_machine_state.technical_name <> :cancelled_state
  231. WHERE order_line_item.product_id IN (:ids)
  232.     AND order_line_item.type = :type
  233.     AND order_line_item.version_id = :version
  234.     AND order_line_item.product_id IS NOT NULL
  235. GROUP BY product_id;
  236.         ';
  237.         $rows $this->connection->fetchAllAssociative(
  238.             $sql,
  239.             [
  240.                 'type' => LineItem::PRODUCT_LINE_ITEM_TYPE,
  241.                 'version' => Uuid::fromHexToBytes($context->getVersionId()),
  242.                 'completed_state' => OrderStates::STATE_COMPLETED,
  243.                 'cancelled_state' => OrderStates::STATE_CANCELLED,
  244.                 'ids' => Uuid::fromHexToBytesList($ids),
  245.             ],
  246.             [
  247.                 'ids' => Connection::PARAM_STR_ARRAY,
  248.             ]
  249.         );
  250.         $fallback array_column($rows'product_id');
  251.         $fallback array_diff($ids$fallback);
  252.         $update = new RetryableQuery(
  253.             $this->connection,
  254.             $this->connection->prepare('UPDATE product SET available_stock = stock - :open_quantity, sales = :sales_quantity, updated_at = :now WHERE id = :id')
  255.         );
  256.         foreach ($fallback as $id) {
  257.             $update->execute([
  258.                 'id' => Uuid::fromHexToBytes((string) $id),
  259.                 'open_quantity' => 0,
  260.                 'sales_quantity' => 0,
  261.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  262.             ]);
  263.         }
  264.         foreach ($rows as $row) {
  265.             $update->execute([
  266.                 'id' => Uuid::fromHexToBytes($row['product_id']),
  267.                 'open_quantity' => $row['open_quantity'],
  268.                 'sales_quantity' => $row['sales_quantity'],
  269.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  270.             ]);
  271.         }
  272.     }
  273.     /**
  274.      * @param list<string> $ids
  275.      */
  276.     private function updateAvailableFlag(array $idsContext $context): void
  277.     {
  278.         $ids array_filter(array_unique($ids));
  279.         if (empty($ids)) {
  280.             return;
  281.         }
  282.         $bytes Uuid::fromHexToBytesList($ids);
  283.         $sql '
  284.             UPDATE product
  285.             LEFT JOIN product parent
  286.                 ON parent.id = product.parent_id
  287.                 AND parent.version_id = product.version_id
  288.             SET product.available = IFNULL((
  289.                 IFNULL(product.is_closeout, parent.is_closeout) * product.available_stock
  290.                 >=
  291.                 IFNULL(product.is_closeout, parent.is_closeout) * IFNULL(product.min_purchase, parent.min_purchase)
  292.             ), 0)
  293.             WHERE product.id IN (:ids)
  294.             AND product.version_id = :version
  295.         ';
  296.         RetryableQuery::retryable($this->connection, function () use ($sql$context$bytes): void {
  297.             $this->connection->executeStatement(
  298.                 $sql,
  299.                 ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  300.                 ['ids' => Connection::PARAM_STR_ARRAY]
  301.             );
  302.         });
  303.         $updated $this->connection->fetchFirstColumn(
  304.             'SELECT LOWER(HEX(id)) FROM product WHERE available = 0 AND id IN (:ids) AND product.version_id = :version',
  305.             ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  306.             ['ids' => Connection::PARAM_STR_ARRAY]
  307.         );
  308.         if (!empty($updated)) {
  309.             $this->dispatcher->dispatch(new ProductNoLongerAvailableEvent($updated$context));
  310.         }
  311.     }
  312.     /**
  313.      * @param list<array{referenced_id: string, quantity: string}> $products
  314.      */
  315.     private function updateStock(array $productsint $multiplier): void
  316.     {
  317.         $query = new RetryableQuery(
  318.             $this->connection,
  319.             $this->connection->prepare('UPDATE product SET stock = stock + :quantity WHERE id = :id AND version_id = :version')
  320.         );
  321.         foreach ($products as $product) {
  322.             $query->execute([
  323.                 'quantity' => (int) $product['quantity'] * $multiplier,
  324.                 'id' => Uuid::fromHexToBytes($product['referenced_id']),
  325.                 'version' => Uuid::fromHexToBytes(Defaults::LIVE_VERSION),
  326.             ]);
  327.         }
  328.     }
  329.     /**
  330.      * @return list<array{referenced_id: string, quantity: string}>
  331.      */
  332.     private function getProductsOfOrder(string $orderIdContext $context): array
  333.     {
  334.         $query $this->connection->createQueryBuilder();
  335.         $query->select(['referenced_id''quantity']);
  336.         $query->from('order_line_item');
  337.         $query->andWhere('type = :type');
  338.         $query->andWhere('order_id = :id');
  339.         $query->andWhere('version_id = :version');
  340.         $query->setParameter('id'Uuid::fromHexToBytes($orderId));
  341.         $query->setParameter('version'Uuid::fromHexToBytes(Defaults::LIVE_VERSION));
  342.         $query->setParameter('type'LineItem::PRODUCT_LINE_ITEM_TYPE);
  343.         /** @var list<array{referenced_id: string, quantity: string}> $result */
  344.         $result $query->executeQuery()->fetchAllAssociative();
  345.         $filteredIds $this->stockUpdateFilter->filterProductIdsForStockUpdates(\array_column($result'referenced_id'), $context);
  346.         return \array_filter($result, static fn (array $item) => \in_array($item['referenced_id'], $filteredIdstrue));
  347.     }
  348. }