[enh] timeout and total HTTP time are managed by searx.poolrequests
commit c1cfe97851 (parent bf02b8613f) by Alexandre Flament
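The change introduces a threading.local() object in searx.poolrequests so that each engine thread carries its own timeout and its own accumulated HTTP time, instead of passing start_time and timeout_limit through every function call. A standalone sketch of that pattern (illustration only, not searx code; fake_http_request stands in for a real HTTP call):

    import time
    from threading import Thread, local

    threadLocal = local()  # each thread sees its own copy of these attributes

    def set_timeout_for_thread(timeout, start_time=None):
        threadLocal.timeout = timeout
        threadLocal.start_time = start_time

    def reset_time_for_thread():
        threadLocal.total_time = 0

    def get_time_for_thread():
        return threadLocal.total_time

    def fake_http_request(duration):
        # stand-in for a real HTTP call: measure and accumulate elapsed time
        before = time.time()
        time.sleep(duration)
        threadLocal.total_time += time.time() - before

    def worker(name, duration):
        set_timeout_for_thread(2.0, start_time=time.time())
        reset_time_for_thread()
        fake_http_request(duration)
        fake_http_request(duration)
        # each thread reports only its own accumulated "HTTP" time
        print(name, round(get_time_for_thread(), 2))

    threads = [Thread(target=worker, args=('engine-%d' % i, 0.1 * (i + 1))) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

The diff below applies this pattern to searx.poolrequests: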
searx/poolrequests.py:

@@ -1,8 +1,9 @@
 import requests
 
 from itertools import cycle
-from threading import RLock
+from threading import RLock, local
 from searx import settings
+from time import time
 
 
 class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
@@ -41,6 +42,7 @@ class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
                               block=self._pool_block, **self._conn_params)
 
 
+threadLocal = local()
 connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
 maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor
 if settings['outgoing'].get('source_ips'):
@@ -72,12 +74,57 @@ class SessionSinglePool(requests.Session):
         super(SessionSinglePool, self).close()
 
 
+def set_timeout_for_thread(timeout, start_time=None):
+    threadLocal.timeout = timeout
+    threadLocal.start_time = start_time
+
+
+def reset_time_for_thread():
+    threadLocal.total_time = 0
+
+
+def get_time_for_thread():
+    return threadLocal.total_time
+
+
 def request(method, url, **kwargs):
-    """same as requests/requests/api.py request(...) except it use SessionSinglePool and force proxies"""
+    """same as requests/requests/api.py request(...)"""
+    time_before_request = time()
+
+    # session start
     session = SessionSinglePool()
+
+    # proxies
     kwargs['proxies'] = settings['outgoing'].get('proxies') or None
+
+    # timeout: an explicit keyword argument wins over the thread-local default
+    if 'timeout' in kwargs:
+        timeout = kwargs['timeout']
+    else:
+        timeout = getattr(threadLocal, 'timeout', None)
+        if timeout is not None:
+            kwargs['timeout'] = timeout
+
+    # do request
     response = session.request(method=method, url=url, **kwargs)
+
+    time_after_request = time()
+
+    # is there a timeout for this engine?
+    if timeout is not None:
+        timeout_overhead = 0.2  # seconds
+        # start_time = when the user request started
+        start_time = getattr(threadLocal, 'start_time', time_before_request)
+        search_duration = time_after_request - start_time
+        if search_duration > timeout + timeout_overhead:
+            raise requests.exceptions.Timeout(response=response)
+
+    # session end
     session.close()
+
+    # accumulate the HTTP time spent by this thread
+    threadLocal.total_time += time_after_request - time_before_request
+
     return response
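With these helpers in place, a caller sets the timeout once per thread and reads back the accumulated HTTP time afterwards. A minimal usage sketch, assuming the module is importable as searx.poolrequests in a configured searx environment, with a placeholder URL:

    from time import time
    from searx import poolrequests as requests_lib

    # one engine thread: a 3-second budget counted from the user request start
    requests_lib.set_timeout_for_thread(3.0, start_time=time())
    requests_lib.reset_time_for_thread()

    # the thread-local timeout is applied automatically;
    # an explicit timeout= keyword argument would override it
    response = requests_lib.request('GET', 'https://example.org')  # placeholder URL

    # total HTTP time spent by this thread, later fed into page_load_time stats
    print(requests_lib.get_time_for_thread())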
|  | |||||||
searx/search.py:

@@ -47,16 +47,12 @@ logger = logger.getChild('search')
 number_of_searches = 0
 
 
-def send_http_request(engine, request_params, start_time, timeout_limit):
-    # for page_load_time stats
-    time_before_request = time()
-
+def send_http_request(engine, request_params):
     # create a dictionary which contains all
     # information about the request
     request_args = dict(
         headers=request_params['headers'],
         cookies=request_params['cookies'],
-        timeout=timeout_limit,
         verify=request_params['verify']
     )
 
@@ -68,29 +64,10 @@ def send_http_request(engine, request_params, start_time, timeout_limit):
         request_args['data'] = request_params['data']
 
     # send the request
-    response = req(request_params['url'], **request_args)
-
-    # is there a timeout (no parsing in this case)
-    timeout_overhead = 0.2  # seconds
-    time_after_request = time()
-    search_duration = time_after_request - start_time
-    if search_duration > timeout_limit + timeout_overhead:
-        raise requests.exceptions.Timeout(response=response)
-
-    with threading.RLock():
-        # no error : reset the suspend variables
-        engine.continuous_errors = 0
-        engine.suspend_end_time = 0
-        # update stats with current page-load-time
-        # only the HTTP request
-        engine.stats['page_load_time'] += time_after_request - time_before_request
-        engine.stats['page_load_count'] += 1
-
-    # everything is ok : return the response
-    return response
+    return req(request_params['url'], **request_args)
 
 
-def search_one_request(engine, query, request_params, start_time, timeout_limit):
+def search_one_request(engine, query, request_params):
     # update request parameters dependent on
     # search-engine (contained in engines folder)
     engine.request(query, request_params)
@@ -103,7 +80,7 @@ def search_one_request(engine, query, request_params, start_time, timeout_limit)
         return []
 
     # send request
-    response = send_http_request(engine, request_params, start_time, timeout_limit)
+    response = send_http_request(engine, request_params)
 
     # parse the response
     response.search_params = request_params
@@ -111,11 +88,20 @@ def search_one_request(engine, query, request_params, start_time, timeout_limit)
 
 
 def search_one_request_safe(engine_name, query, request_params, result_container, start_time, timeout_limit):
+    # set the timeout for all HTTP requests made from this thread
+    requests_lib.set_timeout_for_thread(timeout_limit, start_time=start_time)
+    # reset the total HTTP time for this thread
+    requests_lib.reset_time_for_thread()
+
     engine = engines[engine_name]
 
+    # suppose everything will be alright
+    requests_exception = False
+
     try:
         # send requests and parse the results
-        search_results = search_one_request(engine, query, request_params, start_time, timeout_limit)
+        search_results = search_one_request(engine, query, request_params)
 
         # add results
         result_container.extend(engine_name, search_results)
@@ -124,14 +110,15 @@ def search_one_request_safe(engine_name, query, request_params, result_container, start_time, timeout_limit):
         with threading.RLock():
             engine.stats['engine_time'] += time() - start_time
             engine.stats['engine_time_count'] += 1
-
-        return True
+            # update stats with the total HTTP time
+            engine.stats['page_load_time'] += requests_lib.get_time_for_thread()
+            engine.stats['page_load_count'] += 1
 
     except Exception as e:
-        engine.stats['errors'] += 1
-
         search_duration = time() - start_time
-        requests_exception = False
+
+        with threading.RLock():
+            engine.stats['errors'] += 1
 
         if (issubclass(e.__class__, requests.exceptions.Timeout)):
             result_container.add_unresponsive_engine((engine_name, gettext('timeout')))
@@ -152,14 +139,17 @@ def search_one_request_safe(engine_name, query, request_params, result_container, start_time, timeout_limit):
             # other errors
             logger.exception('engine {0} : exception : {1}'.format(engine_name, e))
 
-        # update continuous_errors / suspend_end_time
-        if requests_exception:
-            with threading.RLock():
-                engine.continuous_errors += 1
-                engine.suspend_end_time = time() + min(60, engine.continuous_errors)
-
-        #
-        return False
+    # suspend the engine if there were HTTP errors
+    with threading.RLock():
+        if requests_exception:
+            # update continuous_errors / suspend_end_time
+            engine.continuous_errors += 1
+            engine.suspend_end_time = time() + min(60, engine.continuous_errors)
+        else:
+            # no HTTP error (perhaps an engine error)
+            # anyway, reset the suspend variables
+            engine.continuous_errors = 0
+            engine.suspend_end_time = 0
 
 
 def search_multiple_requests(requests, result_container, start_time, timeout_limit):
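This per-thread bookkeeping only works because each engine request runs in its own thread. A simplified, hypothetical sketch of how search_multiple_requests (whose body is not shown in this diff) could dispatch one thread per engine:

    import threading
    from time import time

    def dispatch(requests, result_container, start_time, timeout_limit):
        # hypothetical simplification: one thread per engine keeps the
        # thread-local timeout and total_time of each engine isolated
        threads = []
        for engine_name, query, request_params in requests:
            th = threading.Thread(
                target=search_one_request_safe,  # the function shown above
                args=(engine_name, query, request_params,
                      result_container, start_time, timeout_limit),
            )
            threads.append(th)
            th.start()

        # wait no longer than the remaining time budget
        remaining = timeout_limit - (time() - start_time)
        for th in threads:
            th.join(max(0, remaining))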
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Alexandre Flament
						Alexandre Flament