Build an API with FastAPI on AWS Lambda using AWS SAM
FastAPI is a blazing fast async Python framework for building APIs. You can run it on a typical server (such as an EC2 instance) using an ASGI server. The FastAPI documentation uses Uvicorn, which is simple to use, fast and dependable.
In this example, though, I am designing the API to run on AWS Lambda with API Gateway as the public entry point, which removes the need for a long-running ASGI server altogether. In theory, this makes the solution highly scalable and cost-efficient.
To demonstrate just how good this architecture is, I will add the following features along the way:
Routes
OAuth2
API Documentation
Web Content Templates
Static Files
Data Models and MySQL JSON Datatype
Uploading large files to an S3 bucket (greater than 10 MB, which is the API Gateway fixed limit)
How to kick off longer-running Lambda tasks: Lambda vs SNS queues
Websockets
Simple Objective
We are going to make a "Trophy" API that allows us to add trophies and then retrieve trophies. That's it! However, around this simple API we will add some features and patterns that you can apply in a much broader sense to your own use case.
Setup and Pre-requisites
Install what you need
This is what I used to build this project:
iMac running Catalina 10.15.4
An AWS account
Pre-requisites
Make sure you have the following installed and configured:
1. Python 3.8
2. aws-sam-cli
3. pip
At the terminal prompt type these commands to check the versions:
python3 --version
sam --version
pip3 --version
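If you prefer to script the check, here is a minimal sketch. It is not part of the original project: the helper names check_prerequisites and python_is_recent_enough are made up for illustration, and the script only looks for the tools on your PATH rather than validating their versions.

```python
import shutil
import sys

# Tools this tutorial expects on PATH (names taken from the commands above).
REQUIRED_TOOLS = ("python3", "sam", "pip3")


def check_prerequisites(tools=REQUIRED_TOOLS):
    """Return a dict mapping each tool name to its path, or None if missing."""
    return {tool: shutil.which(tool) for tool in tools}


def python_is_recent_enough(minimum=(3, 8)):
    """True when the running interpreter is at least the given version."""
    return sys.version_info[:2] >= minimum


if __name__ == "__main__":
    for tool, path in check_prerequisites().items():
        print(f"{tool}: {path or 'NOT FOUND'}")
    print("Python 3.8+:", python_is_recent_enough())
```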
Let's set up your project folders.
Create a new folder called trophy-api-sam and then cd into it.
mkdir trophy-api-sam
cd trophy-api-sam
Create the base files for FastAPI
Create the following files and folder structure. Just create blank files for now.
# Created by https://www.gitignore.io/api/osx,node,linux,windows

### Linux ###
*~

# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*

# KDE directory preferences
.directory

# Linux trash folder which might appear on any partition or disk
.Trash-*

# .nfs files are created when an open file is removed but is still being accessed
.nfs*

### Node ###
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Runtime data
pids
*.pid
*.seed
*.pid.lock

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# nyc test coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# Bower dependency directory (https://bower.io/)
bower_components

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directories
node_modules/
jspm_packages/

# Typescript v1 declaration files
typings/

# Optional npm cache directory
.npm

# Optional eslint cache
.eslintcache

# Optional REPL history
.node_repl_history

# Output of 'npm pack'
*.tgz

# Yarn Integrity file
.yarn-integrity

# dotenv environment variables file
.env

# python
*.pyc

### OSX ###
*.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon

# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk

### Windows ###
# Windows thumbnail cache files
Thumbs.db
ehthumbs.db
ehthumbs_vista.db

# Folder config file
Desktop.ini

# Recycle Bin used on file shares
$RECYCLE.BIN/

# Windows Installer files
*.cab
*.msi
*.msm
*.msp

# Windows shortcuts
*.lnk

# aws-sam build files
.aws-sam

# End of https://www.gitignore.io/api/osx,node,linux,windows
cd into the api folder and create the following:
# make folders
mkdir helper
mkdir models
mkdir routers
# make files
touch app.py
touch config.py
touch requirements.txt
# fastapi requirements
fastapi
pydantic
email_validator
# templating and static files
jinja2
aiofiles
# aws library and helper tools
boto3
mangum
# helpers for working with form field data
python-multipart
# helpers for working with images
pillow
pdf2image
libgravatar
# database connectors
pymysql
sqlalchemy
# auth tools
python-jose[cryptography]
passlib[bcrypt]
# Stuff for the config file
import os


class baseConfig:
    SECRET_KEY = 'ee631a9695b9462fa3e374cd2ca1790e'
    APP_DB = {
        "type": "mysql",
        "server": "xxx.xxx.xxx.xxx",
        "user": "root",
        "password": "xxxx",
        "db": "my_private_db",
        "port": 3306
    }
    ENV = 'development'
    DEBUG = True
    MAX_CONTENT_LENGTH = 200*1024*1024
    TEMPLATES_AUTO_RELOAD = None


class development(baseConfig):
    SECRET_KEY = 'efb2d2d98eaf4ec98ed0b33b3751eb67'
    APP_DB = {
        "type": "mysql",
        "server": "xxx.xxx.xxx.xxx",
        "user": "root",
        "password": "xxxx",
        "db": "my_private_db",
        "port": 3306
    }
    ENV = 'development'
    DEBUG = True
    MAX_CONTENT_LENGTH = 200*1024*1024
    TEMPLATES_AUTO_RELOAD = True


class production(baseConfig):
    SECRET_KEY = '39fba934f6bc4cdea52fd2b4183a74e5'
    APP_DB = {
        "type": "mysql",
        "server": "xxx.xxx.xxx.xxx",
        "user": "root",
        "password": "xxxx",
        "db": "my_private_db",
        "port": 3306
    }
    ENV = 'production'
    DEBUG = False
    MAX_CONTENT_LENGTH = 200*1024*1024
    TEMPLATES_AUTO_RELOAD = False


# Look for the environment and set config accordingly
env = os.environ.get('env', 'development')
try:
    config = globals()[env]
except Exception as e:
    config = {'error': e}
    print(e)
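The config file above picks a class by looking up the value of the env environment variable in globals(). As a sketch of the same idea, here is a variant that swaps globals() for an explicit dictionary, so only known environment names are accepted. The names BaseConfig, CONFIGS and load_config are my own for illustration and are not used elsewhere in this project.

```python
import os


class BaseConfig:
    ENV = "development"
    DEBUG = True


class Development(BaseConfig):
    pass


class Production(BaseConfig):
    ENV = "production"
    DEBUG = False


# Explicit registry instead of globals(): only these names are valid.
CONFIGS = {"development": Development, "production": Production}


def load_config(env_name=None):
    """Pick a config class by name, defaulting to the 'env' variable."""
    if env_name is None:
        env_name = os.environ.get("env", "development")
    try:
        return CONFIGS[env_name]
    except KeyError:
        raise ValueError(f"Unknown environment: {env_name!r}") from None
```

The trade-off is that an unknown environment name fails loudly with a ValueError instead of leaving config set to an error dictionary.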
from fastapi import FastAPI, Depends
from routers import trophy, ui, inbound_files
from config import config
from starlette.requests import Request
from starlette.responses import Response
from mangum import Mangum

# this is the main fastapi object
app = FastAPI(
    title="Trophies",
    description="Demo trophy keeper",
    version="1.0",
)

# this is how we add routes configured in routers
app.include_router(
    trophy.router,
    prefix="/trophy",
    tags=["trophy"],
    responses={404: {"description": "Not Found"}},
)

# this is how we add routes configured in routers
app.include_router(
    inbound_files.router,
    prefix="/api/inboundfiles",
    tags=["inboundfiles"],
    responses={404: {"description": "Not Found"}},
)

# this is how we add routes configured in routers
app.include_router(
    ui.router,
    prefix="/ui",
    tags=["ui"],
    responses={404: {"description": "Not Found"}},
)


# set test url
@app.get("/test")
def root_path(request: Request):
    print(request.state.test)
    return {"message": "The simple test is working"}


# this is a custom middleware handler for http
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        # this is where you can add objects to the request object that is passed to the route logic.
        # Typically you would pass a database connection or security creds
        # eg: request.state.db = SessionLocal()
        # This passes the SQLAlchemy connection object with the request
        request.state.test = 'this is passed to the function'
        # Add in the url prefix that exists when using api gateway but will be empty when testing locally
        if request['aws.event']['requestContext'].get('path', None) == '/{proxy+}':
            # this is a local instance so prefix is empty
            request.state.url_prefix = ''
        else:
            request.state.url_prefix = '/' + request['aws.event']['requestContext'].get('stage', 'prod')
        response = await call_next(request)
    finally:
        pass
        # request.state.db.close()
    return response


# To enable this to connect via API Gateway we need to wrap this app with Mangum
handler = Mangum(app, enable_lifespan=False)
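The URL prefix logic in the middleware is easier to reason about in isolation. The sketch below restates it as a plain function over an event dictionary. The function name url_prefix is hypothetical, and unlike the middleware it tolerates a missing requestContext key instead of raising. The '/{proxy+}' value for local runs is taken from the code above, not something I have verified separately.

```python
def url_prefix(event):
    """Return the path prefix for an API Gateway proxy event (a plain dict)."""
    context = event.get("requestContext", {})
    if context.get("path") == "/{proxy+}":
        # Local run: no stage is prepended to the URL.
        return ""
    # Deployed behind API Gateway: the stage name is the first path segment.
    return "/" + context.get("stage", "prod")


if __name__ == "__main__":
    print(repr(url_prefix({"requestContext": {"path": "/{proxy+}"}})))
    print(repr(url_prefix({"requestContext": {"path": "/dev/test", "stage": "dev"}})))
```

Routes that build links or redirects can then prepend this prefix so the same code works locally and behind a stage.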
cd into helper folder
# make these files
touch __init__.py
touch Dict2Obj.py
class dict2obj(object):
    """
    Object view of a dict, updating the passed in dict when values are set
    or deleted. "obj" the contents of a dict...:
    """

    def __init__(self, d):
        # since __setattr__ is overridden, self.__dict = d doesn't work
        object.__setattr__(self, '_dict2obj__dict', d)

    # Dictionary-like access / updates
    def __getitem__(self, name):
        if name not in self.__dict:
            return None
        value = self.__dict[name]
        if isinstance(value, dict):
            # recursively view sub-dicts as objects
            value = dict2obj(value)
        return value

    def __setitem__(self, name, value):
        self.__dict[name] = value

    def __delitem__(self, name):
        del self.__dict[name]

    # Object-like access / updates
    def __getattr__(self, name):
        return self[name]

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        del self[name]

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self.__dict)

    def __str__(self):
        return str(self.__dict)
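To see what this helper buys you, here is a short usage sketch. A condensed copy of the class is repeated so the snippet runs on its own; in the project you would import it from helper/Dict2Obj.py instead. The sample trophy data is made up for illustration.

```python
class dict2obj(object):
    """Condensed copy of the helper above, repeated so this snippet runs alone."""

    def __init__(self, d):
        object.__setattr__(self, "_dict2obj__dict", d)

    def __getitem__(self, name):
        if name not in self.__dict:
            return None
        value = self.__dict[name]
        return dict2obj(value) if isinstance(value, dict) else value

    def __setitem__(self, name, value):
        self.__dict[name] = value

    def __getattr__(self, name):
        return self[name]

    def __setattr__(self, name, value):
        self[name] = value


# Hypothetical sample data, not from the original project.
raw = {"name": "Best Team", "winner": {"first": "Jane", "year": 2020}}
trophy = dict2obj(raw)

print(trophy.name)          # attribute access reads from the dict
print(trophy.winner.first)  # nested dicts are wrapped on the way out
print(trophy.missing)       # unknown keys give None instead of raising

trophy.winner.year = 2021   # writes go through to the original dict
print(raw["winner"]["year"])
```

Note that a misspelled attribute silently returns None, which is convenient for loosely structured JSON but can hide typos.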
cd into models folder
# make these files
touch __init__.py
touch trophy.py
cd into routers folder
# make these files
touch __init__.py
touch trophy.py
cd back up a folder to api. Now we are ready to start
cd ..
Set up the virtual environment
In the App root folder we need to run these commands to create the virtual environment for this app and ensure all required libraries are added to that location so they can be included when AWS SAM builds the project in later steps.
python3 -m venv .env
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
source .env/bin/activate
Once the virtualenv is activated, you can install the required dependencies. cd into the api folder and pip install the dependencies.
cd api
pip install -r requirements.txt
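If you are not sure whether the virtualenv is actually active before you run pip, this small sketch checks from inside Python. The function name in_virtualenv is my own. It relies on the standard behaviour of venv, where sys.prefix points at the environment and sys.base_prefix at the original installation.

```python
import sys


def in_virtualenv():
    """True when the interpreter is running from a venv-created environment."""
    # venv sets sys.prefix to the environment and keeps the original
    # installation in sys.base_prefix, so the two differ inside a venv.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("virtualenv active:", in_virtualenv())
    print("interpreter prefix:", sys.prefix)
```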
First local test
To test this locally we need to run the AWS SAM command that invokes our code inside a Docker container. The container recreates the AWS serverless environment to simulate API Gateway and Lambda. Make sure you are in the project root folder. The template.yaml file references the folder api.
# build the project first
sam build
# then run locally
sam local start-api
NOTE: If an API call returns the error {"message":"Missing Authentication Token"}, the path you requested could not be mapped. The root path / can't be mapped without special configuration. For a simple test, use a path such as /test, which will return a correct result.
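Once the local API is running you can hit the /test route from a second terminal. Here is a minimal sketch using only the standard library. It assumes sam local start-api is listening on its default address of http://127.0.0.1:3000. The function name smoke_test and the opener parameter (there so the call can be swapped out when testing) are my own additions.

```python
import json
import urllib.request

# Assumption: `sam local start-api` is listening on its default address.
BASE_URL = "http://127.0.0.1:3000"


def smoke_test(path="/test", base_url=BASE_URL, opener=urllib.request.urlopen):
    """GET base_url + path and return (status_code, decoded_json_body)."""
    with opener(base_url + path) as response:
        return response.status, json.loads(response.read().decode("utf-8"))
```

Calling smoke_test() while the local API is up should return the status code together with the JSON message from the /test route. Keep in mind that urlopen raises an HTTPError for non-2xx responses, so an unmapped path shows up as an exception rather than a return value.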
The Deployment Step
Now that we have an initial API built, we should deploy it to AWS.
Setup
First you will need to create an S3 bucket for the code to be deployed to. AWS uses S3 to hold resources before deploying them to their final destinations.
Go to the AWS console, then to S3, and create a bucket. Choose a name according to current GroupIT naming conventions.
Create the samconfig.toml file in the project root. Add this to the file:
In the s3-bucket variable replace the current text with your new bucket name.
Simple Deployments
Now deployments are really simple.
Make sure you are logged into the correct AWS account via okta-awscli:
okta-awscli --profile default
Now just run:
sam deploy
Security using OAuth2
The security model I have chosen to implement is the OAuth2 flow using JWT (JSON Web Tokens). How the users are stored is another matter completely. In this example I will use a dictionary object called fake_users_db that has a user called johndoe. Further on in this section we will change this to use a users table in MySQL.
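If JWTs are new to you, it helps to see that an HS256 token is just three base64url segments (header, payload, signature) where the signature is an HMAC-SHA256 over the first two. The sketch below builds and verifies one with the standard library only. It is purely illustrative: the function names are mine, it skips most of the validation a real library performs, and the project itself uses python-jose for this job.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' using the shared secret."""
    key = secret.encode("utf-8")
    return hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return signing_input + "." + _b64url(_sign(signing_input, secret))


def decode_hs256(token: str, secret: str) -> dict:
    """Verify the signature and expiry, then return the payload."""
    signing_input, _, signature = token.rpartition(".")
    expected = _sign(signing_input, secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = encode_hs256({"sub": "johndoe", "exp": int(time.time()) + 60}, "secret")
    print(token)
    print(decode_hs256(token, "secret"))
```

The point to take away is that anyone can read the payload, but only a holder of the secret key can produce a signature that verifies. That is why the secret must never be committed to a public repository.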
Make a new file in the folder routers called security.py then paste this code into it.
from datetime import datetime, timedelta
from fastapi import Depends, APIRouter, HTTPException, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from starlette.requests import Request
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_307_TEMPORARY_REDIRECT
from typing import List, Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from libgravatar import Gravatar
import json

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "position": "Developer",
        "email": "anthonyg@gji.com.au",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class User(BaseModel):
    username: str
    email: str = None
    position: str = None
    full_name: str = None
    gravatar_url: str = None
    disabled: bool = None


class UserInDB(User):
    hashed_password: str


# extend the OAuth2PasswordBearer to enable cookie access_token to be accepted as well as Authorization: Bearer in header
class OAuth2PasswordBearerWithCookie(OAuth2PasswordBearer):
    async def __call__(self, request: Request) -> Optional[str]:
        # check headers for Authorization token
        authorization: str = request.headers.get("Authorization", None)
        if authorization:
            scheme, _, param = authorization.partition(" ")
        else:
            # check cookie for access_token
            authorization: str = request.cookies.get("access_token", None)
            scheme = 'bearer'
            param = authorization
        if not authorization or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        return param


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/auth/token", auto_error=False)
router = APIRouter()


# helper functions for passwords
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


# get user from database based on username/uid
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        # add the gravatar url to the user_dict
        g = Gravatar(user_dict['email'])
        user_dict['gravatar_url'] = g.get_image()
        return UserInDB(**user_dict)


# check that the users hashed password matches the hash we have stored
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# create an access token
def create_access_token(*, data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# get the current user from the passed access token
async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)):
    path = request['path']
    # redirect for UI only
    if path[:4] == '/ui/':
        credentials_exception = HTTPException(
            status_code=HTTP_307_TEMPORARY_REDIRECT,
            headers={"location": f"{request.state.url_prefix}/ui/signin"},
        )
    else:
        credentials_exception = HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if token is None:
        raise credentials_exception
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user


# wrapper function to get the current user from the passed access token
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# this endpoint is part of OAuth2 and gives a token when supplied correct username and password
@router.post("/token", response_model=Token)
async def login_for_access_token(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # create token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # set cookie
    response.set_cookie(key="access_token", value=access_token, path='/')
    return {"access_token": access_token, "token_type": "bearer"}


# Logout
@router.get("/logout")
async def logout_html(response: Response, request: Request):
    redirect = HTTPException(
        status_code=HTTP_307_TEMPORARY_REDIRECT,
        headers={
            "location": f"{request.state.url_prefix}/ui/signin",
            "set-cookie": "access_token=; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
        }
    )
    raise redirect
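The part of security.py that tends to confuse people is where the token comes from: the Authorization header is checked first, and the access_token cookie is only a fallback. Here is that lookup restated as a plain function over two dictionaries, in simplified form. The function name extract_bearer_token is hypothetical. Note also that plain dict lookups are case-sensitive, which is not how real header objects behave.

```python
from typing import Mapping, Optional


def extract_bearer_token(
    headers: Mapping[str, str], cookies: Mapping[str, str]
) -> Optional[str]:
    """Return the bearer token from the header, else the cookie, else None."""
    authorization = headers.get("Authorization")
    if authorization:
        # Header format is "<scheme> <token>", e.g. "Bearer abc123".
        scheme, _, param = authorization.partition(" ")
    else:
        # Fall back to the cookie set by the /token endpoint.
        scheme, param = "bearer", cookies.get("access_token")
    if not param or scheme.lower() != "bearer":
        return None
    return param


if __name__ == "__main__":
    print(extract_bearer_token({"Authorization": "Bearer abc123"}, {}))
    print(extract_bearer_token({}, {"access_token": "from-cookie"}))
    print(extract_bearer_token({"Authorization": "Basic abc123"}, {}))
```

One consequence worth knowing: if a request carries an Authorization header with a non-bearer scheme, the cookie is not consulted at all.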
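Edit app.py so that it imports the security router, registers it under the /auth prefix, and protects the /test route with the get_current_active_user dependency: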
from fastapi import FastAPI, Depends
from routers import trophy, security, ui, inbound_files
from config import config
from starlette.requests import Request
from starlette.responses import Response
from mangum import Mangum

# this is the main fastapi object
app = FastAPI(
    title="Trophies",
    description="Demo trophy keeper",
    version="1.0",
)

# this is how we add routes configured in routers
app.include_router(
    trophy.router,
    prefix="/trophy",
    tags=["trophy"],
    responses={404: {"description": "Not Found"}},
)

# this is how we add routes configured in routers
app.include_router(
    security.router,
    prefix="/auth",
    tags=["auth"],
    responses={404: {"description": "Not Found"}},
)

# this is how we add routes configured in routers
app.include_router(
    inbound_files.router,
    prefix="/api/inboundfiles",
    tags=["inboundfiles"],
    responses={404: {"description": "Not Found"}},
)

# this is how we add routes configured in routers
app.include_router(
    ui.router,
    prefix="/ui",
    tags=["ui"],
    responses={404: {"description": "Not Found"}},
)


# set test url
@app.get("/test")
def root_path(request: Request, current_user: security.User = Depends(security.get_current_active_user)):
    print(request.state.test)
    return {"message": "The simple test is working"}


# this is a custom middleware handler for http
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        # this is where you can add objects to the request object that is passed to the route logic.
        # Typically you would pass a database connection or security creds
        # eg: request.state.db = SessionLocal()
        # This passes the SQLAlchemy connection object with the request
        request.state.test = 'this is passed to the function'
        # Add in the url prefix that exists when using api gateway but will be empty when testing locally
        if request['aws.event']['requestContext'].get('path', None) == '/{proxy+}':
            # this is a local instance so prefix is empty
            request.state.url_prefix = ''
        else:
            request.state.url_prefix = '/' + request['aws.event']['requestContext'].get('stage', 'prod')
        response = await call_next(request)
    finally:
        pass
        # request.state.db.close()
    return response


# To enable this to connect via API Gateway we need to wrap this app with Mangum
handler = Mangum(app, enable_lifespan=False)
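With /test now protected, a client has to fetch a token from /auth/token first and then send it back as a bearer token. The sketch below only builds the two requests with the standard library, so you can inspect them before sending. The helper names are hypothetical, the base URL assumes the default sam local start-api address, and the password value is a placeholder.

```python
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:3000"  # assumption: sam local default address


def build_token_request(username, password, base_url=BASE_URL):
    """POST form for the /auth/token endpoint (OAuth2 password flow)."""
    form = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        base_url + "/auth/token",
        data=form.encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_authorized_request(path, token, base_url=BASE_URL):
    """GET request carrying the token in the Authorization header."""
    return urllib.request.Request(
        base_url + path,
        headers={"Authorization": f"Bearer {token}"},
    )


if __name__ == "__main__":
    login = build_token_request("johndoe", "your-password")
    print(login.get_method(), login.full_url, login.data)
    call = build_authorized_request("/test", "token-goes-here")
    print(call.get_method(), call.full_url, call.get_header("Authorization"))
```

Passing either request to urllib.request.urlopen sends it. The token endpoint returns JSON with an access_token field, which is the value to hand to build_authorized_request. When calling a deployed API rather than a local one, the base URL would also need to include the API Gateway stage.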