Multiple Modules

An app to upload and query files to S3 Glacier on Scaleway, split into multiple modules.

The upload endpoint allows you to upload files to Glacier via the file form-data key:

echo -e "Hello world!\n My contents will be stored in a bunker!" > myfile.dat
curl -F file=@myfile.dat <upload-function-url>

This example showcases how to split handlers into different Python modules.

Deploying

Deployment can be done with scw_serverless:

pip install scw_serverless
scw-serverless deploy app.py

Configuration

Here are all the environment variables that need to be passed when deploying:

Variable

Description

Required

SCW_SECRET_KEY

Secret key to use for S3 operations

:heavy_check_mark:

SCW_ACCESS_KEY

Access key to use for S3 operations

:heavy_check_mark:

S3_BUCKET

Name of the bucket to store files into.

:heavy_check_mark:

Sources

import logging
import os

from scw_serverless.app import Serverless

# Log at INFO so per-request messages from the handlers are emitted.
logging.basicConfig(level=logging.INFO)

# Shared Serverless application object used by all handler modules.
# Credentials go in `secret` (encrypted at rest); the bucket name is a
# plain environment variable. All three values must be set at deploy time.
app = Serverless(
    "multiple-modules",
    secret={
        "SCW_ACCESS_KEY": os.environ["SCW_ACCESS_KEY"],
        "SCW_SECRET_KEY": os.environ["SCW_SECRET_KEY"],
    },
    env={"S3_BUCKET": os.environ["S3_BUCKET"]},
)

# Imported for their side effect only: each module registers its handler
# on `app` via @app.func. They must be imported AFTER `app` is created,
# which is why these imports sit at the bottom of the module.
import query  # noqa
import upload  # pylint: disable=all # noqa
import logging
from typing import Any

from app import app
from s3 import bucket
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import ValueTarget


@app.func()
def upload(event: dict[str, Any], _context: dict[str, Any]) -> dict[str, Any]:
    """Upload form data to S3 Glacier.

    Stream-parses the multipart request body, extracts the part named
    "file", and stores it in the configured bucket under the GLACIER
    storage class. Returns 400 when the file part is missing, empty, or
    has no filename; 200 on success.
    """

    # Incrementally parse the multipart body, buffering the "file" part.
    parser = StreamingFormDataParser(headers=event["headers"])
    file_part = ValueTarget()
    parser.register("file", file_part)

    # NOTE(review): the body is assumed to arrive as a UTF-8 str here —
    # confirm binary uploads are not base64-encoded by the platform.
    raw_body: str = event["body"]
    parser.data_received(raw_body.encode("utf-8"))

    # Guard: reject an empty file part or one without a filename.
    if not file_part.value or not file_part.multipart_filename:
        return {"statusCode": 400}

    filename = file_part.multipart_filename
    logging.info("Uploading file %s to Glacier on %s", filename, bucket.name)
    bucket.put_object(Key=filename, Body=file_part.value, StorageClass="GLACIER")

    return {"statusCode": 200}
import json
import os
from typing import Any

from app import app
from s3 import bucket


@app.func(
    description="List objects in S3 uploads.",
    privacy="public",
    env={"LIMIT": "100"},
    min_scale=0,
    max_scale=2,
    memory_limit=128,
    timeout="300s",
)
def query(_event: dict[str, Any], _context: dict[str, Any]) -> dict[str, Any]:
    """List objects stored in the S3 bucket.

    Returns a JSON array of up to LIMIT objects, each described by its
    key, last-modified timestamp, and storage class.
    """

    # LIMIT is injected via the decorator's env (default "100").
    max_objects = int(os.environ["LIMIT"])

    listing = [
        {
            "name": obj.key,
            "last_modified": obj.last_modified.strftime("%m/%d/%Y, %H:%M:%S"),
            "storage_class": obj.storage_class,
        }
        for obj in bucket.objects.limit(count=max_objects)
    ]

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(listing),
    }