Yo/src/cli/document/crawl.php
2024-04-07 20:14:45 +03:00

1114 lines
35 KiB
PHP

<?php
// Debug
// Remember the start time, it is used to report the elapsed time in the debug output
$microtime = microtime(true);

// Load dependencies
require_once __DIR__ . '/../../../vendor/autoload.php';

// Init config
// The file is validated explicitly: without these checks an unreadable or malformed
// config leaves $config as null and every later `$config->...` read evaluates to null
$configFile = __DIR__ . '/../../../config.json';

$config = is_readable($configFile)
    ? json_decode((string) file_get_contents($configFile))
    : null;

if (!is_object($config))
{
    // Debug levels are part of the config itself, so this error is always printed
    echo sprintf(
        _('[%s] [error] could not load config from "%s": %s') . PHP_EOL,
        date('c'),
        $configFile,
        is_readable($configFile) ? json_last_error_msg() : 'file is not readable'
    );

    exit;
}
// Prevent multi-thread execution
// A semaphore keyed by this script location (max one holder) is requested without waiting
$semaphore = sem_get(crc32(__DIR__ . '.yo.cli.document.crawl'), 1);

if (!sem_acquire($semaphore, true))
{
    // Lock is held elsewhere: report it when the warning level is enabled, then quit
    if ($config->cli->document->crawl->debug->level->warning)
    {
        printf(
            _('[%s] [warning] process execution locked by another thread!') . PHP_EOL,
            date('c')
        );
    }

    exit;
}
// Check network connection
// Configured sockets are probed in order; the first open one marks the network as reachable
if ($config->cli->document->crawl->network->check->enabled)
{
    $network = false;

    foreach ($config->cli->document->crawl->network->check->socket as $host => $port)
    {
        if ($config->cli->document->crawl->debug->level->notice)
        {
            printf(
                _('[%s] [notice] check network connection to socket "%s" port "%d"...') . PHP_EOL,
                date('c'),
                $host,
                $port
            );
        }

        // Socket is not open: report and go on with the next one
        if (!\Yggverse\Net\Socket::isOpen($host, $port, $config->cli->document->crawl->network->check->timeout))
        {
            if ($config->cli->document->crawl->debug->level->warning)
            {
                printf(
                    _('[%s] [warning] could not connect to socket "%s" port "%d"...') . PHP_EOL,
                    date('c'),
                    $host,
                    $port
                );
            }

            continue;
        }

        // Socket is open: no need to test the remaining ones
        if ($config->cli->document->crawl->debug->level->notice)
        {
            printf(
                _('[%s] [notice] network connection test successful') . PHP_EOL,
                date('c')
            );
        }

        $network = true;

        break;
    }

    // None of the sockets answered: stop the script
    if (!$network)
    {
        if ($config->cli->document->crawl->debug->level->error)
        {
            printf(
                _('[%s] [error] network unreachable!') . PHP_EOL,
                date('c')
            );
        }

        exit;
    }
}
// Init client
// Create the Manticore client and select the document index; any exception stops the script
try
{
    $client = new \Manticoresearch\Client([
        'host' => $config->manticore->server->host,
        'port' => $config->manticore->server->port,
    ]);

    $index = $client->index($config->manticore->index->document->name);
}
catch (Exception $exception)
{
    // Dump the exception when the error level is enabled
    if ($config->cli->document->crawl->debug->level->error)
    {
        printf(
            _('[%s] [error] %s') . PHP_EOL,
            date('c'),
            print_r($exception, true)
        );
    }

    exit;
}
// Init memory
// Create the Memcached client and register the configured server; any exception stops the script
try
{
    $memory = new \Memcached();

    $memory->addServer($config->memcached->server->host, $config->memcached->server->port);
}
catch (Exception $exception)
{
    // Dump the exception when the error level is enabled
    if ($config->cli->document->crawl->debug->level->error)
    {
        printf(
            _('[%s] [error] %s') . PHP_EOL,
            date('c'),
            print_r($exception, true)
        );
    }

    exit;
}
// Debug: announce the crawl queue start when the notice level is enabled
if ($config->cli->document->crawl->debug->level->notice)
{
    printf(_('[%s] [notice] crawl queue begin...') . PHP_EOL, date('c'));
}
// Begin crawl queue
// thanks to @manticoresearch for help with random feature implementation:
// https://github.com/manticoresoftware/manticoresearch-php/discussions/176
foreach($index->search('')
->expression('random', 'rand()')
->sort('index', 'desc')
->sort('time', 'asc')
->sort('random', 'asc')
->limit($config->cli->document->crawl->queue->limit)
->get() as $document)
{
// Define data
// Start from the values currently stored for the document, then set a fresh "time" and "index" 0
$time = time();

$data = [];

foreach (['url', 'h1', 'h2', 'h3', 'code', 'size', 'meta', 'rank'] as $field)
{
    $data[$field] = $document->get($field);
}

$data['time']  = $time;
$data['index'] = 0;

// Debug target
if ($config->cli->document->crawl->debug->level->notice)
{
    printf(
        _('[%s] [notice] index "%s" in "%s"') . PHP_EOL,
        date('c'),
        $document->get('url'),
        $config->manticore->index->document->name
    );
} // @TODO

// Init base address: keeps the document URL exactly as stored
$base = new \Yggverse\Net\Address($document->get('url'));

// Init worker address: a second instance of the same URL, used for the connection
$address = new \Yggverse\Net\Address($document->get('url'));
// Custom resolver enabled and host still not resolved
if ($config->cli->document->crawl->resolver->enabled && \Yggverse\Net\Valid::domainHostName($address->getHost()))
{
    // Generate memory ID
    $id = sprintf(
        '%s.%s.resolved',
        $config->manticore->index->document->name,
        $address->getHost()
    );

    // Check for cached results
    $host = $memory->get($id);

    if ($host)
    {
        $address->setHost($host);

        // Debug event
        if ($config->cli->document->crawl->debug->level->notice)
        {
            printf(
                _('[%s] [notice] resolve "%s" as "%s" from cache') . PHP_EOL,
                date('c'),
                $base->getHost(),
                $address->getHost()
            );
        }
    }

    // Nothing cached: repeat resolver requests until the address is resolved
    // or, when the custom resolver is not required, give up after the first failure
    else
    {
        $attempt = 1;

        while (true)
        {
            // Resolve begin
            $resolve = new \Yggverse\Net\Resolve(
                $config->cli->document->crawl->resolver->records,
                $config->cli->document->crawl->resolver->providers,
                $config->cli->document->crawl->resolver->connection->timeout,
                $config->cli->document->crawl->resolver->result->shuffle
            );

            $result = [];
            $errors = [];

            $resolved = $resolve->address($address, $result, $errors);

            // Success: switch the worker address, remember the host in cache and leave the loop
            if ($resolved)
            {
                // Update address
                $address = $resolved;

                // Update cache
                $memory->set(
                    $id,
                    $address->getHost(),
                    $config->cli->document->crawl->resolver->result->cache->timeout + time()
                );

                // Debug event
                if ($config->cli->document->crawl->debug->level->notice)
                {
                    printf(
                        _('[%s] [notice] resolve "%s" as "%s"') . PHP_EOL,
                        date('c'),
                        $base->getHost(),
                        $address->getHost()
                    );
                }

                break;
            }

            // Custom resolver is optional: use system-wide resolver, break the pending loop
            if (!$config->cli->document->crawl->resolver->require)
            {
                if ($config->cli->document->crawl->debug->level->warning)
                {
                    printf(
                        _('[%s] [warning] could not resolve "%s" (attempt: %d, response: %s), use system-wide...') . PHP_EOL,
                        date('c'),
                        $base->getHost(),
                        $attempt++,
                        print_r($errors, true)
                    );
                }

                break;
            }

            // Custom resolver required to continue: log event (the attempt counter
            // only advances together with this message)
            if ($config->cli->document->crawl->debug->level->warning)
            {
                printf(
                    _('[%s] [warning] could not resolve "%s" (attempt: %d, response: %s), wait for reconnection...') . PHP_EOL,
                    date('c'),
                    $base->getHost(),
                    $attempt++,
                    print_r($errors, true)
                );
            }

            // Next connection delay
            sleep($config->cli->document->crawl->resolver->connection->delay);
        }
    }
}
// Update the index record before the request is sent: fresh "time", "code" 20 and "index" 0
// NOTE(review): the earlier comment at this place mentioned reset code 51 while the value written is 20 — confirm which one is intended
$index->updateDocument(
    [
        'time'  => time(),
        'code'  => 20,
        'index' => 0
    ],
    $document->getId()
);

// Prepare remote request from the base and the worker addresses
$request = new \Yggverse\Gemini\Client\Request($base->get(), $address->get());

// Apply stream options
$request->setOptions([
    'ssl' => [
        'peer_name'        => $base->getHost(), // SNI
        'verify_peer'      => $config->cli->document->crawl->connection->options->ssl->verify_peer,
        'verify_peer_name' => $config->cli->document->crawl->connection->options->ssl->verify_peer_name
    ]
]);

// Send the request; $length is handed over as the third argument and is read
// after this call as the response size (presumably filled by reference — confirm in the client library)
$payload = $request->getResponse(
    $config->cli->document->crawl->connection->timeout,
    $config->cli->document->crawl->connection->length,
    $length
);

$response = new \Yggverse\Gemini\Client\Response($payload);
// Begin request
if ($code = $response->getCode()) // @TODO process redirects
{
// Update status code
$data['code'] = $code;

// Skip the document on empty size
if (!$length)
{
    continue;
}

$data['size'] = $length;

// Skip the document on empty meta
$meta = $response->getMeta();

if (!$meta)
{
    continue;
}

$data['meta'] = $meta;

// On document charset specified and listed in system encodings (case-insensitive),
// convert response body to UTF-8
if (preg_match('/charset=([^\s;]+)/i', $meta, $charset) && !empty($charset[1]))
{
    if (in_array(strtolower($charset[1]), array_map('strtolower', mb_list_encodings())))
    {
        $response->setBody(
            mb_convert_encoding($response->getBody(), 'UTF-8', $charset[1])
        );
    }
}
// Gemtext parser
// For text/gemini responses: collect headers and body text into $data
// and register the links found in the document as new index records
if (false !== stripos($response->getMeta(), 'text/gemini'))
{
    $body = new \Yggverse\Gemini\Gemtext\Body(
        $response->getBody()
    );

    // Get H1 (unique values, comma-separated)
    $h1 = [];

    foreach ($body->getH1() as $value)
    {
        $h1[] = $value;
    }

    $data['h1'] = implode(',', array_unique($h1));

    // Get H2 (unique values, comma-separated)
    $h2 = [];

    foreach ($body->getH2() as $value)
    {
        $h2[] = $value;
    }

    $data['h2'] = implode(',', array_unique($h2));

    // Get H3 (unique values, comma-separated)
    $h3 = [];

    foreach ($body->getH3() as $value)
    {
        $h3[] = $value;
    }

    $data['h3'] = implode(',', array_unique($h3));

    // Save document body text to index
    $data['body'] = trim(
        preg_replace(
            '/[\s]{2,}/', // strip extra separators
            ' ',
            $body->skipTags()
        )
    );

    // Crawl links
    $documents = [];

    foreach ($body->getLinks() as $line)
    {
        $link = new \Yggverse\Gemini\Gemtext\Link(
            $line
        );

        $url = $link->getAddress();

        if (!$url)
        {
            continue;
        }

        // A dedicated variable is used for the link target,
        // so the worker $address of the current document is not overwritten
        $target = new \Yggverse\Net\Address(
            $url
        );

        // Convert relative links to absolute
        if ($target->isRelative())
        {
            if ($absolute = $target->getAbsolute($base))
            {
                $url = $absolute;

                // Parse the absolute form again: the host check below must see the
                // final URL and not depend on getAbsolute() changing $target in place
                $target = new \Yggverse\Net\Address(
                    $url
                );
            }
        }

        // Regex rules
        if (!preg_match($config->cli->document->crawl->url->regex, $url))
        {
            continue;
        }

        // External host rules
        if (!$config->cli->document->crawl->url->external && $target->getHost() != $base->getHost())
        {
            continue;
        }

        $documents[] = $url;
    }

    // @TODO find document links by protocol ($body->findLinks('gemini'))
    if ($documents)
    {
        foreach (array_unique($documents) as $url)
        {
            // Apply stripos condition
            $skip = false;

            foreach ($config->cli->document->crawl->url->skip->stripos as $condition)
            {
                if (false !== stripos($url, $condition))
                {
                    $skip = true;

                    break;
                }
            }

            if ($skip)
            {
                if ($config->cli->document->crawl->debug->level->notice)
                {
                    echo sprintf(
                        _('[%s] [notice] skip "%s" by stripos condition "%s"') . PHP_EOL,
                        date('c'),
                        $url,
                        print_r(
                            $config->cli->document->crawl->url->skip->stripos,
                            true
                        )
                    );
                }

                continue;
            }

            // Save index: the CRC32 of the trimmed URL is used as the document ID
            $url = trim(
                $url
            );

            $crc32url = crc32(
                $url
            );

            // Check url does not registered yet
            $results = $index->search('')->filter('id', $crc32url)->get();

            if (!$results->getTotal())
            {
                $index->addDocument(
                    [
                        'url' => $url
                    ],
                    $crc32url
                );

                if ($config->cli->document->crawl->debug->level->notice)
                {
                    echo sprintf(
                        _('[%s] [notice] add "%s" to "%s"') . PHP_EOL,
                        date('c'),
                        $url,
                        $config->manticore->index->document->name
                    );
                }
            }

            // URL already exists
            else
            {
                if ($config->cli->document->crawl->debug->level->notice)
                {
                    echo sprintf(
                        _('[%s] [notice] URL "%s" already registered with CRC32 "%d"') . PHP_EOL,
                        date('c'),
                        $url,
                        $crc32url
                    );
                }

                // Check for event details
                foreach ($results as $result)
                {
                    // Is collision: the same ID is stored for a different URL
                    if ($url != $result->get('url'))
                    {
                        if ($config->cli->document->crawl->debug->level->warning)
                        {
                            echo sprintf(
                                _('[%s] [warning] ID "%d" collision for target url "%s" stored as "%s"') . PHP_EOL,
                                date('c'),
                                $crc32url,
                                $url,
                                $result->get('url')
                            );
                        }
                    }
                }
            }
        }
    }
}
// Replace document data: write the collected $data under the ID of the current document
// https://github.com/manticoresoftware/manticoresearch-php/issues/10#issuecomment-612685916
$result = $index->replaceDocument($data, $document->getId());

// Debug result: dump both the index response and the data sent
if ($config->cli->document->crawl->debug->level->notice)
{
    printf(
        '[%s] [notice] index "%s" updated: %s %s' . PHP_EOL,
        date('c'),
        $config->manticore->index->document->name,
        print_r($result, true),
        print_r($data, true)
    );
}
// Create snap
// For responses with code 20 and snaps enabled: pack the response into a temporary
// tar.gz archive, copy it to the local and FTP storages allowed by settings,
// then remove the temporary file. Exceptions are reported and do not stop the queue.
if ($config->cli->document->crawl->snap->enabled && $response->getCode() === 20)
{
    try
    {
        // Generate path
        $time = time();

        /// absolute
        if (str_starts_with($config->snap->storage->tmp->directory, '/'))
        {
            $filepath = $config->snap->storage->tmp->directory;
        }

        /// relative
        else
        {
            $filepath = __DIR__ . '/../../../' . $config->snap->storage->tmp->directory;
        }

        // Warnings are suppressed as the directory may already exist
        @mkdir($filepath, 0755, true);

        $tmp = sprintf(
            '%s/%s.%s.tar',
            $filepath,
            $document->getId(),
            $time
        );

        // Compress response to archive
        $snap = new PharData($tmp);

        $snap->addFromString(
            'DATA',
            $response->getBody()
        );

        $snap->addFromString(
            'META',
            $response->getMeta()
        );

        $snap->addFromString(
            'URL',
            $document->get('url')
        );

        $snap->compress(
            Phar::GZ
        );

        unlink( // remove tarball
            $tmp
        );

        // From here $tmp points to the compressed archive
        $tmp = sprintf(
            '%s.gz',
            $tmp
        );

        $filesize = filesize(
            $tmp
        );

        // Copy to local storage on enabled
        if ($config->snap->storage->local->enabled)
        {
            // Check for meta allowed
            $allowed = false;

            foreach ($config->snap->storage->local->meta->stripos as $whitelist)
            {
                if (false !== stripos($response->getMeta(), $whitelist))
                {
                    $allowed = true;

                    break;
                }
            }

            // Check for url allowed
            if ($allowed)
            {
                $allowed = false;

                foreach ($config->snap->storage->local->url->stripos as $whitelist)
                {
                    if (false !== stripos($document->get('url'), $whitelist))
                    {
                        $allowed = true;

                        break;
                    }
                }

                // Check size limits
                if ($allowed)
                {
                    $allowed = $filesize <= $config->snap->storage->local->size->max;
                }
            }

            // Copy snap to the permanent storage
            if ($allowed)
            {
                /// absolute
                if (str_starts_with($config->snap->storage->local->directory, '/'))
                {
                    $filepath = $config->snap->storage->local->directory;
                }

                /// relative
                else
                {
                    $filepath = __DIR__ . '/../../../' . $config->snap->storage->local->directory;
                }

                // Every character of the document ID becomes a directory level
                $filepath = sprintf(
                    '%s/%s',
                    $filepath,
                    implode(
                        '/',
                        str_split(
                            $document->getId()
                        )
                    )
                );

                // Warnings are suppressed as the directory may already exist
                @mkdir($filepath, 0755, true);

                // Check latest snap older than defined in settings
                if (time() - \Yggverse\YoTools\Snap::getTimeLast((array) scandir($filepath)) > $config->cli->document->crawl->snap->timeout)
                {
                    $filename = sprintf(
                        '%s/%s',
                        $filepath,
                        sprintf(
                            '%s.tar.gz',
                            $time
                        )
                    );

                    if (copy($tmp, $filename))
                    {
                        if ($config->cli->document->crawl->debug->level->notice)
                        {
                            echo sprintf(
                                _('[%s] [notice] save snap to "%s" on local storage') . PHP_EOL,
                                date('c'),
                                $filename
                            );
                        }
                    }

                    else
                    {
                        if ($config->cli->document->crawl->debug->level->error)
                        {
                            echo sprintf(
                                _('[%s] [error] could not copy "%s" to "%s" on local storage') . PHP_EOL,
                                date('c'),
                                $tmp,
                                $filename
                            );
                        }
                    }
                }

                else
                {
                    if ($config->cli->document->crawl->debug->level->notice)
                    {
                        echo sprintf(
                            _('[%s] [notice] local snap is up to date by timeout settings') . PHP_EOL,
                            date('c')
                        );
                    }
                }
            }

            else
            {
                if ($config->cli->document->crawl->debug->level->notice)
                {
                    echo sprintf(
                        _('[%s] [notice] local snap skipped by settings condition') . PHP_EOL,
                        date('c')
                    );
                }
            }
        }

        // Copy to FTP storage on enabled
        foreach ($config->snap->storage->remote->ftp as $ftp)
        {
            // Resource enabled
            if (!$ftp->enabled)
            {
                continue;
            }

            // Check for meta allowed
            $allowed = false;

            foreach ($ftp->meta->stripos as $whitelist)
            {
                if (false !== stripos($response->getMeta(), $whitelist))
                {
                    $allowed = true;

                    break;
                }
            }

            if (!$allowed)
            {
                continue;
            }

            // Check for url allowed
            $allowed = false;

            foreach ($ftp->url->stripos as $whitelist)
            {
                if (false !== stripos($document->get('url'), $whitelist))
                {
                    $allowed = true;

                    break;
                }
            }

            if (!$allowed)
            {
                continue;
            }

            // Check size limits
            if ($filesize > $ftp->size->max)
            {
                if ($config->cli->document->crawl->debug->level->notice)
                {
                    echo sprintf(
                        _('[%s] [notice] remote snap skipped on "%s" by settings condition') . PHP_EOL,
                        date('c'),
                        $ftp->connection->host
                    );
                }

                continue;
            }

            // Prepare location: every character of the document ID becomes a directory level
            $filepath = implode(
                '/',
                str_split(
                    $document->getId()
                )
            );

            $filename = sprintf(
                '%s/%s',
                $filepath,
                sprintf(
                    '%s.tar.gz',
                    $time
                )
            );

            // Init connection
            $attempt = 1;

            do {

                $remote = new \Yggverse\Ftp\Client();

                $connection = $remote->connect(
                    $ftp->connection->host,
                    $ftp->connection->port,
                    $ftp->connection->username,
                    $ftp->connection->password,
                    $ftp->connection->directory,
                    $ftp->connection->timeout,
                    $ftp->connection->passive
                );

                // Remote host connected
                if ($connection) {

                    $remote->mkdir(
                        $filepath,
                        true
                    );

                    // Check latest snap older than defined in settings
                    if (time() - \Yggverse\YoTools\Snap::getTimeLast((array) $remote->nlist($filepath)) > $config->cli->document->crawl->snap->timeout)
                    {
                        if ($remote->copy($tmp, $filename))
                        {
                            if ($config->cli->document->crawl->debug->level->notice)
                            {
                                echo sprintf(
                                    _('[%s] [notice] save snap to "%s" on remote host "%s"') . PHP_EOL,
                                    date('c'),
                                    $filename,
                                    $ftp->connection->host
                                );
                            }
                        }

                        else
                        {
                            if ($config->cli->document->crawl->debug->level->error)
                            {
                                echo sprintf(
                                    _('[%s] [error] could not copy snap "%s" to "%s" on destination "%s"') . PHP_EOL,
                                    date('c'),
                                    $tmp,
                                    $filename,
                                    $ftp->connection->host
                                );
                            }
                        }
                    }

                    else
                    {
                        if ($config->cli->document->crawl->debug->level->notice)
                        {
                            echo sprintf(
                                _('[%s] [notice] remote snap on destination "%s" is up to date by timeout settings') . PHP_EOL,
                                date('c'),
                                $ftp->connection->host
                            );
                        }
                    }

                    $remote->close();

                // On remote connection lost, repeat attempt
                } else {

                    // Stop connection attempts on limit provided
                    if ($ftp->connection->attempts->limit > 0 && $attempt > $ftp->connection->attempts->limit)
                    {
                        break;
                    }

                    // Count the failed attempt independently of the debug level:
                    // when the counter advanced only inside the log call below, the limit
                    // above was never reached with warnings disabled and the loop never ended
                    $failed = $attempt++;

                    // Log event
                    if ($config->cli->document->crawl->debug->level->warning)
                    {
                        echo sprintf(
                            _('[%s] [warning] attempt: %s, wait for remote storage "%s" reconnection...') . PHP_EOL,
                            date('c'),
                            $failed,
                            $ftp->connection->host,
                        );
                    }

                    // Delay next attempt
                    if ($ftp->connection->attempts->delay)
                    {
                        if ($config->cli->document->crawl->debug->level->warning)
                        {
                            echo sprintf(
                                _('[%s] [warning] pending %s seconds to reconnect...') . PHP_EOL,
                                date('c'),
                                $ftp->connection->attempts->delay
                            );
                        }

                        sleep(
                            $ftp->connection->attempts->delay
                        );
                    }
                }

            } while ($connection === false);
        }

        // Remove tmp data
        if (unlink($tmp))
        {
            if ($config->cli->document->crawl->debug->level->notice)
            {
                echo sprintf(
                    _('[%s] [notice] remove tmp snap file "%s"') . PHP_EOL,
                    date('c'),
                    $tmp
                );
            }
        }

        else
        {
            if ($config->cli->document->crawl->debug->level->error)
            {
                echo sprintf(
                    _('[%s] [error] could not remove tmp snap file "%s"') . PHP_EOL,
                    date('c'),
                    $tmp
                );
            }
        }
    }

    catch (Exception $exception)
    {
        // Report the failure when the error level is enabled and go on with the queue
        if ($config->cli->document->crawl->debug->level->error)
        {
            echo sprintf(
                _('[%s] [error] %s') . PHP_EOL,
                date('c'),
                print_r(
                    $exception,
                    true
                )
            );
        }
    }
}
}
// Crawl queue delay: pause for the configured number of seconds when it is not empty
if ($config->cli->document->crawl->queue->delay)
{
    if ($config->cli->document->crawl->debug->level->notice)
    {
        printf(
            _('[%s] [notice] pending %s seconds...') . PHP_EOL,
            date('c'),
            $config->cli->document->crawl->queue->delay
        );
    }

    sleep($config->cli->document->crawl->queue->delay);
}

// Debug totals: time elapsed since the script start
// NOTE(review): this statement is located inside the queue loop, so the message
// is printed after every processed document — confirm this is intended
if ($config->cli->document->crawl->debug->level->notice)
{
    printf(
        _('[%s] [notice] crawl queue completed in %s') . PHP_EOL,
        date('c'),
        microtime(true) - $microtime
    );
}
}