Browse Source

make transaction for each item in crawl queue

main
ghost 2 years ago
parent
commit
4b16b41440
  1. 92
      crontab/crawler.php

92
crontab/crawler.php

@ -43,15 +43,25 @@ $hostPagesBanned = 0;
$hostPagesSnapAdded = 0; $hostPagesSnapAdded = 0;
// Connect database // Connect database
try {
$db = new MySQL(DB_HOST, DB_PORT, DB_NAME, DB_USERNAME, DB_PASSWORD); $db = new MySQL(DB_HOST, DB_PORT, DB_NAME, DB_USERNAME, DB_PASSWORD);
$db->beginTransaction(); } catch(Exception $e) {
try { // Debug std
var_dump($e);
exit;
}
// Process manifests crawl queue // Process manifests crawl queue
foreach ($db->getManifestCrawlQueue(CRAWL_MANIFEST_LIMIT, time() - CRAWL_MANIFEST_SECONDS_OFFSET) as $queueManifest) { foreach ($db->getManifestCrawlQueue(CRAWL_MANIFEST_LIMIT, time() - CRAWL_MANIFEST_SECONDS_OFFSET) as $queueManifest) {
$db->beginTransaction();
try {
$curl = new Curl($queueManifest->url, CRAWL_CURLOPT_USERAGENT); $curl = new Curl($queueManifest->url, CRAWL_CURLOPT_USERAGENT);
// Update curl stats // Update curl stats
@ -66,18 +76,24 @@ try {
// Skip processing non 200 code // Skip processing non 200 code
if (200 != $curl->getCode()) { if (200 != $curl->getCode()) {
$db->commit();
continue; continue;
} }
// Skip processing without returned data // Skip processing without returned data
if (!$remoteManifest = $curl->getContent()) { if (!$remoteManifest = $curl->getContent()) {
$db->commit();
continue; continue;
} }
// Skip processing on json encoding error // Skip processing on json encoding error
if (!$remoteManifest = @json_decode($remoteManifest)) { if (!$remoteManifest = @json_decode($remoteManifest)) {
$db->commit();
continue; continue;
} }
@ -87,30 +103,40 @@ try {
empty($remoteManifest->result->api->version) || empty($remoteManifest->result->api->version) ||
empty($remoteManifest->result->api->hosts)) { empty($remoteManifest->result->api->hosts)) {
$db->commit();
continue; continue;
} }
// Skip processing on API version not compatible // Skip processing on API version not compatible
if ($remoteManifest->result->api->version !== CRAWL_MANIFEST_API_VERSION) { if ($remoteManifest->result->api->version !== CRAWL_MANIFEST_API_VERSION) {
$db->commit();
continue; continue;
} }
// Skip processing on host API not available // Skip processing on host API not available
if (!$remoteManifest->result->api->hosts) { if (!$remoteManifest->result->api->hosts) {
$db->commit();
continue; continue;
} }
// Skip processing on crawlUrlRegexp does not match CRAWL_URL_REGEXP condition // Skip processing on crawlUrlRegexp does not match CRAWL_URL_REGEXP condition
if ($remoteManifest->result->config->crawlUrlRegexp !== CRAWL_URL_REGEXP) { if ($remoteManifest->result->config->crawlUrlRegexp !== CRAWL_URL_REGEXP) {
$db->commit();
continue; continue;
} }
// Skip processing on host link does not match condition // Skip processing on host link does not match condition
if (false === preg_match(CRAWL_URL_REGEXP, $remoteManifest->result->api->hosts)) { if (false === preg_match(CRAWL_URL_REGEXP, $remoteManifest->result->api->hosts)) {
$db->commit();
continue; continue;
} }
@ -126,18 +152,24 @@ try {
// Skip processing non 200 code // Skip processing non 200 code
if (200 != $curl->getCode()) { if (200 != $curl->getCode()) {
$db->commit();
continue; continue;
} }
// Skip processing without returned data // Skip processing without returned data
if (!$remoteManifestHosts = $curl->getContent()) { if (!$remoteManifestHosts = $curl->getContent()) {
$db->commit();
continue; continue;
} }
// Skip processing on json encoding error // Skip processing on json encoding error
if (!$remoteManifestHosts = @json_decode($remoteManifestHosts)) { if (!$remoteManifestHosts = @json_decode($remoteManifestHosts)) {
$db->commit();
continue; continue;
} }
@ -145,6 +177,8 @@ try {
if (empty($remoteManifestHosts->status) || if (empty($remoteManifestHosts->status) ||
empty($remoteManifestHosts->result)) { empty($remoteManifestHosts->result)) {
$db->commit();
continue; continue;
} }
@ -212,11 +246,30 @@ try {
} }
} }
} }
// Apply changes
$db->commit();
// Process update errors
} catch (Exception $e) {
// Debug std
var_dump($e);
// Skip item
$db->rollBack();
continue;
}
} }
// Process pages crawl queue // Process pages crawl queue
foreach ($db->getHostPageCrawlQueue(CRAWL_PAGE_LIMIT, time() - CRAWL_PAGE_SECONDS_OFFSET) as $queueHostPage) { foreach ($db->getHostPageCrawlQueue(CRAWL_PAGE_LIMIT, time() - CRAWL_PAGE_SECONDS_OFFSET) as $queueHostPage) {
$db->beginTransaction();
try {
// Build URL from the DB // Build URL from the DB
$queueHostPageURL = $queueHostPage->scheme . '://' . $queueHostPage->name . ($queueHostPage->port ? ':' . $queueHostPage->port : false) . $queueHostPage->uri; $queueHostPageURL = $queueHostPage->scheme . '://' . $queueHostPage->name . ($queueHostPage->port ? ':' . $queueHostPage->port : false) . $queueHostPage->uri;
@ -253,6 +306,8 @@ try {
if (empty($match[1])) { if (empty($match[1])) {
$db->commit();
continue; continue;
} }
@ -332,6 +387,8 @@ try {
// When page is root, skip next operations // When page is root, skip next operations
if ($hostPageURI->string == '/') { if ($hostPageURI->string == '/') {
$db->commit();
continue; continue;
} }
} }
@ -373,6 +430,8 @@ try {
} }
// Skip other this page actions // Skip other this page actions
$db->commit();
continue; continue;
} }
@ -386,6 +445,8 @@ try {
$hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time()); $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time());
$db->commit();
continue; continue;
} }
@ -418,6 +479,8 @@ try {
$hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time()); $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time());
$db->commit();
continue; continue;
} }
@ -428,6 +491,8 @@ try {
// This case possible for multimedia/streaming resources index // This case possible for multimedia/streaming resources index
// $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time()); // $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time());
$db->commit();
continue; continue;
} }
@ -453,6 +518,8 @@ try {
$hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time()); $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time());
$db->commit();
continue; continue;
} else { } else {
@ -928,7 +995,6 @@ try {
} }
} }
} }
}
// Apply changes // Apply changes
$db->commit(); $db->commit();
@ -936,9 +1002,6 @@ try {
// Process update errors // Process update errors
} catch (Exception $e) { } catch (Exception $e) {
// Decline DB changes
$db->rollBack();
// Debug std // Debug std
var_dump($e); var_dump($e);
@ -947,16 +1010,15 @@ try {
!empty($e->errorInfo[0]) && in_array($e->errorInfo[0], ['HY000']) && !empty($e->errorInfo[0]) && in_array($e->errorInfo[0], ['HY000']) &&
!empty($e->errorInfo[1]) && in_array($e->errorInfo[1], [1366])) { // @TODO !empty($e->errorInfo[1]) && in_array($e->errorInfo[1], [1366])) { // @TODO
$hostPagesBanned = $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time()); $hostPagesBanned += $db->updateHostPageTimeBanned($queueHostPage->hostPageId, time());
// Reset counters $hostPagesProcessed++;
$hostPagesProcessed = $hostPagesBanned; }
$manifestsProcessed = 0;
$hostPagesIndexed = 0; // Skip item
$manifestsAdded = 0; $db->rollBack();
$hostPagesAdded = 0;
$hostsAdded = 0; continue;
$hostPagesSnapAdded = 0;
} }
} }

Loading…
Cancel
Save