API Endpoints
The Files Management API provides the following main endpoints:
Batch Upload Initialization
# POST /files/init_batch
response = await client.post("/files/init_batch")
batch_id = response.json()["batch_id"]
Response Example:
{
  "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}
Single Upload Initialization
# POST /files/init
payload = {
    "filename": "document.txt",
    "total_chunks": 5,
    "batch_id": "optional-batch-id"
}
response = await client.post("/files/init", json=payload)
upload_id = response.json()["upload_id"]
Response Example:
{
  "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}
Chunk Upload
# POST /files/chunk
form_data = {
    "upload_id": upload_id,
    "chunk_number": "0"
}
files = {
    "chunk": ("chunk_0", chunk_data, "application/octet-stream")
}
response = await client.post("/files/chunk", data=form_data, files=files)
Response Example:
{
  "status": "chunk received",
  "chunk_number": 0,
  "received_chunks": 1
}
Error Response Example:
{
  "error": "Invalid upload ID - upload not initialized"
}
Note that the maximum size of a single chunk is 1 MiB (1024 * 1024 bytes).
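As a minimal client-side guard for this limit (the constant name is illustrative, not part of the API), the chunk size can be checked before posting:
MAX_CHUNK_SIZE = 1024 * 1024  # server-side limit: 1 MiB per chunk

if len(chunk_data) > MAX_CHUNK_SIZE:
    raise ValueError(
        f"chunk is {len(chunk_data)} bytes; limit is {MAX_CHUNK_SIZE}")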
Upload Completion
# POST /files/complete/{dir_type}
payload = {
    "upload_id": upload_id,
    "filename": "document.txt",
    "total_chunks": 5,
    "batch_id": batch_id
}
response = await client.post("/files/complete/in", json=payload)
Request Payload:
{
  "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "filename": "document.txt",
  "total_chunks": 5,
  "batch_id": "550e8400-e29b-41d4-a716-446655440000"
}
Response Example:
{
  "status": "upload complete",
  "completion_id": "550e8400-e29b-41d4-a716-446655440000",
  "batch_id": "550e8400-e29b-41d4-a716-446655440000",
  "final_file": "document.txt",
  "final_path": "550e8400-e29b-41d4-a716-446655440000/document.txt"
}
Error Response Examples:
{
  "error": "Invalid upload ID"
}
{
  "error": "Not all chunks uploaded"
}
{
  "error": "Missing chunk files: [2, 3]"
}
Automatic Upload Completion on Flow Start
Important: Upload completion is automatically triggered when a flow is started. This happens in the following process:
- Form Submission: When a user submits a form with file uploads
- Flow Start: The flow is started with a unique flow ID (fid)
- Automatic Completion: All incomplete uploads are automatically completed with the flow ID as completion_id
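In practice this means a client can skip POST /files/complete/{dir_type} entirely: upload the chunks, then start the flow, and the server completes all pending uploads under the flow's fid (see Step 6 of the example below for a full walk-through).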
File Listing
# GET /files/{fid}/{dir_type}
response = await client.get("/files/flow-id/in")
Response Example:
[
  {
    "path": "in/docs/readme.txt",
    "size": 1024,
    "is_directory": false
  },
  {
    "path": "in/src/main.py",
    "size": 2048,
    "is_directory": false
  },
  {
    "path": "in/src/utils/",
    "size": 0,
    "is_directory": true
  }
]
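For example, to collect only the regular files from the listing (field names as in the response above):
entries = response.json()
file_paths = [e["path"] for e in entries if not e["is_directory"]]
# ['in/docs/readme.txt', 'in/src/main.py']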
File Download
# GET /files/{fid}/{dir_type}/{filename}
response = await client.get("/files/flow-id/in/document.txt")
Response Headers:
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="document.txt"
Content-Length: 1024
Response Body: Binary file content
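For large files, prefer a streaming download over loading the whole body into memory. A minimal sketch using a synchronous httpx.Client (the local target path is illustrative):
with client.stream("GET", "/files/flow-id/in/document.txt") as response:
    response.raise_for_status()
    with open("document.txt", "wb") as fh:
        # Write the body to disk chunk by chunk
        for chunk in response.iter_bytes():
            fh.write(chunk)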
Upload Cancellation
# POST /files/cancel
payload = {
    "upload_id": upload_id
}
response = await client.post("/files/cancel", json=payload)
Request Payload:
{
  "upload_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}
Response Example:
{
  "status": "upload cancelled"
}
Constraints and Limitations
Directory Structure
- Allowed directories:
  - in/ (input files)
  - out/ (output files)
- Not allowed directories:
  - temp/ (not supported)
  - cache/ (not supported)
Path Validation
- Path traversal and absolute paths are not allowed
File Sizes and Chunking
- Chunk size must not exceed 1024 * 1024 bytes (1 MiB)
Best Practices Summary
Recommended Practices
- Use Context Managers for automatic resource cleanup
- Implement error handling for robust applications
- Use streaming for large files instead of loading everything into memory
- Implement retry logic for network operations (see the sketch after this list)
- Use SyncFileSystem in Ray Remote Functions
- Validate paths before upload operations
- Leverage automatic completion when starting flows
- Use batch uploads for multiple files
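As a sketch of the retry recommendation above (the function name and backoff parameters are illustrative, not part of the API):
import asyncio
import httpx

async def post_with_retry(client, url, attempts=3, **kwargs):
    # Retry transient HTTP errors with exponential backoff: 1s, 2s, ...
    for attempt in range(attempts):
        try:
            response = await client.post(url, **kwargs)
            response.raise_for_status()
            return response
        except httpx.HTTPError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)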
Practices to Avoid
- Manual resource management without context managers
- Loading large files completely into memory
- Using AsyncFileSystem in Ray Remote Functions
- Allowing path traversal
- Ignoring errors without proper handling
- Hardcoding JWT tokens in production code
- Manual completion when automatic completion is available
- Uploading files individually instead of using batches
Example
Below is a step-by-step guide using the synchronous httpx.Client to upload a real file in chunks. Each step includes a short explanation and the corresponding code. The upload is completed automatically when flow execution is launched.
Step 0: Setup imports and constants
Define the API base URL, the path to a real file in your working directory, and the chunk size (1 MiB as constrained above).
import os
import math
import httpx
BASE_URL = "http://localhost:3370"
FILE_PATH = "./assets/architecture.png"
CHUNK_SIZE = 1024 * 1024 # 1 MiB
Step 1: Create a synchronous HTTP client
Log in to obtain an API key, then create a client that sends it with every request.
resp = httpx.post(
    f"{BASE_URL}/api/login",
    json={"name": "admin", "password": "admin"})
api_key = resp.json().get("KODOSUMI_API_KEY")
client = httpx.Client(
    base_url=BASE_URL,
    headers={"KODOSUMI_API_KEY": api_key})
Step 2: Initialize a batch
Group this upload under a single batch identifier.
resp = client.post("/files/init_batch")
resp.raise_for_status()
batch_id = resp.json()["batch_id"]
Step 3: Calculate file size and total chunks
Compute how many chunks are required for the chosen file.
file_size = os.path.getsize(FILE_PATH)
total_chunks = math.ceil(file_size / CHUNK_SIZE)
filename = os.path.basename(FILE_PATH)
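For example, a 2,621,440-byte file (2.5 MiB) with a 1 MiB chunk size yields total_chunks == 3.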
Step 4: Initialize the upload
Inform the server about the filename, expected chunk count, and the batch to associate with.
init_payload = {
    "filename": filename,
    "total_chunks": total_chunks,
    "batch_id": batch_id,
}
resp = client.post("/files/init", json=init_payload)
resp.raise_for_status()
upload_id = resp.json()["upload_id"]
Step 5: Upload chunks (0-based)
Read the file in CHUNK_SIZE slices and upload each slice as a separate chunk.
with open(FILE_PATH, "rb") as fh:
    for i in range(total_chunks):
        chunk_bytes = fh.read(CHUNK_SIZE)
        assert chunk_bytes, "Unexpected end of file"
        form_data = {"upload_id": upload_id, "chunk_number": str(i)}
        files = {
            "chunk": (f"chunk_{i}",
                      chunk_bytes, "application/octet-stream")
        }
        resp = client.post("/files/chunk", data=form_data, files=files)
        resp.raise_for_status()
Step 6: Launch Flow Execution
The upload is automatically completed and assembled under the flow's execution ID when the launch POST is sent. Assuming the flow's input schema elements are:
"elements": [
{
"type": "file",
"name": "my_upload",
...
},
]
The POST to launch this flow with the uploaded file is then:
import json

complete_payload = {
    "name": "no",
    "my_upload": json.dumps({
        "batchId": batch_id,
        "items": {
            upload_id: {
                "filename": filename,
                "totalChunks": total_chunks
            }
        }
    })
}
resp = client.post("/-/127.0.0.1/8125/-/", json=complete_payload)
resp.raise_for_status()
The file upload form element (my_upload) requires a JSON-stringified value with the keys batchId and items. The items object holds key/value pairs with each file's upload_id as the key and its filename and totalChunks as the value.
Step 7: Close the client
Release the HTTP connection pool when done.
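client.close()
Alternatively, create the client in a with httpx.Client(...) as client: block so it is closed automatically, in line with the context-manager recommendation above.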