Source code for pydplus.api

# -*- coding: utf-8 -*-
"""
:Module:            pydplus.api
:Synopsis:          Defines the basic functions associated with the RSA ID Plus API
:Created By:        Jeff Shurtliff
:Last Modified:     Jeff Shurtliff
:Modified Date:     30 Mar 2026
"""

from __future__ import annotations

import logging
from typing import Optional, Union

import requests

from . import constants as const
from . import errors

logger = logging.getLogger(__name__)


[docs] def get( pydp_object, endpoint: str, params: Optional[dict] = None, headers: Optional[dict] = None, api_type: str = const.DEFAULT_API_TYPE, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, show_full_error: bool = True, return_json: bool = True, allow_failed_response: Optional[bool] = None, ): """Perform a GET request against the ID Plus tenant. :param pydp_object: The instantiated pydplus object :type pydp_object: class[pydplus.PyDPlus] :param endpoint: The API endpoint to query :type endpoint: str :param params: The query parameters (where applicable) :type params: dict, None :param headers: Specific API headers to use when performing the API call (beyond the base headers) :type headers: dict, None :param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged. :type api_type: str :param timeout: The timeout period in seconds (defaults to ``30``) :type timeout: int :param show_full_error: Determines if the full error message should be displayed (defaults to ``True``) :type show_full_error: bool :param return_json: Determines if the response should be returned in JSON format (defaults to ``True``) :type return_json: bool :param allow_failed_response: Indicates that failed responses should return and should not raise an exception (If not explicitly defined then ``True`` if Strict Mode is disabled) :type allow_failed_response: bool, None :returns: The API response in JSON format or as a ``requests`` object :raises: :py:exc:`errors.exceptions.APIRequestError`, :py:exc:`errors.exceptions.APIResponseConversionError`, :py:exc:`errors.exceptions.InvalidFieldError` """ # Define the parameters as an empty dictionary if none are provided params = {} if params is None else params # Define the headers additional_headers = {} if headers is None else dict(headers) request_headers = _get_headers( pydp_object, _additional_headers=additional_headers, _api_type=api_type, ) # Perform the API call full_api_url = _get_full_api_url(pydp_object, endpoint, api_type) response = requests.get(full_api_url, headers=request_headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl) # Retry once after a forced OAuth token refresh when the token is rejected. if _should_retry_oauth_401(pydp_object, api_type, response): logger.debug('The OAuth token was rejected and will be refreshed before trying the API call again') request_headers = _get_headers( pydp_object, _additional_headers=additional_headers, _api_type=api_type, _force_oauth_refresh=True, ) response = requests.get( full_api_url, headers=request_headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) # Examine the result allow_failed_response = _should_allow_failed_responses(pydp_object, allow_failed_response) if response.status_code >= 300 and not allow_failed_response: _raise_status_code_exception(response, const.API_REQUEST_TYPES.GET, show_full_error) if return_json: response = _convert_response_to_json(response, allow_failed_response) return response
[docs] def api_call_with_payload( pydp_object, method: str, endpoint: str, payload: Union[Optional[dict], Optional[str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, api_type: str = const.DEFAULT_API_TYPE, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, show_full_error: bool = True, return_json: bool = True, allow_failed_response: Optional[bool] = None, ): """Perform an API call with payload against the ID Plus tenant. :param pydp_object: The instantiated pydplus object :type pydp_object: class[pydplus.PyDPlus] :param method: The API method (``post``, ``put``, or ``patch``) :type method: str :param endpoint: The API endpoint to query :type endpoint: str :param payload: The payload to leverage in the API call :type payload: dict, str, None :param params: The query parameters (where applicable) :type params: dict, None :param headers: Specific API headers to use when performing the API call (beyond the base headers) :type headers: dict, None :param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged. :type api_type: str :param timeout: The timeout period in seconds (defaults to ``30``) :type timeout: int :param show_full_error: Determines if the full error message should be displayed (defaults to ``True``) :type show_full_error: bool :param return_json: Determines if the response should be returned in JSON format (defaults to ``True``) :type return_json: bool :param allow_failed_response: Indicates that failed responses should return and should not raise an exception (If not explicitly defined then ``True`` if Strict Mode is disabled) :type allow_failed_response: bool, None :returns: The API response in JSON format or as a ``requests`` object :raises: :py:exc:`TypeError`, :py:exc:`errors.exceptions.APIMethodError`, :py:exc:`errors.exceptions.APIRequestError`, :py:exc:`errors.exceptions.APIResponseConversionError`, :py:exc:`errors.exceptions.InvalidFieldError` """ def _raise_exception_for_payload(): """Raise a :py:exc:`TypeError` exception when the payload is an invalid data type.""" _error_msg = f'The API payload must be a dictionary or string (provided: {type(payload)})' logger.error(_error_msg) raise TypeError(_error_msg) # Define the parameters as an empty dictionary if none are provided params = {} if params is None else params # Define the headers additional_headers = {} if headers is None else dict(headers) request_headers = _get_headers( pydp_object, _additional_headers=additional_headers, _api_type=api_type, ) # Perform the API call full_api_url = _get_full_api_url(pydp_object, endpoint, api_type) response = _perform_api_call_with_payload( pydp_object=pydp_object, method=method, payload=payload, params=params, headers=request_headers, timeout=timeout, full_api_url=full_api_url, raise_payload_exception=_raise_exception_for_payload, ) # Retry once after a forced OAuth token refresh when the token is rejected. if response is not None and _should_retry_oauth_401(pydp_object, api_type, response): request_headers = _get_headers( pydp_object, _additional_headers=additional_headers, _api_type=api_type, _force_oauth_refresh=True, ) response = _perform_api_call_with_payload( pydp_object=pydp_object, method=method, payload=payload, params=params, headers=request_headers, timeout=timeout, full_api_url=full_api_url, raise_payload_exception=_raise_exception_for_payload, ) # Examine the result allow_failed_response = _should_allow_failed_responses(pydp_object, allow_failed_response) if response and response.status_code >= 300 and not allow_failed_response: _raise_status_code_exception(response, method, show_full_error) if response and return_json: response = _convert_response_to_json(response, allow_failed_response) return response
[docs] def post( pydp_object, endpoint: str, payload: Union[Optional[dict], Optional[str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, api_type: str = const.DEFAULT_API_TYPE, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, show_full_error: bool = True, return_json: bool = True, allow_failed_response: Optional[bool] = None, ): """Perform a POST call with payload against the ID Plus tenant. :param pydp_object: The instantiated pydplus object :type pydp_object: class[pydplus.PyDPlus] :param endpoint: The API endpoint to query :type endpoint: str :param payload: The payload to leverage in the API call :type payload: dict, str, None :param params: The query parameters (where applicable) :type params: dict, None :param headers: Specific API headers to use when performing the API call (beyond the base headers) :type headers: dict, None :param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged. :type api_type: str :param timeout: The timeout period in seconds (defaults to ``30``) :type timeout: int :param show_full_error: Determines if the full error message should be displayed (defaults to ``True``) :type show_full_error: bool :param return_json: Determines if the response should be returned in JSON format (defaults to ``True``) :type return_json: bool :param allow_failed_response: Indicates that failed responses should return and should not raise an exception (If not explicitly defined then ``True`` if Strict Mode is disabled) :type allow_failed_response: bool, None :returns: The API response in JSON format or as a ``requests`` object :raises: :py:exc:`errors.exceptions.APIMethodError`, :py:exc:`errors.exceptions.APIRequestError`, :py:exc:`errors.exceptions.APIResponseConversionError`, :py:exc:`errors.exceptions.InvalidFieldError` """ return api_call_with_payload( pydp_object=pydp_object, method=const.API_REQUEST_TYPES.POST, endpoint=endpoint, payload=payload, params=params, headers=headers, api_type=api_type, timeout=timeout, show_full_error=show_full_error, return_json=return_json, allow_failed_response=allow_failed_response, )
[docs] def patch( pydp_object, endpoint: str, payload: Union[Optional[dict], Optional[str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, api_type: str = const.DEFAULT_API_TYPE, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, show_full_error: bool = True, return_json: bool = True, allow_failed_response: Optional[bool] = None, ): """Perform a PATCH call with payload against the ID Plus tenant. :param pydp_object: The instantiated pydplus object :type pydp_object: class[pydplus.PyDPlus] :param endpoint: The API endpoint to query :type endpoint: str :param payload: The payload to leverage in the API call :type payload: dict, str, None :param params: The query parameters (where applicable) :type params: dict, None :param headers: Specific API headers to use when performing the API call (beyond the base headers) :type headers: dict, None :param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged. :type api_type: str :param timeout: The timeout period in seconds (defaults to ``30``) :type timeout: int :param show_full_error: Determines if the full error message should be displayed (defaults to ``True``) :type show_full_error: bool :param return_json: Determines if the response should be returned in JSON format (defaults to ``True``) :type return_json: bool :param allow_failed_response: Indicates that failed responses should return and should not raise an exception (If not explicitly defined then ``True`` if Strict Mode is disabled) :type allow_failed_response: bool, None :returns: The API response in JSON format or as a ``requests`` object :raises: :py:exc:`errors.exceptions.APIMethodError`, :py:exc:`errors.exceptions.APIRequestError`, :py:exc:`errors.exceptions.APIResponseConversionError`, :py:exc:`errors.exceptions.InvalidFieldError` """ return api_call_with_payload( pydp_object=pydp_object, method=const.API_REQUEST_TYPES.PATCH, endpoint=endpoint, payload=payload, params=params, headers=headers, api_type=api_type, timeout=timeout, show_full_error=show_full_error, return_json=return_json, allow_failed_response=allow_failed_response, )
[docs] def put( pydp_object, endpoint: str, payload: Union[Optional[dict], Optional[str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, api_type: str = const.DEFAULT_API_TYPE, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, show_full_error: bool = True, return_json: bool = True, allow_failed_response: Optional[bool] = None, ): """Perform a PUT call with payload against the ID Plus tenant. :param pydp_object: The instantiated pydplus object :type pydp_object: class[pydplus.PyDPlus] :param endpoint: The API endpoint to query :type endpoint: str :param payload: The payload to leverage in the API call :type payload: dict, str, None :param params: The query parameters (where applicable) :type params: dict, None :param headers: Specific API headers to use when performing the API call (beyond the base headers) :type headers: dict, None :param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged. :type api_type: str :param timeout: The timeout period in seconds (defaults to ``30``) :type timeout: int :param show_full_error: Determines if the full error message should be displayed (defaults to ``True``) :type show_full_error: bool :param return_json: Determines if the response should be returned in JSON format (defaults to ``True``) :type return_json: bool :param allow_failed_response: Indicates that failed responses should return and should not raise an exception (If not explicitly defined then ``True`` if Strict Mode is disabled) :type allow_failed_response: bool, None :returns: The API response in JSON format or as a ``requests`` object :raises: :py:exc:`errors.exceptions.APIMethodError`, :py:exc:`errors.exceptions.APIRequestError`, :py:exc:`errors.exceptions.APIResponseConversionError`, :py:exc:`errors.exceptions.InvalidFieldError` """ return api_call_with_payload( pydp_object=pydp_object, method=const.API_REQUEST_TYPES.PUT, endpoint=endpoint, payload=payload, params=params, headers=headers, api_type=api_type, timeout=timeout, show_full_error=show_full_error, return_json=return_json, allow_failed_response=allow_failed_response, )
def _should_allow_failed_responses(_pydp_object, _allow_failed_response: Optional[bool]) -> bool: """Determine if failed responses are allowed based on the defined value or strict mode setting.""" # Only define the value if not already defined if not isinstance(_allow_failed_response, bool) or _allow_failed_response is None: try: # Define the value based on the strict mode define in the instantiated object _allow_failed_response = False if _pydp_object.strict_mode is True else True except Exception as _exc: # Use the default strict mode value to define the value if an exception is raised _allow_failed_response = False if const.DEFAULT_STRICT_MODE is True else True _exc_type = errors.handlers.get_exception_type(_exc) _error_msg = f'Using default strict mode due to the following {_exc_type} exception: {_exc}' logger.error(_error_msg) return _allow_failed_response def _is_admin_oauth_request(_pydp_object, _api_type: str) -> bool: """Return whether the request targets Admin API over an OAuth connection.""" if not isinstance(_api_type, str): return False return ( _api_type.lower() == const.ADMIN_API_TYPE and getattr(_pydp_object, const.CLIENT_SETTINGS.CONNECTION_TYPE, None) == const.CONNECTION_INFO.OAUTH ) def _should_retry_oauth_401(_pydp_object, _api_type: str, _response) -> bool: """Return whether a failed response is eligible for OAuth token refresh retry.""" return ( _is_admin_oauth_request(_pydp_object, _api_type) and _response is not None and getattr(_response, const.RESPONSE_KEYS.STATUS_CODE, None) == 401 ) def _get_headers( _pydp_object, _additional_headers: Optional[dict] = None, _api_type: str = const.DEFAULT_API_TYPE, _header_type: str = const.DEFAULT_HEADER_TYPE, _force_oauth_refresh: bool = const.AUTH_VALUES.OAUTH_DEFAULT_FORCE_REFRESH, ) -> dict: """Return the appropriate HTTP headers to use for different types of API calls.""" _additional_headers = {} if _additional_headers is None else _additional_headers _headers = dict(_pydp_object.base_headers) if isinstance(_pydp_object.base_headers, dict) else {} if _is_admin_oauth_request(_pydp_object, _api_type): if _force_oauth_refresh: _headers = _pydp_object.refresh_oauth_token() else: _headers = _pydp_object._ensure_oauth_headers() # TODO: Define additional headers as needed based on header type _headers.update(_additional_headers) return _headers def _perform_api_call_with_payload( pydp_object, method: str, payload: Union[Optional[dict], Optional[str]] = None, params: Optional[dict] = None, headers: Optional[dict] = None, timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS, full_api_url: Optional[str] = None, raise_payload_exception=None, ): """Perform API requests that include payload data and return the response object.""" if not full_api_url: error_msg = 'A full API URL must be defined before calling _perform_api_call_with_payload()' logger.error(error_msg) raise errors.exceptions.APIMethodError(error_msg) if isinstance(method, str) and method.upper() == const.API_REQUEST_TYPES.POST: if isinstance(payload, dict): return requests.post( full_api_url, json=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if isinstance(payload, str): return requests.post( full_api_url, data=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if callable(raise_payload_exception): raise_payload_exception() elif isinstance(method, str) and method.upper() == const.API_REQUEST_TYPES.PATCH: if isinstance(payload, dict): return requests.patch( full_api_url, json=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if isinstance(payload, str): return requests.patch( full_api_url, data=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if callable(raise_payload_exception): raise_payload_exception() elif isinstance(method, str) and method.upper() == const.API_REQUEST_TYPES.PUT: if isinstance(payload, dict): return requests.put( full_api_url, json=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if isinstance(payload, str): return requests.put( full_api_url, data=payload, headers=headers, params=params, timeout=timeout, verify=pydp_object.verify_ssl ) if callable(raise_payload_exception): raise_payload_exception() elif isinstance(method, str) and method.upper() == const.API_REQUEST_TYPES.GET: error_msg = 'The GET API call method is not valid when a payload has been provided.' logger.error(error_msg) raise errors.exceptions.APIMethodError(error_msg) else: error_msg = 'A valid API call method (POST or PATCH or PUT) must be defined.' logger.error(error_msg) raise errors.exceptions.APIMethodError(error_msg) return None def _get_full_api_url(_pydp_object, _endpoint: str, _api_type: str = const.DEFAULT_API_TYPE) -> str: """Construct the full API URL to use in an API call based on the API type. :param _pydp_object: The instantiated pydplus object :type _pydp_object: class[pydplus.PyDPlus] :param _endpoint: The API endpoint to be called :type _endpoint: str :param _api_type: Indicates which API to leverage: ``admin`` (default) or ``auth`` :type _api_type: str :returns: The full API URL path including the base URL as a string :raises: :py:exc:`pydplus.errors.exceptions.InvalidFieldError` """ # Define the base URL to leverage based on the API type or raise an exception if API type is invalid if _api_type.lower() == const.ADMIN_API_TYPE: _base_url = _pydp_object.admin_base_rest_url elif _api_type.lower() == const.AUTH_API_TYPE: _base_url = _pydp_object.auth_base_rest_url else: if not isinstance(_api_type, str): _error_msg = f'The API Type value must be a string. (provided: {type(_api_type)})' else: _error_msg = f"The value '{_api_type}' is not a valid API type. " _error_msg += f"(expected: '{const.ADMIN_API_TYPE}' or '{const.AUTH_API_TYPE}')" logger.error(_error_msg) raise errors.exceptions.InvalidFieldError(_error_msg) # Make sure the endpoint begins with a slash _endpoint = f'/{_endpoint}' if not _endpoint.startswith('/') else _endpoint # Return the crafted full API URL return f'{_base_url}{_endpoint}' def _raise_status_code_exception(_response, _method: str, _show_full_error: bool = True) -> None: """Raise an exception when a non-OK status code is returned for an API call. :param _response: The API response :param _method: The API request type (``GET``, ``POST``, ``PATCH``, ``PUT``, or ``DELETE``) :type _method: str :param _show_full_error: Determine if the full error message should be reported (``True`` by default) :type _show_full_error: bool :returns: None :raises: :py:exc:`pydplus.errors.exceptions.APIRequestError` """ _exc_msg = f'The {_method.upper()} request failed with a {_response.status_code} status code.' if _show_full_error: _exc_msg += f'\n{_response.text}' logger.error(_exc_msg) raise errors.exceptions.APIRequestError(_exc_msg) def _convert_response_to_json(_response, _allow_failed_response: bool = False): """Attempt to convert an API response to JSON format and raises an exception if unsuccessful. :param _response: The API response :param _allow_failed_response: Determines if failed responses are accepted (``False`` by default) or if an exception should be raised if the conversion fails :type _allow_failed_response: bool :returns: The API response converted to a JSON dictionary (or returned unchanged if the conversion failed and no exception was raised) :raises: :py:exc:`pydplus.errors.exceptions.APIResponseConversionError` """ try: _response = _response.json() except Exception as _exc: _exc_type = errors.handlers.get_exception_type(_exc) _error_msg = f'Failed to convert the API response to JSON format due to the following {_exc_type} exception: {_exc}' logger.error(_error_msg) if not _allow_failed_response: raise errors.exceptions.APIResponseConversionError(_error_msg) return _response