@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
# VERSION: 2.8
# VERSION: 2.9
# AUTHORS: imDMG [imdmgg@gmail.com]
# NoNaMe-Club search engine plugin for qBittorrent
@ -7,14 +7,15 @@ import base64
@@ -7,14 +7,15 @@ import base64
import json
import logging
import re
import socket
import sys
import time
from concurrent . futures import ThreadPoolExecutor
from dataclasses import dataclass , field
from html import unescape
from http . cookiejar import Cookie , MozillaCookieJar
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional , Union
from urllib . error import URLError , HTTPError
from urllib . parse import urlencode , unquote
from urllib . request import build_opener , HTTPCookieProcessor , ProxyHandler
@ -25,30 +26,16 @@ except ImportError:
@@ -25,30 +26,16 @@ except ImportError:
sys . path . insert ( 0 , str ( Path ( __file__ ) . parent . parent . absolute ( ) ) )
from novaprinter import prettyPrinter
# default config
config = {
" torrentDate " : True ,
" username " : " USERNAME " ,
" password " : " PASSWORD " ,
" proxy " : False ,
" proxies " : {
" http " : " " ,
" https " : " "
} ,
" magnet " : True ,
" ua " : " Mozilla/5.0 (X11; Linux i686; rv:38.0) Gecko/20100101 Firefox/38.0 "
}
FILE = Path ( __file__ )
BASEDIR = FILE . parent . absolute ( )
FILENAME = FILE . name [ : - 3 ]
FILE_J , FILE_C = [ BASEDIR / ( FILENAME + fl ) for fl in [ ' .json ' , ' .cookie ' ] ]
FILE_J , FILE_C = [ BASEDIR / ( FILENAME + fl ) for fl in [ " .json " , " .cookie " ] ]
PAGES = 50
def rng ( t ) :
def rng ( t : int ) - > range :
return range ( PAGES , - ( - t / / PAGES ) * PAGES , PAGES )
@ -58,7 +45,7 @@ RE_TORRENTS = re.compile(
@@ -58,7 +45,7 @@ RE_TORRENTS = re.compile(
)
RE_RESULTS = re . compile ( r ' TP_VER " >(?:Результатов \ sпоиска: \ s( \ d { 1,3}))? \ s ' , re . S )
RE_CODE = re . compile ( r ' name= " code " \ svalue= " (.+?) " ' , re . S )
PATTERNS = ( ' %s tracker.php?nm= %s & %s ' , " %s &start= %s " )
PATTERNS = ( " %s tracker.php?nm= %s & %s " , " %s &start= %s " )
# base64 encoded image
ICON = ( " AAABAAEAEBAAAAEAIABoBAAAFgAAACgAAAAQAAAAIAAAAAEAIAAAAAAAAAAAAAAAAAAAAA "
@ -93,66 +80,105 @@ logging.basicConfig(
@@ -93,66 +80,105 @@ logging.basicConfig(
logger = logging . getLogger ( __name__ )
try :
config = json . loads ( FILE_J . read_text ( ) )
logger . debug ( " Config is loaded. " )
except OSError as e :
logger . error ( e )
# if file doesn't exist, we'll create it
FILE_J . write_text ( json . dumps ( config , indent = 4 , sort_keys = False ) )
# also write/rewrite ico file
( BASEDIR / ( FILENAME + ' .ico ' ) ) . write_bytes ( base64 . b64decode ( ICON ) )
logger . debug ( " Write files. " )
@dataclass
class Config :
username : str = " USERNAME "
password : str = " PASSWORD "
torrent_date : bool = True
# magnet: bool = False
proxy : bool = False
# dynamic_proxy: bool = True
proxies : dict = field ( default_factory = lambda : { " http " : " " , " https " : " " } )
ua : str = ( " Mozilla/5.0 (X11; Linux i686; rv:38.0) Gecko/20100101 "
" Firefox/38.0 " )
class NNMClub :
name = ' NoNaMe-Club '
url = ' https://nnmclub.to/forum/ '
url_dl = ' https://nnm-club.ws/ '
url_login = url + ' login.php '
supported_categories = { ' all ' : ' -1 ' ,
' movies ' : ' 14 ' ,
' tv ' : ' 27 ' ,
' music ' : ' 16 ' ,
' games ' : ' 17 ' ,
' anime ' : ' 24 ' ,
' software ' : ' 21 ' }
def __post_init__ ( self ) :
try :
if not self . _validate_json ( json . loads ( FILE_J . read_text ( ) ) ) :
raise ValueError ( " Incorrect json scheme. " )
except Exception as e :
logger . error ( e )
FILE_J . write_text ( self . to_str ( ) )
( BASEDIR / f " { FILENAME } .ico " ) . write_bytes ( base64 . b64decode ( ICON ) )
def to_str ( self ) - > str :
return json . dumps ( self . to_dict ( ) , indent = 4 , sort_keys = False )
def to_dict ( self ) - > dict :
return { self . _to_camel ( k ) : v for k , v in self . __dict__ . items ( ) }
def _validate_json ( self , obj : dict ) - > bool :
is_valid = True
for k , v in self . __dict__ . items ( ) :
_val = obj . get ( self . _to_camel ( k ) )
if type ( _val ) is not type ( v ) :
is_valid = False
continue
if type ( _val ) is dict :
for dk , dv in v . items ( ) :
if type ( _val . get ( dk ) ) is not type ( dv ) :
_val [ dk ] = dv
is_valid = False
setattr ( self , k , _val )
return is_valid
@staticmethod
def _to_camel ( s : str ) - > str :
return " " . join ( x . title ( ) if i else x
for i , x in enumerate ( s . split ( " _ " ) ) )
config = Config ( )
def __init__ ( self ) :
# error message
self . error = None
# establish connection
self . session = build_opener ( )
class NNMClub :
name = " NoNaMe-Club "
url = " https://nnmclub.to/forum/ "
url_dl = " https://nnm-club.ws/ "
url_login = url + " login.php "
supported_categories = { " all " : " -1 " ,
" movies " : " 14 " ,
" tv " : " 27 " ,
" music " : " 16 " ,
" games " : " 17 " ,
" anime " : " 24 " ,
" software " : " 21 " }
# error message
error : Optional [ str ] = None
# cookies
mcj = MozillaCookieJar ( )
# establish connection
session = build_opener ( HTTPCookieProcessor ( mcj ) )
def __init__ ( self ) :
# add proxy handler if needed
if config [ ' proxy ' ] :
if any ( config [ ' proxies ' ] . values ( ) ) :
self . session . add_handler ( ProxyHandler ( config [ ' proxies ' ] ) )
if config . proxy :
if any ( config . proxies . values ( ) ) :
self . session . add_handler ( ProxyHandler ( config . proxies ) )
logger . debug ( " Proxy is set! " )
else :
self . error = " Proxy enabled, but not set! "
# change user-agent
self . session . addheaders = [ ( ' User-Agent ' , config [ ' ua ' ] ) ]
self . session . addheaders = [ ( " User-Agent " , config . ua ) ]
# load local cookies
mcj = MozillaCookieJar ( )
try :
mcj . load ( FILE_C , ignore_discard = True )
key = ' phpbb2mysql_4_data '
if [ True for c in mcj if c . name == key and c . expires > time . time ( ) ] :
self . mcj . load ( FILE_C , ignore_discard = True )
if " phpbb2mysql_4_data " in [ cookie . name for cookie in self . mcj ] :
# if cookie.expires < int(time.time())
logger . info ( " Local cookies is loaded " )
self . session . add_handler ( HTTPCookieProcessor ( mcj ) )
else :
logger . info ( " Local cookies expired or bad " )
logger . debug ( f " That we have: { [ cookie for cookie in mcj ] } " )
mcj . clear ( )
self . login ( mcj )
logger . debug ( f " That we have: { [ cookie for cookie in self . mcj ] } " )
self . mcj . clear ( )
self . login ( )
except FileNotFoundError :
self . login ( mcj )
self . login ( )
def search ( self , what , cat = ' all ' ) :
def search ( self , what : str , cat : str = " all " ) - > None :
if self . error :
self . pretty_error ( what )
return None
@ -174,61 +200,86 @@ class NNMClub:
@@ -174,61 +200,86 @@ class NNMClub:
logger . debug ( f " --- { time . time ( ) - t0 } seconds --- " )
logger . info ( f " Found torrents: { total } " )
def download_torrent ( self , url : str ) :
def download_torrent ( self , url : str ) - > None :
# Download url
response = self . _catch_error_ request ( url )
response = self . _request ( url )
if self . error :
self . pretty_error ( url )
return None
# Create a torrent file
with NamedTemporaryFile ( suffix = ' .torrent ' , delete = False ) as fd :
with NamedTemporaryFile ( suffix = " .torrent " , delete = False ) as fd :
fd . write ( response )
# return file path
logger . debug ( fd . name + " " + url )
print ( fd . name + " " + url )
def login ( self , mcj ) :
def login ( self ) - > None :
if self . error :
return None
# if we wanna use https we mast add ssl=enable_ssl to cookie
mcj . set_cookie ( Cookie ( 0 , " ssl " , " enable_ssl " , None , False ,
" .nnmclub.to " , True , False , " / " , True ,
False , None , False , None , None , { } ) )
self . session . add_handler ( HTTPCookieProcessor ( mcj ) )
self . mcj . set_cookie ( Cookie ( 0 , " ssl " , " enable_ssl " , None , False ,
" .nnmclub.to " , True , False , " / " , True ,
False , None , False , None , None , { } ) )
response = self . _catch_error_request ( self . url_login )
if not response :
response = self . _request ( self . url_login )
if self . error :
return None
result = RE_CODE . search ( response . decode ( " cp1251 " ) )
if not result :
self . error = " Unexpected page content "
return None
code = RE_CODE . search ( response . decode ( ' cp1251 ' ) ) [ 1 ]
form_data = { " username " : config [ ' username ' ] ,
" password " : config [ ' password ' ] ,
form_data = { " username " : config . username ,
" password " : config . password ,
" autologin " : " on " ,
" code " : code ,
" code " : result [ 1 ] ,
" login " : " Вход " }
# so we first encode vals to cp1251 then do default decode whole string
data_encoded = urlencode (
{ k : v . encode ( ' cp1251 ' ) for k , v in form_data . items ( ) }
) . encode ( )
# encoding to cp1251 then do default encode whole string
data_encoded = urlencode ( form_data , encoding = " cp1251 " ) . encode ( )
self . _catch_error_ request ( self . url_login , data_encoded )
self . _request ( self . url_login , data_encoded )
if self . error :
return None
logger . debug ( f " That we have: { [ cookie for cookie in mcj ] } " )
if ' phpbb2mysql_4_sid ' in [ cookie . name for cookie in mcj ] :
mcj . save ( FILE_C , ignore_discard = True , ignore_expires = True )
logger . info ( ' We successfully authorized ' )
logger . debug ( f " That we have: { [ cookie for cookie in self . mcj ] } " )
if " phpbb2mysql_4_sid " in [ cookie . name for cookie in self . mcj ] :
self . mcj . save ( FILE_C , ignore_discard = True , ignore_expires = True )
logger . info ( " We successfully authorized " )
else :
self . error = " We not authorized, please check your credentials! "
logger . warning ( self . error )
def draw ( self , html : str ) :
torrents = RE_TORRENTS . findall ( html )
def searching ( self , query : str , first : bool = False ) - > Union [ None , int ] :
response = self . _request ( query )
if self . error :
return None
page , torrents_found = response . decode ( " cp1251 " ) , - 1
if first :
# check login status
if f " Выход [ { config . username } ] " not in page :
logger . debug ( " Looks like we lost session id, lets login " )
self . mcj . clear ( )
self . login ( )
if self . error :
return None
# firstly we check if there is a result
result = RE_RESULTS . search ( page )
if not result :
self . error = " Unexpected page content "
return None
torrents_found = int ( result [ 1 ] )
if not torrents_found :
return 0
self . draw ( page )
return torrents_found
for tor in torrents :
def draw ( self , html : str ) - > None :
for tor in RE_TORRENTS . findall ( html ) :
torrent_date = " "
if config [ ' torrentDate ' ] :
if config . torrent_date :
_loc = time . localtime ( int ( tor [ 6 ] ) )
torrent_date = f ' [ { time . strftime ( " % y. % m. %d " , _loc ) } ] '
@ -241,53 +292,32 @@ class NNMClub:
@@ -241,53 +292,32 @@ class NNMClub:
" seeds " : tor [ 4 ] ,
" leech " : tor [ 5 ]
} )
del torrents
def searching ( self , query , first = False ) :
response = self . _catch_error_request ( query )
if not response :
return None
page , torrents_found = response . decode ( ' cp1251 ' ) , - 1
if first :
# check login status
if f ' Выход [ { config [ " username " ] } ] ' not in page :
logger . debug ( " Looks like we lost session id, lets login " )
self . login ( MozillaCookieJar ( ) )
if self . error :
return None
# firstly we check if there is a result
torrents_found = int ( RE_RESULTS . search ( page ) [ 1 ] or 0 )
if not torrents_found :
return 0
self . draw ( page )
return torrents_found
def _catch_error_request ( self , url = None , data = None , repeated = False ) :
url = url or self . url
def _request (
self , url : str , data : Optional [ bytes ] = None , repeated : bool = False
) - > Union [ bytes , None ] :
try :
with self . session . open ( url , data , 5 ) as r :
# checking that tracker isn't blocked
if r . url . startswith ( ( self . url , self . url_dl ) ) :
if r . geturl ( ) . startswith ( ( self . url , self . url_dl ) ) :
return r . read ( )
raise URLError ( f " { self . url } is blocked. Try another proxy. " )
except ( socket . error , socket . timeout ) as err :
if not repeated :
return self . _catch_error_request ( url , data , True )
logger . error ( err )
self . error = f " { self . url } is not response! Maybe it is blocked. "
if " no host given " in err . args :
self . error = " Proxy is bad, try another! "
self . error = f " { url } is blocked. Try another proxy. "
except ( URLError , HTTPError ) as err :
logger . error ( err . reason )
self . error = err . reason
if hasattr ( err , ' code ' ) :
error = str ( err . reason )
if " timed out " in error and not repeated :
logger . debug ( " Repeating request... " )
return self . _request ( url , data , True )
if " no host given " in error :
self . error = " Proxy is bad, try another! "
elif hasattr ( err , " code " ) :
self . error = f " Request to { url } failed with status: { err . code } "
else :
self . error = f " { url } is not response! Maybe it is blocked. "
return None
def pretty_error ( self , what ) :
def pretty_error ( self , what : str ) - > None :
prettyPrinter ( { " engine_url " : self . url ,
" desc_link " : " https://github.com/imDMG/qBt_SE " ,
" name " : f " [ { unquote ( what ) } ][Error]: { self . error } " ,
@ -303,9 +333,9 @@ class NNMClub:
@@ -303,9 +333,9 @@ class NNMClub:
nnmclub = NNMClub
if __name__ == " __main__ " :
if BASEDIR . parent . joinpath ( ' settings_gui.py ' ) . exists ( ) :
if BASEDIR . parent . joinpath ( " settings_gui.py " ) . exists ( ) :
from settings_gui import EngineSettingsGUI
EngineSettingsGUI ( FILENAME )
engine = nnmclub ( )
engine . search ( ' doctor ' )
engine . search ( " doctor " )