Skip to content

Commit

Permalink
Merge pull request #71 from esmero/ISSUE-70
Browse files Browse the repository at this point in the history
ISSUE-70 and ISSUE-69: Improves process logic and cleans up atomic/processor generated file/garbage
  • Loading branch information
DiegoPino authored Nov 17, 2022
2 parents 15b5b5b + 68af964 commit dbac9cf
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 38 deletions.
170 changes: 149 additions & 21 deletions src/Plugin/QueueWorker/AbstractPostProcessorQueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace Drupal\strawberry_runners\Plugin\QueueWorker;

use Drupal;
use Drupal\Core\Datetime\DrupalDateTime;
use Drupal\Core\Entity\ContentEntityInterface;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Entity\TranslatableInterface;
Expand All @@ -21,8 +22,10 @@
use Drupal\file\FileInterface;
use Drupal\search_api\Query\QueryInterface;
use Drupal\strawberry_runners\Plugin\StrawberryRunnersPostProcessorPluginInterface;
use Drupal\strawberryfield\Event\StrawberryfieldFileEvent;
use Drupal\strawberryfield\Semantic\ActivityStream;
use Drupal\Core\File\Exception\FileException;
use Drupal\strawberryfield\StrawberryfieldEventType;
use Exception;
use Drupal\Core\Queue\RequeueException;
use Psr\Log\LoggerInterface;
Expand All @@ -32,6 +35,7 @@
use Drupal\strawberry_runners\Plugin\StrawberryRunnersPostProcessorPluginManager;
use Drupal\strawberryfield\Plugin\search_api\datasource\StrawberryfieldFlavorDatasource;
use Drupal\search_api\ParseMode\ParseModePluginManager;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

abstract class AbstractPostProcessorQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

Expand Down Expand Up @@ -82,6 +86,18 @@ abstract class AbstractPostProcessorQueueWorker extends QueueWorkerBase implemen
*/
private $strawberryRunnerProcessorPluginManager;

/**
* An array containing files that can be deleted.
*
* @var array
*/
protected $instanceFiles = [];

/**
* @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
*/
protected $eventDispatcher;

/**
* Constructor.
*
Expand All @@ -96,7 +112,7 @@ abstract class AbstractPostProcessorQueueWorker extends QueueWorkerBase implemen
* @param \Psr\Log\LoggerInterface $logger
* @param \Drupal\search_api\ParseMode\ParseModePluginManager $parse_mode_manager
*/
public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityTypeManagerInterface $entity_type_manager, StrawberryRunnersPostProcessorPluginManager $strawberry_runner_processor_plugin_manager, FileSystemInterface $file_system, StreamWrapperManagerInterface $stream_wrapper_manager, KeyValueFactoryInterface $key_value, LoggerInterface $logger, ParseModePluginManager $parse_mode_manager) {
public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityTypeManagerInterface $entity_type_manager, StrawberryRunnersPostProcessorPluginManager $strawberry_runner_processor_plugin_manager, FileSystemInterface $file_system, StreamWrapperManagerInterface $stream_wrapper_manager, KeyValueFactoryInterface $key_value, LoggerInterface $logger, ParseModePluginManager $parse_mode_manager, EventDispatcherInterface $event_dispatcher) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->entityTypeManager = $entity_type_manager;
$this->strawberryRunnerProcessorPluginManager = $strawberry_runner_processor_plugin_manager;
Expand All @@ -105,6 +121,7 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition
$this->keyValue = $key_value;
$this->logger = $logger;
$this->parseModeManager = $parse_mode_manager;
$this->eventDispatcher = $event_dispatcher;
}

