API Endpoints

The Files Management API provides the following main endpoints:

Batch Upload Initialization

# POST /files/init_batch
response = await client.post("/files/init_batch")
batch_id = response.json()["batch_id"]
Response Example:
{
    "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}

Single Upload Initialization

# POST /files/init
payload = {
    "filename": "document.txt",
    "total_chunks": 5,
    "batch_id": "optional-batch-id"
}
response = await client.post("/files/init", json=payload)
upload_id = response.json()["upload_id"]
Response Example:
{
    "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}

Chunk Upload

# POST /files/chunk
form_data = {
    "upload_id": upload_id,
    "chunk_number": "0"
}
files = {
    "chunk": ("chunk_0", chunk_data, "application/octet-stream")
}
response = await client.post("/files/chunk", data=form_data, files=files)
Response Example:
{
    "status": "chunk received",
    "chunk_number": 0,
    "received_chunks": 1
}
Error Response Example:
{
    "error": "Invalid upload ID - upload not initialized"
}
Note that the maximum size of a single chunk is 1 MiB (1024 * 1024 bytes).
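Error responses keep the shape shown above, so a client can detect a failed chunk before sending the next one. A minimal sketch, assuming the async client used in the snippet above:
response = await client.post("/files/chunk", data=form_data, files=files)
body = response.json()
if "error" in body:
    # e.g. "Invalid upload ID - upload not initialized"
    raise RuntimeError(
        f"chunk {form_data['chunk_number']} failed: {body['error']}")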

Upload Completion

# POST /files/complete/{dir_type}
payload = {
    "upload_id": upload_id,
    "filename": "document.txt", 
    "total_chunks": 5,
    "batch_id": batch_id
}
response = await client.post("/files/complete/in", json=payload)
Request Payload:
{
    "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "filename": "document.txt",
    "total_chunks": 5,
    "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}
Response Example:
{
    "status": "upload complete",
    "completion_id": "550e8400-e29b-41d4-a716-446655440000",
    "batch_id": "550e8400-e29b-41d4-a716-446655440000",
    "final_file": "document.txt",
    "final_path": "550e8400-e29b-41d4-a716-446655440000/document.txt"
}
Error Response Examples:
{
    "error": "Invalid upload ID"
}
{
    "error": "Not all chunks uploaded"
}
{
    "error": "Missing chunk files: [2, 3]"
}

Automatic Upload Completion on Flow Start

Important: Upload completion is triggered automatically when a flow is started. The process is as follows (a minimal sketch appears after the list):
  1. Form Submission: When a user submits a form with file uploads
  2. Flow Start: The flow is started with a unique flow ID (fid)
  3. Automatic Completion: All incomplete uploads are automatically completed with the flow ID as completion_id
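A minimal sketch, assuming a flow that exposes a file form element named my_upload; flow_url is a hypothetical placeholder, and the full walkthrough appears in the Example section below:
import json

# flow_url is hypothetical; the concrete URL depends on the deployment.
# No explicit call to /files/complete/{dir_type} is needed.
resp = client.post(flow_url, json={
    "my_upload": json.dumps({
        "batchId": batch_id,
        "items": {upload_id: {"filename": filename, "totalChunks": total_chunks}}
    })
})
resp.raise_for_status()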

File Listing

# GET /files/{fid}/{dir_type}
response = await client.get("/files/flow-id/in")
Response Example:
[
    {
        "path": "in/docs/readme.txt",
        "size": 1024,
        "is_directory": false
    },
    {
        "path": "in/src/main.py", 
        "size": 2048,
        "is_directory": false
    },
    {
        "path": "in/src/utils/",
        "size": 0,
        "is_directory": true
    }
]
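The response is a flat array of entries; a short sketch that separates files from directories using the documented fields:
listing = response.json()
file_entries = [e for e in listing if not e["is_directory"]]
total_bytes = sum(e["size"] for e in file_entries)
for e in file_entries:
    print(e["path"], e["size"])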

File Download

# GET /files/{fid}/{dir_type}/{filename}
response = await client.get("/files/flow-id/in/document.txt")
Response Headers:
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="document.txt"
Content-Length: 1024
Response Body: Binary file content
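For large files it is better to stream the body to disk than to buffer it in memory. A minimal sketch, assuming the same httpx.AsyncClient as in the snippet above:
async with client.stream("GET", "/files/flow-id/in/document.txt") as resp:
    resp.raise_for_status()
    with open("document.txt", "wb") as fh:
        async for chunk in resp.aiter_bytes():
            fh.write(chunk)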

Upload Cancellation

# POST /files/cancel
payload = {
    "upload_id": upload_id
}
response = await client.post("/files/cancel", json=payload)
Request Payload:
{
    "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}
Response Example:
{
    "status": "upload cancelled"
}

Constraints and Limitations

Directory Structure

  • Allowed directories
    • in/ # Input files
    • out/ # Output files
  • Not allowed directories
    • temp/ # Not supported
    • cache/ # Not supported

Path Validation

  • Path traversal and absolute paths are not allowed
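The server enforces this, but a client-side check can fail fast before any bytes are sent. A minimal sketch; is_safe_relative_path is illustrative, not part of the API:
from pathlib import PurePosixPath

def is_safe_relative_path(path: str) -> bool:
    # Reject absolute paths and any ".." traversal segments.
    p = PurePosixPath(path)
    return not p.is_absolute() and ".." not in p.parts

assert is_safe_relative_path("in/docs/readme.txt")
assert not is_safe_relative_path("../etc/passwd")
assert not is_safe_relative_path("/etc/passwd")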

File Sizes and Chunking

  • Chunk size must not exceed 1024 * 1024 bytes (1 MiB)

Best Practices Summary

  1. Use Context Managers for automatic resource cleanup
  2. Implement error handling for robust applications
  3. Use streaming for large files instead of loading everything into memory
  4. Implement retry logic for network operations (see the sketch after this list)
  5. Use SyncFileSystem in Ray Remote Functions
  6. Validate paths before upload operations
  7. Leverage automatic completion when starting flows
  8. Use batch uploads for multiple files
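For item 4, a minimal retry helper, assuming httpx and simple exponential backoff; post_with_retry is illustrative, not part of the API:
import time
import httpx

def post_with_retry(client: httpx.Client, url: str,
                    attempts: int = 3, **kwargs) -> httpx.Response:
    # Retry transient network failures with exponential backoff;
    # HTTP error statuses are raised immediately via raise_for_status().
    for attempt in range(attempts):
        try:
            resp = client.post(url, **kwargs)
            resp.raise_for_status()
            return resp
        except httpx.TransportError:
            if attempt == attempts - 1:
                raise
            time.sleep(2 ** attempt)

It can wrap any POST above, for example post_with_retry(client, "/files/chunk", data=form_data, files=files). Retrying only on transport errors is deliberate: 4xx responses generally should not be retried.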

Practices to Avoid

  1. Manual resource management without context managers
  2. Loading large files completely into memory
  3. Using AsyncFileSystem in Ray Remote Functions
  4. Allowing path traversal
  5. Ignoring errors without proper handling
  6. Hardcoding JWT tokens in production code
  7. Manual completion when automatic completion is available
  8. Uploading files individually instead of using batches

Example

Below is a step-by-step guide using the synchronous httpx.Client to upload a real file in chunks. Each step includes a short explanation and the corresponding code. The upload is completed automatically when flow execution is launched.

Step 0: Setup imports and constants

Define the API base URL, the path to a real file in your working directory, and the chunk size (1 MiB as constrained above).
import os
import math
import httpx

BASE_URL = "http://localhost:3370"
FILE_PATH = "./assets/architecture.png"
CHUNK_SIZE = 1024 * 1024  # 1 MiB

Step 1: Create a synchronous HTTP client

Authenticate against the API and create a client that manages connections and sends the API key with every request.
resp = httpx.post(
    f"{BASE_URL}/api/login",
    json={"name": "admin", "password": "admin"})
resp.raise_for_status()
api_key = resp.json().get("KODOSUMI_API_KEY")
client = httpx.Client(
    base_url=BASE_URL,
    headers={"KODOSUMI_API_KEY": api_key})

Step 2: Initialize a batch

Group this upload under a single batch identifier.
resp = client.post("/files/init_batch")
resp.raise_for_status()
batch_id = resp.json()["batch_id"]

Step 3: Calculate file size and total chunks

Compute how many chunks are required for the chosen file.
file_size = os.path.getsize(FILE_PATH)
total_chunks = math.ceil(file_size / CHUNK_SIZE)
filename = os.path.basename(FILE_PATH)

Step 4: Initialize the upload

Inform the server about the filename, expected chunk count, and the batch to associate with.
init_payload = {
    "filename": filename,
    "total_chunks": total_chunks,
    "batch_id": batch_id,
}
resp = client.post("/files/init", json=init_payload)
resp.raise_for_status()
upload_id = resp.json()["upload_id"]

Step 5: Upload chunks (0-based)

Read the file in CHUNK_SIZE slices and upload each one as a separate chunk.
with open(FILE_PATH, "rb") as fh:
    for i in range(total_chunks):
        chunk_bytes = fh.read(CHUNK_SIZE)
        assert chunk_bytes, "Unexpected end of file"
        form_data = {"upload_id": upload_id, "chunk_number": str(i)}
        files = {
            "chunk": (f"chunk_{i}", chunk_bytes, "application/octet-stream")
        }
        resp = client.post("/files/chunk", data=form_data, files=files)
        resp.raise_for_status()

Step 6: Launch Flow Execution

The upload is automatically completed and assembled under the flow's execution ID on POST. Assuming the flow's input schema elements are:
"elements": [
    {
      "type": "file",
      "name": "my_upload",
      ...
    },
]
the POST to launch this flow is then:
import json

complete_payload = {
    "name": "no",
    "my_upload": json.dumps({
        "batchId": batch_id,
        "items": {
            upload_id: {
                "filename": filename,
                "totalChunks": total_chunks
            }
        }
    })
}
resp = client.post("/-/127.0.0.1/8125/-/", json=complete_payload)
resp.raise_for_status()
The file upload form element (my_upload) requires a JSON-stringified value with the keys batchId and items. The items object maps each file's upload_id to its filename and totalChunks.

Step 7: Close the client when done

client.close()