process

Run processing jobs on AWS SageMaker.

Synopsis

easy_sm [--docker-tag TAG] process --file FILE --base-job-name NAME --ec2-type TYPE [OPTIONS]

Description

The process command submits a processing job to AWS SageMaker that executes a Python script for data preprocessing, feature engineering, model evaluation, or other data processing tasks.

Processing jobs run in the same Docker container as training but execute custom Python scripts instead of the training entry point.

Options

| Option | Short | Type | Required | Default | Description |
|--------|-------|------|----------|---------|-------------|
| --file | -f | string | Yes | - | Python file name to run (relative to the processing/ directory) |
| --base-job-name | -n | string | Yes | - | Prefix for the SageMaker processing job name |
| --ec2-type | -e | string | Yes | - | EC2 instance type (e.g., ml.m5.large) |
| --iam-role-arn | -r | string | No | From SAGEMAKER_ROLE | AWS IAM role ARN for SageMaker |
| --app-name | -a | string | No | Auto-detected | App name for configuration |
| --instance-count | -c | integer | No | 1 | Number of EC2 instances |
| --s3-input-location | -i | string | No | None | S3 location for input data |
| --s3-output-location | -o | string | No | None | S3 location to save output |
| --input-sharded | -is | boolean | No | false | Shard input data across instances |
| --docker-tag | -t | string | No | latest | Docker image tag (global option) |

Examples

Basic processing job

export SAGEMAKER_ROLE=arn:aws:iam::123456789012:role/SageMakerRole

easy_sm process \
  -f preprocess.py \
  -n data-preprocessing \
  -e ml.m5.large

Output:

data-preprocessing

Processing with input and output

easy_sm process \
  -f feature_engineering.py \
  -n feature-job \
  -e ml.m5.large \
  -i s3://my-bucket/raw-data \
  -o s3://my-bucket/processed-data

Distributed processing

Process data across multiple instances:

easy_sm process \
  -f process_large_dataset.py \
  -n distributed-processing \
  -e ml.m5.xlarge \
  -c 4 \
  -i s3://bucket/large-dataset \
  -o s3://bucket/output \
  --input-sharded

With --input-sharded, each instance receives a portion of the input data.

Processing with specific Docker tag

easy_sm -t v1.0.0 process \
  -f validate_model.py \
  -n model-validation \
  -e ml.m5.large \
  -i s3://bucket/models \
  -o s3://bucket/validation-results

Model evaluation job

easy_sm process \
  -f evaluate.py \
  -n model-evaluation \
  -e ml.m5.large \
  -i s3://bucket/test-data \
  -o s3://bucket/metrics

Output Format

The command outputs the job name prefix:

data-preprocessing

This can be used in scripts:

JOB_NAME=$(easy_sm process -f preprocess.py -n preprocessing -e ml.m5.large)
echo "Started processing job: $JOB_NAME"

Prerequisites

  • Docker image pushed to ECR with easy_sm push
  • Processing script in {app_name}/easy_sm_base/processing/ directory
  • IAM role with SageMaker permissions
  • Input data in S3 (if using -i flag)

Processing Script Requirements

Place your processing script in the processing/ directory:

{app_name}/easy_sm_base/processing/
├── preprocess.py
├── feature_engineering.py
└── evaluate.py

Script Template

# processing/preprocess.py
import os
import pandas as pd
import json

def process():
    """
    Main processing function.
    """
    # Standard SageMaker paths
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    print(f"Reading data from {input_path}")

    # Read input data
    df = pd.read_csv(os.path.join(input_path, 'raw_data.csv'))

    print(f"Processing {len(df)} rows")

    # Process data
    df_processed = preprocess_data(df)

    # Save processed data
    output_file = os.path.join(output_path, 'processed_data.csv')
    df_processed.to_csv(output_file, index=False)

    print(f"Saved processed data to {output_file}")

    # Save metadata
    metadata = {
        'input_rows': len(df),
        'output_rows': len(df_processed),
        'features': list(df_processed.columns)
    }
    with open(os.path.join(output_path, 'metadata.json'), 'w') as f:
        json.dump(metadata, f)

    print("Processing completed")

def preprocess_data(df):
    """Your preprocessing logic here."""
    # Clean data
    df = df.dropna()

    # Feature engineering
    df['new_feature'] = df['feature1'] * df['feature2']

    # Normalize
    for col in df.select_dtypes(include=['float64', 'int64']).columns:
        df[col] = (df[col] - df[col].mean()) / df[col].std()

    return df

if __name__ == '__main__':
    process()

Container Paths

Processing jobs use standard SageMaker paths:

| Path | Purpose |
|------|---------|
| /opt/ml/processing/input/ | Input data from S3 (--s3-input-location) |
| /opt/ml/processing/output/ | Output data uploaded to S3 (--s3-output-location) |
| /opt/ml/processing/code/ | Your processing script |
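
If a job cannot find its data, a quick way to check these mounts is to log what the container actually sees at startup. The snippet below is a minimal sketch (the helper name is illustrative; the paths are the standard SageMaker processing locations):

# optional debug helper at the top of any processing script
import os

def log_processing_dirs():
    """Print the contents of the standard SageMaker processing paths."""
    for path in ('/opt/ml/processing/input',
                 '/opt/ml/processing/output',
                 '/opt/ml/processing/code'):
        if os.path.isdir(path):
            print(f"{path}: {sorted(os.listdir(path))}")
        else:
            print(f"{path}: not mounted")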

Data Flow

S3 Input (--s3-input-location)
    ↓
/opt/ml/processing/input/
    ↓
Your processing script (-f file.py)
    ↓
/opt/ml/processing/output/
    ↓
S3 Output (--s3-output-location)

Use Cases

1. Data Preprocessing

Clean and transform raw data before training:

# processing/preprocess.py
import os
import numpy as np
import pandas as pd

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    # Read raw data
    df = pd.read_csv(os.path.join(input_path, 'raw_data.csv'))

    # Clean
    df = df.dropna()
    df = df[df['value'] > 0]

    # Transform
    df['log_value'] = np.log(df['value'])

    # Save
    df.to_csv(os.path.join(output_path, 'clean_data.csv'), index=False)

Run:

easy_sm process -f preprocess.py -n preprocess -e ml.m5.large \
  -i s3://bucket/raw -o s3://bucket/clean

2. Feature Engineering

Generate features from processed data:

# processing/feature_engineering.py
import pandas as pd
import os

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    df = pd.read_csv(os.path.join(input_path, 'clean_data.csv'))

    # Create features
    df['feature_interaction'] = df['feature1'] * df['feature2']
    df['feature_ratio'] = df['feature1'] / (df['feature2'] + 1)

    # Time-based features
    df['date'] = pd.to_datetime(df['date'])
    df['day_of_week'] = df['date'].dt.dayofweek
    df['month'] = df['date'].dt.month

    # Save
    df.to_csv(os.path.join(output_path, 'features.csv'), index=False)

3. Model Evaluation

Evaluate trained models on test data:

# processing/evaluate.py
import joblib
import pandas as pd
import json
import os
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    # Load model
    model = joblib.load(os.path.join(input_path, 'model/model.mdl'))

    # Load test data
    test_df = pd.read_csv(os.path.join(input_path, 'test/test.csv'))
    X_test = test_df.drop('target', axis=1)
    y_test = test_df['target']

    # Evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, predictions, average='weighted'
    )

    # Save metrics
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1
    }
    with open(os.path.join(output_path, 'metrics.json'), 'w') as f:
        json.dump(metrics, f, indent=2)

    print(f"Model Evaluation Results:")
    print(json.dumps(metrics, indent=2))

Run:

easy_sm process -f evaluate.py -n evaluate -e ml.m5.large \
  -i s3://bucket/test-data -o s3://bucket/metrics

4. Data Validation

Validate data quality before training:

# processing/validate.py
import pandas as pd
import json
import os

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    df = pd.read_csv(os.path.join(input_path, 'data.csv'))

    # Validation checks
    validation_report = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicates': int(df.duplicated().sum()),
        'data_types': df.dtypes.astype(str).to_dict(),
        'numeric_ranges': {}
    }

    # Check numeric ranges
    for col in df.select_dtypes(include=['float64', 'int64']).columns:
        validation_report['numeric_ranges'][col] = {
            'min': float(df[col].min()),
            'max': float(df[col].max()),
            'mean': float(df[col].mean())
        }

    # Save report
    with open(os.path.join(output_path, 'validation_report.json'), 'w') as f:
        json.dump(validation_report, f, indent=2)

    # Fail if data quality issues
    if validation_report['duplicates'] > 100:
        raise ValueError(f"Too many duplicates: {validation_report['duplicates']}")

Distributed Processing

For large datasets, use multiple instances with sharding:

easy_sm process \
  -f process_large_data.py \
  -n distributed-job \
  -e ml.m5.xlarge \
  -c 4 \
  --input-sharded \
  -i s3://bucket/large-dataset \
  -o s3://bucket/output

With --input-sharded:

  • Input data is automatically split across instances
  • Each instance processes its shard independently
  • Each instance uploads its own output to the shared S3 output location

Distributed Processing Script

# processing/process_large_data.py
import json
import os

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'

    # Get current instance info. SageMaker Processing writes the cluster
    # layout to /opt/ml/config/resourceconfig.json on every instance.
    config_path = '/opt/ml/config/resourceconfig.json'
    resource_config = {}
    if os.path.exists(config_path):
        with open(config_path) as f:
            resource_config = json.load(f)
    current_host = resource_config.get('current_host', 'unknown')
    hosts = resource_config.get('hosts', [])

    print(f"Processing on {current_host} (total hosts: {len(hosts)})")

    # Process the local shard (with --input-sharded, only this instance's
    # portion of the input data is downloaded here)
    files = os.listdir(input_path)
    for file in files:
        process_file(os.path.join(input_path, file))

    # Save per-host output so shards don't overwrite each other in S3
    output_file = os.path.join(output_path, f'output_{current_host}.csv')
    save_results(output_file)

# process_file() and save_results() are placeholders for your own
# shard-processing and output-writing logic.

Instance Types

Common instance types for processing:

| Instance Type | vCPUs | Memory | Use Case |
|---------------|-------|--------|----------|
| ml.m5.large | 2 | 8 GB | Small datasets |
| ml.m5.xlarge | 4 | 16 GB | Medium datasets |
| ml.m5.4xlarge | 16 | 64 GB | Large datasets |
| ml.c5.2xlarge | 8 | 16 GB | CPU-intensive processing |
| ml.r5.2xlarge | 8 | 64 GB | Memory-intensive processing |

Monitoring

Monitor processing jobs in the AWS Console:

  • SageMaker → Processing → Processing jobs
  • CloudWatch Logs: /aws/sagemaker/ProcessingJobs

Or via CLI:

# Describe job
aws sagemaker describe-processing-job \
  --processing-job-name my-processing-job

# View logs
aws logs tail /aws/sagemaker/ProcessingJobs \
  --log-stream-name my-processing-job/... \
  --follow
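
For programmatic monitoring, a boto3 sketch along these lines can also be used. It assumes standard AWS credentials and the job name prefix from the examples above (data-preprocessing); since easy_sm prints only the prefix and the full job name carries a generated suffix, the sketch looks the full name up first:

# monitor_job.py — illustrative boto3 sketch, not part of easy_sm
import boto3

sm = boto3.client('sagemaker')

# Find the most recent processing job matching the prefix printed by easy_sm.
summaries = sm.list_processing_jobs(
    NameContains='data-preprocessing',
    SortBy='CreationTime',
    SortOrder='Descending',
    MaxResults=1,
)['ProcessingJobSummaries']

if summaries:
    job_name = summaries[0]['ProcessingJobName']
    # Block until the job completes or stops, then print the final status.
    waiter = sm.get_waiter('processing_job_completed_or_stopped')
    waiter.wait(ProcessingJobName=job_name)
    status = sm.describe_processing_job(ProcessingJobName=job_name)['ProcessingJobStatus']
    print(f"{job_name}: {status}")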

Troubleshooting

"Processing file not found"

Problem: Script doesn't exist in processing/ directory.

Solution: Ensure the file exists:

ls {app_name}/easy_sm_base/processing/
# Should show: preprocess.py

Script execution errors

Problem: Script fails during processing.

Solution: Test locally first:

# Test locally
easy_sm local process -f preprocess.py

# Check logs
docker logs $(docker ps -q --filter name=my-ml-app)

"No space left on device"

Problem: Instance runs out of disk space.

Solution:

1. Use a larger instance type with more storage
2. Process data in chunks (see the sketch below)
3. Clean up intermediate files in the script
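
For the chunking option, a minimal pandas sketch could look like this (the file name large_data.csv and the chunk size are assumptions for illustration):

# processing/process_in_chunks.py — illustrative sketch
import os
import pandas as pd

def process():
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'
    output_file = os.path.join(output_path, 'processed.csv')

    # Stream the input in 100,000-row chunks instead of loading it all at once
    chunks = pd.read_csv(os.path.join(input_path, 'large_data.csv'), chunksize=100_000)
    for i, chunk in enumerate(chunks):
        chunk = chunk.dropna()
        # Append each processed chunk; write the header only for the first one
        chunk.to_csv(output_file, mode='a', header=(i == 0), index=False)

if __name__ == '__main__':
    process()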

Input/output path errors

Problem: Can't find input or output paths.

Solution: Use standard paths:

# Always use these paths
input_path = '/opt/ml/processing/input'
output_path = '/opt/ml/processing/output'

# Never use
# input_path = './input'  # Wrong!

Module import errors

Problem: Can't import required packages.

Solution: Add packages to requirements.txt and rebuild:

# Add to requirements.txt
echo "scikit-learn==1.3.0" >> requirements.txt

# Rebuild and push
easy_sm build
easy_sm push

Complete Processing Workflow

export SAGEMAKER_ROLE=arn:aws:iam::123456789012:role/SageMakerRole

# 1. Upload raw data
easy_sm upload-data \
  -i ./raw-data \
  -t s3://my-bucket/raw

# 2. Preprocess data
easy_sm process \
  -f preprocess.py \
  -n preprocess \
  -e ml.m5.large \
  -i s3://my-bucket/raw \
  -o s3://my-bucket/clean

# 3. Feature engineering
easy_sm process \
  -f feature_engineering.py \
  -n features \
  -e ml.m5.large \
  -i s3://my-bucket/clean \
  -o s3://my-bucket/features

# 4. Train model
easy_sm train \
  -n training \
  -e ml.m5.large \
  -i s3://my-bucket/features \
  -o s3://my-bucket/models

# 5. Evaluate model
MODEL=$(easy_sm get-model-artifacts -j training)
easy_sm process \
  -f evaluate.py \
  -n evaluate \
  -e ml.m5.large \
  -i s3://my-bucket/test-data \
  -o s3://my-bucket/metrics

See Also