Multiple Modules
An app, split across multiple Python modules, that uploads files to S3 Glacier on Scaleway and lists the stored objects.
The upload endpoint lets you upload a file to Glacier by sending it under the `file` form-data key:
echo -e "Hello world!\n My contents will be stored in a bunker!" > myfile.dat
curl -F file=@myfile.dat <upload-function-url>
This example shows how to split handlers across different Python modules.
Deploying
Deployment can be done with `scw_serverless`:
pip install scw_serverless
scw-serverless deploy app.py
Configuration
Here are all the environment variables that need to be set when deploying:
| Variable | Description | Required |
|---|---|---|
| `SCW_SECRET_KEY` | Secret key to use for S3 operations | :heavy_check_mark: |
| `SCW_ACCESS_KEY` | Access key to use for S3 operations | :heavy_check_mark: |
| `S3_BUCKET` | Name of the bucket to store files into. | :heavy_check_mark: |
Sources
import logging
import os
from scw_serverless.app import Serverless
# Emit INFO-level records so the handlers' logging.info calls are visible.
logging.basicConfig(level=logging.INFO)

# The S3 credentials are forwarded through `secret`, the bucket name through
# `env`. Every variable is read with os.environ[...], so a missing one raises
# KeyError as soon as this module is imported.
app = Serverless(
    "multiple-modules",
    secret={
        variable: os.environ[variable]
        for variable in ("SCW_ACCESS_KEY", "SCW_SECRET_KEY")
    },
    env={"S3_BUCKET": os.environ["S3_BUCKET"]},
)
import query # noqa
import upload # pylint: disable=all # noqa
import logging
from typing import Any
from app import app
from s3 import bucket
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import ValueTarget
@app.func()
def upload(event: dict[str, Any], _context: dict[str, Any]) -> dict[str, Any]:
    """Upload form data to S3 Glacier.

    Parses the multipart request body and stores the part sent under the
    ``file`` form-data key in the bucket, using the uploaded file name as the
    object key and the ``GLACIER`` storage class.

    Args:
        event: Request event. ``headers`` and ``body`` are required; the
            optional ``isBase64Encoded`` flag selects how the body is turned
            back into bytes.
        _context: Unused.

    Returns:
        ``{"statusCode": 200}`` once the object has been stored, or
        ``{"statusCode": 400}`` when the body is flagged as base64 but cannot
        be decoded, or when the ``file`` part is missing, empty, or carries no
        file name.
    """
    # Local import: only this handler needs it, and it keeps the block
    # self-contained.
    import base64

    headers = event["headers"]
    body: str = event["body"]

    if event.get("isBase64Encoded"):
        # A base64-flagged body must be decoded before parsing; feeding the
        # encoded text to the multipart parser would never match a boundary.
        try:
            payload = base64.b64decode(body)
        except ValueError:  # binascii.Error is a ValueError subclass
            return {"statusCode": 400}
    else:
        payload = body.encode("utf-8")

    parser = StreamingFormDataParser(headers=headers)
    target = ValueTarget()
    parser.register("file", target)
    parser.data_received(payload)

    name = target.multipart_filename
    # Reject requests without file content or without a file name to use as key.
    if not (target.value and name):
        return {"statusCode": 400}

    logging.info("Uploading file %s to Glacier on %s", name, bucket.name)
    bucket.put_object(Key=name, Body=target.value, StorageClass="GLACIER")
    return {"statusCode": 200}
import json
import os
from typing import Any
from app import app
from s3 import bucket
@app.func(
    description="List objects in S3 uploads.",
    privacy="public",
    env={"LIMIT": "100"},
    min_scale=0,
    max_scale=2,
    memory_limit=128,
    timeout="300s",
)
def query(_event: dict[str, Any], _context: dict[str, Any]) -> dict[str, Any]:
    """List the objects stored in the S3 bucket as a JSON document.

    The number of objects listed is capped by the ``LIMIT`` environment
    variable, which is read on every call.

    Args:
        _event: Unused.
        _context: Unused.

    Returns:
        A response dict with status code 200, a JSON content type header and
        a body holding a JSON array with one entry per object (``name``,
        ``last_modified`` and ``storage_class``).
    """
    listing = [
        {
            "name": item.key,
            "last_modified": item.last_modified.strftime("%m/%d/%Y, %H:%M:%S"),
            "storage_class": item.storage_class,
        }
        for item in bucket.objects.limit(count=int(os.environ["LIMIT"]))
    ]
    payload = json.dumps(listing)
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": payload,
    }