/**
Expand All @@ -128,7 +145,8 @@ public static function create(ContainerInterface $container, array $configuratio
$container->get('stream_wrapper_manager'),
$container->get('keyvalue'),
$container->get('logger.channel.strawberry_runners'),
$container->get('plugin.manager.search_api.parse_mode')
$container->get('plugin.manager.search_api.parse_mode'),
$container->get('event_dispatcher')
);
}

Expand All @@ -137,16 +155,32 @@ public static function create(ContainerInterface $container, array $configuratio
*/
public function processItem($data) {

// If $data->filepath_to_clean is present and is an array (can be a single entry) and
// $data->sbr_cleanup == TRUE
// Then this is will not do its processor invoking, this is a cleanup local files
// We send the composter even invocation and return;
if (is_array($data->filepath_to_clean ?? NULL) && ($data->sbr_cleanup ?? FALSE)) {
$this->dispatchComposter($data);
return;
}


$processor_instance = $this->getProcessorPlugin($data->plugin_config_entity_id);

if (!$processor_instance) {
$this->logger->log(LogLevel::ERROR, 'Strawberry Runners Processing aborted because the @processor may be inactive', ['@processor' => $processor_instance->label()]);
return;
}
$processor_config = $processor_instance->getConfiguration();

// @TODO check on this Diego. This is a bit misleading since it assumes
// every processor will work only on Files.
// True for now, but eventually we want processors that do only
// metadata to metadata.
if (!isset($data->fid) || $data->fid == NULL || !isset($data->nid) || $data->nid == NULL || !is_array($data->metadata)) {
return;
}

$file = $this->entityTypeManager->getStorage('file')->load($data->fid);
// 0 byte files have checksum, check what it is!
if ($file === NULL || !isset($data->metadata['checksum'])) {
Expand All @@ -161,7 +195,7 @@ public function processItem($data) {
->load($data->nid);

if (!$entity) {
$this->logger->log(LogLevel::ERROR, 'Sorry the Node ID @nodeid passed to the Strawberry Runners processor does not (longer?) exists. Skipping.', [
$this->logger->log(LogLevel::ERROR, 'Sorry the ADO Node ID @nodeid passed to the Strawberry Runners processor does not (longer?) exists. Skipping.', [
'@nodeid' => $data->nid,
]);
return;
Expand All @@ -170,21 +204,67 @@ public function processItem($data) {
$filelocation = $this->ensureFileAvailability($file);

if ($filelocation === FALSE) {
$this->logger->log(LogLevel::ERROR, 'Strawberry Runners Processing aborted for ADO Node ID @nodeid because we could not ensure a local file location needed for @processor. You might have run out space or have permission issues or (less likely) the original File/ADO was removed milliseconds ago.',
[
'@processor' => $processor_instance->label(),
'@nodeid' => $data->nid,
]
);
// Note. If $filelocation could not be acquired, means we do not need to compost neither
// its already gone/not possible
return;
}
// Means we could pass also a file directly anytime
// Means we could pass also a file directly anytime. But not really as such
// only into $data->filepath but not into $filelocation bc
// that would compost and remove the file. What if its needed later?
$data->filepath = $filelocation;

// We preset it up here.
$this->instanceFiles = [$filelocation];

if (!isset($processor_config['output_destination']) || !is_array($processor_config['output_destination'])) {
$this->logger->log(LogLevel::ERROR, 'Strawberry Runners Processing aborted because there is no output destination setup for @processor', ['@processor' => $processor_instance->label()]);
$this->logger->log(LogLevel::ERROR, 'Strawberry Runners Processing aborted for ADO Node ID @nodeid because there is no output destination setup for @processor',
[
'@processor' => $processor_instance->label(),
'@nodeid' => $data->nid,
]
);
return;
}


// Get the whole processing chain
$childprocessorschain = $this->getChildProcessorIds($data->plugin_config_entity_id ?? '', true);

$needs_localfile_cleanup = FALSE;
// If a child processor at any level will eventually chain up to a leaf (means generate queue items again)
$will_chain_future = FALSE;
// Just in case someone decides to avoid setting this one up
$data->sbr_cleanedup_before = $data->sbr_cleanedup_before ?? FALSE;

if (!$data->sbr_cleanedup_before) {
foreach ($childprocessorschain as $plugin_info) {
/* @var $postprocessor_config_entity_chain \Drupal\strawberry_runners\Entity\strawberryRunnerPostprocessorEntity */
$postprocessor_config_entity_chain = $plugin_info['config_entity'];
$chains = $postprocessor_config_entity_chain->getPluginconfig(
)['output_destination']['plugin'] ?? FALSE;
$chains = $chains === 'plugin' ? TRUE : FALSE;
$will_chain_future = $will_chain_future || $chains;
}
}
// When to clean up?
// If not cleaned up before
// AND won't chain in the future

$needs_localfile_cleanup = !$will_chain_future && !$data->sbr_cleanedup_before;
// We set this before triggering cleanup, means future thinking
// bc we need to make sure IF there is a next processor it will get
// The info that during this queuworker processing cleanup at the end
// Will happen at the end.
$data->sbr_cleanedup_before = $data->sbr_cleanedup_before == TRUE ? $data->sbr_cleanedup_before : $needs_localfile_cleanup;

$enabled_processor_output_types = array_intersect_assoc(StrawberryRunnersPostProcessorPluginInterface::OUTPUT_TYPE, $processor_config['output_destination']);

// make all this options constants

$tobeindexed = FALSE;
$tobeupdated = FALSE;
$tobechained = FALSE;
Expand All @@ -199,7 +279,6 @@ public function processItem($data) {
$tobechained = TRUE;
}


// Only applies to those that will be indexed
if ($tobeindexed) {
try {
Expand Down Expand Up @@ -228,8 +307,6 @@ public function processItem($data) {
// Here goes the main trick for making sure out $sequence_key in the Solr ID
// Is the right now (relative to its own, 1 if single file, or an increasing number if a pdf
// We check if the current item has siblings!
// If not, we immediatelly, independently of the actual internal
// $sequence is 1
if (!isset($data->siblings) || isset($data->siblings) && $data->siblings == 1) {
$sequence_key = 1;
}
Expand All @@ -247,10 +324,10 @@ public function processItem($data) {
// Check if we already have this entry in Solr
if ($inindex !== 0 && !$data->force) {
$this->logger->log(LogLevel::INFO, 'Flavor already in index for @plugin on ADO Node ID @nodeid, not forced, so skipping.',
[
'@plugin' => $processor_instance->getPluginId(),
'@nodeid' => $data->nid,
]
[
'@plugin' => $processor_instance->getPluginId(),
'@nodeid' => $data->nid,
]
);
}
$inkeystore = TRUE;
Expand Down Expand Up @@ -348,8 +425,11 @@ public function processItem($data) {
}
// Chains a new Processor into the QUEUE, if there are any children
if ($tobechained && isset($io->output->plugin) && !empty($io->output->plugin)) {
$childprocessors = $this->getChildProcessorIds($data->plugin_config_entity_id);
foreach ($childprocessors as $plugin_info) {
foreach ($childprocessorschain as $plugin_info) {
if ($plugin_info['parent_plugin_id'] !== $data->plugin_config_entity_id) {
// Means its another processor up the tree
continue ;
}
$childdata = clone $data; // So we do not touch original data
/* @var $strawberry_runners_postprocessor_config \Drupal\strawberry_runners\Entity\strawberryRunnerPostprocessorEntity */
$postprocessor_config_entity = $plugin_info['config_entity'];
Expand All @@ -363,7 +443,7 @@ public function processItem($data) {
// For each we enqueue a child using that property in its data
// Possible input properties:
// - Can come from the original Data (most likely)
// - May be overriden by the $io->output, e.g when a processor generates a file that is not part of any node
// - May be overridden by the $io->output, e.g when a processor generates a file that is not part of any node
$input_property_value_from_plugin = TRUE;
$input_property_value = isset($io->output->plugin) && isset($io->output->plugin[$input_property]) ? $io->output->plugin[$input_property] : NULL;
// If was not defined by the previous processor try from the main data.
Expand All @@ -382,7 +462,7 @@ public function processItem($data) {
]);
continue;
}
// Warning Diego. This may lead to a null
// Warning Diego. This may lead to a null?
$childdata->{$input_property} = $input_property_value;
$childdata->plugin_config_entity_id = $postprocessor_config_entity->id();
$input_argument_value = isset($io->output->plugin) && isset($io->output->plugin[$input_argument]) ?
Expand Down Expand Up @@ -414,6 +494,23 @@ public function processItem($data) {
}
}
}
// If we enqueued means we can not compost the original file.
// Safest route to get rid of the ensured local file
// Is to enqueue it on the same `strawberryrunners_process_background` queue
// Sadly not on a processor that is a leaf but on the one that creates leaves! (enques)
// or had will never enqueue at all. (yes, some just process).
// Why? because for one that creates there might be hundreds of leaves and we don't want
// to enqueue for cleanup hundred of times. Right?
// There is a bit of statistics (when) here but that said
// Since the file is re-checked of existence everytime a queue worker jumps
// in, if lost, we will simply regenerate it.
if ($needs_localfile_cleanup && $filelocation) {
$data_cleanup = new \stdClass();
$data_cleanup->filepath_to_clean = [$filelocation];
$data_cleanup->sbr_cleanup = TRUE;
Drupal::queue('strawberryrunners_process_background', TRUE)
->createItem($data_cleanup);
}
}

/**
Expand Down Expand Up @@ -462,7 +559,6 @@ private function ensureFileAvailability(FileInterface $file) {
$uri = $file->getFileUri();
// Local stream.
$cache_key = md5($uri);
// Check first if the file is already around in temp?
// @TODO can be sure its the same one? Ideas?
if (is_readable(
$this->fileSystem->realpath(
Expand Down Expand Up @@ -632,7 +728,7 @@ private function invokeProcessor(StrawberryRunnersPostProcessorPluginInterface $
'@nodeuuid' => $input->nuuid,
]
);
throw new RequeueException('I am not done yet!');
throw new RequeueException('I am not done yet. Will re-enqueu myself');
}
return $io;
}
Expand Down Expand Up @@ -758,9 +854,11 @@ protected function addActivityStream($name = NULL) {
*
* @param string $plugin_config_entity_id
*
* @param bool $wholechain
* If we will keep getting children up and accumulating the whole tree to a leaf
* @return array
*/
private function getChildProcessorIds(string $plugin_config_entity_id): array {
private function getChildProcessorIds(string $plugin_config_entity_id, $wholechain = FALSE): array {
/* @var $plugin_config_entities \Drupal\strawberry_runners\Entity\strawberryRunnerPostprocessorEntity[] */
$plugin_config_entities = $this->entityTypeManager->getListBuilder('strawberry_runners_postprocessor')
->load();
Expand All @@ -771,10 +869,16 @@ private function getChildProcessorIds(string $plugin_config_entity_id): array {
foreach ($plugin_config_entities as $plugin_config_entity) {
// Only get first level (no Parents) and Active ones.
if ($plugin_config_entity->isActive() && $plugin_config_entity->getParent() == $plugin_config_entity_id) {
// We keep the parent here too since we might need to fetch (when $wholechain == TRUE)
// Just the ones directly attached to the current one.
$active_plugins[] = [
'config_entity' => $plugin_config_entity,
'plugin_definition' => $plugin_definitions[$plugin_config_entity->getPluginid()],
'parent_plugin_id' => $plugin_config_entity_id,
];
if ($wholechain) {
$active_plugins = array_merge($active_plugins, $this->getChildProcessorIds($plugin_config_entity->id(), $wholechain));
}
}
}
return $active_plugins;
Expand Down Expand Up @@ -838,5 +942,29 @@ private function setNiceName($current_uri, $pluginid, $uuid) {
return $pluginid . '_from_' . $uuid . '.' . $destination_extension;
}

private function dispatchComposter(\StdClass $data):void {
// Just in case bc the destructor will be invoked
$this->instanceFiles = [];
foreach($data->filepath_to_clean ?? [] as $instanceFile) {
$event_type = StrawberryfieldEventType::TEMP_FILE_CREATION;
$current_timestamp = (new DrupalDateTime())->getTimestamp();
$event = new StrawberryfieldFileEvent($event_type, 'strawberry_runners', $instanceFile, $current_timestamp);
// This will allow any temp file on ADO save to be managed
// IN a queue by \Drupal\strawberryfield\EventSubscriber\StrawberryfieldEventCompostBinSubscriber
$this->eventDispatcher->dispatch($event, $event_type);
}
}

public function __destruct() {

/* foreach($this->instanceFiles as $instanceFile) {
$event_type = StrawberryfieldEventType::TEMP_FILE_CREATION;
$current_timestamp = (new DrupalDateTime())->getTimestamp();
$event = new StrawberryfieldFileEvent($event_type, 'strawberry_runners', $instanceFile, $current_timestamp);
// This will allow any temp file on ADO save to be managed
// IN a queue by \Drupal\strawberryfield\EventSubscriber\StrawberryfieldEventCompostBinSubscriber
$this->eventDispatcher->dispatch($event, $event_type);
}*/
}

}
Loading

0 comments on commit dbac9cf

Please sign in to comment.