Optimize importing

This commit is contained in:
Sebastian Meyer 2024-01-06 12:53:20 +01:00
parent 00d4dc80cc
commit b0f8c49d97
10 changed files with 218 additions and 104 deletions

View File

@ -40,6 +40,14 @@ use Symfony\Component\Console\Output\OutputInterface;
)]
class AddRecordCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
Database::getInstance()->pruneOrphanSets();

View File

@ -24,8 +24,10 @@ namespace OCC\OaiPmh2\Console;
use DateTime;
use OCC\OaiPmh2\Database;
use OCC\OaiPmh2\Database\Format;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressIndicator;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
@ -43,6 +45,11 @@ use Symfony\Component\Console\Output\OutputInterface;
)]
class CsvImportCommand extends Command
{
/**
* Configures the current command.
*
* @return void
*/
protected function configure(): void
{
$this->addArgument(
@ -61,28 +68,28 @@ class CsvImportCommand extends Command
);
$this->addOption(
'idColumn',
null,
'i',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' identifier.',
'identifier'
);
$this->addOption(
'contentColumn',
null,
'c',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' content.',
'content'
);
$this->addOption(
'dateColumn',
null,
'd',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' datetime of last change.',
'lastChanged'
);
$this->addOption(
'setColumn',
null,
's',
InputOption::VALUE_OPTIONAL,
'Name of the CSV column which holds the records\' sets list.',
'sets'
@ -90,77 +97,66 @@ class CsvImportCommand extends Command
parent::configure();
}
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->validateInput($input, $output)) {
return Command::INVALID;
}
$memoryLimit = $this->getMemoryLimit();
/** @var array<string, string> */
$arguments = $input->getArguments();
/** @var array<string, string> */
$options = $input->getOptions();
$formats = Database::getInstance()->getMetadataFormats()->getQueryResult();
if (!in_array($arguments['format'], array_keys($formats), true)) {
// Error: Invalid metadata prefix
echo 1;
return Command::INVALID;
}
/** @var Format */
$format = Database::getInstance()->getEntityManager()->getReference(Format::class, $arguments['format']);
/** @var resource */
$file = fopen($arguments['file'], 'r');
if ($file === false) {
// Error: File not found or not readable
echo 2;
return Command::INVALID;
}
$headers = fgetcsv($file);
if (!is_array($headers)) {
// Error: No CSV
echo 3;
return Command::INVALID;
} else {
$headers = array_flip($headers);
}
$column = [];
foreach ($options as $option => $value) {
if (isset($headers[$value])) {
$column[$option] = $headers[$value];
}
}
if (!isset($column['idColumn']) || !isset($column['contentColumn'])) {
// Error: Required columns missing
echo 4;
$columns = $this->getColumnNames($input, $output, $file);
if (count($columns) === 0) {
return Command::INVALID;
}
$lastChanged = new DateTime();
$count = 0;
$progressIndicator = new ProgressIndicator($output, 'verbose', 200, ['⠏', '⠛', '⠹', '⢸', '⣰', '⣤', '⣆', '⡇']);
$progressIndicator->start('Importing...');
while ($record = fgetcsv($file)) {
$identifier = $record[$column['idColumn']];
$content = $record[$column['contentColumn']];
if ($content === '') {
$content = null;
}
if (isset($column['dateColumn'])) {
$lastChanged = new DateTime($record[$column['dateColumn']]);
}
// TODO: Complete support for sets.
$sets = null;
Database::getInstance()->addOrUpdateRecord(
$identifier,
$arguments['format'],
$content,
$lastChanged,
$sets,
$record[$columns['idColumn']],
$format,
trim($record[$columns['contentColumn']]),
new DateTime($record[$columns['dateColumn']] ?? 'now'),
// TODO: Complete support for sets.
/* $record[$columns['setColumn']] ?? */ null,
true
);
++$count;
if ($count % 500 === 0) {
$progressIndicator->advance();
$progressIndicator->setMessage((string) $count . ' done.');
// Flush to database if memory usage reaches 90% of available limit.
if (memory_get_usage() / $memoryLimit > 0.9) {
Database::getInstance()->flush(true);
/** @var Format */
$format = Database::getInstance()->getEntityManager()->getReference(Format::class, $arguments['format']);
}
}
Database::getInstance()->flush(true);
Database::getInstance()->pruneOrphanSets();
$progressIndicator->finish('All done!');
fclose($file);
$output->writeln([
'',
sprintf(
@ -172,4 +168,117 @@ class CsvImportCommand extends Command
]);
return Command::SUCCESS;
}
/**
* Get the column names of CSV.
*
* @param InputInterface $input The inputs
* @param OutputInterface $output The output interface
* @param resource $file The handle for the CSV file
*
* @return array<string, int|string> The mapped column names
*/
protected function getColumnNames(InputInterface $input, OutputInterface $output, $file): array
{
/** @var array<string, string> */
$options = $input->getOptions();
$columns = [];
$headers = fgetcsv($file);
if (!is_array($headers)) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" does not contain valid CSV. ',
stream_get_meta_data($file)['uri']
),
''
]);
return [];
} else {
$headers = array_flip($headers);
}
foreach ($options as $option => $value) {
if (isset($headers[$value])) {
$columns[$option] = $headers[$value];
}
}
if (!isset($columns['idColumn']) || !isset($columns['contentColumn'])) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" does not contain valid CSV. ',
stream_get_meta_data($file)['uri']
),
''
]);
return [];
}
return $columns;
}
/**
* Get the PHP memory limit in bytes.
*
* @return int The memory limit in bytes or -1 if unlimited
*/
protected function getMemoryLimit(): int
{
$ini = trim(ini_get('memory_limit'));
$limit = (int) $ini;
$unit = strtolower($ini[strlen($ini)-1]);
switch($unit) {
case 'g':
$limit *= 1024;
case 'm':
$limit *= 1024;
case 'k':
$limit *= 1024;
}
if ($limit < 0) {
return -1;
}
return $limit;
}
/**
* Validate input arguments.
*
* @param InputInterface $input The inputs
* @param OutputInterface $output The output interface
*
* @return bool Whether the inputs validate
*/
protected function validateInput(InputInterface $input, OutputInterface $output): bool
{
/** @var array<string, string> */
$arguments = $input->getArguments();
$formats = Database::getInstance()->getMetadataFormats()->getQueryResult();
if (!in_array($arguments['format'], array_keys($formats), true)) {
$output->writeln([
'',
sprintf(
' [ERROR] Metadata format "%s" is not supported. ',
$arguments['format']
),
''
]);
return false;
}
if (!is_readable($arguments['file'])) {
$output->writeln([
'',
sprintf(
' [ERROR] File "%s" not found or not readable. ',
$arguments['file']
),
''
]);
return false;
}
return true;
}
}

