Skip to content

VaultBackend API

VaultBackend(vault_url: Optional[str] = None, vault_token: Optional[str] = None, vault_role_id: Optional[str] = None, vault_secret_id: Optional[str] = None, service_name: str = 'default_service', mount_point: Optional[str] = None, proxies: Optional[Dict[str, str]] = None, cert: Optional[str] = None, log_level: Union[LogLevel, str] = LogLevel.WARNING, logger: Optional[PyLogShield] = None, mask: bool = True, persist: bool = False)

Bases: BaseSecretBackend

HashiCorp Vault KV-v2 secret backend.

METHOD DESCRIPTION
add_secret

Add or update a secret in Vault (creates a new KV-v2 version).

get_secret

Retrieve a secret by name.

update_secret

Update an existing secret.

delete_secret

Permanently delete a secret and all its versions.

list_secrets

List secret keys under path.

get_config

Return the KV engine configuration for the current mount point.

read_secret_metadata

Return metadata and version info for name.

delete_secret_versions

Soft-delete specific versions of name.

undelete_secret_versions

Restore soft-deleted versions of name.

destroy_secret_versions

Permanently destroy specific versions of name.

Source code in src/credential_bridge/backends/vault.py
Python
def __init__(
    self,
    vault_url: Optional[str] = None,
    vault_token: Optional[str] = None,
    vault_role_id: Optional[str] = None,
    vault_secret_id: Optional[str] = None,
    service_name: str = "default_service",
    mount_point: Optional[str] = None,
    proxies: Optional[Dict[str, str]] = None,
    cert: Optional[str] = None,
    log_level: Union[LogLevel, str] = LogLevel.WARNING,
    logger: Optional[PyLogShield] = None,
    mask: bool = True,
    persist: bool = False,  # opt-in credential persistence to ~/.vault_config.json
) -> None:
    # --- Fail fast: resolve vault address before any other setup ---
    config = load_config()
    if vault_url:
        self.vault_addr = vault_url
    else:
        self.vault_addr = os.environ.get("VAULT_ADDR") or config.get("vault_addr")  # type: ignore[assignment]

    if not self.vault_addr:
        raise ConfigurationError(
            "Vault address must be provided via the vault_url argument, "
            "the VAULT_ADDR environment variable, or ~/.vault_config.json"
        )

    self.mask = mask
    self.service_name = service_name
    self.mount_point = mount_point if mount_point is not None else _safe_getuser()
    self.cert = cert  # None means use system CA bundle; path string means custom cert
    self.proxies = proxies

    if logger and not isinstance(logger, PyLogShield):
        raise ConfigurationError("logger must be a PyLogShield instance. Use: from pylogshield import PyLogShield")
    self.logger = logger or get_logger(name="credential_bridge", log_level=log_level, force=True)

    self.session = get_session(cert, proxies)

    # --- Resolve credentials (args override config) ---
    self.vault_token = vault_token or config.get("vault_token")
    self.vault_role_id = vault_role_id or config.get("vault_role_id")
    self.vault_secret_id = vault_secret_id or config.get("vault_secret_id")

    # Token and AppRole are mutually exclusive
    if self.vault_token and (self.vault_role_id or self.vault_secret_id):
        raise ConfigurationError("Provide either a Vault token or AppRole credentials, not both.")

    # At least one auth method must be present
    if not self.vault_token and not (self.vault_role_id and self.vault_secret_id):
        raise ConfigurationError(
            "No authentication method provided. Please provide either a token or AppRole credentials."
        )

    # --- Persist credentials to config (opt-in) ---
    if persist:
        if vault_token:
            config["vault_token"] = vault_token
            config["vault_role_id"] = None
            config["vault_secret_id"] = None
        elif vault_role_id and vault_secret_id:
            config["vault_role_id"] = vault_role_id
            config["vault_secret_id"] = vault_secret_id
            config["vault_token"] = None

        if vault_url:
            config["vault_addr"] = vault_url

        save_config(config)

    self.client = self._authenticate()

add_secret(name: str, secret: Dict[str, Any]) -> None

Add or update a secret in Vault (creates a new KV-v2 version).

