The Kodosumi Files Management API provides two main classes for working with files: AsyncFileSystem and SyncFileSystem. Both classes let you download files uploaded by the user and upload files created during flow execution. Files and folders uploaded by the user live under the root folder /in (input files). Files and folders created during flow execution that are delivered back to the user live under the root folder /out (output files).

Overview

AsyncFileSystem vs SyncFileSystem

| Feature | AsyncFileSystem | SyncFileSystem |
| --- | --- | --- |
| Usage | Asynchronous functions (async def) | Synchronous functions |
| Performance | Non-blocking, better I/O performance | Blocking, easier to use |
| Ray Remote Functions | Not compatible | Compatible |
| Context Manager | async with | with |

AsyncFileSystem and SyncFileSystem Classes

Basic Structure

Both classes share the same interface but differ in implementation. As a developer of agentic services, you access both classes through the Tracer object:
# agentic service entrypoint
async def run(inputs: dict, tracer: Tracer):
    fs = await tracer.fs()
    files = await fs.ls("in")
    # ... additional operations
    await fs.close()
Use SyncFileSystem when you pass the tracer object to a Ray remote function. This constraint exists because Ray remote functions do not support async execution.
import ray

# agentic service entrypoint spawning remote functions
async def run(inputs: dict, tracer: Tracer):
    fs = await tracer.fs()
    files = await fs.ls("in")
    await fs.close()
    futures = [process.remote(f['path'], tracer) for f in files]
    ray.get(futures)  # wait for the remote functions to finish

@ray.remote
def process(file: str, tracer: Tracer):
    fs = tracer.fs_sync()
    files = fs.ls("in")
    # ... additional operations
    fs.close()

Working with Context Managers

Both AsyncFileSystem and SyncFileSystem support context managers for automatic resource cleanup:

AsyncFileSystem Context Manager

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # File system operations here
        files = await fs.ls("in")
        # Automatic cleanup when exiting the context

SyncFileSystem Context Manager

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # File system operations here
        files = fs.ls("in")
        # Automatic cleanup when exiting the context

Listing Files and Folders

Use the ls() method to list files and subfolders in the input or output directories:

AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # List all files in the input directory
        input_files = await fs.ls("in")
        
        # List files in a specific subfolder
        subfolder_files = await fs.ls("in/subfolder")
        
        # List files in the output directory
        output_files = await fs.ls("out")
        
        for file_info in input_files:
            print(f"File: {file_info['path']}, Size: {file_info['size']}")

SyncFileSystem

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # List all files in the input directory
        input_files = fs.ls("in")
        
        # List files in a specific subfolder
        subfolder_files = fs.ls("in/subfolder")
        
        # List files in the output directory
        output_files = fs.ls("out")
        
        for file_info in input_files:
            print(f"File: {file_info['path']}, Size: {file_info['size']}")
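
Since ls() returns plain dicts, ordinary Python is enough to post-process a listing. A minimal sketch, assuming each entry carries the 'path' and 'size' keys shown in the examples above (select_pdfs is a hypothetical helper, not part of the API):

```python
# Filter an ls()-style listing: keep PDFs below a size limit.
# The 'path' and 'size' keys follow the examples above; the exact
# entry schema is an assumption of this sketch.
def select_pdfs(files, max_size=10 * 1024 * 1024):
    return [
        f for f in files
        if f['path'].lower().endswith('.pdf') and f['size'] <= max_size
    ]

listing = [
    {'path': 'in/report.pdf', 'size': 2048},
    {'path': 'in/raw.csv', 'size': 512},
    {'path': 'in/scan.pdf', 'size': 20 * 1024 * 1024},
]

print(select_pdfs(listing))  # only in/report.pdf passes both checks
```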

Downloading Files and Folders

The download() method fetches files uploaded by the user into a local temporary directory and yields the resulting local paths:

AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Download a single file
        async for local_path in fs.download("in/document.pdf"):
            print(f"Downloaded to: {local_path}")
        
        # Download an entire folder
        async for local_path in fs.download("in/documents"):
            print(f"Downloaded: {local_path}")
        
        # Download a specific subfolder
        async for local_path in fs.download("in/documents/reports"):
            print(f"Downloaded: {local_path}")

SyncFileSystem

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Download a single file
        for local_path in fs.download("in/document.pdf"):
            print(f"Downloaded to: {local_path}")
        
        # Download an entire folder
        for local_path in fs.download("in/documents"):
            print(f"Downloaded: {local_path}")
        
        # Download a specific subfolder
        for local_path in fs.download("in/documents/reports"):
            print(f"Downloaded: {local_path}")
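
Because download() yields local file system paths, downloaded files can be handled with ordinary Python I/O afterwards. A minimal sketch (summarize is a hypothetical helper; a temporary file stands in for a downloaded path):

```python
from pathlib import Path
import tempfile

# Summarize a downloaded file using standard library I/O only.
def summarize(local_path):
    p = Path(local_path)
    return {"name": p.name, "bytes": p.stat().st_size}

# A temp file stands in for a path yielded by download().
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
    tmp.write(b"%PDF-1.4 stub")

print(summarize(tmp.name))
```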

Uploading Files and Folders

The upload() method allows you to upload files created during service execution to the output directory:

AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Create some output files
        with open("/tmp/result.txt", "w") as f:
            f.write("Processing completed")
        
        # Upload a single file
        batch_id = await fs.upload("/tmp/result.txt")
        print(f"Uploaded with batch ID: {batch_id}")
        
        # Upload an entire folder
        batch_id = await fs.upload("/tmp/output_folder")
        print(f"Uploaded folder with batch ID: {batch_id}")

SyncFileSystem

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Create some output files
        with open("/tmp/result.txt", "w") as f:
            f.write("Processing completed")
        
        # Upload a single file
        batch_id = fs.upload("/tmp/result.txt")
        print(f"Uploaded with batch ID: {batch_id}")
        
        # Upload an entire folder
        batch_id = fs.upload("/tmp/output_folder")
        print(f"Uploaded folder with batch ID: {batch_id}")

Opening and Reading Files

Use the open() method to create file streams for reading files. The streams support both read_all() and read() methods:

AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Open and read entire file content
        async with fs.open("in/document.txt") as file_stream:
            content = await file_stream.read_all()
            print(f"File content: {content.decode('utf-8')}")
        
        # Open and read file in chunks
        async with fs.open("in/large_file.bin") as file_stream:
            async for chunk in file_stream.read():
                # Process each chunk
                process_chunk(chunk)

SyncFileSystem

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Open and read entire file content
        with fs.open("in/document.txt") as file_stream:
            content = file_stream.read_all()
            print(f"File content: {content.decode('utf-8')}")
        
        # Open and read file in chunks
        with fs.open("in/large_file.bin") as file_stream:
            for chunk in file_stream.read():
                # Process each chunk
                process_chunk(chunk)
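
Chunked reads keep memory bounded for large files: each chunk is consumed before the next one arrives. The sketch below illustrates the pattern with hashlib over a plain local file standing in for the fs.open() stream; with the file system API you would feed the chunks from read() the same way:

```python
import hashlib
import tempfile

def sha256_of_chunks(chunks):
    # Incrementally hash an iterable of byte chunks; the full
    # content is never held in memory at once.
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()

def iter_chunks(path, chunk_size=64 * 1024):
    # Yield a local file in fixed-size chunks, mimicking a
    # chunked read() stream.
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * 200_000)  # bigger than one 64 KiB chunk

print(sha256_of_chunks(iter_chunks(tmp.name)))
```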

Removing Files and Folders

Use the remove() method to delete files and folders:

AsyncFileSystem

async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # Remove a single file
        success = await fs.remove("in/temp_file.txt")
        if success:
            print("File removed successfully")
        
        # Remove a folder (and all its contents)
        success = await fs.remove("in/temp_folder")
        if success:
            print("Folder removed successfully")

SyncFileSystem

def process(file: str, tracer: Tracer):
    with tracer.fs_sync() as fs:
        # Remove a single file
        success = fs.remove("in/temp_file.txt")
        if success:
            print("File removed successfully")
        
        # Remove a folder (and all its contents)
        success = fs.remove("in/temp_folder")
        if success:
            print("Folder removed successfully")

Complete Example

Here’s a complete example showing how to use the file management API in a typical workflow:
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        # List all input files
        input_files = await fs.ls("in")
        print(f"Found {len(input_files)} input files")
        
        # Process each input file
        for file_info in input_files:
            file_path = file_info['path']
            
            # Read the file content
            async with fs.open(file_path) as file_stream:
                content = await file_stream.read_all()
                
            # Process the content
            processed_content = process_content(content)
            
            # Create output file
            output_filename = f"processed_{file_info['name']}"
            with open(f"/tmp/{output_filename}", "wb") as f:
                f.write(processed_content)
            
            # Upload the processed file
            batch_id = await fs.upload(f"/tmp/{output_filename}")
            print(f"Uploaded {output_filename} with batch ID: {batch_id}")
        
        # Clean up temporary files
        for file_info in input_files:
            if file_info['name'].startswith('temp_'):
                await fs.remove(file_info['path'])

Error Handling

The file management API raises appropriate exceptions for common error conditions:
  • FileNotFoundError: When trying to access a file or folder that doesn’t exist
  • RuntimeError: When trying to use a closed stream or re-enter a stream context
  • HTTP exceptions: For network-related errors during file operations
async def run(inputs: dict, tracer: Tracer):
    async with await tracer.fs() as fs:
        try:
            # Try to access a non-existent file
            async with fs.open("in/nonexistent.txt") as file_stream:
                content = await file_stream.read_all()
        except FileNotFoundError:
            print("File not found")
        except Exception as e:
            print(f"Error accessing file: {e}")
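
HTTP-related errors are often transient, so wrapping an operation in a small retry loop can make a flow more robust. A minimal asyncio sketch; TransientError and flaky_download are stand-ins for a real HTTP exception and file operation, not part of the Kodosumi API:

```python
import asyncio

# Stand-in for a transient HTTP error raised by a file operation.
class TransientError(Exception):
    pass

async def with_retries(coro_factory, attempts=3, delay=0.01):
    # Retry an async operation a few times before giving up.
    for attempt in range(1, attempts + 1):
        try:
            return await coro_factory()
        except TransientError:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)

calls = {"n": 0}

async def flaky_download():
    # Fails twice, then succeeds, to exercise the retry loop.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("temporary network failure")
    return b"file content"

result = asyncio.run(with_retries(flaky_download))
print(result)  # b'file content' after two failed attempts
```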
Where to go from here?