Source code for bioservices.services

#
#  This file is part of bioservices software
#
#  Copyright (c) 2013-2014 - EBI-EMBL
#
#  File author(s):
#      https://github.com/cokelaer/bioservices
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  source: http://github.com/cokelaer/bioservices
#  documentation: http://packages.python.org/bioservices
#
##############################################################################
"""Modules with common tools to access web resources"""
import os
import platform
import time
import traceback
from urllib.error import HTTPError, URLError
from urllib.request import urlopen

import colorlog
from easydev import DevTools

from bioservices.settings import BioServicesConfig

__all__ = ["Service", "BioServicesError", "HTTPResponseError", "REST"]


[docs]class BioServicesError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value)
[docs]class HTTPResponseError(int): """Returned when a remote service replies with a non-2xx HTTP status code. Behaves exactly like an ``int`` so that existing code that does:: if res == 404: ... if isinstance(res, int): ... continues to work. However, trying to use the return value as a mapping or sequence — the most common source of the cryptic ``TypeError: 'int' object is not subscriptable`` — raises a :class:`BioServicesError` with a human-readable explanation instead. Attributes ---------- status_code : int The HTTP status code (e.g. 404). reason : str The HTTP reason phrase (e.g. ``"Not Found"``). url : str The URL that produced the error. """ # Map common status codes to a short user-facing hint. _HINTS = { 400: "Check the query parameters — the server could not understand the request.", 401: "Authentication is required. Check your credentials or API token.", 403: "Access denied. You may not have permission to access this resource.", 404: "The requested resource was not found. Check the identifier or query.", 408: "The request timed out. The remote service may be busy; try again later.", 429: "Too many requests. Slow down or add a delay between calls.", 500: "Internal server error. The remote service is experiencing issues; try again later.", 503: "Service unavailable. The remote service is down or being maintained; try again later.", } status_code: int reason: str url: str def __new__(cls, status_code, reason="", url=""): obj = super().__new__(cls, status_code) obj.status_code = int(status_code) obj.reason = reason obj.url = url return obj # ------------------------------------------------------------------ # # Nicer representations # # ------------------------------------------------------------------ # def __repr__(self): return f"HTTPResponseError({self.status_code}: {self.reason})" def __str__(self): return f"HTTP {self.status_code} {self.reason}" # ------------------------------------------------------------------ # # Raise a friendly error whenever the caller treats the result as a # # dict, list, or other subscriptable / iterable object. # # ------------------------------------------------------------------ # def _raise_friendly(self): hint = self._HINTS.get(self.status_code, "Check the input and try again.") url_part = f" (URL: {self.url!r})" if self.url else "" raise BioServicesError(f"HTTP {self.status_code} {self.reason}{url_part}. {hint}") def __getitem__(self, key): self._raise_friendly() def __iter__(self): self._raise_friendly() def __len__(self): self._raise_friendly()
[docs] def keys(self): self._raise_friendly()
[docs] def values(self): self._raise_friendly()
[docs] def items(self): self._raise_friendly()
[docs]class Service: """Base class for REST service classes .. seealso:: :class:`REST` """ #: some useful response codes response_codes = { 200: "OK", 201: "Created", 400: "Bad Request. There is a problem with your input", 404: "Not found. The resource you requests does not exist", 405: "Method not allowed", 406: "Not Acceptable. Usually headers issue", 410: "Gone. The resource you requested was removed.", 415: "Unsupported Media Type", 500: "Internal server error. Most likely a temporary problem", 503: "Service not available. The server is being updated, try again later", } def __init__(self, name, url=None, verbose=True, requests_per_sec=10, url_defined_later=False): """.. rubric:: Constructor :param str name: a name for this service :param str url: its URL :param bool verbose: prints informative messages if True (default is True) :param requests_per_sec: maximum number of requests per seconds are restricted to 3. You can change that value. If you reach the limit, an error is raise. The reason for this limitation is that some services (e.g.., NCBI) may black list you IP. If you need or can do more (e.g., ChEMBL does not seem to have restrictions), change the value. You can also have several instance but again, if you send too many requests at the same, your future requests may be retricted. Currently implemented for REST only All instances have an attribute called :attr:`~Service.logging` that is an instanceof the :mod:`logging` module. It can be used to print information, warning, error messages:: self.logging.info("informative message") self.logging.warning("warning message") self.logging.error("error message") The attribute :attr:`~Service.debugLevel` can be used to set the behaviour of the logging messages. If the argument verbose is True, the debugLebel is set to INFO. If verbose if False, the debugLevel is set to WARNING. However, you can use the :attr:`debugLevel` attribute to change it to one of DEBUG, INFO, WARNING, ERROR, CRITICAL. debugLevel=WARNING means that only WARNING, ERROR and CRITICAL messages are shown. """ super(Service, self).__init__() self.requests_per_sec = requests_per_sec self.name = name self.logging = colorlog.getLogger(f"bioservices.{self.name}") if verbose: self.logging.setLevel("INFO") else: self.logging.setLevel("WARNING") self._url = url try: if self._url is not None: urlopen(self._url) except HTTPError: # The server returned an HTTP error code (4xx or 5xx), # but it is reachable, so no warning is needed. pass except URLError: if url_defined_later is False: self.logging.warning("The URL (%s) provided cannot be reached." % self._url) self.devtools = DevTools() self.settings = BioServicesConfig() self._last_call = 0 def _calls(self): time_lapse = 1.0 / self.requests_per_sec current_time = time.time() dt = current_time - self._last_call if self._last_call == 0: self._last_call = current_time return else: self._last_call = current_time if dt > time_lapse: return else: time.sleep(time_lapse - dt) def _get_caching(self): return self.settings.params["cache.on"][0] def _set_caching(self, caching): self.devtools.check_param_in_list(caching, [True, False]) self.settings.params["cache.on"][0] = caching # reset the session, which will be automatically created if we # access to the session attribute self._session = None CACHING = property(_get_caching, _set_caching) def _get_url(self): return self._url def _set_url(self, url): # something more clever here to check the URL e.g. starts with http if url is not None: url = url.rstrip("/") self._url = url url = property(_get_url, _set_url, doc="URL of this service") def __str__(self): txt = "This is an instance of %s service" % self.name return txt
[docs] def pubmed(self, Id): """Open a pubmed Id into a browser tab :param Id: a valid pubmed Id in string or integer format. The URL is a concatenation of the pubmed URL http://www.ncbi.nlm.nih.gov/pubmed/ and the provided Id. """ url = "http://www.ncbi.nlm.nih.gov/pubmed/" import webbrowser webbrowser.open(url + str(Id))
[docs] def on_web(self, url): """Open a URL into a browser""" import webbrowser webbrowser.open(url)
[docs] def save_str_to_image(self, data, filename): """Save string object into a file converting into binary""" with open(filename, "wb") as f: import binascii try: # python3 newres = binascii.a2b_base64(bytes(data, "utf-8")) except Exception: newres = binascii.a2b_base64(data) f.write(newres)
class RESTbase(Service): _service = "REST" def __init__(self, name, url=None, verbose=True, requests_per_sec=3, url_defined_later=False): super(RESTbase, self).__init__( name, url, verbose=verbose, requests_per_sec=requests_per_sec, url_defined_later=url_defined_later, ) self.logging.info("Initialising %s service (REST)" % self.name) self.last_response = None def http_get(self): # should return unicode raise NotImplementedError def http_post(self): raise NotImplementedError def http_put(self): raise NotImplementedError def http_delete(self): raise NotImplementedError import requests # replacement for urllib2 (2-3 times faster) import requests_cache # use caching wihh requests from requests.models import Response # import grequests # use asynchronous requests with gevent # Note that grequests should be imported after requests_cache. Otherwise, # one should use a session instance when calling grequests.get, which we do # here below
[docs]class REST(RESTbase): """ The ideas (sync/async) and code using requests were inspired from the chembl python wrapper but significantly changed. Get one value:: >>> from bioservices import REST >>> s = REST("test", "https://www.ebi.ac.uk/chemblws") >>> res = s.get_one("targets/CHEMBL2476.json", "json") >>> res['organism'] u'Homo sapiens' The caching has two major interests. First one is that it speed up requests if you repeat requests. :: >>> s = REST("test", "https://www.ebi.ac.uk/chemblws") >>> s.CACHING = True >>> # requests will be stored in a local sqlite database >>> s.get_one("targets/CHEMBL2476") >>> # Disconnect your wiki and any network connections. >>> # Without caching you cannot fetch any requests but with >>> # the CACHING on, you can retrieve previous requests: >>> s.get_one("targets/CHEMBL2476") Advantages of requests over urllib requests length is not limited to 2000 characters http://www.g-loaded.eu/2008/10/24/maximum-url-length/ There is no need for authentication if the web services available in bioservices except for a few exception. In such case, the username and password are to be provided with the method call. However, in the future if a services requires authentication, one can set the attribute :attr:`authentication` to a tuple:: s = REST() s.authentication = ('user', 'pass') Note about headers and content type. The Accept header is used by HTTP clients to tell the server what content types they will accept. The server will then send back a response, which will include a Content-Type header telling the client what the content type of the returned content actually is. When using the :meth:`get__headers`, you can see the User-Agent, the Accept and Content-Type keys. So, here the HTTP requests also contain Content-Type headers. In POST or PUT requests the client is actually sendingdata to the server as part of the request, and the Content-Type header tells the server what the data actually is For a POST request resulting from an HTML form submission, the Content-Type of the request should be one of the standard form content types: application/x-www-form-urlencoded (default, older, simpler) or multipart/form-data (newer, adds support for file uploads) """ content_types = { "bed": "text/x-bed", "default": "application/x-www-form-urlencoded", "gff3": "text/x-gff3", "fasta": "text/x-fasta", "json": "application/json", "jsonp": "text/javascript", "nh": "text/x-nh", "phylip": "text/x-phyloxml+xml", "phyloxml": "text/x-phyloxml+xml", "seqxml": "text/x-seqxml+xml", "png": "image/png", "jpg": "image/jpg", "svg": "image/svg", "svg+xml": "image/svg+xml", "gif": "image/gif", "jpeg": "image/jpg", "txt": "text/plain", "text": "text/plain", "xml": "application/xml", "yaml": "text/x-yaml", } # special_characters = ['/', '#', '+'] def __init__( self, name, url=None, verbose=True, cache=False, requests_per_sec=3, proxies=[], cert=None, url_defined_later=False, ): super(REST, self).__init__( name, url, verbose=verbose, requests_per_sec=requests_per_sec, url_defined_later=url_defined_later, ) self.proxies = proxies self.cert = cert bspath = self.settings.user_config_dir self.CACHE_NAME = bspath + os.sep + self.name + "_bioservices_db" self._session = None self.settings.params["cache.on"][0] = cache if self.CACHING: # import requests_cache self.logging.info("Using local cache %s" % self.CACHE_NAME) try: requests_cache.install_cache(self.CACHE_NAME) except Exception as err: self.logging.warning("Could not install cache ({}). Caching will be disabled.".format(err)) self.CACHING = False
[docs] def delete_cache(self): cache_file = self.CACHE_NAME + ".sqlite" if os.path.exists(cache_file): msg = "You are about to delete this bioservices cache: %s. Proceed? (y/[n]) " res = input(msg % cache_file) if res == "y": os.remove(cache_file) self.logging.info("Removed cache") else: self.logging.info("Reply 'y' to delete the file")
[docs] def clear_cache(self): from requests_cache import clear clear()
def _build_url(self, query): url = None if query is None: url = self.url else: if query.startswith("http"): # assume we do want to use self.url url = query else: url = "%s/%s" % (self.url, query) return url def _get_session(self): if self._session is None: if self.CACHING is True: self._session = self._create_cache_session() else: self._session = self._create_session() return self._session session = property(_get_session) def _create_session(self): """Creates a normal session using HTTPAdapter max retries is defined in the :attr:`MAX_RETRIES` """ self.logging.debug("Creating session (uncached version)") self._session = requests.Session() adapter = requests.adapters.HTTPAdapter(max_retries=self.settings.MAX_RETRIES) # , pool_block=True does not work with asynchronous requests self._session.mount("http://", adapter) self._session.mount("https://", adapter) return self._session def _create_cache_session(self): """Creates a cached session using requests_cache package""" self.logging.debug("Creating session (cache version)") if not self._session: # import requests_cache self.logging.debug("No cached session created yet. Creating one") try: self._session = requests_cache.CachedSession( self.CACHE_NAME, backend="sqlite", fast_save=self.settings.FAST_SAVE ) except Exception as err: self.logging.warning( "Could not create a cached session ({}). Falling back to a regular session.".format(err) ) self._session = self._create_session() return self._session def _get_timeout(self): return self.settings.TIMEOUT def _set_timeout(self, value): self.settings.TIMEOUT = value TIMEOUT = property(_get_timeout, _set_timeout) def _process_get_request(self, url, session, frmt, data=None, **kwargs): try: res = session.get(url, **kwargs) self.last_response = res res = self._interpret_returned_request(res, frmt) return res except Exception: return None def _interpret_returned_request(self, res, frmt): # must be a Response if isinstance(res, Response) is False: return res # if a response, there is a status code that should be ok if not res.ok: url = getattr(res, "url", "") self.logging.warning("HTTP %s %s%s", res.status_code, res.reason, f" ({url})" if url else "") return HTTPResponseError(res.status_code, reason=res.reason, url=url) if frmt == "json": try: return res.json() except Exception: return res # finally return res.content def _apply(self, iterable, fn, *args, **kwargs): return [fn(x, *args, **kwargs) for x in iterable if x is not None] def _get_async(self, keys, frmt="json", params={}): # does not work under pyhon3 so local import import grequests session = self._get_session() try: # build the requests urls = self._get_all_urls(keys, frmt) self.logging.debug("grequests.get processing") rs = (grequests.get(url, session=session, params=params) for key, url in zip(keys, urls)) # execute them self.logging.debug("grequests.map call") ret = grequests.map(rs, size=min(self.settings.CONCURRENT, len(keys))) self.last_response = ret self.logging.debug("grequests.map call done") return ret except Exception as err: self.logging.warning("Error caught in async. " + err.message) return [] def _get_all_urls(self, keys, frmt=None): return ("%s/%s" % (self.url, query) for query in keys)
[docs] def get_async(self, keys, frmt="json", params={}, **kargs): ret = self._get_async(keys, frmt, params=params, **kargs) return self._apply(ret, self._interpret_returned_request, frmt)
[docs] def get_sync(self, keys, frmt="json", **kargs): return [self.get_one(key, frmt=frmt, **kargs) for key in keys]
[docs] def http_get(self, query, frmt="json", params={}, **kargs): """ * query is the suffix that will be appended to the main url attribute. * query is either a string or a list of strings. * if list is larger than ASYNC_THRESHOLD, use asynchronous call. """ if isinstance(query, list) and len(query) > self.settings.ASYNC_THRESHOLD: self.logging.debug("Running async call for a list") return self.get_async(query, frmt, params=params, **kargs) if isinstance(query, list) and len(query) <= self.settings.ASYNC_THRESHOLD: self.logging.debug("Running sync call for a list") return [self.get_one(key, frmt, params=params, **kargs) for key in query] # return self.get_sync(query, frmt) # OTHERWISE self.logging.debug("Running http_get (single call mode)") # return self.get_one(**{'frmt': frmt, 'query': query, 'params':params}) # if user provide a content, let us use it, otherwise, it will be the # same as the frmt provided content = kargs.get("content", self.content_types[frmt]) # if user provide a header, we use it otherwise, we use the header from # bioservices and the content defined here above headers = kargs.get("headers") if headers is None: headers = {} headers["User-Agent"] = self.getUserAgent() if content is None: headers["Accept"] = self.content_types[frmt] else: headers["Accept"] = content kargs.update({"headers": headers}) return self.get_one(query, frmt=frmt, params=params, **kargs)
[docs] def get_one(self, query=None, frmt="json", params={}, **kargs): """ if query starts with http:// do not use self.url """ self._calls() url = self._build_url(query) if url.count("//") > 1: self.logging.warning("URL of the services contains a double //." + "Check your URL and remove trailing /") self.logging.debug(url) try: kargs["params"] = params kargs["timeout"] = self.TIMEOUT kargs["proxies"] = self.proxies kargs["cert"] = self.cert # Used only in biomart with cosmic database # See doc/source/biomart.rst for an example if hasattr(self, "authentication"): kargs["auth"] = self.authentication # res = self.session.get(url, **{'timeout':self.TIMEOUT, 'params':params}) res = self.session.get(url, **kargs) self.last_response = res res = self._interpret_returned_request(res, frmt) try: # for python 3 compatibility res = res.decode() except Exception: pass return res except Exception as err: self.logging.critical(err) self.logging.critical( """Query unsuccesful. Maybe too slow response. Consider increasing it with settings.TIMEOUT attribute {}""".format( self.settings.TIMEOUT ) )
[docs] def http_post(self, query, params=None, data=None, frmt="xml", headers=None, files=None, content=None, **kargs): # query and frmt are bioservices parameters. Others are post parameters # NOTE in requests.get you can use params parameter # BUT in post, you use data # only single post implemented for now unlike get that can be asynchronous # or list of queries # if user provide a header, we use it otherwise, we use the header from # bioservices and the content defined here above if headers is None: headers = {} headers["User-Agent"] = self.getUserAgent() if content is None: headers["Accept"] = self.content_types[frmt] else: headers["Accept"] = content self.logging.debug("Running http_post (single call mode)") kargs.update({"query": query}) kargs.update({"headers": headers}) kargs.update({"files": files}) kargs["proxies"] = self.proxies kargs["cert"] = self.cert kargs.update({"params": params}) kargs.update({"data": data}) kargs.update({"frmt": frmt}) return self.post_one(**kargs)
[docs] def post_one(self, query=None, frmt="json", **kargs): self._calls() self.logging.debug("BioServices:: Entering post_one function") if query is None: url = self.url else: url = "%s/%s" % (self.url, query) self.logging.debug(url) try: res = self.session.post(url, **kargs) self.last_response = res res = self._interpret_returned_request(res, frmt) try: return res.decode() except Exception: self.logging.debug("BioServices:: Could not decode the response") return res except Exception: traceback.print_exc() return None
[docs] def getUserAgent(self): # self.logging.info('getUserAgent: Begin') urllib_agent = "Python-requests/%s" % requests.__version__ # clientRevision = '' from bioservices import version clientVersion = version user_agent = "BioServices/%s (bioservices.%s; Python %s; %s) %s" % ( clientVersion, os.path.basename(__file__), platform.python_version(), platform.system(), urllib_agent, ) # self.logging.info('getUserAgent: user_agent: ' + user_agent) # self.logging.info('getUserAgent: End') return user_agent
[docs] def get_headers(self, content="default"): """ :param str content: set to default that is application/x-www-form-urlencoded so that it has the same behaviour as urllib2 (Sept 2014) """ headers = {} headers["User-Agent"] = self.getUserAgent() headers["Accept"] = self.content_types[content] headers["Content-Type"] = self.content_types[content] # "application/json;odata=verbose" required in reactome # headers['Content-Type'] = "application/json;odata=verbose" required in reactome return headers
[docs] def debug_message(self): print(self.last_response.content) print(self.last_response.reason) print(self.last_response.status_code)
[docs] def http_delete(self, query, params=None, frmt="xml", headers=None, **kargs): kargs.update({"query": query}) kargs.update({"params": params}) kargs.update({"frmt": frmt}) return self.delete_one(**kargs)
[docs] def delete_one(self, query, frmt="json", **kargs): self._calls() self.logging.debug("BioServices:: Entering delete_one function") if query is None: url = self.url else: url = "%s/%s" % (self.url, query) self.logging.debug(url) try: res = self.session.delete(url, **kargs) self.last_response = res res = self._interpret_returned_request(res, frmt) try: return res.decode() except Exception: self.debug("BioServices:: Could not decode the response") return res except Exception as err: print(err) return None except Exception: pass