# -*- coding: utf-8 -*-
"""
:Module: pydplus.core
:Synopsis: This module performs the core operations of the package
:Usage: ``from pydplus import PyDPlus``
:Example: ``pydp = PyDPlus()``
:Created By: Jeff Shurtliff
:Last Modified: Jeff Shurtliff (via GPT-5.3-codex)
:Modified Date: 01 Apr 2026
"""
from __future__ import annotations
import logging
import os
import urllib.parse
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from . import api, auth, errors
from . import constants as const
from . import users as users_module
from .credentials import IDPlusLegacyKeyMaterial
from .utils import core_utils
from .utils.helper import get_helper_settings
logger = logging.getLogger(__name__)
[docs]
class PyDPlus:
"""Class for the core client object.
:param connection_info: Dictionary that defines the connection info to use
:type connection_info: dict, None
:param connection_type: Determines whether to leverage a(n) ``oauth`` (default) or ``legacy`` connection
:type connection_type: str, None
:param base_url: The base URL to leverage when performing Administration API calls
.. note::
This parameter is for backwards compatibility only and will eventually be fully deprecated.
The ``base_admin_url`` should be leveraged instead as a best practice.
:type base_url: str, None
:param base_admin_url: The base URL to leverage when performing Administration API calls
:type base_admin_url: str, None
:param base_auth_url: The base URL to leverage when performing Authentication API calls
:type base_auth_url: str, None
:param tenant_name: Specify the tenant name for the RSA ID Plus tenant (e.g. ``example-corporation``)
.. note::
Specifying the tenant name will allow the base URLs to be defined if not already
defined via argument, helper setting, or environment variable.
:type tenant_name: str, None
:param env: Optionally specify the environment as ``PROD``, ``DEV``, or a custom name. (e.g. ``STAGING``)
.. note::
This parameter will impact which environment variables are referenced when the client object
is instantiated. (e.g. ``PYDPLUS_PROD_BASE_URL`` rather than ``PYDPLUS_BASE_URL``)
:type env: str, None
:param private_key: The file path to the private key used for legacy API authentication
:type private_key: str, None
:param legacy_access_id: The Access ID associated with the Legacy API connection
:type legacy_access_id: str, None
:param legacy_key_material: Legacy key material as a ``.key`` file path or parsed object
:type legacy_key_material: str, pathlib.Path, pydplus.credentials.IDPlusLegacyKeyMaterial, None
:param oauth_client_id: The Client ID associated with the OAuth API connection
:type oauth_client_id: str, None
:param oauth_issuer_url: The explicit OAuth issuer URL to use for token requests (e.g. ``https://<tenant>.auth.securid.com/oauth``)
:type oauth_issuer_url: str, None
:param oauth_private_key: The file path to the OAuth private-key JWK file used for Private Key JWT authentication
:type oauth_private_key: str, None
:param oauth_private_key_jwk: The OAuth private-key JWK payload used for Private Key JWT authentication
:type oauth_private_key_jwk: dict, str, None
:param oauth_scope: One or more OAuth scopes to request in token requests
(``+``-delimited string or iterable of scope strings)
:type oauth_scope: str, tuple, list, set, frozenset, None
:param oauth_scope_preset: One or more scope preset names to merge with explicit OAuth scopes
.. note::
Presets can also be provided through helper settings or environment variables.
(e.g. ``all``, ``user_read_only``, etc.)
:type oauth_scope_preset: str, tuple, list, set, frozenset, None
:param oauth_api_type: Defines which API base URL should be used when inferring the OAuth issuer URL
(``auth`` by default; ``admin`` supported when configured)
:type oauth_api_type: str, None
:param verify_ssl: Determines if SSL connections should be verified (``True`` by default)
:type verify_ssl: bool, None
:param auto_connect: Determines if an API connection should be established when the object is instantiated
(``True`` by default)
:type auto_connect: bool
:param strict_mode: Determines if failed API responses should result in an exception being raised
(``False`` by default)
:type strict_mode: bool, None
:param env_variables: Optionally define custom environment variable names to use instead of the default names
:type env_variables: dict, None
:param helper: Optionally provide the file path for a helper file used to define the object configuration
:type helper: str, tuple, list, set, dict, None
:returns: The instantiated PyDPlus object
:raises: :py:exc:`TypeError`,
:py:exc:`ValueError`,
:py:exc:`pydplus.errors.exceptions.MissingRequiredDataError`,
:py:exc:`pydplus.errors.exceptions.APIConnectionError`
"""
def __init__(
self,
connection_info: Optional[dict] = None,
connection_type: Optional[str] = None,
tenant_name: Optional[str] = None,
base_url: Optional[str] = None,
base_admin_url: Optional[str] = None,
base_auth_url: Optional[str] = None,
env: Optional[str] = None,
private_key: Optional[str] = None,
legacy_access_id: Optional[str] = None,
legacy_key_material: Union[Optional[str], Optional[Path], Optional[IDPlusLegacyKeyMaterial]] = None,
oauth_client_id: Optional[str] = None,
oauth_private_key: Optional[str] = None,
oauth_private_key_jwk: Union[Optional[dict], Optional[str]] = None,
oauth_scope: Union[Optional[str], Optional[tuple], Optional[list], Optional[set], Optional[frozenset]] = None,
oauth_scope_preset: Union[Optional[str], Optional[tuple], Optional[list], Optional[set], Optional[frozenset]] = None,
verify_ssl: Optional[bool] = None,
auto_connect: bool = const.CLIENT_SETTINGS.DEFAULT_AUTO_CONNECT_VALUE,
strict_mode: Optional[bool] = None,
env_variables: Optional[dict] = None,
helper: Union[Optional[str], Optional[tuple], Optional[list], Optional[set], Optional[dict]] = None,
oauth_api_type: Optional[str] = None,
oauth_issuer_url: Optional[str] = None,
):
"""Instantiate the core client object."""
# Define the initial properties and settings
self._helper_settings = {}
self._env_variables = {}
self.base_headers = {}
self.auto_connect = auto_connect
self.connected = False
self.connection_type = None
self.env = None
self._oauth_token_data = None
self.oauth_api_type = const.AUTH_API_TYPE
self.strict_mode = strict_mode
self.tenant_name = tenant_name
# Check for a supplied helper file and extract the configuration settings if found
self._get_helper_settings(helper)
# Define the environment if explicitly defined as an argument, helper setting, or environment variable
self._get_env_name(env)
# Define the environment variable names to retrieve when defined
self._define_env_variable_names(env_variables)
# Check for any defined environment variables using the environment variable names defined above
self._get_env_variables()
# Define the strict_mode setting using a passed argument, helper setting, or environment variable
self._define_strict_mode(strict_mode) # Defines self.strict_mode
# Define the verify_ssl value either from a user-defined setting or using the default value
self._get_verify_ssl_setting(verify_ssl) # Defines self.verify_ssl
# Define the legacy key material when applicable
self.legacy_key_material = self._parse_legacy_key_material(legacy_key_material, connection_info)
# Use parsed key material as a base URL fallback when no explicit values were provided
if self.legacy_key_material:
if not base_url:
base_url = self.legacy_key_material.admin_rest_api_url # Base URL will be parsed below
if not base_admin_url:
base_admin_url = self.legacy_key_material.admin_rest_api_url # Base Admin URL will be parsed below
# Define the base_url value or raise an exception if it cannot be defined
self._define_base_url(base_url) # Defines self.base_url
# Define the admin_base_url (required) and auth_base_url (optional) values
self._define_base_urls(base_admin_url, base_auth_url) # Defines self.admin_base_url, self.auth_base_url
# Define the Administration API base REST URL to use in API calls
self.admin_base_rest_url = self.admin_base_url + const.REST_PATHS.ADMIN_BASE
# Define the Authentication API base URL to use in API calls
self.auth_base_rest_url = self.auth_base_url + const.REST_PATHS.AUTH_BASE if self.auth_base_url else None
# Define which API type should be used when inferring OAuth issuer URL values
self._define_oauth_api_type(oauth_api_type) # Defines self.oauth_api_type
# Check for provided connection info and define the class object attribute
self._validate_connection_info(
connection_info,
private_key,
legacy_access_id,
oauth_client_id,
oauth_issuer_url,
oauth_private_key,
oauth_private_key_jwk,
oauth_scope,
oauth_scope_preset,
self.legacy_key_material,
)
# Define the connection type that should be used to authenticate
self._get_connection_type(connection_type) # Defines self.connection_type
# Connect to the tenant (if auto-connect is enabled) and retrieve the base API headers
if self.auto_connect:
self.connected, self.base_headers = self.connect()
# TODO: Figure out how to connect after instantiation and update self.connected and self.base_headers
# Import inner object classes so their methods can be called from the primary object
self.users: PyDPlus.User = self._import_user_class()
def _import_user_class(self):
"""Allow the :py:class:`pydplus.core.PyDPlus.User` class to be utilized within the core object."""
return PyDPlus.User(self)
def _get_helper_settings(self, _helper):
"""Retrieve the settings from a helper configuration file if passed as an argument."""
if _helper:
# Parse the helper file contents
if any((isinstance(_helper, tuple), isinstance(_helper, list), isinstance(_helper, set))):
_helper_file_path, _helper_file_type = _helper
elif isinstance(_helper, str):
_helper_file_path, _helper_file_type = (_helper, const.HELPER_SETTINGS.DEFAULT_HELPER_FILE_TYPE)
elif isinstance(_helper, dict):
_helper_file_path, _helper_file_type = _helper.values()
else:
_error_msg = "The 'helper' argument can only be supplied as string, tuple, list, set or dict"
logger.error(_error_msg)
raise TypeError(_error_msg)
self.helper_path = _helper_file_path
self._helper_settings = get_helper_settings(_helper_file_path, _helper_file_type)
logger.info('The helper configuration settings have been loaded successfully')
else:
logger.debug('No helper configuration settings were found and therefore none have been configured')
self._helper_settings = {}
def _define_legacy_key_material_path(self, _connection_info: Optional[dict] = None) -> str:
"""Defines the path to the legacy API key material file if the file name and/or path has been configured."""
# Initially define variables
_key_material_path = ''
_legacy_key = const.CONNECTION_INFO.LEGACY
_material_file_key = const.CONNECTION_INFO.LEGACY_KEY_MATERIAL_FILE
_material_path_key = const.CONNECTION_INFO.LEGACY_KEY_MATERIAL_PATH
_helper_conn_key = const.HELPER_SETTINGS.CONNECTION
_env_material_file_key = const.ENV_VARIABLES.LEGACY_KEY_MATERIAL_FILE_FIELD
_env_material_path_key = const.ENV_VARIABLES.LEGACY_KEY_MATERIAL_PATH_FIELD
# Attempt to define the full path using the connection_info dictionary if defined
if (
_connection_info
and isinstance(_connection_info.get(_legacy_key), dict)
and isinstance(_connection_info[_legacy_key].get(_material_file_key), str)
and _connection_info[_legacy_key][_material_file_key]
):
_key_material_path = _connection_info[_legacy_key][_material_file_key]
# Check if a path is also defined as part of the connection_info dictionary
if (
isinstance(_connection_info[_legacy_key].get(_material_path_key), str)
and _connection_info[_legacy_key][_material_path_key]
):
_path_to_material_file = core_utils.ensure_ending_slash(_connection_info[_legacy_key][_material_path_key])
_key_material_path = _path_to_material_file + _key_material_path
logger.debug(f"Defined '{_key_material_path}' as the path to the key material file via connection_info")
# Attempt to define the full path using the helper settings if defined
elif (
self._helper_settings
and _helper_conn_key in self._helper_settings
and isinstance(self._helper_settings[_helper_conn_key].get(_legacy_key), dict)
and isinstance(self._helper_settings[_helper_conn_key][_legacy_key].get(_material_file_key), str)
and self._helper_settings[_helper_conn_key][_legacy_key][_material_file_key]
):
_key_material_path = self._helper_settings[_helper_conn_key][_legacy_key][_material_file_key]
# Check if a path is also defined as part of the helper settings
if (
isinstance(self._helper_settings[_helper_conn_key][_legacy_key].get(_material_path_key), str)
and self._helper_settings[_helper_conn_key][_legacy_key][_material_path_key]
):
_path_to_material_file = core_utils.ensure_ending_slash(
self._helper_settings[_helper_conn_key][_legacy_key][_material_path_key]
)
_key_material_path = _path_to_material_file + _key_material_path
logger.debug(f"Defined '{_key_material_path}' as the path to the key material file via helper settings")
# Attempt to define the full path using environment variables if defined
elif (
self._env_variables
and isinstance(self._env_variables.get(_env_material_file_key), str)
and self._env_variables[_env_material_file_key]
):
_key_material_path = self._env_variables[_env_material_file_key]
# Check if a path is also defined as an environment variable
if isinstance(self._env_variables.get(_env_material_path_key), str) and self._env_variables[_env_material_path_key]:
_path_to_material_file = core_utils.ensure_ending_slash(self._env_variables[_env_material_path_key])
_key_material_path = _path_to_material_file + _key_material_path
logger.debug(f"Defined '{_key_material_path}' as the path to the key material file via environment variables")
# Returned the defined (or empty) path to the key material file
if not _key_material_path:
logger.debug('The key material file path could not be defined and will not be used for authentication')
return _key_material_path
def _parse_legacy_key_material(
self,
_legacy_key_material: Union[Optional[str], Optional[Path], Optional[IDPlusLegacyKeyMaterial]] = None,
_connection_info: Optional[dict] = None,
) -> Optional[IDPlusLegacyKeyMaterial]:
"""Parse and validate the legacy key material if provided."""
# Attempt to define the legacy key material file path if not defined via argument
if _legacy_key_material is None:
_legacy_key_material = self._define_legacy_key_material_path(_connection_info)
if not _legacy_key_material:
return None
if isinstance(_legacy_key_material, IDPlusLegacyKeyMaterial):
_legacy_key_material.validate()
return _legacy_key_material
if isinstance(_legacy_key_material, (str, Path)):
return IDPlusLegacyKeyMaterial.from_file(_legacy_key_material)
_error_msg = (
"The 'legacy_key_material' parameter must be a string path, pathlib.Path value, or IDPlusLegacyKeyMaterial object"
)
logger.error(_error_msg)
raise TypeError(_error_msg)
def _get_env_name(self, _env: Optional[str] = None) -> None:
"""Identify the environment name if defined with an argument or environment variable."""
setting = 'environment name'
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
# Attempt to define the environment name using a passed argument
if _env:
if not isinstance(_env, str):
_error_msg = f"The 'env' argument is an invalid data type (Expected: str, Provided: {type(_env)})"
logger.error(_error_msg)
raise TypeError(_error_msg)
self.env = _env.upper()
logger.debug(debug_msg.format(setting=setting, value=self.env, method=methods[0]))
# Attempt to define the environment name using helper settings if configured
if (
not self.env
and self._helper_settings
and isinstance(self._helper_settings.get(const.HELPER_SETTINGS.ENV_NAME), str)
and self._helper_settings[const.HELPER_SETTINGS.ENV_NAME]
):
self.env = self._helper_settings[const.HELPER_SETTINGS.ENV_NAME].upper()
logger.debug(debug_msg.format(setting=setting, value=self.env, method=methods[1]))
# Define the environment name (or lack thereof) using the environment variable
if not self.env:
_env = os.getenv(const.ENV_VARIABLES.ENV_NAME) # Returns None if not found
self.env = _env.upper() if _env else None
if self.env:
logger.debug(debug_msg.format(setting=setting, value=self.env, method=methods[2]))
else:
logger.debug('The environment name could not be defined as it was not specified anywhere')
def _define_env_variable_names(self, _env_variables_from_arg: Optional[dict]) -> None:
"""Define the environment variable names to use based on an explicit argument or helper settings."""
# Check for custom environment variable names passed as an argument (or environment-specific)
if _env_variables_from_arg:
if not isinstance(_env_variables_from_arg, dict):
logger.error("The 'env_variables' parameter must be a dictionary and will be ignored")
else:
self._get_env_variable_names(_env_variables_from_arg)
# Check for custom environment variable names provided within the helper settings (or environment-specific)
elif const.HELPER_SETTINGS.ENV_VARIABLES in self._helper_settings:
self._get_env_variable_names(self._helper_settings.get(const.HELPER_SETTINGS.ENV_VARIABLES, {}))
# Check for environment-specific variable names or use the default
else:
self._get_env_variable_names()
def _get_env_variable_names(self, _custom_dict: Optional[Mapping[str, str]] = None) -> None:
"""Return the environment variable names to use when checking the OS for environment variables."""
# Define the dictionary with the default environment variable names
_env_variable_names = dict(const.HELPER_SETTINGS.ENV_VARIABLE_DEFAULT_MAPPING)
# Update the environment variables to be specific to an environment is one has been defined
if self.env:
_env_specific_names = {}
for _name_key, _name_value in _env_variable_names.items():
try:
_env_specific_value = core_utils.get_env_variable_name_by_environment(_name_key, self.env)
_env_specific_names[_name_key] = _env_specific_value
except Exception as _exc:
_exc_type = core_utils.get_exception_type(_exc)
_error_msg = (
f"Failed to retrieve the '{_name_key}' environment variable name specific to the "
f'{self.env} environment due to {_exc_type} exception: {_exc}'
)
logger.exception(_error_msg)
logger.warning(f"Defaulting to the environment variable {_name_value} for '{_name_key}'")
_env_specific_names[_name_key] = _name_value
_env_variable_names.update(_env_specific_names)
# Update the dictionary to use any defined custom names instead of the default (or env-specific) names
if _custom_dict and not isinstance(_custom_dict, Mapping):
_error_msg = 'Unable to parse custom environment variable names because variable is not a dictionary'
logger.error(_error_msg)
raise TypeError(_error_msg)
if _custom_dict:
for _name_key, _name_value in _custom_dict.items():
if _name_key in _env_variable_names:
_env_variable_names[_name_key] = _name_value
else:
_warn_msg = f"'{_name_key}' is not a recognized environment variable identifier and will be ignored"
logger.warning(_warn_msg)
# Return the finalized dictionary with the mapped environment variable names
self._env_variable_names = _env_variable_names
def _get_env_variables(self):
"""Retrieve any defined environment variables to use with the instantiated core object."""
_env_variables = {}
for _config_name, _var_name in self._env_variable_names.items():
_var_value = os.getenv(_var_name) # Returns None if not found
_env_variables.update({_config_name: _var_value})
self._env_variables = _env_variables
def _define_strict_mode(self, _strict_mode_from_arg: Optional[bool]) -> None:
"""Define the strict_mode setting using a passed argument, helper setting, or environment variable."""
setting = const.CLIENT_SETTINGS.STRICT_MODE
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
default_debug_msg = const._LOG_MESSAGES._WILL_USE_DEFAULT_VALUE
# Check if the strict_mode value was passed as an argument
if _strict_mode_from_arg is not None:
if not isinstance(_strict_mode_from_arg, bool):
_error_msg = const._LOG_MESSAGES._MUST_BE_DATA_TYPE_ERROR.format(param=setting, data_type='bool')
logger.error(_error_msg)
raise TypeError(_error_msg)
self.strict_mode = _strict_mode_from_arg
logger.debug(debug_msg.format(setting=setting, value=self.strict_mode, method=methods[0]))
# Check the helper settings to see if strict mode was defined
elif (
self._helper_settings
and const.HELPER_SETTINGS.STRICT_MODE in self._helper_settings
and isinstance(self._helper_settings[const.HELPER_SETTINGS.STRICT_MODE], bool)
and self._helper_settings[const.HELPER_SETTINGS.STRICT_MODE] is not None
):
self.strict_mode = self._helper_settings.get(const.HELPER_SETTINGS.STRICT_MODE)
logger.debug(debug_msg.format(setting=setting, value=self.strict_mode, method=methods[1]))
# Check the environment variables to see if strict mode was defined
elif (
const.ENV_VARIABLES.STRICT_MODE_FIELD in self._env_variables
and isinstance(self._env_variables[const.ENV_VARIABLES.STRICT_MODE_FIELD], bool)
and self._env_variables[const.ENV_VARIABLES.STRICT_MODE_FIELD] is not None
):
self.strict_mode = self._env_variables.get(const.ENV_VARIABLES.STRICT_MODE_FIELD)
logger.debug(debug_msg.format(setting=setting, value=self.strict_mode, method=methods[2]))
# Use the default value (True) if not strict mode was not explicitly defined
else:
self.strict_mode = const.DEFAULT_STRICT_MODE
logger.debug(default_debug_msg.format(setting=setting, value=self.strict_mode))
def _check_for_connection_type_mismatch(self):
if self.legacy_key_material and self.connection_type == const.CONNECTION_INFO.OAUTH:
_warn_msg = (
'Legacy key material was provided but will be ignored as the connection_type was explicitly defined as OAuth'
)
# TODO: Call method for displaying warnings when a related setting is enabled
logger.warning(_warn_msg)
def _has_complete_legacy_connection_info(self) -> bool:
"""Return whether current connection info has the required legacy credentials."""
_legacy_info = self.connection_info.get(const.CONNECTION_INFO.LEGACY, {})
if not isinstance(_legacy_info, dict):
return False
_has_access_id = bool(_legacy_info.get(const.CONNECTION_INFO.LEGACY_ACCESS_ID))
_has_private_key = bool(
_legacy_info.get(const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_FILE)
or _legacy_info.get(const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_PEM)
)
return _has_access_id and _has_private_key
def _has_complete_oauth_connection_info(self) -> bool:
"""Return whether current connection info has the required OAuth credentials."""
_oauth_info = self.connection_info.get(const.CONNECTION_INFO.OAUTH, {})
if not isinstance(_oauth_info, dict):
return False
_has_issuer_url = bool(_oauth_info.get(const.CONNECTION_INFO.OAUTH_ISSUER_URL))
_has_client_id = bool(_oauth_info.get(const.CONNECTION_INFO.OAUTH_CLIENT_ID))
_has_scope = bool(_oauth_info.get(const.CONNECTION_INFO.OAUTH_SCOPE))
_has_private_key = bool(
_oauth_info.get(const.CONNECTION_INFO.OAUTH_PRIVATE_KEY_FILE)
or _oauth_info.get(const.CONNECTION_INFO.OAUTH_PRIVATE_KEY_JWK)
)
return _has_issuer_url and _has_client_id and _has_scope and _has_private_key
def _get_connection_type(self, _connection_type_from_arg: Optional[str]) -> None:
"""Define the connection type that should be used to authenticate to the RSA ID Plus tenant."""
self.connection_type = None
setting = const.CLIENT_SETTINGS.CONNECTION_TYPE
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
# Check if the connection type was passed as an argument and leverage it if valid
if _connection_type_from_arg is not None:
if _connection_type_from_arg in const.CONNECTION_INFO.VALID_CONNECTION_TYPES:
self.connection_type = _connection_type_from_arg
logger.debug(debug_msg.format(setting=setting, value=self.connection_type, method=methods[0]))
else:
_error_msg = const._LOG_MESSAGES._INVALID_ARG_IGNORE.format(arg=setting)
_expected_types = ','.join(const.CONNECTION_INFO.VALID_CONNECTION_TYPES)
_error_msg += f' (Expected: {_expected_types}; Provided: {_connection_type_from_arg})'
logger.error(_error_msg)
# Attempt to retrieve the connection type via helper settings if present and populated
if (
not self.connection_type
and self._helper_settings
and const.HELPER_SETTINGS.CONNECTION_TYPE in self._helper_settings
and self._helper_settings[const.HELPER_SETTINGS.CONNECTION_TYPE] is not None
):
_helper_connection_type = self._helper_settings.get(const.HELPER_SETTINGS.CONNECTION_TYPE)
if _helper_connection_type in const.CONNECTION_INFO.VALID_CONNECTION_TYPES:
self.connection_type = _helper_connection_type
logger.debug(debug_msg.format(setting=setting, value=self.connection_type, method=methods[1]))
else:
_error_msg = 'The connection_type value in the helper settings is invalid and will be ignored'
_expected_types = ','.join(const.CONNECTION_INFO.VALID_CONNECTION_TYPES)
_error_msg += f' (Expected: {_expected_types}; Provided: {_helper_connection_type})'
logger.error(_error_msg)
# Attempt to retrieve the connection type via environment variable if defined
if (
not self.connection_type
and self._env_variables
and const.ENV_VARIABLES.CONNECTION_TYPE_FIELD in self._env_variables
and self._env_variables[const.ENV_VARIABLES.CONNECTION_TYPE_FIELD] is not None
):
_env_connection_type = self._env_variables[const.ENV_VARIABLES.CONNECTION_TYPE_FIELD]
if _env_connection_type in const.CONNECTION_INFO.VALID_CONNECTION_TYPES:
self.connection_type = _env_connection_type
logger.debug(debug_msg.format(setting=setting, value=self.connection_type, method=methods[2]))
else:
_error_msg = 'The connection_type environment variable is invalid and will be ignored'
_expected_types = ','.join(const.CONNECTION_INFO.VALID_CONNECTION_TYPES)
_error_msg += f' (Expected: {_expected_types}; Provided: {_env_connection_type})'
logger.error(_error_msg)
# Explicit/declared connection type wins over auto-detection
if self.connection_type:
self._check_for_connection_type_mismatch()
return
# Auto-detect connection type based on complete credential sets
if self._has_complete_oauth_connection_info():
self.connection_type = const.CONNECTION_INFO.OAUTH
logger.info("The 'oauth' connection_type was selected automatically based on provided OAuth credentials")
return
if self._has_complete_legacy_connection_info():
self.connection_type = const.CONNECTION_INFO.LEGACY
logger.info("The 'legacy' connection_type was selected automatically based on provided legacy credentials")
return
# Fallback to default when no complete credential set is detected.
self.connection_type = const.CONNECTION_INFO.DEFAULT_CONNECTION_TYPE
logger.info(
f"The default connection_type '{const.CONNECTION_INFO.DEFAULT_CONNECTION_TYPE}' will be used "
'as a complete credential set could not be auto-detected'
)
def _get_verify_ssl_setting(self, _verify_ssl_from_arg: Optional[bool]) -> None:
"""Determine the ``verify_ssl`` value from a passed argument, helper setting, or environment variable."""
setting = const.CLIENT_SETTINGS.VERIFY_SSL
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
default_debug_msg = const._LOG_MESSAGES._WILL_USE_DEFAULT_VALUE
# Define the verify_ssl value using the argument if defined
if _verify_ssl_from_arg is not None and isinstance(_verify_ssl_from_arg, bool):
self.verify_ssl = _verify_ssl_from_arg
logger.debug(debug_msg.format(setting=setting, value=self.verify_ssl, method=methods[0]))
# Attempt to define the verify_ssl value using Helper Settings if present and populated
elif (
self._helper_settings
and const.HELPER_SETTINGS.VERIFY_SSL in self._helper_settings
and self._helper_settings[const.HELPER_SETTINGS.VERIFY_SSL] is not None
):
self.verify_ssl = self._helper_settings.get(
const.HELPER_SETTINGS.VERIFY_SSL,
const.CLIENT_SETTINGS.DEFAULT_VERIFY_SSL_VALUE, # Fallback value
)
logger.debug(debug_msg.format(setting=setting, value=self.verify_ssl, method=methods[1]))
# Attempt to define the verify_ssl value using an environment variable if defined
elif (
self._env_variables
and const.ENV_VARIABLES.VERIFY_SSL_FIELD in self._env_variables
and self._env_variables[const.ENV_VARIABLES.VERIFY_SSL_FIELD] is not None
):
self.verify_ssl = self._env_variables.get(
const.ENV_VARIABLES.VERIFY_SSL_FIELD,
const.CLIENT_SETTINGS.DEFAULT_VERIFY_SSL_VALUE, # Fallback value
)
logger.debug(debug_msg.format(setting=setting, value=self.verify_ssl, method=methods[2]))
# Use the default value (True) if not defined elsewhere
else:
self.verify_ssl = const.CLIENT_SETTINGS.DEFAULT_VERIFY_SSL_VALUE
logger.debug(default_debug_msg.format(setting=setting, value=self.verify_ssl))
def _define_oauth_api_type(self, _oauth_api_type_from_arg: Optional[str]) -> None:
"""Define which API type should be used for OAuth issuer URL inference."""
setting = const.CLIENT_SETTINGS.OAUTH_API_TYPE
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
default_debug_msg = const._LOG_MESSAGES._WILL_USE_DEFAULT_VALUE
if _oauth_api_type_from_arg is not None:
if not isinstance(_oauth_api_type_from_arg, str):
_error_msg = (
f"The '{setting}' argument is an invalid data type "
f'(Expected: str, Provided: {type(_oauth_api_type_from_arg)})'
)
logger.error(_error_msg)
raise TypeError(_error_msg)
_oauth_api_type = _oauth_api_type_from_arg.strip().lower()
if _oauth_api_type not in const.VALID_API_TYPES:
_valid_values = ','.join(sorted(const.VALID_API_TYPES))
_error_msg = f"The '{setting}' value '{_oauth_api_type_from_arg}' is invalid (Expected one of: {_valid_values})"
logger.error(_error_msg)
raise ValueError(_error_msg)
self.oauth_api_type = _oauth_api_type
logger.debug(debug_msg.format(setting=setting, value=self.oauth_api_type, method=methods[0]))
return
self.oauth_api_type = const.AUTH_API_TYPE
logger.debug(default_debug_msg.format(setting=setting, value=self.oauth_api_type))
def _construct_base_url_with_tenant_name(self, _api_type: str, _tenant_name: Optional[str] = None) -> str:
"""Construct the base URL for a given API type using a tenant name."""
# Ensure the tenant name is defined
if not _tenant_name:
if self.tenant_name:
_tenant_name = self.tenant_name
else:
_error_msg = 'A tenant name must be defined or specified to construct a base URL'
logger.error(_error_msg)
raise errors.exceptions.MissingRequiredDataError(_error_msg)
# Construct the appropriate URL based on the provided API type
if _api_type == const.ADMIN_API_TYPE:
_base_url = const.URLS.BASE_ADMIN_URL.format(tenant_name=_tenant_name)
elif _api_type == const.AUTH_API_TYPE:
_base_url = const.URLS.BASE_AUTH_URL.format(tenant_name=_tenant_name)
else:
_error_msg = f"'{_api_type}' is not a valid API type"
logger.error(_error_msg)
raise ValueError(_error_msg)
# Return the constructed base URL
return _base_url
def _define_base_url(self, _base_url_from_arg: Optional[str]) -> None:
"""Define the base_url value from user-defined setting or raise an exception if it cannot be defined."""
# Attempt to define the base URL value for the Administration API by first checking if defined via argument
setting = const.CLIENT_SETTINGS.BASE_URL
methods = const.ARGUMENT_VALUES.PROVIDED_METHODS # arg, helper, or env
debug_msg = const._LOG_MESSAGES._CLIENT_SETTING_CONFIGURED
if _base_url_from_arg:
self.base_url = core_utils.get_base_url(_base_url_from_arg)
logger.debug(debug_msg.format(setting=setting, value=self.base_url, method=methods[0]))
# Attempt to define the base URL using the helper settings if defined and populated
elif (
self._helper_settings
and const.HELPER_SETTINGS.BASE_URL in self._helper_settings
and self._helper_settings[const.HELPER_SETTINGS.BASE_URL]
):
self.base_url = core_utils.get_base_url(self._helper_settings.get(const.HELPER_SETTINGS.BASE_URL))
logger.debug(debug_msg.format(setting=setting, value=self.base_url, method=methods[1]))
# Attempt to define the base URL using an environment variable if defined
elif (
const.ENV_VARIABLES.BASE_URL_FIELD in self._env_variables and self._env_variables[const.ENV_VARIABLES.BASE_URL_FIELD]
):
self.base_url = core_utils.get_base_url(self._env_variables.get(const.ENV_VARIABLES.BASE_URL_FIELD))
logger.debug(debug_msg.format(setting=setting, value=self.base_url, method=methods[2]))
# Set the value to None if the base URL could not be found
else:
self.base_url = None
def _identify_base_url(
self,
_api_type: str,
_base_url_arg: Optional[str] = None,
) -> str:
"""Identify the base URL for a specific API type and return the value."""
# Define the lookup field for the applicable base URL
_lookup_field = const.HELPER_SETTINGS.API_BASE_URL.format(type=_api_type)
# Leverage the base URL passed as an argument if defined
if _base_url_arg:
_base_url = core_utils.get_base_url(_base_url_arg)
# Attempt to define the base URL using the helper settings if defined and populated
elif self._helper_settings and _lookup_field in self._helper_settings and self._helper_settings[_lookup_field]:
_base_url = core_utils.get_base_url(self._helper_settings.get(_lookup_field))
# Attempt to define the base URL using an environment variable if defined
elif _lookup_field in self._env_variables and self._env_variables[_lookup_field]:
_base_url = core_utils.get_base_url(self._env_variables.get(_lookup_field))
# Set the value to None if the base URL could not be found
else:
_base_url = None
# Return the base URL value
return _base_url
def _define_base_urls(
self,
_base_admin_url_arg: Optional[str],
_base_auth_url_arg: Optional[str],
) -> None:
"""Define the base URLs for the Administration API (required)and the Authentication API (optional)."""
# Attempt to define the admin_base_url value (required)
self.admin_base_url = self._identify_base_url(const.ADMIN_API_TYPE, _base_admin_url_arg)
# Leverage the standard base_url value for the admin base URL if the previous attempt returned no result
if not self.admin_base_url:
self.admin_base_url = self.base_url
# Attempt to construct the base URL using the tenant name if the value is still undefined
if not self.admin_base_url and self.tenant_name:
self.admin_base_url = self._construct_base_url_with_tenant_name(const.ADMIN_API_TYPE)
# Raise an exception if a base URL could not be defined
if not self.admin_base_url:
_error_msg = (
'A base URL for the Administration API must be defined in order to fully '
'instantiate the PyDPlus client object'
)
logger.error(_error_msg)
raise errors.exceptions.MissingRequiredDataError(_error_msg)
# Ensure there is no ending slash at the end of the admin_base_url value
self.admin_base_url = core_utils.remove_ending_slash(self.admin_base_url)
# Attempt to define the auth_base_url value (optional)
self.auth_base_url = self._identify_base_url(const.AUTH_API_TYPE, _base_auth_url_arg)
# Attempt to construct the base URL using the tenant name if the value is still undefined
if not self.auth_base_url and self.tenant_name:
self.auth_base_url = self._construct_base_url_with_tenant_name(const.AUTH_API_TYPE)
# Attempt to infer the auth base URL from a known admin base URL pattern if still undefined
if not self.auth_base_url and self.admin_base_url:
_inferred_auth_base_url = _infer_auth_base_url_from_admin_base_url(self.admin_base_url)
if _inferred_auth_base_url:
self.auth_base_url = _inferred_auth_base_url
logger.debug('The auth_base_url value was inferred from the admin_base_url setting')
# Log a warning if the Authentication API base URL could not be defined but do not raise an exception
if not self.auth_base_url:
_warn_msg = 'The base URL for the Authentication API could not be defined and calls to that API will fail.'
logger.warning(_warn_msg)
else:
# Ensure there is no ending slash at the end of the admin_base_url value
self.auth_base_url = core_utils.remove_ending_slash(self.auth_base_url)
def _validate_connection_info(
self,
_connection_info: Optional[dict] = None,
_private_key: Optional[str] = None,
_legacy_access_id: Optional[str] = None,
_oauth_client_id: Optional[str] = None,
_oauth_issuer_url: Optional[str] = None,
_oauth_private_key: Optional[str] = None,
_oauth_private_key_jwk: Union[Optional[dict], Optional[str]] = None,
_oauth_scope: Union[Optional[str], Optional[tuple], Optional[list], Optional[set], Optional[frozenset]] = None,
_oauth_scope_preset: Union[Optional[str], Optional[tuple[str]], Optional[list[str]], Optional[set[str]]] = None,
_legacy_key_material: Optional[IDPlusLegacyKeyMaterial] = None,
) -> None:
"""Check for provided connection info and define the class object attribute."""
if _legacy_key_material and not _legacy_access_id:
_legacy_access_id = _legacy_key_material.access_id
if not _connection_info:
# Check for individual parameters defined in object instantiation
_connection_info = compile_connection_info(
base_url=self.base_url,
admin_base_url=self.admin_base_url,
auth_base_url=self.auth_base_url,
oauth_api_type=self.oauth_api_type,
private_key=_private_key,
legacy_access_id=_legacy_access_id,
oauth_client_id=_oauth_client_id,
oauth_issuer_url=_oauth_issuer_url,
oauth_private_key=_oauth_private_key,
oauth_private_key_jwk=_oauth_private_key_jwk,
oauth_scope_preset=_oauth_scope_preset,
oauth_scope=_oauth_scope,
)
# Check for defined helper settings
if self._helper_settings:
_helper_connection_info = self._parse_helper_connection_info()
_connection_info = self._merge_connection_variables(_connection_info, _helper_connection_info)
# Check for defined environment variables
if self._env_variables:
_env_connection_info = self._parse_env_connection_info()
_connection_info = self._merge_connection_variables(_connection_info, _env_connection_info)
_connection_info = self._define_oauth_scope_from_presets(
_connection_info=_connection_info,
_oauth_scope_preset_from_arg=_oauth_scope_preset,
)
# Add missing field values where possible and when needed
_connection_info = self._populate_missing_connection_details(_connection_info)
# Merge parsed legacy key material into connection info when available
if _legacy_key_material:
_connection_info = self._merge_legacy_key_material(_connection_info, _legacy_key_material)
self.connection_info = _connection_info
@staticmethod
def _merge_legacy_key_material(
_connection_info: Optional[dict],
_legacy_key_material: IDPlusLegacyKeyMaterial,
) -> dict:
"""Merge legacy key material into connection info without overriding explicit file-path values."""
if not _connection_info:
_connection_info = dict(const.CONNECTION_INFO.EMPTY_CONNECTION_INFO)
_legacy_section = dict(_connection_info.get(const.CONNECTION_INFO.LEGACY, {}))
if not _legacy_section.get(const.CONNECTION_INFO.LEGACY_ACCESS_ID):
_legacy_section[const.CONNECTION_INFO.LEGACY_ACCESS_ID] = _legacy_key_material.access_id
_has_explicit_key_file = bool(_legacy_section.get(const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_FILE))
if not _has_explicit_key_file:
_legacy_section[const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_PEM] = _legacy_key_material.access_key_pem
_connection_info[const.CONNECTION_INFO.LEGACY] = _legacy_section
return _connection_info
def _parse_helper_connection_info(self) -> dict[str, dict[str, Any]]:
"""Parse the helper content to populate the connection info."""
_helper_connection = self._helper_settings.get(const.HELPER_SETTINGS.CONNECTION, {})
_helper_connection_info: dict[str, dict[str, Any]] = {}
for _section, _key_list in const.CONNECTION_INFO.CONNECTION_FIELDS.items():
_section_data = _helper_connection.get(_section, {})
_helper_connection_info[_section] = {_key: _section_data.get(_key) for _key in _key_list}
return _helper_connection_info
def _parse_env_connection_info(self) -> dict[str, dict[str, Any]]:
"""Parse environment variable definitions into the connection info dictionary."""
_env_connection_info: dict[str, dict[str, Any]] = {}
for _section, _field_mapping in (
(const.CONNECTION_INFO.LEGACY, const.HELPER_SETTINGS.ENV_LEGACY_CONNECTION_MAPPING),
(const.CONNECTION_INFO.OAUTH, const.HELPER_SETTINGS.ENV_OAUTH_CONNECTION_MAPPING),
):
_env_connection_info[_section] = {
_connection_field: self._env_variables.get(_env_variable_field)
for _connection_field, _env_variable_field in _field_mapping.items()
}
return _env_connection_info
@staticmethod
def _parse_oauth_scope_preset_values(
_oauth_scope_preset: Union[Optional[str], Optional[tuple[str]], Optional[list[str]], Optional[set[str]]],
) -> list[str]:
"""Parse OAuth scope preset values into a normalized list."""
if _oauth_scope_preset is None:
return []
_parsed_preset_values: list[str] = []
if isinstance(_oauth_scope_preset, str):
_parsed_preset_values.extend(_oauth_scope_preset.replace('+', ' ').replace(',', ' ').split())
return _parsed_preset_values
if isinstance(_oauth_scope_preset, Iterable):
for _scope_preset_value in _oauth_scope_preset:
if not isinstance(_scope_preset_value, str):
_error_msg = (
f"The 'oauth_scope_preset' values must be strings (provided element type: {type(_scope_preset_value)})"
)
logger.error(_error_msg)
raise TypeError(_error_msg)
_parsed_preset_values.extend(_scope_preset_value.replace('+', ' ').replace(',', ' ').split())
return _parsed_preset_values
_error_msg = (
"The 'oauth_scope_preset' value must be supplied as a string or iterable of strings "
f'(provided: {type(_oauth_scope_preset)})'
)
logger.error(_error_msg)
raise TypeError(_error_msg)
def _define_oauth_scope_from_presets(
self,
_connection_info: dict[str, dict[str, Any]],
_oauth_scope_preset_from_arg: Union[Optional[str], Optional[tuple[str]], Optional[list[str]], Optional[set[str]]],
) -> dict[str, dict[str, Any]]:
"""Define OAuth scopes by merging explicit scopes with any presets from all supported sources."""
_combined_scope_presets: list[str] = []
_seen_scope_presets: set[str] = set()
_helper_scope_preset = None
if (
const.HELPER_SETTINGS.CONNECTION in self._helper_settings
and isinstance(self._helper_settings[const.HELPER_SETTINGS.CONNECTION], dict)
and const.CONNECTION_INFO.OAUTH in self._helper_settings[const.HELPER_SETTINGS.CONNECTION]
and isinstance(self._helper_settings[const.HELPER_SETTINGS.CONNECTION][const.CONNECTION_INFO.OAUTH], dict)
):
_helper_oauth = self._helper_settings[const.HELPER_SETTINGS.CONNECTION][const.CONNECTION_INFO.OAUTH]
_helper_scope_preset = _helper_oauth.get(const.HELPER_SETTINGS.OAUTH_SCOPE_PRESET)
if _helper_scope_preset is None:
_helper_scope_preset = _helper_oauth.get(const.HELPER_SETTINGS.LEGACY_OAUTH_SCOPE_PRESET)
# Fallback for any helper settings parsed with older root-level field names.
if _helper_scope_preset is None:
_helper_scope_preset = self._helper_settings.get(const.HELPER_SETTINGS.LEGACY_OAUTH_SCOPE_PRESET)
for _scope_preset_values in (
_oauth_scope_preset_from_arg,
_helper_scope_preset,
self._env_variables.get(const.ENV_VARIABLES.OAUTH_SCOPE_PRESET_FIELD),
):
for _scope_preset_value in self._parse_oauth_scope_preset_values(_scope_preset_values):
_normalized_scope_preset_value = _scope_preset_value.strip().lower()
if _normalized_scope_preset_value and _normalized_scope_preset_value not in _seen_scope_presets:
_combined_scope_presets.append(_normalized_scope_preset_value)
_seen_scope_presets.add(_normalized_scope_preset_value)
if not _combined_scope_presets:
return _connection_info
_oauth_section = _connection_info.get(const.CONNECTION_INFO.OAUTH, {})
_existing_scope = _oauth_section.get(const.CONNECTION_INFO.OAUTH_SCOPE)
_merged_oauth_scope = auth._get_scope_from_preset(_combined_scope_presets, _existing_scope)
_oauth_section[const.CONNECTION_INFO.OAUTH_SCOPE] = core_utils.normalize_oauth_scope(_merged_oauth_scope)
_connection_info[const.CONNECTION_INFO.OAUTH] = _oauth_section
return _connection_info
@staticmethod
def _merge_connection_variables(
_defined_info: Optional[Mapping[str, Mapping[str, Any]]] = None,
_supplemental_info: Optional[Mapping[str, Mapping[str, Any]]] = None,
) -> dict[str, dict[str, Any]]:
"""Merge explicit connection values with supplemental helper or environment values."""
_merged_connection_info: dict[str, dict[str, Any]] = {}
for _section, _key_list in const.CONNECTION_INFO.CONNECTION_FIELDS.items():
_defined_section = _defined_info.get(_section, {}) if _defined_info else {}
_supplemental_section = _supplemental_info.get(_section, {}) if _supplemental_info else {}
_merged_connection_info[_section] = {
_key: (
_defined_section[_key]
if _key in _defined_section and _defined_section[_key] is not None
else _supplemental_section.get(_key)
)
for _key in _key_list
}
return _merged_connection_info
def _populate_missing_connection_details(self, _partial_connection_info: dict) -> dict:
"""Add missing field values the connection info dictionary as needed."""
# Define variables for the dictionary keys/fields
_issuer_url_key = const.CONNECTION_INFO.OAUTH_ISSUER_URL
_oauth_key = const.CONNECTION_INFO.OAUTH
_grant_type_key = const.CONNECTION_INFO.OAUTH_GRANT_TYPE
_client_auth_key = const.CONNECTION_INFO.OAUTH_CLIENT_AUTHENTICATION
if self.oauth_api_type == const.AUTH_API_TYPE:
_issuer_base_url = self.auth_base_url if self.auth_base_url else self.admin_base_url
else:
_issuer_base_url = self.admin_base_url if self.admin_base_url else self.auth_base_url
if not _issuer_base_url:
_issuer_base_url = self.base_url
# Populate the Issuer URL value for OAuth connections if not defined
if (
_issuer_url_key not in _partial_connection_info[_oauth_key]
or not _partial_connection_info[_oauth_key][_issuer_url_key]
) and _issuer_base_url is not None:
_partial_connection_info[_oauth_key][_issuer_url_key] = const.URLS.OAUTH.format(base_url=_issuer_base_url)
# Populate the Grant Type value for OAuth connections if not defined
if (
_grant_type_key not in _partial_connection_info[_oauth_key]
or not _partial_connection_info[_oauth_key][_grant_type_key]
):
_partial_connection_info[_oauth_key][_grant_type_key] = const.CONNECTION_INFO.OAUTH_DEFAULT_GRANT_TYPE
# Populate the Client Authentication value for OAuth connections if not defined
if (
_client_auth_key not in _partial_connection_info[_oauth_key]
or not _partial_connection_info[_oauth_key][_client_auth_key]
):
_partial_connection_info[_oauth_key][_client_auth_key] = const.CONNECTION_INFO.OAUTH_DEFAULT_CLIENT_AUTH
# Return the updated connection info dictionary
return _partial_connection_info
def _ensure_oauth_headers(self, force_refresh: bool = False) -> dict[str, str]:
"""Ensure valid OAuth headers are available for Administration API calls."""
if self.connection_type != const.CONNECTION_INFO.OAUTH:
return self.base_headers
base_headers, self._oauth_token_data = auth.get_oauth_headers(
connection_info=self.connection_info,
verify_ssl=self.verify_ssl,
token_data=self._oauth_token_data,
force_refresh=force_refresh,
)
self.base_headers = base_headers
return base_headers
[docs]
def refresh_oauth_token(self) -> dict[str, str]:
"""Force refresh the OAuth access token and return updated base headers."""
return self._ensure_oauth_headers(force_refresh=True)
def _check_if_connected(self) -> None:
"""Check to see if the object is connected to the tenant and raises an exception if not."""
if not self.connected:
_error_msg = 'Must be connected to the tenant before performing an API call. Call the connect() method.'
logger.error(_error_msg)
raise errors.exceptions.APIConnectionError(_error_msg)
[docs]
def connect(self) -> Tuple[bool, dict[str, str]]:
"""Connect to the RSA ID Plus tenant using the Legacy API or OAuth method.
:returns: Boolean value indicating if connection was established and dictionary with base API headers
:raises: :py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.FeatureNotConfiguredError`
"""
base_headers = self.base_headers
connected = self.connected
if connected and self.connection_type != const.CLIENT_SETTINGS.CONNECTION_TYPE_OAUTH:
logger.debug('The client is already connected to the RSA ID Plus tenant')
return connected, base_headers
if self.connection_type == const.CLIENT_SETTINGS.CONNECTION_TYPE_LEGACY:
# Connect to the tenant using the legacy API method
try:
base_headers = auth.get_legacy_headers(base_url=self.base_url, connection_info=self.connection_info)
self._oauth_token_data = None
connected = True
except Exception as exc:
exc_type = type(exc).__name__
error_msg = f'Failed to connect using Legacy API due to the following {exc_type} exception: {exc}'
logger.error(error_msg)
raise errors.exceptions.APIConnectionError(error_msg)
elif self.connection_type == const.CLIENT_SETTINGS.CONNECTION_TYPE_OAUTH:
# Connect to the tenant using the OAuth method
try:
base_headers = self._ensure_oauth_headers(force_refresh=connected)
connected = True
except Exception as exc:
exc_type = type(exc).__name__
error_msg = f'Failed to connect using OAuth due to the following {exc_type} exception: {exc}'
logger.error(error_msg)
raise errors.exceptions.APIConnectionError(error_msg)
else:
error_msg = f"Unsupported connection_type '{self.connection_type}'"
logger.error(error_msg)
raise errors.exceptions.APIConnectionError(error_msg)
return connected, base_headers
[docs]
def get(
self,
endpoint: str,
params: Optional[dict] = None,
headers: Optional[dict] = None,
api_type: str = const.ADMIN_API_TYPE,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Perform a GET request against the ID Plus tenant.
:param endpoint: The API endpoint to query
:type endpoint: str
:param params: The query parameters (where applicable)
:type params: dict, None
:param headers: Specific API headers to use when performing the API call (beyond the base headers)
:type headers: dict, None
:param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged.
:type api_type: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`
"""
self._check_if_connected()
return api.get(
self,
endpoint=endpoint,
params=params,
headers=headers,
api_type=api_type,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def patch(
self,
endpoint: str,
payload: dict,
params: Optional[dict] = None,
headers: Optional[dict] = None,
api_type: str = const.ADMIN_API_TYPE,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Perform a PATCH call with payload against the ID Plus tenant.
:param endpoint: The API endpoint to query
:type endpoint: str
:param payload: The payload to leverage in the API call
:type payload: dict
:param params: The query parameters (where applicable)
:type params: dict, None
:param headers: Specific API headers to use when performing the API call (beyond the base headers)
:type headers: dict, None
:param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged.
:type api_type: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`
"""
self._check_if_connected()
return api.patch(
self,
endpoint=endpoint,
payload=payload,
params=params,
headers=headers,
api_type=api_type,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def post(
self,
endpoint: str,
payload: dict,
params: Optional[dict] = None,
headers: Optional[dict] = None,
api_type: str = const.ADMIN_API_TYPE,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Perform a POST call with payload against the ID Plus tenant.
:param endpoint: The API endpoint to query
:type endpoint: str
:param payload: The payload to leverage in the API call
:type payload: dict
:param params: The query parameters (where applicable)
:type params: dict, None
:param headers: Specific API headers to use when performing the API call (beyond the base headers)
:type headers: dict, None
:param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged.
:type api_type: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`
"""
self._check_if_connected()
return api.post(
self,
endpoint=endpoint,
payload=payload,
params=params,
headers=headers,
api_type=api_type,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def put(
self,
endpoint: str,
payload: dict,
params: Optional[dict] = None,
headers: Optional[dict] = None,
api_type: str = const.ADMIN_API_TYPE,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Perform a PUT call with payload against the ID Plus tenant.
:param endpoint: The API endpoint to query
:type endpoint: str
:param payload: The payload to leverage in the API call
:type payload: dict
:param params: The query parameters (where applicable)
:type params: dict, None
:param headers: Specific API headers to use when performing the API call (beyond the base headers)
:type headers: dict, None
:param api_type: Indicates if the ``admin`` (default) or ``auth`` API will be leveraged.
:type api_type: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`
"""
self._check_if_connected()
return api.put(
self,
endpoint=endpoint,
payload=payload,
params=params,
headers=headers,
api_type=api_type,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
class User:
"""Class containing user-related methods."""
def __init__(self, pydp_object) -> None:
"""Initialize the :py:class:`pydplus.core.PyDPlus.User` inner class object.
:param pydp_object: The core :py:class:`pydplus.PyDPlus` object
:type pydp_object: class[pydplus.PyDPlus]
:returns: None
"""
self.pydp_object: PyDPlus = pydp_object
[docs]
def get_user_details(
self,
email: str,
search_unsynced: Optional[bool] = None,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Retrieve the details for a specific user based on their email address.
:param email: The email address of the user for whom to retrieve details
:type email: str
:param search_unsynced: Indicates if the user search should include unsynchronized users (optional)
:type search_unsynced: bool, None
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The user details in JSON format or the API response as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`
"""
self.pydp_object._check_if_connected()
return users_module.get_user_details(
self.pydp_object,
email=email,
search_unsynced=search_unsynced,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def get_user_id(
self,
email: Optional[str] = None,
user_details: Optional[dict] = None,
search_unsynced: Optional[bool] = None,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
) -> str:
"""Retrieve the User ID associated with a specific user.
:param email: The email address of the user for whom to retrieve details
:type email: str, None
:param user_details: The user details data from the :py:func:`pydplus.users.get_user_details` function
:type user_details: dict, None
:param search_unsynced: Indicates if the user search should include unsynchronized users (optional)
:type search_unsynced: bool, None
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int, str, None
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:returns: The User ID for the given user as a string (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
or an empty string if the User ID could not be retrieved successfully
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.get_user_id(
self.pydp_object,
email=email,
user_details=user_details,
search_unsynced=search_unsynced,
timeout=timeout,
show_full_error=show_full_error,
)
[docs]
def enable_user(
self,
user_id: str,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Enable a user that is currently disabled.
:param user_id: The ID of an existing user (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
:type user_id: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.enable_user(
self.pydp_object,
user_id=user_id,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def disable_user(
self,
user_id: str,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Disable a user that is currently enabled.
:param user_id: The ID of an existing user (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
:type user_id: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.disable_user(
self.pydp_object,
user_id=user_id,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def synchronize_user(
self,
user_id: str,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Synchronize the details of a user between an identity source and the Cloud Access Service.
:param user_id: The ID of an existing user (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
:type user_id: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.synchronize_user(
self.pydp_object,
user_id=user_id,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def mark_deleted(
self,
user_id: str,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Mark a specific user to be deleted during the next automated bulk deletion process.
:param user_id: The ID of an existing user (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
:type user_id: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.mark_deleted(
self.pydp_object,
user_id=user_id,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def unmark_deleted(
self,
user_id: str,
timeout: int = const.DEFAULT_API_TIMEOUT_SECONDS,
show_full_error: bool = True,
return_json: bool = True,
allow_failed_response: Optional[bool] = None,
):
"""Unmark a specific user that was flagged to be deleted.
:param user_id: The ID of an existing user (e.g. ``54082ac6-4713-6368-2251-df813c41159f``)
:type user_id: str
:param timeout: The timeout period in seconds (defaults to ``30``)
:type timeout: int, str, None
:param show_full_error: Determines if the full error message should be displayed (defaults to ``True``)
:type show_full_error: bool
:param return_json: Determines if the response should be returned in JSON format (defaults to ``True``)
:type return_json: bool
:param allow_failed_response: Indicates that failed responses should return and should not raise an exception
(If not explicitly defined then ``True`` if Strict Mode is disabled)
:type allow_failed_response: bool, None
:returns: The API response in JSON format or as a ``requests`` object
:raises: :py:exc:`TypeError`,
:py:exc:`errors.exceptions.APIConnectionError`,
:py:exc:`errors.exceptions.APIMethodError`,
:py:exc:`errors.exceptions.APIRequestError`,
:py:exc:`errors.exceptions.APIResponseConversionError`,
:py:exc:`errors.exceptions.InvalidFieldError`,
:py:exc:`errors.exceptions.MissingRequiredDataError`
"""
self.pydp_object._check_if_connected()
return users_module.unmark_deleted(
self.pydp_object,
user_id=user_id,
timeout=timeout,
show_full_error=show_full_error,
return_json=return_json,
allow_failed_response=allow_failed_response,
)
[docs]
def compile_connection_info(
base_url: Optional[str] = None,
admin_base_url: Optional[str] = None,
private_key: Optional[str] = None,
legacy_access_id: Optional[str] = None,
oauth_client_id: Optional[str] = None,
oauth_private_key: Optional[str] = None,
oauth_private_key_jwk: Union[Optional[dict], Optional[str]] = None,
oauth_scope: Union[Optional[str], Optional[tuple], Optional[list], Optional[set], Optional[frozenset]] = None,
oauth_scope_preset: Union[Optional[str], Optional[tuple[str]], Optional[list[str]], Optional[set[str]]] = None,
auth_base_url: Optional[str] = None,
oauth_api_type: Optional[str] = None,
oauth_issuer_url: Optional[str] = None,
) -> dict:
"""Compile the connection_info dictionary to use when authenticating to the API.
:param base_url: The base URL to leverage when performing API calls (deprecated and kept for backwards compatibility)
:type base_url: str, None
:param admin_base_url: The base URL for the Administration API
:type admin_base_url: str, None
:param private_key: The file path to the private key used for legacy API authentication
:type private_key: str, None
:param legacy_access_id: The Access ID associated with the Legacy API connection
:type legacy_access_id: str, None
:param oauth_client_id: The Client ID associated with the OAuth API connection
:type oauth_client_id: str, None
:param oauth_private_key: The file path to the OAuth private-key JWK used for Private Key JWT authentication
:type oauth_private_key: str, None
:param oauth_private_key_jwk: The OAuth private-key JWK payload used for Private Key JWT authentication
:type oauth_private_key_jwk: dict, str, None
:param oauth_scope: One or more OAuth scopes to request in token requests
(``+``-delimited string or iterable of scope strings)
:type oauth_scope: str, tuple, list, set, frozenset, None
:param oauth_scope_preset: One or more presets representing groupings of OAuth scopes and permissions
:type oauth_scope_preset: str, tuple, list, set, None
:param auth_base_url: The base URL for the Authentication API
:type auth_base_url: str, None
:param oauth_api_type: The API type to use when inferring OAuth issuer URL values (``auth`` by default)
:type oauth_api_type: str, None
:param oauth_issuer_url: The explicit OAuth issuer URL to use for token requests
:type oauth_issuer_url: str, None
:returns: The compiled connection_info dictionary
:raises: :py:exc:`TypeError`,
:py:exc:`ValueError`
"""
# Define the two private key variables if defined
if private_key and isinstance(private_key, str):
private_key_path, private_key_file = core_utils.split_file_path(private_key)
else:
private_key_path, private_key_file = None, None
# Define OAuth private-key file path variables if defined
if oauth_private_key and isinstance(oauth_private_key, str):
oauth_private_key_path, oauth_private_key_file = core_utils.split_file_path(oauth_private_key)
else:
oauth_private_key_path, oauth_private_key_file = None, None
# Validate the OAuth private key JWK payload when provided
if oauth_private_key_jwk is not None and not isinstance(oauth_private_key_jwk, (dict, str)):
_error_msg = (
"The 'oauth_private_key_jwk' parameter must be supplied as a dictionary or string "
f'(provided: {type(oauth_private_key_jwk)})'
)
logger.error(_error_msg)
raise TypeError(_error_msg)
# Define and normalize the OAuth scope
# TODO: Check the helper settings and environment variables for scope presets
oauth_scope = auth._get_scope_from_preset(oauth_scope_preset, oauth_scope)
oauth_scope = core_utils.normalize_oauth_scope(oauth_scope)
# Prepare the admin_base_url value in order to construct the issuer_url value
if base_url and admin_base_url:
if base_url == admin_base_url:
logger.debug("The 'base_url' argument is not needed if 'admin_base_url' is defined")
else:
logger.warning("The 'base_url' and 'admin_base_url' values do not match and the latter will be used")
admin_base_url = core_utils.get_base_url(admin_base_url)
elif base_url and not admin_base_url:
admin_base_url = core_utils.get_base_url(base_url)
elif admin_base_url:
admin_base_url = core_utils.get_base_url(admin_base_url)
# Normalize the auth_base_url value for OAuth issuer inference if defined
if auth_base_url:
auth_base_url = core_utils.get_base_url(auth_base_url)
# Normalize and validate oauth_api_type when provided
if oauth_api_type is None:
oauth_api_type = const.AUTH_API_TYPE
if not isinstance(oauth_api_type, str):
_error_msg = f"The 'oauth_api_type' parameter must be a string (provided: {type(oauth_api_type)})"
logger.error(_error_msg)
raise TypeError(_error_msg)
oauth_api_type = oauth_api_type.strip().lower()
if oauth_api_type not in const.VALID_API_TYPES:
_valid_values = ','.join(sorted(const.VALID_API_TYPES))
_error_msg = f"The 'oauth_api_type' value '{oauth_api_type}' is invalid (Expected one of: {_valid_values})"
logger.error(_error_msg)
raise ValueError(_error_msg)
# Validate and normalize an explicitly provided OAuth issuer URL if defined
if oauth_issuer_url is not None and not isinstance(oauth_issuer_url, str):
_error_msg = f"The 'oauth_issuer_url' parameter must be a string (provided: {type(oauth_issuer_url)})"
logger.error(_error_msg)
raise TypeError(_error_msg)
oauth_issuer_url = oauth_issuer_url.strip() if isinstance(oauth_issuer_url, str) else None
if oauth_issuer_url:
_parsed_issuer_url = urllib.parse.urlparse(oauth_issuer_url)
if not _parsed_issuer_url.netloc or not _parsed_issuer_url.scheme:
_error_msg = f"The provided OAuth issuer URL '{oauth_issuer_url}' is invalid"
logger.error(_error_msg)
raise ValueError(_error_msg)
issuer_url = core_utils.remove_ending_slash(oauth_issuer_url)
else:
# Infer an auth base URL from the admin base URL when the auth URL is not explicitly defined
if not auth_base_url and admin_base_url:
auth_base_url = _infer_auth_base_url_from_admin_base_url(admin_base_url)
# Define the issuer URL and compile the connection info
if oauth_api_type == const.AUTH_API_TYPE:
issuer_base_url = auth_base_url if auth_base_url else admin_base_url
else:
issuer_base_url = admin_base_url if admin_base_url else auth_base_url
issuer_url = const.URLS.OAUTH.format(base_url=issuer_base_url) if issuer_base_url else None
connection_info = {
const.CONNECTION_INFO.LEGACY: {
const.CONNECTION_INFO.LEGACY_ACCESS_ID: legacy_access_id,
const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_PATH: private_key_path,
const.CONNECTION_INFO.LEGACY_PRIVATE_KEY_FILE: private_key_file,
},
const.CONNECTION_INFO.OAUTH: {
const.CONNECTION_INFO.OAUTH_ISSUER_URL: issuer_url,
const.CONNECTION_INFO.OAUTH_CLIENT_ID: oauth_client_id,
const.CONNECTION_INFO.OAUTH_SCOPE: oauth_scope,
const.CONNECTION_INFO.OAUTH_GRANT_TYPE: const.CONNECTION_INFO.OAUTH_DEFAULT_GRANT_TYPE,
const.CONNECTION_INFO.OAUTH_CLIENT_AUTHENTICATION: const.CONNECTION_INFO.OAUTH_DEFAULT_CLIENT_AUTH,
const.CONNECTION_INFO.OAUTH_PRIVATE_KEY_PATH: oauth_private_key_path,
const.CONNECTION_INFO.OAUTH_PRIVATE_KEY_FILE: oauth_private_key_file,
const.CONNECTION_INFO.OAUTH_PRIVATE_KEY_JWK: oauth_private_key_jwk,
},
}
return connection_info
def _infer_auth_base_url_from_admin_base_url(_admin_base_url: Optional[str]) -> Optional[str]:
"""Infer an Authentication API base URL from a matching Administration API base URL."""
if not isinstance(_admin_base_url, str) or not _admin_base_url:
return None
try:
_normalized_admin_base_url = core_utils.get_base_url(_admin_base_url)
except Exception as _exc:
_exc_type = core_utils.get_exception_type(_exc)
_error_msg = f'Failed to infer Authentication API base URL from the Administration due to {_exc_type} exception: {_exc}'
logger.error(_error_msg)
return None
if '.access.' not in _normalized_admin_base_url:
return None
return _normalized_admin_base_url.replace('.access.', '.auth.', 1)