Pause Your ML Pipelines for Human Review Using AWS Step Functions + Slack

Ever wanted to pause an automated workflow to wait for a human decision?

Maybe you need approval before provisioning cloud resources, promoting a machine learning model to production, or charging a customer’s credit card.

In many data science and machine learning workflows, automation gets you 90% of the way there, but that critical last step often needs human judgment.

Especially in production environments, model retraining, anomaly overrides, or large data movements require careful human review to avoid expensive mistakes.

In my case, I needed to manually review situations where my system flagged more than 6% of customer data for anomalies, often due to accidental pushes by customers.

Before I implemented a proper workflow, this was handled informally: developers would directly update production databases (!), which was risky, error-prone, and unscalable.

To solve this, I built a scalable manual approval system using AWS Step Functions, Slack, Lambda, and SNS: a cloud-native, low-cost architecture that cleanly paused workflows for human approvals without spinning up idle compute.

In this post, I’ll walk you through the full design, the AWS resources involved, and how you can apply it to your own critical workflows.

Let’s get into it 👇

The Solution

My application is deployed in the AWS ecosystem, so we’ll use AWS Step Functions to build a state machine that:

  1. Executes business logic
  2. Invokes a Lambda with WaitForTaskToken to pause until approval
  3. Sends a Slack message requesting approval (this could be an email instead)
  4. Waits for a human to click “Approve” or “Reject”
  5. Resumes automatically from the same point

The Step Function flow

Here’s a YouTube video showing the demo and the actual application in action:

I’ve also hosted the live demo app here →
👉 https://v0-manual-review-app-fwtjca.vercel.app
All code is hosted here with the right set of IAM permissions.


Step-by-Step Implementation

  1. Now we’ll create the Step Function with a manual review flow step. Here is the step function definition:
Step function flow with definition

The flow above generates a dataset, uploads it to AWS S3 and, if a review is required, invokes the Manual Review Lambda. In the manual review step, we’ll use a Task Lambda invoked with WaitForTaskToken, which pauses execution until resumed. The Lambda reads the token this way:

def lambda_handler(event, context):

  config = event["Payload"]["config"]
  task_token = event["Payload"]["taskToken"] # Step Functions auto-generates this

  reviewer = ManualReview(config, task_token)
  reviewer.send_notification()

  return config

This Lambda sends a Slack message that includes the task token so the function knows which execution to resume.
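For reference, the manual review step of the state machine might be declared roughly as follows. This is a minimal sketch written as a Python dict, not the definition from the repo: the function ARN, the state names, and the nested Payload layout (chosen only so that event["Payload"]["taskToken"] resolves in the handler) are assumptions. The .waitForTaskToken resource suffix and the $$.Task.Token context path are what make Step Functions pause and hand the token to the Lambda.

```python
import json

# Hypothetical function ARN and state names, for illustration only.
MANUAL_REVIEW_LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:manual-review"

manual_review_state = {
    "Type": "Task",
    # The .waitForTaskToken suffix pauses the execution after the Lambda is invoked.
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
        "FunctionName": MANUAL_REVIEW_LAMBDA_ARN,
        "Payload": {
            # Assumed nesting so that event["Payload"]["taskToken"] works in the handler.
            "Payload": {
                "config.$": "$.Payload.config",
                # $$.Task.Token reads the generated token from the context object.
                "taskToken.$": "$$.Task.Token",
            }
        },
    },
    # Fail this step if nobody responds within 24 hours.
    "TimeoutSeconds": 86400,
    "Next": "CheckApproval",
}

# Serialize the state so it can be pasted into a state machine definition.
state_json = json.dumps({"ManualReview": manual_review_state}, indent=2)
```

Setting TimeoutSeconds on the task itself keeps an unanswered request from holding the execution open longer than intended.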

2. Before we send out the Slack notification, we need to set up:

  1. an SNS topic that receives review messages from the Lambda
  2. a Slack workflow with a webhook subscribed to the SNS topic, and a confirmed subscription
  3. an HTTPS API Gateway with approval and rejection endpoints
  4. a Lambda function that processes the API Gateway requests: code

I followed the YouTube video here for my setup.

3. Once the above is set up, add the variables to the webhook step of the Slack workflow:

And use the variables with a helpful note in the following step:

The final workflow will look like this:

4. Send a Slack notification by publishing to an SNS topic (you can alternatively use slack-sdk) with the task parameters. Here is what the message will look like:

def publish_message(self, bucket_name: str, s3_file: str, subject: str = "Manual Review") -> dict:

    presigned_url = S3.generate_presigned_url(bucket_name, s3_file, expiration=86400)  # 1 day expiration

    message = {
        "approval_link": self.approve_link,
        "rejection_link": self.reject_link,
        "s3_file": presigned_url if presigned_url else s3_file
    }

    logging.info(f"Publishing message to <{self.topic_arn}>, with subject: {subject}, message: {message}")

    response = self.client.publish(
        TopicArn=self.topic_arn,
        Message=json.dumps(message),
        Subject=subject
    )

    logging.info(f"Response: {response}")
    return response
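The approval and rejection links in this message have to carry the task token back to the API. Here is a minimal sketch of one way to build them, assuming the token travels as a taskToken query parameter. The parameter name, the build_review_links helper, and the base URL are hypothetical and not taken from the repo. The token is URL-encoded because task tokens are long opaque strings that may contain characters such as + or /.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_review_links(api_base_url: str, task_token: str) -> dict:
    """Return approve/reject URLs that embed the URL-encoded task token."""
    query = urlencode({"taskToken": task_token})
    base = api_base_url.rstrip("/")
    return {
        "approval_link": f"{base}/approve?{query}",
        "rejection_link": f"{base}/reject?{query}",
    }


# Hypothetical base URL and a made-up token containing URL-unsafe characters.
links = build_review_links(
    "https://example.execute-api.us-east-1.amazonaws.com", "AAAA+bb/cc=="
)

# Decoding the query string recovers the original token unchanged.
recovered = parse_qs(urlsplit(links["approval_link"]).query)["taskToken"][0]
```

The round trip at the end is the property that matters: whatever the receiving Lambda reads from the query string must match the token Step Functions issued exactly.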

The manual review Lambda calls publish_message from its send_notification method:

def send_notification(self):

    # As soon as this message is sent out, this callback Lambda will go into a wait state,
    # until an explicit call to this Lambda function execution is triggered.

    # If you don't want this function to wait forever (or for the default Step Functions timeout),
    # make sure you set up an explicit timeout on this
    self.sns.publish_message(self.s3_bucket_name, self.s3_key)

def lambda_handler(event, context):

    config = event["Payload"]["config"]
    task_token = event["Payload"]["taskToken"]  # Step Functions auto-generates this

    reviewer = ManualReview(config, task_token)
    reviewer.send_notification()

5. Once a review notification is received in Slack, the user can approve or reject it. The step function goes into a wait state until it receives a user response; however, the task token is set to expire in 24 hours, so inactivity will time out the step function.

Based on whether the user approves or rejects the review request, the rawPath gets set and can be parsed here: code

action = event.get("rawPath", "").strip("/").lower()
# Extracts 'approve' or 'reject'

The receiving API Gateway + Lambda combo:

  • Parses the Slack payload
  • Extracts taskToken + decision
  • Uses StepFunctions.send_task_success() or send_task_failure()

Example:

match action:
    case "approve":
        output_dict["is_manually_approved"] = True
        response_message = "Approval processed successfully."
    case "reject":
        output_dict["is_manually_rejected"] = True
        response_message = "Rejection processed successfully."
    case _:
        return {
            "statusCode": 400,
            "body": json.dumps({"error": "Invalid action. Use '/approve' or '/reject' in URL."})
        }

...

sfn_client.send_task_success(
    taskToken=task_token,
    output=output
)
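Putting the parsing and the callback together, a handler along these lines could resume or fail the paused execution. This is a sketch and not the code from the repo. It assumes the token arrives as a taskToken query string parameter, and it uses send_task_failure for rejections, whereas the snippet above reports both outcomes through send_task_success. The handle_review name, the error name, and the injected sfn_client argument are hypothetical.

```python
import json


def handle_review(event, sfn_client):
    """Resume or fail a paused execution based on the request path."""
    action = event.get("rawPath", "").strip("/").lower()

    # Assumption: the approve/reject links carry the token as ?taskToken=...
    params = event.get("queryStringParameters") or {}
    task_token = params.get("taskToken")

    if action not in ("approve", "reject") or not task_token:
        return {"statusCode": 400, "body": json.dumps({"error": "Invalid request."})}

    if action == "approve":
        # The output must be a JSON string; it becomes the task's result.
        sfn_client.send_task_success(
            taskToken=task_token,
            output=json.dumps({"is_manually_approved": True}),
        )
    else:
        # Marks the waiting task as failed with a named error.
        sfn_client.send_task_failure(
            taskToken=task_token,
            error="ManualReviewRejected",
            cause="Reviewer rejected the request.",
        )

    return {"statusCode": 200, "body": json.dumps({"message": f"{action} processed."})}
```

Inside a real Lambda, the client would be boto3.client("stepfunctions"). Passing it in as an argument keeps the function easy to exercise with a stub.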

Note: A Lambda configured with WaitForTaskToken must wait. If you don’t send the token back, your workflow just stalls.

Bonus: If you need email or SMS alerts, use SNS to notify a broader group.
Just call sns.publish() from inside your Lambda or Step Function.

Testing

Once the manual approval system was wired up, it was time to kick the tires. Here’s how I tested it:

  • Right after publishing the Slack workflow, I confirmed the SNS subscription, which has to happen before messages get forwarded. Don’t skip this step.
  • Then, I triggered the Step Function manually with a fake payload simulating a data flagging event.
  • When the workflow hit the manual approval step, it sent a Slack message with Approve/Reject buttons.

I tested all major paths:

  • Approve: Clicked Approve and saw the Step Function resume and complete successfully.
  • Reject: Clicked Reject and the Step Function moved cleanly into a failure state.
  • Timeout: Ignored the Slack message. The Step Function waited for the configured timeout and then gracefully timed out without hanging.

Behind the scenes, I also verified that:

  • The Lambda receiving Slack responses was correctly parsing action payloads.
  • No rogue task tokens were left hanging.
  • Step Functions metrics and Slack error logs were clean.

I highly recommend testing not just happy paths, but also “what if nobody clicks?” and “what if Slack glitches?” Catching these edge cases early saved me headaches later.


Lessons Learned

  • Always use timeouts: Set a timeout both on the WaitForTaskToken step and on the entire Step Function. Without it, workflows can get stuck indefinitely if no one responds.
  • Pass necessary context: If your Step Function needs certain files, paths, or config settings after resuming, make sure you encode and send them along in the SNS notification.
    Step Functions don’t automatically retain previous in-memory context when resuming from a Task Token.
  • Manage Slack noise: Be careful about spamming a Slack channel with too many review requests. I recommend creating separate channels for development, UAT, and production flows to keep things clean.
  • Lock down permissions early: Make sure all your AWS resources (Lambda functions, API Gateway, S3 buckets, SNS topics) have correct and minimal permissions following the principle of least privilege. Where I needed to customize beyond AWS’s defaults, I wrote and posted inline IAM policies as JSON. (You’ll find examples in the GitHub repo).
  • Pre-sign and shorten URLs: If you’re sending links (e.g., to S3 files) in Slack messages, pre-sign the URLs for secure access, and shorten them for a cleaner Slack UI. Here’s a quick example I used:
# Pass the pre-signed URL via params so its own query string is encoded correctly
shorten_url = requests.get("https://tinyurl.com/api-create.php", params={"url": presigned_url}, timeout=10).text
default_links[key] = shorten_url if shorten_url else presigned_url

Wrapping Up

Adding human-in-the-loop logic doesn’t have to mean duct tape and cron jobs. With Step Functions + Slack, you can build reviewable, traceable, and production-safe approval flows.

If this helped, or you’re trying something similar, drop a note in the comments! Let’s build better workflows.

Note: All images in this article were created by the author