|
import binascii |
|
from base64 import b64decode |
|
from typing import Optional |
|
|
|
from fastapi.exceptions import HTTPException |
|
from fastapi.openapi.models import HTTPBase as HTTPBaseModel |
|
from fastapi.openapi.models import HTTPBearer as HTTPBearerModel |
|
from fastapi.security.base import SecurityBase |
|
from fastapi.security.utils import get_authorization_scheme_param |
|
from pydantic import BaseModel |
|
from starlette.requests import Request |
|
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN |
|
from typing_extensions import Annotated, Doc |
|
|
|
|
|
class HTTPBasicCredentials(BaseModel):
    """
    Username and password pair produced by an `HTTPBasic` dependency.

    A path operation that depends on an `HTTPBasic` instance receives an
    object of this model once the `Authorization` header has been decoded.

    Read more about it in the
    [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).
    """

    # Text before the first ":" of the decoded credentials.
    username: Annotated[str, Doc("The HTTP Basic username.")]
    # Text after the first ":" of the decoded credentials.
    password: Annotated[str, Doc("The HTTP Basic password.")]
|
|
|
|
|
class HTTPAuthorizationCredentials(BaseModel):
    """
    Scheme and credentials pair produced by an `HTTPBearer` or `HTTPDigest`
    dependency.

    The value of the `Authorization` header is split at the first space: the
    part before it becomes `scheme`, the part after it becomes `credentials`.

    For instance, an HTTP Bearer client sends a header such as:

    ```
    Authorization: Bearer deadbeef12346
    ```

    which results in:

    * `scheme` being `"Bearer"`
    * `credentials` being `"deadbeef12346"`
    """

    scheme: Annotated[
        str,
        Doc(
            """
            The HTTP authorization scheme extracted from the header value.
            """
        ),
    ]
    credentials: Annotated[
        str,
        Doc(
            """
            The HTTP authorization credentials extracted from the header value.
            """
        ),
    ]
|
|
|
|
|
class HTTPBase(SecurityBase):
    """
    Generic security dependency based on the HTTP `Authorization` header.

    Calling an instance with a request extracts the scheme and credentials
    parts of the header. No particular scheme is enforced here.
    """

    def __init__(
        self,
        *,
        scheme: str,
        scheme_name: Optional[str] = None,
        description: Optional[str] = None,
        auto_error: bool = True,
    ):
        """
        Store the OpenAPI model and the error-handling policy.

        Args:
            scheme: HTTP authorization scheme recorded in the OpenAPI model.
            scheme_name: Name of the security scheme; falls back to the
                class name when empty or `None`.
            description: Description recorded in the OpenAPI model.
            auto_error: When true, a missing or incomplete header raises an
                `HTTPException`; when false, `None` is returned instead.
        """
        self.model = HTTPBaseModel(scheme=scheme, description=description)
        self.scheme_name = scheme_name or self.__class__.__name__
        self.auto_error = auto_error

    async def __call__(
        self, request: Request
    ) -> Optional[HTTPAuthorizationCredentials]:
        """
        Extract the authorization scheme and credentials from `request`.

        Returns:
            The parsed `HTTPAuthorizationCredentials`, or `None` when the
            header is missing/incomplete and `auto_error` is false.

        Raises:
            HTTPException: 403 "Not authenticated" when the header is
                missing/incomplete and `auto_error` is true.
        """
        header_value = request.headers.get("Authorization")
        scheme, credentials = get_authorization_scheme_param(header_value)

        # Happy path first: all three pieces must be present and non-empty.
        if header_value and scheme and credentials:
            return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)

        if not self.auto_error:
            return None
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
        )
