Using OpenAI Structured Outputs for IoT Data Pipelines

Shortly after our Golioth for AI launch, which included integrations with platforms such as OpenAI, Anthropic, Hugging Face, and Replicate, OpenAI announced support for Structured Outputs. Structured Outputs allow callers of OpenAI’s APIs to provide a JSON schema to define the structure in which responses should be formatted.

Because OpenAI APIs are invoked via Pipeline Transformers (not to be confused with transformer models in this context) on Golioth, the responses are likely to be subsequently passed to a Pipelines Destination, or even another Transformer. It is helpful if these subsequent steps in a Pipeline can be certain of the structure of the payload they will receive.

The following Pipeline demonstrates the use of Structured Outputs.

filter:
  path: "/accel"
  content_type: application/cbor
steps:
  - name: convert
    transformer:
      type: cbor-to-json
  - name: embed
    transformer:
      type: embed-in-json
      version: v1
      parameters:
        key: readings
  - name: create-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            {
              "op": "add",
              "path": "/model",
              "value": "gpt-4o-2024-08-06"
            },
            {
              "op": "add",
              "path": "/messages",
              "value": [
                {
                  "role": "user",
                  "content": [
                    {
                      "type": "text",
                      "text": "Rank the top three values in the following sensor readings."
                    },
                    {
                      "type": "text",
                      "text": "PATCH"
                    }
                  ]
                }
              ]
            },
            {
              "op": "add",
              "path": "/response_format",
              "value": {
              "type": "json_schema",
              "json_schema": {
                "name": "math_response",
                "strict": true,
                "schema": {
                  "type": "object",
                  "properties": {
                    "readings": {
                      "type": "array",
                      "items": {
                        "type": "object",
                        "properties": {
                          "reading": {
                            "type": "number"
                          },
                          "rank": {
                            "type": "number"
                          }
                        },
                        "required": ["reading", "rank"],
                        "additionalProperties": false
                      }
                    }
                  },
                  "required": ["readings"],
                  "additionalProperties": false
                }
              }
              }
            },
            {
              "op": "move",
              "from": "/readings",
              "path": "/messages/0/content/1/text"
            },
            {
              "op": "remove",
              "path": "/readings"
            }
          ]
  - name: extract
    transformer:
      type: webhook
      version: v1
      parameters:
        url: https://api.openai.com/v1/chat/completions
        headers:
          Authorization: $OPENAI_TOKEN
  - name: parse-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            {"op": "add", "path": "/text", "value": ""},
            {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
          ]  
  - name: send-webhook
    destination:
      type: webhook
      version: v1
      parameters:
        url: $SLACK_WEBHOOK

When looking at more complex Pipelines, it is helpful to break them down by each step. Before our first step, we have our Filter.

filter:
  path: "/accel"
  content_type: application/cbor

This restricts the data that will be passed to this Pipeline to messages from devices on the /accel path, indicating that they are accelerometer sensor readings, with content type of application/cbor. Being able to deliver data from devices in a binary encoded format such as CBOR reduces the amount of bandwidth each device consumes. However, because many cloud services operate on JSON data, our first step takes care of converting our CBOR payload to JSON.

- name: convert
  transformer:
    type: cbor-to-json

In order to be able to manipulate this data as part of a larger JSON object, we then embed it with key readings.

- name: embed
  transformer:
    type: embed-in-json
    version: v1
    parameters:
      key: readings

Now it’s time to create a request payload that we can deliver to OpenAI. This will include not only our accelerometer readings, but also information about what model we want to use, a prompt for what the model should do, and our JSON schema that defines how we want it to respond. With our readings embedded in a JSON object, we can operate on that object using the JSON Patch transformer.

