Skip to content

API Reference

control_policy

add_api_key_header

AddApiKeyHeaderPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/add_api_key_header.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class AddApiKeyHeaderPolicy(ControlPolicy):
    """Adds the configured OpenAI API key to the request Authorization header."""

    def __init__(self, name: Optional[str] = None):
        """Initializes the policy.

        Args:
            name: Optional name for this policy instance. Falls back to the class name
                when omitted or empty.
        """
        import os  # local import: only needed to normalise the log-file path below

        self.name = name or self.__class__.__name__
        self.logger = logging.getLogger(__name__)
        # Log everything INFO and above to "luthien_control.log" in the current working
        # directory (the path is relative, not /tmp). The logger is module-level and shared
        # by every instance, so attach the file handler only once: attaching one per
        # instance duplicates every log line and leaks an open file handle per policy.
        log_path = os.path.abspath("luthien_control.log")
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
            for handler in self.logger.handlers
        )
        if not already_attached:
            self.logger.addHandler(logging.FileHandler(log_path))
        self.logger.setLevel(logging.INFO)

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,
    ) -> TransactionContext:
        """
        Adds the Authorization: Bearer <api_key> header to the context.request.

        Reads the OpenAI API key from settings via the container. Any existing
        Authorization header on the request is overwritten.
        Requires the DependencyContainer and AsyncSession in signature for interface compliance,
        but session is not directly used in this policy's logic.

        Raises:
            NoRequestError if the request is not found in the context.
            ApiKeyNotFoundError if the OpenAI API key is not configured. Before raising,
                context.response is set to a 500 JSON response describing the error.

        Args:
            context: The current transaction context.
            container: The application dependency container.
            session: An active SQLAlchemy AsyncSession (unused).

        Returns:
            The transaction context with the Authorization header set on context.request.
        """
        if context.request is None:
            raise NoRequestError(f"[{context.transaction_id}] No request in context.")
        settings = container.settings
        api_key = settings.get_openai_api_key()
        if not api_key:
            context.response = httpx.Response(
                status_code=500,
                json={"detail": "Server configuration error: OpenAI API key not configured"},
            )
            raise ApiKeyNotFoundError(f"[{context.transaction_id}] OpenAI API key not configured ({self.name}).")
        self.logger.info(f"[{context.transaction_id}] Adding Authorization header for OpenAI key ({self.name}).")
        context.request.headers["Authorization"] = f"Bearer {api_key}"
        # Never log the header value: it contains the secret API key, and this logger
        # writes to a file on disk.
        self.logger.info(f"[{context.transaction_id}] Authorization header added ({self.name}).")
        return context

    def serialize(self) -> SerializableDict:
        """Serializes config. Returns base info as no instance-specific config needed."""
        return cast(SerializableDict, {"name": self.name})

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "AddApiKeyHeaderPolicy":
        """
        Constructs the policy from serialized configuration.

        Args:
            config: Dictionary possibly containing 'name'.

        Returns:
            An instance of AddApiKeyHeaderPolicy.
        """
        instance_name = cast(Optional[str], config.get("name"))
        return cls(name=instance_name)

Adds the configured OpenAI API key to the request Authorization header.

__init__(name=None)
Source code in luthien_control/control_policy/add_api_key_header.py
20
21
22
23
24
25
26
def __init__(self, name: Optional[str] = None):
    """Initializes the policy.

    Args:
        name: Optional name for this policy instance. Falls back to the class name
            when omitted or empty.
    """
    import os  # local import: only needed to normalise the log-file path below

    self.name = name or self.__class__.__name__
    self.logger = logging.getLogger(__name__)
    # Log everything INFO and above to "luthien_control.log" in the current working
    # directory (the path is relative, not /tmp). The logger is module-level and shared
    # by every instance, so attach the file handler only once: attaching one per
    # instance duplicates every log line and leaks an open file handle per policy.
    log_path = os.path.abspath("luthien_control.log")
    already_attached = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
        for handler in self.logger.handlers
    )
    if not already_attached:
        self.logger.addHandler(logging.FileHandler(log_path))
    self.logger.setLevel(logging.INFO)

Initializes the policy.

apply(context, container, session) async
Source code in luthien_control/control_policy/add_api_key_header.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,
) -> TransactionContext:
    """
    Adds the Authorization: Bearer <api_key> header to the context.request.

    Reads the OpenAI API key from settings via the container. Any existing
    Authorization header on the request is overwritten.
    Requires the DependencyContainer and AsyncSession in signature for interface compliance,
    but session is not directly used in this policy's logic.

    Raises:
        NoRequestError if the request is not found in the context.
        ApiKeyNotFoundError if the OpenAI API key is not configured. Before raising,
            context.response is set to a 500 JSON response describing the error.

    Args:
        context: The current transaction context.
        container: The application dependency container.
        session: An active SQLAlchemy AsyncSession (unused).

    Returns:
        The transaction context with the Authorization header set on context.request.
    """
    if context.request is None:
        raise NoRequestError(f"[{context.transaction_id}] No request in context.")
    settings = container.settings
    api_key = settings.get_openai_api_key()
    if not api_key:
        context.response = httpx.Response(
            status_code=500,
            json={"detail": "Server configuration error: OpenAI API key not configured"},
        )
        raise ApiKeyNotFoundError(f"[{context.transaction_id}] OpenAI API key not configured ({self.name}).")
    self.logger.info(f"[{context.transaction_id}] Adding Authorization header for OpenAI key ({self.name}).")
    context.request.headers["Authorization"] = f"Bearer {api_key}"
    # Never log the header value: it contains the secret API key, and this logger
    # writes to a file on disk.
    self.logger.info(f"[{context.transaction_id}] Authorization header added ({self.name}).")
    return context

Adds the `Authorization: Bearer <api_key>` header to `context.request`.

Reads OpenAI API key from settings via the container. Requires the DependencyContainer and AsyncSession in signature for interface compliance, but session is not directly used in this policy's logic.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container.

required
session AsyncSession

An active SQLAlchemy AsyncSession (unused).

required

Returns:

Type Description
TransactionContext

The potentially modified transaction context.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/add_api_key_header.py
74
75
76
77
78
79
80
81
82
83
84
85
86
@classmethod
def from_serialized(cls, config: SerializableDict) -> "AddApiKeyHeaderPolicy":
    """
    Build an AddApiKeyHeaderPolicy from its serialized form.

    Args:
        config: Serialized configuration. An optional 'name' entry is passed to the
            constructor as the instance name.

    Returns:
        A new AddApiKeyHeaderPolicy instance.
    """
    # The constructor handles a missing/empty name by falling back to the class name.
    return cls(name=cast(Optional[str], config.get("name")))

Constructs the policy from serialized configuration.

Parameters:

Name Type Description Default
config SerializableDict

Dictionary possibly containing 'name'.

required

Returns:

Type Description
AddApiKeyHeaderPolicy

An instance of AddApiKeyHeaderPolicy.

serialize()
Source code in luthien_control/control_policy/add_api_key_header.py
70
71
72
def serialize(self) -> SerializableDict:
    """Return the serialized config; only the instance name is needed to rebuild this policy."""
    config = {"name": self.name}
    return cast(SerializableDict, config)

Serializes config. Returns base info as no instance-specific config needed.

add_api_key_header_from_env

Add an API key header, where the key is sourced from a configured environment variable.

This policy is used to add an API key to the request Authorization header. The API key is read from an environment variable whose name is configured when the policy is instantiated.

AddApiKeyHeaderFromEnvPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/add_api_key_header_from_env.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class AddApiKeyHeaderFromEnvPolicy(ControlPolicy):
    """Adds an API key to the request Authorization header.

    The API key is read from an environment variable whose name is configured
    when the policy is instantiated. The variable is looked up on every apply()
    call rather than cached at construction time.
    """

    def __init__(self, api_key_env_var_name: str, name: Optional[str] = None):
        """Initializes the policy.

        Args:
            api_key_env_var_name: The name of the environment variable that holds the API key.
            name: Optional name for this policy instance. Falls back to the class name
                when omitted or empty.

        Raises:
            ValueError: If api_key_env_var_name is empty.
        """
        if not api_key_env_var_name:
            raise ValueError("api_key_env_var_name cannot be empty.")

        self.name = name or self.__class__.__name__
        self.api_key_env_var_name = api_key_env_var_name
        self.logger = logging.getLogger(__name__)

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,
    ) -> TransactionContext:
        """
        Adds the Authorization: Bearer <api_key> header to the context.request.

        The API key is read from the environment variable specified by self.api_key_env_var_name.
        Any existing Authorization header on the request is overwritten.
        Requires DependencyContainer and AsyncSession for interface compliance, but they are not
        directly used in this policy's primary logic beyond what ControlPolicy might require.

        Raises:
            NoRequestError if the request is not found in the context.
            ApiKeyNotFoundError if the configured environment variable is not set or is empty.
                Before raising, context.response is set to a 500 JSON response.

        Args:
            context: The current transaction context.
            container: The application dependency container (unused).
            session: An active SQLAlchemy AsyncSession (unused).

        Returns:
            The transaction context with the Authorization header set on context.request.
        """
        if context.request is None:
            raise NoRequestError(f"[{context.transaction_id}] No request in context.")

        api_key = os.environ.get(self.api_key_env_var_name)

        if not api_key:
            error_message = (
                f"API key not found. Environment variable '{self.api_key_env_var_name}' is not set or is empty."
            )
            self.logger.error(f"[{context.transaction_id}] {error_message} ({self.name})")
            context.response = httpx.Response(
                status_code=500,
                json={"detail": f"Server configuration error: {error_message}"},
            )
            raise ApiKeyNotFoundError(f"[{context.transaction_id}] {error_message} ({self.name})")

        self.logger.info(
            f"[{context.transaction_id}] Adding Authorization header from env var "
            f"'{self.api_key_env_var_name}' ({self.name})."
        )
        context.request.headers["Authorization"] = f"Bearer {api_key}"
        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy's configuration."""
        return cast(
            SerializableDict,
            {
                "name": self.name,
                "api_key_env_var_name": self.api_key_env_var_name,
            },
        )

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "AddApiKeyHeaderFromEnvPolicy":
        """
        Constructs the policy from serialized configuration.

        Args:
            config: Dictionary expecting 'api_key_env_var_name' and optionally 'name'.
                A missing or None 'name' yields the default name (the class name).

        Returns:
            An instance of AddApiKeyHeaderFromEnvPolicy.

        Raises:
            KeyError if 'api_key_env_var_name' is not in config.
            TypeError if 'api_key_env_var_name' or 'name' is present but not a string.
            ValueError if 'api_key_env_var_name' is an empty string (raised by __init__).
        """
        api_key_env_var_name = config.get("api_key_env_var_name")
        if api_key_env_var_name is None:
            raise KeyError("Configuration for AddApiKeyHeaderFromEnvPolicy is missing 'api_key_env_var_name'.")
        if not isinstance(api_key_env_var_name, str):
            raise TypeError(f"API key environment variable name '{api_key_env_var_name}' is not a string.")

        # Do not str() the name: str(None) would produce the literal name "None" instead of
        # letting __init__ fall back to the class name.
        instance_name = config.get("name")
        if instance_name is not None and not isinstance(instance_name, str):
            raise TypeError(f"Policy name '{instance_name}' is not a string.")

        return cls(
            api_key_env_var_name=api_key_env_var_name,
            name=instance_name,
        )

Adds an API key to the request Authorization header. The API key is read from an environment variable whose name is configured when the policy is instantiated.

__init__(api_key_env_var_name, name=None)
Source code in luthien_control/control_policy/add_api_key_header_from_env.py
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(self, api_key_env_var_name: str, name: Optional[str] = None):
    """Set up the policy.

    Args:
        api_key_env_var_name: Name of the environment variable the API key is read from.
            Must be non-empty.
        name: Optional instance name; the class name is used when omitted or empty.

    Raises:
        ValueError: If api_key_env_var_name is empty.
    """
    # Validate before assigning anything so a bad call leaves no partial state.
    if not api_key_env_var_name:
        raise ValueError("api_key_env_var_name cannot be empty.")

    self.logger = logging.getLogger(__name__)
    self.api_key_env_var_name = api_key_env_var_name
    self.name = name if name else self.__class__.__name__

Initializes the policy.

Parameters:

Name Type Description Default
api_key_env_var_name str

The name of the environment variable that holds the API key.

required
name Optional[str]

Optional name for this policy instance.

None
apply(context, container, session) async
Source code in luthien_control/control_policy/add_api_key_header_from_env.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,
) -> TransactionContext:
    """
    Set ``Authorization: Bearer <api_key>`` on context.request using a key from the environment.

    The key is looked up in os.environ under self.api_key_env_var_name each time this
    runs. container and session are accepted only to satisfy the ControlPolicy interface.

    Raises:
        NoRequestError if context.request is None.
        ApiKeyNotFoundError if the environment variable is unset or empty; context.response
            is set to a 500 JSON response first.

    Args:
        context: The current transaction context.
        container: The application dependency container (unused).
        session: An active SQLAlchemy AsyncSession (unused).

    Returns:
        The same context, with the Authorization header written on its request.
    """
    request = context.request
    if request is None:
        raise NoRequestError(f"[{context.transaction_id}] No request in context.")

    env_var = self.api_key_env_var_name
    api_key = os.environ.get(env_var)

    if api_key:
        self.logger.info(
            f"[{context.transaction_id}] Adding Authorization header from env var '{env_var}' ({self.name})."
        )
        request.headers["Authorization"] = f"Bearer {api_key}"
        return context

    # Missing/empty key: record a 500 response for the caller, then fail loudly.
    error_message = f"API key not found. Environment variable '{env_var}' is not set or is empty."
    tagged_message = f"[{context.transaction_id}] {error_message} ({self.name})"
    self.logger.error(tagged_message)
    context.response = httpx.Response(
        status_code=500,
        json={"detail": f"Server configuration error: {error_message}"},
    )
    raise ApiKeyNotFoundError(tagged_message)

Adds the `Authorization: Bearer <api_key>` header to `context.request`.

The API key is read from the environment variable specified by self.api_key_env_var_name. Requires DependencyContainer and AsyncSession for interface compliance, but they are not directly used in this policy's primary logic beyond what ControlPolicy might require.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container (unused).

required
session AsyncSession

An active SQLAlchemy AsyncSession (unused).

required

Returns:

Type Description
TransactionContext

The potentially modified transaction context.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/add_api_key_header_from_env.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@classmethod
def from_serialized(cls, config: SerializableDict) -> "AddApiKeyHeaderFromEnvPolicy":
    """
    Constructs the policy from serialized configuration.

    Args:
        config: Dictionary expecting 'api_key_env_var_name' and optionally 'name'.
            A missing or None 'name' yields the default name (the class name).

    Returns:
        An instance of AddApiKeyHeaderFromEnvPolicy.

    Raises:
        KeyError if 'api_key_env_var_name' is not in config.
        TypeError if 'api_key_env_var_name' or 'name' is present but not a string.
        ValueError if 'api_key_env_var_name' is an empty string (raised by __init__).
    """
    api_key_env_var_name = config.get("api_key_env_var_name")
    if api_key_env_var_name is None:
        raise KeyError("Configuration for AddApiKeyHeaderFromEnvPolicy is missing 'api_key_env_var_name'.")
    if not isinstance(api_key_env_var_name, str):
        raise TypeError(f"API key environment variable name '{api_key_env_var_name}' is not a string.")

    # Do not str() the name: str(None) would produce the literal name "None" instead of
    # letting __init__ fall back to the class name.
    instance_name = config.get("name")
    if instance_name is not None and not isinstance(instance_name, str):
        raise TypeError(f"Policy name '{instance_name}' is not a string.")

    return cls(
        api_key_env_var_name=api_key_env_var_name,
        name=instance_name,
    )

Constructs the policy from serialized configuration.

Parameters:

Name Type Description Default
config SerializableDict

Dictionary expecting 'api_key_env_var_name' and optionally 'name'.

required

Returns:

Type Description
AddApiKeyHeaderFromEnvPolicy

An instance of AddApiKeyHeaderFromEnvPolicy.

serialize()
Source code in luthien_control/control_policy/add_api_key_header_from_env.py
 95
 96
 97
 98
 99
100
101
102
103
def serialize(self) -> SerializableDict:
    """Return the configuration needed to rebuild this policy via ``from_serialized``."""
    payload = dict(name=self.name, api_key_env_var_name=self.api_key_env_var_name)
    return cast(SerializableDict, payload)

Serializes the policy's configuration.

branching_policy

BranchingPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/branching_policy.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class BranchingPolicy(ControlPolicy):
    """Routes a transaction to the first sub-policy whose condition matches.

    Conditions are evaluated in the iteration order of ``cond_to_policy_map``; the
    policy paired with the first condition whose ``evaluate`` result is truthy is
    applied and the remaining conditions are not evaluated. If nothing matches,
    ``default_policy`` is applied when set; otherwise the context is returned unchanged.
    """

    def __init__(
        self,
        cond_to_policy_map: OrderedDict[Condition, ControlPolicy],
        default_policy: Optional[ControlPolicy] = None,
        name: Optional[str] = None,
    ):
        """Initializes the policy.

        Args:
            cond_to_policy_map: Ordered mapping of condition -> policy to apply when that
                condition matches. Iteration order defines priority.
            default_policy: Policy applied when no condition matches; None means the
                context is returned unchanged.
            name: Optional name for this policy instance. Left as None when omitted, in
                which case serialize() omits the 'name' key.
        """
        self.name = name
        self.cond_to_policy_map = cond_to_policy_map
        self.default_policy = default_policy

    async def apply(
        self, context: TransactionContext, container: DependencyContainer, session: AsyncSession
    ) -> TransactionContext:
        """
        Apply the first policy that matches the condition. If no condition matches, apply the default policy (if set).

        Args:
            context: The transaction context to apply the policy to.
            container: The dependency container.
            session: The database session.

        Returns:
            The potentially modified transaction context.
        """
        for cond, policy in self.cond_to_policy_map.items():
            if cond.evaluate(context):
                return await policy.apply(context, container, session)
        if self.default_policy:
            return await self.default_policy.apply(context, container, session)
        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy, including all sub-policies, to a dict.

        Each condition key is stored as a JSON string (``json.dumps`` of the condition's
        serialized form) so it can be used as a string dictionary key;
        ``from_serialized`` parses it back with ``json.loads``.

        Returns:
            A dict with 'type', 'cond_to_policy_map' and 'default_policy', plus 'name'
            when a name was set.
        """
        # NOTE: two conditions whose serialized forms are identical map to the same key,
        # so only the later condition/policy pair survives in the output.
        result: SerializableDict = {
            "type": "branching",
            "cond_to_policy_map": {
                json.dumps(cond.serialize()): policy.serialize() for cond, policy in self.cond_to_policy_map.items()
            },
            "default_policy": self.default_policy.serialize() if self.default_policy else None,
        }
        if self.name is not None:
            result["name"] = self.name
        return result

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "BranchingPolicy":
        """Constructs a BranchingPolicy from its serialized configuration.

        Args:
            config: Dict with 'cond_to_policy_map' (JSON-string condition -> policy config),
                optional 'default_policy' (policy config or None) and optional 'name'.

        Returns:
            A new BranchingPolicy whose condition priority follows the key order of
            'cond_to_policy_map' in ``config``.

        Raises:
            TypeError: If the map, a key, a policy config, a decoded condition, the
                default policy config, or the name has the wrong type.
            ValueError: If a condition key is not valid JSON.
            Any exception raised by Condition.from_serialized or ControlPolicy.from_serialized.
        """
        cond_to_policy_map: OrderedDict[Condition, ControlPolicy] = OrderedDict()

        serialized_cond_map = config.get("cond_to_policy_map")
        if not isinstance(serialized_cond_map, dict):
            raise TypeError(
                f"Expected 'cond_to_policy_map' to be a dict in BranchingPolicy config, got {type(serialized_cond_map)}"
            )

        # The keys of serialized_cond_map are expected to be JSON strings of condition configs
        # The values are expected to be policy configs (SerializableDict)
        # Dict iteration order is preserved, so condition priority survives the round trip.
        for cond_json_str, policy_config in serialized_cond_map.items():
            if not isinstance(cond_json_str, str):
                raise TypeError(
                    f"Condition key in 'cond_to_policy_map' must be a JSON string, got {type(cond_json_str)}"
                )

            if not isinstance(policy_config, dict):
                raise TypeError(
                    f"Policy config for condition '{cond_json_str}' must be a dict, got {type(policy_config)}"
                )

            try:
                condition_serializable_dict = json.loads(cond_json_str)
            except json.JSONDecodeError as e:
                # Chain the decode error so its position/context is kept in the traceback.
                raise ValueError(f"Failed to parse condition JSON string '{cond_json_str}': {e}") from e

            if not isinstance(condition_serializable_dict, dict):
                raise TypeError(
                    f"Deserialized condition config for '{cond_json_str}' must be a dict, "
                    f"got {type(condition_serializable_dict)}"
                )

            # Assuming Condition.from_serialized and ControlPolicy.from_serialized
            # expect SerializableDict (which is Dict[str, Union[SP, List[Any], Dict[str, Any], None]])
            # The isinstance(..., dict) checks are sufficient for pyright to allow passage
            # to functions expecting Dict[str, X] or Mapping[str, X].
            condition = Condition.from_serialized(condition_serializable_dict)
            policy = ControlPolicy.from_serialized(policy_config)
            cond_to_policy_map[condition] = policy

        default_policy_serializable = config.get("default_policy")
        default_policy: Optional[ControlPolicy] = None
        if default_policy_serializable is not None:
            if not isinstance(default_policy_serializable, dict):
                raise TypeError(
                    f"Expected 'default_policy' config to be a dict, got {type(default_policy_serializable)}"
                )
            default_policy = ControlPolicy.from_serialized(default_policy_serializable)

        instance_name = config.get("name")
        resolved_name: Optional[str] = None
        if instance_name is not None:
            if not isinstance(instance_name, str):
                raise TypeError(f"BranchingPolicy name must be a string, got {type(instance_name)}")
            resolved_name = instance_name

        return cls(cond_to_policy_map=cond_to_policy_map, default_policy=default_policy, name=resolved_name)
apply(context, container, session) async
Source code in luthien_control/control_policy/branching_policy.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
async def apply(
    self, context: TransactionContext, container: DependencyContainer, session: AsyncSession
) -> TransactionContext:
    """
    Dispatch to the sub-policy paired with the first matching condition.

    Conditions are checked in map order and evaluation stops at the first truthy one.
    When none match, the default policy runs if one is set; otherwise the context is
    handed back untouched.

    Args:
        context: The transaction context to apply the policy to.
        container: The dependency container.
        session: The database session.

    Returns:
        The context produced by the selected sub-policy, or the input context if none ran.
    """
    for condition, matched_policy in self.cond_to_policy_map.items():
        if not condition.evaluate(context):
            continue
        return await matched_policy.apply(context, container, session)

    fallback = self.default_policy
    if not fallback:
        return context
    return await fallback.apply(context, container, session)

Apply the first policy that matches the condition. If no condition matches, apply the default policy (if set).

Parameters:

Name Type Description Default
context TransactionContext

The transaction context to apply the policy to.

required
container DependencyContainer

The dependency container.

required
session AsyncSession

The database session.

required

Returns:

Type Description
TransactionContext

The potentially modified transaction context.

client_api_key_auth

ClientApiKeyAuthPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/client_api_key_auth.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class ClientApiKeyAuthPolicy(ControlPolicy):
    """Verifies the client API key provided in the Authorization header.

    Attributes:
        name (str): The name of this policy instance.
        logger (logging.Logger): The logger instance for this policy.
    """

    def __init__(self, name: Optional[str] = None):
        """Initializes the policy.

        Args:
            name: Optional name for this policy instance. Falls back to the class name
                when omitted or empty.
        """
        self.name = name or self.__class__.__name__
        self.logger = logging.getLogger(__name__)

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,
    ) -> TransactionContext:
        """
        Verifies the API key from the API_KEY_HEADER header in the context's request.
        Requires the DependencyContainer and an active SQLAlchemy AsyncSession.

        A leading BEARER_PREFIX is stripped from the header value before the key is
        looked up in the database.

        Raises:
            NoRequestError: If context.request is None.
            ClientAuthenticationNotFoundError: If the API key header is missing or empty.
            ClientAuthenticationError: If the key is not found or is inactive.
            In every authentication-failure case, context.response is set to a 401 JSON
            response before raising.

        Args:
            context: The current transaction context.
            container: The application dependency container.
            session: An active SQLAlchemy AsyncSession.

        Returns:
            The transaction context, with context.response cleared, if authentication succeeds.
        """
        if context.request is None:
            raise NoRequestError(f"[{context.transaction_id}] No request in context for API key auth.")

        api_key_header_value: Optional[str] = context.request.headers.get(API_KEY_HEADER)

        if not api_key_header_value:
            self.logger.warning(f"[{context.transaction_id}] Missing API key in {API_KEY_HEADER} header.")
            context.response = httpx.Response(
                status_code=401,
                headers={"Content-Type": "application/json"},
                content=json.dumps({"detail": "Not authenticated: Missing API key."}).encode("utf-8"),
            )
            raise ClientAuthenticationNotFoundError(detail="Not authenticated: Missing API key.")

        # Strip "Bearer " prefix if present
        api_key_value = api_key_header_value
        if api_key_value.startswith(BEARER_PREFIX):
            api_key_value = api_key_value[len(BEARER_PREFIX) :]

        db_key = await get_api_key_by_value(session, api_key_value)

        if not db_key:
            # Only a 4-character prefix is logged, never the full key.
            self.logger.warning(
                f"[{context.transaction_id}] Invalid API key provided "
                f"(key starts with: {api_key_value[:4]}...) ({self.__class__.__name__})."
            )
            context.response = httpx.Response(
                status_code=401,
                headers={"Content-Type": "application/json"},
                content=json.dumps({"detail": "Invalid API Key"}).encode("utf-8"),
            )
            raise ClientAuthenticationError(detail="Invalid API Key")

        if not db_key.is_active:
            self.logger.warning(
                f"[{context.transaction_id}] Inactive API key provided "
                f"(Name: {db_key.name}, ID: {db_key.id}). ({self.__class__.__name__})."
            )
            context.response = httpx.Response(
                status_code=401,
                headers={"Content-Type": "application/json"},
                content=json.dumps({"detail": "Inactive API Key"}).encode("utf-8"),
            )
            raise ClientAuthenticationError(detail="Inactive API Key")

        self.logger.info(
            f"[{context.transaction_id}] Client API key authenticated successfully "
            f"(Name: {db_key.name}, ID: {db_key.id}). ({self.__class__.__name__})."
        )
        # Every error response above is followed by a raise, so this cannot be clearing one of
        # them. It resets any response already on the context. NOTE(review): presumably one
        # left by an earlier pipeline step — confirm intent.
        context.response = None
        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy's configuration.

        Returns:
            SerializableDict: ``{"name": <instance name>}``. The name is always included,
            even when it is the default (the class name).
        """
        # Use a plain dict literal, as the other policies do, rather than calling
        # SerializableDict(...): if SerializableDict is a typing alias such as Dict[str, ...],
        # calling it raises TypeError at runtime.
        result: SerializableDict = {"name": self.name}
        return result

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "ClientApiKeyAuthPolicy":
        """
        Constructs the policy from serialized data.

        Args:
            config: The serialized configuration dictionary. May optionally
                    contain a 'name' key to set a custom name for the policy instance.
                    A missing or None 'name' yields the default name (the class name).

        Returns:
            An instance of ClientApiKeyAuthPolicy.
        """
        raw_name = config.get("name")
        # Leave a missing name as None so __init__ falls back to the class name;
        # str(None) would otherwise produce the literal name "None".
        return cls(name=None if raw_name is None else str(raw_name))

Verifies the client API key provided in the Authorization header.

Attributes:

Name Type Description
name str

The name of this policy instance.

logger Logger

The logger instance for this policy.

__init__(name=None)
Source code in luthien_control/control_policy/client_api_key_auth.py
35
36
37
38
def __init__(self, name: Optional[str] = None):
    """Set up the policy with an optional instance name; the class name is used when omitted or empty."""
    self.logger = logging.getLogger(__name__)
    self.name = name if name else self.__class__.__name__

Initializes the policy.

apply(context, container, session) async
Source code in luthien_control/control_policy/client_api_key_auth.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,
) -> TransactionContext:
    """
    Verifies the API key from the Authorization header in the context's request.
    Requires the DependencyContainer and an active SQLAlchemy AsyncSession.

    The header value may carry ``BEARER_PREFIX``; it is stripped before the key
    is looked up with ``get_api_key_by_value``. On every authentication failure a
    401 JSON ``httpx.Response`` is stored on ``context.response`` before raising.

    Raises:
        NoRequestError: If context.request is None.
        ClientAuthenticationNotFoundError: If the API key header is missing or empty.
            NOTE(review): presumably a subclass of ClientAuthenticationError — confirm.
        ClientAuthenticationError: If the key is invalid or inactive.

    Args:
        context: The current transaction context.
        container: The application dependency container (not used by this policy).
        session: An active SQLAlchemy AsyncSession, used to look up the key.

    Returns:
        The transaction context, with ``context.response`` reset to None, if
        authentication is successful.
    """
    if context.request is None:
        raise NoRequestError(f"[{context.transaction_id}] No request in context for API key auth.")

    def _set_unauthorized_response(detail: str) -> None:
        # Store a 401 JSON error response explaining why authentication failed.
        context.response = httpx.Response(
            status_code=401,
            headers={"Content-Type": "application/json"},
            content=json.dumps({"detail": detail}).encode("utf-8"),
        )

    api_key_header_value: Optional[str] = context.request.headers.get(API_KEY_HEADER)

    if not api_key_header_value:
        self.logger.warning(f"[{context.transaction_id}] Missing API key in {API_KEY_HEADER} header.")
        _set_unauthorized_response("Not authenticated: Missing API key.")
        raise ClientAuthenticationNotFoundError(detail="Not authenticated: Missing API key.")

    # Strip "Bearer " prefix if present
    api_key_value = api_key_header_value
    if api_key_value.startswith(BEARER_PREFIX):
        api_key_value = api_key_value[len(BEARER_PREFIX) :]

    db_key = await get_api_key_by_value(session, api_key_value)

    if not db_key:
        self.logger.warning(
            f"[{context.transaction_id}] Invalid API key provided "
            f"(key starts with: {api_key_value[:4]}...) ({self.__class__.__name__})."
        )
        _set_unauthorized_response("Invalid API Key")
        raise ClientAuthenticationError(detail="Invalid API Key")

    if not db_key.is_active:
        self.logger.warning(
            f"[{context.transaction_id}] Inactive API key provided "
            f"(Name: {db_key.name}, ID: {db_key.id}). ({self.__class__.__name__})."
        )
        _set_unauthorized_response("Inactive API Key")
        raise ClientAuthenticationError(detail="Inactive API Key")

    self.logger.info(
        f"[{context.transaction_id}] Client API key authenticated successfully "
        f"(Name: {db_key.name}, ID: {db_key.id}). ({self.__class__.__name__})."
    )
    # Every failure path above raises, so this only clears a response that was already
    # on the context before this policy ran. NOTE(review): confirm this reset is intended.
    context.response = None
    return context

Verifies the API key from the Authorization header in the context's request. Requires the DependencyContainer and an active SQLAlchemy AsyncSession.

Raises:

Type Description
NoRequestError

If context.request is None.

ClientAuthenticationError

If the key is missing, invalid, or inactive.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container.

required
session AsyncSession

An active SQLAlchemy AsyncSession.

required

Returns:

Type Description
TransactionContext

The transaction context, with context.response reset to None, if authentication is successful.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/client_api_key_auth.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@classmethod
def from_serialized(cls, config: SerializableDict) -> "ClientApiKeyAuthPolicy":
    """
    Constructs the policy from serialized data.

    Args:
        config: The serialized configuration dictionary. May optionally
                contain a 'name' key to set a custom name for the policy instance.
                When the key is absent or None, the default name assigned by
                ``__init__`` (the class name) is kept.

    Returns:
        An instance of ClientApiKeyAuthPolicy.
    """
    instance = cls()  # __init__ defaults the name to the class name

    name = config.get("name")
    # A missing name must not become the literal string "None"; keep the default instead.
    if name is not None:
        instance.name = str(name)

    return instance

Constructs the policy from serialized data.

Parameters:

Name Type Description Default
config SerializableDict

The serialized configuration dictionary. May optionally contain a 'name' key to set a custom name for the policy instance.

required

Returns:

Type Description
ClientApiKeyAuthPolicy

An instance of ClientApiKeyAuthPolicy.

serialize()
Source code in luthien_control/control_policy/client_api_key_auth.py
114
115
116
117
118
119
120
121
122
123
124
125
def serialize(self) -> SerializableDict:
    """Serializes the policy's configuration.

    This method converts the policy's configuration into a serializable
    dictionary. For this policy the only configuration is the 'name'
    attribute, which is always included (even when it is the default
    class name).

    Returns:
        SerializableDict: A dictionary of the form ``{"name": self.name}``.
    """
    return SerializableDict(name=self.name)

Serializes the policy's configuration.

This method converts the policy's configuration into a serializable dictionary. For this policy the only configuration is the 'name' attribute, which is always included.

Returns:

Name Type Description
SerializableDict SerializableDict

A dictionary representation of the policy's configuration. It always contains a 'name' key.

conditions

Condition

Bases: ABC

Source code in luthien_control/control_policy/conditions/condition.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Condition(abc.ABC):
    """
    Abstract base class for conditions in control policies.

    Conditions are used to evaluate whether a policy should be applied based on
    the current transaction context.

    Equality is derived from ``serialize()``: two conditions are equal only when
    they are of exactly the same class and serialize to equal dictionaries.
    """

    # Name under which a concrete condition class is looked up by from_serialized.
    type: ClassVar[str]

    @abc.abstractmethod
    def evaluate(self, context: TransactionContext) -> bool:
        """Return True if the condition holds for ``context``."""
        pass

    @abc.abstractmethod
    def serialize(self) -> SerializableDict:
        """Return a serializable dictionary describing this condition's configuration."""
        pass

    @classmethod
    def from_serialized(cls, serialized: SerializableDict) -> "Condition":
        """Construct a condition from a serialized configuration.

        This method acts as a dispatcher. It looks up the concrete condition class
        based on the 'type' field in the config and delegates to its from_serialized method.

        Args:
            serialized: The condition-specific configuration dictionary. It must contain
                        a 'type' key that maps to a registered condition type.

        Returns:
            An instance of the concrete condition class.

        Raises:
            ValueError: If the 'type' key is missing in config or the type is not registered.
        """
        # Moved import inside the method to break circular dependency
        from luthien_control.control_policy.conditions.registry import NAME_TO_CONDITION_CLASS

        condition_type_name_val = serialized.get("type")
        if not isinstance(condition_type_name_val, str):
            # If 'type' is missing (None) or not a string, it's an invalid configuration.
            raise ValueError(
                f"Condition configuration must include a 'type' field as a string. "
                f"Got: {condition_type_name_val!r} (type: {type(condition_type_name_val).__name__})"
            )

        target_condition_class = NAME_TO_CONDITION_CLASS.get(condition_type_name_val)
        if not target_condition_class:
            raise ValueError(
                f"Unknown condition type '{condition_type_name_val}'. "
                f"Ensure it is registered in NAME_TO_CONDITION_CLASS."
            )

        return target_condition_class.from_serialized(serialized)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.serialize()})"

    def __hash__(self) -> int:
        # Hash is derived from str(self), i.e. __repr__ (which subclasses may override).
        # NOTE(review): this agrees with __eq__ only if equal serializations also produce
        # identical reprs (e.g. values 1 and 1.0 compare equal but repr differently).
        return hash(str(self))

    def __eq__(self, other: object) -> bool:
        # Require exactly the same class. An isinstance() check made equality asymmetric
        # between a class and its subclasses, and could equate objects whose hashes differ
        # (the class name is part of the repr used by __hash__).
        if not isinstance(other, Condition) or type(other) is not type(self):
            return NotImplemented
        return self.serialize() == other.serialize()

Abstract base class for conditions in control policies.

Conditions are used to evaluate whether a policy should be applied based on the current transaction context.

from_serialized(serialized) classmethod
Source code in luthien_control/control_policy/conditions/condition.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@classmethod
def from_serialized(cls, serialized: SerializableDict) -> "Condition":
    """Build the concrete condition described by ``serialized``.

    The 'type' entry selects the concrete class from the condition registry,
    and that class's own ``from_serialized`` does the actual construction.

    Args:
        serialized: Condition configuration; its 'type' key must name a
                    registered condition type.

    Returns:
        The constructed concrete condition.

    Raises:
        ValueError: If 'type' is absent or not a string, or names no registered type.
    """
    # Imported lazily to avoid a circular import with the registry module.
    from luthien_control.control_policy.conditions.registry import NAME_TO_CONDITION_CLASS

    type_name = serialized.get("type")
    if isinstance(type_name, str):
        condition_cls = NAME_TO_CONDITION_CLASS.get(type_name)
        if condition_cls:
            return condition_cls.from_serialized(serialized)
        raise ValueError(
            f"Unknown condition type '{type_name}'. "
            f"Ensure it is registered in NAME_TO_CONDITION_CLASS."
        )
    # 'type' missing (None) or of the wrong type: the configuration is invalid.
    raise ValueError(
        f"Condition configuration must include a 'type' field as a string. "
        f"Got: {type_name!r} (type: {type(type_name).__name__})"
    )

Construct a condition from a serialized configuration.

This method acts as a dispatcher. It looks up the concrete condition class based on the 'type' field in the config and delegates to its from_serialized method.

Parameters:

Name Type Description Default
serialized SerializableDict

The condition-specific configuration dictionary. It must contain a 'type' key that maps to a registered condition type.

required

Returns:

Type Description
Condition

An instance of the concrete condition class.

Raises:

Type Description
ValueError

If the 'type' key is missing in config or the type is not registered.

ContainsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
class ContainsCondition(ComparisonCondition):
    """
    Condition that holds when the value found at ``key`` contains ``value``.

    The membership test itself is performed by the ``contains`` comparator.

    Example:
        key = "response.completion_tokens_details"
        value = "audio_tokens"

        Matches when the `completion_tokens_details` in the response contains "audio_tokens"
    """

    comparator = contains
    type = "contains"

Condition to check if a value contains another value.

Example

key = "response.completion_tokens_details" value = "audio_tokens"

Matches when the completion_tokens_details in the response contains "audio_tokens"

EqualsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
69
70
71
72
73
74
75
76
77
78
79
80
81
class EqualsCondition(ComparisonCondition):
    """
    Condition to check if a value is equal to another value.

    Example:
        key = "request.content.model"
        value = "gpt-4o"

        Matches when the value of `'model'` in `request.content` is "gpt-4o"
    """

    type = "equals"
    comparator = equals

Condition to check if a value is equal to another value.

Example

key = "request.content.model" value = "gpt-4o"

Matches when the value of 'model' in request.content is "gpt-4o"

GreaterThanCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
144
145
146
147
148
149
150
151
152
153
154
155
156
class GreaterThanCondition(ComparisonCondition):
    """
    Condition to check if a value is greater than another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is after 2025-01-01 00:00:00 UTC
    """

    type = "greater_than"
    comparator = greater_than

Condition to check if a value is greater than another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is after 2025-01-01 00:00:00 UTC

GreaterThanOrEqualCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
159
160
161
162
163
164
165
166
167
168
169
170
171
class GreaterThanOrEqualCondition(ComparisonCondition):
    """
    Condition to check if a value is greater than or equal to another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is after or equal to 2025-01-01 00:00:00 UTC
    """

    type = "greater_than_or_equal"
    comparator = greater_than_or_equal

Condition to check if a value is greater than or equal to another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is after or equal to 2025-01-01 00:00:00 UTC

LessThanCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
114
115
116
117
118
119
120
121
122
123
124
125
126
class LessThanCondition(ComparisonCondition):
    """
    Condition to check if a value is less than another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is before 2025-01-01 00:00:00 UTC
    """

    type = "less_than"
    comparator = less_than

Condition to check if a value is less than another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is before 2025-01-01 00:00:00 UTC

LessThanOrEqualCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
129
130
131
132
133
134
135
136
137
138
139
140
141
class LessThanOrEqualCondition(ComparisonCondition):
    """
    Condition to check if a value is less than or equal to another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is before or equal to 2025-01-01 00:00:00 UTC
    """

    type = "less_than_or_equal"
    comparator = less_than_or_equal

Condition to check if a value is less than or equal to another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is before or equal to 2025-01-01 00:00:00 UTC

NotEqualsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
84
85
86
87
88
89
90
91
92
93
94
95
96
class NotEqualsCondition(ComparisonCondition):
    """
    Condition to check if a value is *NOT* equal to another value.

    Example:
        key = "request.content.model"
        value = "gpt-4o"

        Matches when the value of `'model'` in `request.content` is NOT "gpt-4o"
    """

    type = "not_equals"
    comparator = not_equals

Condition to check if a value is NOT equal to another value.

Example

key = "request.content.model" value = "gpt-4o"

Matches when the value of 'model' in request.content is NOT "gpt-4o"

RegexMatchCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
174
175
176
177
178
179
180
181
182
183
184
185
186
class RegexMatchCondition(ComparisonCondition):
    """
    Condition that tests the value found at ``key`` against the regular expression ``value``.

    Whether the pattern may match anywhere in the string or must match at its
    start is decided by the ``regex_match`` comparator.

    Example:
        key = "request.content.model"
        value = "gpt-4"

        Matches when the model specified in the request matches the pattern "gpt-4"
        (i.e. contains "gpt-4", assuming ``regex_match`` performs a search — confirm)
    """

    comparator = regex_match
    type = "regex_match"

Condition to check if a value matches a regular expression.

Example

key = "request.content.model" value = "gpt-4"

Matches when the model specified in the request contains "gpt-4"

comparisons

ComparisonCondition

Bases: Condition, ABC

Source code in luthien_control/control_policy/conditions/comparisons.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ComparisonCondition(Condition, ABC):
    """
    Base class for conditions that compare a transaction-context value with a fixed value.

    Concrete subclasses supply ``type`` (the name written by ``serialize`` under
    the "type" key) and ``comparator`` (the comparison used by ``evaluate``).
    """

    type: ClassVar[str]  # supplied by each concrete subclass
    comparator: ClassVar[Comparator]  # supplied by each concrete subclass

    key: str
    value: Any

    def __init__(self, key: str, value: Any):
        """
        Args:
            key: The key to the value to compare against in the transaction context (see get_tx_value)
            value: The value to compare against the key.

        Example:
            key = "request.data.user"
            value = "John"
        """
        self.key, self.value = key, value

    def evaluate(self, context: TransactionContext) -> bool:
        """Apply the comparator to (value looked up at ``self.key``, ``self.value``)."""
        compare = type(self).comparator.evaluate
        return compare(get_tx_value(context, self.key), self.value)

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"{class_name}(key={self.key!r}, value={self.value!r})"

    def serialize(self) -> SerializableDict:
        """Return the type name, key, comparator name and comparison value as a dict."""
        cls = type(self)
        return {
            "type": cls.type,
            "key": self.key,
            "comparator": COMPARATOR_TO_NAME[cls.comparator],
            "value": self.value,
        }

    @classmethod
    def from_serialized(cls, serialized: SerializableDict) -> "ComparisonCondition":
        """Rebuild the condition from its 'key' (required string) and 'value' entries.

        Raises:
            TypeError: If 'key' is missing or not a string.
        """
        key = serialized.get("key")
        if isinstance(key, str):
            return cls(key=key, value=serialized.get("value"))
        raise TypeError(
            f"Configuration for {cls.__name__} is missing 'key' or it is not a string. "
            f"Got: {key!r} (type: {type(key).__name__})"
        )
__init__(key, value)
Source code in luthien_control/control_policy/conditions/comparisons.py
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, key: str, value: Any):
    """
    Record the lookup key and the operand to compare with.

    Args:
        key: The key to the value to compare against in the transaction context (see get_tx_value)
        value: The value to compare against the key.

    Example:
        key = "request.data.user"
        value = "John"
    """
    self.key, self.value = key, value

Parameters:

Name Type Description Default
key str

The key to the value to compare against in the transaction context (see get_tx_value)

required
value Any

The value to compare against the key.

required
Example

key = "request.data.user" value = "John"

ContainsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
class ContainsCondition(ComparisonCondition):
    """
    Condition that holds when the value found at ``key`` contains ``value``.

    The membership test itself is performed by the ``contains`` comparator.

    Example:
        key = "response.completion_tokens_details"
        value = "audio_tokens"

        Matches when the `completion_tokens_details` in the response contains "audio_tokens"
    """

    comparator = contains
    type = "contains"

Condition to check if a value contains another value.

Example

key = "response.completion_tokens_details" value = "audio_tokens"

Matches when the completion_tokens_details in the response contains "audio_tokens"

EqualsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
69
70
71
72
73
74
75
76
77
78
79
80
81
class EqualsCondition(ComparisonCondition):
    """
    Condition to check if a value is equal to another value.

    Example:
        key = "request.content.model"
        value = "gpt-4o"

        Matches when the value of `'model'` in `request.content` is "gpt-4o"
    """

    type = "equals"
    comparator = equals

Condition to check if a value is equal to another value.

Example

key = "request.content.model" value = "gpt-4o"

Matches when the value of 'model' in request.content is "gpt-4o"

GreaterThanCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
144
145
146
147
148
149
150
151
152
153
154
155
156
class GreaterThanCondition(ComparisonCondition):
    """
    Condition to check if a value is greater than another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is after 2025-01-01 00:00:00 UTC
    """

    type = "greater_than"
    comparator = greater_than

Condition to check if a value is greater than another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is after 2025-01-01 00:00:00 UTC

GreaterThanOrEqualCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
159
160
161
162
163
164
165
166
167
168
169
170
171
class GreaterThanOrEqualCondition(ComparisonCondition):
    """
    Condition to check if a value is greater than or equal to another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is after or equal to 2025-01-01 00:00:00 UTC
    """

    type = "greater_than_or_equal"
    comparator = greater_than_or_equal

Condition to check if a value is greater than or equal to another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is after or equal to 2025-01-01 00:00:00 UTC

LessThanCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
114
115
116
117
118
119
120
121
122
123
124
125
126
class LessThanCondition(ComparisonCondition):
    """
    Condition to check if a value is less than another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is before 2025-01-01 00:00:00 UTC
    """

    type = "less_than"
    comparator = less_than

Condition to check if a value is less than another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is before 2025-01-01 00:00:00 UTC

LessThanOrEqualCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
129
130
131
132
133
134
135
136
137
138
139
140
141
class LessThanOrEqualCondition(ComparisonCondition):
    """
    Condition to check if a value is less than or equal to another value.

    Example:
        key = "response.content.created"
        value = 1735689600  # 2025-01-01 00:00:00 UTC

        Matches when the timestamp of the response is before or equal to 2025-01-01 00:00:00 UTC
    """

    type = "less_than_or_equal"
    comparator = less_than_or_equal

Condition to check if a value is less than or equal to another value.

Example

key = "response.content.created" value = 1735689600 # 2025-01-01 00:00:00 UTC

Matches when the timestamp of the response is before or equal to 2025-01-01 00:00:00 UTC

NotEqualsCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
84
85
86
87
88
89
90
91
92
93
94
95
96
class NotEqualsCondition(ComparisonCondition):
    """
    Condition to check if a value is *NOT* equal to another value.

    Example:
        key = "request.content.model"
        value = "gpt-4o"

        Matches when the value of `'model'` in `request.content` is NOT "gpt-4o"
    """

    type = "not_equals"
    comparator = not_equals

Condition to check if a value is NOT equal to another value.

Example

key = "request.content.model" value = "gpt-4o"

Matches when the value of 'model' in request.content is NOT "gpt-4o"

RegexMatchCondition

Bases: ComparisonCondition

Source code in luthien_control/control_policy/conditions/comparisons.py
174
175
176
177
178
179
180
181
182
183
184
185
186
class RegexMatchCondition(ComparisonCondition):
    """
    Condition that tests the value found at ``key`` against the regular expression ``value``.

    Whether the pattern may match anywhere in the string or must match at its
    start is decided by the ``regex_match`` comparator.

    Example:
        key = "request.content.model"
        value = "gpt-4"

        Matches when the model specified in the request matches the pattern "gpt-4"
        (i.e. contains "gpt-4", assuming ``regex_match`` performs a search — confirm)
    """

    comparator = regex_match
    type = "regex_match"

Condition to check if a value matches a regular expression.

Example

key = "request.content.model" value = "gpt-4"

Matches when the model specified in the request contains "gpt-4"

condition

Condition

Bases: ABC

Source code in luthien_control/control_policy/conditions/condition.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Condition(abc.ABC):
    """
    Abstract base class for conditions in control policies.

    Conditions are used to evaluate whether a policy should be applied based on
    the current transaction context.

    Equality is derived from ``serialize()``: two conditions are equal only when
    they are of exactly the same class and serialize to equal dictionaries.
    """

    # Name under which a concrete condition class is looked up by from_serialized.
    type: ClassVar[str]

    @abc.abstractmethod
    def evaluate(self, context: TransactionContext) -> bool:
        """Return True if the condition holds for ``context``."""
        pass

    @abc.abstractmethod
    def serialize(self) -> SerializableDict:
        """Return a serializable dictionary describing this condition's configuration."""
        pass

    @classmethod
    def from_serialized(cls, serialized: SerializableDict) -> "Condition":
        """Construct a condition from a serialized configuration.

        This method acts as a dispatcher. It looks up the concrete condition class
        based on the 'type' field in the config and delegates to its from_serialized method.

        Args:
            serialized: The condition-specific configuration dictionary. It must contain
                        a 'type' key that maps to a registered condition type.

        Returns:
            An instance of the concrete condition class.

        Raises:
            ValueError: If the 'type' key is missing in config or the type is not registered.
        """
        # Moved import inside the method to break circular dependency
        from luthien_control.control_policy.conditions.registry import NAME_TO_CONDITION_CLASS

        condition_type_name_val = serialized.get("type")
        if not isinstance(condition_type_name_val, str):
            # If 'type' is missing (None) or not a string, it's an invalid configuration.
            raise ValueError(
                f"Condition configuration must include a 'type' field as a string. "
                f"Got: {condition_type_name_val!r} (type: {type(condition_type_name_val).__name__})"
            )

        target_condition_class = NAME_TO_CONDITION_CLASS.get(condition_type_name_val)
        if not target_condition_class:
            raise ValueError(
                f"Unknown condition type '{condition_type_name_val}'. "
                f"Ensure it is registered in NAME_TO_CONDITION_CLASS."
            )

        return target_condition_class.from_serialized(serialized)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.serialize()})"

    def __hash__(self) -> int:
        # Hash is derived from str(self), i.e. __repr__ (which subclasses may override).
        # NOTE(review): this agrees with __eq__ only if equal serializations also produce
        # identical reprs (e.g. values 1 and 1.0 compare equal but repr differently).
        return hash(str(self))

    def __eq__(self, other: object) -> bool:
        # Require exactly the same class. An isinstance() check made equality asymmetric
        # between a class and its subclasses, and could equate objects whose hashes differ
        # (the class name is part of the repr used by __hash__).
        if not isinstance(other, Condition) or type(other) is not type(self):
            return NotImplemented
        return self.serialize() == other.serialize()

Abstract base class for conditions in control policies.

Conditions are used to evaluate whether a policy should be applied based on the current transaction context.

from_serialized(serialized) classmethod
Source code in luthien_control/control_policy/conditions/condition.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@classmethod
def from_serialized(cls, serialized: SerializableDict) -> "Condition":
    """Build the concrete condition described by ``serialized``.

    The 'type' entry selects the concrete class from the condition registry,
    and that class's own ``from_serialized`` does the actual construction.

    Args:
        serialized: Condition configuration; its 'type' key must name a
                    registered condition type.

    Returns:
        The constructed concrete condition.

    Raises:
        ValueError: If 'type' is absent or not a string, or names no registered type.
    """
    # Imported lazily to avoid a circular import with the registry module.
    from luthien_control.control_policy.conditions.registry import NAME_TO_CONDITION_CLASS

    type_name = serialized.get("type")
    if isinstance(type_name, str):
        condition_cls = NAME_TO_CONDITION_CLASS.get(type_name)
        if condition_cls:
            return condition_cls.from_serialized(serialized)
        raise ValueError(
            f"Unknown condition type '{type_name}'. "
            f"Ensure it is registered in NAME_TO_CONDITION_CLASS."
        )
    # 'type' missing (None) or of the wrong type: the configuration is invalid.
    raise ValueError(
        f"Condition configuration must include a 'type' field as a string. "
        f"Got: {type_name!r} (type: {type(type_name).__name__})"
    )

Construct a condition from a serialized configuration.

This method acts as a dispatcher. It looks up the concrete condition class based on the 'type' field in the config and delegates to its from_serialized method.

Parameters:

Name Type Description Default
serialized SerializableDict

The condition-specific configuration dictionary. It must contain a 'type' key that maps to a registered condition type.

required

Returns:

Type Description
Condition

An instance of the concrete condition class.

Raises:

Type Description
ValueError

If the 'type' key is missing in config or the type is not registered.

control_policy

ControlPolicy

Bases: ABC

Source code in luthien_control/control_policy/control_policy.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ControlPolicy(abc.ABC):
    """Abstract Base Class defining the interface for a processing step.

    Attributes:
        name (Optional[str]): An optional name for the policy instance.
            Subclasses are expected to set this, often in their `__init__` method.
            It's used for logging and identification purposes.
    """

    name: Optional[str] = None

    def __init__(self, **kwargs: Any) -> None:
        """Initializes the ControlPolicy.

        The base implementation does nothing; it exists so subclasses can call
        ``super().__init__()`` safely, and it may be overridden freely.

        Args:
            **kwargs: Arbitrary keyword arguments that subclasses might use.
                They are ignored by this base implementation.
        """

    @abc.abstractmethod
    async def apply(
        self, context: "TransactionContext", container: "DependencyContainer", session: "AsyncSession"
    ) -> "TransactionContext":
        """
        Apply the policy to the transaction context using provided dependencies.

        Args:
            context: The current transaction context.
            container: The dependency injection container.
            session: The database session for the current request.

        Returns:
            The potentially modified transaction context.

        Raises:
            Exception: Processors may raise exceptions to halt the processing flow.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def serialize(self) -> SerializableDict:
        """
        Serialize the policy's instance-specific configuration needed for reloading.

        Returns:
            A serializable dictionary containing configuration parameters.
        """
        raise NotImplementedError

    # construct from serialization
    @classmethod
    def from_serialized(cls: Type[PolicyT], config: SerializableDict) -> PolicyT:
        """
        Construct a policy from a serialized configuration.

        This method acts as a dispatcher. It looks up the concrete policy class
        based on the 'type' field in the config and delegates to its from_serialized method.

        Args:
            config: The policy-specific configuration dictionary. It must contain a 'type' key
                    that maps to a registered policy type. The whole dictionary (including
                    'type') is passed unchanged to the concrete class's from_serialized.

        Returns:
            An instance of the concrete policy class.

        Raises:
            ValueError: If the 'type' key is missing in config or the type is not registered.
        """
        # Validate the input first so malformed configs fail fast, before touching the registry.
        policy_type_name_val = config.get("type")
        if not isinstance(policy_type_name_val, str):
            raise ValueError(
                f"Policy configuration must include a 'type' field as a string. "
                f"Got: {policy_type_name_val!r} (type: {type(policy_type_name_val).__name__})"
            )

        # Import inside the method to break a circular dependency with the registry module.
        from luthien_control.control_policy.registry import POLICY_NAME_TO_CLASS

        target_policy_class = POLICY_NAME_TO_CLASS.get(policy_type_name_val)
        if target_policy_class is None:
            raise ValueError(
                f"Unknown policy type '{policy_type_name_val}'. Ensure it is registered in POLICY_NAME_TO_CLASS."
            )

        # NOTE(review): if a registered class does not override from_serialized, this call
        # re-enters this dispatcher with the same config and recurses indefinitely —
        # confirm every registered policy provides its own from_serialized.
        return cast(PolicyT, target_policy_class.from_serialized(config))

Abstract Base Class defining the interface for a processing step.

Attributes:

Name Type Description
name Optional[str]

An optional name for the policy instance. Subclasses are expected to set this, often in their __init__ method. It's used for logging and identification purposes.

__init__(**kwargs)
Source code in luthien_control/control_policy/control_policy.py
27
28
29
30
31
32
33
34
35
36
def __init__(self, **kwargs: Any) -> None:
    """Set up the base ControlPolicy state.

    The base class keeps no state of its own, so this is intentionally a
    no-op; subclasses may override it and accept whatever keyword arguments
    they need.

    Args:
        **kwargs: Keyword arguments reserved for subclasses; ignored here.
    """

Initializes the ControlPolicy.

This is an abstract base class; the base constructor does nothing, and subclasses may override it to perform their own initialization.

Parameters:

Name Type Description Default
**kwargs Any

Arbitrary keyword arguments that subclasses might use.

{}
apply(context, container, session) abstractmethod async
Source code in luthien_control/control_policy/control_policy.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@abc.abstractmethod
async def apply(
    self,
    context: "TransactionContext",
    container: "DependencyContainer",
    session: "AsyncSession",
) -> "TransactionContext":
    """
    Run this policy against a transaction.

    Concrete policies override this coroutine to inspect and/or modify the
    transaction context, drawing on the container and session as needed.

    Args:
        context: The current transaction context.
        container: The dependency injection container.
        session: The database session for the current request.

    Returns:
        The potentially modified transaction context.

    Raises:
        Exception: Processors may raise exceptions to halt the processing flow.
        NotImplementedError: Always, from this abstract base implementation.
    """
    raise NotImplementedError()

Apply the policy to the transaction context using provided dependencies.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The dependency injection container.

required
session AsyncSession

The database session for the current request.

required

Returns:

Type Description
TransactionContext

The potentially modified transaction context.

Raises:

Type Description
Exception

Processors may raise exceptions to halt the processing flow.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/control_policy.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@classmethod
def from_serialized(cls: Type[PolicyT], config: SerializableDict) -> PolicyT:
    """
    Construct a policy from a serialized configuration.

    This method acts as a dispatcher. It looks up the concrete policy class
    based on the 'type' field in the config and delegates to its from_serialized method.

    Args:
        config: The policy-specific configuration dictionary. It must contain a 'type' key
                that maps to a registered policy type. The whole dictionary (including
                'type') is passed unchanged to the concrete class's from_serialized.

    Returns:
        An instance of the concrete policy class.

    Raises:
        ValueError: If the 'type' key is missing in config or the type is not registered.
    """
    # Validate the input first so malformed configs fail fast, before touching the registry.
    policy_type_name_val = config.get("type")
    if not isinstance(policy_type_name_val, str):
        raise ValueError(
            f"Policy configuration must include a 'type' field as a string. "
            f"Got: {policy_type_name_val!r} (type: {type(policy_type_name_val).__name__})"
        )

    # Import inside the method to break a circular dependency with the registry module.
    from luthien_control.control_policy.registry import POLICY_NAME_TO_CLASS

    target_policy_class = POLICY_NAME_TO_CLASS.get(policy_type_name_val)
    if target_policy_class is None:
        raise ValueError(
            f"Unknown policy type '{policy_type_name_val}'. Ensure it is registered in POLICY_NAME_TO_CLASS."
        )

    # NOTE(review): if a registered class does not override from_serialized, this call
    # re-enters the dispatcher with the same config and recurses indefinitely —
    # confirm every registered policy provides its own from_serialized.
    return cast(PolicyT, target_policy_class.from_serialized(config))

Construct a policy from a serialized configuration.

This method acts as a dispatcher. It looks up the concrete policy class based on the 'type' field in the config and delegates to its from_serialized method.

Parameters:

Name Type Description Default
config SerializableDict

The policy-specific configuration dictionary. It must contain a 'type' key that maps to a registered policy type.

required
**kwargs

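Additional dependencies needed for instantiation, passed to the concrete policy's from_serialized method. Note: the current signature accepts no extra keyword arguments; only `config` is passed through, unchanged.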

required

Returns:

Type Description
PolicyT

An instance of the concrete policy class.

Raises:

Type Description
ValueError

If the 'type' key is missing in config or the type is not registered.

serialize() abstractmethod
Source code in luthien_control/control_policy/control_policy.py
58
59
60
61
62
63
64
65
66
@abc.abstractmethod
def serialize(self) -> SerializableDict:
    """
    Produce the instance-specific configuration needed to rebuild this policy.

    Returns:
        A serializable dictionary containing configuration parameters.

    Raises:
        NotImplementedError: Always, from this abstract base implementation.
    """
    raise NotImplementedError()

Serialize the policy's instance-specific configuration needed for reloading.

Returns:

Type Description
SerializableDict

A serializable dictionary containing configuration parameters.

exceptions

ApiKeyNotFoundError

Bases: ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
60
61
62
63
class ApiKeyNotFoundError(ControlPolicyError):
    """Raised when the settings do not provide the required API key."""

Exception raised when the API key is not found in the settings.

ClientAuthenticationError

Bases: ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
72
73
74
75
76
77
78
79
80
81
82
83
84
class ClientAuthenticationError(ControlPolicyError):
    """Signals that authenticating the client's API key failed."""

    def __init__(self, detail: str, status_code: int = 401):
        """Create the error.

        Args:
            detail (str): Human-readable explanation of why authentication failed.
            status_code (int): HTTP status code to report for this error.
                               Defaults to 401 (Unauthorized).
        """
        # ``detail`` goes positionally so ``str(err)`` shows it, and again as a
        # keyword so ControlPolicyError records it on ``self.detail``.
        super().__init__(detail, detail=detail, status_code=status_code)

Exception raised when client API key authentication fails.

__init__(detail, status_code=401)
Source code in luthien_control/control_policy/exceptions.py
75
76
77
78
79
80
81
82
83
84
def __init__(self, detail: str, status_code: int = 401):
    """Create the error.

    Args:
        detail (str): Human-readable explanation of why authentication failed.
        status_code (int): HTTP status code to report for this error.
                           Defaults to 401 (Unauthorized).
    """
    # ``detail`` goes positionally so ``str(err)`` shows it, and again as a
    # keyword so ControlPolicyError records it on ``self.detail``.
    super().__init__(detail, detail=detail, status_code=status_code)

Initializes the ClientAuthenticationError.

Parameters:

Name Type Description Default
detail str

A detailed error message explaining the authentication failure.

required
status_code int

The HTTP status code to associate with this error. Defaults to 401 (Unauthorized).

401

ClientAuthenticationNotFoundError

Bases: ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
87
88
89
90
91
92
93
94
95
96
97
98
99
class ClientAuthenticationNotFoundError(ControlPolicyError):
    """Raised when no client API key can be found in the request."""

    def __init__(self, detail: str, status_code: int = 401):
        """Create the error.

        Args:
            detail (str): Explanation of why no client key could be found.
            status_code (int): HTTP status code to report for this error.
                               Defaults to 401 (Unauthorized).
        """
        # Positional ``detail`` feeds Exception.__str__; the keyword copy is
        # stored by ControlPolicyError as ``self.detail``.
        super().__init__(detail, detail=detail, status_code=status_code)

Exception raised when the client API key is not found in the request.

__init__(detail, status_code=401)
Source code in luthien_control/control_policy/exceptions.py
90
91
92
93
94
95
96
97
98
99
def __init__(self, detail: str, status_code: int = 401):
    """Create the error.

    Args:
        detail (str): Explanation of why no client key could be found.
        status_code (int): HTTP status code to report for this error.
                           Defaults to 401 (Unauthorized).
    """
    # Positional ``detail`` feeds Exception.__str__; the keyword copy is
    # stored by ControlPolicyError as ``self.detail``.
    super().__init__(detail, detail=detail, status_code=status_code)

Initializes the ClientAuthenticationNotFoundError.

Parameters:

Name Type Description Default
detail str

A detailed error message explaining why the key was not found.

required
status_code int

The HTTP status code to associate with this error. Defaults to 401 (Unauthorized).

401

ControlPolicyError

Bases: Exception

Source code in luthien_control/control_policy/exceptions.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ControlPolicyError(Exception):
    """Base exception for all control policy errors.

    Attributes:
        policy_name (Optional[str]): The name of the policy where the error
            occurred, if specified.
        status_code (Optional[int]): An HTTP status code associated with this
            error, if specified.
        detail (Optional[str]): A detailed error message. If not provided directly
            during initialization but other arguments are, the first positional
            argument is used as the detail.
    """

    def __init__(
        self, *args, policy_name: str | None = None, status_code: int | None = None, detail: str | None = None
    ):
        """Initializes the ControlPolicyError.

        Args:
            *args: Arguments passed to the base Exception class.
            policy_name (Optional[str]): The name of the policy where the error occurred.
            status_code (Optional[int]): An HTTP status code associated with this error.
            detail (Optional[str]): A detailed error message. If not provided and `args`
                                    is not empty, the first argument in `args` is used.
        """
        super().__init__(*args)
        self.policy_name = policy_name
        self.status_code = status_code
        # Use the first arg as detail if detail kwarg is not provided and args exist
        self.detail = detail or (args[0] if args else None)

Base exception for all control policy errors.

Attributes:

Name Type Description
policy_name Optional[str]

The name of the policy where the error occurred, if specified.

status_code Optional[int]

An HTTP status code associated with this error, if specified.

detail Optional[str]

A detailed error message. If not provided directly during initialization but other arguments are, the first positional argument is used as the detail.

__init__(*args, policy_name=None, status_code=None, detail=None)
Source code in luthien_control/control_policy/exceptions.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(
    self, *args, policy_name: str | None = None, status_code: int | None = None, detail: str | None = None
):
    """Store optional metadata alongside the standard exception arguments.

    Args:
        *args: Positional arguments forwarded to the base Exception.
        policy_name (Optional[str]): Name of the policy where the error occurred.
        status_code (Optional[int]): HTTP status code tied to this error.
        detail (Optional[str]): Detailed message. If empty or omitted and
                                ``args`` is non-empty, ``args[0]`` is used.
    """
    super().__init__(*args)
    self.policy_name = policy_name
    self.status_code = status_code
    # An explicit, non-empty detail wins; otherwise fall back to the first
    # positional argument, and finally to None.
    if detail:
        self.detail = detail
    elif args:
        self.detail = args[0]
    else:
        self.detail = None

Initializes the ControlPolicyError.

Parameters:

Name Type Description Default
*args

Arguments passed to the base Exception class.

()
policy_name Optional[str]

The name of the policy where the error occurred.

None
status_code Optional[int]

An HTTP status code associated with this error.

None
detail Optional[str]

A detailed error message. If not provided and args is not empty, the first argument in args is used.

None

LeakedApiKeyError

Bases: ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
102
103
104
105
106
107
108
109
110
111
112
113
114
class LeakedApiKeyError(ControlPolicyError):
    """Raised when content appears to contain a leaked API key."""

    def __init__(self, detail: str, status_code: int = 403):
        """Create the error.

        Args:
            detail (str): Explanation of the leaked-key detection.
            status_code (int): HTTP status code to report for this error.
                               Defaults to 403 (Forbidden).
        """
        # Positional ``detail`` feeds Exception.__str__; the keyword copy is
        # stored by ControlPolicyError as ``self.detail``.
        super().__init__(detail, detail=detail, status_code=status_code)

Exception raised when a leaked API key is detected.

__init__(detail, status_code=403)
Source code in luthien_control/control_policy/exceptions.py
105
106
107
108
109
110
111
112
113
114
def __init__(self, detail: str, status_code: int = 403):
    """Create the error.

    Args:
        detail (str): Explanation of the leaked-key detection.
        status_code (int): HTTP status code to report for this error.
                           Defaults to 403 (Forbidden).
    """
    # Positional ``detail`` feeds Exception.__str__; the keyword copy is
    # stored by ControlPolicyError as ``self.detail``.
    super().__init__(detail, detail=detail, status_code=status_code)

Initializes the LeakedApiKeyError.

Parameters:

Name Type Description Default
detail str

A detailed error message explaining the leaked key detection.

required
status_code int

The HTTP status code to associate with this error. Defaults to 403 (Forbidden).

403

NoRequestError

Bases: ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
66
67
68
69
class NoRequestError(ControlPolicyError):
    """Raised when the transaction context carries no request object."""

Exception raised when the request object is not found in the context.

PolicyLoadError

Bases: ValueError, ControlPolicyError

Source code in luthien_control/control_policy/exceptions.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class PolicyLoadError(ValueError, ControlPolicyError):
    """Raised when a policy cannot be loaded or instantiated from its configuration.

    Subclasses ValueError because the root cause is a bad value/config, and
    ControlPolicyError so it is caught alongside the other policy errors.
    """

    def __init__(
        self, *args, policy_name: str | None = None, status_code: int | None = None, detail: str | None = None
    ):
        """Create the error.

        Args:
            *args: Positional arguments forwarded to the base Exception.
            policy_name (Optional[str]): Name of the policy that failed to load.
            status_code (Optional[int]): HTTP status code tied to this error.
            detail (Optional[str]): Detailed message. If empty or omitted and
                                    ``args`` is non-empty, ``args[0]`` is used.
        """
        # Delegate straight to ControlPolicyError.__init__, the initializer that
        # knows how to store policy_name / status_code / detail.
        ControlPolicyError.__init__(
            self,
            *args,
            detail=detail,
            policy_name=policy_name,
            status_code=status_code,
        )

Custom exception for errors during policy loading/instantiation.

__init__(*args, policy_name=None, status_code=None, detail=None)
Source code in luthien_control/control_policy/exceptions.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self, *args, policy_name: str | None = None, status_code: int | None = None, detail: str | None = None
):
    """Create the error.

    Args:
        *args: Positional arguments forwarded to the base Exception.
        policy_name (Optional[str]): Name of the policy that failed to load.
        status_code (Optional[int]): HTTP status code tied to this error.
        detail (Optional[str]): Detailed message. If empty or omitted and
                                ``args`` is non-empty, ``args[0]`` is used.
    """
    # Delegate straight to ControlPolicyError.__init__, the initializer that
    # knows how to store policy_name / status_code / detail.
    ControlPolicyError.__init__(
        self,
        *args,
        detail=detail,
        policy_name=policy_name,
        status_code=status_code,
    )

Initializes the PolicyLoadError.

Parameters:

Name Type Description Default
*args

Arguments passed to the base Exception class.

()
policy_name Optional[str]

The name of the policy that failed to load.

None
status_code Optional[int]

An HTTP status code associated with this error.

None
detail Optional[str]

A detailed error message. If not provided and args is not empty, the first argument in args is used.

None

leaked_api_key_detection

Control Policy for detecting leaked API keys in LLM message content.

This policy inspects the 'messages' field in request bodies to prevent sensitive API keys from being sent to language models.

LeakedApiKeyDetectionPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/leaked_api_key_detection.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class LeakedApiKeyDetectionPolicy(ControlPolicy):
    """Detects API keys that might be leaked in message content sent to LLMs."""

    # Common API key patterns
    DEFAULT_PATTERNS = [
        r"sk-[a-zA-Z0-9]{48}",  # OpenAI API key pattern
        r"xoxb-[a-zA-Z0-9\-]{50,}",  # Slack bot token pattern
        r"github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}",  # GitHub PAT pattern
    ]

    def __init__(self, patterns: Optional[List[str]] = None, name: Optional[str] = None):
        """Initializes the policy.

        Args:
            patterns: Optional list of regex patterns to detect API keys.
                     If not provided (or empty), uses DEFAULT_PATTERNS.
            name: Optional name for this policy instance. Defaults to the class name.
        """
        self.name = name or self.__class__.__name__
        # Copy so this instance never aliases the caller's list or the shared,
        # class-level DEFAULT_PATTERNS (mutating one must not affect the other).
        self.patterns = list(patterns or self.DEFAULT_PATTERNS)
        self.compiled_patterns: List[Pattern] = [re.compile(pattern) for pattern in self.patterns]
        self.logger = logging.getLogger(__name__)

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,
    ) -> TransactionContext:
        """
        Checks message content for potentially leaked API keys.

        Only the 'messages' field of a JSON request body is inspected. A message's
        'content' may be a string or a list of parts; string 'text' fields of parts
        are checked too. Bodies that are empty, not UTF-8, not JSON, or not shaped
        like a chat request are passed through unchanged.

        Args:
            context: The current transaction context.
            container: The application dependency container (unused).
            session: An active SQLAlchemy AsyncSession (unused).

        Returns:
            The unchanged transaction context when no potential API key is found.

        Raises:
            NoRequestError: If the request is not found in the context.
            LeakedApiKeyError: If a potential API key is detected in message content.
                Before raising, ``context.response`` is set to a 403 JSON response.
        """
        if context.request is None:
            raise NoRequestError(f"[{context.transaction_id}] No request in context.")

        self.logger.info(f"[{context.transaction_id}] Checking for leaked API keys in message content ({self.name}).")

        # Skip requests without a body (e.g. typical GET requests).
        body_bytes = getattr(context.request, "content", None)
        if not body_bytes:
            self.logger.debug(f"[{context.transaction_id}] No content to check for API keys.")
            return context

        try:
            body_json = json.loads(body_bytes.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # If the body isn't valid JSON or text, we can't check it effectively
            self.logger.debug(f"[{context.transaction_id}] Could not decode request body as JSON.")
            return context

        for text in self._extract_message_texts(body_json):
            if self._check_text(text):
                error_message = (
                    "Potential API key detected in message content. For security, the request has been blocked."
                )
                self.logger.warning(f"[{context.transaction_id}] {error_message} ({self.name})")

                context.response = httpx.Response(
                    status_code=403,
                    json={"detail": error_message},
                )
                raise LeakedApiKeyError(detail=error_message)

        return context

    @staticmethod
    def _extract_message_texts(body_json: object) -> List[str]:
        """
        Collects every text string from the 'messages' field of a decoded request body.

        Plain string 'content' is collected as-is; list 'content' contributes each part's
        string 'text' field. Values of any other shape (a non-dict body, non-list
        'messages', non-dict messages or parts) are skipped instead of raising.

        Args:
            body_json: The decoded JSON body (any JSON value).

        Returns:
            The text strings to scan, in message order.
        """
        if not isinstance(body_json, dict):
            return []
        messages = body_json.get("messages")
        if not isinstance(messages, list):
            return []

        texts: List[str] = []
        for message in messages:
            if not isinstance(message, dict):
                continue
            message_content = message.get("content")
            if isinstance(message_content, str):
                texts.append(message_content)
            elif isinstance(message_content, list):
                # Multi-part content, e.g. [{"type": "text", "text": "..."}].
                texts.extend(
                    part["text"]
                    for part in message_content
                    if isinstance(part, dict) and isinstance(part.get("text"), str)
                )
        return texts

    def _check_text(self, text: str) -> bool:
        """
        Checks if the given text contains any patterns matching potential API keys.

        Args:
            text: The text to check.

        Returns:
            True if a potential API key is found, False otherwise.
        """
        return any(pattern.search(text) for pattern in self.compiled_patterns)

    def serialize(self) -> SerializableDict:
        """Serializes the policy's configuration.

        Returns:
            A dict with the policy's ``name`` and a copy of its ``patterns``, so callers
            mutating the result cannot alter this policy's stored patterns.
        """
        return cast(
            SerializableDict,
            {
                "name": self.name,
                "patterns": list(self.patterns),
            },
        )

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "LeakedApiKeyDetectionPolicy":
        """
        Constructs the policy from serialized configuration.

        Args:
            config: Dictionary containing configuration options. A missing (or None)
                'name' falls back to the class name; a missing 'patterns' falls back
                to DEFAULT_PATTERNS.

        Returns:
            An instance of LeakedApiKeyDetectionPolicy.

        Raises:
            ValueError: If the 'patterns' value is not a list of strings.
        """
        name_val = config.get("name")
        # Avoid turning an explicit null name into the literal string "None".
        resolved_name = cls.__name__ if name_val is None else str(name_val)

        patterns_val = config.get("patterns", cls.DEFAULT_PATTERNS)
        if not isinstance(patterns_val, list) or not all(isinstance(p, str) for p in patterns_val):
            raise ValueError(
                f"LeakedApiKeyDetectionPolicy 'patterns' must be a list of strings. "
                f"Got: {patterns_val!r} (type: {type(patterns_val).__name__})"
            )

        return cls(
            name=resolved_name,
            patterns=cast(List[str], patterns_val),
        )

Detects API keys that might be leaked in message content sent to LLMs.

__init__(patterns=None, name=None)
Source code in luthien_control/control_policy/leaked_api_key_detection.py
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(self, patterns: Optional[List[str]] = None, name: Optional[str] = None):
    """Initializes the policy.

    Args:
        patterns: Optional list of regex patterns to detect API keys.
                 If not provided (or empty), uses DEFAULT_PATTERNS.
        name: Optional name for this policy instance. Defaults to the class name.
    """
    self.name = name or self.__class__.__name__
    # Copy so this instance never aliases the caller's list or the shared,
    # class-level DEFAULT_PATTERNS (mutating one must not affect the other).
    self.patterns = list(patterns or self.DEFAULT_PATTERNS)
    self.compiled_patterns: List[Pattern] = [re.compile(pattern) for pattern in self.patterns]
    self.logger = logging.getLogger(__name__)

Initializes the policy.

Parameters:

Name Type Description Default
patterns Optional[List[str]]

Optional list of regex patterns to detect API keys. If not provided (or if an empty list is given), uses DEFAULT_PATTERNS.

None
name Optional[str]

Optional name for this policy instance.

None
apply(context, container, session) async
Source code in luthien_control/control_policy/leaked_api_key_detection.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,
) -> TransactionContext:
    """
    Checks message content for potentially leaked API keys.

    Only the 'messages' field of a JSON request body is inspected. A message's
    'content' may be a string or a list of parts; string 'text' fields of parts
    are checked too. Bodies that are empty, not UTF-8, not JSON, or not shaped
    like a chat request are passed through unchanged.

    Args:
        context: The current transaction context.
        container: The application dependency container (unused).
        session: An active SQLAlchemy AsyncSession (unused).

    Returns:
        The unchanged transaction context when no potential API key is found.

    Raises:
        NoRequestError: If the request is not found in the context.
        LeakedApiKeyError: If a potential API key is detected in message content.
            Before raising, ``context.response`` is set to a 403 JSON response.
    """
    if context.request is None:
        raise NoRequestError(f"[{context.transaction_id}] No request in context.")

    self.logger.info(f"[{context.transaction_id}] Checking for leaked API keys in message content ({self.name}).")

    # Skip requests without a body (e.g. typical GET requests).
    body_bytes = getattr(context.request, "content", None)
    if not body_bytes:
        self.logger.debug(f"[{context.transaction_id}] No content to check for API keys.")
        return context

    try:
        body_json = json.loads(body_bytes.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # If the body isn't valid JSON or text, we can't check it effectively
        self.logger.debug(f"[{context.transaction_id}] Could not decode request body as JSON.")
        return context

    # Valid JSON need not be an object, and 'messages' entries need not be dicts;
    # skip anything with an unexpected shape instead of crashing with TypeError.
    messages = body_json.get("messages") if isinstance(body_json, dict) else None
    if not isinstance(messages, list):
        return context

    for message in messages:
        if not isinstance(message, dict):
            continue
        message_content = message.get("content")
        if isinstance(message_content, str):
            texts = [message_content]
        elif isinstance(message_content, list):
            # Multi-part content, e.g. [{"type": "text", "text": "..."}].
            texts = [
                part["text"]
                for part in message_content
                if isinstance(part, dict) and isinstance(part.get("text"), str)
            ]
        else:
            continue

        if any(self._check_text(text) for text in texts):
            error_message = (
                "Potential API key detected in message content. For security, the request has been blocked."
            )
            self.logger.warning(f"[{context.transaction_id}] {error_message} ({self.name})")

            context.response = httpx.Response(
                status_code=403,
                json={"detail": error_message},
            )
            raise LeakedApiKeyError(detail=error_message)

    return context

Checks message content for potentially leaked API keys.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container.

required
session AsyncSession

An active SQLAlchemy AsyncSession.

required

Returns:

Type Description
TransactionContext

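The transaction context, unchanged, when no potential API key is found. When a key is detected, a 403 response is set on the context and LeakedApiKeyError is raised instead of returning.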

Raises:

Type Description
NoRequestError

If the request is not found in the context.

LeakedApiKeyError

If a potential API key is detected in message content.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/leaked_api_key_detection.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def from_serialized(cls, config: SerializableDict) -> "LeakedApiKeyDetectionPolicy":
    """
    Constructs the policy from serialized configuration.

    Args:
        config: Dictionary containing configuration options. Both keys are optional:
            "name" defaults to the class name and "patterns" (a list of regex
            strings) defaults to ``cls.DEFAULT_PATTERNS``.

    Returns:
        An instance of LeakedApiKeyDetectionPolicy.

    Raises:
        ValueError: If 'patterns' is present but is not a list of strings.
    """
    resolved_name = str(config.get("name", cls.__name__))

    resolved_patterns = config.get("patterns", cls.DEFAULT_PATTERNS)

    # One check covers both the container type and the element types; the
    # `cast` below is only applied once the value is known to be valid.
    if not isinstance(resolved_patterns, list) or not all(isinstance(p, str) for p in resolved_patterns):
        raise ValueError(
            f"LeakedApiKeyDetectionPolicy 'patterns' must be a list of strings. "
            f"Got: {resolved_patterns!r} (type: {type(resolved_patterns).__name__})"
        )

    return cls(
        name=resolved_name,
        # Copy so the instance never shares a list object with the class-level
        # DEFAULT_PATTERNS or with the caller's config.
        patterns=list(cast(List[str], resolved_patterns)),
    )

Constructs the policy from serialized configuration.

Parameters:

Name Type Description Default
config SerializableDict

Dictionary containing configuration options.

required

Returns:

Type Description
LeakedApiKeyDetectionPolicy

An instance of LeakedApiKeyDetectionPolicy.

Raises:

Type Description
ValueError

If 'patterns' is present but is not a list of strings. (Missing 'name' or 'patterns' keys fall back to their defaults.)

serialize()
Source code in luthien_control/control_policy/leaked_api_key_detection.py
126
127
128
129
130
131
132
133
134
def serialize(self) -> SerializableDict:
    """Return this policy's configuration as a serializable mapping.

    The mapping holds the instance ``name`` and its list of ``patterns``.
    """
    config = {
        "name": self.name,
        "patterns": self.patterns,
    }
    return cast(SerializableDict, config)

Serializes the policy's configuration.

loader

load_policy(serialized_policy)

Source code in luthien_control/control_policy/loader.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def load_policy(serialized_policy: SerializedPolicy) -> "ControlPolicy":
    """
    Instantiate a ControlPolicy from a SerializedPolicy.

    The policy class is looked up by ``serialized_policy.type`` in the policy
    registry (POLICY_NAME_TO_CLASS) and built with its ``from_serialized``
    classmethod, which receives ``serialized_policy.config``.

    Args:
        serialized_policy: A SerializedPolicy object holding the policy ``type``
            (its registry name) and its ``config`` dictionary.

    Returns:
        An instantiated ControlPolicy object.

    Raises:
        PolicyLoadError: If ``type`` is not a string, ``config`` is not a dict, the
            type is not registered, or the policy's ``from_serialized`` raises. In the
            last case the original exception is chained as ``__cause__``.
    """
    # Imported here rather than at module level to avoid a circular import.
    from .registry import POLICY_NAME_TO_CLASS

    logger = logging.getLogger(__name__)

    policy_type = serialized_policy.type
    policy_config = serialized_policy.config

    if not isinstance(policy_type, str):
        raise PolicyLoadError(f"Policy 'type' must be a string, got: {type(policy_type)}")
    if not isinstance(policy_config, dict):
        raise PolicyLoadError(f"Policy 'config' must be a dictionary, got: {type(policy_config)}")

    policy_class = POLICY_NAME_TO_CLASS.get(policy_type)
    if policy_class is None:
        raise PolicyLoadError(
            f"Unknown policy type: '{policy_type}'. Available policies: {list(POLICY_NAME_TO_CLASS.keys())}"
        )

    # Keep the try narrow: only failures raised by from_serialized are wrapped.
    try:
        instance = policy_class.from_serialized(policy_config)
    except Exception as e:
        logger.error("Error instantiating policy '%s': %s", policy_type, e, exc_info=True)
        raise PolicyLoadError(f"Error instantiating policy '{policy_type}': {e}") from e

    logger.info("Successfully loaded policy: %s", getattr(instance, "name", policy_type))
    return instance

Loads a ControlPolicy instance from a SerializedPolicy containing its policy type and config.

Parameters:

Name Type Description Default
serialized_policy SerializedPolicy

A SerializedPolicy object.

required

Returns:

Type Description
ControlPolicy

An instantiated ControlPolicy object.

Raises:

Type Description
PolicyLoadError

If the policy type is unknown, the type or config is malformed, or the policy's from_serialized method raises (the original exception is chained as the cause).

Exception

Not raised directly: any exception from the policy's from_serialized method is wrapped in PolicyLoadError.

load_policy_from_file(filepath)

Source code in luthien_control/control_policy/loader.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def load_policy_from_file(filepath: str) -> "ControlPolicy":
    """Load a policy configuration from a file and instantiate it using the control_policy loader.

    The file must hold a JSON object with a string ``"type"`` and a dictionary
    ``"config"``; these are wrapped in a SerializedPolicy and passed to ``load_policy``.

    Args:
        filepath: Path to the UTF-8 encoded JSON policy file.

    Returns:
        The instantiated ControlPolicy.

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
        json.JSONDecodeError: If the file content is not valid JSON.
        PolicyLoadError: If the JSON is not an object, lacks a string 'type' or a
            dict 'config', or if ``load_policy`` rejects it.
    """
    # Explicit encoding: the platform default (e.g. cp1252 on Windows) would
    # mis-decode non-ASCII content in policy files.
    with open(filepath, "r", encoding="utf-8") as f:
        raw_policy_data = json.load(f)

    if not isinstance(raw_policy_data, dict):
        raise PolicyLoadError(f"Policy data loaded from {filepath} must be a dictionary, got {type(raw_policy_data)}")

    policy_type = raw_policy_data.get("type")
    policy_config = raw_policy_data.get("config")

    if not isinstance(policy_type, str):
        raise PolicyLoadError(
            f"Policy file {filepath} must contain a 'type' field as a string. Got: {type(policy_type)}"
        )
    if not isinstance(policy_config, dict):
        raise PolicyLoadError(
            f"Policy file {filepath} must contain a 'config' field as a dictionary. Got: {type(policy_config)}"
        )

    serialized_policy_obj = SerializedPolicy(type=policy_type, config=policy_config)
    return load_policy(serialized_policy_obj)

Load a policy configuration from a file and instantiate it using the control_policy loader.

model_name_replacement

ModelNameReplacementPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/model_name_replacement.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class ModelNameReplacementPolicy(ControlPolicy):
    """Replaces model names in requests based on a configured mapping.

    This policy allows clients to use fake model names that will be
    replaced with real model names before the request is sent to the backend.
    This is useful for services like Cursor that assume model strings that match
    known models must route through specific endpoints.
    """

    def __init__(self, model_mapping: Dict[str, str], name: Optional[str] = None):
        """Initializes the policy with a mapping of fake to real model names.

        Args:
            model_mapping: Dictionary mapping fake model names to real model names.
            name: Optional name for this policy instance; a falsy value falls back
                to the class name.
        """
        self.model_mapping = model_mapping
        self.name = name or self.__class__.__name__
        self.logger = logging.getLogger(__name__)

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,
    ) -> TransactionContext:
        """
        Replaces the model name in the request content based on the configured mapping.

        The request body is parsed as UTF-8 JSON. Only when it is a JSON object
        whose string "model" value is a key of ``model_mapping`` is the body
        re-serialized with the mapped name and ``context.request`` rebuilt.
        Bodies that are empty, not valid JSON, or not a JSON object are left untouched.

        Args:
            context: The current transaction context.
            container: The application dependency container.
            session: An active SQLAlchemy AsyncSession (unused).

        Returns:
            The potentially modified transaction context.

        Raises:
            NoRequestError: If no request is found in the context.
        """
        if context.request is None:
            raise NoRequestError(f"[{context.transaction_id}] No request in context.")

        if not hasattr(context.request, "content") or not context.request.content:
            self.logger.debug(f"[{context.transaction_id}] No content to modify for model name replacement.")
            return context

        try:
            body_content = context.request.content.decode("utf-8")
            body_json = json.loads(body_content)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self.logger.warning(f"[{context.transaction_id}] Error processing request content: {e}")
            return context

        # Only a JSON object can carry a "model" field. For a JSON string,
        # `"model" in body_json` would be a substring test, and for a number it raises TypeError.
        if not isinstance(body_json, dict):
            return context

        original_model = body_json.get("model")
        # Mapping keys are model-name strings; the isinstance check also avoids a
        # TypeError when the value is unhashable (e.g. a list).
        if not isinstance(original_model, str) or original_model not in self.model_mapping:
            return context

        new_model = self.model_mapping[original_model]
        self.logger.info(f"[{context.transaction_id}] Replacing model name: {original_model} -> {new_model}")
        body_json["model"] = new_model
        modified_content = json.dumps(body_json).encode("utf-8")

        # Drop body-framing headers from the original request. httpx only *defaults*
        # Content-Length for the new content (it keeps one already present), and the
        # re-serialized body generally differs in length from the original.
        stale_body_headers = {b"content-length", b"transfer-encoding"}
        forwarded_headers = [
            (key, value)
            for key, value in context.request.headers.raw
            if key.lower() not in stale_body_headers
        ]
        context.request = httpx.Request(
            method=context.request.method,
            url=context.request.url,
            headers=forwarded_headers,
            content=modified_content,
        )
        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy configuration."""
        return cast(
            SerializableDict,
            {
                "type": "ModelNameReplacement",
                "name": self.name,
                "model_mapping": self.model_mapping,
            },
        )

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "ModelNameReplacementPolicy":
        """Constructs the policy from serialized configuration.

        Args:
            config: May contain "name" (instance name) and "model_mapping"
                (dict of fake -> real model names; defaults to an empty dict).

        Returns:
            A configured ModelNameReplacementPolicy.

        Raises:
            ValueError: If "model_mapping" is present but is not a dictionary.
        """
        instance_name = cast(Optional[str], config.get("name"))
        model_mapping = config.get("model_mapping", {})
        # Fail at load time rather than with a TypeError on the first request.
        if not isinstance(model_mapping, dict):
            raise ValueError(
                f"ModelNameReplacementPolicy 'model_mapping' must be a dictionary. "
                f"Got: {model_mapping!r} (type: {type(model_mapping).__name__})"
            )
        return cls(model_mapping=cast(Dict[str, str], model_mapping), name=instance_name)

Replaces model names in requests based on a configured mapping.

This policy allows clients to use fake model names that will be replaced with real model names before the request is sent to the backend. This is useful for services like Cursor that assume model strings that match known models must route through specific endpoints.

__init__(model_mapping, name=None)
Source code in luthien_control/control_policy/model_name_replacement.py
25
26
27
28
29
30
31
32
33
34
def __init__(self, model_mapping: Dict[str, str], name: Optional[str] = None):
    """Store the fake-to-real model-name mapping and resolve the instance name.

    Args:
        model_mapping: Dictionary whose keys are client-facing (fake) model names
            and whose values are the real model names to substitute.
        name: Optional name for this policy instance; when falsy the class name is used.
    """
    self.name = name if name else type(self).__name__
    self.model_mapping = model_mapping
    self.logger = logging.getLogger(__name__)

Initializes the policy with a mapping of fake to real model names.

Parameters:

Name Type Description Default
model_mapping Dict[str, str]

Dictionary mapping fake model names to real model names.

required
name Optional[str]

Optional name for this policy instance.

None
apply(context, container, session) async
Source code in luthien_control/control_policy/model_name_replacement.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,
) -> TransactionContext:
    """
    Replaces the model name in the request content based on the configured mapping.

    The request body is parsed as UTF-8 JSON. Only when it is a JSON object
    whose string "model" value is a key of ``model_mapping`` is the body
    re-serialized with the mapped name and ``context.request`` rebuilt.
    Bodies that are empty, not valid JSON, or not a JSON object are left untouched.

    Args:
        context: The current transaction context.
        container: The application dependency container.
        session: An active SQLAlchemy AsyncSession (unused).

    Returns:
        The potentially modified transaction context.

    Raises:
        NoRequestError: If no request is found in the context.
    """
    if context.request is None:
        raise NoRequestError(f"[{context.transaction_id}] No request in context.")

    if not hasattr(context.request, "content") or not context.request.content:
        self.logger.debug(f"[{context.transaction_id}] No content to modify for model name replacement.")
        return context

    try:
        body_content = context.request.content.decode("utf-8")
        body_json = json.loads(body_content)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        self.logger.warning(f"[{context.transaction_id}] Error processing request content: {e}")
        return context

    # Only a JSON object can carry a "model" field. For a JSON string,
    # `"model" in body_json` would be a substring test, and for a number it raises TypeError.
    if not isinstance(body_json, dict):
        return context

    original_model = body_json.get("model")
    # Mapping keys are model-name strings; the isinstance check also avoids a
    # TypeError when the value is unhashable (e.g. a list).
    if not isinstance(original_model, str) or original_model not in self.model_mapping:
        return context

    new_model = self.model_mapping[original_model]
    self.logger.info(f"[{context.transaction_id}] Replacing model name: {original_model} -> {new_model}")
    body_json["model"] = new_model
    modified_content = json.dumps(body_json).encode("utf-8")

    # Drop body-framing headers from the original request. httpx only *defaults*
    # Content-Length for the new content (it keeps one already present), and the
    # re-serialized body generally differs in length from the original.
    stale_body_headers = {b"content-length", b"transfer-encoding"}
    forwarded_headers = [
        (key, value)
        for key, value in context.request.headers.raw
        if key.lower() not in stale_body_headers
    ]
    context.request = httpx.Request(
        method=context.request.method,
        url=context.request.url,
        headers=forwarded_headers,
        content=modified_content,
    )
    return context

Replaces the model name in the request content based on the configured mapping.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container.

required
session AsyncSession

An active SQLAlchemy AsyncSession (unused).

required

Returns:

Type Description
TransactionContext

The potentially modified transaction context.

Raises:

Type Description
NoRequestError

If no request is found in the context.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/model_name_replacement.py
101
102
103
104
105
106
@classmethod
def from_serialized(cls, config: SerializableDict) -> "ModelNameReplacementPolicy":
    """Constructs the policy from serialized configuration.

    Args:
        config: May contain "name" (instance name) and "model_mapping"
            (dict of fake -> real model names; defaults to an empty dict).

    Returns:
        A configured ModelNameReplacementPolicy.

    Raises:
        ValueError: If "model_mapping" is present but is not a dictionary.
    """
    instance_name = cast(Optional[str], config.get("name"))
    model_mapping = config.get("model_mapping", {})
    # Fail at load time rather than with a TypeError on the first request.
    if not isinstance(model_mapping, dict):
        raise ValueError(
            f"ModelNameReplacementPolicy 'model_mapping' must be a dictionary. "
            f"Got: {model_mapping!r} (type: {type(model_mapping).__name__})"
        )
    return cls(model_mapping=cast(Dict[str, str], model_mapping), name=instance_name)

Constructs the policy from serialized configuration.

serialize()
Source code in luthien_control/control_policy/model_name_replacement.py
90
91
92
93
94
95
96
97
98
99
def serialize(self) -> SerializableDict:
    """Return the policy configuration as a serializable mapping.

    The mapping carries the fixed "type" tag, the instance name and the
    fake-to-real model mapping.
    """
    payload = dict(
        type="ModelNameReplacement",
        name=self.name,
        model_mapping=self.model_mapping,
    )
    return cast(SerializableDict, payload)

Serializes the policy configuration.

noop_policy

NoopPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/noop_policy.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class NoopPolicy(ControlPolicy):
    """A policy that does nothing.

    ``apply`` hands the transaction context back untouched; the only piece of
    configuration is the instance ``name``.
    """

    def __init__(self, name: str = "NoopPolicy"):
        """Store the policy instance name (defaults to "NoopPolicy")."""
        self.name = name

    async def apply(
        self, context: TransactionContext, container: DependencyContainer, session: AsyncSession
    ) -> TransactionContext:
        """Return ``context`` unchanged; ``container`` and ``session`` are ignored."""
        return context

    def serialize(self) -> SerializableDict:
        """Serialize the policy as a mapping holding only its ``name``."""
        return SerializableDict(name=self.name)

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "NoopPolicy":
        """Build a NoopPolicy from ``config``; a missing "name" falls back to the class name."""
        configured_name = config.get("name", cls.__name__)
        return cls(name=str(configured_name))

A policy that does nothing.

send_backend_request

SendBackendRequestPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/send_backend_request.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class SendBackendRequestPolicy(ControlPolicy):
    """
    Policy responsible for sending the request to the backend, storing the response,
    and reading the raw response body.

    Attributes:
        name (str): The name of this policy instance, used for logging and
            identification. It defaults to the class name if not provided
            during initialization.
    """

    # Client headers that are not forwarded verbatim. Host, Accept-Encoding and
    # Authorization replacements are appended in _prepare_backend_headers.
    _EXCLUDED_BACKEND_HEADERS = {
        b"host",
        b"transfer-encoding",
        b"accept-encoding",  # We force identity
        b"authorization",  # We add our own
    }

    def __init__(self, name: Optional[str] = None):
        """Initializes the policy; a falsy ``name`` falls back to the class name."""
        self.name = name or self.__class__.__name__

    def _build_target_url(self, base_url: str, relative_path: str) -> str:
        """Constructs the full target URL for the backend request.

        Joins ``base_url`` and ``relative_path`` with exactly one slash.
        ``relative_path`` may include a query string.
        """
        # Ensure no double slashes
        target_url = f"{base_url.rstrip('/')}/{relative_path.lstrip('/')}"
        logger.debug(f"Constructed target URL: {target_url} (from base: {base_url}, relative: {relative_path})")
        return target_url

    def _prepare_backend_headers(self, context: TransactionContext, settings: Settings) -> list[tuple[bytes, bytes]]:
        """Prepares the headers to be sent to the backend.

        Client headers are copied except those in ``_EXCLUDED_BACKEND_HEADERS``.
        A ``Host`` header derived from BACKEND_URL, ``Accept-Encoding: identity``,
        and a bearer ``Authorization`` header built from the configured OpenAI API
        key are then appended.

        Args:
            context: The current transaction context, containing the original request.
            settings: The application settings, used to get the backend URL
                and API key.

        Returns:
            A list of (header_name, header_value) tuples for the backend request.

        Raises:
            ValueError: If the BACKEND_URL setting is invalid and its hostname
                or port cannot be parsed for the Host header.
        """
        original_request = cast(httpx.Request, context.request)

        backend_headers: list[tuple[bytes, bytes]] = []
        backend_url_base = settings.get_backend_url()

        # Copy necessary headers from original request, excluding problematic ones
        for key_bytes, value_bytes in original_request.headers.raw:
            if key_bytes.lower() not in self._EXCLUDED_BACKEND_HEADERS:
                backend_headers.append((key_bytes, value_bytes))

        # Add the correct Host header for the backend
        try:
            parsed_backend_url = urlparse(backend_url_base)
            # Pyright infers hostname as Optional[Union[str, bytes]]; standard stubs suggest Optional[str].
            backend_host_nullable: Optional[str] = cast(Optional[str], parsed_backend_url.hostname)

            if not backend_host_nullable:
                raise ValueError("Could not parse hostname from BACKEND_URL")
            backend_host_str: str = backend_host_nullable
            # urlparse strips the brackets from IPv6 literals; the Host header needs them.
            if ":" in backend_host_str:
                backend_host_str = f"[{backend_host_str}]"
            # The Host header must carry a non-default port (RFC 9110 section 7.2),
            # e.g. BACKEND_URL=http://localhost:8080. `.port` raises ValueError for an
            # invalid port, which is handled below.
            backend_port = parsed_backend_url.port
            default_port = {"http": 80, "https": 443}.get(parsed_backend_url.scheme)
            if backend_port is not None and backend_port != default_port:
                backend_host_str = f"{backend_host_str}:{backend_port}"
            backend_headers.append((b"host", backend_host_str.encode("latin-1")))
        except ValueError as e:
            logger.error(
                f"[{context.transaction_id}] Invalid BACKEND_URL '{backend_url_base}' for Host header parsing: {e}",
                extra={"request_id": context.transaction_id},
            )
            raise ValueError(f"Could not determine backend Host from BACKEND_URL: {e}") from e

        # Force Accept-Encoding: identity (avoids downstream decompression issues)
        backend_headers.append((b"accept-encoding", b"identity"))

        # Add Backend Authorization Header using passed settings
        openai_key = settings.get_openai_api_key()
        backend_headers.append((b"authorization", f"Bearer {openai_key}".encode("latin-1")))

        return backend_headers

    async def apply(
        self,
        context: TransactionContext,
        container: DependencyContainer,
        session: AsyncSession,  # session is unused but required by interface
    ) -> TransactionContext:
        """
        Sends the request from context to the backend and stores the response.

        This policy constructs the target URL (backend base URL plus the request's
        path and query string), prepares headers, and uses the HTTP client from the
        `DependencyContainer` to send the `context.request`. The backend's response
        (an `httpx.Response` object) is stored in `context.response`. The response
        body is read immediately.

        Args:
            context: The current transaction context, containing the `request` to be sent.
            container: The application dependency container, providing `settings` and `http_client`.
            session: An active SQLAlchemy `AsyncSession`. (Unused by this policy but required by the interface).

        Returns:
            The `TransactionContext`, updated with `context.response`
            containing the `httpx.Response` from the backend.

        Raises:
            ValueError: If `context.request` is None or if `BACKEND_URL` is invalid.
            httpx.TimeoutException: If the request to the backend times out.
            httpx.RequestError: For other network-related issues during the backend request.
            Exception: For any other unexpected errors during request preparation or execution.
        """
        settings = container.settings
        http_client = container.http_client

        if not context.request:
            raise ValueError(f"[{context.transaction_id}] Cannot send request: context.request is None")

        backend_url_base = settings.get_backend_url()
        if backend_url_base is None:
            error_msg = f"[{context.transaction_id}] BACKEND_URL is not configured."
            logger.error(error_msg)
            raise ValueError(error_msg)

        # --- Prepare Request Components ---
        try:
            # raw_path is the still-percent-encoded path *plus* query string. Using
            # `.path` would drop the client's query parameters and decode escapes
            # such as %2F before the URL is rebuilt.
            relative_target = context.request.url.raw_path.decode("ascii")
            target_url = self._build_target_url(backend_url_base, relative_target)
            backend_headers = self._prepare_backend_headers(context, settings)
            context.request.url = httpx.URL(target_url)  # Ensure it's a URL object
            context.request.headers = httpx.Headers(backend_headers)  # Ensure it's a Headers object
        except ValueError as e:
            # Configuration or header preparation error
            logger.error(
                f"[{context.transaction_id}] Error preparing backend request components: {e}",
                extra={"request_id": context.transaction_id},
            )
            raise  # Re-raise the configuration error

        # --- Send Backend Request ---
        try:
            logger.debug(
                f"[{context.transaction_id}] Sending request to backend: {context.request.method} {context.request.url}"
            )
            response = await http_client.send(context.request)
            # Read response body immediately to ensure connection is closed
            await response.aread()
            context.response = response
            logger.debug(f"[{context.transaction_id}] Received backend response: {response.status_code}")
            logger.debug(f"[{context.transaction_id}] Read {len(response.content)} bytes from backend response body.")

        except (httpx.TimeoutException, httpx.RequestError) as e:
            request_url_for_log = context.request.url if context.request else target_url  # Fallback
            error_type = type(e).__name__
            logger.error(
                f"[{context.transaction_id}] {error_type} connecting to backend '{request_url_for_log}': {e}",
                extra={"request_id": context.transaction_id},
            )
            raise  # Re-raise the httpx error
        except Exception as e:
            request_url_for_log = context.request.url if context.request else target_url
            logger.exception(
                f"[{context.transaction_id}] Unexpected error during backend request "
                f"to '{request_url_for_log}' or body read: {e}",
                extra={"request_id": context.transaction_id},
            )
            raise  # Re-raise the unexpected error

        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy's configuration.

        For this policy, only the 'name' attribute is included, as all other
        dependencies (like HTTP client, settings) are resolved from the
        DependencyContainer at runtime.

        Returns:
            SerializableDict: A dictionary containing the 'name' of the policy instance.
        """
        return cast(SerializableDict, {"name": self.name})

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "SendBackendRequestPolicy":
        """
        Constructs the policy from serialized configuration.

        Args:
            config: A dictionary that may optionally contain a 'name' key
                    to set a custom name for the policy instance.

        Returns:
            An instance of SendBackendRequestPolicy.
        """
        name = config.get("name")
        # An absent or explicit-null name becomes None, so __init__ falls back to
        # the class name (str(None) would otherwise produce the literal "None").
        return cls(name=None if name is None else str(name))

Policy responsible for sending the request to the backend, storing the response, and reading the raw response body.

Attributes:

Name Type Description
name str

The name of this policy instance, used for logging and identification. It defaults to the class name if not provided during initialization.

apply(context, container, session) async
Source code in luthien_control/control_policy/send_backend_request.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
async def apply(
    self,
    context: TransactionContext,
    container: DependencyContainer,
    session: AsyncSession,  # session is unused but required by interface
) -> TransactionContext:
    """
    Sends the request from context to the backend and stores the response.

    This policy constructs the target URL (backend base URL plus the request's
    path and query string), prepares headers, and uses the HTTP client from the
    `DependencyContainer` to send the `context.request`. The backend's response
    (an `httpx.Response` object) is stored in `context.response`. The response
    body is read immediately.

    Args:
        context: The current transaction context, containing the `request` to be sent.
        container: The application dependency container, providing `settings` and `http_client`.
        session: An active SQLAlchemy `AsyncSession`. (Unused by this policy but required by the interface).

    Returns:
        The `TransactionContext`, updated with `context.response`
        containing the `httpx.Response` from the backend.

    Raises:
        ValueError: If `context.request` is None or if `BACKEND_URL` is invalid.
        httpx.TimeoutException: If the request to the backend times out.
        httpx.RequestError: For other network-related issues during the backend request.
        Exception: For any other unexpected errors during request preparation or execution.
    """
    settings = container.settings
    http_client = container.http_client

    if not context.request:
        raise ValueError(f"[{context.transaction_id}] Cannot send request: context.request is None")

    backend_url_base = settings.get_backend_url()
    if backend_url_base is None:
        error_msg = f"[{context.transaction_id}] BACKEND_URL is not configured."
        logger.error(error_msg)
        raise ValueError(error_msg)

    # --- Prepare Request Components ---
    try:
        # raw_path is the still-percent-encoded path *plus* query string. Using
        # `.path` would drop the client's query parameters and decode escapes
        # such as %2F before the URL is rebuilt.
        relative_target = context.request.url.raw_path.decode("ascii")
        target_url = self._build_target_url(backend_url_base, relative_target)
        backend_headers = self._prepare_backend_headers(context, settings)
        context.request.url = httpx.URL(target_url)  # Ensure it's a URL object
        context.request.headers = httpx.Headers(backend_headers)  # Ensure it's a Headers object
    except ValueError as e:
        # Configuration or header preparation error
        logger.error(
            f"[{context.transaction_id}] Error preparing backend request components: {e}",
            extra={"request_id": context.transaction_id},
        )
        raise  # Re-raise the configuration error

    # --- Send Backend Request ---
    try:
        logger.debug(
            f"[{context.transaction_id}] Sending request to backend: {context.request.method} {context.request.url}"
        )
        response = await http_client.send(context.request)
        # Read response body immediately to ensure connection is closed
        await response.aread()
        context.response = response
        logger.debug(f"[{context.transaction_id}] Received backend response: {response.status_code}")
        logger.debug(f"[{context.transaction_id}] Read {len(response.content)} bytes from backend response body.")

    except (httpx.TimeoutException, httpx.RequestError) as e:
        request_url_for_log = context.request.url if context.request else target_url  # Fallback
        error_type = type(e).__name__
        logger.error(
            f"[{context.transaction_id}] {error_type} connecting to backend '{request_url_for_log}': {e}",
            extra={"request_id": context.transaction_id},
        )
        raise  # Re-raise the httpx error
    except Exception as e:
        request_url_for_log = context.request.url if context.request else target_url
        logger.exception(
            f"[{context.transaction_id}] Unexpected error during backend request "
            f"to '{request_url_for_log}' or body read: {e}",
            extra={"request_id": context.transaction_id},
        )
        raise  # Re-raise the unexpected error

    return context

Sends the request from context to the backend and stores the response.

This policy constructs the target URL, prepares headers, and uses the HTTP client from the DependencyContainer to send the context.request. The backend's response (an httpx.Response object) is stored in context.response. The response body is read immediately.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context, containing the request to be sent.

required
container DependencyContainer

The application dependency container, providing settings and http_client.

required
session AsyncSession

An active SQLAlchemy AsyncSession. (Unused by this policy but required by the interface).

required

Returns:

Type Description
TransactionContext

The TransactionContext, updated with context.response containing the httpx.Response from the backend.

Raises:

Type Description
ValueError

If context.request is None or if BACKEND_URL is invalid.

TimeoutException

If the request to the backend times out.

RequestError

For other network-related issues during the backend request.

Exception

For any other unexpected errors during request preparation or execution.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/send_backend_request.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@classmethod
def from_serialized(cls, config: SerializableDict) -> "SendBackendRequestPolicy":
    """
    Constructs the policy from serialized configuration.

    Args:
        config: A dictionary that may optionally contain a 'name' key
                to set a custom name for the policy instance.

    Returns:
        An instance of SendBackendRequestPolicy.
    """
    name = config.get("name")
    # An absent or explicit-null name becomes None, so __init__ falls back to
    # the class name (str(None) would otherwise produce the literal "None").
    return cls(name=None if name is None else str(name))

Constructs the policy from serialized configuration.

Parameters:

Name Type Description Default
config SerializableDict

A dictionary that may optionally contain a 'name' key to set a custom name for the policy instance.

required

Returns:

Type Description
SendBackendRequestPolicy

An instance of SendBackendRequestPolicy.

serialize()
Source code in luthien_control/control_policy/send_backend_request.py
185
186
187
188
189
190
191
192
193
194
195
def serialize(self) -> SerializableDict:
    """Return this policy's configuration as a serializable dictionary.

    Only the instance ``name`` is persisted. The HTTP client, settings and
    every other collaborator are looked up in the DependencyContainer when
    the policy runs, so nothing else needs to be stored.

    Returns:
        SerializableDict: A mapping of the form ``{"name": <policy name>}``.
    """
    config = {"name": self.name}
    return cast(SerializableDict, config)

Serializes the policy's configuration.

For this policy, only the 'name' attribute is included, as all other dependencies (like HTTP client, settings) are resolved from the DependencyContainer at runtime.

Returns:

Name Type Description
SerializableDict SerializableDict

A dictionary containing the 'name' of the policy instance.

serial_policy

SerialPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/serial_policy.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class SerialPolicy(ControlPolicy):
    """
    A Control Policy that applies an ordered sequence of other policies.

    Policies are applied sequentially; each member receives the context
    returned by the previous one. If any policy raises an exception,
    the execution stops, and the exception propagates.

    Attributes:
        policies (Sequence[ControlPolicy]): The ordered sequence of ControlPolicy
            instances that this policy will apply.
        logger (logging.Logger): The logger instance for this policy.
        name (str): The name of this policy instance, used for logging and
            identification.
    """

    def __init__(self, policies: Sequence[ControlPolicy], name: Optional[str] = None):
        """
        Initializes the SerialPolicy.

        Args:
            policies: An ordered sequence of ControlPolicy instances to apply.
                An empty sequence is accepted but logs a warning.
            name: An optional name for logging/identification purposes.
                Falls back to the class name when not given (or empty).
        """
        # Resolve the name first so the empty-list warning reports the name the
        # instance will actually carry rather than the literal 'None'.
        self.name = name or self.__class__.__name__
        self.policies = policies
        self.logger = logger
        if not policies:
            logger.warning(f"Initializing SerialPolicy '{self.name}' with an empty policy list.")

    async def apply(
        self,
        context: "TransactionContext",
        container: DependencyContainer,
        session: AsyncSession,
    ) -> "TransactionContext":
        """
        Applies the contained policies sequentially to the context.
        Requires the DependencyContainer and an active SQLAlchemy AsyncSession.

        Args:
            context: The current transaction context.
            container: The application dependency container.
            session: An active SQLAlchemy AsyncSession, passed to member policies.

        Returns:
            The transaction context after all contained policies have been applied.

        Raises:
            Exception: Propagates any exception raised by a contained policy.
        """
        self.logger.debug(f"[{context.transaction_id}] Entering SerialPolicy: {self.name}")
        current_context = context
        for i, policy in enumerate(self.policies):
            member_policy_name = getattr(policy, "name", policy.__class__.__name__)  # Get policy name if available
            self.logger.info(
                f"[{current_context.transaction_id}] Applying policy {i + 1}/{len(self.policies)} "
                f"in {self.name}: {member_policy_name}"
            )
            try:
                current_context = await policy.apply(current_context, container=container, session=session)
            except Exception as e:
                self.logger.error(
                    f"[{current_context.transaction_id}] Error applying policy {member_policy_name} "
                    f"within {self.name}: {e}",
                    exc_info=True,
                )
                raise  # Re-raise the exception to halt processing
        self.logger.debug(f"[{context.transaction_id}] Exiting SerialPolicy: {self.name}")
        return current_context

    def __repr__(self) -> str:
        """Provides a developer-friendly representation."""
        # Fall back to the class name for members without a `name` attribute,
        # mirroring the lookup used in `apply`.
        policy_reprs = [
            f"{getattr(p, 'name', p.__class__.__name__)} <{p.__class__.__name__}>" for p in self.policies
        ]
        policy_list_str = ", ".join(policy_reprs)
        return f"<{self.name}(policies=[{policy_list_str}])>"

    def serialize(self) -> SerializableDict:
        """Serializes the SerialPolicy into a dictionary.

        This method converts the policy and its contained member policies
        into a serializable dictionary format. It uses the POLICY_CLASS_TO_NAME
        mapping to determine the 'type' string for each member policy.

        Returns:
            SerializableDict: A dictionary representation of the policy,
                              suitable for JSON serialization or persistence.
                              It has a "name" key (read back by `from_serialized`,
                              so round-trips keep the instance name) and a
                              "policies" key, which is a list of serialized member
                              policies. Each member policy dict contains "type"
                              and "config" keys.

        Raises:
            PolicyLoadError: If the type of a member policy cannot be determined
                             from POLICY_CLASS_TO_NAME.
        """
        # Import from registry here to avoid circular import
        from .registry import POLICY_CLASS_TO_NAME

        member_configs = []
        for p in self.policies:
            try:
                policy_type = POLICY_CLASS_TO_NAME[type(p)]
            except KeyError:
                # The KeyError adds nothing beyond this message; drop the noisy context.
                raise PolicyLoadError(
                    f"Could not determine policy type for {type(p)} during serialization in {self.name} "
                    "(Not in POLICY_CLASS_TO_NAME)"
                ) from None

            member_configs.append(
                {
                    "type": policy_type,
                    "config": p.serialize(),
                }
            )
        return cast(SerializableDict, {"name": self.name, "policies": member_configs})

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "SerialPolicy":
        """
        Constructs a SerialPolicy from serialized data, loading member policies.

        Args:
            config: The serialized configuration dictionary. Expects a 'policies' key
                    containing an iterable of dictionaries, each with a string 'type'
                    and a dict 'config'. An optional 'name' key names the instance.

        Returns:
            An instance of SerialPolicy.

        Raises:
            PolicyLoadError: If 'policies' key is missing, not an iterable of
                             well-formed dictionaries, or if loading a member
                             policy fails.
        """
        member_policy_data_list_val = config.get("policies")

        if member_policy_data_list_val is None:
            raise PolicyLoadError("SerialPolicy config missing 'policies' list (key not found).")
        # str/bytes are technically iterable, but iterating them yields characters,
        # which would only surface later as a confusing per-item error.
        if isinstance(member_policy_data_list_val, (str, bytes)) or not isinstance(
            member_policy_data_list_val, Iterable
        ):
            raise PolicyLoadError(
                f"SerialPolicy 'policies' must be an iterable. Got {type(member_policy_data_list_val)}"
            )

        instantiated_policies = []

        for i, member_data in enumerate(member_policy_data_list_val):
            if not isinstance(member_data, dict):
                raise PolicyLoadError(
                    f"Item at index {i} in SerialPolicy 'policies' is not a dictionary. Got {type(member_data)}"
                )

            # Extract 'type' and 'config' for SerializedPolicy construction
            member_policy_type = member_data.get("type")
            member_policy_config = member_data.get("config")

            if not isinstance(member_policy_type, str):
                raise PolicyLoadError(
                    f"Member policy at index {i} in SerialPolicy 'policies' is missing 'type' "
                    f"or it's not a string. Got {type(member_policy_type)}"
                )
            if not isinstance(member_policy_config, dict):
                raise PolicyLoadError(
                    f"Member policy at index {i} in SerialPolicy 'policies' is missing 'config' "
                    f"or it's not a dictionary. Got {type(member_policy_config)}"
                )

            # Member names are serialized inside 'config'; keep the top-level
            # lookup as a fallback for older data.
            member_name = member_policy_config.get("name", member_data.get("name", "unknown"))

            try:
                # Construct SerializedPolicy dataclass instance
                serialized_member_policy = SerializedPolicy(type=member_policy_type, config=member_policy_config)
                member_policy = load_policy(serialized_member_policy)
                instantiated_policies.append(member_policy)
            except PolicyLoadError as e:
                raise PolicyLoadError(
                    f"Failed to load member policy at index {i} "
                    f"(name: {member_name}) "
                    f"within SerialPolicy: {e}"
                ) from e
            except Exception as e:
                raise PolicyLoadError(
                    f"Unexpected error loading member policy at index {i} "
                    f"(name: {member_name}) "
                    f"within SerialPolicy: {e}"
                ) from e

        name_val = config.get("name")
        resolved_name: Optional[str]
        if name_val is not None:
            if not isinstance(name_val, str):
                logger.warning(f"SerialPolicy name '{name_val}' from config is not a string. Coercing to string.")
                resolved_name = str(name_val)
            else:
                resolved_name = name_val
        else:
            # Default name if not in config. Could also use cls.__name__
            resolved_name = "SerialPolicy"

        return cls(policies=instantiated_policies, name=resolved_name)

A Control Policy that applies an ordered sequence of other policies.

Policies are applied sequentially. If any policy raises an exception, the execution stops, and the exception propagates.

Attributes:

Name Type Description
policies Sequence[ControlPolicy]

The ordered sequence of ControlPolicy instances that this policy will apply.

logger Logger

The logger instance for this policy.

name str

The name of this policy instance, used for logging and identification.

__init__(policies, name=None)
Source code in luthien_control/control_policy/serial_policy.py
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(self, policies: Sequence[ControlPolicy], name: Optional[str] = None):
    """
    Initializes the SerialPolicy.

    Args:
        policies: An ordered sequence of ControlPolicy instances to apply.
            An empty sequence is accepted but logs a warning.
        name: An optional name for logging/identification purposes.
            Falls back to the class name when not given (or empty).
    """
    # Resolve the name first so the empty-list warning reports the name the
    # instance will actually carry rather than the literal 'None'.
    self.name = name or self.__class__.__name__
    self.policies = policies
    self.logger = logger
    if not policies:
        logger.warning(f"Initializing SerialPolicy '{self.name}' with an empty policy list.")

Initializes the SerialPolicy.

Parameters:

Name Type Description Default
policies Sequence[ControlPolicy]

An ordered sequence of ControlPolicy instances to apply.

required
name Optional[str]

An optional name for logging/identification purposes.

None
__repr__()
Source code in luthien_control/control_policy/serial_policy.py
88
89
90
91
92
93
def __repr__(self) -> str:
    """Provides a developer-friendly representation.

    Each member is rendered as ``<name> <ClassName>``. Members without a
    ``name`` attribute fall back to their class name, mirroring the lookup
    used in ``apply``.
    """
    policy_reprs = [
        f"{getattr(p, 'name', p.__class__.__name__)} <{p.__class__.__name__}>" for p in self.policies
    ]
    policy_list_str = ", ".join(policy_reprs)
    return f"<{self.name}(policies=[{policy_list_str}])>"

Provides a developer-friendly representation.

apply(context, container, session) async
Source code in luthien_control/control_policy/serial_policy.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
async def apply(
    self,
    context: "TransactionContext",
    container: DependencyContainer,
    session: AsyncSession,
) -> "TransactionContext":
    """
    Run each member policy in order, feeding every one the context produced
    by its predecessor.

    Args:
        context: The transaction context handed to the first member policy.
        container: The application dependency container, forwarded to each member.
        session: An active SQLAlchemy AsyncSession, forwarded to each member.

    Returns:
        The context returned by the last member policy (or ``context`` itself
        when there are no members).

    Raises:
        Exception: Whatever a member policy raises; it is logged and re-raised,
            and the remaining members are not run.
    """
    self.logger.debug(f"[{context.transaction_id}] Entering SerialPolicy: {self.name}")
    ctx = context
    for position, member in enumerate(self.policies, start=1):
        label = getattr(member, "name", member.__class__.__name__)
        self.logger.info(
            f"[{ctx.transaction_id}] Applying policy {position}/{len(self.policies)} "
            f"in {self.name}: {label}"
        )
        try:
            ctx = await member.apply(ctx, container=container, session=session)
        except Exception as exc:
            # `ctx` still refers to the last successfully produced context here.
            self.logger.error(
                f"[{ctx.transaction_id}] Error applying policy {label} "
                f"within {self.name}: {exc}",
                exc_info=True,
            )
            raise
    self.logger.debug(f"[{context.transaction_id}] Exiting SerialPolicy: {self.name}")
    return ctx

Applies the contained policies sequentially to the context. Requires the DependencyContainer and an active SQLAlchemy AsyncSession.

Parameters:

Name Type Description Default
context TransactionContext

The current transaction context.

required
container DependencyContainer

The application dependency container.

required
session AsyncSession

An active SQLAlchemy AsyncSession, passed to member policies.

required

Returns:

Type Description
TransactionContext

The transaction context after all contained policies have been applied.

Raises:

Type Description
Exception

Propagates any exception raised by a contained policy.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/serial_policy.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@classmethod
def from_serialized(cls, config: SerializableDict) -> "SerialPolicy":
    """
    Constructs a SerialPolicy from serialized data, loading member policies.

    Args:
        config: The serialized configuration dictionary. Expects a 'policies' key
                containing an iterable of dictionaries, each with a string 'type'
                and a dict 'config'. An optional 'name' key names the instance.

    Returns:
        An instance of SerialPolicy.

    Raises:
        PolicyLoadError: If 'policies' key is missing, not an iterable of
                         well-formed dictionaries, or if loading a member
                         policy fails.
    """
    member_policy_data_list_val = config.get("policies")

    if member_policy_data_list_val is None:
        raise PolicyLoadError("SerialPolicy config missing 'policies' list (key not found).")
    # str/bytes are technically iterable, but iterating them yields characters,
    # which would only surface later as a confusing per-item error.
    if isinstance(member_policy_data_list_val, (str, bytes)) or not isinstance(
        member_policy_data_list_val, Iterable
    ):
        raise PolicyLoadError(
            f"SerialPolicy 'policies' must be an iterable. Got {type(member_policy_data_list_val)}"
        )

    instantiated_policies = []

    for i, member_data in enumerate(member_policy_data_list_val):
        if not isinstance(member_data, dict):
            raise PolicyLoadError(
                f"Item at index {i} in SerialPolicy 'policies' is not a dictionary. Got {type(member_data)}"
            )

        # Extract 'type' and 'config' for SerializedPolicy construction
        member_policy_type = member_data.get("type")
        member_policy_config = member_data.get("config")

        if not isinstance(member_policy_type, str):
            raise PolicyLoadError(
                f"Member policy at index {i} in SerialPolicy 'policies' is missing 'type' "
                f"or it's not a string. Got {type(member_policy_type)}"
            )
        if not isinstance(member_policy_config, dict):
            raise PolicyLoadError(
                f"Member policy at index {i} in SerialPolicy 'policies' is missing 'config' "
                f"or it's not a dictionary. Got {type(member_policy_config)}"
            )

        # Member names are serialized inside 'config'; keep the top-level
        # lookup as a fallback for older data.
        member_name = member_policy_config.get("name", member_data.get("name", "unknown"))

        try:
            # Construct SerializedPolicy dataclass instance
            serialized_member_policy = SerializedPolicy(type=member_policy_type, config=member_policy_config)
            member_policy = load_policy(serialized_member_policy)
            instantiated_policies.append(member_policy)
        except PolicyLoadError as e:
            raise PolicyLoadError(
                f"Failed to load member policy at index {i} "
                f"(name: {member_name}) "
                f"within SerialPolicy: {e}"
            ) from e
        except Exception as e:
            raise PolicyLoadError(
                f"Unexpected error loading member policy at index {i} "
                f"(name: {member_name}) "
                f"within SerialPolicy: {e}"
            ) from e

    name_val = config.get("name")
    resolved_name: Optional[str]
    if name_val is not None:
        if not isinstance(name_val, str):
            logger.warning(f"SerialPolicy name '{name_val}' from config is not a string. Coercing to string.")
            resolved_name = str(name_val)
        else:
            resolved_name = name_val
    else:
        # Default name if not in config. Could also use cls.__name__
        resolved_name = "SerialPolicy"

    return cls(policies=instantiated_policies, name=resolved_name)

Constructs a SerialPolicy from serialized data, loading member policies.

Parameters:

Name Type Description Default
config SerializableDict

The serialized configuration dictionary. Expects a 'policies' key containing a list of dictionaries, each with 'type' and 'config'.

required

Returns:

Type Description
SerialPolicy

An instance of SerialPolicy.

Raises:

Type Description
PolicyLoadError

If the 'policies' key is missing or malformed (not an iterable of dictionaries, each with a string 'type' and a dictionary 'config'), or if loading a member policy fails.

serialize()
Source code in luthien_control/control_policy/serial_policy.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def serialize(self) -> SerializableDict:
    """Serializes the SerialPolicy into a dictionary.

    This method converts the policy and its contained member policies
    into a serializable dictionary format. It uses the POLICY_CLASS_TO_NAME
    mapping to determine the 'type' string for each member policy.

    Returns:
        SerializableDict: A dictionary representation of the policy,
                          suitable for JSON serialization or persistence.
                          It has a "name" key (read back by `from_serialized`,
                          so round-trips keep the instance name) and a
                          "policies" key, which is a list of serialized member
                          policies. Each member policy dict contains "type"
                          and "config" keys.

    Raises:
        PolicyLoadError: If the type of a member policy cannot be determined
                         from POLICY_CLASS_TO_NAME.
    """
    # Import from registry here to avoid circular import
    from .registry import POLICY_CLASS_TO_NAME

    member_configs = []
    for p in self.policies:
        try:
            policy_type = POLICY_CLASS_TO_NAME[type(p)]
        except KeyError:
            # The KeyError adds nothing beyond this message; drop the noisy context.
            raise PolicyLoadError(
                f"Could not determine policy type for {type(p)} during serialization in {self.name} "
                "(Not in POLICY_CLASS_TO_NAME)"
            ) from None

        member_configs.append(
            {
                "type": policy_type,
                "config": p.serialize(),
            }
        )
    return cast(SerializableDict, {"name": self.name, "policies": member_configs})

Serializes the SerialPolicy into a dictionary.

This method converts the policy and its contained member policies into a serializable dictionary format. It uses the POLICY_CLASS_TO_NAME mapping to determine the 'type' string for each member policy.

Returns:

Name Type Description
SerializableDict SerializableDict

A dictionary representation of the policy, suitable for JSON serialization or persistence. The dictionary has a "policies" key, which is a list of serialized member policies. Each member policy dict contains "type" and "config" keys.

Raises:

Type Description
PolicyLoadError

If the type of a member policy cannot be determined from POLICY_CLASS_TO_NAME.

serialization

SerializedPolicy dataclass

Source code in luthien_control/control_policy/serialization.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@dataclass
class SerializedPolicy:
    """Represents the serialized form of a ControlPolicy.

    This structure is used to store and transfer policy configurations.
    The 'type' field identifies the specific policy class, and the 'config'
    field contains the parameters needed to reconstruct that policy instance.
    SerialPolicy, for example, builds one per member entry and hands it to
    ``load_policy`` to instantiate that member.

    Attributes:
        type (str): The registered name of the policy type (e.g., "AddApiKeyHeader").
        config (SerializableDict): A dictionary containing the configuration
                                   parameters for the policy instance.
    """

    # Registered policy type name: the value POLICY_CLASS_TO_NAME maps a policy class to.
    type: str
    # Policy-specific settings; the expected keys depend on the policy class
    # (e.g. SerialPolicy expects a 'policies' list and an optional 'name').
    config: SerializableDict

Represents the serialized form of a ControlPolicy.

This structure is used to store and transfer policy configurations. The 'type' field identifies the specific policy class, and the 'config' field contains the parameters needed to reconstruct that policy instance.

Attributes:

Name Type Description
type str

The registered name of the policy type (e.g., "AddApiKeyHeader").

config SerializableDict

A dictionary containing the configuration parameters for the policy instance.

tx_logging

Transaction logging policy components, including serializers.

LuthienLogData

Bases: NamedTuple

Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
18
19
20
21
22
23
class LuthienLogData(NamedTuple):
    """Data structure for what a TxLoggingSpec should return.

    Produced by ``TxLoggingSpec.generate_log_data``, which may instead return
    ``None`` when there is nothing to log.
    """

    # Label identifying the kind of data in this entry.
    # NOTE(review): presumably one value per concrete spec; confirm.
    datatype: str
    # The serialized payload for the log entry, if any.
    data: Optional[SerializableDict]
    # Optional annotations stored alongside the payload.
    # NOTE(review): presumably the `notes` passed to generate_log_data; confirm.
    notes: Optional[SerializableDict]

Data structure for what a TxLoggingSpec should return.

TxLoggingSpec

Bases: ABC

Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class TxLoggingSpec(abc.ABC):
    """Abstract Base Class for defining how to generate a log entry from a TransactionContext.

    Concrete subclasses declare their own ``TYPE_NAME`` and are added to
    ``LOGGING_SPEC_REGISTRY`` when the class is created, which is what lets
    ``TxLoggingSpec.from_serialized`` dispatch on a config's ``"type"`` field.
    Intermediate abstract subclasses set ``__is_abstract_type__ = True`` to
    suppress the missing-TYPE_NAME warning.
    """

    TYPE_NAME: str
    __is_abstract_type__: bool = False  # User-defined to mark intermediate abstract classes

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Registers subclasses in the LOGGING_SPEC_REGISTRY.

        Only a TYPE_NAME declared directly on the subclass is registered. An
        inherited TYPE_NAME is ignored, so subclassing a concrete spec does not
        silently replace the parent's registry entry.
        """
        super().__init_subclass__(**kwargs)
        # Look only at the subclass's own namespace; `getattr` would also find
        # an inherited TYPE_NAME and re-register the subclass under the parent's name.
        type_name = cls.__dict__.get("TYPE_NAME")
        if type_name:
            # Re-registering an existing name (e.g. on module reload) is allowed and
            # simply replaces the previous entry.
            LOGGING_SPEC_REGISTRY[type_name] = cls
        elif not getattr(cls, "__is_abstract_type__", False):
            print(
                f"Warning: TxLoggingSpec subclass {cls.__name__} does not have a TYPE_NAME defined or it is empty. "
                "It will not be registered."
            )

    @abc.abstractmethod
    def generate_log_data(
        self, context: "TransactionContext", notes: Optional[SerializableDict] = None
    ) -> Optional[LuthienLogData]:
        """Build the log entry for ``context``, or return None if nothing should be logged."""
        raise NotImplementedError

    @abc.abstractmethod
    def serialize(self) -> SerializableDict:
        """Return this spec's configuration as a serializable dictionary."""
        raise NotImplementedError

    @classmethod
    def from_serialized(cls: Type[LoggingSpecT], config: SerializableDict) -> LoggingSpecT:
        """Instantiate the registered spec class named by ``config["type"]``.

        Args:
            config: Serialized spec configuration; must contain a string 'type'
                matching a key of LOGGING_SPEC_REGISTRY.

        Returns:
            The instance built by the target class's ``_from_serialized_impl``.

        Raises:
            ValueError: If 'type' is missing or not a string, or is not registered.
        """
        spec_type_name = config.get("type")
        if not isinstance(spec_type_name, str):
            raise ValueError(
                f"TxLoggingSpec configuration must include a 'type' field as a string. "
                f"Got: {spec_type_name!r} (type: {type(spec_type_name).__name__})"
            )
        target_spec_class = LOGGING_SPEC_REGISTRY.get(spec_type_name)
        if target_spec_class is None:
            raise ValueError(
                f"Unknown TxLoggingSpec type '{spec_type_name}'. Ensure it is registered in LOGGING_SPEC_REGISTRY."
                f" Available types: {list(LOGGING_SPEC_REGISTRY.keys())}"
            )
        return cast(LoggingSpecT, target_spec_class._from_serialized_impl(config))

    @classmethod
    @abc.abstractmethod
    def _from_serialized_impl(cls: Type[LoggingSpecT], config: SerializableDict) -> LoggingSpecT:
        """Construct an instance of this concrete spec class from ``config``."""
        raise NotImplementedError

Abstract Base Class for defining how to generate a log entry from a TransactionContext.

__init_subclass__(**kwargs)
Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
32
33
34
35
36
37
38
39
40
41
42
43
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Registers subclasses in the LOGGING_SPEC_REGISTRY.

    Only a TYPE_NAME declared directly on the subclass is registered. An
    inherited TYPE_NAME is ignored, so subclassing a concrete spec does not
    silently replace the parent's registry entry.
    """
    super().__init_subclass__(**kwargs)
    # Look only at the subclass's own namespace; `getattr` would also find
    # an inherited TYPE_NAME and re-register the subclass under the parent's name.
    type_name = cls.__dict__.get("TYPE_NAME")
    if type_name:
        # Re-registering an existing name (e.g. on module reload) is allowed and
        # simply replaces the previous entry.
        LOGGING_SPEC_REGISTRY[type_name] = cls
    elif not getattr(cls, "__is_abstract_type__", False):
        print(
            f"Warning: TxLoggingSpec subclass {cls.__name__} does not have a TYPE_NAME defined or it is empty. "
            "It will not be registered."
        )

Registers subclasses in the LOGGING_SPEC_REGISTRY.

full_transaction_context_spec

Defines the FullTransactionContextSpec for TxLoggingPolicy.

logging_utils

Utility functions and constants for logging serialization.

openai_request_spec

Defines the OpenAIRequestSpec for TxLoggingPolicy.

serialize_openai_chat_request(request)
Source code in luthien_control/control_policy/tx_logging/openai_request_spec.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def serialize_openai_chat_request(request: httpx.Request) -> Dict[str, Any]:
    """Serializes an httpx.Request known to be for OpenAI Chat Completions.

    Extracts the fields listed in OPENAI_CHAT_REQUEST_FIELDS from the JSON body
    and sanitizes headers. Body problems never raise. An undecodable body, a
    non-object JSON body, or an unread streaming body is logged and recorded
    under ``content["error"]`` instead.

    Args:
        request: The httpx.Request object for OpenAI Chat Completions.

    Returns:
        A dictionary with "method", "url", "headers" and "content" keys, where
        "content" holds the extracted OpenAI fields or an "error" entry.
    """
    serialized_data: Dict[str, Any] = {
        "method": request.method,
        "url": str(request.url),
        "headers": _sanitize_headers(request.headers),  # General header sanitization
    }
    openai_payload: Dict[str, Any] = {}
    try:
        # `request.content` raises RequestNotRead for a streaming body that was never read.
        request_body = json.loads(request.content.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError, httpx.RequestNotRead) as e:
        logger.error(f"Error parsing OpenAI request: {e}")
        openai_payload["error"] = f"{type(e).__name__}: {str(e)}"
    else:
        if isinstance(request_body, dict):
            openai_payload = {
                field: request_body[field] for field in OPENAI_CHAT_REQUEST_FIELDS if field in request_body
            }
        else:
            # Valid JSON that is not an object (list, string, null, ...) cannot be
            # indexed by field name; report it rather than crash.
            message = f"expected a JSON object, got {type(request_body).__name__}"
            logger.error(f"Error parsing OpenAI request: {message}")
            openai_payload["error"] = f"TypeError: {message}"

    serialized_data["content"] = openai_payload
    return serialized_data

Serializes an httpx.Request known to be for OpenAI Chat Completions.

Extracts relevant fields from the JSON body and sanitizes headers.

Parameters:

Name Type Description Default
request Request

The httpx.Request object for OpenAI Chat Completions.

required

Returns:

Type Description
Dict[str, Any]

A dictionary representing the serialized OpenAI chat request.

openai_response_spec

Defines the OpenAIResponseSpec for TxLoggingPolicy.

serialize_openai_chat_response(response)
Source code in luthien_control/control_policy/tx_logging/openai_response_spec.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def serialize_openai_chat_response(response: httpx.Response) -> Dict[str, Any]:
    """Serializes an httpx.Response known to be from OpenAI Chat Completions.

    Extracts the fields listed in OPENAI_CHAT_RESPONSE_FIELDS from the JSON body
    and includes sanitized headers, status code and timing. Body problems never
    raise. An undecodable body, a non-object JSON body, or an unread body is
    logged and recorded under ``content["error"]``.

    Args:
        response: The httpx.Response object from OpenAI Chat Completions.

    Returns:
        A dictionary with "status_code", "headers", "elapsed_ms" (None when the
        response carries no timing information), "reason_phrase",
        "http_version" and "content" keys.
    """
    try:
        elapsed_ms = response.elapsed.total_seconds() * 1000
    except RuntimeError:
        # httpx raises RuntimeError for `.elapsed` on responses it did not time
        # (e.g. responses constructed directly rather than sent by a client).
        elapsed_ms = None

    serialized_data: Dict[str, Any] = {
        "status_code": response.status_code,
        "headers": _sanitize_headers(response.headers),  # General header sanitization
        "elapsed_ms": elapsed_ms,
        "reason_phrase": response.reason_phrase,
        "http_version": response.http_version,
    }
    openai_payload: Dict[str, Any] = {}
    try:
        # Ensure content is read. httpx.Response.json() handles decoding.
        response_body = response.json()
    except (json.JSONDecodeError, httpx.ResponseNotRead, UnicodeDecodeError, AttributeError) as e:
        logger.error(f"Error parsing OpenAI response: {e}")
        openai_payload["error"] = f"{type(e).__name__}: {str(e)}"
    else:
        if isinstance(response_body, dict):
            openai_payload = {
                field: response_body[field] for field in OPENAI_CHAT_RESPONSE_FIELDS if field in response_body
            }
        else:
            # Valid JSON that is not an object cannot be indexed by field name.
            message = f"expected a JSON object, got {type(response_body).__name__}"
            logger.error(f"Error parsing OpenAI response: {message}")
            openai_payload["error"] = f"TypeError: {message}"

    serialized_data["content"] = openai_payload
    return serialized_data

Serializes an httpx.Response known to be from OpenAI Chat Completions.

Extracts relevant fields from the JSON body and includes sanitized headers and status code.

Parameters:

Name Type Description Default
response Response

The httpx.Response object from OpenAI Chat Completions.

required

Returns:

Type Description
Dict[str, Any]

A dictionary representing the serialized OpenAI chat response.

request_headers_spec

Defines the RequestHeadersSpec for TxLoggingPolicy.

response_headers_spec

Defines the ResponseHeadersSpec for TxLoggingPolicy.

tx_logging_spec

Defines logging specifications for TxLoggingPolicy.

LuthienLogData

Bases: NamedTuple

Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
18
19
20
21
22
23
class LuthienLogData(NamedTuple):
    """Data structure for what a TxLoggingSpec should return.

    Produced by ``TxLoggingSpec.generate_log_data``; each field is copied onto the
    corresponding field of a ``LuthienLog`` entry by ``TxLoggingPolicy``.
    """

    # Label describing what kind of payload this record carries.
    datatype: str
    # The logged payload itself, if any.
    data: Optional[SerializableDict]
    # Optional free-form annotations stored alongside the payload.
    notes: Optional[SerializableDict]

Data structure returned by a TxLoggingSpec's generate_log_data method.

TxLoggingSpec

Bases: ABC

Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class TxLoggingSpec(abc.ABC):
    """Abstract Base Class for defining how to generate a log entry from a TransactionContext.

    Concrete subclasses declare ``TYPE_NAME`` in their own class body. That name is the key
    under which the subclass is stored in ``LOGGING_SPEC_REGISTRY`` and the value of the
    ``"type"`` field that :meth:`from_serialized` dispatches on.
    """

    TYPE_NAME: str
    __is_abstract_type__: bool = False  # User-defined to mark intermediate abstract classes

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Registers subclasses in the LOGGING_SPEC_REGISTRY.

        Registration happens only when the subclass defines ``TYPE_NAME`` itself. A subclass
        that merely inherits a parent's ``TYPE_NAME`` is not registered, so it cannot silently
        replace the parent's registry entry. Defining an already-registered ``TYPE_NAME``
        again (e.g. when a module is re-imported) overwrites the previous entry.
        """
        super().__init_subclass__(**kwargs)
        own_type_name = cls.__dict__.get("TYPE_NAME")
        if own_type_name:
            LOGGING_SPEC_REGISTRY[own_type_name] = cls
        elif getattr(cls, "TYPE_NAME", None):
            # Inherited TYPE_NAME: the parent class remains the registered handler for it.
            pass
        elif not getattr(cls, "__is_abstract_type__", False):
            print(
                f"Warning: TxLoggingSpec subclass {cls.__name__} does not have a TYPE_NAME defined or it is empty. "
                "It will not be registered."
            )

    @abc.abstractmethod
    def generate_log_data(
        self, context: "TransactionContext", notes: Optional[SerializableDict] = None
    ) -> Optional[LuthienLogData]:
        """Build the log record for ``context``.

        Args:
            context: The transaction being logged.
            notes: Optional extra annotations to attach to the record.

        Returns:
            A LuthienLogData record, or None when there is nothing to log.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def serialize(self) -> SerializableDict:
        """Return this spec's configuration.

        The result is expected to include a ``"type"`` key so that
        :meth:`from_serialized` can dispatch back to this class.
        """
        raise NotImplementedError

    @classmethod
    def from_serialized(cls: Type[LoggingSpecT], config: SerializableDict) -> LoggingSpecT:
        """Reconstruct a spec from serialized configuration.

        Dispatches on ``config["type"]`` through ``LOGGING_SPEC_REGISTRY`` and delegates to the
        matching class's ``_from_serialized_impl``.

        Raises:
            ValueError: If ``"type"`` is missing or not a string, or names an unregistered spec.
        """
        spec_type_name = config.get("type")
        if not isinstance(spec_type_name, str):
            raise ValueError(
                f"TxLoggingSpec configuration must include a 'type' field as a string. "
                f"Got: {spec_type_name!r} (type: {type(spec_type_name).__name__})"
            )
        target_spec_class = LOGGING_SPEC_REGISTRY.get(spec_type_name)
        if not target_spec_class:
            raise ValueError(
                f"Unknown TxLoggingSpec type '{spec_type_name}'. Ensure it is registered in LOGGING_SPEC_REGISTRY."
                f" Available types: {list(LOGGING_SPEC_REGISTRY.keys())}"
            )
        return cast(LoggingSpecT, target_spec_class._from_serialized_impl(config))

    @classmethod
    @abc.abstractmethod
    def _from_serialized_impl(cls: Type[LoggingSpecT], config: SerializableDict) -> LoggingSpecT:
        """Construct an instance of this concrete spec from ``config`` (called after dispatch)."""
        raise NotImplementedError

Abstract Base Class for defining how to generate a log entry from a TransactionContext.

__init_subclass__(**kwargs)
Source code in luthien_control/control_policy/tx_logging/tx_logging_spec.py
32
33
34
35
36
37
38
39
40
41
42
43
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Registers subclasses in the LOGGING_SPEC_REGISTRY.

    Registration happens only when the subclass defines ``TYPE_NAME`` itself. A subclass
    that merely inherits a parent's ``TYPE_NAME`` is not registered, so it cannot silently
    replace the parent's registry entry. Defining an already-registered ``TYPE_NAME``
    again (e.g. when a module is re-imported) overwrites the previous entry.
    """
    super().__init_subclass__(**kwargs)
    own_type_name = cls.__dict__.get("TYPE_NAME")
    if own_type_name:
        LOGGING_SPEC_REGISTRY[own_type_name] = cls
    elif getattr(cls, "TYPE_NAME", None):
        # Inherited TYPE_NAME: the parent class remains the registered handler for it.
        pass
    elif not getattr(cls, "__is_abstract_type__", False):
        print(
            f"Warning: TxLoggingSpec subclass {cls.__name__} does not have a TYPE_NAME defined or it is empty. "
            "It will not be registered."
        )

Registers subclasses in the LOGGING_SPEC_REGISTRY.

tx_logging_policy

Control Policy for logging requests and responses based on TxLoggingSpec instances.

TxLoggingPolicy

Bases: ControlPolicy

Source code in luthien_control/control_policy/tx_logging_policy.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class TxLoggingPolicy(ControlPolicy):
    """A control policy that logs data based on a single TxLoggingSpec instance.

    On each ``apply`` call the spec turns the transaction context into a LuthienLogData
    record, which is added to the database session as a LuthienLog entry. Errors while
    logging are logged and swallowed so they never interrupt the request flow.
    """

    TYPE_NAME = "TxLoggingPolicy"

    def __init__(
        self,
        spec: TxLoggingSpec,
        name: Optional[str] = "TxLoggingPolicy",
        **kwargs: Any,
    ) -> None:
        """Initializes the TxLoggingPolicy.

        Args:
            spec: A TxLoggingSpec instance that defines what to log.
            name: An optional name for the policy instance; a falsy value falls back to TYPE_NAME.
            **kwargs: Additional keyword arguments passed to the superclass.
        """
        super().__init__(**kwargs)
        self.name = name or self.TYPE_NAME
        self.spec = spec

    async def _log_database_entry(
        self,
        session: "AsyncSession",
        transaction_id: str,
        log_datetime: datetime,
        log_data_obj: LuthienLogData,
    ) -> None:
        """Helper to create and add LuthienLog entry to session from LuthienLogData.

        The entry is only added to the session here; flushing/committing is not done in this method.
        """
        log_entry = LuthienLog(
            transaction_id=transaction_id,
            datetime=log_datetime,
            data=log_data_obj.data,
            datatype=log_data_obj.datatype,
            notes=log_data_obj.notes,
        )
        session.add(log_entry)
        logger.debug(f"Prepared log entry for {log_data_obj.datatype} (tx: {transaction_id})")

    async def apply(
        self, context: "TransactionContext", container: "DependencyContainer", session: "AsyncSession"
    ) -> "TransactionContext":
        """Applies the configured logging specification to the transaction context.

        Args:
            context: The current transaction context.
            container: The application dependency container (unused).
            session: The database session the log entry is added to.

        Returns:
            The transaction context, unchanged.
        """
        current_dt = datetime.now(timezone.utc)
        tx_id = str(context.transaction_id)
        # Resolve the spec name once and defensively: a spec without TYPE_NAME must not turn an
        # already-added log entry into a reported logging failure.
        spec_type_name = getattr(self.spec, "TYPE_NAME", "UnknownSpec")

        try:
            log_data_obj = self.spec.generate_log_data(context)
            if log_data_obj:
                await self._log_database_entry(session, tx_id, current_dt, log_data_obj)
                logger.info(
                    f"Logged data for transaction {tx_id} via spec type {spec_type_name}, "
                    f"datatype {log_data_obj.datatype}"
                )
        except Exception as e:
            logger.error(
                f"Error during logging for transaction {tx_id} with spec "
                f"{spec_type_name} (policy: {self.name}): {e}",
                exc_info=True,
            )
            # Do not re-raise, logging failure should not break the main flow.

        return context

    def serialize(self) -> SerializableDict:
        """Serializes the policy's configuration, including its spec."""
        return SerializableDict(
            {
                "type": self.TYPE_NAME,
                "name": self.name,
                "spec": self.spec.serialize(),
            }
        )

    @classmethod
    def from_serialized(cls, config: SerializableDict) -> "TxLoggingPolicy":
        """Creates an instance of TxLoggingPolicy from serialized data.

        This involves deserializing the configured logging spec.

        Args:
            config: Mapping with an optional ``"name"`` and a required ``"spec"`` dictionary.

        Raises:
            ValueError: If ``"spec"`` is missing/not a dict or the spec cannot be deserialized.
        """
        policy_name = config.get("name", cls.TYPE_NAME)
        if not isinstance(policy_name, (str, type(None))):
            # Fallback to TYPE_NAME if policy_name is not a string or None
            policy_name = cls.TYPE_NAME

        serialized_spec = config.get("spec")

        if not isinstance(serialized_spec, dict):
            raise ValueError("TxLoggingPolicy config missing 'spec' dictionary.")

        try:
            # Use the TxLoggingSpec base class from_serialized as a dispatcher
            spec_instance = TxLoggingSpec.from_serialized(serialized_spec)
        except Exception as e:
            raise ValueError(f"Error deserializing spec (config: {serialized_spec}): {e}") from e

        return cls(name=policy_name, spec=spec_instance)

A control policy that logs data based on a single TxLoggingSpec instance.

__init__(spec, name='TxLoggingPolicy', **kwargs)
Source code in luthien_control/control_policy/tx_logging_policy.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    spec: TxLoggingSpec,
    name: Optional[str] = "TxLoggingPolicy",
    **kwargs: Any,
) -> None:
    """Set up the policy with the spec that decides what gets logged.

    Args:
        spec: The TxLoggingSpec used to build log data from each transaction.
        name: Optional instance name; an empty or None value falls back to TYPE_NAME.
        **kwargs: Forwarded unchanged to the parent initializer.
    """
    super().__init__(**kwargs)
    self.spec = spec
    self.name = self.TYPE_NAME if not name else name

Initializes the TxLoggingPolicy.

Parameters:

Name Type Description Default
spec TxLoggingSpec

A TxLoggingSpec instance that defines what to log.

required
name Optional[str]

An optional name for the policy instance.

'TxLoggingPolicy'
**kwargs Any

Additional keyword arguments passed to the superclass.

{}
apply(context, container, session) async
Source code in luthien_control/control_policy/tx_logging_policy.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def apply(
    self, context: "TransactionContext", container: "DependencyContainer", session: "AsyncSession"
) -> "TransactionContext":
    """Applies the configured logging specification to the transaction context.

    Args:
        context: The current transaction context.
        container: The application dependency container (unused).
        session: The database session the log entry is added to.

    Returns:
        The transaction context, unchanged.
    """
    current_dt = datetime.now(timezone.utc)
    tx_id = str(context.transaction_id)
    # Resolve the spec name once and defensively: a spec without TYPE_NAME must not turn an
    # already-added log entry into a reported logging failure.
    spec_type_name = getattr(self.spec, "TYPE_NAME", "UnknownSpec")

    try:
        log_data_obj = self.spec.generate_log_data(context)
        if log_data_obj:
            await self._log_database_entry(session, tx_id, current_dt, log_data_obj)
            logger.info(
                f"Logged data for transaction {tx_id} via spec type {spec_type_name}, "
                f"datatype {log_data_obj.datatype}"
            )
    except Exception as e:
        logger.error(
            f"Error during logging for transaction {tx_id} with spec "
            f"{spec_type_name} (policy: {self.name}): {e}",
            exc_info=True,
        )
        # Do not re-raise, logging failure should not break the main flow.

    return context

Applies the configured logging specification to the transaction context.

from_serialized(config) classmethod
Source code in luthien_control/control_policy/tx_logging_policy.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@classmethod
def from_serialized(cls, config: SerializableDict) -> "TxLoggingPolicy":
    """Build a TxLoggingPolicy from its serialized configuration.

    The nested ``"spec"`` mapping is dispatched through ``TxLoggingSpec.from_serialized``.

    Raises:
        ValueError: If ``"spec"`` is absent or not a dict, or the spec fails to deserialize.
    """
    raw_name = config.get("name", cls.TYPE_NAME)
    # Only a string or None is accepted as a name; anything else falls back to TYPE_NAME.
    policy_name = raw_name if raw_name is None or isinstance(raw_name, str) else cls.TYPE_NAME

    spec_config = config.get("spec")
    if not isinstance(spec_config, dict):
        raise ValueError("TxLoggingPolicy config missing 'spec' dictionary.")

    try:
        spec = TxLoggingSpec.from_serialized(spec_config)
    except Exception as exc:
        raise ValueError(f"Error deserializing spec (config: {spec_config}): {exc}") from exc

    return cls(name=policy_name, spec=spec)

Creates an instance of TxLoggingPolicy from serialized data.

This involves deserializing the configured logging spec.

serialize()
Source code in luthien_control/control_policy/tx_logging_policy.py
85
86
87
88
89
90
91
92
93
def serialize(self) -> SerializableDict:
    """Return the policy configuration, with the spec serialized in place."""
    payload = {
        "type": self.TYPE_NAME,
        "name": self.name,
        "spec": self.spec.serialize(),
    }
    return SerializableDict(payload)

Serializes the policy's configuration, including its spec.

core

dependencies

get_db_session(dependencies=Depends(get_dependencies)) async

Source code in luthien_control/core/dependencies.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def get_db_session(
    dependencies: DependencyContainer = Depends(get_dependencies),
) -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency yielding one AsyncSession from the container's session factory.

    If the code using the session raises, the session is rolled back and the exception
    propagates. Commit/close are not done here; presumably the factory's context manager
    handles them.

    Raises:
        HTTPException: 500 if the container has no session factory configured.
    """
    factory = dependencies.db_session_factory
    if factory is None:
        # This shouldn't happen if the container is initialized correctly
        logger.critical("DB Session Factory not found in DependencyContainer.")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error: Database session factory not available.",
        )

    async with factory() as session:
        try:
            yield session
        except Exception:
            # Undo any partial work before letting the error propagate.
            await session.rollback()
            raise

FastAPI dependency to get an async database session using the container's factory.

get_dependencies(request)

Source code in luthien_control/core/dependencies.py
29
30
31
32
33
34
35
36
37
38
39
40
41
def get_dependencies(request: Request) -> DependencyContainer:
    """Return the DependencyContainer stored on ``request.app.state.dependencies``.

    Raises:
        HTTPException: 500 when no container was attached to the application state.
    """
    container: DependencyContainer | None = getattr(request.app.state, "dependencies", None)
    if container is not None:
        return container

    logger.critical(
        "DependencyContainer not found in application state. "
        "This indicates a critical setup error in the application lifespan."
    )
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail="Internal server error: Application dependencies not initialized.",
    )

Dependency to retrieve the DependencyContainer from application state.

get_main_control_policy(dependencies=Depends(get_dependencies)) async

Source code in luthien_control/core/dependencies.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
async def get_main_control_policy(
    dependencies: DependencyContainer = Depends(get_dependencies),
) -> ControlPolicy:
    """
    Dependency to load and provide the main ControlPolicy instance.

    If a policy file path is configured, the policy is loaded from that file. Otherwise the
    policy named by the top-level policy name setting is loaded from the database via
    ``load_policy_from_db``, which receives the whole DependencyContainer.

    Raises:
        HTTPException: 500 if no policy name is configured, the named policy is not found or
            inactive, or loading it from the database fails.
    """
    settings = dependencies.settings
    policy_filepath = settings.get_policy_filepath()
    if policy_filepath:
        logger.info(f"Loading main control policy from file: {policy_filepath}")
        return load_policy_from_file(policy_filepath)

    top_level_policy_name = settings.get_top_level_policy_name()
    if not top_level_policy_name:
        logger.error("TOP_LEVEL_POLICY_NAME is not configured in settings.")
        raise HTTPException(status_code=500, detail="Internal server error: Control policy name not configured.")

    # Keep the try body to the call that can actually fail while loading.
    try:
        main_policy = await load_policy_from_db(
            name=top_level_policy_name,
            container=dependencies,
        )
    except PolicyLoadError as e:
        logger.exception(f"Failed to load main control policy '{top_level_policy_name}': {e}")
        raise HTTPException(
            status_code=500, detail=f"Internal server error: Could not load main control policy. {e}"
        ) from e
    except HTTPException:  # Already a client-facing error; propagate unchanged.
        raise
    except Exception as e:
        logger.exception(f"Unexpected error loading main control policy '{top_level_policy_name}': {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error: Unexpected issue loading main control policy."
        ) from e

    if not main_policy:
        logger.error(f"Main control policy '{top_level_policy_name}' could not be loaded (not found or inactive).")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error: Main control policy '{top_level_policy_name}' not found or inactive.",
        )

    return main_policy

Dependency to load and provide the main ControlPolicy instance.

If a policy file path is configured, the policy is loaded from that file; otherwise the named top-level policy is loaded from the database using the DependencyContainer.

initialize_app_dependencies(app_settings) async

Source code in luthien_control/core/dependencies.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def initialize_app_dependencies(app_settings: Settings) -> DependencyContainer:
    """Initialize and configure core application dependencies.

    This function sets up essential services required by the application,
    including an HTTP client and a database connection pool. It encapsulates
    the creation and configuration of these dependencies into a
    DependencyContainer instance.

    Args:
        app_settings: The application settings instance, stored on the container.

    Returns:
        A DependencyContainer instance populated with initialized dependencies.

    Raises:
        RuntimeError: If creating the database engine or the DependencyContainer fails.
            The HTTP client is closed before the error is raised.
    """
    logger.info("Initializing core application dependencies...")

    # Initialize HTTP client (generous read timeout for slow upstream responses)
    timeout = httpx.Timeout(5.0, connect=5.0, read=60.0, write=5.0)
    http_client = httpx.AsyncClient(timeout=timeout)
    logger.info("HTTP Client initialized for DependencyContainer.")

    # Initialize Database Engine and Session Factory
    try:
        logger.info("Attempting to create main DB engine and session factory for DependencyContainer...")
        # Uses app_settings implicitly via the global settings instance. The engine itself is
        # managed globally (closed by the lifespan shutdown), so its return value is not kept here.
        await create_db_engine()
        logger.info("Main DB engine successfully created for DependencyContainer.")
        # Use the actual session factory from database_async module
        db_session_factory = db_get_session
        logger.info("DB Session Factory reference obtained for DependencyContainer.")

    except Exception as db_exc:
        logger.critical(
            f"Failed to initialize database for DependencyContainer due to exception: {db_exc}", exc_info=True
        )
        await http_client.aclose()  # Clean up client
        logger.info("HTTP Client closed due to DB initialization failure.")
        # No need to call close_db_engine here, as db_engine might not be valid or fully initialized.
        # The caller (lifespan) will handle global engine cleanup if needed.
        raise RuntimeError(f"Failed to initialize database for DependencyContainer: {db_exc}") from db_exc

    # Create and return Dependency Container
    try:
        dependencies = DependencyContainer(
            settings=app_settings,
            http_client=http_client,
            db_session_factory=db_session_factory,
        )
    except Exception as container_exc:
        logger.critical(f"Failed to create Dependency Container instance: {container_exc}", exc_info=True)
        # Clean up resources created within this helper function
        await http_client.aclose()
        logger.info("HTTP Client closed due to Dependency Container instantiation failure.")
        # If the DB engine was created, it is managed by the global close_db_engine,
        # which the lifespan's shutdown phase calls; it is not closed here.
        raise RuntimeError(f"Failed to create Dependency Container instance: {container_exc}") from container_exc

    logger.info("Dependency Container created successfully.")
    return dependencies

Initialize and configure core application dependencies.

This function sets up essential services required by the application, including an HTTP client and a database connection pool. It encapsulates the creation and configuration of these dependencies into a DependencyContainer instance.

Parameters:

Name Type Description Default
app_settings Settings

The application settings instance.

required

Returns:

Type Description
DependencyContainer

A DependencyContainer instance populated with initialized dependencies.

Raises:

Type Description
RuntimeError

If creating the database engine or the DependencyContainer fails (the HTTP client is closed first).

dependency_container

DependencyContainer

Source code in luthien_control/core/dependency_container.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DependencyContainer:
    """Bundle of application-wide shared services (settings, HTTP client, DB sessions)."""

    def __init__(
        self,
        settings: Settings,
        http_client: httpx.AsyncClient,
        db_session_factory: Callable[[], AsyncContextManager[AsyncSession]],
    ) -> None:
        """Store the shared services on the container.

        Args:
            settings: Application settings object.
            http_client: Shared asynchronous HTTP client.
            db_session_factory: Zero-argument callable returning an async context
                manager that yields an SQLAlchemy AsyncSession.
        """
        self.db_session_factory = db_session_factory
        self.http_client = http_client
        self.settings = settings

Holds shared dependencies for the application.

__init__(settings, http_client, db_session_factory)
Source code in luthien_control/core/dependency_container.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self,
    settings: Settings,
    http_client: httpx.AsyncClient,
    db_session_factory: Callable[[], AsyncContextManager[AsyncSession]],
) -> None:
    """Store the shared services on the container.

    Args:
        settings: Application settings object.
        http_client: Shared asynchronous HTTP client.
        db_session_factory: Zero-argument callable returning an async context
            manager that yields an SQLAlchemy AsyncSession.
    """
    self.db_session_factory = db_session_factory
    self.http_client = http_client
    self.settings = settings

Initializes the container.

Parameters:

Name Type Description Default
settings Settings

Application settings.

required
http_client AsyncClient

Shared asynchronous HTTP client.

required
db_session_factory Callable[[], AsyncContextManager[AsyncSession]]

A factory function that returns an async context manager yielding an SQLAlchemy AsyncSession.

required

logging

setup_logging()

Source code in luthien_control/core/logging.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def setup_logging():
    """
    Configures logging for the application.

    Reads the desired log level via ``Settings.get_log_level`` (the LOG_LEVEL setting),
    accepting it case-insensitively and ignoring surrounding whitespace.
    Defaults to DEFAULT_LOG_LEVEL if not set or invalid.
    Sets a standard format and directs logs to stderr.
    Sets noisy libraries (NOISY_LIBRARIES) to WARNING level.
    """
    settings = Settings()
    raw_level_name = settings.get_log_level(default=DEFAULT_LOG_LEVEL)
    # logging's level names are upper-case ("DEBUG", "INFO", ...); normalize so that
    # e.g. "debug" is accepted instead of being rejected as invalid.
    log_level_name = raw_level_name.strip().upper() if isinstance(raw_level_name, str) else raw_level_name

    if log_level_name not in VALID_LOG_LEVELS:
        print(
            f"WARNING: Invalid LOG_LEVEL '{raw_level_name}'. "
            f"Defaulting to {DEFAULT_LOG_LEVEL}. "
            f"Valid levels are: {', '.join(VALID_LOG_LEVELS)}",
            file=sys.stderr,
        )
        log_level_name = DEFAULT_LOG_LEVEL

    # For a registered level name, getLevelName returns its numeric value.
    log_level = logging.getLevelName(log_level_name)

    # Use basicConfig for simplicity, directing to stderr
    logging.basicConfig(level=log_level, format=LOG_FORMAT, stream=sys.stderr)

    # Quiet down noisy libraries
    for lib_name in NOISY_LIBRARIES:
        logging.getLogger(lib_name).setLevel(logging.WARNING)

    # Log that configuration is complete (useful for debugging setup issues)
    logging.getLogger(__name__).info(f"Logging configured with level {log_level_name}.")

Configures logging for the application.

Reads the desired log level from the LOG_LEVEL setting. Defaults to INFO if not set or invalid. Sets a standard format and directs logs to stderr. Sets noisy third-party libraries to WARNING level.

response_builder

ResponseBuilder

Source code in luthien_control/core/response_builder.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class ResponseBuilder:
    """
    Builds a FastAPI response from the TransactionContext based on successful policy execution.

    Headers are filtered to remove hop-by-hop headers.
    Returns a 500 error response ONLY IF building the response itself fails unexpectedly,
    or if essential data (like status code) is missing after policies ran successfully.
    Policy execution errors (like auth failure) should be handled upstream by exception handlers.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Headers that should not be forwarded from backend response to client
        # Based on https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers#hop-by-hop_headers
        # Stored as lowercase strings for case-insensitive matching
        self.hop_by_hop_headers = {
            "connection",
            "keep-alive",
            "proxy-authenticate",
            "proxy-authorization",
            "te",
            "trailers",
            "transfer-encoding",
            "upgrade",
            "content-length",  # Exclude even if present, FastAPI recalculates
            "content-encoding",  # Exclude: httpx's .content is already decoded, so the original encoding no longer applies
        }

    @staticmethod
    def _connection_listed_headers(headers: httpx.Headers) -> set[str]:
        """Return the lower-cased header names listed in the ``Connection`` header.

        Per RFC 9110 §7.6.1, headers named there are hop-by-hop for this connection
        and must not be forwarded by a proxy.
        """
        # httpx joins repeated header values with ", ", so one split covers all of them.
        listed = headers.get("connection", "")
        return {token.strip().lower() for token in listed.split(",") if token.strip()}

    def _convert_to_fastapi_response(self, context: TransactionContext) -> Response:
        """Converts ``context.response`` (an httpx.Response) into a FastAPI Response.

        The body, status code and content type are copied from the backend response.
        Hop-by-hop headers, including any header named in the response's ``Connection``
        header, are dropped.

        Args:
            context: The transaction context whose ``response`` is converted; its
                transaction id is used in log messages.

        Returns:
            A FastAPI Response object.

        Raises:
            TypeError: If ``context.response`` is None or not an httpx.Response.
        """
        # This check is a safeguard; callers should ideally ensure type correctness.
        if context.response is None:
            self.logger.error(f"[{context.transaction_id}] _convert_to_fastapi_response received None response.")
            raise TypeError("_convert_to_fastapi_response received None response.")
        if not isinstance(context.response, httpx.Response):
            self.logger.error(
                f"[{context.transaction_id}] _convert_to_fastapi_response received incorrect type: "
                f"{type(context.response)}. Expected httpx.Response."
            )
            raise TypeError(f"_convert_to_fastapi_response expected httpx.Response, got {type(context.response)}")

        backend_response = context.response
        excluded_headers = self.hop_by_hop_headers | self._connection_listed_headers(backend_response.headers)

        # Filter headers from the backend response
        filtered_backend_headers = {
            k: v for k, v in backend_response.headers.items() if k.lower() not in excluded_headers
        }

        # Extract media type from backend response headers if present
        media_type = backend_response.headers.get("content-type")

        # backend_response.content is the (decoded) body as bytes
        return Response(
            content=backend_response.content,
            status_code=backend_response.status_code,
            headers=filtered_backend_headers,
            media_type=media_type,
        )

    def build_response(self, context: TransactionContext, dependencies: DependencyContainer) -> Response:
        """Build the client-facing response from ``context.response``.

        Args:
            context: The transaction context holding the backend response.
            dependencies: Container whose settings decide whether error details are exposed.

        Returns:
            The converted response, or a 500 JSONResponse if conversion fails. In dev mode the
            error text is included in ``detail``; otherwise a generic message is returned.
        """
        try:
            return self._convert_to_fastapi_response(context)
        except Exception as convert_exc:  # Catch any other unexpected error during conversion
            self.logger.exception(
                f"[{context.transaction_id}] Unexpected error during conversion of context.response: {convert_exc}"
            )
            if dependencies.settings.dev_mode():
                return JSONResponse(
                    content={
                        "detail": f"Policy Error: {str(convert_exc)}",
                        "transaction_id": str(context.transaction_id),
                    },
                    status_code=500,
                )
            else:
                return JSONResponse(
                    content={
                        "detail": "Internal Server Error",
                        "transaction_id": str(context.transaction_id),
                    },
                    status_code=500,
                )

Builds a FastAPI response from the TransactionContext based on successful policy execution.

Headers are filtered to remove hop-by-hop headers. Returns a 500 error response ONLY IF building the response itself fails unexpectedly, or if essential data (like status code) is missing after policies ran successfully. Policy execution errors (like auth failure) should be handled upstream by exception handlers.

transaction_context

TransactionContext dataclass

Source code in luthien_control/core/transaction_context.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class TransactionContext:
    """Holds the state for a single transaction through the proxy.

    Attributes:
        transaction_id: A unique identifier for the transaction (a fresh uuid4 by default).
        request: The incoming HTTP request object, or None until set.
        response: The outgoing HTTP response object, or None until set. Note that
            ResponseBuilder requires an httpx.Response here when building the client response.
        data: A general-purpose dictionary for policies to store and share
            information related to this transaction.
    """

    # Core Identifiers and State
    transaction_id: uuid.UUID = field(default_factory=uuid.uuid4)
    request: Optional[Request] = None
    response: Optional[Response] = None

    # General purpose data store for policies to share information
    # (default_factory gives every context its own dict)
    data: Dict[str, Any] = field(default_factory=dict)

Holds the state for a single transaction through the proxy.

Attributes:

Name Type Description
transaction_id UUID

A unique identifier for the transaction.

request Optional[Request]

The incoming HTTP request object.

response Optional[Response]

The outgoing HTTP response object.

data Dict[str, Any]

A general-purpose dictionary for policies to store and share information related to this transaction.

get_tx_value(transaction_context, path)

Source code in luthien_control/core/transaction_context.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def get_tx_value(transaction_context: "TransactionContext", path: str) -> Any:
    """Get a value from the transaction context using a dotted path.

    The first path segment names an attribute of ``transaction_context``
    (e.g. ``request``, ``response``, ``data``). Each following segment is
    resolved against the current value:

    * a ``bytes`` value is first decoded as JSON (e.g. a request/response
      body), so the remaining segments index into the decoded document;
    * mappings (``dict`` and any other ``collections.abc.Mapping``, such as
      HTTP header containers) are indexed by key;
    * lists/tuples are indexed when the segment is a non-negative integer
      (``"0"``, ``"1"``, ...);
    * anything else is accessed as an attribute.

    Args:
        transaction_context: The transaction context.
        path: The path to the value e.g. "request.headers.user-agent",
            "response.status_code", "data.user_id",
            "request.content.messages.0.role".

    Returns:
        The value at the path.

    Raises:
        ValueError: If the path has fewer than two components, or a ``bytes``
            value along the path is not valid (UTF-8/JSON) JSON content.
        KeyError: If a mapping along the path lacks the requested key.
        IndexError: If a list index along the path is out of range.
        AttributeError: If an object along the path lacks the requested attribute.
    """
    from collections.abc import Mapping

    segments = path.split(".")
    if len(segments) < 2:
        raise ValueError("Path must contain at least two components")

    x: Any = getattr(transaction_context, segments[0])
    for segment in segments[1:]:
        # A bytes value with segments still to resolve is treated as JSON content.
        if isinstance(x, bytes):
            try:
                x = json.loads(x)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                # Wrapping the original error for better diagnostics
                raise ValueError(f"Failed to decode JSON content for path '{path}' at segment '{segment}'") from e

        if isinstance(x, Mapping):
            # Covers plain dicts as well as header containers, which are Mappings but not dicts.
            x = x[segment]
        elif isinstance(x, (list, tuple)) and segment.isdecimal():
            x = x[int(segment)]
        else:
            x = getattr(x, segment)
    return x

Get a value from the transaction context using a path.

Parameters:

Name Type Description Default
transaction_context TransactionContext

The transaction context.

required
path str

The path to the value e.g. "request.headers.user-agent", "response.status_code", "data.user_id".

required

Returns:

Type Description
Any

The value at the path.

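Raises:

Type Description
ValueError

If the path has fewer than two components, or a bytes value along the path is not valid JSON.

KeyError

If a dictionary along the path does not contain the requested key.

AttributeError

If an object along the path does not have the requested attribute.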

custom_openapi_schema

create_custom_openapi(app)

Source code in luthien_control/custom_openapi_schema.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def create_custom_openapi(app: FastAPI):
    """
    Generate a custom OpenAPI schema for the FastAPI application.

    The default schema from ``get_openapi`` is post-processed. Every operation
    under a path containing ``{full_path}`` gets ``allowReserved=True`` on its
    ``full_path`` path parameter. This is necessary for correctly handling URLs
    containing reserved characters within that path segment.

    The result is cached on ``app.openapi_schema``. Later calls return the
    cached schema without regenerating it.

    Args:
        app: The FastAPI application instance.

    Returns:
        The modified OpenAPI schema dictionary.
    """
    # Check if schema already exists to avoid redundant generation
    if app.openapi_schema:
        return app.openapi_schema

    logger.debug("Generating custom OpenAPI schema.")
    openapi_schema = get_openapi(
        title=app.title,
        version=app.version,
        description=app.description,
        routes=app.routes,
    )

    paths = openapi_schema.get("paths", {})
    logger.debug("Found %d paths in schema. Searching for '{full_path}'.", len(paths))
    for path_key, path_item in paths.items():
        if "{full_path}" not in path_key:
            continue
        logger.debug("Processing path: %s", path_key)
        # A Path Item can also carry non-operation fields (e.g. "summary" or a
        # path-level "parameters" list). Only dict-valued entries are operations.
        for method, method_item in path_item.items():
            if not isinstance(method_item, dict):
                logger.debug("Skipping non-operation entry '%s' in %s.", method, path_key)
                continue

            parameters = method_item.get("parameters", [])
            if not isinstance(parameters, list):
                logger.warning("Unexpected 'parameters' format in %s -> %s. Skipping.", path_key, method)
                continue

            for param in parameters:
                # Ensure param is a dictionary and has 'name' and 'in' keys
                if not isinstance(param, dict) or "name" not in param or "in" not in param:
                    logger.warning(
                        "Malformed parameter definition in %s -> %s. Skipping param: %s", path_key, method, param
                    )
                    continue

                if param["name"] == "full_path" and param["in"] == "path":
                    param["allowReserved"] = True
                    logger.info("Set allowReserved=true for 'full_path' parameter in %s -> %s", path_key, method)
                    # Assuming only one 'full_path' param per method
                    break
            else:
                # Loop finished without break: no matching parameter.
                logger.debug("No 'full_path' path parameter found in %s -> %s", path_key, method)

    # Cache the generated schema in the app instance
    app.openapi_schema = openapi_schema
    logger.debug("Custom OpenAPI schema generation complete.")
    return app.openapi_schema

Generate a custom OpenAPI schema for the FastAPI application.

This function retrieves the default schema and modifies it, specifically to set allowReserved=True for the full_path path parameter used in proxy routes. This is necessary for correctly handling URLs containing reserved characters within that path segment.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application instance.

required

Returns:

Type Description

The modified OpenAPI schema dictionary.

db

Database models and session management.

ControlPolicy

Bases: SQLModel

Source code in luthien_control/db/sqlmodel_models.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ControlPolicy(SQLModel, table=True):
    """Database model for storing control policy configurations.

    Each row describes one named policy: ``type`` identifies the policy class
    used for instantiation and ``config`` holds its JSON configuration.
    """

    # Keep the docstring above this line: only the first statement of a class
    # body becomes ``__doc__``.
    __tablename__ = "policies"  # type: ignore

    # Primary key
    id: Optional[int] = Field(default=None, primary_key=True)

    # --- Core Fields ---
    name: str = Field(index=True, unique=True)  # Unique name used for lookup
    type: str = Field()  # Type of policy, used for instantiation
    # default_factory builds a fresh dict per instance instead of reusing one literal.
    config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
    is_active: bool = Field(default=True, index=True)
    description: Optional[str] = Field(default=None)

    # --- Timestamps ---
    # Stored as naive datetimes holding the current UTC time (tzinfo stripped).
    created_at: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None), nullable=False
    )
    updated_at: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None), nullable=False
    )

    def __init__(self, **data: Any):
        """Fill in ``created_at``/``updated_at`` with the current naive-UTC time when not supplied."""
        # NOTE(review): presumably needed because SQLModel table models may skip
        # validators on direct construction — confirm.
        if "created_at" not in data:
            data["created_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        if "updated_at" not in data:
            data["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        super().__init__(**data)

    @model_validator(mode="before")
    @classmethod
    def validate_timestamps(cls, values):
        """Ensure updated_at is always set/updated.

        When the raw validation input is a dict, ``updated_at`` is overwritten
        with the current naive-UTC time, even if a value was supplied.
        """
        if isinstance(values, dict):
            values["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        return values

class-attribute instance-attribute

Database model for storing control policy configurations.

__tablename__ = 'policies' class-attribute instance-attribute

Database model for storing control policy configurations.

validate_timestamps(values) classmethod

Source code in luthien_control/db/sqlmodel_models.py
75
76
77
78
79
80
81
@model_validator(mode="before")
@classmethod
def validate_timestamps(cls, values):
    """Stamp ``updated_at`` with the current time (naive UTC) on dict input.

    Any non-dict input is handed back untouched.
    """
    if not isinstance(values, dict):
        return values
    values["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
    return values

Ensure updated_at is always set/updated.

LuthienLog

Bases: SQLModel

Source code in luthien_control/db/sqlmodel_models.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class LuthienLog(SQLModel, table=True):
    """
    A single row of the Luthien log table, modelled with SQLModel.

    Attributes:
        id: Auto-assigned primary key of the log entry.
        transaction_id: Groups log entries that belong to the same transaction.
        datetime: When the entry was created; defaults to the current time as a
            timezone-aware UTC datetime.
        data: Main JSON payload of the entry.
        datatype: Names the kind/schema of ``data``.
        notes: Optional JSON payload with extra context.
    """

    # The table name is left to SQLModel's default.

    id: Optional[int] = Field(default=None, primary_key=True, index=True)
    transaction_id: str = Field(index=True, nullable=False)
    # Lambdas in a class body resolve names in module scope when called, so
    # ``datetime`` below is the module-level name, not this field.
    datetime: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        nullable=False,
        index=True,
    )
    data: Optional[dict[str, Any]] = Field(default=None, sa_column=Column(JsonBOrJson))
    datatype: str = Field(index=True, nullable=False)
    notes: Optional[dict[str, Any]] = Field(default=None, sa_column=Column(JsonBOrJson))

    def __repr__(self) -> str:
        """Short debug representation showing the identifying columns."""
        return (
            "<LuthienLog("
            f"id={self.id}, transaction_id='{self.transaction_id}', "
            f"datetime='{self.datetime}', datatype='{self.datatype}'"
            ")>"
        )

Represents a log entry in the Luthien logging system using SQLModel.

Attributes:

Name Type Description
id Optional[int]

Unique identifier for the log entry (primary key).

transaction_id str

Identifier to group related log entries.

datetime datetime

Timestamp indicating when the log entry was generated (timezone-aware).

data Optional[dict[str, Any]]

JSON blob containing the primary logged data.

datatype str

String identifier for the nature and schema of 'data'.

notes Optional[dict[str, Any]]

JSON blob for additional contextual information.

client_api_key_crud

create_api_key(session, api_key) async

Source code in luthien_control/db/client_api_key_crud.py
39
40
41
42
43
44
45
46
47
48
49
50
async def create_api_key(session: AsyncSession, api_key: ClientApiKey) -> Optional[ClientApiKey]:
    """Create a new API key in the database.

    Args:
        session: Active async session. It is committed on success and rolled
            back on failure.
        api_key: The key to persist. It is refreshed from the database after
            the commit.

    Returns:
        The persisted key, or ``None`` if any error occurred. Errors are
        logged, not raised.
    """
    try:
        session.add(api_key)
        await session.commit()
        await session.refresh(api_key)
        logger.info(f"Successfully created API key with ID: {api_key.id}")
        return api_key
    except Exception as e:
        await session.rollback()
        # The exception is swallowed, so keep the traceback in the log.
        logger.error(f"Error creating API key: {e}", exc_info=True)
        return None

Create a new API key in the database.

get_api_key_by_value(session, key_value) async

Source code in luthien_control/db/client_api_key_crud.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def get_api_key_by_value(session: AsyncSession, key_value: str) -> Optional[ClientApiKey]:
    """Look up an *active* API key whose stored value equals ``key_value``.

    Returns:
        The matching active key, or ``None`` when there is no active match or
        the query fails. Failures are logged with a traceback.

    Raises:
        TypeError: If ``session`` is not an ``AsyncSession``.
    """
    if not isinstance(session, AsyncSession):
        # This check might be better handled by type hinting/DI framework upstream
        logger.error("Invalid session object type passed to get_api_key_by_value")
        raise TypeError("Invalid session object provided to get_api_key_by_value.")
    try:
        active_match = select(ClientApiKey).where(
            ClientApiKey.key_value == key_value,  # type: ignore[arg-type]
            ClientApiKey.is_active,  # type: ignore[arg-type]
        )
        rows = await session.execute(active_match)
        return rows.scalar_one_or_none()
    except Exception as e:
        # key_value is deliberately kept out of the log message.
        logger.error(f"Error fetching API key by value: {e}", exc_info=True)
        return None

Get an active API key by its value.

list_api_keys(session, active_only=False) async

Source code in luthien_control/db/client_api_key_crud.py
53
54
55
56
57
58
59
60
61
62
63
64
65
async def list_api_keys(session: AsyncSession, active_only: bool = False) -> List[ClientApiKey]:
    """Get a list of API keys.

    Args:
        session: Active async session.
        active_only: If True, only keys with ``is_active`` set are returned.

    Returns:
        The matching keys, or an empty list if the query fails (the error is
        logged, not raised).
    """
    try:
        stmt = select(ClientApiKey)
        if active_only:
            stmt = stmt.where(ClientApiKey.is_active)  # type: ignore[arg-type]

        result = await session.execute(stmt)
        return list(result.scalars().all())
    except Exception as e:
        # The exception is swallowed, so keep the traceback in the log.
        logger.error(f"Error listing API keys: {e}", exc_info=True)
        return []

Get a list of all API keys.

update_api_key(session, key_id, api_key_update) async

Source code in luthien_control/db/client_api_key_crud.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def update_api_key(session: AsyncSession, key_id: int, api_key_update: ClientApiKey) -> Optional[ClientApiKey]:
    """Update an existing API key.

    Copies ``name``, ``is_active`` and ``metadata_`` from ``api_key_update``
    onto the stored key with id ``key_id``. Other fields are left unchanged.

    Args:
        session: Active async session.
        key_id: Primary key of the API key to update.
        api_key_update: Object carrying the new field values.

    Returns:
        The updated key, or ``None`` if no key has ``key_id`` or an error
        occurred. On error the session is rolled back and the error is logged.
    """
    try:
        stmt = select(ClientApiKey).where(ClientApiKey.id == key_id)  # type: ignore[arg-type]
        result = await session.execute(stmt)
        api_key = result.scalar_one_or_none()

        if not api_key:
            logger.warning(f"API key with ID {key_id} not found")
            return None

        # Update fields
        api_key.name = api_key_update.name
        api_key.is_active = api_key_update.is_active
        api_key.metadata_ = api_key_update.metadata_

        await session.commit()
        await session.refresh(api_key)
        logger.info(f"Successfully updated API key with ID: {api_key.id}")
        return api_key
    except Exception as e:
        await session.rollback()
        # The exception is swallowed, so keep the traceback in the log.
        logger.error(f"Error updating API key: {e}", exc_info=True)
        return None

Update an existing API key.

control_policy_crud

get_policy_by_name(session, name) async

Source code in luthien_control/db/control_policy_crud.py
44
45
46
47
48
49
50
51
52
53
54
55
56
async def get_policy_by_name(session: AsyncSession, name: str) -> Optional[DBControlPolicy]:
    """Look up an *active* policy by name.

    Returns:
        The matching active policy, or ``None`` when there is no active match
        or the query fails. Failures are logged with a traceback.
    """
    try:
        active_by_name = select(DBControlPolicy).where(
            DBControlPolicy.name == name,  # type: ignore[arg-type]
            DBControlPolicy.is_active,  # type: ignore[arg-type]
        )
        return (await session.execute(active_by_name)).scalar_one_or_none()
    except Exception as e:
        logger.error(f"Error fetching policy by name '{name}': {e}", exc_info=True)
        return None

Get a policy by its name.

get_policy_config_by_name(session, name) async

Source code in luthien_control/db/control_policy_crud.py
143
144
145
146
147
148
149
150
151
152
153
async def get_policy_config_by_name(session: AsyncSession, name: str) -> Optional[DBControlPolicy]:
    """Fetch the policy row named ``name``, whether or not it is active.

    Returns:
        The matching policy, or ``None`` if none exists or the query fails.
        Failures are logged with a traceback.

    Raises:
        TypeError: If ``session`` is not an ``AsyncSession``.
    """
    if not isinstance(session, AsyncSession):
        raise TypeError("Invalid session object provided to get_policy_config_by_name.")
    try:
        by_name = select(DBControlPolicy).where(DBControlPolicy.name == name)  # type: ignore[arg-type]
        rows = await session.execute(by_name)
        found = rows.scalar_one_or_none()
    except Exception as e:
        logger.error(f"Error fetching policy configuration by name '{name}': {e}", exc_info=True)
        return None
    return found

Get a policy configuration by its name, regardless of its active status.

list_policies(session, active_only=False) async

Source code in luthien_control/db/control_policy_crud.py
59
60
61
62
63
64
65
66
67
68
69
70
71
async def list_policies(session: AsyncSession, active_only: bool = False) -> List[DBControlPolicy]:
    """Get a list of policies.

    Args:
        session: Active async session.
        active_only: If True, only policies with ``is_active`` set are returned.

    Returns:
        The matching policies, or an empty list if the query fails (the error
        is logged, not raised).
    """
    try:
        stmt = select(DBControlPolicy)
        if active_only:
            stmt = stmt.where(DBControlPolicy.is_active)  # type: ignore[arg-type]

        result = await session.execute(stmt)
        return list(result.scalars().all())
    except Exception as e:
        # The exception is swallowed, so keep the traceback in the log.
        logger.error(f"Error listing policies: {e}", exc_info=True)
        return []

Get a list of all policies.

load_policy_from_db(name, container) async

Source code in luthien_control/db/control_policy_crud.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def load_policy_from_db(
    name: str,
    container: "DependencyContainer",
) -> "ABCControlPolicy":
    """Load an active policy configuration from the database and instantiate it.

    The policy row is read with ``get_policy_by_name``. It is then converted
    into a ``SerializedPolicy`` and handed to the control_policy loader
    (``load_policy``).

    Args:
        name: Name of the policy to load. Only active policies are considered.
        container: Dependency container whose ``db_session_factory`` provides
            the database session.

    Returns:
        The policy instance produced by ``load_policy``.

    Raises:
        PolicyLoadError: If no active policy named ``name`` is found. Also if
            ``load_policy`` raises ``PolicyLoadError`` (re-raised unchanged),
            or if it fails with any other exception (wrapped, with the
            original exception as ``__cause__``).
    """
    async with container.db_session_factory() as session:
        policy_record = await get_policy_by_name(session, name)

    # NOTE: get_policy_by_name also returns None when the query itself fails,
    # so a database error surfaces here as "not found".
    if not policy_record:
        raise PolicyLoadError(f"Active policy configuration named '{name}' not found in database.")

    # ``type`` tells the loader which policy class to instantiate.
    serialized_policy_obj = SerializedPolicy(type=policy_record.type, config=policy_record.config or {})

    try:
        instance = load_policy(serialized_policy_obj)
    except PolicyLoadError as e:
        logger.error(f"Failed to load policy '{name}' from database: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error loading policy '{name}' from database: {e}")
        raise PolicyLoadError(f"Unexpected error during loading process for '{name}'.") from e

    logger.info(f"Successfully loaded and instantiated policy '{policy_record.name}' from database.")
    return instance

Load a policy configuration from the database and instantiate it using the control_policy loader.

save_policy_to_db(session, policy) async

Source code in luthien_control/db/control_policy_crud.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def save_policy_to_db(session: AsyncSession, policy: DBControlPolicy) -> Optional[DBControlPolicy]:
    """Create a new policy in the database.

    Args:
        session: Active async session. It is committed on success and rolled
            back on any failure.
        policy: The policy to persist. It is refreshed from the database after
            the commit.

    Returns:
        The persisted policy, or ``None`` on a non-integrity database error or
        any other unexpected error (logged, not raised).

    Raises:
        IntegrityError: Re-raised after rollback so callers can react to
            constraint violations.
    """
    try:
        session.add(policy)
        await session.commit()
        await session.refresh(policy)
        logger.info(f"Successfully created policy with ID: {policy.id}")
        return policy
    except IntegrityError as ie:
        await session.rollback()
        logger.error(f"Integrity error creating policy: {ie}")
        raise  # Re-raise the specific integrity error
    except SQLAlchemyError as sqla_err:
        await session.rollback()
        # Swallowed: keep the traceback in the log.
        logger.error(f"SQLAlchemy error creating policy: {sqla_err}", exc_info=True)
        return None  # Or re-raise depending on desired handling
    except Exception as e:
        await session.rollback()
        logger.error(f"Error creating policy: {e}", exc_info=True)
        return None

Create a new policy in the database.

update_policy(session, policy_id, policy_update) async

Source code in luthien_control/db/control_policy_crud.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def update_policy(
    session: AsyncSession, policy_id: int, policy_update: DBControlPolicy
) -> Optional[DBControlPolicy]:
    """Update an existing policy.

    Copies ``name``, ``config``, ``is_active`` and ``description`` from
    ``policy_update`` onto the stored policy with id ``policy_id``. It also
    refreshes that policy's ``updated_at`` timestamp. Other fields (including
    ``type``) are left unchanged.

    Args:
        session: Active async session.
        policy_id: Primary key of the policy to update.
        policy_update: Object carrying the new field values.

    Returns:
        The updated policy. ``None`` if no policy has ``policy_id``, or if a
        non-integrity database error or other unexpected error occurred; in
        that case the session is rolled back and the error logged.

    Raises:
        IntegrityError: Re-raised after rollback (e.g. on a duplicate name).
    """
    from datetime import datetime, timezone

    try:
        stmt = select(DBControlPolicy).where(DBControlPolicy.id == policy_id)  # type: ignore[arg-type]
        result = await session.execute(stmt)
        policy = result.scalar_one_or_none()

        if not policy:
            logger.warning(f"Policy with ID {policy_id} not found")
            return None

        # Update fields
        policy.name = policy_update.name
        policy.config = policy_update.config
        policy.is_active = policy_update.is_active
        policy.description = policy_update.description
        # Plain attribute assignment does not re-run the model's "before"
        # validator, so bump updated_at explicitly. Naive UTC matches how the
        # model builds its timestamps.
        policy.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)

        await session.commit()
        await session.refresh(policy)
        logger.info(f"Successfully updated policy with ID: {policy.id}")
        return policy
    except IntegrityError as ie:
        await session.rollback()
        logger.error(f"Integrity error updating policy: {ie}")
        raise  # Re-raise the specific integrity error
    except SQLAlchemyError as sqla_err:
        await session.rollback()
        # Swallowed: keep the traceback in the log.
        logger.error(f"SQLAlchemy error updating policy: {sqla_err}", exc_info=True)
        return None  # Or re-raise
    except Exception as e:
        await session.rollback()
        logger.error(f"Error updating policy: {e}", exc_info=True)
        return None

Update an existing policy.

database_async

close_db_engine() async

Source code in luthien_control/db/database_async.py
116
117
118
119
120
121
122
123
124
125
126
127
128
async def close_db_engine() -> None:
    """Dispose of the module-level database engine, if one exists.

    Disposal errors are logged rather than raised. The global engine reference
    is cleared whether or not disposal succeeded.
    """
    global _db_engine
    if not _db_engine:
        logger.info("Database engine was already None or not initialized during shutdown.")
        return

    try:
        await _db_engine.dispose()
        logger.info("Database engine closed successfully.")
    except Exception as e:
        logger.error(f"Error closing database engine: {e}", exc_info=True)
    finally:
        _db_engine = None

Closes the database engine.

create_db_engine() async

Source code in luthien_control/db/database_async.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def create_db_engine() -> AsyncEngine:
    """Create the async engine and session factory for the application DB.

    If the engine already exists it is returned unchanged. The module-level
    engine and session factory are only set once both were built successfully.

    Returns:
        The async engine for the application DB.

    Raises:
        LuthienDBConfigurationError: If the database configuration is invalid,
            e.g. the pool max size is smaller than the pool min size.
        LuthienDBConnectionError: If creating the engine fails for any other
            reason (the original exception is chained as ``__cause__``).
    """
    global _db_engine, _db_session_factory
    if _db_engine:
        logger.debug("Database engine already initialized.")
        return _db_engine

    logger.info("Attempting to create database engine...")

    db_url = _get_db_url()

    try:
        # Get and validate pool sizes
        pool_min_size = settings.get_main_db_pool_min_size()
        pool_max_size = settings.get_main_db_pool_max_size()
        if pool_max_size < pool_min_size:
            # A negative max_overflow would silently disable SQLAlchemy's overflow limit.
            raise LuthienDBConfigurationError(
                f"Main DB pool max size ({pool_max_size}) must be >= pool min size ({pool_min_size})."
            )

        # Build into locals so a failure part-way leaves the globals untouched.
        engine = create_async_engine(
            db_url,
            echo=False,  # Set to True for debugging SQL queries
            pool_pre_ping=True,
            pool_size=pool_min_size,
            max_overflow=pool_max_size - pool_min_size,
        )
        session_factory = async_sessionmaker(
            engine,
            expire_on_commit=False,
            class_=AsyncSession,
        )
    except LuthienDBConfigurationError:
        raise
    except Exception as e:
        masked_url = _mask_password(db_url)
        raise LuthienDBConnectionError(f"Failed to create database engine using URL ({masked_url}): {e}") from e

    _db_engine = engine
    _db_session_factory = session_factory
    logger.info("Database engine created successfully.")
    return _db_engine

Creates the asyncpg engine for the application DB.

Returns: the asyncpg engine for the application DB.

Raises:

Type Description
LuthienDBConfigurationError

If the database configuration is invalid.

LuthienDBConnectionError

If the database connection fails.

get_db_session() async

Source code in luthien_control/db/database_async.py
131
132
133
134
135
136
137
138
139
140
141
142
@contextlib.asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """Provide an ``AsyncSession`` from the module's session factory as an async context manager.

    If the body of the ``async with`` raises, the session is rolled back and
    the exception propagates.

    Raises:
        RuntimeError: If the session factory has not been created yet
            (``create_db_engine`` sets it up).
    """
    factory = _db_session_factory
    if factory is None:
        raise RuntimeError("Database session factory has not been initialized")

    async with factory() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise

Get a SQLAlchemy async session for the database as a context manager.

sqlmodel_models

ControlPolicy

Bases: SQLModel

Source code in luthien_control/db/sqlmodel_models.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class ControlPolicy(SQLModel, table=True):
    """Database model for storing control policy configurations.

    Each row describes one named policy: ``type`` identifies the policy class
    used for instantiation and ``config`` holds its JSON configuration.
    """

    # Keep the docstring above this line: only the first statement of a class
    # body becomes ``__doc__``.
    __tablename__ = "policies"  # type: ignore

    # Primary key
    id: Optional[int] = Field(default=None, primary_key=True)

    # --- Core Fields ---
    name: str = Field(index=True, unique=True)  # Unique name used for lookup
    type: str = Field()  # Type of policy, used for instantiation
    # default_factory builds a fresh dict per instance instead of reusing one literal.
    config: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
    is_active: bool = Field(default=True, index=True)
    description: Optional[str] = Field(default=None)

    # --- Timestamps ---
    # Stored as naive datetimes holding the current UTC time (tzinfo stripped).
    created_at: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None), nullable=False
    )
    updated_at: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None), nullable=False
    )

    def __init__(self, **data: Any):
        """Fill in ``created_at``/``updated_at`` with the current naive-UTC time when not supplied."""
        # NOTE(review): presumably needed because SQLModel table models may skip
        # validators on direct construction — confirm.
        if "created_at" not in data:
            data["created_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        if "updated_at" not in data:
            data["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        super().__init__(**data)

    @model_validator(mode="before")
    @classmethod
    def validate_timestamps(cls, values):
        """Ensure updated_at is always set/updated.

        When the raw validation input is a dict, ``updated_at`` is overwritten
        with the current naive-UTC time, even if a value was supplied.
        """
        if isinstance(values, dict):
            values["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
        return values
class-attribute instance-attribute

Database model for storing control policy configurations.

__tablename__ = 'policies' class-attribute instance-attribute

Database model for storing control policy configurations.

validate_timestamps(values) classmethod
Source code in luthien_control/db/sqlmodel_models.py
75
76
77
78
79
80
81
@model_validator(mode="before")
@classmethod
def validate_timestamps(cls, values):
    """Stamp ``updated_at`` with the current time (naive UTC) on dict input.

    Any non-dict input is handed back untouched.
    """
    if not isinstance(values, dict):
        return values
    values["updated_at"] = datetime.now(timezone.utc).replace(tzinfo=None)
    return values

Ensure updated_at is always set/updated.

JsonBOrJson

Bases: TypeDecorator

Source code in luthien_control/db/sqlmodel_models.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class JsonBOrJson(types.TypeDecorator):
    """
    JSON column type: JSONB on PostgreSQL, plain JSON on every other dialect.

    It exists mainly so unit tests can run against SQLite, which has no JSONB type.
    """

    impl = JSON  # Fallback implementation type.
    cache_ok = True  # No per-instance state, so SQLAlchemy may cache this decorator.

    def load_dialect_impl(self, dialect):
        chosen_type = JSONB() if dialect.name == "postgresql" else JSON()
        return dialect.type_descriptor(chosen_type)

Represents a JSON type that uses JSONB for PostgreSQL and JSON for other dialects (like SQLite).

This is mostly a hack for unit testing, as SQLite does not support JSONB.

LuthienLog

Bases: SQLModel

Source code in luthien_control/db/sqlmodel_models.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class LuthienLog(SQLModel, table=True):
    """
    A single row of the Luthien log table, modelled with SQLModel.

    Attributes:
        id: Auto-assigned primary key of the log entry.
        transaction_id: Groups log entries that belong to the same transaction.
        datetime: When the entry was created; defaults to the current time as a
            timezone-aware UTC datetime.
        data: Main JSON payload of the entry.
        datatype: Names the kind/schema of ``data``.
        notes: Optional JSON payload with extra context.
    """

    # The table name is left to SQLModel's default.

    id: Optional[int] = Field(default=None, primary_key=True, index=True)
    transaction_id: str = Field(index=True, nullable=False)
    # Lambdas in a class body resolve names in module scope when called, so
    # ``datetime`` below is the module-level name, not this field.
    datetime: dt.datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        nullable=False,
        index=True,
    )
    data: Optional[dict[str, Any]] = Field(default=None, sa_column=Column(JsonBOrJson))
    datatype: str = Field(index=True, nullable=False)
    notes: Optional[dict[str, Any]] = Field(default=None, sa_column=Column(JsonBOrJson))

    def __repr__(self) -> str:
        """Short debug representation showing the identifying columns."""
        return (
            "<LuthienLog("
            f"id={self.id}, transaction_id='{self.transaction_id}', "
            f"datetime='{self.datetime}', datatype='{self.datatype}'"
            ")>"
        )

Represents a log entry in the Luthien logging system using SQLModel.

Attributes:

Name Type Description
id Optional[int]

Unique identifier for the log entry (primary key).

transaction_id str

Identifier to group related log entries.

datetime datetime

Timestamp indicating when the log entry was generated (timezone-aware).

data Optional[dict[str, Any]]

JSON blob containing the primary logged data.

datatype str

String identifier for the nature and schema of 'data'.

notes Optional[dict[str, Any]]

JSON blob for additional contextual information.

exceptions

LuthienDBConfigurationError

Bases: LuthienDBException

Source code in luthien_control/exceptions.py
 7
 8
 9
10
class LuthienDBConfigurationError(LuthienDBException):
    """Raised when the database configuration is invalid or required settings are missing."""

Exception raised when a database configuration is invalid or missing required variables.

LuthienDBConnectionError

Bases: LuthienDBException

Source code in luthien_control/exceptions.py
13
14
15
16
class LuthienDBConnectionError(LuthienDBException):
    """Raised when establishing a connection to the database fails."""

Exception raised when a connection to the database fails.

LuthienDBException

Bases: Exception

Source code in luthien_control/exceptions.py
1
2
3
4
class LuthienDBException(Exception):
    """Root of the Luthien database error hierarchy; catch this to handle any DB-related failure."""

Base exception for all Luthien DB related errors.

main

health_check() async

Source code in luthien_control/main.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@app.get("/health", tags=["General"], status_code=200)
async def health_check():
    """Liveness probe.

    Answers as long as the application is running and able to serve requests.
    No dependencies are checked.

    Returns:
        The fixed payload ``{"status": "ok"}``.
    """
    payload = {"status": "ok"}
    return payload

Perform a basic health check.

This endpoint can be used to verify that the application is running and responsive.

Returns:

Type Description

A dictionary indicating the application status.

lifespan(app) async

Source code in luthien_control/main.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the lifespan of the application resources.

    This asynchronous context manager handles the startup and shutdown events
    of the FastAPI application. On startup it loads ``Settings`` and builds the
    ``DependencyContainer``, storing it on ``app.state.dependencies``. On shutdown
    it closes the main DB engine and the container's HTTP client.

    Shutdown cleanup runs in a ``finally`` block. It therefore also runs when the
    application body exits with an exception. The HTTP client is closed even if
    closing the DB engine fails.

    Args:
        app: The FastAPI application instance.

    Yields:
        None: After startup procedures are complete, allowing the application to run.

    Raises:
        RuntimeError: If critical application dependencies fail to initialize during startup.
    """
    logger.info("Application startup sequence initiated.")

    # Startup: Load Settings
    app_settings = Settings()
    logger.info("Settings loaded.")

    # Startup: Initialize Application Dependencies via helper.
    # This variable will hold the container if successfully created.
    initialized_dependencies: DependencyContainer | None = None
    try:
        initialized_dependencies = await initialize_app_dependencies(app_settings)
        app.state.dependencies = initialized_dependencies
        logger.info("Core application dependencies initialized and stored in app state.")

    except Exception as init_exc:
        # initialize_app_dependencies is expected to clean up resources it attempted
        # to create itself (like its own http_client) if it fails internally.
        # The concern here is logging and ensuring the app doesn't start.
        logger.critical(f"Fatal error during application dependency initialization: {init_exc}", exc_info=True)
        # The helper may have failed after creating the DB engine. It does not call
        # close_db_engine() itself, so do it here; close_db_engine copes with an
        # engine that was never set or is already closed.
        await close_db_engine()
        logger.info("DB Engine closed due to dependency initialization failure during startup.")
        # Re-raise to prevent application from starting up in a bad state.
        raise RuntimeError(
            f"Application startup failed due to dependency initialization error: {init_exc}"
        ) from init_exc

    try:
        yield  # Application runs here
    finally:
        # Shutdown: Clean up resources. Being in `finally` means this also runs if an
        # exception is thrown into the generator at the `yield` above.
        logger.info("Application shutdown sequence initiated.")
        try:
            # Close main DB engine (handles its own check if already closed or never initialized)
            await close_db_engine()
            logger.info("Main DB Engine closed.")
        finally:
            # Close the HTTP client even if closing the DB engine raised.
            if initialized_dependencies is not None:
                await initialized_dependencies.http_client.aclose()
                logger.info("HTTP Client from DependencyContainer closed.")

        logger.info("Application shutdown complete.")

Manage the lifespan of the application resources.

This asynchronous context manager handles the startup and shutdown events of the FastAPI application. It initializes dependencies on startup and ensures they are properly cleaned up on shutdown.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application instance.

required

Yields:

Name Type Description
None

After startup procedures are complete, allowing the application to run.

Raises:

Type Description
RuntimeError

If critical application dependencies fail to initialize during startup.

read_root() async

Source code in luthien_control/main.py
110
111
112
113
114
115
116
117
@app.get("/")
async def read_root():
    """Serve the root path with a short liveness message.

    Returns:
        A dict whose ``message`` key says the proxy is running.
    """
    greeting = "Luthien Control Proxy is running."
    return {"message": greeting}

Provide a simple root endpoint.

Returns:

Type Description

A welcome message indicating the proxy is running.

proxy

orchestration

run_policy_flow(request, main_policy, dependencies, session) async

Source code in luthien_control/proxy/orchestration.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def run_policy_flow(
    request: fastapi.Request,
    main_policy: ControlPolicy,
    dependencies: DependencyContainer,
    session: AsyncSession,
) -> fastapi.Response:
    """
    Orchestrates the execution of the main ControlPolicy using injected dependencies.

    Exceptions raised while applying the policy or building the response are caught
    here and turned into a response:

    * ``ControlPolicyError``: a JSON error response. It uses the exception's
      ``status_code`` (default 400), ``detail`` (default ``str(e)``) and
      ``policy_name`` (default ``"unknown"``).
    * Any other exception: the ResponseBuilder is asked to build a response from
      the current context. If that also fails, a generic 500 JSON response is
      returned.

    Reading the request body and initializing the context happen before the guarded
    section, so failures there propagate to the caller.

    Args:
        request: The incoming FastAPI request.
        main_policy: The main policy instance to execute.
        dependencies: The application's dependency container.
        session: The database session for this request.

    Returns:
        The final FastAPI response.
    """
    # 1. Initialize Context. This is deliberately outside the try: the error handlers
    #    below need context.transaction_id.
    body = await request.body()
    context = _initialize_context(request, body)

    builder = ResponseBuilder()
    policy_name = getattr(main_policy, "name", main_policy.__class__.__name__)

    # 2. Apply the main policy
    try:
        logger.info(f"[{context.transaction_id}] Applying main policy: {policy_name}")

        # Call apply directly with context, container (dependencies), and session
        context = await main_policy.apply(context=context, container=dependencies, session=session)

        # Always call the builder after successful policy execution
        logger.info(f"[{context.transaction_id}] Policy execution complete. Building final response.")
        final_response = builder.build_response(context, dependencies)

    except ControlPolicyError as e:
        logger.warning(f"[{context.transaction_id}] Control policy error halted execution: {e}")
        # The attributes may exist on the exception but be None. Fall back in that case
        # too, so JSONResponse always receives a real status code and a message.
        policy_name_for_error = getattr(e, "policy_name", None) or "unknown"
        status_code = getattr(e, "status_code", None) or status.HTTP_400_BAD_REQUEST
        error_detail = getattr(e, "detail", None) or str(e)

        final_response = JSONResponse(
            status_code=status_code,
            content={
                "detail": f"Policy error in '{policy_name_for_error}': {error_detail}",
                "transaction_id": str(context.transaction_id),
            },
        )

    except Exception as e:
        # Handle unexpected errors during policy execution or response building
        logger.exception(f"[{context.transaction_id}] Unhandled exception during policy flow: {e}")
        # Try to build an error response using the builder
        try:
            final_response = builder.build_response(context, dependencies)
        except Exception as build_e:
            # Log the exception that occurred *during response building*
            logger.exception(
                f"[{context.transaction_id}] Exception occurred *during* error response building: "
                f"{build_e}. Original error was: {e}"
            )
            # Fallback to a basic JSONResponse, mentioning both errors if possible
            error_detail = f"Internal Server Error while processing policy '{policy_name}'."
            if dependencies.settings.dev_mode():
                # Include more detail in dev mode only.
                # NOTE(review): this echoes the whole request object to the client, which
                # may include credentials such as Authorization headers.
                error_detail += f" Initial error: {e}. Error during response building: {build_e}"
                error_detail += f"\n\nFull request: {context.request}"
            final_response = JSONResponse(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                content={
                    "detail": error_detail,
                    "transaction_id": str(context.transaction_id),
                },
            )

    return final_response

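Orchestrates the execution of the main ControlPolicy using injected dependencies. Exceptions raised while applying the policy are caught by this function and converted into a response. A ControlPolicyError becomes a JSON error response; any other exception becomes a builder-generated response or, if that fails, a 500 JSON response.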

Parameters:

Name Type Description Default
request Request

The incoming FastAPI request.

required
main_policy ControlPolicy

The main policy instance to execute.

required
dependencies DependencyContainer

The application's dependency container.

required
session AsyncSession

The database session for this request.

required

Returns:

Type Description
Response

The final FastAPI response.

server

api_proxy_endpoint(request, full_path=default_path, dependencies=Depends(get_dependencies), main_policy=Depends(get_main_control_policy), session=Depends(get_db_session), payload=default_payload, token=Security(http_bearer_auth)) async

Source code in luthien_control/proxy/server.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@router.post(
    "/api/{full_path:path}",
)
async def api_proxy_endpoint(
    request: Request,
    full_path: str = default_path,
    # --- Core Dependencies ---
    dependencies: DependencyContainer = Depends(get_dependencies),
    main_policy: ControlPolicy = Depends(get_main_control_policy),
    session: AsyncSession = Depends(get_db_session),
    # --- Swagger UI Enhancements ---
    # The 'payload' and 'token' parameters enhance the Swagger UI:
    # - 'payload' (dict[str, Any], optional): Provides a schema for the request body.
    #   Actual body content is read directly from the 'request' object.
    # - 'token' (Optional[str]): Enables the 'Authorize' button (Bearer token).
    #   Actual token validation is handled by the policy flow.
    payload: dict[str, Any] = default_payload,
    token: Optional[str] = Security(http_bearer_auth),
):
    """
    Main API proxy endpoint using the policy orchestration flow.
    Handles requests starting with /api/.
    Uses Dependency Injection Container and provides a DB session.

    **Authentication Note:** This endpoint uses Bearer Token authentication
    (Authorization: Bearer <token>). However, the requirement for a valid token
    depends on whether the currently configured control policy includes client
    authentication (e.g., ClientApiKeyAuthPolicy). If the policy does not require
    authentication, the token field can be left blank.
    """
    # Nothing has been authenticated yet: any token validation happens later, inside
    # the policy flow. So the log line must not call the request "authenticated".
    logger.info(f"Request received for /api/{full_path}")

    # Orchestrate the policy flow
    response = await run_policy_flow(
        request=request,
        main_policy=main_policy,
        dependencies=dependencies,
        session=session,
    )

    logger.info(f"Returning response for {request.url.path}")
    return response

Main API proxy endpoint using the policy orchestration flow. Handles requests starting with /api/. Uses Dependency Injection Container and provides a DB session.

Authentication Note: This endpoint uses Bearer Token authentication (Authorization: Bearer <token>). However, the requirement for a valid token depends on whether the currently configured control policy includes client authentication (e.g., ClientApiKeyAuthPolicy). If the policy does not require authentication, the token field can be left blank.

api_proxy_get_endpoint(request, full_path=default_path, dependencies=Depends(get_dependencies), main_policy=Depends(get_main_control_policy), session=Depends(get_db_session), token=Security(http_bearer_auth)) async

Source code in luthien_control/proxy/server.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@router.get(
    "/api/{full_path:path}",
)
async def api_proxy_get_endpoint(
    request: Request,
    full_path: str = default_path,
    # --- Core Dependencies ---
    dependencies: DependencyContainer = Depends(get_dependencies),
    main_policy: ControlPolicy = Depends(get_main_control_policy),
    session: AsyncSession = Depends(get_db_session),
    # --- Swagger UI Enhancements ---
    # - 'token' (Optional[str]): Enables the 'Authorize' button (Bearer token).
    #   Actual token validation is handled by the policy flow.
    token: Optional[str] = Security(http_bearer_auth),
):
    """
    Main API proxy endpoint for GET requests using the policy orchestration flow.
    Handles GET requests starting with /api/.
    Uses Dependency Injection Container and provides a DB session.

    **Authentication Note:** This endpoint uses Bearer Token authentication
    (Authorization: Bearer <token>). However, the requirement for a valid token
    depends on whether the currently configured control policy includes client
    authentication (e.g., ClientApiKeyAuthPolicy). If the policy does not require
    authentication, the token field can be left blank.
    """
    # Nothing has been authenticated yet: any token validation happens later, inside
    # the policy flow. So the log line must not call the request "authenticated".
    logger.info(f"GET request received for /api/{full_path}")

    # Orchestrate the policy flow
    response = await run_policy_flow(
        request=request,
        main_policy=main_policy,
        dependencies=dependencies,
        session=session,
    )

    logger.info(f"Returning response for GET {request.url.path}")
    return response

Main API proxy endpoint for GET requests using the policy orchestration flow. Handles GET requests starting with /api/. Uses Dependency Injection Container and provides a DB session.

Authentication Note: This endpoint uses Bearer Token authentication (Authorization: Bearer <token>). However, the requirement for a valid token depends on whether the currently configured control policy includes client authentication (e.g., ClientApiKeyAuthPolicy). If the policy does not require authentication, the token field can be left blank.

api_proxy_options_handler(full_path=default_path) async

Source code in luthien_control/proxy/server.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@router.options("/api/{full_path:path}")
async def api_proxy_options_handler(
    full_path: str = default_path,  # Keep for path consistency, though not used in this simple handler
):
    """
    Answer OPTIONS (CORS preflight) requests for the API proxy path.

    Returns an empty 200 response whose headers advertise GET, POST and OPTIONS.
    It allows any origin and the Authorization and Content-Type request headers.
    """
    logger.info(f"Explicit OPTIONS request received for /api/{full_path}")
    permitted_methods = "GET, POST, OPTIONS"
    return Response(
        status_code=200,
        headers={
            "Allow": permitted_methods,
            "Access-Control-Allow-Origin": "*",  # Allow any origin
            "Access-Control-Allow-Methods": permitted_methods,
            "Access-Control-Allow-Headers": "Authorization, Content-Type",
        },
    )

Handles OPTIONS requests for the API proxy endpoint, indicating allowed methods.

settings

Settings

Source code in luthien_control/settings.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class Settings:
    """Application configuration settings loaded from environment variables."""

    # --- Core Settings ---
    BACKEND_URL: Optional[str] = None
    # Comma-separated list of control policies for the beta framework

    # --- Database Settings ---
    DB_SERVER: str = "localhost"
    DB_USER: Optional[str] = None
    DB_PASSWORD: Optional[str] = None
    DB_NAME: Optional[str] = None
    DB_HOST: Optional[str] = None
    DB_PORT: Optional[int] = 5432

    # --- OpenAI Settings ---
    OPENAI_API_KEY: Optional[str] = None

    # --- Helper Methods using os.getenv ---
    def get_backend_url(self) -> Optional[str]:
        """Returns the backend URL as a string, if set."""
        url = os.getenv("BACKEND_URL")
        if url:
            # Basic validation (can be enhanced)
            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                raise ValueError(f"Invalid BACKEND_URL format: {url}")
        return url

    def get_database_url(self) -> Optional[str]:
        """Returns the primary DATABASE_URL, if set."""
        return os.getenv("DATABASE_URL")

    def get_openai_api_key(self) -> str | None:
        """Returns the OpenAI API key, if set."""
        return os.getenv("OPENAI_API_KEY")

    def get_top_level_policy_name(self) -> str:
        """Returns the name of the top-level policy instance to load."""
        return os.getenv("TOP_LEVEL_POLICY_NAME", "root")

    def get_policy_filepath(self) -> str | None:
        """Returns the path to the policy file, if set."""
        return os.getenv("POLICY_FILEPATH")

    # --- Database settings Getters using os.getenv ---
    def get_postgres_user(self) -> str | None:
        return os.getenv("DB_USER")

    def get_postgres_password(self) -> str | None:
        return os.getenv("DB_PASSWORD")

    def get_postgres_db(self) -> str | None:
        return os.getenv("DB_NAME")

    def get_postgres_host(self) -> str | None:
        return os.getenv("DB_HOST")

    def get_postgres_port(self) -> int | None:
        """Returns the PostgreSQL port as an integer, or None if not set."""
        port_str = os.getenv("DB_PORT")
        if port_str is None:
            return None
        try:
            return int(port_str)
        except ValueError:
            raise ValueError("DB_PORT environment variable must be an integer.")

    # --- DB Pool Size Getters ---
    def get_main_db_pool_min_size(self) -> int:
        """Returns the minimum pool size for the main DB."""
        try:
            return int(os.getenv("MAIN_DB_POOL_MIN_SIZE", "1"))
        except ValueError:
            raise ValueError("MAIN_DB_POOL_MIN_SIZE environment variable must be an integer.")

    def get_main_db_pool_max_size(self) -> int:
        """Returns the maximum pool size for the main DB."""
        try:
            return int(os.getenv("MAIN_DB_POOL_MAX_SIZE", "10"))
        except ValueError:
            raise ValueError("MAIN_DB_POOL_MAX_SIZE environment variable must be an integer.")

    # --- Logging Settings --- #
    def get_log_level(self, default: str = "INFO") -> str:
        """Gets the configured log level, defaulting if not set."""
        return os.getenv("LOG_LEVEL", default).upper()

    # uvicorn
    def get_app_host(self, default: str = "0.0.0.0") -> str:
        """Gets the configured app host, defaulting if not set."""
        return os.getenv("LUTHIEN_CONTROL_HOST", default)

    def get_app_port(self, default: int = 8000) -> int:
        """Gets the configured app port, defaulting if not set."""
        return int(os.getenv("LUTHIEN_CONTROL_PORT", default))

    def get_app_reload(self, default: bool = False) -> bool:
        """Gets the configured app reload, defaulting if not set."""
        reload = os.getenv("LUTHIEN_CONTROL_RELOAD")
        if reload is None:
            return default
        elif reload.lower() == "true":
            return True
        elif reload.lower() == "false":
            return False
        else:
            raise ValueError(f"LUTHIEN_CONTROL_RELOAD environment variable must be 'true' or 'false' (got {reload}).")

    # get_log_level is reused

    # --- Database DSN Helper Properties using Getters ---
    @property
    def admin_dsn(self) -> str:
        """DSN for connecting to the default 'postgres' db for admin tasks.
        Raises ValueError if required DB settings are missing.
        """
        user = self.get_postgres_user()
        password = self.get_postgres_password()
        host = self.get_postgres_host()
        port = self.get_postgres_port()

        if not all([user, password, host, port]):
            missing = [
                name
                for name, val in [("USER", user), ("PASSWORD", password), ("HOST", host), ("PORT", port)]
                if not val
            ]
            raise ValueError(f"Missing required database settings ({', '.join(missing)}) for admin_dsn")

        return f"postgresql://{user}:{password}@{host}:{port}/postgres"

    @property
    def base_dsn(self) -> str:
        """Base DSN without a specific database name.
        Raises ValueError if required DB settings are missing.
        """
        user = self.get_postgres_user()
        password = self.get_postgres_password()
        host = self.get_postgres_host()
        port = self.get_postgres_port()

        if not all([user, password, host, port]):
            missing = [
                name
                for name, val in [("USER", user), ("PASSWORD", password), ("HOST", host), ("PORT", port)]
                if not val
            ]
            raise ValueError(f"Missing required database settings ({', '.join(missing)}) for base_dsn")

        return f"postgresql://{user}:{password}@{host}:{port}"

    def get_db_dsn(self, db_name: str | None = None) -> str:
        """Returns the DSN for a specific database name, or the default DB_NAME.
        Raises ValueError if required DB settings or the target db_name are missing.
        """
        target_db = db_name or self.get_postgres_db()
        if not target_db:
            raise ValueError("Missing target database name (either provide db_name or set DB_NAME env var)")
        base = self.base_dsn  # Use property
        return f"{base}/{target_db}"

    def get_run_mode(self) -> str:
        """Returns the run mode, defaulting to 'prod' if not set."""
        return os.getenv("RUN_MODE", "prod")

    def dev_mode(self) -> bool:
        """Returns True if the run mode is 'dev', False otherwise."""
        return self.get_run_mode() == "dev"

Application configuration settings loaded from environment variables.

property

DSN for connecting to the default 'postgres' db for admin tasks. Raises ValueError if required DB settings are missing.

admin_dsn property

DSN for connecting to the default 'postgres' db for admin tasks. Raises ValueError if required DB settings are missing.

property

Base DSN without a specific database name. Raises ValueError if required DB settings are missing.

base_dsn property

Base DSN without a specific database name. Raises ValueError if required DB settings are missing.

dev_mode()

Source code in luthien_control/settings.py
177
178
179
def dev_mode(self) -> bool:
    """Tell whether the application is running in development mode.

    Returns:
        True exactly when ``get_run_mode()`` reports "dev"; False for any other mode.
    """
    mode = self.get_run_mode()
    return mode == "dev"

Returns True if the run mode is 'dev', False otherwise.

get_app_host(default='0.0.0.0')

Source code in luthien_control/settings.py
100
101
102
def get_app_host(self, default: str = "0.0.0.0") -> str:
    """Return the host the app should bind to.

    Reads LUTHIEN_CONTROL_HOST and falls back to *default* when it is unset.
    """
    host = os.environ.get("LUTHIEN_CONTROL_HOST", default)
    return host

Gets the configured app host, defaulting if not set.

get_app_port(default=8000)

Source code in luthien_control/settings.py
104
105
106
def get_app_port(self, default: int = 8000) -> int:
    """Gets the configured app port (LUTHIEN_CONTROL_PORT), defaulting if not set.

    Raises:
        ValueError: If LUTHIEN_CONTROL_PORT is set but is not an integer.
    """
    raw_port = os.getenv("LUTHIEN_CONTROL_PORT")
    if raw_port is None:
        return int(default)
    try:
        return int(raw_port)
    except ValueError as err:
        # Name the offending variable, matching the other integer getters.
        raise ValueError("LUTHIEN_CONTROL_PORT environment variable must be an integer.") from err

Gets the configured app port, defaulting if not set.

get_app_reload(default=False)

Source code in luthien_control/settings.py
108
109
110
111
112
113
114
115
116
117
118
def get_app_reload(self, default: bool = False) -> bool:
    """Return whether uvicorn auto-reload is enabled.

    Reads LUTHIEN_CONTROL_RELOAD case-insensitively; *default* is used when unset.

    Raises:
        ValueError: If the variable holds anything other than 'true' or 'false'.
    """
    raw = os.environ.get("LUTHIEN_CONTROL_RELOAD")
    if raw is None:
        return default
    flags = {"true": True, "false": False}
    normalized = raw.lower()
    if normalized in flags:
        return flags[normalized]
    raise ValueError(f"LUTHIEN_CONTROL_RELOAD environment variable must be 'true' or 'false' (got {raw}).")

Gets the configured app reload, defaulting if not set.

get_backend_url()

Source code in luthien_control/settings.py
30
31
32
33
34
35
36
37
38
def get_backend_url(self) -> Optional[str]:
    """Return BACKEND_URL after a minimal sanity check.

    An unset or empty value is returned unchanged.

    Raises:
        ValueError: If a non-empty value lacks a scheme or network location.
    """
    backend = os.environ.get("BACKEND_URL")
    if not backend:
        return backend
    pieces = urlparse(backend)
    if pieces.scheme and pieces.netloc:
        return backend
    raise ValueError(f"Invalid BACKEND_URL format: {backend}")

Returns the backend URL as a string, if set.

get_database_url()

Source code in luthien_control/settings.py
40
41
42
def get_database_url(self) -> Optional[str]:
    """Return the DATABASE_URL environment variable, or None when it is unset."""
    database_url = os.environ.get("DATABASE_URL")
    return database_url

Returns the primary DATABASE_URL, if set.

get_db_dsn(db_name=None)

Source code in luthien_control/settings.py
163
164
165
166
167
168
169
170
171
def get_db_dsn(self, db_name: str | None = None) -> str:
    """Returns the DSN for a specific database name, or the default DB_NAME.
    Raises ValueError if required DB settings or the target db_name are missing.
    """
    target_db = db_name or self.get_postgres_db()
    if not target_db:
        raise ValueError("Missing target database name (either provide db_name or set DB_NAME env var)")
    base = self.base_dsn  # Use property
    return f"{base}/{target_db}"

Returns the DSN for a specific database name, or the default DB_NAME. Raises ValueError if required DB settings or the target db_name are missing.

get_log_level(default='INFO')

Source code in luthien_control/settings.py
95
96
97
def get_log_level(self, default: str = "INFO") -> str:
    """Return LOG_LEVEL upper-cased, falling back to *default* (also upper-cased)."""
    level = os.environ.get("LOG_LEVEL", default)
    return level.upper()

Gets the configured log level, defaulting if not set.

get_main_db_pool_max_size()

Source code in luthien_control/settings.py
87
88
89
90
91
92
def get_main_db_pool_max_size(self) -> int:
    """Returns the maximum pool size for the main DB (MAIN_DB_POOL_MAX_SIZE, default 10).

    Raises:
        ValueError: If the variable is set but is not an integer.
    """
    try:
        return int(os.getenv("MAIN_DB_POOL_MAX_SIZE", "10"))
    except ValueError as err:
        # Chain the parse error so the traceback shows the cause, not a
        # "during handling of the above exception" message.
        raise ValueError("MAIN_DB_POOL_MAX_SIZE environment variable must be an integer.") from err

Returns the maximum pool size for the main DB.

get_main_db_pool_min_size()

Source code in luthien_control/settings.py
80
81
82
83
84
85
def get_main_db_pool_min_size(self) -> int:
    """Returns the minimum pool size for the main DB (MAIN_DB_POOL_MIN_SIZE, default 1).

    Raises:
        ValueError: If the variable is set but is not an integer.
    """
    try:
        return int(os.getenv("MAIN_DB_POOL_MIN_SIZE", "1"))
    except ValueError as err:
        # Chain the parse error so the traceback shows the cause, not a
        # "during handling of the above exception" message.
        raise ValueError("MAIN_DB_POOL_MIN_SIZE environment variable must be an integer.") from err

Returns the minimum pool size for the main DB.

get_openai_api_key()

Source code in luthien_control/settings.py
44
45
46
def get_openai_api_key(self) -> str | None:
    """Returns the OpenAI API key, if set."""
    return os.getenv("OPENAI_API_KEY")

Returns the OpenAI API key, if set.

get_policy_filepath()

Source code in luthien_control/settings.py
52
53
54
def get_policy_filepath(self) -> str | None:
    """Returns the path to the policy file, if set."""
    return os.getenv("POLICY_FILEPATH")

Returns the path to the policy file, if set.

get_postgres_port()

Source code in luthien_control/settings.py
69
70
71
72
73
74
75
76
77
def get_postgres_port(self) -> int | None:
    """Returns the PostgreSQL port as an integer, or None if not set."""
    port_str = os.getenv("DB_PORT")
    if port_str is None:
        return None
    try:
        return int(port_str)
    except ValueError:
        raise ValueError("DB_PORT environment variable must be an integer.")

Returns the PostgreSQL port as an integer, or None if not set.

get_run_mode()

Source code in luthien_control/settings.py
173
174
175
def get_run_mode(self) -> str:
    """Return the RUN_MODE environment variable; 'prod' when it is unset."""
    mode = os.environ.get("RUN_MODE", "prod")
    return mode

Returns the run mode, defaulting to 'prod' if not set.

get_top_level_policy_name()

Source code in luthien_control/settings.py
48
49
50
def get_top_level_policy_name(self) -> str:
    """Return TOP_LEVEL_POLICY_NAME, the top-level policy instance to load ('root' when unset)."""
    policy_name = os.environ.get("TOP_LEVEL_POLICY_NAME", "root")
    return policy_name

Returns the name of the top-level policy instance to load.