Source code in src/credential_bridge/backends/vault.py
Python
def add_secret(self, name: str, secret: Dict[str, Any]) -> None:
    """Add or update a secret in Vault (creates a new KV-v2 version)."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to add secret '{name}'", name=name):
        self.client.secrets.kv.v2.create_or_update_secret(
            path=name,
            secret=secret,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Secret added: {name}")

get_secret(name: str) -> Dict[str, Any]

Retrieve a secret by name.

Source code in src/credential_bridge/backends/vault.py
Python
def get_secret(self, name: str) -> Dict[str, Any]:
    """Retrieve a secret by *name*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to get secret '{name}'", name=name):
        response = self.client.secrets.kv.v2.read_secret(
            path=name,
            mount_point=self.mount_point,
        )
    return response["data"]["data"]  # type: ignore[no-any-return]

update_secret(name: str, secret: Dict[str, Any]) -> None

Update an existing secret.

Source code in src/credential_bridge/backends/vault.py
Python
def update_secret(self, name: str, secret: Dict[str, Any]) -> None:
    """Update an existing secret."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to update secret '{name}'", name=name):
        self.client.secrets.kv.v2.patch(
            path=name,
            secret=secret,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Secret updated: {name}")

delete_secret(name: str) -> None

Permanently delete a secret and all its versions.

Source code in src/credential_bridge/backends/vault.py
Python
def delete_secret(self, name: str) -> None:
    """Permanently delete a secret and all its versions."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to delete secret '{name}'", name=name):
        self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=name,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Secret deleted: {name}")

list_secrets(path: str = '') -> List[str]

List secret keys under path.

Source code in src/credential_bridge/backends/vault.py
Python
def list_secrets(self, path: str = "") -> List[str]:
    """List secret keys under *path*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to list secrets at '{path}'"):
        response = self.client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point,
        )
    return response["data"]["keys"]  # type: ignore[no-any-return]

get_config() -> Optional[Dict[str, Any]]

Return the KV engine configuration for the current mount point.

Source code in src/credential_bridge/backends/vault.py
Python
def get_config(self) -> Optional[Dict[str, Any]]:
    """Return the KV engine configuration for the current mount point."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to read config for mount '{self.mount_point}'"):
        return self.client.secrets.kv.v2.read_configuration(mount_point=self.mount_point)  # type: ignore[no-any-return]

read_secret_metadata(name: str) -> Optional[Dict[str, Any]]

Return metadata and version info for name.

Source code in src/credential_bridge/backends/vault.py
Python
def read_secret_metadata(self, name: str) -> Optional[Dict[str, Any]]:
    """Return metadata and version info for *name*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to read metadata for '{name}'", name=name):
        return self.client.secrets.kv.v2.read_secret_metadata(  # type: ignore[no-any-return]
            path=name,
            mount_point=self.mount_point,
        )

delete_secret_versions(name: str, versions: List[int]) -> None

Soft-delete specific versions of name.

Source code in src/credential_bridge/backends/vault.py
Python
def delete_secret_versions(self, name: str, versions: List[int]) -> None:
    """Soft-delete specific versions of *name*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to delete versions {versions} of '{name}'"):
        self.client.secrets.kv.v2.delete_secret_versions(
            path=name,
            versions=versions,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Soft-deleted versions {versions} of '{name}'.")

undelete_secret_versions(name: str, versions: List[int]) -> None

Restore soft-deleted versions of name.

Source code in src/credential_bridge/backends/vault.py
Python
def undelete_secret_versions(self, name: str, versions: List[int]) -> None:
    """Restore soft-deleted versions of *name*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to undelete versions {versions} of '{name}'"):
        self.client.secrets.kv.v2.undelete_secret_versions(
            path=name,
            versions=versions,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Undeleted versions {versions} of '{name}'.")

destroy_secret_versions(name: str, versions: List[int]) -> None

Permanently destroy specific versions of name.

Source code in src/credential_bridge/backends/vault.py
Python
def destroy_secret_versions(self, name: str, versions: List[int]) -> None:
    """Permanently destroy specific versions of *name*."""
    self._refresh_token_if_needed()
    with self._vault_call(f"Failed to destroy versions {versions} of '{name}'"):
        self.client.secrets.kv.v2.destroy_secret_versions(
            path=name,
            versions=versions,
            mount_point=self.mount_point,
        )
    self.logger.info(f"Destroyed versions {versions} of '{name}'.")