|
|
|
|
|
class HTTPBasic(HTTPBase):
    """
    Security dependency implementing HTTP Basic authentication.

    ## Usage

    Instantiate the class and pass the instance to `Depends()`.

    The dependency resolves to an `HTTPBasicCredentials` object carrying the
    `username` and the `password` sent by the client.

    Read more about it in the
    [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).

    ## Example

    ```python
    from typing import Annotated

    from fastapi import Depends, FastAPI
    from fastapi.security import HTTPBasic, HTTPBasicCredentials

    app = FastAPI()

    security = HTTPBasic()


    @app.get("/users/me")
    def read_current_user(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
        return {"username": credentials.username, "password": credentials.password}
    ```
    """

    def __init__(
        self,
        *,
        scheme_name: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme name.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        realm: Annotated[
            Optional[str],
            Doc(
                """
                HTTP Basic authentication realm.
                """
            ),
        ] = None,
        description: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme description.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        auto_error: Annotated[
            bool,
            Doc(
                """
                By default, if the HTTP Basic authentication is not provided (a
                header), `HTTPBasic` will automatically cancel the request and send the
                client an error.

                If `auto_error` is set to `False`, when the HTTP Basic authentication
                is not available, instead of erroring out, the dependency result will
                be `None`.

                This is useful when you want to have optional authentication.

                It is also useful when you want to have authentication that can be
                provided in one of multiple optional ways (for example, in HTTP Basic
                authentication or in an HTTP Bearer token).
                """
            ),
        ] = True,
    ):
        # The attributes are assigned directly; the parent initializer is not
        # invoked by this class.
        self.model = HTTPBaseModel(scheme="basic", description=description)
        self.scheme_name = scheme_name or self.__class__.__name__
        self.realm = realm
        self.auto_error = auto_error

    async def __call__(  # type: ignore
        self, request: Request
    ) -> Optional[HTTPBasicCredentials]:
        """
        Decode the HTTP Basic credentials carried by `request`.

        Returns:
            The decoded `HTTPBasicCredentials`, or `None` when the header is
            absent or uses another scheme and `auto_error` is false.

        Raises:
            HTTPException: 401 "Not authenticated" when the header is absent
                or uses another scheme and `auto_error` is true; 401
                "Invalid authentication credentials" when the payload cannot
                be base64/ASCII decoded or contains no ":" separator (this
                one is raised regardless of `auto_error`).
        """
        authorization = request.headers.get("Authorization")
        scheme, param = get_authorization_scheme_param(authorization)

        # Every 401 response carries the same challenge header.
        challenge = f'Basic realm="{self.realm}"' if self.realm else "Basic"
        unauthorized_headers = {"WWW-Authenticate": challenge}

        if not authorization or scheme.lower() != "basic":
            if not self.auto_error:
                return None
            raise HTTPException(
                status_code=HTTP_401_UNAUTHORIZED,
                detail="Not authenticated",
                headers=unauthorized_headers,
            )

        # Built once and shared by both malformed-payload exits below.
        invalid_user_credentials_exc = HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers=unauthorized_headers,
        )
        try:
            decoded = b64decode(param).decode("ascii")
        except (ValueError, UnicodeDecodeError, binascii.Error):
            raise invalid_user_credentials_exc

        # Only the first ":" separates the fields; later ones stay in the password.
        username, separator, password = decoded.partition(":")
        if separator:
            return HTTPBasicCredentials(username=username, password=password)
        raise invalid_user_credentials_exc
