Files Upload/Download Example
This documentation describes the implementation of an Upload App, which demonstrates the use of AsyncFileSystem and SyncFileSystem for asynchronous and synchronous file operations, respectively. The app shows how Ray Remote Execution can be used for distributed file processing across multiple nodes.
Overview
The Upload App is an example of:
- Forms with File Upload: Using InputFiles for multiple file uploads
- Ray Remote Execution: Parallel processing of files across distributed nodes
- AsyncFileSystem vs SyncFileSystem: Demonstration of the differences between synchronous and asynchronous file system operations
- File Download/Upload across Nodes: Ray Remote Functions enable file operations across different nodes
Step-by-Step Implementation
1. Imports and Setup
import asyncio, os, re, time
from pathlib import Path
from tempfile import mkdtemp
from traceback import format_exc
import fastapi
import uvicorn
from pptx import Presentation
from ray import remote, serve
from kodosumi.core import ServeAPI, forms as F, Tracer, Launch
from kodosumi.response import Markdown
app = ServeAPI()
Explanation:
- ray.remote: Decorator for Remote Functions
- ray.serve: For service deployment
- kodosumi.core: Core components for Kodosumi apps
- pptx.Presentation: For PowerPoint file processing (example only)
2. Ray Remote Function for File Processing
@remote
def process_file(file: str, tracer: Tracer, ignore_errors: bool = True):
    fs = tracer.fs_sync()  # SyncFileSystem for Remote Functions
    tempfile = next(fs.download(file))  # local path of the downloaded file
    tracer.debug_sync(f"start processing `{file}`")
    start_time = time.time()
    try:
        # File processing (here: PowerPoint to text)
        prs = Presentation(tempfile)
        text_runs = [
            run.text for slide in prs.slides
            for shape in slide.shapes if shape.has_text_frame
            for paragraph in shape.text_frame.paragraphs
            for run in paragraph.runs
        ]
        # Create local markdown result file
        tempdir = mkdtemp()
        md_file = Path(tempdir) / Path(file).with_suffix(".md").name
        content = re.sub(r"\s+", " ", " ".join(text_runs))
        with md_file.open("w", encoding="utf-8") as f:
            f.write(content)
        # Upload file to the flow execution
        fs.upload(str(md_file))
        # Remove the local result file and its temporary directory
        os.remove(md_file)
        os.rmdir(tempdir)
        return {
            "source": file,
            "target": md_file.name,
            "length": len(text_runs),
            "words": len(content.split()),
            "runtime": time.time() - start_time,
            "error": None
        }
    except Exception as e:
        if not ignore_errors:
            raise RuntimeError(f"error processing `{file}`: {format_exc()}") from e
        return {
            "source": file,
            "target": None,
            "length": 0,
            "words": 0,
            "runtime": time.time() - start_time,
            "error": str(e)
        }
    finally:
        # Remove the downloaded file on the node
        os.remove(tempfile)
        fs.close()
Key Points:
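- @remote: Makes the function a Ray Remote Function
- tracer.fs_sync(): Uses SyncFileSystem, since Ray Remote Functions are always synchronous
- fs.download(file): Downloads the file from the Kodosumi File System to the node running the task
- fs.upload(str(md_file)): Uploads the processed file
- Error Handling: With ignore_errors=True, a processing failure is returned in the "error" field of the result; otherwise a RuntimeError is raised. The downloaded file is removed in either case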
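The result contract of process_file() (a summary dict on success, an "error" entry or a raised RuntimeError on failure) can be exercised without Ray, Kodosumi or python-pptx. The sketch below is a stdlib-only approximation: process_text and extract_text_runs are hypothetical names, with extract_text_runs standing in for the PowerPoint parsing step. It reproduces the whitespace normalization and both outcomes controlled by ignore_errors.

```python
import re
import time
from pathlib import Path
from traceback import format_exc
from typing import Callable


def process_text(source: str,
                 extract_text_runs: Callable[[str], list[str]],
                 ignore_errors: bool = True) -> dict:
    """Hypothetical stand-in for process_file(): same result dict, no Ray or file system.

    extract_text_runs replaces the python-pptx parsing step and returns
    the text runs found in `source`.
    """
    start_time = time.time()
    try:
        text_runs = extract_text_runs(source)
        # Join the runs and collapse every whitespace sequence to one space
        content = re.sub(r"\s+", " ", " ".join(text_runs))
        return {
            "source": source,
            "target": Path(source).with_suffix(".md").name,
            "length": len(text_runs),
            "words": len(content.split()),
            "runtime": time.time() - start_time,
            "error": None,
        }
    except Exception as e:
        if not ignore_errors:
            # Fail the task, keeping the traceback text and the original cause
            raise RuntimeError(f"error processing `{source}`: {format_exc()}") from e
        # Report the failure in the result instead of raising
        return {
            "source": source,
            "target": None,
            "length": 0,
            "words": 0,
            "runtime": time.time() - start_time,
            "error": str(e),
        }


def broken_parser(_: str) -> list[str]:
    raise ValueError("not a presentation")


ok = process_text("in/deck.pptx", lambda _: ["Hello\n  world", " again "])
print(ok["length"], ok["words"], ok["target"])  # 2 3 deck.md
failed = process_text("in/bad.pptx", broken_parser)
print(failed["error"])  # not a presentation
```

Returning the error instead of raising keeps one bad file from failing asyncio.gather() in run(), which is why ignore_errors defaults to True.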
3. Main Function with AsyncFileSystem
async def run(inputs: dict, tracer: Tracer):
    afs = await tracer.fs()  # AsyncFileSystem for main function
    files = await afs.ls("in")  # List all uploaded files in `/in` folder
    await tracer.markdown("### File Processing Started")
    # Display file links
    file_links = [
        f"* [{f['path']}](/files/{tracer.fid}/{f['path']})" for f in files]
    await tracer.markdown("\n".join(file_links))
    # Parallel processing with Ray
    futures = [
        process_file.remote(
            f['path'], tracer, ignore_errors=inputs['ignore_errors'])
        for f in files
    ]
    await afs.close()
    results = await asyncio.gather(*futures)
    # Summarize results
    output = ["### File Processing Completed"]
    for r in results:
        err = r['error']
        link = f"**ERROR:** {err}" if err else f"[markdown](/files/{tracer.fid}/out/{r['target']})"
        output.append(
            f"* {r['source']} with {r['length']} text runs in {r['runtime']:.2f}s - {link}"
        )
    return Markdown("\n".join(output))
Key Points:
- await tracer.fs(): Uses AsyncFileSystem for asynchronous operations
- await afs.ls("in"): Asynchronous file listing
- process_file.remote(): Starts Remote Functions
- asyncio.gather(*futures): Waits for all Remote Functions
- await afs.close(): Properly closes AsyncFileSystem
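The fan-out/fan-in pattern in run() can be tried without a Ray cluster. In this sketch, asyncio.to_thread (Python 3.9+) stands in for process_file.remote(), fake_process_file is a hypothetical blocking task, and the fid argument replaces tracer.fid; the summary lines use the same format as run().

```python
import asyncio
import time
from pathlib import Path


def fake_process_file(path: str) -> dict:
    """Hypothetical blocking stand-in for the process_file Ray task."""
    start = time.time()
    time.sleep(0.1)  # simulate work done on a remote node
    if not path.endswith(".pptx"):
        return {"source": path, "target": None, "length": 0,
                "runtime": time.time() - start, "error": "unsupported file type"}
    return {"source": path, "target": Path(path).with_suffix(".md").name,
            "length": 3, "runtime": time.time() - start, "error": None}


async def run_and_summarize(paths: list[str], fid: str) -> str:
    # gather() runs the threaded calls concurrently and keeps input order
    results = await asyncio.gather(
        *(asyncio.to_thread(fake_process_file, p) for p in paths))
    output = ["### File Processing Completed"]
    for r in results:
        err = r["error"]
        link = f"**ERROR:** {err}" if err else f"[markdown](/files/{fid}/out/{r['target']})"
        output.append(
            f"* {r['source']} with {r['length']} text runs in {r['runtime']:.2f}s - {link}")
    return "\n".join(output)


report = asyncio.run(run_and_summarize(["in/a.pptx", "in/b.txt"], fid="demo-fid"))
print(report)
```

One difference from the Ray version: calling .remote() submits the task immediately, whereas the coroutines here only start once gather() schedules them. In both cases the results come back in the order of the input list.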
4. Form Definition
model = F.Model(
    F.Markdown("""
# Parallel Text Processing
This application demonstrates the use of:
* Forms with file upload
* `InputFiles` for file processing
* Ray for parallel processing
"""),
    F.Break(),
    F.HR(),
    F.InputFiles(
        label="Upload Files",
        name="files",
        multiple=True,
        directory=False,
        required=False
    ),
    F.HR(),
    F.Submit("Start"),
    F.Cancel("Cancel"),
    F.Checkbox(
        name="ignore_errors",
        option="ignore errors",
        value=True
    ),
)
Form Elements:
- F.InputFiles: Multiple file upload
- F.Checkbox: Option to ignore errors
- F.Submit/F.Cancel: Action buttons
5. Endpoint Registration
@app.enter(
    path="/",
    model=model,
    summary="Text Processing with Ray",
    description="Demonstrates forms, InputFiles and Ray for parallel file processing.",
    version="0.1.0",
    author="[email protected]",
    organization="Kodosumi Examples",
    tags=["Test"]
)
async def enter(request: fastapi.Request, inputs: dict):
    return Launch(request, "kodosumi_examples.upload.app:run", inputs=inputs)
6. Ray Serve Deployment
@serve.deployment
@serve.ingress(app)
class TextProcessor:
    pass

fast_app = TextProcessor.bind()  # type: ignore

if __name__ == "__main__":
    uvicorn.run(
        "kodosumi_examples.upload.app:app",
        host="0.0.0.0",
        port=8013,
        reload=True
    )
AsyncFileSystem vs SyncFileSystem
AsyncFileSystem
- Usage: In asynchronous functions (async def)
- Methods: await fs.ls(), await fs.upload(), await fs.close()
- Advantages: Non-blocking; the event loop can run other tasks while file I/O is in progress
- Example: Main function run()
SyncFileSystem
- Usage: In synchronous functions (Ray Remote Functions)
- Methods: fs.ls(), fs.download(), fs.upload(), fs.close()
- Advantages: Easier to use, compatible with Ray Remote Functions
- Example: process_file() Remote Function
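The practical difference is whether a call blocks the thread it runs on. Kodosumi's own classes are not reproduced here; the sketch assumes a hypothetical local-directory file system (LocalSyncFS) and shows one common way to give it an async interface (LocalAsyncFS, also hypothetical): each blocking call runs in a worker thread via asyncio.to_thread, so an async def function can await it without stalling the event loop. This is a generic pattern, not necessarily how Kodosumi implements AsyncFileSystem.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path


class LocalSyncFS:
    """Hypothetical blocking file system rooted at a local directory."""

    def __init__(self, root: str):
        self.root = Path(root)

    def ls(self, folder: str) -> list[dict]:
        # One dict per file, with a 'path' key relative to the root
        base = self.root / folder
        return [{"path": p.relative_to(self.root).as_posix()}
                for p in sorted(base.iterdir()) if p.is_file()]

    def close(self) -> None:
        pass  # nothing to release for a local directory


class LocalAsyncFS:
    """Async facade that runs each blocking call in a worker thread."""

    def __init__(self, sync_fs: LocalSyncFS):
        self._fs = sync_fs

    async def ls(self, folder: str) -> list[dict]:
        return await asyncio.to_thread(self._fs.ls, folder)

    async def close(self) -> None:
        await asyncio.to_thread(self._fs.close)


async def list_async(afs: LocalAsyncFS) -> list[dict]:
    files = await afs.ls("in")  # the event loop stays free while the thread works
    await afs.close()
    return files


root = tempfile.mkdtemp()
(Path(root) / "in").mkdir()
for name in ("b.pptx", "a.pptx"):
    (Path(root) / "in" / name).write_bytes(b"")

sync_fs = LocalSyncFS(root)
sync_listing = sync_fs.ls("in")  # blocks until the listing is done
sync_fs.close()
async_listing = asyncio.run(list_async(LocalAsyncFS(LocalSyncFS(root))))
print(sync_listing)  # [{'path': 'in/a.pptx'}, {'path': 'in/b.pptx'}]
print(async_listing == sync_listing)  # True
shutil.rmtree(root)
```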
Ray Remote Execution
Why Ray Remote Functions?
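- Distributed Processing: Files can be processed on different nodes of the Ray cluster
- Scalability: Ray's scheduler distributes the tasks over the available nodes and CPUs, so more files can be processed in parallel as the cluster grows
- Fault Tolerance: A failure while processing one file stays within that task; with ignore_errors enabled it is reported in the result instead of aborting the whole run
- Resource Management: Ray schedules each task according to the resources it requests (one CPU per task by default)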
Remote Function Lifecycle
- File Upload → Kodosumi File System
- AsyncFileSystem.ls() → Get file list
- process_file.remote() → Start Remote Function
- SyncFileSystem.download() → Download file to remote node
- File Processing → PowerPoint to text
- SyncFileSystem.upload() → Upload result
- Result Return → To main function
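The lifecycle can be simulated on a single machine to follow the data flow. In this sketch a local directory plays the role of the Kodosumi File System, a thread pool plays the role of Ray workers, shutil.copy stands in for download() and upload(), and the "processing" step is whitespace normalization of plain-text files. The names worker and run_lifecycle and the in/ and out/ layout are hypothetical, not Kodosumi or Ray APIs.

```python
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def worker(store: Path, rel_path: str) -> dict:
    """One task: download -> process -> upload -> clean up (local stand-ins)."""
    node_dir = Path(tempfile.mkdtemp())  # hypothetical node-local scratch space
    try:
        local = node_dir / Path(rel_path).name
        shutil.copy(store / rel_path, local)  # "download" from the shared store
        text = " ".join(local.read_text(encoding="utf-8").split())  # "process"
        target = local.with_suffix(".md")
        target.write_text(text, encoding="utf-8")
        shutil.copy(target, store / "out" / target.name)  # "upload" the result
        return {"source": rel_path, "target": target.name, "words": len(text.split())}
    finally:
        shutil.rmtree(node_dir)  # drop the local copies on the "node"


def run_lifecycle(store: Path) -> list[dict]:
    # List the inputs, then fan out one task per file; map() keeps input order
    files = sorted(p.relative_to(store).as_posix() for p in (store / "in").iterdir())
    (store / "out").mkdir(exist_ok=True)
    with ThreadPoolExecutor() as pool:
        return list(pool.map(lambda f: worker(store, f), files))


store = Path(tempfile.mkdtemp())
(store / "in").mkdir()
(store / "in" / "a.txt").write_text("hello   shared\nstore", encoding="utf-8")
(store / "in" / "b.txt").write_text("one two", encoding="utf-8")
results = run_lifecycle(store)
out_names = sorted(p.name for p in (store / "out").iterdir())
print(results)
# [{'source': 'in/a.txt', 'target': 'a.md', 'words': 3}, {'source': 'in/b.txt', 'target': 'b.md', 'words': 2}]
print(out_names)  # ['a.md', 'b.md']
shutil.rmtree(store)
```

In a real deployment the scratch directory lives on whichever node runs the task, which is why process_file() deletes its downloaded copy in its finally block rather than leaving cleanup to the main function.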
Running the Upload Example
This section provides step-by-step instructions for running the Upload App example from the kodosumi-examples repository.
Prerequisites
- Python Environment: Python 3.12 or higher
- Ray Cluster: Running Ray cluster (local or distributed)
- Kodosumi: Installed and configured
- Dependencies: Required packages for the example
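Before following the setup steps, a short script can confirm that the local prerequisites are installed. This is a hypothetical helper, not part of kodosumi-examples; it assumes the import names ray, kodosumi and pptx (the module provided by python-pptx) and the ray and koco commands used in the steps that follow. It only checks what is installed; ray status remains the way to confirm that a cluster is actually running.

```python
import importlib.util
import shutil
import sys


def preflight(min_python: tuple[int, ...] = (3, 12)) -> dict[str, bool]:
    """Check installed prerequisites; does not check that a Ray cluster is running."""
    return {
        "python": sys.version_info >= min_python,
        "ray": importlib.util.find_spec("ray") is not None,
        "kodosumi": importlib.util.find_spec("kodosumi") is not None,
        "pptx": importlib.util.find_spec("pptx") is not None,  # installed by python-pptx
        "ray_cli": shutil.which("ray") is not None,
        "koco_cli": shutil.which("koco") is not None,
    }


if __name__ == "__main__":
    for name, ok in preflight().items():
        print(f"{name:10} {'ok' if ok else 'MISSING'}")
```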
Setup Instructions
1. Clone the Examples Repository
git clone https://github.com/masumi-network/kodosumi-examples.git
cd kodosumi-examples
2. Install Dependencies
# Install the example package
pip install -e .
# Install additional dependencies for the upload example
pip install python-pptx  # For PowerPoint file processing
3. Start Ray Cluster
# Start Ray head node
ray start --head
# Verify Ray is running
ray status
4. Deploy the Upload Example
Create an app deployment configuration file data/config/upload_example.yaml and a Ray Serve base configuration file data/config/config.yaml:
# upload_example.yaml
name: upload_example
route_prefix: /upload
import_path: kodosumi_examples.upload.app:fast_app

# config.yaml
proxy_location: EveryNode
http_options:
  host: 127.0.0.1
  port: 8001
grpc_options:
  port: 9001
  grpc_servicer_functions: []
logging_config:
  encoding: TEXT
  log_level: DEBUG
  logs_dir: null
  enable_access_log: true
Deploy both the Ray and the service configuration:
# Deploy using Kodosumi CLI
koco deploy --run --file ./data/config/config.yaml
5. Start Kodosumi Services
# Start the spooler daemon
koco spool
# Start the panel web interface
koco serve --register http://localhost:8001/-/routes
6. Access the Application
Open your web browser and navigate to http://localhost:3370/.

