Using OpenAI Structured Outputs for IoT Data Pipelines

Shortly after our Golioth for AI launch, which included integrations with platforms such as OpenAI, Anthropic, Hugging Face, and Replicate, OpenAI announced support for Structured Outputs. Structured Outputs allow callers of OpenAI’s APIs to supply a JSON schema that defines the structure responses must conform to.

Because OpenAI APIs are invoked via Pipeline Transformers on Golioth (not to be confused with transformer models in this context), the responses are typically passed on to a Pipelines Destination, or even another Transformer. These subsequent steps benefit from being able to rely on the structure of the payload they will receive.

The following Pipeline demonstrates the use of Structured Outputs.

filter:
  path: "/accel"
  content_type: application/cbor
steps:
  - name: convert
    transformer:
      type: cbor-to-json
  - name: embed
    transformer:
      type: embed-in-json
      version: v1
      parameters:
        key: readings
  - name: create-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            {
              "op": "add",
              "path": "/model",
              "value": "gpt-4o-2024-08-06"
            },
            {
              "op": "add",
              "path": "/messages",
              "value": [
                {
                  "role": "user",
                  "content": [
                    {
                      "type": "text",
                      "text": "Rank the top three values in the following sensor readings."
                    },
                    {
                      "type": "text",
                      "text": "PATCH"
                    }
                  ]
                }
              ]
            },
            {
              "op": "add",
              "path": "/response_format",
              "value": {
                "type": "json_schema",
                "json_schema": {
                  "name": "math_response",
                  "strict": true,
                  "schema": {
                    "type": "object",
                    "properties": {
                      "readings": {
                        "type": "array",
                        "items": {
                          "type": "object",
                          "properties": {
                            "reading": {
                              "type": "number"
                            },
                            "rank": {
                              "type": "number"
                            }
                          },
                          "required": ["reading", "rank"],
                          "additionalProperties": false
                        }
                      }
                    },
                    "required": ["readings"],
                    "additionalProperties": false
                  }
                }
              }
            },
            {
              "op": "move",
              "from": "/readings",
              "path": "/messages/0/content/1/text"
            }
          ]
  - name: extract
    transformer:
      type: webhook
      version: v1
      parameters:
        url: https://api.openai.com/v1/chat/completions
        headers:
          Authorization: $OPENAI_TOKEN
  - name: parse-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            {"op": "add", "path": "/text", "value": ""},
            {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
          ]  
  - name: send-webhook
    destination:
      type: webhook
      version: v1
      parameters:
        url: $SLACK_WEBHOOK

When looking at more complex Pipelines, it is helpful to break them down step by step. Before our first step comes the Filter.

filter:
  path: "/accel"
  content_type: application/cbor

This restricts the data passed to this Pipeline to messages from devices on the /accel path, indicating that they are accelerometer sensor readings, with a content type of application/cbor. Delivering data from devices in a binary encoded format such as CBOR reduces the amount of bandwidth each device consumes. However, because many cloud services operate on JSON data, our first step converts the CBOR payload to JSON.

- name: convert
  transformer:
    type: cbor-to-json
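To make the bandwidth point concrete, here is a rough size comparison. This sketch hand-rolls a tiny CBOR (RFC 8949) encoder for one narrow case only, a short array of small unsigned integers (imagine readings expressed in milli-g rather than the floats used later in this post); real firmware would use a library such as zcbor or TinyCBOR, and the values are purely illustrative.

```python
import json
import struct

def cbor_encode_uints(values):
    # Minimal CBOR (RFC 8949) encoder for a short array of unsigned ints:
    # a one-byte array header (major type 4) followed by each integer.
    assert len(values) < 24, "short-count array header only"
    out = bytearray([0x80 | len(values)])
    for v in values:
        if v < 24:
            out.append(v)                          # value fits in the initial byte
        elif v < 0x100:
            out += bytes([0x18, v])                # uint8 follows
        elif v < 0x10000:
            out += b"\x19" + struct.pack(">H", v)  # uint16 follows
        else:
            raise ValueError("larger widths omitted in this sketch")
    return bytes(out)

readings = [320, 478, 236, 599, 670]  # hypothetical readings in milli-g
cbor_payload = cbor_encode_uints(readings)
json_payload = json.dumps(readings).encode()
print(len(cbor_payload), len(json_payload))  # 15 vs 25 bytes
```

The savings grow with payload size and with map keys, which CBOR also encodes compactly, so shipping CBOR from the device and converting to JSON in the Pipeline gives the best of both worlds.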

In order to be able to manipulate this data as part of a larger JSON object, we then embed it in an object under the key readings.

- name: embed
  transformer:
    type: embed-in-json
    version: v1
    parameters:
      key: readings

Now it’s time to create a request payload that we can deliver to OpenAI. This will include not only our accelerometer readings, but also information about what model we want to use, a prompt for what the model should do, and our JSON schema that defines how we want it to respond. With our readings embedded in a JSON object, we can operate on that object using the JSON Patch transformer.

- name: create-payload
  transformer:
    type: json-patch
    version: v1
    parameters:
      patch: |
        [
          {
            "op": "add",
            "path": "/model",
            "value": "gpt-4o-2024-08-06"
          },
          {
            "op": "add",
            "path": "/messages",
            "value": [
              {
                "role": "user",
                "content": [
                  {
                    "type": "text",
                    "text": "Rank the top three values in the following sensor readings."
                  },
                  {
                    "type": "text",
                    "text": "PATCH"
                  }
                ]
              }
            ]
          },
          {
            "op": "add",
            "path": "/response_format",
            "value": {
              "type": "json_schema",
              "json_schema": {
                "name": "math_response",
                "strict": true,
                "schema": {
                  "type": "object",
                  "properties": {
                    "readings": {
                      "type": "array",
                      "items": {
                        "type": "object",
                        "properties": {
                          "reading": {
                            "type": "number"
                          },
                          "rank": {
                            "type": "number"
                          }
                        },
                        "required": ["reading", "rank"],
                        "additionalProperties": false
                      }
                    }
                  },
                  "required": ["readings"],
                  "additionalProperties": false
                }
              }
            }
          },
          {
            "op": "move",
            "from": "/readings",
            "path": "/messages/0/content/1/text"
          }
        ]

Altogether, this sequence of patch operations will transform a data payload that looks like this:

{
    "readings": "[3.2, 4.78, 2.36, 5.99, 6.7]"
}

Into a request payload that looks like this:

{
    "model": "gpt-4o-2024-08-06",
    "messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Rank the top three values in the following sensor readings."
                },
                {
                    "type": "text",
                    "text": "[3.2, 4.78, 2.36, 5.99, 6.7]"
                }
            ]
        }
    ],
    "response_format": {
        "type": "json_schema",
        "json_schema": {
            "name": "math_response",
            "strict": true,
            "schema": {
                "type": "object",
                "properties": {
                    "readings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "reading": {
                                    "type": "number"
                                },
                                "rank": {
                                    "type": "number"
                                }
                            },
                            "required": [
                                "reading",
                                "rank"
                            ],
                            "additionalProperties": false
                        }
                    }
                },
                "required": [
                    "readings"
                ],
                "additionalProperties": false
            }
        }
    }
}
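If you want to sanity-check a patch like this before wiring it into a Pipeline, you can replay it locally. The sketch below implements just enough of JSON Patch (RFC 6902) for the operations used here, namely add on object keys and move; the response_format operation is elided for brevity, and a complete implementation (such as the jsonpatch package on PyPI) would be the robust choice.

```python
def apply_patch(doc, patch):
    """Apply a JSON Patch. Illustrative sketch covering only the
    'add' (object keys) and 'move' operations used in this Pipeline."""
    def resolve(path):
        # Walk a JSON Pointer down to (parent container, final key).
        parts = [p.replace("~1", "/").replace("~0", "~")
                 for p in path.lstrip("/").split("/")]
        parent = doc
        for p in parts[:-1]:
            parent = parent[int(p)] if isinstance(parent, list) else parent[p]
        key = parts[-1]
        return parent, (int(key) if isinstance(parent, list) else key)

    for op in patch:
        if op["op"] == "add":
            parent, key = resolve(op["path"])
            parent[key] = op["value"]
        elif op["op"] == "move":
            src_parent, src_key = resolve(op["from"])
            dst_parent, dst_key = resolve(op["path"])
            dst_parent[dst_key] = src_parent.pop(src_key)  # move = remove + add
        else:
            raise NotImplementedError(op["op"])
    return doc

doc = {"readings": "[3.2, 4.78, 2.36, 5.99, 6.7]"}
patch = [
    {"op": "add", "path": "/model", "value": "gpt-4o-2024-08-06"},
    {"op": "add", "path": "/messages", "value": [{"role": "user", "content": [
        {"type": "text",
         "text": "Rank the top three values in the following sensor readings."},
        {"type": "text", "text": "PATCH"},
    ]}]},
    {"op": "move", "from": "/readings", "path": "/messages/0/content/1/text"},
]
result = apply_patch(doc, patch)
assert result["messages"][0]["content"][1]["text"] == "[3.2, 4.78, 2.36, 5.99, 6.7]"
```

Note that move itself removes the value from its source location, which is why /readings no longer appears at the top level of the request payload.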

We have asked the model to rank the top three values from our sensor readings, and provide the rankings in an array of objects, each with the reading value and its ranking. We can deliver this payload to OpenAI, leveraging Pipeline Secrets to provide our API token.

- name: extract
  transformer:
    type: webhook
    version: v1
    parameters:
      url: https://api.openai.com/v1/chat/completions
      headers:
        Authorization: $OPENAI_TOKEN
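For reference, the HTTP request this step issues looks roughly like the sketch below, rebuilt with Python’s urllib. One assumption to flag: the sketch adds the Bearer scheme explicitly, whereas in the Pipeline the $OPENAI_TOKEN secret holds the full Authorization header value. The request object is constructed but not sent.

```python
import json
import urllib.request

def build_openai_request(payload, token):
    # The request the webhook transformer issues: POST the JSON payload
    # to the chat completions endpoint with an Authorization header.
    return urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

req = build_openai_request({"model": "gpt-4o-2024-08-06"}, "sk-example")
# urllib.request.urlopen(req) would actually send it; omitted here.
```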

We can once again leverage the JSON Patch transformer to pull the structured output response (/choices/0/message/content) out of the full JSON object returned by OpenAI. In this case we move it to the key /text, as that is what our final destination expects.

- name: parse-payload
  transformer:
    type: json-patch
    version: v1
    parameters:
      patch: |
        [
          {"op": "add", "path": "/text", "value": ""},
          {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
        ]
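To make the response shape concrete, here is a trimmed chat completions response (with illustrative values) alongside a Python equivalent of the parse-payload step. The key point is that choices[0].message.content is itself a JSON string conforming to our schema.

```python
import json

# A trimmed chat completions response (illustrative values) showing where
# the structured output lands.
response = {
    "choices": [
        {
            "message": {
                "role": "assistant",
                "content": '{"readings": [{"reading": 6.7, "rank": 1}]}',
            }
        }
    ]
}

# Equivalent of the parse-payload patch: lift the content string to /text.
payload = {"text": response["choices"][0]["message"]["content"]}

# Thanks to Structured Outputs, the string parses into the schema's shape.
structured = json.loads(payload["text"])
```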

Finally, we pass our modified JSON object to the Webhook Destination, which we use to deliver a message to Slack for demonstration purposes.

- name: send-webhook
  destination:
    type: webhook
    version: v1
    parameters:
      url: $SLACK_WEBHOOK

In Slack, we can see the ranked sensor readings from GPT-4o, just as we requested.

{"readings":[{"reading":6.7,"rank":1},{"reading":5.99,"rank":2},{"reading":4.78,"rank":3}]}

Where To Next?

One of the most interesting aspects of using an AI model for data processing in this context is that the structure of the data sent by the device could change, and the model could still rank the values and return them in a predictable format. This can be extremely valuable if you are supporting a device fleet in which data payloads may differ, either due to multiple firmware versions or because payload structure depends on the device’s environment. And ranking sensor readings is one of the simplest tasks these models can perform, so we’re excited to see users try out more complex operations!

Dan Mangum
Dan is an experienced engineering leader, having built products and teams at both large companies and small startups. He has a history of leadership in open source communities, and has worked across many layers of the technical stack, giving him unique insight into the constraints faced by Golioth’s customers and the requirements of a platform that enables their success.