|
|
|
|
|
class HTTPBearer(HTTPBase):
    """
    Security dependency implementing HTTP Bearer token authentication.

    ## Usage

    Instantiate the class and pass the instance to `Depends()`.

    The dependency resolves to an `HTTPAuthorizationCredentials` object
    carrying the `scheme` and the `credentials` sent by the client.

    ## Example

    ```python
    from typing import Annotated

    from fastapi import Depends, FastAPI
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

    app = FastAPI()

    security = HTTPBearer()


    @app.get("/users/me")
    def read_current_user(
        credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
    ):
        return {"scheme": credentials.scheme, "credentials": credentials.credentials}
    ```
    """

    def __init__(
        self,
        *,
        bearerFormat: Annotated[Optional[str], Doc("Bearer token format.")] = None,
        scheme_name: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme name.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        description: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme description.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        auto_error: Annotated[
            bool,
            Doc(
                """
                By default, if the HTTP Bearer token not provided (in an
                `Authorization` header), `HTTPBearer` will automatically cancel the
                request and send the client an error.

                If `auto_error` is set to `False`, when the HTTP Bearer token
                is not available, instead of erroring out, the dependency result will
                be `None`.

                This is useful when you want to have optional authentication.

                It is also useful when you want to have authentication that can be
                provided in one of multiple optional ways (for example, in an HTTP
                Bearer token or in a cookie).
                """
            ),
        ] = True,
    ):
        # The attributes are assigned directly; the parent initializer is not
        # invoked by this class.
        self.model = HTTPBearerModel(bearerFormat=bearerFormat, description=description)
        self.scheme_name = scheme_name or self.__class__.__name__
        self.auto_error = auto_error

    async def __call__(
        self, request: Request
    ) -> Optional[HTTPAuthorizationCredentials]:
        """
        Extract the Bearer credentials carried by `request`.

        Returns:
            The parsed `HTTPAuthorizationCredentials`, or `None` when the
            request is rejected and `auto_error` is false.

        Raises:
            HTTPException: 403 "Not authenticated" when the header is
                missing/incomplete, or 403 "Invalid authentication
                credentials" when the scheme is not "bearer"
                (case-insensitive); both only when `auto_error` is true.
        """
        header_value = request.headers.get("Authorization")
        scheme, credentials = get_authorization_scheme_param(header_value)

        incomplete = not (header_value and scheme and credentials)
        # Short-circuit keeps the scheme comparison off the incomplete path.
        if incomplete or scheme.lower() != "bearer":
            if not self.auto_error:
                return None
            detail = (
                "Not authenticated"
                if incomplete
                else "Invalid authentication credentials"
            )
            raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail=detail)

        return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
|
|
|
|
class HTTPDigest(HTTPBase):
    """
    HTTP Digest authentication.

    ## Usage

    Create an instance object and use that object as the dependency in `Depends()`.

    The dependency result will be an `HTTPAuthorizationCredentials` object containing
    the `scheme` and the `credentials`.

    ## Example

    ```python
    from typing import Annotated

    from fastapi import Depends, FastAPI
    from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest

    app = FastAPI()

    security = HTTPDigest()


    @app.get("/users/me")
    def read_current_user(
        credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
    ):
        return {"scheme": credentials.scheme, "credentials": credentials.credentials}
    ```
    """

    def __init__(
        self,
        *,
        scheme_name: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme name.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        description: Annotated[
            Optional[str],
            Doc(
                """
                Security scheme description.

                It will be included in the generated OpenAPI (e.g. visible at `/docs`).
                """
            ),
        ] = None,
        auto_error: Annotated[
            bool,
            Doc(
                """
                By default, if the HTTP Digest not provided, `HTTPDigest` will
                automatically cancel the request and send the client an error.

                If `auto_error` is set to `False`, when the HTTP Digest is not
                available, instead of erroring out, the dependency result will
                be `None`.

                This is useful when you want to have optional authentication.

                It is also useful when you want to have authentication that can be
                provided in one of multiple optional ways (for example, in HTTP
                Digest or in a cookie).
                """
            ),
        ] = True,
    ):
        # The attributes are assigned directly; the parent initializer is not
        # invoked by this class.
        self.model = HTTPBaseModel(scheme="digest", description=description)
        self.scheme_name = scheme_name or self.__class__.__name__
        self.auto_error = auto_error

    async def __call__(
        self, request: Request
    ) -> Optional[HTTPAuthorizationCredentials]:
        """
        Extract the Digest credentials carried by `request`.

        Returns:
            The parsed `HTTPAuthorizationCredentials`, or `None` when the
            request is rejected and `auto_error` is false.

        Raises:
            HTTPException: 403 "Not authenticated" when the header is
                missing/incomplete, or 403 "Invalid authentication
                credentials" when the scheme is not "digest"
                (case-insensitive); both only when `auto_error` is true.
        """
        authorization = request.headers.get("Authorization")
        scheme, credentials = get_authorization_scheme_param(authorization)
        if not (authorization and scheme and credentials):
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
                )
            else:
                return None
        if scheme.lower() != "digest":
            # A non-Digest header means Digest credentials are "not
            # available", so honor auto_error here as well. Raising
            # unconditionally would break optional / multi-scheme
            # authentication when auto_error is False.
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_403_FORBIDDEN,
                    detail="Invalid authentication credentials",
                )
            else:
                return None
        return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
|