Source code for procountor.client

import json
import re
import requests
from requests_toolbelt.multipart import decoder

    from urllib.parse import urlparse, parse_qs, urlencode
except ImportError:
    from urlparse import urlparse, parse_qs
    from urllib import urlencode

from .api_methods import ApiMethods

[docs]class Client(ApiMethods): """Class for Procountor accounting API Following packages need to be installed: - requests :param api_key: Procountor API Key, string :param client_id: Procountor REST API client id, string :param client_secret: Procountor REST API client secret, string :param redirect_uri: URI where redirected after authentication, string :param test_mode: Wether to use test api or real api, bool :param api_version: Cen be latest, supported or >= 20.01, string """ _endpoints = { "hosts": { "production": "", "test": "", }, "version": { "latest": "latest/api", "supported": "supported/api", "specified": "v{}/api", }, } def __init__( self, api_key, client_id, client_secret, redirect_uri, test_mode=True, api_version="supported", ): self.api_key = (api_key,) self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.test_mode = test_mode self.api_version = api_version self._get_token() @property def api_url(self): if self.api_version in ["latest", "supported"]: version = Client._endpoints["version"][self.api_version] elif re.match("^2[0-9]\.[0-9]{2}$", self.api_version): version_number = "{}{}".format(self.api_version[0:2], self.api_version[3:5]) version = Client._endpoints["version"]["specified"].format(version_number) else: raise ValueError( "Given value for api version {} is not valid. Valid values are latest, supported or Api version >= 20.01".format( self.api_version ) ) return "{}/{}/".format(self.api_host, version) @property def api_host(self): host = Client._endpoints["hosts"]["test" if self.test_mode else "production"] return host def _create_endpoint(self, endpoint, queries={}): """ :return: url, string """ return "{}{}".format(endpoint, self._dict_to_url_query(queries)) def _dict_to_url_query(self, url_dict): """ :return: url s """ if isinstance(url_dict, dict): if len(url_dict) > 0: return "?{}".format(urlencode(url_dict)) else: return "" else: raise Exception( "Given params are not dict. The type was {}".format(type(url_dict)) )
[docs] def invalidate_token(self): """Method invalidates the access token""" method = "POST" endpoint = "logout" headers = {"authorization": "Bearer " + self.access_token} url = "{}/{}".format(self.api_host, endpoint) return self.request(method, endpoint, headers, url)
def _get_token(self): """Makes a request and returns an access token. Access token is valid for 3600 seconds. :return: granted tokens, str """ params = { "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret, "redirect_uri": self.redirect_uri, "api_key": self.api_key, } headers = {"content-type": "application/x-www-form-urlencoded"} url = self.api_url + "oauth/token/" response =, data=params, headers=headers) status_code = response.status_code if status_code != 200: if status_code == 404: raise RuntimeError( "Not found api endpoint. Please, check your API version." ) if status_code == 401: raise RuntimeError( "Authentication failed. Please, check your credentials." ) raise RuntimeError( "Authentication got an unexpected HTTP Status code: " + str(status_code) + ". Content: " + response.text ) json_content = response.json() access_token = json_content.get("access_token", None) if access_token is None: raise RuntimeError( "Cannot read the access_token from the response. Response was: " + json_content ) self.access_token = access_token return access_token def _handleResponse(self, response): """ TODO: This text :return: response as dict. Dict has a key named status for HTTP-status code. In error case there is string """ answer = {"status": response.status_code} if response.status_code in (200, 202): try: if response.headers["Content-Type"].startswith("multipart/"): meta, filebytes = decoder.MultipartDecoder.from_response( response ).parts answer["content"] = filebytes.content answer["metadata"] = json.loads(meta.content.decode("utf-8")) else: answer["content"] = response.json() except: answer["message"] = response.text else: answer["message"] = response.text return answer
[docs] def request( self, method, endpoint, headers=None, url=None, files=None, *args, **kwargs ): """Method to make HTTP requests over Procountor REST API :param method: wanted request method, uppercase string :param endpoint: wanted REST API endpoint, string :param headers: Overwrite HTTP-headers, dict :param kwargs: query parameters to pass to Procountor, dict :return: response from rest server, dict """ headers = headers or self._headers(method, endpoint) url = url or self.api_url + endpoint # Test environment (Microsoft-Azure-Application-Gateway) # doesn't like if there is a json body in (for ex.) GET method. json = None if len(kwargs) == 0 else kwargs response = requests.request( method, url, headers=headers, files=files, json=json ) # refresh token if out of date if response.status_code == 401: self.access_token = self._get_token() response = requests.request( method, url, headers=headers, files=files, json=json ) return self._handleResponse(response)
def _headers(self, method, endpoint): """Method returns correct headers for request :param method: request method, string :param endpoint: request endpoint, string :return: headers for request, dict """ if "attachments" in endpoint: if method == "GET": headers = { "content-type": "multipart/mixed", "authorization": "Bearer {}".format(self.access_token), } elif method == "POST": headers = { # can't put 'content-type': 'multipart/form-data' here as documentation says. Requests generates it # automatically. "authorization": "Bearer {}".format(self.access_token), } else: headers = { "content-type": "application/json", "authorization": "Bearer {}".format(self.access_token), } else: headers = { "content-type": "application/json", "authorization": "Bearer {}".format(self.access_token), } return headers