- name: create-payload
   transformer:
     type: json-patch
     version: v1
     parameters:
       patch: |
         [
           {
             "op": "add",
             "path": "/model",
             "value": "gpt-4o-2024-08-06"
           },
           {
             "op": "add",
             "path": "/messages",
             "value": [
               {
                 "role": "user",
                 "content": [
                   {
                     "type": "text",
                     "text": "Rank the top three values in the following sensor readings."
                   },
                   {
                     "type": "text",
                     "text": "PATCH"
                   }
                 ]
               }
             ]
           },
           {
             "op": "add",
             "path": "/response_format",
             "value": {
             "type": "json_schema",
             "json_schema": {
               "name": "math_response",
               "strict": true,
               "schema": {
                 "type": "object",
                 "properties": {
                   "readings": {
                     "type": "array",
                     "items": {
                       "type": "object",
                       "properties": {
                         "reading": {
                           "type": "number"
                         },
                         "rank": {
                           "type": "number"
                         }
                       },
                       "required": ["reading", "rank"],
                       "additionalProperties": false
                     }
                   }
                 },
                 "required": ["readings"],
                 "additionalProperties": false
               }
             }
             }
           },
           {
             "op": "move",
             "from": "/readings",
             "path": "/messages/0/content/1/text"
           },
           {
             "op": "remove",
             "path": "/readings"
           }
         ]

Altogether, this sequence of patch operations will transform a data payload that looks like this:

{
    "readings": "[3.2, 4.78, 2.36, 5.99, 6.7]"
}

Into a request payload that looks like this:

{
    "model": "gpt-4o-2024-08-06",
    "messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Rank the top three values in the following sensor readings."
                },
                {
                    "type": "text",
                    "text": "[3.2, 4.78, 2.36, 5.99, 6.7]"
                }
            ]
        }
    ],
    "response_format": {
        "type": "json_schema",
        "json_schema": {
            "name": "math_response",
            "strict": true,
            "schema": {
                "type": "object",
                "properties": {
                    "readings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "reading": {
                                    "type": "number"
                                },
                                "rank": {
                                    "type": "number"
                                }
                            },
                            "required": [
                                "reading",
                                "rank"
                            ],
                            "additionalProperties": false
                        }
                    }
                },
                "required": [
                    "readings"
                ],
                "additionalProperties": false
            }
        }
    }
}

We have asked the model to rank the top three values from our sensor readings, and provide the rankings in an array of objects, each with the reading value and its ranking. We can deliver this payload to OpenAI, leveraging Pipeline Secrets to provide our API token.

- name: extract
  transformer:
    type: webhook
    version: v1
    parameters:
      url: https://api.openai.com/v1/chat/completions
      headers:
        Authorization: $OPENAI_TOKEN

We can once again leverage the JSON Patch transformer to pull the structured output response (/choices/0/messages/content) out of the full JSON object returned by OpenAI. In this case we move it to the key /text as that is expected by our final destination.

- name: parse-payload
  transformer:
    type: json-patch
    version: v1
    parameters:
      patch: |
        [
          {"op": "add", "path": "/text", "value": ""},
          {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
        ]

Finally, we pass our modified JSON object to the Webhook Destination, which we use to deliver a message to Slack for demonstration purposes.

- name: send-webhook
  destination:
    type: webhook
    version: v1
    parameters:
      url: $SLACK_WEBHOOK

In Slack, we can see the ranked sensor readings from ChatGPT, just as we requested.

{"readings":[{"reading":6.7,"rank":1},{"reading":5.99,"rank":2},{"reading":4.78,"rank":3}]}

Where To Next?

One of the most interesting aspects of using an AI model for data processing in this context is that the structure of the data sent by the device could change, and the model could still rank the values, and return in them in a predictable format. This can be extremely valuable if you are supporting a device fleet in which the data payloads may differ, either due to multiple firmware versions or because payload structure is dependent on the device’s environment. And ranking sensor readings is one of the simplest tasks that these models can perform — we’re excited to see users try out more complex operations!

Talk with an Expert

Implementing an IoT project takes a team of people, and we want to help out as part of your team. If you want to troubleshoot a current problem or talk through a new project idea, we're here for you.

Start the discussion at forum.golioth.io