View File

@ -41,6 +41,14 @@ use Symfony\Component\Console\Output\OutputInterface;
)]
class DeleteRecordCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$policy = Configuration::getInstance()->deletedRecords;

View File

@ -42,17 +42,30 @@ use Symfony\Component\Console\Output\OutputInterface;
)]
class PruneRecordsCommand extends Command
{
/**
* Configures the current command.
*
* @return void
*/
protected function configure(): void
{
$this->addOption(
'force',
null,
'f',
InputOption::VALUE_NONE,
'Deletes records even under "transient" policy.'
);
parent::configure();
}
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$policy = Configuration::getInstance()->deletedRecords;

View File

@ -40,6 +40,14 @@ use Symfony\Component\Console\Output\OutputInterface;
)]
class PruneResumptionTokensCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$expired = Database::getInstance()->pruneResumptionTokens();

View File

@ -45,6 +45,14 @@ use Symfony\Component\Validator\Exception\ValidationFailedException;
)]
class UpdateFormatsCommand extends Command
{
/**
* Executes the current command.
*
* @param InputInterface $input The input
* @param OutputInterface $output The output
*
* @return int 0 if everything went fine, or an error code
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$formats = Configuration::getInstance()->metadataPrefix;

View File

@ -106,7 +106,7 @@ class Database
* Add or update record.
*
* @param string $identifier The record identifier
* @param Format|string $format The metadata prefix
* @param Format $format The metadata prefix
* @param ?string $data The record's content
* @param ?DateTime $lastChanged The date of last change
* @param ?array<string, Set> $sets The record's associated sets
@ -116,7 +116,7 @@ class Database
*/
public function addOrUpdateRecord(
string $identifier,
Format|string $format,
Format $format,
?string $data = null,
?DateTime $lastChanged = null,
// TODO: Complete support for sets
@ -124,10 +124,6 @@ class Database
bool $bulkMode = false
): void
{
if (!$format instanceof Format) {
/** @var Format */
$format = $this->entityManager->getReference(Format::class, $format);
}
$record = $this->entityManager->find(Record::class, ['identifier' => $identifier, 'format' => $format]);
if (!isset($data) && Configuration::getInstance()->deletedRecords === 'no') {
if (isset($record)) {
@ -257,7 +253,7 @@ class Database
* Get list of records.
*
* @param string $verb The currently requested verb ('ListIdentifiers' or 'ListRecords')
* @param string $metadataPrefix The metadata prefix
* @param Format $metadataPrefix The metadata format
* @param int $counter Counter for split result sets
* @param ?string $from The "from" datestamp
* @param ?string $until The "until" datestamp
@ -267,7 +263,7 @@ class Database
*/
public function getRecords(
string $verb,
string $metadataPrefix,
Format $metadataPrefix,
int $counter = 0,
?string $from = null,
?string $until = null,
@ -305,7 +301,7 @@ class Database
$token = new Token($verb, [
'counter' => $counter + 1,
'completeListSize' => count($paginator),
'metadataPrefix' => $metadataPrefix,
'metadataPrefix' => $metadataPrefix->getPrefix(),
'from' => $from,
'until' => $until,
'set' => $set

View File

@ -22,8 +22,6 @@ declare(strict_types=1);
namespace OCC\OaiPmh2\Database;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Validator\Constraints as Assert;
use Symfony\Component\Validator\Exception\ValidationFailedException;
@ -58,28 +56,6 @@ class Format
#[ORM\Column(type: 'string')]
private string $xmlSchema;
/**
* Collection of associated records.
*
* @var Collection<int, Record>
*/
#[ORM\OneToMany(targetEntity: Record::class, mappedBy: 'format', fetch: 'EXTRA_LAZY', cascade: ['persist'], orphanRemoval: true)]
private Collection $records;
/**
* Update bi-directional association with records.
*
* @param Record $record The record to add to this format
*
* @return void
*/
public function addRecord(Record $record): void
{
if (!$this->records->contains($record)) {
$this->records->add($record);
}
}
/**
* Get the format's namespace URI.
*
@ -100,16 +76,6 @@ class Format
return $this->prefix;
}
/**
* Get a collection of associated records.
*
* @return Collection<int, Record> The associated records
*/
public function getRecords(): Collection
{
return $this->records;
}
/**
* Get the format's schema URL.
*
@ -221,7 +187,6 @@ class Format
$this->prefix = $this->validatePrefix($prefix);
$this->setNamespace($namespace);
$this->setSchema($schema);
$this->records = new ArrayCollection();
} catch (ValidationFailedException $exception) {
throw $exception;
}

View File

@ -51,7 +51,7 @@ class Record
* The associated format.
*/
#[ORM\Id]
#[ORM\ManyToOne(targetEntity: Format::class, inversedBy: 'records', cascade: ['persist'])]
#[ORM\ManyToOne(targetEntity: Format::class, inversedBy: 'records')]
#[ORM\JoinColumn(name: 'format', referencedColumnName: 'prefix')]
private Format $format;
@ -206,7 +206,6 @@ class Record
protected function setFormat(Format $format): void
{
$this->format = $format;
$format->addRecord($this);
}
/**

View File

@ -70,8 +70,8 @@ class ListIdentifiers extends Middleware
}
}
}
$prefixes = Database::getInstance()->getMetadataFormats();
if (!in_array($metadataPrefix, array_keys($prefixes->getQueryResult()), true)) {
$prefixes = Database::getInstance()->getMetadataFormats()->getQueryResult();
if (!in_array($metadataPrefix, array_keys($prefixes), true)) {
ErrorHandler::getInstance()->withError('cannotDisseminateFormat');
return;
}
@ -83,7 +83,7 @@ class ListIdentifiers extends Middleware
}
}
$records = Database::getInstance()->getRecords($verb, $metadataPrefix, $counter, $from, $until, $set);
$records = Database::getInstance()->getRecords($verb, $prefixes[$metadataPrefix], $counter, $from, $until, $set);
if (count($records) === 0) {
ErrorHandler::getInstance()->withError('noRecordsMatch');
return;