Using OpenAI Structured Outputs for IoT Data Pipelines
Shortly after our Golioth for AI launch, which included integrations with platforms such as OpenAI, Anthropic, Hugging Face, and Replicate, OpenAI announced support for Structured Outputs. Structured Outputs allow callers of OpenAI’s APIs to provide a JSON schema to define the structure in which responses should be formatted.
Because OpenAI APIs are invoked via Pipeline Transformers (not to be confused with transformer models in this context) on Golioth, the responses are likely to be subsequently passed to a Pipelines Destination, or even another Transformer. It is helpful if these subsequent steps in a Pipeline can be certain of the structure of the payload they will receive.
The following Pipeline demonstrates the use of Structured Outputs.
filter:
  path: "/accel"
  content_type: application/cbor
steps:
  - name: convert
    transformer:
      type: cbor-to-json
  - name: embed
    transformer:
      type: embed-in-json
      version: v1
      parameters:
        key: readings
  - name: create-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            { "op": "add", "path": "/model", "value": "gpt-4o-2024-08-06" },
            {
              "op": "add",
              "path": "/messages",
              "value": [
                {
                  "role": "user",
                  "content": [
                    {
                      "type": "text",
                      "text": "Rank the top three values in the following sensor readings."
                    },
                    { "type": "text", "text": "PATCH" }
                  ]
                }
              ]
            },
            {
              "op": "add",
              "path": "/response_format",
              "value": {
                "type": "json_schema",
                "json_schema": {
                  "name": "math_response",
                  "strict": true,
                  "schema": {
                    "type": "object",
                    "properties": {
                      "readings": {
                        "type": "array",
                        "items": {
                          "type": "object",
                          "properties": {
                            "reading": { "type": "number" },
                            "rank": { "type": "number" }
                          },
                          "required": ["reading", "rank"],
                          "additionalProperties": false
                        }
                      }
                    },
                    "required": ["readings"],
                    "additionalProperties": false
                  }
                }
              }
            },
            { "op": "move", "from": "/readings", "path": "/messages/0/content/1/text" },
            { "op": "remove", "path": "/readings" }
          ]
  - name: extract
    transformer:
      type: webhook
      version: v1
      parameters:
        url: https://api.openai.com/v1/chat/completions
        headers:
          Authorization: $OPENAI_TOKEN
  - name: parse-payload
    transformer:
      type: json-patch
      version: v1
      parameters:
        patch: |
          [
            {"op": "add", "path": "/text", "value": ""},
            {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
          ]
  - name: send-webhook
    destination:
      type: webhook
      version: v1
      parameters:
        url: $SLACK_WEBHOOK
When looking at more complex Pipelines, it is helpful to break them down by each step. Before our first step, we have our Filter.
filter:
  path: "/accel"
  content_type: application/cbor
This restricts the data that will be passed to this Pipeline to messages from devices on the /accel path, indicating that they are accelerometer sensor readings, with a content type of application/cbor. Delivering data from devices in a binary encoded format such as CBOR reduces the amount of bandwidth each device consumes. However, because many cloud services operate on JSON data, our first step takes care of converting our CBOR payload to JSON.
- name: convert
  transformer:
    type: cbor-to-json
In order to be able to manipulate this data as part of a larger JSON object, we then embed it with key readings.
- name: embed
  transformer:
    type: embed-in-json
    version: v1
    parameters:
      key: readings
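As a rough mental model, the embed step wraps the incoming JSON text as a string value under the given key; the example payload later in this post shows readings as a string for that reason. The Python sketch below illustrates the idea only. The embed_in_json helper is hypothetical and is not Golioth's implementation.

```python
import json


def embed_in_json(payload: bytes, key: str) -> str:
    """Wrap the raw payload text as a string value under `key`.

    Hypothetical helper that mimics the shape of the example payload
    in this post; it is not Golioth's implementation.
    """
    return json.dumps({key: payload.decode("utf-8")})


# The converted JSON array arrives as text and stays a string after embedding.
embedded = embed_in_json(b"[3.2, 4.78, 2.36, 5.99, 6.7]", "readings")
```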
Now it’s time to create a request payload that we can deliver to OpenAI. This will include not only our accelerometer readings, but also information about what model we want to use, a prompt for what the model should do, and our JSON schema that defines how we want it to respond. With our readings embedded in a JSON object, we can operate on that object using the JSON Patch transformer.
- name: create-payload
  transformer:
    type: json-patch
    version: v1
    parameters:
      patch: |
        [
          { "op": "add", "path": "/model", "value": "gpt-4o-2024-08-06" },
          {
            "op": "add",
            "path": "/messages",
            "value": [
              {
                "role": "user",
                "content": [
                  {
                    "type": "text",
                    "text": "Rank the top three values in the following sensor readings."
                  },
                  { "type": "text", "text": "PATCH" }
                ]
              }
            ]
          },
          {
            "op": "add",
            "path": "/response_format",
            "value": {
              "type": "json_schema",
              "json_schema": {
                "name": "math_response",
                "strict": true,
                "schema": {
                  "type": "object",
                  "properties": {
                    "readings": {
                      "type": "array",
                      "items": {
                        "type": "object",
                        "properties": {
                          "reading": { "type": "number" },
                          "rank": { "type": "number" }
                        },
                        "required": ["reading", "rank"],
                        "additionalProperties": false
                      }
                    }
                  },
                  "required": ["readings"],
                  "additionalProperties": false
                }
              }
            }
          },
          { "op": "move", "from": "/readings", "path": "/messages/0/content/1/text" },
          { "op": "remove", "path": "/readings" }
        ]
Altogether, this sequence of patch operations will transform a data payload that looks like this:
{ "readings": "[3.2, 4.78, 2.36, 5.99, 6.7]" }
Into a request payload that looks like this:
{
  "model": "gpt-4o-2024-08-06",
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": "Rank the top three values in the following sensor readings."
        },
        {
          "type": "text",
          "text": "[3.2, 4.78, 2.36, 5.99, 6.7]"
        }
      ]
    }
  ],
  "response_format": {
    "type": "json_schema",
    "json_schema": {
      "name": "math_response",
      "strict": true,
      "schema": {
        "type": "object",
        "properties": {
          "readings": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "reading": { "type": "number" },
                "rank": { "type": "number" }
              },
              "required": ["reading", "rank"],
              "additionalProperties": false
            }
          }
        },
        "required": ["readings"],
        "additionalProperties": false
      }
    }
  }
}
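To see how the add and move operations produce this payload, here is a minimal Python sketch of a patch applier. It is an illustration under simplifying assumptions, not the transformer's implementation: the _resolve and apply_patch helpers are hypothetical, only add (on object members) and move are handled, and JSON Pointer escapes are ignored. The response_format operation is omitted for brevity, and the trailing remove is left out because the move has already taken /readings out of the object.

```python
import copy


def _resolve(doc, pointer):
    """Return (parent, key) for a pointer such as "/messages/0/content/1/text"."""
    parts = pointer.split("/")[1:]
    node = doc
    for part in parts[:-1]:
        node = node[int(part)] if isinstance(node, list) else node[part]
    last = parts[-1]
    return node, (int(last) if isinstance(node, list) else last)


def apply_patch(doc, ops):
    """Apply a simplified subset of JSON Patch (add, move) to a copy of doc."""
    doc = copy.deepcopy(doc)
    for op in ops:
        if op["op"] == "add":
            parent, key = _resolve(doc, op["path"])
            # Copy the value so later operations never mutate the patch itself.
            parent[key] = copy.deepcopy(op["value"])
        elif op["op"] == "move":
            src, src_key = _resolve(doc, op["from"])
            value = src.pop(src_key)  # the source location is removed here
            dst, dst_key = _resolve(doc, op["path"])
            dst[dst_key] = value
        else:
            raise ValueError(f"unsupported op in this sketch: {op['op']}")
    return doc


payload = {"readings": "[3.2, 4.78, 2.36, 5.99, 6.7]"}
ops = [
    {"op": "add", "path": "/model", "value": "gpt-4o-2024-08-06"},
    {"op": "add", "path": "/messages", "value": [{"role": "user", "content": [
        {"type": "text", "text": "Rank the top three values in the following sensor readings."},
        {"type": "text", "text": "PATCH"},
    ]}]},
    {"op": "move", "from": "/readings", "path": "/messages/0/content/1/text"},
]
request = apply_patch(payload, ops)
```

The "PATCH" placeholder exists only so that the move has a location to overwrite with the embedded readings.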
We have asked the model to rank the top three values from our sensor readings, and provide the rankings in an array of objects, each with the reading value and its ranking. We can deliver this payload to OpenAI, leveraging Pipeline Secrets to provide our API token.
- name: extract
  transformer:
    type: webhook
    version: v1
    parameters:
      url: https://api.openai.com/v1/chat/completions
      headers:
        Authorization: $OPENAI_TOKEN
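For testing the request payload outside of a Pipeline, the same call can be approximated with Python's standard library. This is a sketch under assumptions: build_request is a hypothetical helper, and it adds the Bearer prefix that the OpenAI API expects in the Authorization header, which the $OPENAI_TOKEN secret presumably already contains in the Pipeline above.

```python
import json
import urllib.request

OPENAI_URL = "https://api.openai.com/v1/chat/completions"


def build_request(payload: dict, api_key: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying the JSON payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        OPENAI_URL,
        data=body,
        headers={
            # Assumption: api_key is the bare key, so the prefix is added here.
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_request({"model": "gpt-4o-2024-08-06"}, "sk-example")
```

Passing req to urllib.request.urlopen would send it; that step is left out so the sketch runs without network access or a real key.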
We can once again leverage the JSON Patch transformer to pull the structured output response (/choices/0/message/content) out of the full JSON object returned by OpenAI. In this case we move it to the key /text, as that is expected by our final destination.
- name: parse-payload
  transformer:
    type: json-patch
    version: v1
    parameters:
      patch: |
        [
          {"op": "add", "path": "/text", "value": ""},
          {"op": "move", "from": "/choices/0/message/content", "path": "/text"}
        ]
Finally, we pass our modified JSON object to the Webhook Destination, which we use to deliver a message to Slack for demonstration purposes.
- name: send-webhook
  destination:
    type: webhook
    version: v1
    parameters:
      url: $SLACK_WEBHOOK
In Slack, we can see the ranked sensor readings from the OpenAI model, just as we requested.
{"readings":[{"reading":6.7,"rank":1},{"reading":5.99,"rank":2},{"reading":4.78,"rank":3}]}
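Because the reply follows the schema we supplied, a downstream consumer can parse it without guessing at its shape. The following Python sketch shows one way to do that; top_readings is a hypothetical helper, and the input string is the message shown above.

```python
import json


def top_readings(text: str) -> list[float]:
    """Parse the structured reply and return the readings ordered by rank."""
    reply = json.loads(text)
    ranked = sorted(reply["readings"], key=lambda item: item["rank"])
    return [item["reading"] for item in ranked]


slack_text = (
    '{"readings":[{"reading":6.7,"rank":1},'
    '{"reading":5.99,"rank":2},{"reading":4.78,"rank":3}]}'
)
print(top_readings(slack_text))  # [6.7, 5.99, 4.78]
```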
Where To Next?
One of the most interesting aspects of using an AI model for data processing in this context is that the structure of the data sent by the device could change, and the model could still rank the values and return them in a predictable format. This can be extremely valuable if you are supporting a device fleet in which the data payloads may differ, either due to multiple firmware versions or because payload structure is dependent on the device’s environment. And ranking sensor readings is one of the simplest tasks that these models can perform. We’re excited to see users try out more complex operations!